/*
- * Copyright (C) 2016-2018 Red Hat, Inc.
+ * Copyright (C) 2016-2020 Red Hat, Inc.
*
* Network Block Device Server Side
*/
#include "qemu/osdep.h"
+
+#include "block/export.h"
#include "qapi/error.h"
#include "qemu/queue.h"
#include "trace.h"
#include "qemu/units.h"
#define NBD_META_ID_BASE_ALLOCATION 0
-#define NBD_META_ID_DIRTY_BITMAP 1
+#define NBD_META_ID_ALLOCATION_DEPTH 1
+/* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
+#define NBD_META_ID_DIRTY_BITMAP 2
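+/*
+ * The metadata context ids sent on the wire are therefore:
+ *   0     - base:allocation
+ *   1     - qemu:allocation-depth
+ *   2 + i - qemu:dirty-bitmap:<name of the i-th exported bitmap>
+ */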
/*
* NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
};
struct NBDExport {
- int refcount;
- void (*close)(NBDExport *exp);
+ BlockExport common;
- BlockBackend *blk;
char *name;
char *description;
- uint64_t dev_offset;
uint64_t size;
uint16_t nbdflags;
QTAILQ_HEAD(, NBDClient) clients;
QTAILQ_ENTRY(NBDExport) next;
- AioContext *ctx;
-
BlockBackend *eject_notifier_blk;
Notifier eject_notifier;
- BdrvDirtyBitmap *export_bitmap;
- char *export_bitmap_context;
+ bool allocation_depth;
+ BdrvDirtyBitmap **export_bitmaps;
+ size_t nr_export_bitmaps;
};
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
-static QTAILQ_HEAD(, NBDExport) closed_exports =
- QTAILQ_HEAD_INITIALIZER(closed_exports);
/* NBDExportMetaContexts represents a list of contexts to be exported,
* as selected by NBD_OPT_SET_META_CONTEXT. Also used for
* NBD_OPT_LIST_META_CONTEXT. */
typedef struct NBDExportMetaContexts {
NBDExport *exp;
- bool valid; /* means that negotiation of the option finished without
- errors */
+ size_t count; /* number of negotiated contexts */
bool base_allocation; /* export base:allocation context (block status) */
- bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
+ bool allocation_depth; /* export qemu:allocation-depth */
+ bool *bitmaps; /*
+ * export qemu:dirty-bitmap:<export bitmap name>,
+ * sized by exp->nr_export_bitmaps
+ */
} NBDExportMetaContexts;
struct NBDClient {
CoMutex send_lock;
Coroutine *send_coroutine;
+ bool read_yielding;
+ bool quiescing;
+
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
}
/* Read size bytes from the unparsed payload of the current option.
+ * If @check_nul, require that no NUL bytes appear in buffer.
* Return -errno on I/O error, 0 if option was completely handled by
* sending a reply about inconsistent lengths, or 1 on success. */
static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
- Error **errp)
+ bool check_nul, Error **errp)
{
if (size > client->optlen) {
return nbd_opt_invalid(client, errp,
nbd_opt_lookup(client->opt));
}
client->optlen -= size;
- return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
+ if (qio_channel_read_all(client->ioc, buffer, size, errp) < 0) {
+ return -EIO;
+ }
+
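+    /*
+     * strnlen() stops at the first NUL byte, so a result shorter than
+     * @size proves the client embedded a NUL in the string.
+     */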
+ if (check_nul && strnlen(buffer, size) != size) {
+ return nbd_opt_invalid(client, errp,
+ "Unexpected embedded NUL in option %s",
+ nbd_opt_lookup(client->opt));
+ }
+ return 1;
}
/* Drop size bytes from the unparsed payload of the current option.
g_autofree char *local_name = NULL;
*name = NULL;
- ret = nbd_opt_read(client, &len, sizeof(len), errp);
+ ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
if (ret <= 0) {
return ret;
}
}
local_name = g_malloc(len + 1);
- ret = nbd_opt_read(client, local_name, len, errp);
+ ret = nbd_opt_read(client, local_name, len, true, errp);
if (ret <= 0) {
return ret;
}
static void nbd_check_meta_export(NBDClient *client)
{
- client->export_meta.valid &= client->exp == client->export_meta.exp;
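+    /*
+     * Negotiated meta contexts apply only to the export they named;
+     * selecting a different export via NBD_OPT_EXPORT_NAME or NBD_OPT_GO
+     * discards them.
+     */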
+ if (client->exp != client->export_meta.exp) {
+ client->export_meta.count = 0;
+ }
}
/* Send a reply to NBD_OPT_EXPORT_NAME.
}
QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
- nbd_export_get(client->exp);
+ blk_exp_ref(&client->exp->common);
nbd_check_meta_export(client);
return 0;
NBDExport *exp;
uint16_t requests;
uint16_t request;
- uint32_t namelen;
+ uint32_t namelen = 0;
bool sendname = false;
bool blocksize = false;
uint32_t sizes[3];
}
trace_nbd_negotiate_handle_export_name_request(name);
- rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
+ rc = nbd_opt_read(client, &requests, sizeof(requests), false, errp);
if (rc <= 0) {
return rc;
}
requests = be16_to_cpu(requests);
trace_nbd_negotiate_handle_info_requests(requests);
while (requests--) {
- rc = nbd_opt_read(client, &request, sizeof(request), errp);
+ rc = nbd_opt_read(client, &request, sizeof(request), false, errp);
if (rc <= 0) {
return rc;
}
* whether this is OPT_INFO or OPT_GO. */
/* minimum - 1 for back-compat, or actual if client will obey it. */
if (client->opt == NBD_OPT_INFO || blocksize) {
- check_align = sizes[0] = blk_get_request_alignment(exp->blk);
+ check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
} else {
sizes[0] = 1;
}
* TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
sizes[1] = MAX(4096, sizes[0]);
/* maximum - At most 32M, but smaller as appropriate. */
- sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
+ sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
sizes[0] = cpu_to_be32(sizes[0]);
sizes[1] = cpu_to_be32(sizes[1]);
* tolerate all clients, regardless of alignments.
*/
if (client->opt == NBD_OPT_INFO && !blocksize &&
- blk_get_request_alignment(exp->blk) > 1) {
+ blk_get_request_alignment(exp->common.blk) > 1) {
return nbd_negotiate_send_rep_err(client,
NBD_REP_ERR_BLOCK_SIZE_REQD,
errp,
client->exp = exp;
client->check_align = check_align;
QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
- nbd_export_get(client->exp);
+ blk_exp_ref(&client->exp->common);
nbd_check_meta_export(client);
rc = 1;
}
return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
}
-/* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
- * @match is never set to false.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success.
- *
- * Note: return code = 1 doesn't mean that we've read exactly @pattern.
- * It only means that there are no errors.
+/*
+ * Return true if @query matches @pattern, or if @query is empty when
+ * the @client is performing _LIST_.
*/
-static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
- Error **errp)
+static bool nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
+ const char *query)
{
- int ret;
- char *query;
- size_t len = strlen(pattern);
-
- assert(len);
-
- query = g_malloc(len);
- ret = nbd_opt_read(client, query, len, errp);
- if (ret <= 0) {
- g_free(query);
- return ret;
+ if (!*query) {
+ trace_nbd_negotiate_meta_query_parse("empty");
+ return client->opt == NBD_OPT_LIST_META_CONTEXT;
}
-
- if (strncmp(query, pattern, len) == 0) {
+ if (strcmp(query, pattern) == 0) {
trace_nbd_negotiate_meta_query_parse(pattern);
- *match = true;
- } else {
- trace_nbd_negotiate_meta_query_skip("pattern not matched");
+ return true;
}
- g_free(query);
-
- return 1;
+ trace_nbd_negotiate_meta_query_skip("pattern not matched");
+ return false;
}
/*
- * Read @len bytes, and set @match to true if they match @pattern, or if @len
- * is 0 and the client is performing _LIST_. @match is never set to false.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success.
- *
- * Note: return code = 1 doesn't mean that we've read exactly @pattern.
- * It only means that there are no errors.
+ * Return true and adjust @str in place if it begins with @prefix.
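+ *
+ * E.g. with a pointer p = "qemu:dirty-bitmap:b0", nbd_strshift(&p, "qemu:")
+ * returns true and leaves p at "dirty-bitmap:b0".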
*/
-static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
- uint32_t len, bool *match, Error **errp)
+static bool nbd_strshift(const char **str, const char *prefix)
{
- if (len == 0) {
- if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
- *match = true;
- }
- trace_nbd_negotiate_meta_query_parse("empty");
- return 1;
- }
+ size_t len = strlen(prefix);
- if (len != strlen(pattern)) {
- trace_nbd_negotiate_meta_query_skip("different lengths");
- return nbd_opt_skip(client, len, errp);
+ if (strncmp(*str, prefix, len) == 0) {
+ *str += len;
+ return true;
}
-
- return nbd_meta_pattern(client, pattern, match, errp);
+ return false;
}
/* nbd_meta_base_query
*
* Handle queries to 'base' namespace. For now, only the base:allocation
- * context is available. 'len' is the amount of text remaining to be read from
- * the current name, after the 'base:' portion has been stripped.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success.
+ * context is available. Return true if @query has been handled.
*/
-static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
- uint32_t len, Error **errp)
+static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
+ const char *query)
{
- return nbd_meta_empty_or_pattern(client, "allocation", len,
- &meta->base_allocation, errp);
+ if (!nbd_strshift(&query, "base:")) {
+ return false;
+ }
+ trace_nbd_negotiate_meta_query_parse("base:");
+
+ if (nbd_meta_empty_or_pattern(client, "allocation", query)) {
+ meta->base_allocation = true;
+ }
+ return true;
}
-/* nbd_meta_bitmap_query
+/* nbd_meta_qemu_query
*
- * Handle query to 'qemu:' namespace.
- * @len is the amount of text remaining to be read from the current name, after
- * the 'qemu:' portion has been stripped.
- *
- * Return -errno on I/O error, 0 if option was completely handled by
- * sending a reply about inconsistent lengths, or 1 on success. */
-static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
- uint32_t len, Error **errp)
+ * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
+ * and qemu:allocation-depth contexts are available. Return true if @query
+ * has been handled.
+ */
+static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
+ const char *query)
{
- bool dirty_bitmap = false;
- size_t dirty_bitmap_len = strlen("dirty-bitmap:");
- int ret;
+ size_t i;
- if (!meta->exp->export_bitmap) {
- trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
- return nbd_opt_skip(client, len, errp);
+ if (!nbd_strshift(&query, "qemu:")) {
+ return false;
}
+ trace_nbd_negotiate_meta_query_parse("qemu:");
- if (len == 0) {
+ if (!*query) {
if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
- meta->bitmap = true;
+ meta->allocation_depth = meta->exp->allocation_depth;
+ memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
}
trace_nbd_negotiate_meta_query_parse("empty");
- return 1;
+ return true;
}
- if (len < dirty_bitmap_len) {
- trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
- return nbd_opt_skip(client, len, errp);
+ if (strcmp(query, "allocation-depth") == 0) {
+ trace_nbd_negotiate_meta_query_parse("allocation-depth");
+ meta->allocation_depth = meta->exp->allocation_depth;
+ return true;
}
- len -= dirty_bitmap_len;
- ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
- if (ret <= 0) {
- return ret;
- }
- if (!dirty_bitmap) {
- trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
- return nbd_opt_skip(client, len, errp);
- }
+ if (nbd_strshift(&query, "dirty-bitmap:")) {
+ trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
+ if (!*query) {
+ if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+ memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
+ }
+ trace_nbd_negotiate_meta_query_parse("empty");
+ return true;
+ }
+
+ for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
+ const char *bm_name;
- trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
+ bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
+ if (strcmp(bm_name, query) == 0) {
+ meta->bitmaps[i] = true;
+ trace_nbd_negotiate_meta_query_parse(query);
+ return true;
+ }
+ }
+ trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
+ return true;
+ }
- return nbd_meta_empty_or_pattern(
- client, meta->exp->export_bitmap_context +
- strlen("qemu:dirty_bitmap:"), len, &meta->bitmap, errp);
+ trace_nbd_negotiate_meta_query_skip("unknown qemu context");
+ return true;
}
/* nbd_negotiate_meta_query
*
* The only supported namespaces are 'base' and 'qemu'.
*
- * The function aims not wasting time and memory to read long unknown namespace
- * names.
- *
* Return -errno on I/O error, 0 if option was completely handled by
* sending a reply about inconsistent lengths, or 1 on success. */
static int nbd_negotiate_meta_query(NBDClient *client,
NBDExportMetaContexts *meta, Error **errp)
{
- /*
- * Both 'qemu' and 'base' namespaces have length = 5 including a
- * colon. If another length namespace is later introduced, this
- * should certainly be refactored.
- */
int ret;
- size_t ns_len = 5;
- char ns[5];
+ g_autofree char *query = NULL;
uint32_t len;
- ret = nbd_opt_read(client, &len, sizeof(len), errp);
+ ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
if (ret <= 0) {
return ret;
}
trace_nbd_negotiate_meta_query_skip("length too long");
return nbd_opt_skip(client, len, errp);
}
- if (len < ns_len) {
- trace_nbd_negotiate_meta_query_skip("length too short");
- return nbd_opt_skip(client, len, errp);
- }
- len -= ns_len;
- ret = nbd_opt_read(client, ns, ns_len, errp);
+ query = g_malloc(len + 1);
+ ret = nbd_opt_read(client, query, len, true, errp);
if (ret <= 0) {
return ret;
}
+ query[len] = '\0';
- if (!strncmp(ns, "base:", ns_len)) {
- trace_nbd_negotiate_meta_query_parse("base:");
- return nbd_meta_base_query(client, meta, len, errp);
- } else if (!strncmp(ns, "qemu:", ns_len)) {
- trace_nbd_negotiate_meta_query_parse("qemu:");
- return nbd_meta_qemu_query(client, meta, len, errp);
+ if (nbd_meta_base_query(client, meta, query)) {
+ return 1;
+ }
+ if (nbd_meta_qemu_query(client, meta, query)) {
+ return 1;
}
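+    /* The query was read in full above, so there is no payload left to
+     * skip before replying. */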
trace_nbd_negotiate_meta_query_skip("unknown namespace");
- return nbd_opt_skip(client, len, errp);
+ return 1;
}
/* nbd_negotiate_meta_queries
{
int ret;
g_autofree char *export_name = NULL;
- NBDExportMetaContexts local_meta;
+ g_autofree bool *bitmaps = NULL;
+ NBDExportMetaContexts local_meta = {0};
uint32_t nb_queries;
- int i;
+ size_t i;
+ size_t count = 0;
if (!client->structured_reply) {
return nbd_opt_invalid(client, errp,
meta = &local_meta;
}
+ g_free(meta->bitmaps);
memset(meta, 0, sizeof(*meta));
ret = nbd_opt_read_name(client, &export_name, NULL, errp);
return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
"export '%s' not present", sane_name);
}
+ meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
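+    /* In _LIST_ mode meta is the local copy; let g_autofree own the array. */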
+ if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
+ bitmaps = meta->bitmaps;
+ }
- ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
+ ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
if (ret <= 0) {
return ret;
}
if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
/* enable all known contexts */
meta->base_allocation = true;
- meta->bitmap = !!meta->exp->export_bitmap;
+ meta->allocation_depth = meta->exp->allocation_depth;
+ memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
} else {
        for (i = 0; i < nb_queries; ++i) {
            ret = nbd_negotiate_meta_query(client, meta, errp);
            if (ret <= 0) {
                return ret;
            }
        }
    }

    if (meta->base_allocation) {
        ret = nbd_negotiate_send_meta_context(client, "base:allocation",
                                              NBD_META_ID_BASE_ALLOCATION,
                                              errp);
        if (ret < 0) {
            return ret;
        }
+        count++;
    }
+
+ if (meta->allocation_depth) {
+ ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
+ NBD_META_ID_ALLOCATION_DEPTH,
+ errp);
+ if (ret < 0) {
+ return ret;
+ }
+ count++;
}
- if (meta->bitmap) {
- ret = nbd_negotiate_send_meta_context(client,
- meta->exp->export_bitmap_context,
- NBD_META_ID_DIRTY_BITMAP,
+ for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
+ const char *bm_name;
+ g_autofree char *context = NULL;
+
+ if (!meta->bitmaps[i]) {
+ continue;
+ }
+
+ bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
+ context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
+
+ ret = nbd_negotiate_send_meta_context(client, context,
+ NBD_META_ID_DIRTY_BITMAP + i,
errp);
if (ret < 0) {
return ret;
}
+ count++;
}
ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
if (ret == 0) {
- meta->valid = true;
+ meta->count = count;
}
return ret;
}
/* Attach the channel to the same AioContext as the export */
- if (client->exp && client->exp->ctx) {
- qio_channel_attach_aio_context(client->ioc, client->exp->ctx);
+ if (client->exp && client->exp->common.ctx) {
+ qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
}
assert(!client->optlen);
return 0;
}
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @client's channel. This is a local
+ * implementation of qio_channel_readv_all_eof. We have it here because we
+ * need it to be interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         -EAGAIN if the server is being quiesced (errp is not set)
+ *         other negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+ bool partial = false;
+
+ assert(size);
+ while (size > 0) {
+ struct iovec iov = { .iov_base = buffer, .iov_len = size };
+ ssize_t len;
+
+ len = qio_channel_readv(client->ioc, &iov, 1, errp);
+ if (len == QIO_CHANNEL_ERR_BLOCK) {
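+            /*
+             * Flag that this coroutine is parked in the channel, so that
+             * nbd_aio_detach_bh() knows it may re-enter us to deliver a
+             * quiesce request.
+             */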
+ client->read_yielding = true;
+ qio_channel_yield(client->ioc, G_IO_IN);
+ client->read_yielding = false;
+ if (client->quiescing) {
+ return -EAGAIN;
+ }
+ continue;
+ } else if (len < 0) {
+ return -EIO;
+ } else if (len == 0) {
+ if (partial) {
+ error_setg(errp,
+ "Unexpected end-of-file before all bytes were read");
+ return -EIO;
+ } else {
+ return 0;
+ }
+ }
+
+ partial = true;
+ size -= len;
+ buffer = (uint8_t *) buffer + len;
+ }
+ return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
Error **errp)
{
uint8_t buf[NBD_REQUEST_SIZE];
uint32_t magic;
int ret;
- ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+ ret = nbd_read_eof(client, buf, sizeof(buf), errp);
if (ret < 0) {
return ret;
}
g_free(client->tlsauthz);
if (client->exp) {
QTAILQ_REMOVE(&client->exp->clients, client, next);
- nbd_export_put(client->exp);
+ blk_exp_unref(&client->exp->common);
}
+ g_free(client->export_meta.bitmaps);
g_free(client);
}
}
trace_nbd_blk_aio_attached(exp->name, ctx);
- exp->ctx = ctx;
+ exp->common.ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) {
qio_channel_attach_aio_context(client->ioc, ctx);
+
+ assert(client->recv_coroutine == NULL);
+ assert(client->send_coroutine == NULL);
+
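+        /*
+         * Restart the receive coroutine that nbd_aio_detach_bh() quiesced
+         * when the export left its previous AioContext.
+         */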
+ if (client->quiescing) {
+ client->quiescing = false;
+ nbd_client_receive_next_request(client);
+ }
+ }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+ NBDExport *exp = opaque;
+ NBDClient *client;
+
+ QTAILQ_FOREACH(client, &exp->clients, next) {
+ qio_channel_detach_aio_context(client->ioc);
+ client->quiescing = true;
+
if (client->recv_coroutine) {
- aio_co_schedule(ctx, client->recv_coroutine);
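+            /*
+             * The coroutine is blocked in nbd_read_eof(); re-enter it so
+             * it observes ->quiescing and returns -EAGAIN. Otherwise wait
+             * for the in-flight request to complete.
+             */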
+ if (client->read_yielding) {
+ qemu_aio_coroutine_enter(exp->common.ctx,
+ client->recv_coroutine);
+ } else {
+ AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+ }
}
+
if (client->send_coroutine) {
- aio_co_schedule(ctx, client->send_coroutine);
+ AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
}
}
}
static void blk_aio_detach(void *opaque)
{
NBDExport *exp = opaque;
- NBDClient *client;
- trace_nbd_blk_aio_detach(exp->name, exp->ctx);
+ trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
- QTAILQ_FOREACH(client, &exp->clients, next) {
- qio_channel_detach_aio_context(client->ioc);
- }
+ aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
- exp->ctx = NULL;
+ exp->common.ctx = NULL;
}
static void nbd_eject_notifier(Notifier *n, void *data)
{
NBDExport *exp = container_of(n, NBDExport, eject_notifier);
- AioContext *aio_context;
- aio_context = exp->ctx;
- aio_context_acquire(aio_context);
- nbd_export_close(exp);
- aio_context_release(aio_context);
+ blk_exp_request_shutdown(&exp->common);
}
-NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
- uint64_t size, const char *name, const char *desc,
- const char *bitmap, bool readonly, bool shared,
- void (*close)(NBDExport *), bool writethrough,
- BlockBackend *on_eject_blk, Error **errp)
+void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
{
- AioContext *ctx;
- BlockBackend *blk;
- NBDExport *exp = g_new0(NBDExport, 1);
- uint64_t perm;
+ NBDExport *nbd_exp = container_of(exp, NBDExport, common);
+ assert(exp->drv == &blk_exp_nbd);
+ assert(nbd_exp->eject_notifier_blk == NULL);
+
+ blk_ref(blk);
+ nbd_exp->eject_notifier_blk = blk;
+ nbd_exp->eject_notifier.notify = nbd_eject_notifier;
+ blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
+}
+
+static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
+ Error **errp)
+{
+ NBDExport *exp = container_of(blk_exp, NBDExport, common);
+ BlockExportOptionsNbd *arg = &exp_args->u.nbd;
+ BlockBackend *blk = blk_exp->blk;
+ int64_t size;
+ uint64_t perm, shared_perm;
+ bool readonly = !exp_args->writable;
+ bool shared = !exp_args->writable;
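+    /* A read-only export never changes, so parallel clients are safe. */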
+ strList *bitmaps;
+ size_t i;
int ret;
- /*
- * NBD exports are used for non-shared storage migration. Make sure
- * that BDRV_O_INACTIVE is cleared and the image is ready for write
- * access since the export could be available before migration handover.
- * ctx was acquired in the caller.
- */
- assert(name && strlen(name) <= NBD_MAX_STRING_SIZE);
- ctx = bdrv_get_aio_context(bs);
- bdrv_invalidate_cache(bs, NULL);
+ assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
+
+ if (!nbd_server_is_running()) {
+ error_setg(errp, "NBD server not running");
+ return -EINVAL;
+ }
+
+ if (!arg->has_name) {
+        arg->name = g_strdup(exp_args->node_name);
+ }
+
+ if (strlen(arg->name) > NBD_MAX_STRING_SIZE) {
+ error_setg(errp, "export name '%s' too long", arg->name);
+ return -EINVAL;
+ }
+
+ if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
+ error_setg(errp, "description '%s' too long", arg->description);
+ return -EINVAL;
+ }
+
+ if (nbd_export_find(arg->name)) {
+ error_setg(errp, "NBD server already has export named '%s'", arg->name);
+ return -EEXIST;
+ }
+
+ size = blk_getlength(blk);
+ if (size < 0) {
+ error_setg_errno(errp, -size,
+ "Failed to determine the NBD export's length");
+ return size;
+ }
/* Don't allow resize while the NBD server is running, otherwise we don't
* care what happens with the node. */
- perm = BLK_PERM_CONSISTENT_READ;
- if (!readonly) {
- perm |= BLK_PERM_WRITE;
- }
- blk = blk_new(ctx, perm,
- BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
- BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD);
- ret = blk_insert_bs(blk, bs, errp);
+ blk_get_perm(blk, &perm, &shared_perm);
+ ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
if (ret < 0) {
- goto fail;
+ return ret;
}
- blk_set_enable_write_cache(blk, !writethrough);
- blk_set_allow_aio_context_change(blk, true);
- exp->refcount = 1;
QTAILQ_INIT(&exp->clients);
- exp->blk = blk;
- assert(dev_offset <= INT64_MAX);
- exp->dev_offset = dev_offset;
- exp->name = g_strdup(name);
- assert(!desc || strlen(desc) <= NBD_MAX_STRING_SIZE);
- exp->description = g_strdup(desc);
+ exp->name = g_strdup(arg->name);
+ exp->description = g_strdup(arg->description);
exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
    if (readonly) {
        exp->nbdflags |= NBD_FLAG_READ_ONLY;
        if (shared) {
            exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
        }
    } else {
        exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
                          NBD_FLAG_SEND_FAST_ZERO);
    }
- assert(size <= INT64_MAX - dev_offset);
exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
- if (bitmap) {
+ for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
+ exp->nr_export_bitmaps++;
+ }
+ exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
+ for (i = 0, bitmaps = arg->bitmaps; bitmaps;
+ i++, bitmaps = bitmaps->next) {
+ const char *bitmap = bitmaps->value;
+ BlockDriverState *bs = blk_bs(blk);
BdrvDirtyBitmap *bm = NULL;
- while (true) {
+ while (bs) {
bm = bdrv_find_dirty_bitmap(bs, bitmap);
- if (bm != NULL || bs->backing == NULL) {
+ if (bm != NULL) {
break;
}
- bs = bs->backing->bs;
+ bs = bdrv_filter_or_cow_bs(bs);
}
if (bm == NULL) {
+ ret = -ENOENT;
error_setg(errp, "Bitmap '%s' is not found", bitmap);
goto fail;
}
if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
+ ret = -EINVAL;
goto fail;
}
if (readonly && bdrv_is_writable(bs) &&
bdrv_dirty_bitmap_enabled(bm)) {
+ ret = -EINVAL;
error_setg(errp,
"Enabled bitmap '%s' incompatible with readonly export",
bitmap);
goto fail;
}
- bdrv_dirty_bitmap_set_busy(bm, true);
- exp->export_bitmap = bm;
+ exp->export_bitmaps[i] = bm;
assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
- exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
- bitmap);
- assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
}
- exp->close = close;
- exp->ctx = ctx;
+ /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
+ for (i = 0; i < exp->nr_export_bitmaps; i++) {
+ bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
+ }
+
+ exp->allocation_depth = arg->allocation_depth;
+
blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
- if (on_eject_blk) {
- blk_ref(on_eject_blk);
- exp->eject_notifier_blk = on_eject_blk;
- exp->eject_notifier.notify = nbd_eject_notifier;
- blk_add_remove_bs_notifier(on_eject_blk, &exp->eject_notifier);
- }
QTAILQ_INSERT_TAIL(&exports, exp, next);
- nbd_export_get(exp);
- return exp;
+
+ return 0;
fail:
- blk_unref(blk);
+ g_free(exp->export_bitmaps);
g_free(exp->name);
g_free(exp->description);
- g_free(exp);
- return NULL;
+ return ret;
}
NBDExport *nbd_export_find(const char *name)
AioContext *
nbd_export_aio_context(NBDExport *exp)
{
- return exp->ctx;
+ return exp->common.ctx;
}
-void nbd_export_close(NBDExport *exp)
+static void nbd_export_request_shutdown(BlockExport *blk_exp)
{
+ NBDExport *exp = container_of(blk_exp, NBDExport, common);
NBDClient *client, *next;
- nbd_export_get(exp);
+ blk_exp_ref(&exp->common);
/*
* TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
* close mode that stops advertising the export to new clients but
client_close(client, true);
}
if (exp->name) {
- nbd_export_put(exp);
g_free(exp->name);
exp->name = NULL;
QTAILQ_REMOVE(&exports, exp, next);
- QTAILQ_INSERT_TAIL(&closed_exports, exp, next);
- }
- g_free(exp->description);
- exp->description = NULL;
- nbd_export_put(exp);
-}
-
-void nbd_export_remove(NBDExport *exp, NbdServerRemoveMode mode, Error **errp)
-{
- ERRP_GUARD();
- if (mode == NBD_SERVER_REMOVE_MODE_HARD || QTAILQ_EMPTY(&exp->clients)) {
- nbd_export_close(exp);
- return;
}
-
- assert(mode == NBD_SERVER_REMOVE_MODE_SAFE);
-
- error_setg(errp, "export '%s' still in use", exp->name);
- error_append_hint(errp, "Use mode='hard' to force client disconnect\n");
+ blk_exp_unref(&exp->common);
}
-void nbd_export_get(NBDExport *exp)
+static void nbd_export_delete(BlockExport *blk_exp)
{
- assert(exp->refcount > 0);
- exp->refcount++;
-}
-
-void nbd_export_put(NBDExport *exp)
-{
- assert(exp->refcount > 0);
- if (exp->refcount == 1) {
- nbd_export_close(exp);
- }
+ size_t i;
+ NBDExport *exp = container_of(blk_exp, NBDExport, common);
- /* nbd_export_close() may theoretically reduce refcount to 0. It may happen
- * if someone calls nbd_export_put() on named export not through
- * nbd_export_set_name() when refcount is 1. So, let's assert that
- * it is > 0.
- */
- assert(exp->refcount > 0);
- if (--exp->refcount == 0) {
- assert(exp->name == NULL);
- assert(exp->description == NULL);
-
- if (exp->close) {
- exp->close(exp);
- }
+ assert(exp->name == NULL);
+ assert(QTAILQ_EMPTY(&exp->clients));
- if (exp->blk) {
- if (exp->eject_notifier_blk) {
- notifier_remove(&exp->eject_notifier);
- blk_unref(exp->eject_notifier_blk);
- }
- blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
- blk_aio_detach, exp);
- blk_unref(exp->blk);
- exp->blk = NULL;
- }
+ g_free(exp->description);
+ exp->description = NULL;
- if (exp->export_bitmap) {
- bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
- g_free(exp->export_bitmap_context);
+ if (exp->common.blk) {
+ if (exp->eject_notifier_blk) {
+ notifier_remove(&exp->eject_notifier);
+ blk_unref(exp->eject_notifier_blk);
}
-
- QTAILQ_REMOVE(&closed_exports, exp, next);
- g_free(exp);
- aio_wait_kick();
+ blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
+ blk_aio_detach, exp);
}
-}
-
-BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
-{
- return exp->blk;
-}
-
-void nbd_export_close_all(void)
-{
- NBDExport *exp, *next;
- AioContext *aio_context;
- QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
- aio_context = exp->ctx;
- aio_context_acquire(aio_context);
- nbd_export_close(exp);
- aio_context_release(aio_context);
+ for (i = 0; i < exp->nr_export_bitmaps; i++) {
+ bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
}
-
- AIO_WAIT_WHILE(NULL, !(QTAILQ_EMPTY(&exports) &&
- QTAILQ_EMPTY(&closed_exports)));
}
+const BlockExportDriver blk_exp_nbd = {
+ .type = BLOCK_EXPORT_TYPE_NBD,
+ .instance_size = sizeof(NBDExport),
+ .create = nbd_export_create,
+ .delete = nbd_export_delete,
+ .request_shutdown = nbd_export_request_shutdown,
+};
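+/*
+ * Presumably registered in the export driver table in block/export.c,
+ * from where blk_exp_add() dispatches to .create above.
+ */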
+
static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
unsigned niov, Error **errp)
{
while (progress < size) {
int64_t pnum;
- int status = bdrv_block_status_above(blk_bs(exp->blk), NULL,
+ int status = bdrv_block_status_above(blk_bs(exp->common.blk), NULL,
offset + progress,
size - progress, &pnum, NULL,
NULL);
stl_be_p(&chunk.length, pnum);
ret = nbd_co_send_iov(client, iov, 1, errp);
} else {
- ret = blk_pread(exp->blk, offset + progress + exp->dev_offset,
+ ret = blk_pread(exp->common.blk, offset + progress,
data + progress, pnum);
if (ret < 0) {
error_setg_errno(errp, -ret, "reading from file failed");
return 0;
}
+static int blockalloc_to_extents(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, NBDExtentArray *ea)
+{
+ while (bytes) {
+ int64_t num;
+ int ret = bdrv_is_allocated_above(bs, NULL, false, offset, bytes,
+ &num);
+
+ if (ret < 0) {
+ return ret;
+ }
+
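+        /*
+         * Here ret is the depth at which the data was found: 0 for
+         * unallocated, 1 for this layer, 2+ for a backing layer (assuming
+         * bdrv_is_allocated_above() counts layers); it is reported
+         * verbatim as the extent's flags value.
+         */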
+ if (nbd_extent_array_add(ea, num, ret) < 0) {
+ return 0;
+ }
+
+ offset += num;
+ bytes -= num;
+ }
+
+ return 0;
+}
+
/*
* nbd_co_send_extents
*
unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
- ret = blockstatus_to_extents(bs, offset, length, ea);
+ if (context_id == NBD_META_ID_BASE_ALLOCATION) {
+ ret = blockstatus_to_extents(bs, offset, length, ea);
+ } else {
+ ret = blockalloc_to_extents(bs, offset, length, ea);
+ }
if (ret < 0) {
return nbd_co_send_structured_error(
client, handle, -ret, "can't get block status", errp);
}
if (!full) {
- /* last non dirty extent */
- nbd_extent_array_add(es, end - start, 0);
+        /* last non-dirty extent; nothing to do if the array is now full */
+ (void) nbd_extent_array_add(es, end - start, 0);
}
bdrv_dirty_bitmap_unlock(bitmap);
/* nbd_co_receive_request
* Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
*/
static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
Error **errp)
{
NBDClient *client = req->client;
int valid_flags;
+ int ret;
g_assert(qemu_in_coroutine());
assert(client->recv_coroutine == qemu_coroutine_self());
- if (nbd_receive_request(client->ioc, request, errp) < 0) {
- return -EIO;
+ ret = nbd_receive_request(client, request, errp);
+ if (ret < 0) {
+ return ret;
}
trace_nbd_co_receive_request_decode_type(request->handle, request->type,
}
if (request->type != NBD_CMD_CACHE) {
- req->data = blk_try_blockalign(client->exp->blk, request->len);
+ req->data = blk_try_blockalign(client->exp->common.blk,
+ request->len);
if (req->data == NULL) {
error_setg(errp, "No memory");
return -ENOMEM;
/* XXX: NBD Protocol only documents use of FUA with WRITE */
if (request->flags & NBD_CMD_FLAG_FUA) {
- ret = blk_co_flush(exp->blk);
+ ret = blk_co_flush(exp->common.blk);
if (ret < 0) {
return nbd_send_generic_reply(client, request->handle, ret,
"flush failed", errp);
data, request->len, errp);
}
- ret = blk_pread(exp->blk, request->from + exp->dev_offset, data,
- request->len);
+ ret = blk_pread(exp->common.blk, request->from, data, request->len);
if (ret < 0) {
return nbd_send_generic_reply(client, request->handle, ret,
"reading from file failed", errp);
assert(request->type == NBD_CMD_CACHE);
- ret = blk_co_preadv(exp->blk, request->from + exp->dev_offset, request->len,
+ ret = blk_co_preadv(exp->common.blk, request->from, request->len,
NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
return nbd_send_generic_reply(client, request->handle, ret,
int flags;
NBDExport *exp = client->exp;
char *msg;
+ size_t i;
switch (request->type) {
case NBD_CMD_CACHE:
if (request->flags & NBD_CMD_FLAG_FUA) {
flags |= BDRV_REQ_FUA;
}
- ret = blk_pwrite(exp->blk, request->from + exp->dev_offset,
- data, request->len, flags);
+ ret = blk_pwrite(exp->common.blk, request->from, data, request->len,
+ flags);
return nbd_send_generic_reply(client, request->handle, ret,
"writing to file failed", errp);
if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
flags |= BDRV_REQ_NO_FALLBACK;
}
- ret = blk_pwrite_zeroes(exp->blk, request->from + exp->dev_offset,
- request->len, flags);
+ ret = 0;
+ /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */
+ while (ret >= 0 && request->len) {
+ int align = client->check_align ?: 1;
+ int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
+ align));
+ ret = blk_pwrite_zeroes(exp->common.blk, request->from, len, flags);
+ request->len -= len;
+ request->from += len;
+ }
return nbd_send_generic_reply(client, request->handle, ret,
"writing to file failed", errp);
abort();
case NBD_CMD_FLUSH:
- ret = blk_co_flush(exp->blk);
+ ret = blk_co_flush(exp->common.blk);
return nbd_send_generic_reply(client, request->handle, ret,
"flush failed", errp);
case NBD_CMD_TRIM:
- ret = blk_co_pdiscard(exp->blk, request->from + exp->dev_offset,
- request->len);
- if (ret == 0 && request->flags & NBD_CMD_FLAG_FUA) {
- ret = blk_co_flush(exp->blk);
+ ret = 0;
+ /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */
+ while (ret >= 0 && request->len) {
+ int align = client->check_align ?: 1;
+ int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
+ align));
+ ret = blk_co_pdiscard(exp->common.blk, request->from, len);
+ request->len -= len;
+ request->from += len;
+ }
+ if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
+ ret = blk_co_flush(exp->common.blk);
}
return nbd_send_generic_reply(client, request->handle, ret,
"discard failed", errp);
return nbd_send_generic_reply(client, request->handle, -EINVAL,
"need non-zero length", errp);
}
- if (client->export_meta.valid &&
- (client->export_meta.base_allocation ||
- client->export_meta.bitmap))
- {
+ if (client->export_meta.count) {
bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
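+            /*
+             * Only the reply for the final context may carry the DONE
+             * flag; each send below tests !--contexts_remaining to mark
+             * the last one.
+             */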
+ int contexts_remaining = client->export_meta.count;
if (client->export_meta.base_allocation) {
ret = nbd_co_send_block_status(client, request->handle,
- blk_bs(exp->blk), request->from,
+ blk_bs(exp->common.blk),
+ request->from,
request->len, dont_fragment,
- !client->export_meta.bitmap,
+ !--contexts_remaining,
NBD_META_ID_BASE_ALLOCATION,
errp);
if (ret < 0) {
}
}
- if (client->export_meta.bitmap) {
+ if (client->export_meta.allocation_depth) {
+ ret = nbd_co_send_block_status(client, request->handle,
+ blk_bs(exp->common.blk),
+ request->from, request->len,
+ dont_fragment,
+ !--contexts_remaining,
+ NBD_META_ID_ALLOCATION_DEPTH,
+ errp);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
+ if (!client->export_meta.bitmaps[i]) {
+ continue;
+ }
ret = nbd_co_send_bitmap(client, request->handle,
- client->exp->export_bitmap,
+ client->exp->export_bitmaps[i],
request->from, request->len,
- dont_fragment,
- true, NBD_META_ID_DIRTY_BITMAP, errp);
+ dont_fragment, !--contexts_remaining,
+ NBD_META_ID_DIRTY_BITMAP + i, errp);
if (ret < 0) {
return ret;
}
}
+ assert(!contexts_remaining);
+
return 0;
} else {
return nbd_send_generic_reply(client, request->handle, -EINVAL,
return;
}
+ if (client->quiescing) {
+ /*
+ * We're switching between AIO contexts. Don't attempt to receive a new
+ * request and kick the main context which may be waiting for us.
+ */
+ nbd_client_put(client);
+ client->recv_coroutine = NULL;
+ aio_wait_kick();
+ return;
+ }
+
req = nbd_request_get(client);
ret = nbd_co_receive_request(req, &request, &local_err);
client->recv_coroutine = NULL;
goto done;
}
+ if (ret == -EAGAIN) {
+ assert(client->quiescing);
+ goto done;
+ }
+
nbd_client_receive_next_request(client);
if (ret == -EIO) {
goto disconnect;
static void nbd_client_receive_next_request(NBDClient *client)
{
- if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+ if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+ !client->quiescing) {
nbd_client_get(client);
client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
- aio_co_schedule(client->exp->ctx, client->recv_coroutine);
+ aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
}
}