#include "nbd.h"
#include "block.h"
-#include "block_int.h"
#include "qemu-coroutine.h"
/* That's all folks */
-#define read_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, true)
-#define write_sync(fd, buffer, size) nbd_wr_sync(fd, buffer, size, false)
-
-size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
+ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{
size_t offset = 0;
+ int err;
if (qemu_in_coroutine()) {
if (do_read) {
len = send(fd, buffer + offset, size - offset, 0);
}
- if (len == -1)
- errno = socket_error();
+ if (len < 0) {
+ err = socket_error();
+
+ /* recoverable error */
+ if (err == EINTR || (offset > 0 && err == EAGAIN)) {
+ continue;
+ }
- /* recoverable error */
- if (len == -1 && (errno == EAGAIN || errno == EINTR)) {
- continue;
+ /* unrecoverable error */
+ return -err;
}
/* eof */
break;
}
- /* unrecoverable error */
- if (len == -1) {
- return 0;
- }
-
offset += len;
}
return offset;
}
+/*
+ * Read from @fd into @buffer through nbd_wr_sync() in read mode.
+ *
+ * The result of nbd_wr_sync() is passed through unchanged: the number of
+ * bytes transferred, or a negative errno value.  Unlike write_sync(), no
+ * retry is attempted on -EAGAIN.
+ */
+static ssize_t read_sync(int fd, void *buffer, size_t size)
+{
+    ssize_t nread;
+
+    /* During the negotiation phase the socket is kept in blocking mode.
+     * Once negotiation is over, finding the socket non-readable simply
+     * means that another thread stole our request/reply.  recv_coroutine
+     * provides the synchronization that makes this coroutine-safe.
+     */
+    nread = nbd_wr_sync(fd, buffer, size, true);
+    return nread;
+}
+
+/*
+ * Write @size bytes from @buffer to @fd through nbd_wr_sync() in write
+ * mode, retrying for as long as the socket reports -EAGAIN.
+ *
+ * Returns the final nbd_wr_sync() result: the number of bytes transferred,
+ * or a negative errno value other than -EAGAIN.
+ */
+static ssize_t write_sync(int fd, void *buffer, size_t size)
+{
+    /* ssize_t rather than int: nbd_wr_sync() returns ssize_t and so does
+     * this function, so the value must not be narrowed on the way through.
+     */
+    ssize_t ret;
+
+    do {
+        /* For writes, we do expect the socket to be writable.
+         * In the visible error path, nbd_wr_sync() only returns -EAGAIN
+         * while nothing has been transferred yet (once offset > 0 it
+         * retries internally), so restarting from the beginning of the
+         * buffer does not resend data.
+         */
+        ret = nbd_wr_sync(fd, buffer, size, false);
+    } while (ret == -EAGAIN);
+
+    return ret;
+}
+
static void combine_addr(char *buf, size_t len, const char* address,
uint16_t port)
{
int tcp_socket_outgoing_spec(const char *address_and_port)
{
- return inet_connect(address_and_port, SOCK_STREAM);
+ return inet_connect(address_and_port, true, NULL); /* passes address_and_port through and returns inet_connect()'s result; NOTE(review): the meaning of the `true` and NULL arguments is not visible here - confirm against inet_connect()'s prototype */
}
int tcp_socket_incoming(const char *address, uint16_t port)
{
char *ostr = NULL;
int olen = 0;
- return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0);
+ return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0, NULL);
}
int unix_socket_incoming(const char *path)
static int nbd_send_negotiate(int csock, off_t size, uint32_t flags)
{
char buf[8 + 8 + 8 + 128];
+ int rc;
/* Negotiate
[ 0 .. 7] passwd ("NBDMAGIC")
[28 .. 151] reserved (0)
*/
+ socket_set_block(csock);
+ rc = -EINVAL;
+
TRACE("Beginning negotiation.");
memcpy(buf, "NBDMAGIC", 8);
cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL);
if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
LOG("write failed");
- errno = EINVAL;
- return -1;
+ goto fail;
}
TRACE("Negotiation succeeded.");
-
- return 0;
+ rc = 0;
+fail:
+ socket_set_nonblock(csock);
+ return rc;
}
int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
char buf[256];
uint64_t magic, s;
uint16_t tmp;
+ int rc;
TRACE("Receiving negotiation.");
+ socket_set_block(csock);
+ rc = -EINVAL;
+
if (read_sync(csock, buf, 8) != 8) {
LOG("read failed");
- errno = EINVAL;
- return -1;
+ goto fail;
}
buf[8] = '\0';
if (strlen(buf) == 0) {
LOG("server connection closed");
- errno = EINVAL;
- return -1;
+ goto fail;
}
TRACE("Magic is %c%c%c%c%c%c%c%c",
if (memcmp(buf, "NBDMAGIC", 8) != 0) {
LOG("Invalid magic received");
- errno = EINVAL;
- return -1;
+ goto fail;
}
if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("read failed");
- errno = EINVAL;
- return -1;
+ goto fail;
}
magic = be64_to_cpu(magic);
TRACE("Magic is 0x%" PRIx64, magic);
TRACE("Checking magic (opts_magic)");
if (magic != 0x49484156454F5054LL) {
LOG("Bad magic received");
- errno = EINVAL;
- return -1;
+ goto fail;
}
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("flags read failed");
- errno = EINVAL;
- return -1;
+ goto fail;
}
*flags = be16_to_cpu(tmp) << 16;
/* reserved for future use */
if (write_sync(csock, &reserved, sizeof(reserved)) !=
sizeof(reserved)) {
LOG("write failed (reserved)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
/* write the export name */
magic = cpu_to_be64(magic);
if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("write failed (magic)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
LOG("write failed (opt)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
namesize = cpu_to_be32(strlen(name));
if (write_sync(csock, &namesize, sizeof(namesize)) !=
sizeof(namesize)) {
LOG("write failed (namesize)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
LOG("write failed (name)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
} else {
TRACE("Checking magic (cli_magic)");
if (magic != 0x00420281861253LL) {
LOG("Bad magic received");
- errno = EINVAL;
- return -1;
+ goto fail;
}
}
if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
LOG("read failed");
- errno = EINVAL;
- return -1;
+ goto fail;
}
*size = be64_to_cpu(s);
*blocksize = 1024;
if (!name) {
if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
LOG("read failed (flags)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
*flags = be32_to_cpup(flags);
} else {
if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed (tmp)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
*flags |= be32_to_cpu(tmp);
}
if (read_sync(csock, &buf, 124) != 124) {
LOG("read failed (buf)");
- errno = EINVAL;
- return -1;
+ goto fail;
}
- return 0;
+ rc = 0;
+
+fail:
+ socket_set_nonblock(csock);
+ return rc;
}
#ifdef __linux__
{
TRACE("Setting NBD socket");
- if (ioctl(fd, NBD_SET_SOCK, csock) == -1) {
+ if (ioctl(fd, NBD_SET_SOCK, csock) < 0) {
int serrno = errno;
LOG("Failed to set NBD socket");
- errno = serrno;
- return -1;
+ return -serrno;
}
TRACE("Setting block size to %lu", (unsigned long)blocksize);
- if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) == -1) {
+ if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) < 0) {
int serrno = errno;
LOG("Failed setting NBD block size");
- errno = serrno;
- return -1;
+ return -serrno;
}
TRACE("Setting size to %zd block(s)", (size_t)(size / blocksize));
- if (ioctl(fd, NBD_SET_SIZE_BLOCKS, size / blocksize) == -1) {
+ if (ioctl(fd, NBD_SET_SIZE_BLOCKS, size / blocksize) < 0) {
int serrno = errno;
LOG("Failed setting size (in blocks)");
- errno = serrno;
- return -1;
+ return -serrno;
}
if (flags & NBD_FLAG_READ_ONLY) {
if (ioctl(fd, BLKROSET, (unsigned long) &read_only) < 0) {
int serrno = errno;
LOG("Failed setting read-only attribute");
- errno = serrno;
- return -1;
+ return -serrno;
}
}
&& errno != ENOTTY) {
int serrno = errno;
LOG("Failed setting flags");
- errno = serrno;
- return -1;
+ return -serrno;
}
TRACE("Negotiation ended");
TRACE("Doing NBD loop");
ret = ioctl(fd, NBD_DO_IT);
- if (ret == -1 && errno == EPIPE) {
+ if (ret < 0 && errno == EPIPE) {
/* NBD_DO_IT normally returns EPIPE when someone has disconnected
* the socket via NBD_DISCONNECT. We do not want to return 1 in
* that case.
#else
int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize)
{
- errno = ENOTSUP;
- return -1;
+ return -ENOTSUP; /* stub in the #else arm of #ifdef __linux__: ignores its arguments and reports "not supported" as a negative errno instead of setting errno */
}
int nbd_disconnect(int fd)
{
- errno = ENOTSUP;
- return -1;
+ return -ENOTSUP; /* stub in the #else arm of #ifdef __linux__: ignores fd and reports "not supported" as a negative errno instead of setting errno */
}
int nbd_client(int fd)
{
- errno = ENOTSUP;
- return -1;
+ return -ENOTSUP; /* stub in the #else arm of #ifdef __linux__: ignores fd and reports "not supported" as a negative errno instead of setting errno */
}
#endif
-int nbd_send_request(int csock, struct nbd_request *request)
+ssize_t nbd_send_request(int csock, struct nbd_request *request)
{
uint8_t buf[4 + 4 + 8 + 8 + 4];
+ ssize_t ret;
cpu_to_be32w((uint32_t*)buf, NBD_REQUEST_MAGIC);
cpu_to_be32w((uint32_t*)(buf + 4), request->type);
"{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
request->from, request->len, request->handle, request->type);
- if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ ret = write_sync(csock, buf, sizeof(buf));
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (ret != sizeof(buf)) {
LOG("writing to socket failed");
- errno = EINVAL;
- return -1;
+ return -EINVAL;
}
return 0;
}
-static int nbd_receive_request(int csock, struct nbd_request *request)
+static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
{
uint8_t buf[4 + 4 + 8 + 8 + 4];
uint32_t magic;
+ ssize_t ret;
+
+ ret = read_sync(csock, buf, sizeof(buf));
+ if (ret < 0) {
+ return ret;
+ }
- if (read_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ if (ret != sizeof(buf)) {
LOG("read failed");
- errno = EINVAL;
- return -1;
+ return -EINVAL;
}
/* Request
if (magic != NBD_REQUEST_MAGIC) {
LOG("invalid magic (got 0x%x)", magic);
- errno = EINVAL;
- return -1;
+ return -EINVAL;
}
return 0;
}
-int nbd_receive_reply(int csock, struct nbd_reply *reply)
+ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
{
uint8_t buf[NBD_REPLY_SIZE];
uint32_t magic;
+ ssize_t ret;
- memset(buf, 0xAA, sizeof(buf));
+ ret = read_sync(csock, buf, sizeof(buf));
+ if (ret < 0) {
+ return ret;
+ }
- if (read_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ if (ret != sizeof(buf)) {
LOG("read failed");
- errno = EINVAL;
- return -1;
+ return -EINVAL;
}
/* Reply
if (magic != NBD_REPLY_MAGIC) {
LOG("invalid magic (got 0x%x)", magic);
- errno = EINVAL;
- return -1;
+ return -EINVAL;
}
return 0;
}
-static int nbd_send_reply(int csock, struct nbd_reply *reply)
+static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
{
uint8_t buf[4 + 4 + 8];
+ ssize_t ret;
/* Reply
[ 0 .. 3] magic (NBD_REPLY_MAGIC)
TRACE("Sending response to client");
- if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ ret = write_sync(csock, buf, sizeof(buf));
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (ret != sizeof(buf)) {
LOG("writing to socket failed");
- errno = EINVAL;
- return -1;
+ return -EINVAL;
}
return 0;
}
+#define MAX_NBD_REQUESTS 16
+
typedef struct NBDRequest NBDRequest;
struct NBDRequest {
CoMutex send_lock;
Coroutine *send_coroutine;
+
+ int nb_requests;
};
static void nbd_client_get(NBDClient *client)
NBDRequest *req;
NBDExport *exp = client->exp;
+ assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
+ client->nb_requests++;
+
if (QSIMPLEQ_EMPTY(&exp->requests)) {
req = g_malloc0(sizeof(NBDRequest));
req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE);
{
NBDClient *client = req->client;
QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry);
+ if (client->nb_requests-- == MAX_NBD_REQUESTS) {
+ qemu_notify_event();
+ }
nbd_client_put(client);
}
exp->bs = bs;
exp->dev_offset = dev_offset;
exp->nbdflags = nbdflags;
- exp->size = size == -1 ? exp->bs->total_sectors * 512 : size;
+ exp->size = size == -1 ? bdrv_getlength(bs) : size;
return exp;
}
g_free(exp);
}
+static int nbd_can_read(void *opaque);
static void nbd_read(void *opaque);
static void nbd_restart_write(void *opaque);
-static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
- int len)
+static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
+ int len)
{
NBDClient *client = req->client;
int csock = client->sock;
- int rc, ret;
+ ssize_t rc, ret;
qemu_co_mutex_lock(&client->send_lock);
- qemu_set_fd_handler2(csock, NULL, nbd_read, nbd_restart_write, client);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read,
+ nbd_restart_write, client);
client->send_coroutine = qemu_coroutine_self();
if (!len) {
rc = nbd_send_reply(csock, reply);
- if (rc == -1) {
- rc = -errno;
- }
} else {
socket_set_cork(csock, 1);
rc = nbd_send_reply(csock, reply);
- if (rc != -1) {
+ if (rc >= 0) {
ret = qemu_co_send(csock, req->data, len);
if (ret != len) {
- errno = EIO;
- rc = -1;
+ rc = -EIO;
}
}
- if (rc == -1) {
- rc = -errno;
- }
socket_set_cork(csock, 0);
}
client->send_coroutine = NULL;
- qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
-static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
+static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
{
NBDClient *client = req->client;
int csock = client->sock;
- int rc;
+ ssize_t rc;
client->recv_coroutine = qemu_coroutine_self();
- if (nbd_receive_request(csock, request) == -1) {
- rc = -EIO;
+ rc = nbd_receive_request(csock, request);
+ if (rc < 0) {
+ if (rc != -EAGAIN) {
+ rc = -EIO;
+ }
goto out;
}
NBDExport *exp = client->exp;
struct nbd_request request;
struct nbd_reply reply;
- int ret;
+ ssize_t ret;
TRACE("Reading request.");
ret = nbd_co_receive_request(req, &request);
+ if (ret == -EAGAIN) {
+ goto done;
+ }
if (ret == -EIO) {
goto out;
}
LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
", Offset: %" PRIu64 "\n",
request.from, request.len,
- (uint64_t)exp->size, exp->dev_offset);
+ (uint64_t)exp->size, (uint64_t)exp->dev_offset);
LOG("requested operation past EOF--bad client?");
goto invalid_request;
}
case NBD_CMD_READ:
TRACE("Request type is READ");
+ if (request.type & NBD_CMD_FLAG_FUA) {
+ ret = bdrv_co_flush(exp->bs);
+ if (ret < 0) {
+ LOG("flush failed");
+ reply.error = -ret;
+ goto error_reply;
+ }
+ }
+
ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512,
req->data, request.len / 512);
if (ret < 0) {
}
}
- if (nbd_co_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0) {
goto out;
+ }
break;
case NBD_CMD_DISC:
TRACE("Request type is DISCONNECT");
LOG("flush failed");
reply.error = -ret;
}
-
- if (nbd_co_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0) {
goto out;
+ }
break;
case NBD_CMD_TRIM:
TRACE("Request type is TRIM");
LOG("discard failed");
reply.error = -ret;
}
- if (nbd_co_send_reply(req, &reply, 0) < 0)
+ if (nbd_co_send_reply(req, &reply, 0) < 0) {
goto out;
+ }
break;
default:
LOG("invalid request type (%u) received", request.type);
invalid_request:
reply.error = -EINVAL;
error_reply:
- if (nbd_co_send_reply(req, &reply, 0) == -1)
+ if (nbd_co_send_reply(req, &reply, 0) < 0) {
goto out;
+ }
break;
}
TRACE("Request/Reply complete");
+done:
nbd_request_put(req);
return;
nbd_client_close(client);
}
+/*
+ * Read-poll predicate installed with qemu_set_fd_handler2() next to
+ * nbd_read().
+ *
+ * @opaque is the NBDClient.  Returns nonzero when the client has a
+ * recv_coroutine set, or when its nb_requests counter is still below
+ * MAX_NBD_REQUESTS; returns 0 otherwise.
+ */
+static int nbd_can_read(void *opaque)
+{
+    NBDClient *c = opaque;
+
+    /* A receive already in progress must always be allowed to continue. */
+    if (c->recv_coroutine) {
+        return 1;
+    }
+
+    /* Otherwise only accept more input while below the request limit. */
+    return c->nb_requests < MAX_NBD_REQUESTS;
+}
+
static void nbd_read(void *opaque)
{
NBDClient *client = opaque;
void (*close)(NBDClient *))
{
NBDClient *client;
- if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) == -1) {
+ if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) < 0) {
return NULL;
}
client = g_malloc0(sizeof(NBDClient));
client->sock = csock;
client->close = close;
qemu_co_mutex_init(&client->send_lock);
- qemu_set_fd_handler2(csock, NULL, nbd_read, NULL, client);
+ qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
return client;
}