#include "qemu/osdep.h"
#include "qemu/cutils.h"
+#include "qemu/module.h"
+#include "qemu/option.h"
#include "block/block_int.h"
-#include "qapi/qmp/qbool.h"
+#include "block/qdict.h"
+#include "qapi/error.h"
+#include "qapi/qapi-events-block.h"
#include "qapi/qmp/qdict.h"
#include "qapi/qmp/qerror.h"
-#include "qapi/qmp/qjson.h"
#include "qapi/qmp/qlist.h"
#include "qapi/qmp/qstring.h"
-#include "qapi-event.h"
#include "crypto/hash.h"
#define HASH_LENGTH 32
/* Request metadata */
uint64_t offset;
uint64_t bytes;
+ int flags;
QEMUIOVector *qiov; /* calling IOV */
static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs,
QEMUIOVector *qiov,
uint64_t offset,
- uint64_t bytes)
+ uint64_t bytes,
+ int flags)
{
BDRVQuorumState *s = bs->opaque;
QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
.bs = bs,
.offset = offset,
.bytes = bytes,
+ .flags = flags,
.qiov = qiov,
.votes.compare = quorum_sha256_compare,
.votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
}
qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector,
- end_sector - start_sector, &error_abort);
+ end_sector - start_sector);
}
static void quorum_report_failure(QuorumAIOCB *acb)
BDRV_SECTOR_SIZE);
qapi_event_send_quorum_failure(reference, start_sector,
- end_sector - start_sector, &error_abort);
+ end_sector - start_sector);
}
static int quorum_vote_error(QuorumAIOCB *acb);
BDRVQuorumState *s = acb->bs->opaque;
/* Ignore any errors, it's just a correction attempt for already
- * corrupted data. */
+ * corrupted data.
+ * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the
+ * area with different data from the other children. */
bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
- acb->qiov, 0);
+ acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED);
/* Wake up the caller after the last rewrite */
acb->rewrite_count--;
return true;
}
-static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb,
- const char *fmt, ...)
-{
- va_list ap;
-
- va_start(ap, fmt);
- fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 " ",
- acb->offset, acb->bytes);
- vfprintf(stderr, fmt, ap);
- fprintf(stderr, "\n");
- va_end(ap);
- exit(1);
-}
-
-static bool quorum_compare(QuorumAIOCB *acb,
- QEMUIOVector *a,
- QEMUIOVector *b)
+static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b)
{
BDRVQuorumState *s = acb->bs->opaque;
ssize_t offset;
if (s->is_blkverify) {
offset = qemu_iovec_compare(a, b);
if (offset != -1) {
- quorum_err(acb, "contents mismatch at offset %" PRIu64,
- acb->offset + offset);
+ fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64
+ " contents mismatch at offset %" PRIu64 "\n",
+ acb->offset, acb->bytes, acb->offset + offset);
+ exit(1);
}
return true;
}
static int read_quorum_children(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->bs->opaque;
- int i, ret;
+ int i;
acb->children_read = s->num_children;
for (i = 0; i < s->num_children; i++) {
qemu_coroutine_yield();
}
- ret = acb->vote_ret;
-
- return ret;
+ return acb->vote_ret;
}
static int read_fifo_child(QuorumAIOCB *acb)
uint64_t bytes, QEMUIOVector *qiov, int flags)
{
BDRVQuorumState *s = bs->opaque;
- QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
+ QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
int ret;
acb->is_read = true;
sacb->bs = s->children[i]->bs;
sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
- acb->qiov, 0);
+ acb->qiov, acb->flags);
if (sacb->ret == 0) {
acb->success_count++;
} else {
uint64_t bytes, QEMUIOVector *qiov, int flags)
{
BDRVQuorumState *s = bs->opaque;
- QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes);
+ QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
int i, ret;
for (i = 0; i < s->num_children; i++) {
s->read_pattern = ret;
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
- /* is the driver in blkverify mode */
- if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) &&
- s->num_children == 2 && s->threshold == 2) {
- s->is_blkverify = true;
- } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) {
- fprintf(stderr, "blkverify mode is set by setting blkverify=on "
- "and using two files with vote_threshold=2\n");
+ s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
+ if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
+ error_setg(&local_err, "blkverify=on can only be set if there are "
+ "exactly two files and vote-threshold is 2");
+ ret = -EINVAL;
+ goto exit;
}
s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
}
s->next_child_index = s->num_children;
+ bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
+
g_free(opened);
goto exit;
char indexstr[32];
int ret;
+ if (s->is_blkverify) {
+ error_setg(errp, "Cannot add a child to a quorum in blkverify mode");
+ return;
+ }
+
assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
s->next_child_index == UINT_MAX) {
child = bdrv_attach_child(bs, child_bs, indexstr, &child_format, errp);
if (child == NULL) {
s->next_child_index--;
- bdrv_unref(child_bs);
goto out;
}
s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
return;
}
+ /* We know now that num_children > threshold, so blkverify must be false */
+ assert(!s->is_blkverify);
+
bdrv_drained_begin(bs);
/* We can safely remove this child now */
bdrv_drained_end(bs);
}
-static void quorum_refresh_filename(BlockDriverState *bs, QDict *options)
+static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
+ bool backing_overridden)
{
BDRVQuorumState *s = bs->opaque;
- QDict *opts;
- QList *children;
+ QList *children_list;
int i;
- for (i = 0; i < s->num_children; i++) {
- bdrv_refresh_filename(s->children[i]->bs);
- if (!s->children[i]->bs->full_open_options) {
- return;
- }
- }
+ /*
+ * The generic implementation for gathering child options in
+ * bdrv_refresh_filename() would use the names of the children
+ * as specified for bdrv_open_child() or bdrv_attach_child(),
+ * which is "children.%u" with %u being a value
+ * (s->next_child_index) that is incremented each time a new child
+ * is added (and never decremented). Since children can be
+ * deleted at runtime, there may be gaps in that enumeration.
+ * When creating a new quorum BDS and specifying the children for
+ * it through runtime options, the enumeration used there may not
+ * have any gaps, though.
+ *
+ * Therefore, we have to create a new gap-less enumeration here
+ * (which we can achieve by simply putting all of the children's
+ * full_open_options into a QList).
+ *
+ * XXX: Note that there are issues with the current child option
+ * structure quorum uses (such as the fact that children do
+ * not really have unique permanent names). Therefore, this
+ * is going to have to change in the future and ideally we
+ * want quorum to be covered by the generic implementation.
+ */
+
+ children_list = qlist_new();
+ qdict_put(target, "children", children_list);
- children = qlist_new();
for (i = 0; i < s->num_children; i++) {
- QINCREF(s->children[i]->bs->full_open_options);
- qlist_append(children, s->children[i]->bs->full_open_options);
+ qlist_append(children_list,
+ qobject_ref(s->children[i]->bs->full_open_options));
}
+}
- opts = qdict_new();
- qdict_put_str(opts, "driver", "quorum");
- qdict_put_int(opts, QUORUM_OPT_VOTE_THRESHOLD, s->threshold);
- qdict_put_bool(opts, QUORUM_OPT_BLKVERIFY, s->is_blkverify);
- qdict_put_bool(opts, QUORUM_OPT_REWRITE, s->rewrite_corrupted);
- qdict_put(opts, "children", children);
-
- bs->full_open_options = opts;
+static char *quorum_dirname(BlockDriverState *bs, Error **errp)
+{
+ /* In general, there are multiple BDSs with different dirnames below this
+ * one; so there is no unique dirname we could return (unless all are equal
+ * by chance, or there is only one). Therefore, to be consistent, just
+ * always return NULL. */
+ error_setg(errp, "Cannot generate a base directory for quorum nodes");
+ return NULL;
}
+static const char *const quorum_strong_runtime_opts[] = {
+ QUORUM_OPT_VOTE_THRESHOLD,
+ QUORUM_OPT_BLKVERIFY,
+ QUORUM_OPT_REWRITE,
+ QUORUM_OPT_READ_PATTERN,
+
+ NULL
+};
+
static BlockDriver bdrv_quorum = {
.format_name = "quorum",
- .protocol_name = "quorum",
.instance_size = sizeof(BDRVQuorumState),
- .bdrv_file_open = quorum_open,
+ .bdrv_open = quorum_open,
.bdrv_close = quorum_close,
- .bdrv_refresh_filename = quorum_refresh_filename,
+ .bdrv_gather_child_options = quorum_gather_child_options,
+ .bdrv_dirname = quorum_dirname,
.bdrv_co_flush_to_disk = quorum_co_flush,
.is_filter = true,
.bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,
+
+ .strong_runtime_opts = quorum_strong_runtime_opts,
};
static void bdrv_quorum_init(void)