#include "qemu/osdep.h"
#include "qemu/cutils.h"
+#include "qemu/module.h"
#include "qemu/option.h"
#include "block/block_int.h"
+#include "block/qdict.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block.h"
#include "qapi/qmp/qdict.h"
/* Request metadata */
uint64_t offset;
uint64_t bytes;
+ int flags;
QEMUIOVector *qiov; /* calling IOV */
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
QEMUIOVector *qiov,
uint64_t offset,
- uint64_t bytes)
+ uint64_t bytes,
+ int flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
.bs = bs,
.offset = offset,
.bytes = bytes,
+ .flags = flags,
.qiov = qiov,
.votes.compare = quorum_sha256_compare,
.votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
}
qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
- end_sector - start_sector, &error_abort);
+ end_sector - start_sector);
}
static void quorum_report_failure(QuorumAIOCB *acb)
BDRV_SECTOR_SIZE);
qapi_event_send_quorum_failure(reference, start_sector,
- end_sector - start_sector, &error_abort);
+ end_sector - start_sector);
}
static int quorum_vote_error(QuorumAIOCB *acb);
BDRVQuorumState *s = acb->bs->opaque;
/* Ignore any errors, it's just a correction attempt for already
- * corrupted data. */
+ * corrupted data.
+ * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the
+ * area with different data from the other children. */
bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
- acb->qiov, 0);
+ acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED);
/* Wake up the caller after the last rewrite */
acb->rewrite_count--;
return true;
}
-static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
- const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 " ",
- acb->offset, acb->bytes);
- vfprintf(stderr, fmt, ap);
- fprintf(stderr, "\n");
- va_end(ap);
- exit(1);
-}
-
-static bool quorum_compare(QuorumAIOCB *acb,
- QEMUIOVector *a,
- QEMUIOVector *b)
+static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b)
{
BDRVQuorumState *s = acb->bs->opaque;
ssize_t offset;
if (s->is_blkverify) {
offset = qemu_iovec_compare(a, b);
if (offset != -1) {
- quorum_err(acb, "contents mismatch at offset %" PRIu64,
- acb->offset + offset);
+ fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64
+ " contents mismatch at offset %" PRIu64 "\n",
+ acb->offset, acb->bytes, acb->offset + offset);
+ exit(1);
}
return true;
}
static int read_quorum_children(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
- int i, ret;
+ int i;
acb->children_read = s->num_children;
for (i = 0; i < s->num_children; i++) {
qemu_coroutine_yield();
}
- ret = acb->vote_ret;
-
- return ret;
+ return acb->vote_ret;
}
static int read_fifo_child(QuorumAIOCB *acb)
uint64_t bytes, QEMUIOVector *qiov, int flags)
{
BDRVQuorumState *s = bs->opaque;
- QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
+ QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
int ret;
acb->is_read = true;
sacb->bs = s->children[i]->bs;
sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
- acb->qiov, 0);
+ acb->qiov, acb->flags);
if (sacb->ret == 0) {
acb->success_count++;
} else {
uint64_t bytes, QEMUIOVector *qiov, int flags)
{
BDRVQuorumState *s = bs->opaque;
- QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
+ QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
int i, ret;
for (i = 0; i < s->num_children; i++) {
return result;
}
-static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs,
- BlockDriverState *candidate)
+static bool quorum_recurse_can_replace(BlockDriverState *bs,
+ BlockDriverState *to_replace)
{
BDRVQuorumState *s = bs->opaque;
int i;
for (i = 0; i < s->num_children; i++) {
- bool perm = bdrv_recurse_is_first_non_filter(s->children[i]->bs,
- candidate);
- if (perm) {
- return true;
+ /*
+ * We have no idea whether our children show the same data as
+ * this node (@bs). It is actually highly likely that
+ * @to_replace does not, because replacing a broken child is
+ * one of the main use cases here.
+ *
+ * We do know that the new BDS will match @bs, so replacing
+ * any of our children by it will be safe. It cannot change
+ * the data this quorum node presents to its parents.
+ *
+ * However, replacing @to_replace by @bs in any of our
+ * children's chains may change visible data somewhere in
+ * there. We therefore cannot recurse down those chains with
+ * bdrv_recurse_can_replace().
+ * (More formally, bdrv_recurse_can_replace() requires that
+ * @to_replace will be replaced by something matching the @bs
+ * passed to it. We cannot guarantee that.)
+ *
+ * Thus, we can only check whether any of our immediate
+ * children matches @to_replace.
+ *
+ * (In the future, we might add a function to recurse down a
+ * chain that checks that nothing there cares about a change
+ * in data from the respective child in question. For
+ * example, most filters do not care when their child's data
+ * suddenly changes, as long as their parents do not care.)
+ */
+ if (s->children[i]->bs == to_replace) {
+ /*
+ * We now have to ensure that there is no other parent
+ * that cares about replacing this child by a node with
+ * potentially different data.
+ * We do so by checking whether there are any other parents
+ * at all, which is stricter than necessary, but also very
+ * simple. (We may decide to implement something more
+ * complex and permissive when there is an actual need for
+ * it.)
+ */
+ return QLIST_FIRST(&to_replace->parents) == s->children[i] &&
+ QLIST_NEXT(s->children[i], next_parent) == NULL;
}
}
s->read_pattern = ret;
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
- /* is the driver in blkverify mode */
- if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
- s->num_children == 2 && s->threshold == 2) {
- s->is_blkverify = true;
- } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) {
- fprintf(stderr, "blkverify mode is set by setting blkverify=on "
- "and using two files with vote_threshold=2\n");
+ s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
+ if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
+ error_setg(&local_err, "blkverify=on can only be set if there are "
+ "exactly two files and vote-threshold is 2");
+ ret = -EINVAL;
+ goto exit;
}
s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
}
s->next_child_index = s->num_children;
+ bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
+
g_free(opened);
goto exit;
char indexstr[32];
int ret;
+ if (s->is_blkverify) {
+ error_setg(errp, "Cannot add a child to a quorum in blkverify mode");
+ return;
+ }
+
assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
s->next_child_index == UINT_MAX) {
child = bdrv_attach_child(bs, child_bs, indexstr, &child_format, errp);
if (child == NULL) {
s->next_child_index--;
- bdrv_unref(child_bs);
goto out;
}
s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
return;
}
+ /* We know now that num_children > threshold, so blkverify must be false */
+ assert(!s->is_blkverify);
+
bdrv_drained_begin(bs);
/* We can safely remove this child now */
bdrv_drained_end(bs);
}
-static void quorum_refresh_filename(BlockDriverState *bs, QDict *options)
+static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
+ bool backing_overridden)
{
BDRVQuorumState *s = bs->opaque;
- QDict *opts;
- QList *children;
+ QList *children_list;
int i;
- for (i = 0; i < s->num_children; i++) {
- bdrv_refresh_filename(s->children[i]->bs);
- if (!s->children[i]->bs->full_open_options) {
- return;
- }
- }
+ /*
+ * The generic implementation for gathering child options in
+ * bdrv_refresh_filename() would use the names of the children
+ * as specified for bdrv_open_child() or bdrv_attach_child(),
+ * which is "children.%u" with %u being a value
+ * (s->next_child_index) that is incremented each time a new child
+ * is added (and never decremented). Since children can be
+ * deleted at runtime, there may be gaps in that enumeration.
+ * When creating a new quorum BDS and specifying the children for
+ * it through runtime options, the enumeration used there may not
+ * have any gaps, though.
+ *
+ * Therefore, we have to create a new gap-less enumeration here
+ * (which we can achieve by simply putting all of the children's
+ * full_open_options into a QList).
+ *
+ * XXX: Note that there are issues with the current child option
+ * structure quorum uses (such as the fact that children do
+ * not really have unique permanent names). Therefore, this
+ * is going to have to change in the future and ideally we
+ * want quorum to be covered by the generic implementation.
+ */
+
+ children_list = qlist_new();
+ qdict_put(target, "children", children_list);
- children = qlist_new();
for (i = 0; i < s->num_children; i++) {
- QINCREF(s->children[i]->bs->full_open_options);
- qlist_append(children, s->children[i]->bs->full_open_options);
+ qlist_append(children_list,
+ qobject_ref(s->children[i]->bs->full_open_options));
}
+}
- opts = qdict_new();
- qdict_put_str(opts, "driver", "quorum");
- qdict_put_int(opts, QUORUM_OPT_VOTE_THRESHOLD, s->threshold);
- qdict_put_bool(opts, QUORUM_OPT_BLKVERIFY, s->is_blkverify);
- qdict_put_bool(opts, QUORUM_OPT_REWRITE, s->rewrite_corrupted);
- qdict_put(opts, "children", children);
+static char *quorum_dirname(BlockDriverState *bs, Error **errp)
+{
+ /* In general, there are multiple BDSs with different dirnames below this
+ * one; so there is no unique dirname we could return (unless all are equal
+ * by chance, or there is only one). Therefore, to be consistent, just
+ * always return NULL. */
+ error_setg(errp, "Cannot generate a base directory for quorum nodes");
+ return NULL;
+}
- bs->full_open_options = opts;
+static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
+ const BdrvChildRole *role,
+ BlockReopenQueue *reopen_queue,
+ uint64_t perm, uint64_t shared,
+ uint64_t *nperm, uint64_t *nshared)
+{
+ *nperm = perm & DEFAULT_PERM_PASSTHROUGH;
+
+ /*
+ * We cannot share RESIZE or WRITE, as this would make the
+ * children differ from each other.
+ */
+ *nshared = (shared & (BLK_PERM_CONSISTENT_READ |
+ BLK_PERM_WRITE_UNCHANGED))
+ | DEFAULT_PERM_UNCHANGED;
}
+static const char *const quorum_strong_runtime_opts[] = {
+ QUORUM_OPT_VOTE_THRESHOLD,
+ QUORUM_OPT_BLKVERIFY,
+ QUORUM_OPT_REWRITE,
+ QUORUM_OPT_READ_PATTERN,
+
+ NULL
+};
+
static BlockDriver bdrv_quorum = {
.format_name = "quorum",
- .protocol_name = "quorum",
.instance_size = sizeof(BDRVQuorumState),
- .bdrv_file_open = quorum_open,
+ .bdrv_open = quorum_open,
.bdrv_close = quorum_close,
- .bdrv_refresh_filename = quorum_refresh_filename,
+ .bdrv_gather_child_options = quorum_gather_child_options,
+ .bdrv_dirname = quorum_dirname,
.bdrv_co_flush_to_disk = quorum_co_flush,
.bdrv_add_child = quorum_add_child,
.bdrv_del_child = quorum_del_child,
- .bdrv_child_perm = bdrv_filter_default_perms,
+ .bdrv_child_perm = quorum_child_perm,
+
+ .bdrv_recurse_can_replace = quorum_recurse_can_replace,
- .is_filter = true,
- .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,
+ .strong_runtime_opts = quorum_strong_runtime_opts,
};
static void bdrv_quorum_init(void)