]> Git Repo - qemu.git/blobdiff - nbd/server.c
stream: rework backing-file changing
[qemu.git] / nbd / server.c
index 20754e9ebc3c1342692c99e7a99ff75ee6af35fc..7229f487d29649ee5747b79786d2857fc55f6cf6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *  Copyright (C) 2016-2018 Red Hat, Inc.
+ *  Copyright (C) 2016-2020 Red Hat, Inc.
  *  Copyright (C) 2005  Anthony Liguori <[email protected]>
  *
  *  Network Block Device Server Side
@@ -18,6 +18,8 @@
  */
 
 #include "qemu/osdep.h"
+
+#include "block/export.h"
 #include "qapi/error.h"
 #include "qemu/queue.h"
 #include "trace.h"
@@ -25,7 +27,9 @@
 #include "qemu/units.h"
 
 #define NBD_META_ID_BASE_ALLOCATION 0
-#define NBD_META_ID_DIRTY_BITMAP 1
+#define NBD_META_ID_ALLOCATION_DEPTH 1
+/* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
+#define NBD_META_ID_DIRTY_BITMAP 2
 
 /*
  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
@@ -80,25 +84,21 @@ struct NBDRequestData {
 };
 
 struct NBDExport {
-    int refcount;
-    void (*close)(NBDExport *exp);
+    BlockExport common;
 
-    BlockBackend *blk;
     char *name;
     char *description;
-    uint64_t dev_offset;
     uint64_t size;
     uint16_t nbdflags;
     QTAILQ_HEAD(, NBDClient) clients;
     QTAILQ_ENTRY(NBDExport) next;
 
-    AioContext *ctx;
-
     BlockBackend *eject_notifier_blk;
     Notifier eject_notifier;
 
-    BdrvDirtyBitmap *export_bitmap;
-    char *export_bitmap_context;
+    bool allocation_depth;
+    BdrvDirtyBitmap **export_bitmaps;
+    size_t nr_export_bitmaps;
 };
 
 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
@@ -108,10 +108,13 @@ static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
  * NBD_OPT_LIST_META_CONTEXT. */
 typedef struct NBDExportMetaContexts {
     NBDExport *exp;
-    bool valid; /* means that negotiation of the option finished without
-                   errors */
+    size_t count; /* number of negotiated contexts */
     bool base_allocation; /* export base:allocation context (block status) */
-    bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
+    bool allocation_depth; /* export qemu:allocation-depth */
+    bool *bitmaps; /*
+                    * export qemu:dirty-bitmap:<export bitmap name>,
+                    * sized by exp->nr_export_bitmaps
+                    */
 } NBDExportMetaContexts;
 
 struct NBDClient {
@@ -129,6 +132,9 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
+    bool read_yielding;
+    bool quiescing;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -211,6 +217,7 @@ static int GCC_FMT_ATTR(4, 0)
 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
                             Error **errp, const char *fmt, va_list va)
 {
+    ERRP_GUARD();
     g_autofree char *msg = NULL;
     int ret;
     size_t len;
@@ -303,10 +310,11 @@ nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
 }
 
 /* Read size bytes from the unparsed payload of the current option.
+ * If @check_nul, require that no NUL bytes appear in buffer.
  * Return -errno on I/O error, 0 if option was completely handled by
  * sending a reply about inconsistent lengths, or 1 on success. */
 static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
-                        Error **errp)
+                        bool check_nul, Error **errp)
 {
     if (size > client->optlen) {
         return nbd_opt_invalid(client, errp,
@@ -314,7 +322,16 @@ static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
                                nbd_opt_lookup(client->opt));
     }
     client->optlen -= size;
-    return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
+    if (qio_channel_read_all(client->ioc, buffer, size, errp) < 0) {
+        return -EIO;
+    }
+
+    if (check_nul && strnlen(buffer, size) != size) {
+        return nbd_opt_invalid(client, errp,
+                               "Unexpected embedded NUL in option %s",
+                               nbd_opt_lookup(client->opt));
+    }
+    return 1;
 }
 
 /* Drop size bytes from the unparsed payload of the current option.
@@ -351,7 +368,7 @@ static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
     g_autofree char *local_name = NULL;
 
     *name = NULL;
-    ret = nbd_opt_read(client, &len, sizeof(len), errp);
+    ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
     if (ret <= 0) {
         return ret;
     }
@@ -363,7 +380,7 @@ static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
     }
 
     local_name = g_malloc(len + 1);
-    ret = nbd_opt_read(client, local_name, len, errp);
+    ret = nbd_opt_read(client, local_name, len, true, errp);
     if (ret <= 0) {
         return ret;
     }
@@ -382,6 +399,7 @@ static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
                                        Error **errp)
 {
+    ERRP_GUARD();
     size_t name_len, desc_len;
     uint32_t len;
     const char *name = exp->name ? exp->name : "";
@@ -437,7 +455,9 @@ static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
 
 static void nbd_check_meta_export(NBDClient *client)
 {
-    client->export_meta.valid &= client->exp == client->export_meta.exp;
+    if (client->exp != client->export_meta.exp) {
+        client->export_meta.count = 0;
+    }
 }
 
 /* Send a reply to NBD_OPT_EXPORT_NAME.
@@ -445,6 +465,7 @@ static void nbd_check_meta_export(NBDClient *client)
 static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
                                             Error **errp)
 {
+    ERRP_GUARD();
     g_autofree char *name = NULL;
     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
     size_t len;
@@ -493,7 +514,7 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
     }
 
     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
-    nbd_export_get(client->exp);
+    blk_exp_ref(&client->exp->common);
     nbd_check_meta_export(client);
 
     return 0;
@@ -556,7 +577,7 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
     NBDExport *exp;
     uint16_t requests;
     uint16_t request;
-    uint32_t namelen;
+    uint32_t namelen = 0;
     bool sendname = false;
     bool blocksize = false;
     uint32_t sizes[3];
@@ -576,14 +597,14 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
     }
     trace_nbd_negotiate_handle_export_name_request(name);
 
-    rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
+    rc = nbd_opt_read(client, &requests, sizeof(requests), false, errp);
     if (rc <= 0) {
         return rc;
     }
     requests = be16_to_cpu(requests);
     trace_nbd_negotiate_handle_info_requests(requests);
     while (requests--) {
-        rc = nbd_opt_read(client, &request, sizeof(request), errp);
+        rc = nbd_opt_read(client, &request, sizeof(request), false, errp);
         if (rc <= 0) {
             return rc;
         }
@@ -642,7 +663,7 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
      * whether this is OPT_INFO or OPT_GO. */
     /* minimum - 1 for back-compat, or actual if client will obey it. */
     if (client->opt == NBD_OPT_INFO || blocksize) {
-        check_align = sizes[0] = blk_get_request_alignment(exp->blk);
+        check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
     } else {
         sizes[0] = 1;
     }
@@ -651,7 +672,7 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
     sizes[1] = MAX(4096, sizes[0]);
     /* maximum - At most 32M, but smaller as appropriate. */
-    sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
+    sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
     sizes[0] = cpu_to_be32(sizes[0]);
     sizes[1] = cpu_to_be32(sizes[1]);
@@ -683,7 +704,7 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
      * tolerate all clients, regardless of alignments.
      */
     if (client->opt == NBD_OPT_INFO && !blocksize &&
-        blk_get_request_alignment(exp->blk) > 1) {
+        blk_get_request_alignment(exp->common.blk) > 1) {
         return nbd_negotiate_send_rep_err(client,
                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
                                           errp,
@@ -701,7 +722,7 @@ static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
         client->exp = exp;
         client->check_align = check_align;
         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
-        nbd_export_get(client->exp);
+        blk_exp_ref(&client->exp->common);
         nbd_check_meta_export(client);
         rc = 1;
     }
@@ -787,135 +808,115 @@ static int nbd_negotiate_send_meta_context(NBDClient *client,
     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
 }
 
-/* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
- * @match is never set to false.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success.
- *
- * Note: return code = 1 doesn't mean that we've read exactly @pattern.
- * It only means that there are no errors.
+/*
+ * Return true if @query matches @pattern, or if @query is empty when
+ * the @client is performing _LIST_.
  */
-static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
-                            Error **errp)
+static bool nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
+                                      const char *query)
 {
-    int ret;
-    char *query;
-    size_t len = strlen(pattern);
-
-    assert(len);
-
-    query = g_malloc(len);
-    ret = nbd_opt_read(client, query, len, errp);
-    if (ret <= 0) {
-        g_free(query);
-        return ret;
+    if (!*query) {
+        trace_nbd_negotiate_meta_query_parse("empty");
+        return client->opt == NBD_OPT_LIST_META_CONTEXT;
     }
-
-    if (strncmp(query, pattern, len) == 0) {
+    if (strcmp(query, pattern) == 0) {
         trace_nbd_negotiate_meta_query_parse(pattern);
-        *match = true;
-    } else {
-        trace_nbd_negotiate_meta_query_skip("pattern not matched");
+        return true;
     }
-    g_free(query);
-
-    return 1;
+    trace_nbd_negotiate_meta_query_skip("pattern not matched");
+    return false;
 }
 
 /*
- * Read @len bytes, and set @match to true if they match @pattern, or if @len
- * is 0 and the client is performing _LIST_. @match is never set to false.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success.
- *
- * Note: return code = 1 doesn't mean that we've read exactly @pattern.
- * It only means that there are no errors.
+ * Return true and adjust @str in place if it begins with @prefix.
  */
-static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
-                                     uint32_t len, bool *match, Error **errp)
+static bool nbd_strshift(const char **str, const char *prefix)
 {
-    if (len == 0) {
-        if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
-            *match = true;
-        }
-        trace_nbd_negotiate_meta_query_parse("empty");
-        return 1;
-    }
+    size_t len = strlen(prefix);
 
-    if (len != strlen(pattern)) {
-        trace_nbd_negotiate_meta_query_skip("different lengths");
-        return nbd_opt_skip(client, len, errp);
+    if (strncmp(*str, prefix, len) == 0) {
+        *str += len;
+        return true;
     }
-
-    return nbd_meta_pattern(client, pattern, match, errp);
+    return false;
 }
 
 /* nbd_meta_base_query
  *
  * Handle queries to 'base' namespace. For now, only the base:allocation
- * context is available.  'len' is the amount of text remaining to be read from
- * the current name, after the 'base:' portion has been stripped.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success.
+ * context is available.  Return true if @query has been handled.
  */
-static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
-                               uint32_t len, Error **errp)
+static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
+                                const char *query)
 {
-    return nbd_meta_empty_or_pattern(client, "allocation", len,
-                                     &meta->base_allocation, errp);
+    if (!nbd_strshift(&query, "base:")) {
+        return false;
+    }
+    trace_nbd_negotiate_meta_query_parse("base:");
+
+    if (nbd_meta_empty_or_pattern(client, "allocation", query)) {
+        meta->base_allocation = true;
+    }
+    return true;
 }
 
-/* nbd_meta_bitmap_query
- *
- * Handle query to 'qemu:' namespace.
- * @len is the amount of text remaining to be read from the current name, after
- * the 'qemu:' portion has been stripped.
+/* nbd_meta_qemu_query
  *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success. */
-static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
-                               uint32_t len, Error **errp)
+ * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
+ * and qemu:allocation-depth contexts are available.  Return true if @query
+ * has been handled.
+ */
+static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
+                                const char *query)
 {
-    bool dirty_bitmap = false;
-    size_t dirty_bitmap_len = strlen("dirty-bitmap:");
-    int ret;
+    size_t i;
 
-    if (!meta->exp->export_bitmap) {
-        trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
-        return nbd_opt_skip(client, len, errp);
+    if (!nbd_strshift(&query, "qemu:")) {
+        return false;
     }
+    trace_nbd_negotiate_meta_query_parse("qemu:");
 
-    if (len == 0) {
+    if (!*query) {
         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
-            meta->bitmap = true;
+            meta->allocation_depth = meta->exp->allocation_depth;
+            memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
         }
         trace_nbd_negotiate_meta_query_parse("empty");
-        return 1;
+        return true;
     }
 
-    if (len < dirty_bitmap_len) {
-        trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
-        return nbd_opt_skip(client, len, errp);
+    if (strcmp(query, "allocation-depth") == 0) {
+        trace_nbd_negotiate_meta_query_parse("allocation-depth");
+        meta->allocation_depth = meta->exp->allocation_depth;
+        return true;
     }
 
-    len -= dirty_bitmap_len;
-    ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
-    if (ret <= 0) {
-        return ret;
-    }
-    if (!dirty_bitmap) {
-        trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
-        return nbd_opt_skip(client, len, errp);
-    }
+    if (nbd_strshift(&query, "dirty-bitmap:")) {
+        trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
+        if (!*query) {
+            if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+                memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+            }
+            trace_nbd_negotiate_meta_query_parse("empty");
+            return true;
+        }
 
-    trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
+        for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
+            const char *bm_name;
+
+            bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
+            if (strcmp(bm_name, query) == 0) {
+                meta->bitmaps[i] = true;
+                trace_nbd_negotiate_meta_query_parse(query);
+                return true;
+            }
+        }
+        trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
+        return true;
+    }
 
-    return nbd_meta_empty_or_pattern(
-            client, meta->exp->export_bitmap_context +
-            strlen("qemu:dirty_bitmap:"), len, &meta->bitmap, errp);
+    trace_nbd_negotiate_meta_query_skip("unknown qemu context");
+    return true;
 }
 
 /* nbd_negotiate_meta_query
@@ -925,25 +926,16 @@ static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
  *
  * The only supported namespaces are 'base' and 'qemu'.
  *
- * The function aims not wasting time and memory to read long unknown namespace
- * names.
- *
  * Return -errno on I/O error, 0 if option was completely handled by
  * sending a reply about inconsistent lengths, or 1 on success. */
 static int nbd_negotiate_meta_query(NBDClient *client,
                                     NBDExportMetaContexts *meta, Error **errp)
 {
-    /*
-     * Both 'qemu' and 'base' namespaces have length = 5 including a
-     * colon. If another length namespace is later introduced, this
-     * should certainly be refactored.
-     */
     int ret;
-    size_t ns_len = 5;
-    char ns[5];
+    g_autofree char *query = NULL;
     uint32_t len;
 
-    ret = nbd_opt_read(client, &len, sizeof(len), errp);
+    ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
     if (ret <= 0) {
         return ret;
     }
@@ -953,27 +945,23 @@ static int nbd_negotiate_meta_query(NBDClient *client,
         trace_nbd_negotiate_meta_query_skip("length too long");
         return nbd_opt_skip(client, len, errp);
     }
-    if (len < ns_len) {
-        trace_nbd_negotiate_meta_query_skip("length too short");
-        return nbd_opt_skip(client, len, errp);
-    }
 
-    len -= ns_len;
-    ret = nbd_opt_read(client, ns, ns_len, errp);
+    query = g_malloc(len + 1);
+    ret = nbd_opt_read(client, query, len, true, errp);
     if (ret <= 0) {
         return ret;
     }
+    query[len] = '\0';
 
-    if (!strncmp(ns, "base:", ns_len)) {
-        trace_nbd_negotiate_meta_query_parse("base:");
-        return nbd_meta_base_query(client, meta, len, errp);
-    } else if (!strncmp(ns, "qemu:", ns_len)) {
-        trace_nbd_negotiate_meta_query_parse("qemu:");
-        return nbd_meta_qemu_query(client, meta, len, errp);
+    if (nbd_meta_base_query(client, meta, query)) {
+        return 1;
+    }
+    if (nbd_meta_qemu_query(client, meta, query)) {
+        return 1;
     }
 
     trace_nbd_negotiate_meta_query_skip("unknown namespace");
-    return nbd_opt_skip(client, len, errp);
+    return 1;
 }
 
 /* nbd_negotiate_meta_queries
@@ -985,9 +973,11 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
 {
     int ret;
     g_autofree char *export_name = NULL;
-    NBDExportMetaContexts local_meta;
+    g_autofree bool *bitmaps = NULL;
+    NBDExportMetaContexts local_meta = {0};
     uint32_t nb_queries;
-    int i;
+    size_t i;
+    size_t count = 0;
 
     if (!client->structured_reply) {
         return nbd_opt_invalid(client, errp,
@@ -1001,6 +991,7 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
         meta = &local_meta;
     }
 
+    g_free(meta->bitmaps);
     memset(meta, 0, sizeof(*meta));
 
     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
@@ -1015,8 +1006,12 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
                             "export '%s' not present", sane_name);
     }
+    meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
+    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+        bitmaps = meta->bitmaps;
+    }
 
-    ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
+    ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
     if (ret <= 0) {
         return ret;
     }
@@ -1027,7 +1022,8 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
         /* enable all known contexts */
         meta->base_allocation = true;
-        meta->bitmap = !!meta->exp->export_bitmap;
+        meta->allocation_depth = meta->exp->allocation_depth;
+        memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
     } else {
         for (i = 0; i < nb_queries; ++i) {
             ret = nbd_negotiate_meta_query(client, meta, errp);
@@ -1044,21 +1040,42 @@ static int nbd_negotiate_meta_queries(NBDClient *client,
         if (ret < 0) {
             return ret;
         }
+        count++;
+    }
+
+    if (meta->allocation_depth) {
+        ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
+                                              NBD_META_ID_ALLOCATION_DEPTH,
+                                              errp);
+        if (ret < 0) {
+            return ret;
+        }
+        count++;
     }
 
-    if (meta->bitmap) {
-        ret = nbd_negotiate_send_meta_context(client,
-                                              meta->exp->export_bitmap_context,
-                                              NBD_META_ID_DIRTY_BITMAP,
+    for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
+        const char *bm_name;
+        g_autofree char *context = NULL;
+
+        if (!meta->bitmaps[i]) {
+            continue;
+        }
+
+        bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
+        context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
+
+        ret = nbd_negotiate_send_meta_context(client, context,
+                                              NBD_META_ID_DIRTY_BITMAP + i,
                                               errp);
         if (ret < 0) {
             return ret;
         }
+        count++;
     }
 
     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
     if (ret == 0) {
-        meta->valid = true;
+        meta->count = count;
     }
 
     return ret;
@@ -1289,6 +1306,7 @@ static int nbd_negotiate_options(NBDClient *client, Error **errp)
  */
 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
 {
+    ERRP_GUARD();
     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
     int ret;
 
@@ -1327,8 +1345,8 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     }
 
     /* Attach the channel to the same AioContext as the export */
-    if (client->exp && client->exp->ctx) {
-        qio_channel_attach_aio_context(client->ioc, client->exp->ctx);
+    if (client->exp && client->exp->common.ctx) {
+        qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
     }
 
     assert(!client->optlen);
@@ -1337,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     return 0;
 }
 
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @ioc. This is a local implementation of
+ * qio_channel_readv_all_eof. We have it here because we need it to be
+ * interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+    bool partial = false;
+
+    assert(size);
+    while (size > 0) {
+        struct iovec iov = { .iov_base = buffer, .iov_len = size };
+        ssize_t len;
+
+        len = qio_channel_readv(client->ioc, &iov, 1, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            client->read_yielding = true;
+            qio_channel_yield(client->ioc, G_IO_IN);
+            client->read_yielding = false;
+            if (client->quiescing) {
+                return -EAGAIN;
+            }
+            continue;
+        } else if (len < 0) {
+            return -EIO;
+        } else if (len == 0) {
+            if (partial) {
+                error_setg(errp,
+                           "Unexpected end-of-file before all bytes were read");
+                return -EIO;
+            } else {
+                return 0;
+            }
+        }
+
+        partial = true;
+        size -= len;
+        buffer = (uint8_t *) buffer + len;
+    }
+    return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
                                Error **errp)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     int ret;
 
-    ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+    ret = nbd_read_eof(client, buf, sizeof(buf), errp);
     if (ret < 0) {
         return ret;
     }
@@ -1399,8 +1463,9 @@ void nbd_client_put(NBDClient *client)
         g_free(client->tlsauthz);
         if (client->exp) {
             QTAILQ_REMOVE(&client->exp->clients, client, next);
-            nbd_export_put(client->exp);
+            blk_exp_unref(&client->exp->common);
         }
+        g_free(client->export_meta.bitmaps);
         g_free(client);
     }
 }
@@ -1460,15 +1525,41 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
 
     trace_nbd_blk_aio_attached(exp->name, ctx);
 
-    exp->ctx = ctx;
+    exp->common.ctx = ctx;
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_attach_aio_context(client->ioc, ctx);
+
+        assert(client->recv_coroutine == NULL);
+        assert(client->send_coroutine == NULL);
+
+        if (client->quiescing) {
+            client->quiescing = false;
+            nbd_client_receive_next_request(client);
+        }
+    }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        qio_channel_detach_aio_context(client->ioc);
+        client->quiescing = true;
+
         if (client->recv_coroutine) {
-            aio_co_schedule(ctx, client->recv_coroutine);
+            if (client->read_yielding) {
+                qemu_aio_coroutine_enter(exp->common.ctx,
+                                         client->recv_coroutine);
+            } else {
+                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+            }
         }
+
         if (client->send_coroutine) {
-            aio_co_schedule(ctx, client->send_coroutine);
+            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
         }
     }
 }
@@ -1476,74 +1567,91 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
 static void blk_aio_detach(void *opaque)
 {
     NBDExport *exp = opaque;
-    NBDClient *client;
 
-    trace_nbd_blk_aio_detach(exp->name, exp->ctx);
+    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
 
-    QTAILQ_FOREACH(client, &exp->clients, next) {
-        qio_channel_detach_aio_context(client->ioc);
-    }
+    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
 
-    exp->ctx = NULL;
+    exp->common.ctx = NULL;
 }
 
 static void nbd_eject_notifier(Notifier *n, void *data)
 {
     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
-    AioContext *aio_context;
 
-    aio_context = exp->ctx;
-    aio_context_acquire(aio_context);
-    nbd_export_close(exp);
-    aio_context_release(aio_context);
+    blk_exp_request_shutdown(&exp->common);
+}
+
+void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
+{
+    NBDExport *nbd_exp = container_of(exp, NBDExport, common);
+    assert(exp->drv == &blk_exp_nbd);
+    assert(nbd_exp->eject_notifier_blk == NULL);
+
+    blk_ref(blk);
+    nbd_exp->eject_notifier_blk = blk;
+    nbd_exp->eject_notifier.notify = nbd_eject_notifier;
+    blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
 }
 
-NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
-                          uint64_t size, const char *name, const char *desc,
-                          const char *bitmap, bool readonly, bool shared,
-                          void (*close)(NBDExport *), bool writethrough,
-                          BlockBackend *on_eject_blk, Error **errp)
+static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
+                             Error **errp)
 {
-    AioContext *ctx;
-    BlockBackend *blk;
-    NBDExport *exp = g_new0(NBDExport, 1);
-    uint64_t perm;
+    NBDExport *exp = container_of(blk_exp, NBDExport, common);
+    BlockExportOptionsNbd *arg = &exp_args->u.nbd;
+    BlockBackend *blk = blk_exp->blk;
+    int64_t size;
+    uint64_t perm, shared_perm;
+    bool readonly = !exp_args->writable;
+    bool shared = !exp_args->writable;
+    strList *bitmaps;
+    size_t i;
     int ret;
 
-    /*
-     * NBD exports are used for non-shared storage migration.  Make sure
-     * that BDRV_O_INACTIVE is cleared and the image is ready for write
-     * access since the export could be available before migration handover.
-     * ctx was acquired in the caller.
-     */
-    assert(name && strlen(name) <= NBD_MAX_STRING_SIZE);
-    ctx = bdrv_get_aio_context(bs);
-    bdrv_invalidate_cache(bs, NULL);
+    assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
+
+    if (!nbd_server_is_running()) {
+        error_setg(errp, "NBD server not running");
+        return -EINVAL;
+    }
+
+    if (!arg->has_name) {
+        arg->name = exp_args->node_name;
+    }
+
+    if (strlen(arg->name) > NBD_MAX_STRING_SIZE) {
+        error_setg(errp, "export name '%s' too long", arg->name);
+        return -EINVAL;
+    }
+
+    if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
+        error_setg(errp, "description '%s' too long", arg->description);
+        return -EINVAL;
+    }
+
+    if (nbd_export_find(arg->name)) {
+        error_setg(errp, "NBD server already has export named '%s'", arg->name);
+        return -EEXIST;
+    }
+
+    size = blk_getlength(blk);
+    if (size < 0) {
+        error_setg_errno(errp, -size,
+                         "Failed to determine the NBD export's length");
+        return size;
+    }
 
     /* Don't allow resize while the NBD server is running, otherwise we don't
      * care what happens with the node. */
-    perm = BLK_PERM_CONSISTENT_READ;
-    if (!readonly) {
-        perm |= BLK_PERM_WRITE;
-    }
-    blk = blk_new(ctx, perm,
-                  BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
-                  BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD);
-    ret = blk_insert_bs(blk, bs, errp);
+    blk_get_perm(blk, &perm, &shared_perm);
+    ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
     if (ret < 0) {
-        goto fail;
+        return ret;
     }
-    blk_set_enable_write_cache(blk, !writethrough);
-    blk_set_allow_aio_context_change(blk, true);
 
-    exp->refcount = 1;
     QTAILQ_INIT(&exp->clients);
-    exp->blk = blk;
-    assert(dev_offset <= INT64_MAX);
-    exp->dev_offset = dev_offset;
-    exp->name = g_strdup(name);
-    assert(!desc || strlen(desc) <= NBD_MAX_STRING_SIZE);
-    exp->description = g_strdup(desc);
+    exp->name = g_strdup(arg->name);
+    exp->description = g_strdup(arg->description);
     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
     if (readonly) {
@@ -1555,66 +1663,69 @@ NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
                           NBD_FLAG_SEND_FAST_ZERO);
     }
-    assert(size <= INT64_MAX - dev_offset);
     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
 
-    if (bitmap) {
+    for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
+        exp->nr_export_bitmaps++;
+    }
+    exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
+    for (i = 0, bitmaps = arg->bitmaps; bitmaps;
+         i++, bitmaps = bitmaps->next) {
+        const char *bitmap = bitmaps->value;
+        BlockDriverState *bs = blk_bs(blk);
         BdrvDirtyBitmap *bm = NULL;
 
-        while (true) {
+        while (bs) {
             bm = bdrv_find_dirty_bitmap(bs, bitmap);
-            if (bm != NULL || bs->backing == NULL) {
+            if (bm != NULL) {
                 break;
             }
 
-            bs = bs->backing->bs;
+            bs = bdrv_filter_or_cow_bs(bs);
         }
 
         if (bm == NULL) {
+            ret = -ENOENT;
             error_setg(errp, "Bitmap '%s' is not found", bitmap);
             goto fail;
         }
 
         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
+            ret = -EINVAL;
             goto fail;
         }
 
         if (readonly && bdrv_is_writable(bs) &&
             bdrv_dirty_bitmap_enabled(bm)) {
+            ret = -EINVAL;
             error_setg(errp,
                        "Enabled bitmap '%s' incompatible with readonly export",
                        bitmap);
             goto fail;
         }
 
-        bdrv_dirty_bitmap_set_busy(bm, true);
-        exp->export_bitmap = bm;
+        exp->export_bitmaps[i] = bm;
         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
-        exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
-                                                     bitmap);
-        assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
     }
 
-    exp->close = close;
-    exp->ctx = ctx;
+    /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
+    for (i = 0; i < exp->nr_export_bitmaps; i++) {
+        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
+    }
+
+    exp->allocation_depth = arg->allocation_depth;
+
     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
 
-    if (on_eject_blk) {
-        blk_ref(on_eject_blk);
-        exp->eject_notifier_blk = on_eject_blk;
-        exp->eject_notifier.notify = nbd_eject_notifier;
-        blk_add_remove_bs_notifier(on_eject_blk, &exp->eject_notifier);
-    }
     QTAILQ_INSERT_TAIL(&exports, exp, next);
-    nbd_export_get(exp);
-    return exp;
+
+    return 0;
 
 fail:
-    blk_unref(blk);
+    g_free(exp->export_bitmaps);
     g_free(exp->name);
     g_free(exp->description);
-    g_free(exp);
-    return NULL;
+    return ret;
 }
 
 NBDExport *nbd_export_find(const char *name)
@@ -1632,14 +1743,15 @@ NBDExport *nbd_export_find(const char *name)
 AioContext *
 nbd_export_aio_context(NBDExport *exp)
 {
-    return exp->ctx;
+    return exp->common.ctx;
 }
 
-void nbd_export_close(NBDExport *exp)
+static void nbd_export_request_shutdown(BlockExport *blk_exp)
 {
+    NBDExport *exp = container_of(blk_exp, NBDExport, common);
     NBDClient *client, *next;
 
-    nbd_export_get(exp);
+    blk_exp_ref(&exp->common);
     /*
      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
      * close mode that stops advertising the export to new clients but
@@ -1651,94 +1763,46 @@ void nbd_export_close(NBDExport *exp)
         client_close(client, true);
     }
     if (exp->name) {
-        nbd_export_put(exp);
         g_free(exp->name);
         exp->name = NULL;
         QTAILQ_REMOVE(&exports, exp, next);
     }
-    g_free(exp->description);
-    exp->description = NULL;
-    nbd_export_put(exp);
-}
-
-void nbd_export_remove(NBDExport *exp, NbdServerRemoveMode mode, Error **errp)
-{
-    if (mode == NBD_SERVER_REMOVE_MODE_HARD || QTAILQ_EMPTY(&exp->clients)) {
-        nbd_export_close(exp);
-        return;
-    }
-
-    assert(mode == NBD_SERVER_REMOVE_MODE_SAFE);
-
-    error_setg(errp, "export '%s' still in use", exp->name);
-    error_append_hint(errp, "Use mode='hard' to force client disconnect\n");
-}
-
-void nbd_export_get(NBDExport *exp)
-{
-    assert(exp->refcount > 0);
-    exp->refcount++;
+    blk_exp_unref(&exp->common);
 }
 
-void nbd_export_put(NBDExport *exp)
+static void nbd_export_delete(BlockExport *blk_exp)
 {
-    assert(exp->refcount > 0);
-    if (exp->refcount == 1) {
-        nbd_export_close(exp);
-    }
+    size_t i;
+    NBDExport *exp = container_of(blk_exp, NBDExport, common);
 
-    /* nbd_export_close() may theoretically reduce refcount to 0. It may happen
-     * if someone calls nbd_export_put() on named export not through
-     * nbd_export_set_name() when refcount is 1. So, let's assert that
-     * it is > 0.
-     */
-    assert(exp->refcount > 0);
-    if (--exp->refcount == 0) {
-        assert(exp->name == NULL);
-        assert(exp->description == NULL);
+    assert(exp->name == NULL);
+    assert(QTAILQ_EMPTY(&exp->clients));
 
-        if (exp->close) {
-            exp->close(exp);
-        }
-
-        if (exp->blk) {
-            if (exp->eject_notifier_blk) {
-                notifier_remove(&exp->eject_notifier);
-                blk_unref(exp->eject_notifier_blk);
-            }
-            blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
-                                            blk_aio_detach, exp);
-            blk_unref(exp->blk);
-            exp->blk = NULL;
-        }
+    g_free(exp->description);
+    exp->description = NULL;
 
-        if (exp->export_bitmap) {
-            bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
-            g_free(exp->export_bitmap_context);
+    if (exp->common.blk) {
+        if (exp->eject_notifier_blk) {
+            notifier_remove(&exp->eject_notifier);
+            blk_unref(exp->eject_notifier_blk);
         }
-
-        g_free(exp);
+        blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
+                                        blk_aio_detach, exp);
     }
-}
-
-BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
-{
-    return exp->blk;
-}
 
-void nbd_export_close_all(void)
-{
-    NBDExport *exp, *next;
-    AioContext *aio_context;
-
-    QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
-        aio_context = exp->ctx;
-        aio_context_acquire(aio_context);
-        nbd_export_close(exp);
-        aio_context_release(aio_context);
+    for (i = 0; i < exp->nr_export_bitmaps; i++) {
+        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
     }
 }
 
+const BlockExportDriver blk_exp_nbd = {
+    .type               = BLOCK_EXPORT_TYPE_NBD,
+    .instance_size      = sizeof(NBDExport),
+    .create             = nbd_export_create,
+    .delete             = nbd_export_delete,
+    .request_shutdown   = nbd_export_request_shutdown,
+};
+
 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
                                         unsigned niov, Error **errp)
 {
@@ -1875,7 +1939,7 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
 
     while (progress < size) {
         int64_t pnum;
-        int status = bdrv_block_status_above(blk_bs(exp->blk), NULL,
+        int status = bdrv_block_status_above(blk_bs(exp->common.blk), NULL,
                                              offset + progress,
                                              size - progress, &pnum, NULL,
                                              NULL);
@@ -1907,7 +1971,7 @@ static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
             stl_be_p(&chunk.length, pnum);
             ret = nbd_co_send_iov(client, iov, 1, errp);
         } else {
-            ret = blk_pread(exp->blk, offset + progress + exp->dev_offset,
+            ret = blk_pread(exp->common.blk, offset + progress,
                             data + progress, pnum);
             if (ret < 0) {
                 error_setg_errno(errp, -ret, "reading from file failed");
@@ -2037,6 +2101,29 @@ static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
     return 0;
 }
 
+static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset,
+                                 uint64_t bytes, NBDExtentArray *ea)
+{
+    while (bytes) {
+        int64_t num;
+        int ret = bdrv_is_allocated_above(bs, NULL, false, offset, bytes,
+                                          &num);
+
+        if (ret < 0) {
+            return ret;
+        }
+
+        if (nbd_extent_array_add(ea, num, ret) < 0) {
+            return 0;
+        }
+
+        offset += num;
+        bytes -= num;
+    }
+
+    return 0;
+}
+
 /*
  * nbd_co_send_extents
  *
@@ -2076,7 +2163,11 @@ static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
 
-    ret = blockstatus_to_extents(bs, offset, length, ea);
+    if (context_id == NBD_META_ID_BASE_ALLOCATION) {
+        ret = blockstatus_to_extents(bs, offset, length, ea);
+    } else {
+        ret = blockalloc_to_extents(bs, offset, length, ea);
+    }
     if (ret < 0) {
         return nbd_co_send_structured_error(
                 client, handle, -ret, "can't get block status", errp);
@@ -2110,8 +2201,8 @@ static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
     }
 
     if (!full) {
-        /* last non dirty extent */
-        nbd_extent_array_add(es, end - start, 0);
+        /* last non dirty extent, nothing to do if array is now full */
+        (void) nbd_extent_array_add(es, end - start, 0);
     }
 
     bdrv_dirty_bitmap_unlock(bitmap);
@@ -2132,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
 
 /* nbd_co_receive_request
  * Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
  */
 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
                                   Error **errp)
 {
     NBDClient *client = req->client;
     int valid_flags;
+    int ret;
 
     g_assert(qemu_in_coroutine());
     assert(client->recv_coroutine == qemu_coroutine_self());
-    if (nbd_receive_request(client->ioc, request, errp) < 0) {
-        return -EIO;
+    ret = nbd_receive_request(client, request, errp);
+    if (ret < 0) {
+        return  ret;
     }
 
     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
@@ -2172,7 +2266,8 @@ static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
         }
 
         if (request->type != NBD_CMD_CACHE) {
-            req->data = blk_try_blockalign(client->exp->blk, request->len);
+            req->data = blk_try_blockalign(client->exp->common.blk,
+                                           request->len);
             if (req->data == NULL) {
                 error_setg(errp, "No memory");
                 return -ENOMEM;
@@ -2268,7 +2363,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
 
     /* XXX: NBD Protocol only documents use of FUA with WRITE */
     if (request->flags & NBD_CMD_FLAG_FUA) {
-        ret = blk_co_flush(exp->blk);
+        ret = blk_co_flush(exp->common.blk);
         if (ret < 0) {
             return nbd_send_generic_reply(client, request->handle, ret,
                                           "flush failed", errp);
@@ -2282,8 +2377,7 @@ static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
                                        data, request->len, errp);
     }
 
-    ret = blk_pread(exp->blk, request->from + exp->dev_offset, data,
-                    request->len);
+    ret = blk_pread(exp->common.blk, request->from, data, request->len);
     if (ret < 0) {
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "reading from file failed", errp);
@@ -2318,7 +2412,7 @@ static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
 
     assert(request->type == NBD_CMD_CACHE);
 
-    ret = blk_co_preadv(exp->blk, request->from + exp->dev_offset, request->len,
+    ret = blk_co_preadv(exp->common.blk, request->from, request->len,
                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
 
     return nbd_send_generic_reply(client, request->handle, ret,
@@ -2336,6 +2430,7 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
     int flags;
     NBDExport *exp = client->exp;
     char *msg;
+    size_t i;
 
     switch (request->type) {
     case NBD_CMD_CACHE:
@@ -2349,8 +2444,8 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
         if (request->flags & NBD_CMD_FLAG_FUA) {
             flags |= BDRV_REQ_FUA;
         }
-        ret = blk_pwrite(exp->blk, request->from + exp->dev_offset,
-                         data, request->len, flags);
+        ret = blk_pwrite(exp->common.blk, request->from, data, request->len,
+                         flags);
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "writing to file failed", errp);
 
@@ -2365,8 +2460,16 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
             flags |= BDRV_REQ_NO_FALLBACK;
         }
-        ret = blk_pwrite_zeroes(exp->blk, request->from + exp->dev_offset,
-                                request->len, flags);
+        ret = 0;
+        /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */
+        while (ret >= 0 && request->len) {
+            int align = client->check_align ?: 1;
+            int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
+                                                        align));
+            ret = blk_pwrite_zeroes(exp->common.blk, request->from, len, flags);
+            request->len -= len;
+            request->from += len;
+        }
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "writing to file failed", errp);
 
@@ -2375,15 +2478,23 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
         abort();
 
     case NBD_CMD_FLUSH:
-        ret = blk_co_flush(exp->blk);
+        ret = blk_co_flush(exp->common.blk);
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "flush failed", errp);
 
     case NBD_CMD_TRIM:
-        ret = blk_co_pdiscard(exp->blk, request->from + exp->dev_offset,
-                              request->len);
-        if (ret == 0 && request->flags & NBD_CMD_FLAG_FUA) {
-            ret = blk_co_flush(exp->blk);
+        ret = 0;
+        /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */
+        while (ret >= 0 && request->len) {
+            int align = client->check_align ?: 1;
+            int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
+                                                        align));
+            ret = blk_co_pdiscard(exp->common.blk, request->from, len);
+            request->len -= len;
+            request->from += len;
+        }
+        if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
+            ret = blk_co_flush(exp->common.blk);
         }
         return nbd_send_generic_reply(client, request->handle, ret,
                                       "discard failed", errp);
@@ -2393,17 +2504,16 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
             return nbd_send_generic_reply(client, request->handle, -EINVAL,
                                           "need non-zero length", errp);
         }
-        if (client->export_meta.valid &&
-            (client->export_meta.base_allocation ||
-             client->export_meta.bitmap))
-        {
+        if (client->export_meta.count) {
             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
+            int contexts_remaining = client->export_meta.count;
 
             if (client->export_meta.base_allocation) {
                 ret = nbd_co_send_block_status(client, request->handle,
-                                               blk_bs(exp->blk), request->from,
+                                               blk_bs(exp->common.blk),
+                                               request->from,
                                                request->len, dont_fragment,
-                                               !client->export_meta.bitmap,
+                                               !--contexts_remaining,
                                                NBD_META_ID_BASE_ALLOCATION,
                                                errp);
                 if (ret < 0) {
@@ -2411,17 +2521,35 @@ static coroutine_fn int nbd_handle_request(NBDClient *client,
                 }
             }
 
-            if (client->export_meta.bitmap) {
+            if (client->export_meta.allocation_depth) {
+                ret = nbd_co_send_block_status(client, request->handle,
+                                               blk_bs(exp->common.blk),
+                                               request->from, request->len,
+                                               dont_fragment,
+                                               !--contexts_remaining,
+                                               NBD_META_ID_ALLOCATION_DEPTH,
+                                               errp);
+                if (ret < 0) {
+                    return ret;
+                }
+            }
+
+            for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
+                if (!client->export_meta.bitmaps[i]) {
+                    continue;
+                }
                 ret = nbd_co_send_bitmap(client, request->handle,
-                                         client->exp->export_bitmap,
+                                         client->exp->export_bitmaps[i],
                                          request->from, request->len,
-                                         dont_fragment,
-                                         true, NBD_META_ID_DIRTY_BITMAP, errp);
+                                         dont_fragment, !--contexts_remaining,
+                                         NBD_META_ID_DIRTY_BITMAP + i, errp);
                 if (ret < 0) {
                     return ret;
                 }
             }
 
+            assert(!contexts_remaining);
+
             return 0;
         } else {
             return nbd_send_generic_reply(client, request->handle, -EINVAL,
@@ -2454,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque)
         return;
     }
 
+    if (client->quiescing) {
+        /*
+         * We're switching between AIO contexts. Don't attempt to receive a new
+         * request and kick the main context which may be waiting for us.
+         */
+        nbd_client_put(client);
+        client->recv_coroutine = NULL;
+        aio_wait_kick();
+        return;
+    }
+
     req = nbd_request_get(client);
     ret = nbd_co_receive_request(req, &request, &local_err);
     client->recv_coroutine = NULL;
@@ -2466,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque)
         goto done;
     }
 
+    if (ret == -EAGAIN) {
+        assert(client->quiescing);
+        goto done;
+    }
+
     nbd_client_receive_next_request(client);
     if (ret == -EIO) {
         goto disconnect;
@@ -2512,10 +2656,11 @@ disconnect:
 
 static void nbd_client_receive_next_request(NBDClient *client)
 {
-    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+        !client->quiescing) {
         nbd_client_get(client);
         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
-        aio_co_schedule(client->exp->ctx, client->recv_coroutine);
+        aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
     }
 }
 
This page took 0.06843 seconds and 4 git commands to generate.