*/
#include "qemu/osdep.h"
+#include "qemu/cutils.h"
#include "block/block_int.h"
#include "qapi/qmp/qbool.h"
#include "qapi/qmp/qdict.h"
typedef struct BDRVQuorumState {
BdrvChild **children; /* children BlockDriverStates */
int num_children; /* children count */
+ unsigned next_child_index; /* the index of the next child that should
+ * be added
+ */
int threshold; /* if less than threshold children reads gave the
* same result a quorum error occurs.
*/
bool is_read;
int vote_ret;
- int child_iter; /* which child to read in fifo pattern */
+ int children_read; /* how many children have been read from */
};
static bool quorum_vote(QuorumAIOCB *acb);
static void quorum_aio_finalize(QuorumAIOCB *acb)
{
- int i, ret = 0;
-
- if (acb->vote_ret) {
- ret = acb->vote_ret;
- }
-
- acb->common.cb(acb->common.opaque, ret);
-
- if (acb->is_read) {
- /* on the quorum case acb->child_iter == s->num_children - 1 */
- for (i = 0; i <= acb->child_iter; i++) {
- qemu_vfree(acb->qcrs[i].buf);
- qemu_iovec_destroy(&acb->qcrs[i].qiov);
- }
- }
-
+ acb->common.cb(acb->common.opaque, acb->vote_ret);
g_free(acb->qcrs);
qemu_aio_unref(acb);
}
return acb;
}
-static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
+static void quorum_report_bad(QuorumOpType type, uint64_t sector_num,
+ int nb_sectors, char *node_name, int ret)
{
const char *msg = NULL;
if (ret < 0) {
msg = strerror(-ret);
}
- qapi_event_send_quorum_report_bad(!!msg, msg, node_name,
- acb->sector_num, acb->nb_sectors, &error_abort);
+
+ qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name,
+ sector_num, nb_sectors, &error_abort);
}
static void quorum_report_failure(QuorumAIOCB *acb)
}
}
-static void quorum_aio_cb(void *opaque, int ret)
+static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
+{
+ QuorumAIOCB *acb = sacb->parent;
+ QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
+ quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
+ sacb->aiocb->bs->node_name, ret);
+}
+
+static void quorum_fifo_aio_cb(void *opaque, int ret)
{
QuorumChildRequest *sacb = opaque;
QuorumAIOCB *acb = sacb->parent;
BDRVQuorumState *s = acb->common.bs->opaque;
- bool rewrite = false;
- if (acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO) {
+ assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
+
+ if (ret < 0) {
+ quorum_report_bad_acb(sacb, ret);
+
/* We try to read next child in FIFO order if we fail to read */
- if (ret < 0 && ++acb->child_iter < s->num_children) {
+ if (acb->children_read < s->num_children) {
read_fifo_child(acb);
return;
}
-
- if (ret == 0) {
- quorum_copy_qiov(acb->qiov, &acb->qcrs[acb->child_iter].qiov);
- }
- acb->vote_ret = ret;
- quorum_aio_finalize(acb);
- return;
}
+ acb->vote_ret = ret;
+
+ /* FIXME: rewrite failed children if acb->children_read > 1? */
+ quorum_aio_finalize(acb);
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+ QuorumChildRequest *sacb = opaque;
+ QuorumAIOCB *acb = sacb->parent;
+ BDRVQuorumState *s = acb->common.bs->opaque;
+ bool rewrite = false;
+ int i;
+
sacb->ret = ret;
- acb->count++;
if (ret == 0) {
acb->success_count++;
} else {
- quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
+ quorum_report_bad_acb(sacb, ret);
}
+ acb->count++;
assert(acb->count <= s->num_children);
assert(acb->success_count <= s->num_children);
if (acb->count < s->num_children) {
/* Do the vote on read */
if (acb->is_read) {
rewrite = quorum_vote(acb);
+ for (i = 0; i < s->num_children; i++) {
+ qemu_vfree(acb->qcrs[i].buf);
+ qemu_iovec_destroy(&acb->qcrs[i].qiov);
+ }
} else {
quorum_has_too_much_io_failed(acb);
}
continue;
}
QLIST_FOREACH(item, &version->items, next) {
- quorum_report_bad(acb, s->children[item->index]->bs->node_name, 0);
+ quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num,
+ acb->nb_sectors,
+ s->children[item->index]->bs->node_name, 0);
}
}
}
continue;
}
QLIST_FOREACH(item, &version->items, next) {
- bdrv_aio_writev(s->children[item->index]->bs, acb->sector_num,
+ bdrv_aio_writev(s->children[item->index], acb->sector_num,
acb->qiov, acb->nb_sectors, quorum_rewrite_aio_cb,
acb);
}
BDRVQuorumState *s = acb->common.bs->opaque;
int i;
+ acb->children_read = s->num_children;
for (i = 0; i < s->num_children; i++) {
acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
}
for (i = 0; i < s->num_children; i++) {
- bdrv_aio_readv(s->children[i]->bs, acb->sector_num, &acb->qcrs[i].qiov,
- acb->nb_sectors, quorum_aio_cb, &acb->qcrs[i]);
+ acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num,
+ &acb->qcrs[i].qiov, acb->nb_sectors,
+ quorum_aio_cb, &acb->qcrs[i]);
}
return &acb->common;
static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
{
BDRVQuorumState *s = acb->common.bs->opaque;
+ int n = acb->children_read++;
- acb->qcrs[acb->child_iter].buf =
- qemu_blockalign(s->children[acb->child_iter]->bs, acb->qiov->size);
- qemu_iovec_init(&acb->qcrs[acb->child_iter].qiov, acb->qiov->niov);
- qemu_iovec_clone(&acb->qcrs[acb->child_iter].qiov, acb->qiov,
- acb->qcrs[acb->child_iter].buf);
- bdrv_aio_readv(s->children[acb->child_iter]->bs, acb->sector_num,
- &acb->qcrs[acb->child_iter].qiov, acb->nb_sectors,
- quorum_aio_cb, &acb->qcrs[acb->child_iter]);
+ acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num,
+ acb->qiov, acb->nb_sectors,
+ quorum_fifo_aio_cb, &acb->qcrs[n]);
return &acb->common;
}
QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
nb_sectors, cb, opaque);
acb->is_read = true;
+ acb->children_read = 0;
if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
- acb->child_iter = s->num_children - 1;
return read_quorum_children(acb);
}
- acb->child_iter = 0;
return read_fifo_child(acb);
}
int i;
for (i = 0; i < s->num_children; i++) {
- acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i]->bs, sector_num,
+ acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num,
qiov, nb_sectors, &quorum_aio_cb,
&acb->qcrs[i]);
}
return result;
}
-static void quorum_invalidate_cache(BlockDriverState *bs, Error **errp)
-{
- BDRVQuorumState *s = bs->opaque;
- Error *local_err = NULL;
- int i;
-
- for (i = 0; i < s->num_children; i++) {
- bdrv_invalidate_cache(s->children[i]->bs, &local_err);
- if (local_err) {
- error_propagate(errp, local_err);
- return;
- }
- }
-}
-
static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
{
BDRVQuorumState *s = bs->opaque;
QuorumVoteValue result_value;
int i;
int result = 0;
+ int success_count = 0;
QLIST_INIT(&error_votes.vote_list);
error_votes.compare = quorum_64bits_compare;
for (i = 0; i < s->num_children; i++) {
result = bdrv_co_flush(s->children[i]->bs);
- result_value.l = result;
- quorum_count_vote(&error_votes, &result_value, i);
+ if (result) {
+ quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0,
+ bdrv_nb_sectors(s->children[i]->bs),
+ s->children[i]->bs->node_name, result);
+ result_value.l = result;
+ quorum_count_vote(&error_votes, &result_value, i);
+ } else {
+ success_count++;
+ }
}
- winner = quorum_get_vote_winner(&error_votes);
- result = winner->value.l;
-
+ if (success_count >= s->threshold) {
+ result = 0;
+ } else {
+ winner = quorum_get_vote_winner(&error_votes);
+ result = winner->value.l;
+ }
quorum_free_vote_list(&error_votes);
return result;
ret = -EINVAL;
goto exit;
}
- if (s->num_children < 2) {
+ if (s->num_children < 1) {
error_setg(&local_err,
- "Number of provided children must be greater than 1");
+ "Number of provided children must be 1 or more");
ret = -EINVAL;
goto exit;
}
opened[i] = true;
}
+ s->next_child_index = s->num_children;
g_free(opened);
goto exit;
exit:
qemu_opts_del(opts);
/* propagate error */
- if (local_err) {
- error_propagate(errp, local_err);
- }
+ error_propagate(errp, local_err);
return ret;
}
g_free(s->children);
}
-static void quorum_detach_aio_context(BlockDriverState *bs)
+static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
+ Error **errp)
{
BDRVQuorumState *s = bs->opaque;
- int i;
+ BdrvChild *child;
+ char indexstr[32];
+ int ret;
- for (i = 0; i < s->num_children; i++) {
- bdrv_detach_aio_context(s->children[i]->bs);
+ assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
+ if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
+ s->next_child_index == UINT_MAX) {
+ error_setg(errp, "Too many children");
+ return;
}
+
+ ret = snprintf(indexstr, 32, "children.%u", s->next_child_index);
+ if (ret < 0 || ret >= 32) {
+ error_setg(errp, "cannot generate child name");
+ return;
+ }
+ s->next_child_index++;
+
+ bdrv_drained_begin(bs);
+
+ /* We can safely add the child now */
+ bdrv_ref(child_bs);
+ child = bdrv_attach_child(bs, child_bs, indexstr, &child_format);
+ s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
+ s->children[s->num_children++] = child;
+
+ bdrv_drained_end(bs);
}
-static void quorum_attach_aio_context(BlockDriverState *bs,
- AioContext *new_context)
+static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
+ Error **errp)
{
BDRVQuorumState *s = bs->opaque;
int i;
for (i = 0; i < s->num_children; i++) {
- bdrv_attach_aio_context(s->children[i]->bs, new_context);
+ if (s->children[i] == child) {
+ break;
+ }
+ }
+
+ /* we have checked it in bdrv_del_child() */
+ assert(i < s->num_children);
+
+ if (s->num_children <= s->threshold) {
+ error_setg(errp,
+ "The number of children cannot be lower than the vote threshold %d",
+ s->threshold);
+ return;
}
+
+ bdrv_drained_begin(bs);
+
+ /* We can safely remove this child now */
+ memmove(&s->children[i], &s->children[i + 1],
+ (s->num_children - i - 1) * sizeof(BdrvChild *));
+ s->children = g_renew(BdrvChild *, s->children, --s->num_children);
+ bdrv_unref_child(bs, child);
+
+ bdrv_drained_end(bs);
}
static void quorum_refresh_filename(BlockDriverState *bs, QDict *options)
.bdrv_aio_readv = quorum_aio_readv,
.bdrv_aio_writev = quorum_aio_writev,
- .bdrv_invalidate_cache = quorum_invalidate_cache,
- .bdrv_detach_aio_context = quorum_detach_aio_context,
- .bdrv_attach_aio_context = quorum_attach_aio_context,
+ .bdrv_add_child = quorum_add_child,
+ .bdrv_del_child = quorum_del_child,
.is_filter = true,
.bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter,