#include "qemu/units.h"
#define NBD_META_ID_BASE_ALLOCATION 0
-#define NBD_META_ID_DIRTY_BITMAP 1
+#define NBD_META_ID_ALLOCATION_DEPTH 1
+/* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
+#define NBD_META_ID_DIRTY_BITMAP 2
/*
* NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
BlockBackend *eject_notifier_blk;
Notifier eject_notifier;
- BdrvDirtyBitmap *export_bitmap;
- char *export_bitmap_context;
+ bool allocation_depth;
+ BdrvDirtyBitmap **export_bitmaps;
+ size_t nr_export_bitmaps;
};
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
* NBD_OPT_LIST_META_CONTEXT. */
typedef struct NBDExportMetaContexts {
NBDExport *exp;
- bool valid; /* means that negotiation of the option finished without
- errors */
+ size_t count; /* number of negotiated contexts */
bool base_allocation; /* export base:allocation context (block status) */
- bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
+ bool allocation_depth; /* export qemu:allocation-depth */
+ bool *bitmaps; /*
+ * export qemu:dirty-bitmap:<export bitmap name>,
+ * sized by exp->nr_export_bitmaps
+ */
} NBDExportMetaContexts;
struct NBDClient {
CoMutex send_lock;
Coroutine *send_coroutine;
+    bool read_yielding; /* set while nbd_read_eof() yields waiting for data */
+    bool quiescing; /* set while the export detaches from its AioContext */
+
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
static void nbd_check_meta_export(NBDClient *client)
{
- client->export_meta.valid &= client->exp == client->export_meta.exp;
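+    /* Contexts are only valid for the export they were negotiated with. */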
+ if (client->exp != client->export_meta.exp) {
+ client->export_meta.count = 0;
+ }
}
/* Send a reply to NBD_OPT_EXPORT_NAME.
/* nbd_meta_qemu_query
*
* Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
- * context is available. Return true if @query has been handled.
+ * and qemu:allocation-depth contexts are available. Return true if @query
+ * has been handled.
*/
static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
const char *query)
{
+ size_t i;
+
if (!nbd_strshift(&query, "qemu:")) {
return false;
}
if (!*query) {
if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
- meta->bitmap = !!meta->exp->export_bitmap;
+ meta->allocation_depth = meta->exp->allocation_depth;
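+            /* bool is byte-sized, so memset marks every bitmap at once */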
+ memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
}
trace_nbd_negotiate_meta_query_parse("empty");
return true;
}
+ if (strcmp(query, "allocation-depth") == 0) {
+ trace_nbd_negotiate_meta_query_parse("allocation-depth");
+ meta->allocation_depth = meta->exp->allocation_depth;
+ return true;
+ }
+
if (nbd_strshift(&query, "dirty-bitmap:")) {
trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
- if (!meta->exp->export_bitmap) {
- trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
+ if (!*query) {
+ if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+ memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+ }
+ trace_nbd_negotiate_meta_query_parse("empty");
return true;
}
- if (nbd_meta_empty_or_pattern(client,
- meta->exp->export_bitmap_context +
- strlen("qemu:dirty-bitmap:"), query)) {
- meta->bitmap = true;
+
+ for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
+ const char *bm_name;
+
+ bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
+ if (strcmp(bm_name, query) == 0) {
+ meta->bitmaps[i] = true;
+ trace_nbd_negotiate_meta_query_parse(query);
+ return true;
+ }
}
+ trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
return true;
}
- trace_nbd_negotiate_meta_query_skip("not dirty-bitmap");
+ trace_nbd_negotiate_meta_query_skip("unknown qemu context");
return true;
}
{
int ret;
g_autofree char *export_name = NULL;
- NBDExportMetaContexts local_meta;
+ g_autofree bool *bitmaps = NULL;
+ NBDExportMetaContexts local_meta = {0};
uint32_t nb_queries;
- int i;
+ size_t i;
+ size_t count = 0;
if (!client->structured_reply) {
return nbd_opt_invalid(client, errp,
meta = &local_meta;
}
+    g_free(meta->bitmaps); /* drop any previously negotiated bitmap array */
memset(meta, 0, sizeof(*meta));
ret = nbd_opt_read_name(client, &export_name, NULL, errp);
return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
"export '%s' not present", sane_name);
}
+ meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
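+    /*
+     * For LIST, 'meta' aliases local_meta, so hand the array to the
+     * g_autofree 'bitmaps' pointer for cleanup on return; for SET it
+     * lives on in client->export_meta (freed on renegotiation or when
+     * the client goes away).
+     */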
+ if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+ bitmaps = meta->bitmaps;
+ }
ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
if (ret <= 0) {
if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
/* enable all known contexts */
meta->base_allocation = true;
- meta->bitmap = !!meta->exp->export_bitmap;
+ meta->allocation_depth = meta->exp->allocation_depth;
+ memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
} else {
        for (i = 0; i < nb_queries; ++i) {
            ret = nbd_negotiate_meta_query(client, meta, errp);
            if (ret < 0) {
                return ret;
            }
        }
    }

    if (meta->base_allocation) {
        ret = nbd_negotiate_send_meta_context(client, "base:allocation",
                                              NBD_META_ID_BASE_ALLOCATION,
                                              errp);
        if (ret < 0) {
            return ret;
        }
+        count++;
    }
- if (meta->bitmap) {
- ret = nbd_negotiate_send_meta_context(client,
- meta->exp->export_bitmap_context,
- NBD_META_ID_DIRTY_BITMAP,
+ if (meta->allocation_depth) {
+ ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
+ NBD_META_ID_ALLOCATION_DEPTH,
errp);
if (ret < 0) {
return ret;
}
+ count++;
+ }
+
+ for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
+ const char *bm_name;
+ g_autofree char *context = NULL;
+
+ if (!meta->bitmaps[i]) {
+ continue;
+ }
+
+ bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
+ context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
+
+ ret = nbd_negotiate_send_meta_context(client, context,
+ NBD_META_ID_DIRTY_BITMAP + i,
+ errp);
+ if (ret < 0) {
+ return ret;
+ }
+ count++;
}
ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
if (ret == 0) {
- meta->valid = true;
+ meta->count = count;
}
return ret;
return 0;
}
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @client's channel. This is a local
+ * implementation of qio_channel_readv_all_eof. We have it here because we
+ * need it to be interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+ bool partial = false;
+
+ assert(size);
+ while (size > 0) {
+ struct iovec iov = { .iov_base = buffer, .iov_len = size };
+ ssize_t len;
+
+ len = qio_channel_readv(client->ioc, &iov, 1, errp);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
+ client->read_yielding = true;
+ qio_channel_yield(client->ioc, G_IO_IN);
+ client->read_yielding = false;
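+            /*
+             * An AioContext switch may have woken us just to quiesce;
+             * return -EAGAIN so the request is retried after re-attach.
+             */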
+ if (client->quiescing) {
+ return -EAGAIN;
+ }
+ continue;
+ } else if (len < 0) {
+ return -EIO;
+ } else if (len == 0) {
+ if (partial) {
+ error_setg(errp,
+ "Unexpected end-of-file before all bytes were read");
+ return -EIO;
+ } else {
+ return 0;
+ }
+ }
+
+ partial = true;
+ size -= len;
+ buffer = (uint8_t *) buffer + len;
+ }
+ return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
Error **errp)
{
uint8_t buf[NBD_REQUEST_SIZE];
uint32_t magic;
int ret;
- ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+ ret = nbd_read_eof(client, buf, sizeof(buf), errp);
if (ret < 0) {
return ret;
}
QTAILQ_REMOVE(&client->exp->clients, client, next);
blk_exp_unref(&client->exp->common);
}
+ g_free(client->export_meta.bitmaps);
g_free(client);
}
}
QTAILQ_FOREACH(client, &exp->clients, next) {
qio_channel_attach_aio_context(client->ioc, ctx);
+
+ assert(client->recv_coroutine == NULL);
+ assert(client->send_coroutine == NULL);
+
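+        /* Resume clients that nbd_aio_detach_bh() quiesced. */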
+ if (client->quiescing) {
+ client->quiescing = false;
+ nbd_client_receive_next_request(client);
+ }
+ }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+ NBDExport *exp = opaque;
+ NBDClient *client;
+
+ QTAILQ_FOREACH(client, &exp->clients, next) {
+ qio_channel_detach_aio_context(client->ioc);
+ client->quiescing = true;
+
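+        /*
+         * A receive coroutine parked in nbd_read_eof() (read_yielding) can
+         * be entered directly; it will notice 'quiescing' and bail out with
+         * -EAGAIN. Otherwise, wait for the in-flight request to finish.
+         */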
if (client->recv_coroutine) {
- aio_co_schedule(ctx, client->recv_coroutine);
+ if (client->read_yielding) {
+ qemu_aio_coroutine_enter(exp->common.ctx,
+ client->recv_coroutine);
+ } else {
+ AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+ }
}
+
if (client->send_coroutine) {
- aio_co_schedule(ctx, client->send_coroutine);
+ AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
}
}
}
static void blk_aio_detach(void *opaque)
{
NBDExport *exp = opaque;
- NBDClient *client;
trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
- QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_detach_aio_context(client->ioc);
- }
+ aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
exp->common.ctx = NULL;
}
uint64_t perm, shared_perm;
bool readonly = !exp_args->writable;
bool shared = !exp_args->writable;
+ strList *bitmaps;
+ size_t i;
int ret;
assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
}
exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
- if (arg->bitmap) {
+ for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
+ exp->nr_export_bitmaps++;
+ }
+ exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
+ for (i = 0, bitmaps = arg->bitmaps; bitmaps;
+ i++, bitmaps = bitmaps->next) {
+ const char *bitmap = bitmaps->value;
BlockDriverState *bs = blk_bs(blk);
BdrvDirtyBitmap *bm = NULL;
while (bs) {
- bm = bdrv_find_dirty_bitmap(bs, arg->bitmap);
+ bm = bdrv_find_dirty_bitmap(bs, bitmap);
if (bm != NULL) {
break;
}
if (bm == NULL) {
ret = -ENOENT;
- error_setg(errp, "Bitmap '%s' is not found", arg->bitmap);
+ error_setg(errp, "Bitmap '%s' is not found", bitmap);
goto fail;
}
ret = -EINVAL;
error_setg(errp,
"Enabled bitmap '%s' incompatible with readonly export",
- arg->bitmap);
+ bitmap);
goto fail;
}
- bdrv_dirty_bitmap_set_busy(bm, true);
- exp->export_bitmap = bm;
- assert(strlen(arg->bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
- exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
- arg->bitmap);
- assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
+ exp->export_bitmaps[i] = bm;
+ assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
+ }
+
+ /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
+ for (i = 0; i < exp->nr_export_bitmaps; i++) {
+ bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
}
+ exp->allocation_depth = arg->allocation_depth;
+
blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
QTAILQ_INSERT_TAIL(&exports, exp, next);
return 0;
fail:
+ g_free(exp->export_bitmaps);
g_free(exp->name);
g_free(exp->description);
return ret;
static void nbd_export_delete(BlockExport *blk_exp)
{
+ size_t i;
NBDExport *exp = container_of(blk_exp, NBDExport, common);
assert(exp->name == NULL);
blk_aio_detach, exp);
}
- if (exp->export_bitmap) {
- bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
- g_free(exp->export_bitmap_context);
+    for (i = 0; i < exp->nr_export_bitmaps; i++) {
+        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
    }
+    g_free(exp->export_bitmaps);
}
return 0;
}
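+/*
+ * Populate @ea with the allocation status of @bs's backing chain, as used
+ * by the qemu:allocation-depth context: bdrv_is_allocated_above() returns
+ * the number of layers down at which data was found (0 if unallocated),
+ * and that depth is reported as the extent's value.
+ */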
+static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, NBDExtentArray *ea)
+{
+ while (bytes) {
+ int64_t num;
+ int ret = bdrv_is_allocated_above(bs, NULL, false, offset, bytes,
+ &num);
+
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (nbd_extent_array_add(ea, num, ret) < 0) {
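+            /* Extent array full: stop early; a truncated result is valid. */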
+ return 0;
+ }
+
+ offset += num;
+ bytes -= num;
+ }
+
+ return 0;
+}
+
/*
* nbd_co_send_extents
*
unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
- ret = blockstatus_to_extents(bs, offset, length, ea);
+ if (context_id == NBD_META_ID_BASE_ALLOCATION) {
+ ret = blockstatus_to_extents(bs, offset, length, ea);
+ } else {
+ ret = blockalloc_to_extents(bs, offset, length, ea);
+ }
if (ret < 0) {
return nbd_co_send_structured_error(
client, handle, -ret, "can't get block status", errp);
}
if (!full) {
- /* last non dirty extent */
- nbd_extent_array_add(es, end - start, 0);
+        /* last non-dirty extent; nothing to do if the array is now full */
+ (void) nbd_extent_array_add(es, end - start, 0);
}
bdrv_dirty_bitmap_unlock(bitmap);
/* nbd_co_receive_request
* Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
*/
static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
Error **errp)
{
NBDClient *client = req->client;
int valid_flags;
+ int ret;
g_assert(qemu_in_coroutine());
assert(client->recv_coroutine == qemu_coroutine_self());
- if (nbd_receive_request(client->ioc, request, errp) < 0) {
- return -EIO;
+ ret = nbd_receive_request(client, request, errp);
+ if (ret < 0) {
+ return ret;
}
trace_nbd_co_receive_request_decode_type(request->handle, request->type,
int flags;
NBDExport *exp = client->exp;
char *msg;
+ size_t i;
switch (request->type) {
case NBD_CMD_CACHE:
return nbd_send_generic_reply(client, request->handle, -EINVAL,
"need non-zero length", errp);
}
- if (client->export_meta.valid &&
- (client->export_meta.base_allocation ||
- client->export_meta.bitmap))
- {
+ if (client->export_meta.count) {
bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
+ int contexts_remaining = client->export_meta.count;
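+
+            /*
+             * Only the final extent chunk may carry NBD_REPLY_FLAG_DONE;
+             * count down so just the last context sent flags 'last'.
+             */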
if (client->export_meta.base_allocation) {
ret = nbd_co_send_block_status(client, request->handle,
blk_bs(exp->common.blk),
request->from,
request->len, dont_fragment,
- !client->export_meta.bitmap,
+ !--contexts_remaining,
NBD_META_ID_BASE_ALLOCATION,
errp);
if (ret < 0) {
}
}
- if (client->export_meta.bitmap) {
+ if (client->export_meta.allocation_depth) {
+ ret = nbd_co_send_block_status(client, request->handle,
+ blk_bs(exp->common.blk),
+ request->from, request->len,
+ dont_fragment,
+ !--contexts_remaining,
+ NBD_META_ID_ALLOCATION_DEPTH,
+ errp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
+ if (!client->export_meta.bitmaps[i]) {
+ continue;
+ }
ret = nbd_co_send_bitmap(client, request->handle,
- client->exp->export_bitmap,
+ client->exp->export_bitmaps[i],
request->from, request->len,
- dont_fragment,
- true, NBD_META_ID_DIRTY_BITMAP, errp);
+ dont_fragment, !--contexts_remaining,
+ NBD_META_ID_DIRTY_BITMAP + i, errp);
if (ret < 0) {
return ret;
}
}
+ assert(!contexts_remaining);
+
return 0;
} else {
return nbd_send_generic_reply(client, request->handle, -EINVAL,
return;
}
+ if (client->quiescing) {
+ /*
+ * We're switching between AIO contexts. Don't attempt to receive a new
+ * request and kick the main context which may be waiting for us.
+ */
+ nbd_client_put(client);
+ client->recv_coroutine = NULL;
+ aio_wait_kick();
+ return;
+ }
+
req = nbd_request_get(client);
ret = nbd_co_receive_request(req, &request, &local_err);
client->recv_coroutine = NULL;
goto done;
}
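+    /* Interrupted for quiescing; blk_aio_attached() restarts reception. */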
+ if (ret == -EAGAIN) {
+ assert(client->quiescing);
+ goto done;
+ }
+
nbd_client_receive_next_request(client);
if (ret == -EIO) {
goto disconnect;
static void nbd_client_receive_next_request(NBDClient *client)
{
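+    /* Don't start new requests while quiesced for an AioContext switch. */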
- if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+ if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+ !client->quiescing) {
nbd_client_get(client);
client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);