X-Git-Url: https://repo.jachan.dev/qemu.git/blobdiff_plain/938406dfb11d8a40d9228b3596d49a583d7218ff..96f8f23a1e900796494a54e8b56610e1a7db2a89:/nbd.c diff --git a/nbd.c b/nbd.c index 6f0db62deb..a7bce45117 100644 --- a/nbd.c +++ b/nbd.c @@ -16,10 +16,11 @@ * along with this program; if not, see . */ -#include "nbd.h" -#include "block.h" +#include "block/nbd.h" +#include "block/block.h" +#include "block/block_int.h" -#include "qemu-coroutine.h" +#include "block/coroutine.h" #include #include @@ -36,8 +37,9 @@ #include #endif -#include "qemu_socket.h" -#include "qemu-queue.h" +#include "qemu/sockets.h" +#include "qemu/queue.h" +#include "qemu/main-loop.h" //#define DEBUG_NBD @@ -55,7 +57,11 @@ __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \ } while(0) -/* This is all part of the "official" NBD API */ +/* This is all part of the "official" NBD API. + * + * The most up-to-date documentation is available at: + * https://github.com/yoe/nbd/blob/master/doc/proto.txt + */ #define NBD_REQUEST_SIZE (4 + 4 + 8 + 8 + 4) #define NBD_REPLY_SIZE (4 + 4 + 8) @@ -63,6 +69,7 @@ #define NBD_REPLY_MAGIC 0x67446698 #define NBD_OPTS_MAGIC 0x49484156454F5054LL #define NBD_CLIENT_MAGIC 0x0000420281861253LL +#define NBD_REP_MAGIC 0x3e889045565a9LL #define NBD_SET_SOCK _IO(0xab, 0) #define NBD_SET_BLKSIZE _IO(0xab, 1) @@ -76,7 +83,9 @@ #define NBD_SET_TIMEOUT _IO(0xab, 9) #define NBD_SET_FLAGS _IO(0xab, 10) -#define NBD_OPT_EXPORT_NAME (1 << 0) +#define NBD_OPT_EXPORT_NAME (1) +#define NBD_OPT_ABORT (2) +#define NBD_OPT_LIST (3) /* Definitions for opaque data types */ @@ -98,8 +107,9 @@ struct NBDExport { off_t size; uint32_t nbdflags; QTAILQ_HEAD(, NBDClient) clients; - QSIMPLEQ_HEAD(, NBDRequest) requests; QTAILQ_ENTRY(NBDExport) next; + + AioContext *ctx; }; static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports); @@ -116,6 +126,8 @@ struct NBDClient { CoMutex send_lock; Coroutine *send_coroutine; + bool can_read; + QTAILQ_ENTRY(NBDClient) next; int nb_requests; bool closing; @@ -123,6 +135,10 @@ struct NBDClient { /* That's all folks */ +static void nbd_set_handlers(NBDClient *client); +static void nbd_unset_handlers(NBDClient *client); +static void nbd_update_can_read(NBDClient *client); + ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; @@ -149,7 +165,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) err = socket_error(); /* recoverable error */ - if (err == EINTR || (offset > 0 && err == EAGAIN)) { + if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) { continue; } @@ -188,56 +204,6 @@ static ssize_t write_sync(int fd, void *buffer, size_t size) return ret; } -static void combine_addr(char *buf, size_t len, const char* address, - uint16_t port) -{ - /* If the address-part contains a colon, it's an IPv6 IP so needs [] */ - if (strstr(address, ":")) { - snprintf(buf, len, "[%s]:%u", address, port); - } else { - snprintf(buf, len, "%s:%u", address, port); - } -} - -int tcp_socket_outgoing(const char *address, uint16_t port) -{ - char address_and_port[128]; - combine_addr(address_and_port, 128, address, port); - return tcp_socket_outgoing_spec(address_and_port); -} - -int tcp_socket_outgoing_spec(const char *address_and_port) -{ - return inet_connect(address_and_port, NULL); -} - -int tcp_socket_incoming(const char *address, uint16_t port) -{ - char address_and_port[128]; - combine_addr(address_and_port, 128, address, port); - return tcp_socket_incoming_spec(address_and_port); -} - -int tcp_socket_incoming_spec(const char *address_and_port) -{ - char *ostr = NULL; - int olen = 0; - return inet_listen(address_and_port, ostr, olen, SOCK_STREAM, 0, NULL); -} - -int unix_socket_incoming(const char *path) -{ - char *ostr = NULL; - int olen = 0; - - return unix_listen(path, ostr, olen); -} - -int unix_socket_outgoing(const char *path) -{ - return unix_connect(path); -} - /* Basic flow for negotiation Server Client @@ -265,59 +231,101 @@ int unix_socket_outgoing(const char *path) */ -static int nbd_receive_options(NBDClient *client) +static int nbd_send_rep(int csock, uint32_t type, uint32_t opt) { - int csock = client->sock; - char name[256]; - uint32_t tmp, length; uint64_t magic; - int rc; - - /* Client sends: - [ 0 .. 3] reserved (0) - [ 4 .. 11] NBD_OPTS_MAGIC - [12 .. 15] NBD_OPT_EXPORT_NAME - [16 .. 19] length - [20 .. xx] export name (length bytes) - */ + uint32_t len; - rc = -EINVAL; - if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { - LOG("read failed"); - goto fail; + magic = cpu_to_be64(NBD_REP_MAGIC); + if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { + LOG("write failed (rep magic)"); + return -EINVAL; } - TRACE("Checking reserved"); - if (tmp != 0) { - LOG("Bad reserved received"); - goto fail; + opt = cpu_to_be32(opt); + if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) { + LOG("write failed (rep opt)"); + return -EINVAL; } - - if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { - LOG("read failed"); - goto fail; + type = cpu_to_be32(type); + if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) { + LOG("write failed (rep type)"); + return -EINVAL; } - TRACE("Checking reserved"); - if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) { - LOG("Bad magic received"); - goto fail; + len = cpu_to_be32(0); + if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) { + LOG("write failed (rep data length)"); + return -EINVAL; } + return 0; +} - if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { - LOG("read failed"); - goto fail; +static int nbd_send_rep_list(int csock, NBDExport *exp) +{ + uint64_t magic, name_len; + uint32_t opt, type, len; + + name_len = strlen(exp->name); + magic = cpu_to_be64(NBD_REP_MAGIC); + if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { + LOG("write failed (magic)"); + return -EINVAL; + } + opt = cpu_to_be32(NBD_OPT_LIST); + if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) { + LOG("write failed (opt)"); + return -EINVAL; } - TRACE("Checking option"); - if (tmp != be32_to_cpu(NBD_OPT_EXPORT_NAME)) { - LOG("Bad option received"); - goto fail; + type = cpu_to_be32(NBD_REP_SERVER); + if (write_sync(csock, &type, sizeof(type)) != sizeof(type)) { + LOG("write failed (reply type)"); + return -EINVAL; + } + len = cpu_to_be32(name_len + sizeof(len)); + if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) { + LOG("write failed (length)"); + return -EINVAL; + } + len = cpu_to_be32(name_len); + if (write_sync(csock, &len, sizeof(len)) != sizeof(len)) { + LOG("write failed (length)"); + return -EINVAL; + } + if (write_sync(csock, exp->name, name_len) != name_len) { + LOG("write failed (buffer)"); + return -EINVAL; } + return 0; +} - if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) { - LOG("read failed"); - goto fail; +static int nbd_handle_list(NBDClient *client, uint32_t length) +{ + int csock; + NBDExport *exp; + + csock = client->sock; + if (length) { + return nbd_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST); } + + /* For each export, send a NBD_REP_SERVER reply. */ + QTAILQ_FOREACH(exp, &exports, next) { + if (nbd_send_rep_list(csock, exp)) { + return -EINVAL; + } + } + /* Finish with a NBD_REP_ACK. */ + return nbd_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST); +} + +static int nbd_handle_export_name(NBDClient *client, uint32_t length) +{ + int rc = -EINVAL, csock = client->sock; + char name[256]; + + /* Client sends: + [20 .. xx] export name (length bytes) + */ TRACE("Checking length"); - length = be32_to_cpu(length); if (length > 255) { LOG("Bad length received"); goto fail; @@ -336,13 +344,81 @@ static int nbd_receive_options(NBDClient *client) QTAILQ_INSERT_TAIL(&client->exp->clients, client, next); nbd_export_get(client->exp); - - TRACE("Option negotiation succeeded."); rc = 0; fail: return rc; } +static int nbd_receive_options(NBDClient *client) +{ + while (1) { + int csock = client->sock; + uint32_t tmp, length; + uint64_t magic; + + /* Client sends: + [ 0 .. 3] client flags + [ 4 .. 11] NBD_OPTS_MAGIC + [12 .. 15] NBD option + [16 .. 19] length + ... Rest of request + */ + + if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + LOG("read failed"); + return -EINVAL; + } + TRACE("Checking client flags"); + tmp = be32_to_cpu(tmp); + if (tmp != 0 && tmp != NBD_FLAG_C_FIXED_NEWSTYLE) { + LOG("Bad client flags received"); + return -EINVAL; + } + + if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { + LOG("read failed"); + return -EINVAL; + } + TRACE("Checking opts magic"); + if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) { + LOG("Bad magic received"); + return -EINVAL; + } + + if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + LOG("read failed"); + return -EINVAL; + } + + if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) { + LOG("read failed"); + return -EINVAL; + } + length = be32_to_cpu(length); + + TRACE("Checking option"); + switch (be32_to_cpu(tmp)) { + case NBD_OPT_LIST: + if (nbd_handle_list(client, length) < 0) { + return 1; + } + break; + + case NBD_OPT_ABORT: + return -EINVAL; + + case NBD_OPT_EXPORT_NAME: + return nbd_handle_export_name(client, length); + + default: + tmp = be32_to_cpu(tmp); + LOG("Unsupported option 0x%x", tmp); + nbd_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp); + return -EINVAL; + } + } +} + static int nbd_send_negotiate(NBDClient *client) { int csock = client->sock; @@ -356,7 +432,7 @@ static int nbd_send_negotiate(NBDClient *client) [ 8 .. 15] magic (NBD_CLIENT_MAGIC) [16 .. 23] size [24 .. 25] server flags (0) - [24 .. 27] export flags + [26 .. 27] export flags [28 .. 151] reserved (0) Negotiation header with options, part 1: @@ -370,10 +446,11 @@ static int nbd_send_negotiate(NBDClient *client) [28 .. 151] reserved (0) */ - socket_set_block(csock); + qemu_set_block(csock); rc = -EINVAL; TRACE("Beginning negotiation."); + memset(buf, 0, sizeof(buf)); memcpy(buf, "NBDMAGIC", 8); if (client->exp) { assert ((client->exp->nbdflags & ~65535) == 0); @@ -382,8 +459,8 @@ static int nbd_send_negotiate(NBDClient *client) cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags); } else { cpu_to_be64w((uint64_t*)(buf + 8), NBD_OPTS_MAGIC); + cpu_to_be16w((uint16_t *)(buf + 16), NBD_FLAG_FIXED_NEWSTYLE); } - memset(buf + 28, 0, 124); if (client->exp) { if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { @@ -396,7 +473,7 @@ static int nbd_send_negotiate(NBDClient *client) goto fail; } rc = nbd_receive_options(client); - if (rc < 0) { + if (rc != 0) { LOG("option negotiation failed"); goto fail; } @@ -413,7 +490,7 @@ static int nbd_send_negotiate(NBDClient *client) TRACE("Negotiation succeeded."); rc = 0; fail: - socket_set_nonblock(csock); + qemu_set_nonblock(csock); return rc; } @@ -427,7 +504,6 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, TRACE("Receiving negotiation."); - socket_set_block(csock); rc = -EINVAL; if (read_sync(csock, buf, 8) != 8) { @@ -542,7 +618,6 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, rc = 0; fail: - socket_set_nonblock(csock); return rc; } @@ -573,24 +648,23 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) return -serrno; } - if (flags & NBD_FLAG_READ_ONLY) { - int read_only = 1; - TRACE("Setting readonly attribute"); + if (ioctl(fd, NBD_SET_FLAGS, flags) < 0) { + if (errno == ENOTTY) { + int read_only = (flags & NBD_FLAG_READ_ONLY) != 0; + TRACE("Setting readonly attribute"); - if (ioctl(fd, BLKROSET, (unsigned long) &read_only) < 0) { + if (ioctl(fd, BLKROSET, (unsigned long) &read_only) < 0) { + int serrno = errno; + LOG("Failed setting read-only attribute"); + return -serrno; + } + } else { int serrno = errno; - LOG("Failed setting read-only attribute"); + LOG("Failed setting flags"); return -serrno; } } - if (ioctl(fd, NBD_SET_FLAGS, flags) < 0 - && errno != ENOTTY) { - int serrno = errno; - LOG("Failed setting flags"); - return -serrno; - } - TRACE("Negotiation ended"); return 0; @@ -797,7 +871,7 @@ void nbd_client_put(NBDClient *client) */ assert(client->closing); - qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); + nbd_unset_handlers(client); close(client->sock); client->sock = -1; if (client->exp) { @@ -830,18 +904,12 @@ void nbd_client_close(NBDClient *client) static NBDRequest *nbd_request_get(NBDClient *client) { NBDRequest *req; - NBDExport *exp = client->exp; assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); client->nb_requests++; + nbd_update_can_read(client); - if (QSIMPLEQ_EMPTY(&exp->requests)) { - req = g_malloc0(sizeof(NBDRequest)); - req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); - } else { - req = QSIMPLEQ_FIRST(&exp->requests); - QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); - } + req = g_slice_new0(NBDRequest); nbd_client_get(client); req->client = client; return req; @@ -850,19 +918,50 @@ static NBDRequest *nbd_request_get(NBDClient *client) static void nbd_request_put(NBDRequest *req) { NBDClient *client = req->client; - QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry); - if (client->nb_requests-- == MAX_NBD_REQUESTS) { - qemu_notify_event(); + + if (req->data) { + qemu_vfree(req->data); } + g_slice_free(NBDRequest, req); + + client->nb_requests--; + nbd_update_can_read(client); nbd_client_put(client); } +static void bs_aio_attached(AioContext *ctx, void *opaque) +{ + NBDExport *exp = opaque; + NBDClient *client; + + TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx); + + exp->ctx = ctx; + + QTAILQ_FOREACH(client, &exp->clients, next) { + nbd_set_handlers(client); + } +} + +static void bs_aio_detach(void *opaque) +{ + NBDExport *exp = opaque; + NBDClient *client; + + TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx); + + QTAILQ_FOREACH(client, &exp->clients, next) { + nbd_unset_handlers(client); + } + + exp->ctx = NULL; +} + NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size, uint32_t nbdflags, void (*close)(NBDExport *)) { NBDExport *exp = g_malloc0(sizeof(NBDExport)); - QSIMPLEQ_INIT(&exp->requests); exp->refcount = 1; QTAILQ_INIT(&exp->clients); exp->bs = bs; @@ -870,6 +969,15 @@ NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, exp->nbdflags = nbdflags; exp->size = size == -1 ? bdrv_getlength(bs) : size; exp->close = close; + exp->ctx = bdrv_get_aio_context(bs); + bdrv_ref(bs); + bdrv_add_aio_context_notifier(bs, bs_aio_attached, bs_aio_detach, exp); + /* + * NBD exports are used for non-shared storage migration. Make sure + * that BDRV_O_INCOMING is cleared and the image is ready for write + * access since the export could be available before migration handover. + */ + bdrv_invalidate_cache(bs, NULL); return exp; } @@ -916,6 +1024,12 @@ void nbd_export_close(NBDExport *exp) } nbd_export_set_name(exp, NULL); nbd_export_put(exp); + if (exp->bs) { + bdrv_remove_aio_context_notifier(exp->bs, bs_aio_attached, + bs_aio_detach, exp); + bdrv_unref(exp->bs); + exp->bs = NULL; + } } void nbd_export_get(NBDExport *exp) @@ -938,13 +1052,6 @@ void nbd_export_put(NBDExport *exp) exp->close(exp); } - while (!QSIMPLEQ_EMPTY(&exp->requests)) { - NBDRequest *first = QSIMPLEQ_FIRST(&exp->requests); - QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); - qemu_vfree(first->data); - g_free(first); - } - g_free(exp); } } @@ -963,10 +1070,6 @@ void nbd_export_close_all(void) } } -static int nbd_can_read(void *opaque); -static void nbd_read(void *opaque); -static void nbd_restart_write(void *opaque); - static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, int len) { @@ -975,9 +1078,8 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, ssize_t rc, ret; qemu_co_mutex_lock(&client->send_lock); - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, - nbd_restart_write, client); client->send_coroutine = qemu_coroutine_self(); + nbd_set_handlers(client); if (!len) { rc = nbd_send_reply(csock, reply); @@ -994,7 +1096,7 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, } client->send_coroutine = NULL; - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + nbd_set_handlers(client); qemu_co_mutex_unlock(&client->send_lock); return rc; } @@ -1003,9 +1105,12 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque { NBDClient *client = req->client; int csock = client->sock; + uint32_t command; ssize_t rc; client->recv_coroutine = qemu_coroutine_self(); + nbd_update_can_read(client); + rc = nbd_receive_request(csock, request); if (rc < 0) { if (rc != -EAGAIN) { @@ -1014,9 +1119,9 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque goto out; } - if (request->len > NBD_BUFFER_SIZE) { + if (request->len > NBD_MAX_BUFFER_SIZE) { LOG("len (%u) is larger than max len (%u)", - request->len, NBD_BUFFER_SIZE); + request->len, NBD_MAX_BUFFER_SIZE); rc = -EINVAL; goto out; } @@ -1030,7 +1135,11 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque TRACE("Decoding type"); - if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { + command = request->type & NBD_CMD_MASK_COMMAND; + if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) { + req->data = qemu_blockalign(client->exp->bs, request->len); + } + if (command == NBD_CMD_WRITE) { TRACE("Reading %u byte(s)", request->len); if (qemu_co_recv(csock, req->data, request->len) != request->len) { @@ -1043,6 +1152,8 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque out: client->recv_coroutine = NULL; + nbd_update_can_read(client); + return rc; } @@ -1054,6 +1165,7 @@ static void nbd_trip(void *opaque) struct nbd_request request; struct nbd_reply reply; ssize_t ret; + uint32_t command; TRACE("Reading request."); if (client->closing) { @@ -1076,8 +1188,8 @@ static void nbd_trip(void *opaque) reply.error = -ret; goto error_reply; } - - if ((request.from + request.len) > exp->size) { + command = request.type & NBD_CMD_MASK_COMMAND; + if (command != NBD_CMD_DISC && (request.from + request.len) > exp->size) { LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64 ", Offset: %" PRIu64 "\n", request.from, request.len, @@ -1086,7 +1198,7 @@ static void nbd_trip(void *opaque) goto invalid_request; } - switch (request.type & NBD_CMD_MASK_COMMAND) { + switch (command) { case NBD_CMD_READ: TRACE("Request type is READ"); @@ -1193,13 +1305,6 @@ out: nbd_client_close(client); } -static int nbd_can_read(void *opaque) -{ - NBDClient *client = opaque; - - return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS; -} - static void nbd_read(void *opaque) { NBDClient *client = opaque; @@ -1218,6 +1323,37 @@ static void nbd_restart_write(void *opaque) qemu_coroutine_enter(client->send_coroutine, NULL); } +static void nbd_set_handlers(NBDClient *client) +{ + if (client->exp && client->exp->ctx) { + aio_set_fd_handler(client->exp->ctx, client->sock, + client->can_read ? nbd_read : NULL, + client->send_coroutine ? nbd_restart_write : NULL, + client); + } +} + +static void nbd_unset_handlers(NBDClient *client) +{ + if (client->exp && client->exp->ctx) { + aio_set_fd_handler(client->exp->ctx, client->sock, NULL, NULL, NULL); + } +} + +static void nbd_update_can_read(NBDClient *client) +{ + bool can_read = client->recv_coroutine || + client->nb_requests < MAX_NBD_REQUESTS; + + if (can_read != client->can_read) { + client->can_read = can_read; + nbd_set_handlers(client); + + /* There is no need to invoke aio_notify(), since aio_set_fd_handler() + * in nbd_set_handlers() will have taken care of that */ + } +} + NBDClient *nbd_client_new(NBDExport *exp, int csock, void (*close)(NBDClient *)) { @@ -1226,13 +1362,14 @@ NBDClient *nbd_client_new(NBDExport *exp, int csock, client->refcount = 1; client->exp = exp; client->sock = csock; - if (nbd_send_negotiate(client) < 0) { + client->can_read = true; + if (nbd_send_negotiate(client)) { g_free(client); return NULL; } client->close = close; qemu_co_mutex_init(&client->send_lock); - qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + nbd_set_handlers(client); if (exp) { QTAILQ_INSERT_TAIL(&exp->clients, client, next);