#include "nbd.h"
#include "block.h"
-#include "block_int.h"
#include "qemu-coroutine.h"
/* That's all folks */
-#define read_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, true)
-#define write_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, false)
-
ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{
size_t offset = 0;
err = socket_error();
/* recoverable error */
- if (err == EINTR || err == EAGAIN) {
+ if (err == EINTR || (offset > 0 && err == EAGAIN)) {
continue;
}
return offset;
}
+static ssize_t read_sync(int fd, void *buffer, size_t size)
+{
+ /* Sockets are kept in blocking mode in the negotiation phase. After
+ * that, a non-readable socket simply means that another thread stole
+ * our request/reply. Synchronization is done with recv_coroutine, so
+ * that this is coroutine-safe.
+ */
+ return nbd_wr_sync(fd, buffer, size, true);
+}
+
+static ssize_t write_sync(int fd, void *buffer, size_t size)
+{
+ int ret;
+ do {
+ /* For writes, we do expect the socket to be writable. */
+ ret = nbd_wr_sync(fd, buffer, size, false);
+ } while (ret == -EAGAIN);
+ return ret;
+}
+
static void combine_addr(char *buf, size_t len, const char* address,
uint16_t port)
{
int tcp_socket_outgoing_spec(const char *address_and_port)
{
- return inet_connect(address_and_port, SOCK_STREAM);
+ return inet_connect(address_and_port, true, NULL);
}
int tcp_socket_incoming(const char *address, uint16_t port)
{
char *ostr = NULL;
int olen = 0;
- return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0);
+ return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0, NULL);
}
int unix_socket_incoming(const char *path)
[28 .. 151] reserved (0)
*/
+ socket_set_block(csock);
rc = -EINVAL;
TRACE("Beginning negotiation.");
TRACE("Negotiation succeeded.");
rc = 0;
fail:
+ socket_set_nonblock(csock);
return rc;
}
TRACE("Receiving negotiation.");
+ socket_set_block(csock);
rc = -EINVAL;
if (read_sync(csock, buf, 8) != 8) {
rc = 0;
fail:
+ socket_set_nonblock(csock);
return rc;
}
exp->bs = bs;
exp->dev_offset = dev_offset;
exp->nbdflags = nbdflags;
- exp->size = size == -1 ? exp->bs->total_sectors * 512 : size;
+ exp->size = size == -1 ? bdrv_getlength(bs) : size;
return exp;
}
ssize_t rc;
client->recv_coroutine = qemu_coroutine_self();
- if (nbd_receive_request(csock, request) < 0) {
- rc = -EIO;
+ rc = nbd_receive_request(csock, request);
+ if (rc < 0) {
+ if (rc != -EAGAIN) {
+ rc = -EIO;
+ }
goto out;
}
TRACE("Reading request.");
ret = nbd_co_receive_request(req, &request);
+ if (ret == -EAGAIN) {
+ goto done;
+ }
if (ret == -EIO) {
goto out;
}
case NBD_CMD_READ:
TRACE("Request type is READ");
+ if (request.type & NBD_CMD_FLAG_FUA) {
+ ret = bdrv_co_flush(exp->bs);
+ if (ret < 0) {
+ LOG("flush failed");
+ reply.error = -ret;
+ goto error_reply;
+ }
+ }
+
ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512,
req->data, request.len / 512);
if (ret < 0) {
TRACE("Request/Reply complete");
+done:
nbd_request_put(req);
return;