Git Repo - qemu.git/blobdiff - block/quorum.c
block/nbd: Use qdict_put()
[qemu.git] / block / quorum.c
index a5ae4b812b97678c7864ded1baf16e8b6a9b3afd..d122299352710d47253369b2bf447211ba7bb89c 100644 (file)
@@ -14,6 +14,7 @@
  */
 
 #include "qemu/osdep.h"
+#include "qemu/cutils.h"
 #include "block/block_int.h"
 #include "qapi/qmp/qbool.h"
 #include "qapi/qmp/qdict.h"
@@ -67,6 +68,9 @@ typedef struct QuorumVotes {
 typedef struct BDRVQuorumState {
     BdrvChild **children;  /* children BlockDriverStates */
     int num_children;      /* children count */
+    unsigned next_child_index;  /* the index of the next child that should
+                                 * be added
+                                 */
     int threshold;         /* if less than threshold children reads gave the
                             * same result a quorum error occurs.
                             */
@@ -126,7 +130,7 @@ struct QuorumAIOCB {
 
     bool is_read;
     int vote_ret;
-    int child_iter;             /* which child to read in fifo pattern */
+    int children_read;          /* how many children have been read from */
 };
 
 static bool quorum_vote(QuorumAIOCB *acb);
@@ -152,22 +156,7 @@ static AIOCBInfo quorum_aiocb_info = {
 
 static void quorum_aio_finalize(QuorumAIOCB *acb)
 {
-    int i, ret = 0;
-
-    if (acb->vote_ret) {
-        ret = acb->vote_ret;
-    }
-
-    acb->common.cb(acb->common.opaque, ret);
-
-    if (acb->is_read) {
-        /* on the quorum case acb->child_iter == s->num_children - 1 */
-        for (i = 0; i <= acb->child_iter; i++) {
-            qemu_vfree(acb->qcrs[i].buf);
-            qemu_iovec_destroy(&acb->qcrs[i].qiov);
-        }
-    }
-
+    acb->common.cb(acb->common.opaque, acb->vote_ret);
     g_free(acb->qcrs);
     qemu_aio_unref(acb);
 }
@@ -215,14 +204,16 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
     return acb;
 }
 
-static void quorum_report_bad(QuorumAIOCB *acb, char *node_name, int ret)
+static void quorum_report_bad(QuorumOpType type, uint64_t sector_num,
+                              int nb_sectors, char *node_name, int ret)
 {
     const char *msg = NULL;
     if (ret < 0) {
         msg = strerror(-ret);
     }
-    qapi_event_send_quorum_report_bad(!!msg, msg, node_name,
-                                      acb->sector_num, acb->nb_sectors, &error_abort);
+
+    qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name,
+                                      sector_num, nb_sectors, &error_abort);
 }
 
 static void quorum_report_failure(QuorumAIOCB *acb)
@@ -277,35 +268,53 @@ static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
     }
 }
 
-static void quorum_aio_cb(void *opaque, int ret)
+static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
+{
+    QuorumAIOCB *acb = sacb->parent;
+    QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
+    quorum_report_bad(type, acb->sector_num, acb->nb_sectors,
+                      sacb->aiocb->bs->node_name, ret);
+}
+
+static void quorum_fifo_aio_cb(void *opaque, int ret)
 {
     QuorumChildRequest *sacb = opaque;
     QuorumAIOCB *acb = sacb->parent;
     BDRVQuorumState *s = acb->common.bs->opaque;
-    bool rewrite = false;
 
-    if (acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO) {
+    assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO);
+
+    if (ret < 0) {
+        quorum_report_bad_acb(sacb, ret);
+
         /* We try to read next child in FIFO order if we fail to read */
-        if (ret < 0 && ++acb->child_iter < s->num_children) {
+        if (acb->children_read < s->num_children) {
             read_fifo_child(acb);
             return;
         }
-
-        if (ret == 0) {
-            quorum_copy_qiov(acb->qiov, &acb->qcrs[acb->child_iter].qiov);
-        }
-        acb->vote_ret = ret;
-        quorum_aio_finalize(acb);
-        return;
     }
 
+    acb->vote_ret = ret;
+
+    /* FIXME: rewrite failed children if acb->children_read > 1? */
+    quorum_aio_finalize(acb);
+}
+
+static void quorum_aio_cb(void *opaque, int ret)
+{
+    QuorumChildRequest *sacb = opaque;
+    QuorumAIOCB *acb = sacb->parent;
+    BDRVQuorumState *s = acb->common.bs->opaque;
+    bool rewrite = false;
+    int i;
+
     sacb->ret = ret;
-    acb->count++;
     if (ret == 0) {
         acb->success_count++;
     } else {
-        quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
+        quorum_report_bad_acb(sacb, ret);
     }
+    acb->count++;
     assert(acb->count <= s->num_children);
     assert(acb->success_count <= s->num_children);
     if (acb->count < s->num_children) {
@@ -315,6 +324,10 @@ static void quorum_aio_cb(void *opaque, int ret)
     /* Do the vote on read */
     if (acb->is_read) {
         rewrite = quorum_vote(acb);
+        for (i = 0; i < s->num_children; i++) {
+            qemu_vfree(acb->qcrs[i].buf);
+            qemu_iovec_destroy(&acb->qcrs[i].qiov);
+        }
     } else {
         quorum_has_too_much_io_failed(acb);
     }
@@ -337,7 +350,9 @@ static void quorum_report_bad_versions(BDRVQuorumState *s,
             continue;
         }
         QLIST_FOREACH(item, &version->items, next) {
-            quorum_report_bad(acb, s->children[item->index]->bs->node_name, 0);
+            quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num,
+                              acb->nb_sectors,
+                              s->children[item->index]->bs->node_name, 0);
         }
     }
 }
@@ -370,7 +385,7 @@ static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb,
             continue;
         }
         QLIST_FOREACH(item, &version->items, next) {
-            bdrv_aio_writev(s->children[item->index]->bs, acb->sector_num,
+            bdrv_aio_writev(s->children[item->index], acb->sector_num,
                             acb->qiov, acb->nb_sectors, quorum_rewrite_aio_cb,
                             acb);
         }
@@ -640,6 +655,7 @@ static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
     BDRVQuorumState *s = acb->common.bs->opaque;
     int i;
 
+    acb->children_read = s->num_children;
     for (i = 0; i < s->num_children; i++) {
         acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
         qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
@@ -647,8 +663,9 @@ static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
     }
 
     for (i = 0; i < s->num_children; i++) {
-        bdrv_aio_readv(s->children[i]->bs, acb->sector_num, &acb->qcrs[i].qiov,
-                       acb->nb_sectors, quorum_aio_cb, &acb->qcrs[i]);
+        acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num,
+                                            &acb->qcrs[i].qiov, acb->nb_sectors,
+                                            quorum_aio_cb, &acb->qcrs[i]);
     }
 
     return &acb->common;
@@ -657,15 +674,11 @@ static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb)
 static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb)
 {
     BDRVQuorumState *s = acb->common.bs->opaque;
+    int n = acb->children_read++;
 
-    acb->qcrs[acb->child_iter].buf =
-        qemu_blockalign(s->children[acb->child_iter]->bs, acb->qiov->size);
-    qemu_iovec_init(&acb->qcrs[acb->child_iter].qiov, acb->qiov->niov);
-    qemu_iovec_clone(&acb->qcrs[acb->child_iter].qiov, acb->qiov,
-                     acb->qcrs[acb->child_iter].buf);
-    bdrv_aio_readv(s->children[acb->child_iter]->bs, acb->sector_num,
-                   &acb->qcrs[acb->child_iter].qiov, acb->nb_sectors,
-                   quorum_aio_cb, &acb->qcrs[acb->child_iter]);
+    acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num,
+                                        acb->qiov, acb->nb_sectors,
+                                        quorum_fifo_aio_cb, &acb->qcrs[n]);
 
     return &acb->common;
 }
@@ -681,13 +694,12 @@ static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs,
     QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num,
                                       nb_sectors, cb, opaque);
     acb->is_read = true;
+    acb->children_read = 0;
 
     if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
-        acb->child_iter = s->num_children - 1;
         return read_quorum_children(acb);
     }
 
-    acb->child_iter = 0;
     return read_fifo_child(acb);
 }
 
@@ -704,7 +716,7 @@ static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
     int i;
 
     for (i = 0; i < s->num_children; i++) {
-        acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i]->bs, sector_num,
+        acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num,
                                              qiov, nb_sectors, &quorum_aio_cb,
                                              &acb->qcrs[i]);
     }
@@ -736,21 +748,6 @@ static int64_t quorum_getlength(BlockDriverState *bs)
     return result;
 }
 
-static void quorum_invalidate_cache(BlockDriverState *bs, Error **errp)
-{
-    BDRVQuorumState *s = bs->opaque;
-    Error *local_err = NULL;
-    int i;
-
-    for (i = 0; i < s->num_children; i++) {
-        bdrv_invalidate_cache(s->children[i]->bs, &local_err);
-        if (local_err) {
-            error_propagate(errp, local_err);
-            return;
-        }
-    }
-}
-
 static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
 {
     BDRVQuorumState *s = bs->opaque;
@@ -759,19 +756,30 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
     QuorumVoteValue result_value;
     int i;
     int result = 0;
+    int success_count = 0;
 
     QLIST_INIT(&error_votes.vote_list);
     error_votes.compare = quorum_64bits_compare;
 
     for (i = 0; i < s->num_children; i++) {
         result = bdrv_co_flush(s->children[i]->bs);
-        result_value.l = result;
-        quorum_count_vote(&error_votes, &result_value, i);
+        if (result) {
+            quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0,
+                              bdrv_nb_sectors(s->children[i]->bs),
+                              s->children[i]->bs->node_name, result);
+            result_value.l = result;
+            quorum_count_vote(&error_votes, &result_value, i);
+        } else {
+            success_count++;
+        }
     }
 
-    winner = quorum_get_vote_winner(&error_votes);
-    result = winner->value.l;
-
+    if (success_count >= s->threshold) {
+        result = 0;
+    } else {
+        winner = quorum_get_vote_winner(&error_votes);
+        result = winner->value.l;
+    }
     quorum_free_vote_list(&error_votes);
 
     return result;
@@ -876,9 +884,9 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
         ret = -EINVAL;
         goto exit;
     }
-    if (s->num_children < 2) {
+    if (s->num_children < 1) {
         error_setg(&local_err,
-                   "Number of provided children must be greater than 1");
+                   "Number of provided children must be 1 or more");
         ret = -EINVAL;
         goto exit;
     }
@@ -942,6 +950,7 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
 
         opened[i] = true;
     }
+    s->next_child_index = s->num_children;
 
     g_free(opened);
     goto exit;
@@ -959,9 +968,7 @@ close_exit:
 exit:
     qemu_opts_del(opts);
     /* propagate error */
-    if (local_err) {
-        error_propagate(errp, local_err);
-    }
+    error_propagate(errp, local_err);
     return ret;
 }
 
@@ -977,25 +984,70 @@ static void quorum_close(BlockDriverState *bs)
     g_free(s->children);
 }
 
-static void quorum_detach_aio_context(BlockDriverState *bs)
+static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
+                             Error **errp)
 {
     BDRVQuorumState *s = bs->opaque;
-    int i;
+    BdrvChild *child;
+    char indexstr[32];
+    int ret;
 
-    for (i = 0; i < s->num_children; i++) {
-        bdrv_detach_aio_context(s->children[i]->bs);
+    assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
+    if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
+        s->next_child_index == UINT_MAX) {
+        error_setg(errp, "Too many children");
+        return;
     }
+
+    ret = snprintf(indexstr, 32, "children.%u", s->next_child_index);
+    if (ret < 0 || ret >= 32) {
+        error_setg(errp, "cannot generate child name");
+        return;
+    }
+    s->next_child_index++;
+
+    bdrv_drained_begin(bs);
+
+    /* We can safely add the child now */
+    bdrv_ref(child_bs);
+    child = bdrv_attach_child(bs, child_bs, indexstr, &child_format);
+    s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
+    s->children[s->num_children++] = child;
+
+    bdrv_drained_end(bs);
 }
 
-static void quorum_attach_aio_context(BlockDriverState *bs,
-                                      AioContext *new_context)
+static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
+                             Error **errp)
 {
     BDRVQuorumState *s = bs->opaque;
     int i;
 
     for (i = 0; i < s->num_children; i++) {
-        bdrv_attach_aio_context(s->children[i]->bs, new_context);
+        if (s->children[i] == child) {
+            break;
+        }
+    }
+
+    /* we have checked it in bdrv_del_child() */
+    assert(i < s->num_children);
+
+    if (s->num_children <= s->threshold) {
+        error_setg(errp,
+            "The number of children cannot be lower than the vote threshold %d",
+            s->threshold);
+        return;
     }
+
+    bdrv_drained_begin(bs);
+
+    /* We can safely remove this child now */
+    memmove(&s->children[i], &s->children[i + 1],
+            (s->num_children - i - 1) * sizeof(BdrvChild *));
+    s->children = g_renew(BdrvChild *, s->children, --s->num_children);
+    bdrv_unref_child(bs, child);
+
+    bdrv_drained_end(bs);
 }
 
 static void quorum_refresh_filename(BlockDriverState *bs, QDict *options)
@@ -1048,10 +1100,9 @@ static BlockDriver bdrv_quorum = {
 
     .bdrv_aio_readv                     = quorum_aio_readv,
     .bdrv_aio_writev                    = quorum_aio_writev,
-    .bdrv_invalidate_cache              = quorum_invalidate_cache,
 
-    .bdrv_detach_aio_context            = quorum_detach_aio_context,
-    .bdrv_attach_aio_context            = quorum_attach_aio_context,
+    .bdrv_add_child                     = quorum_add_child,
+    .bdrv_del_child                     = quorum_del_child,
 
     .is_filter                          = true,
     .bdrv_recurse_is_first_non_filter   = quorum_recurse_is_first_non_filter,
This page took 0.038942 seconds and 4 git commands to generate.