]> Git Repo - qemu.git/blobdiff - block/rbd.c
block/rbd: add all the currently supported runtime_opts
[qemu.git] / block / rbd.c
index 5226b6fef8a9b38d6f9a85315bad6baac1e92cd0..67d680c1cc7f00cac8a7fb5bd0c3b260267df7d9 100644 (file)
 #define RBD_MAX_SNAP_NAME_SIZE 128
 #define RBD_MAX_SNAPS 100
 
+/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
+#ifdef LIBRBD_SUPPORTS_IOVEC
+#define LIBRBD_USE_IOVEC 1
+#else
+#define LIBRBD_USE_IOVEC 0
+#endif
+
 typedef enum {
     RBD_AIO_READ,
     RBD_AIO_WRITE,
@@ -71,7 +78,6 @@ typedef enum {
 
 typedef struct RBDAIOCB {
     BlockAIOCB common;
-    QEMUBH *bh;
     int64_t ret;
     QEMUIOVector *qiov;
     char *bounce;
@@ -96,10 +102,10 @@ typedef struct BDRVRBDState {
     char *snap;
 } BDRVRBDState;
 
-static int qemu_rbd_next_tok(char *dst, int dst_len,
-                             char *src, char delim,
-                             const char *name,
-                             char **p, Error **errp)
+static char *qemu_rbd_next_tok(int max_len,
+                               char *src, char delim,
+                               const char *name,
+                               char **p, Error **errp)
 {
     int l;
     char *end;
@@ -121,17 +127,15 @@ static int qemu_rbd_next_tok(char *dst, int dst_len,
         }
     }
     l = strlen(src);
-    if (l >= dst_len) {
+    if (l >= max_len) {
         error_setg(errp, "%s too long", name);
-        return -EINVAL;
+        return NULL;
     } else if (l == 0) {
         error_setg(errp, "%s too short", name);
-        return -EINVAL;
+        return NULL;
     }
 
-    pstrcpy(dst, dst_len, src);
-
-    return 0;
+    return src;
 }
 
 static void qemu_rbd_unescape(char *src)
@@ -156,7 +160,9 @@ static int qemu_rbd_parsename(const char *filename,
 {
     const char *start;
     char *p, *buf;
-    int ret;
+    int ret = 0;
+    char *found_str;
+    Error *local_err = NULL;
 
     if (!strstart(filename, "rbd:", &start)) {
         error_setg(errp, "File name must start with 'rbd:'");
@@ -168,36 +174,60 @@ static int qemu_rbd_parsename(const char *filename,
     *snap = '\0';
     *conf = '\0';
 
-    ret = qemu_rbd_next_tok(pool, pool_len, p,
-                            '/', "pool name", &p, errp);
-    if (ret < 0 || !p) {
+    found_str = qemu_rbd_next_tok(pool_len, p,
+                                  '/', "pool name", &p, &local_err);
+    if (local_err) {
+        goto done;
+    }
+    if (!p) {
         ret = -EINVAL;
+        error_setg(errp, "Pool name is required");
         goto done;
     }
-    qemu_rbd_unescape(pool);
+    qemu_rbd_unescape(found_str);
+    g_strlcpy(pool, found_str, pool_len);
 
     if (strchr(p, '@')) {
-        ret = qemu_rbd_next_tok(name, name_len, p,
-                                '@', "object name", &p, errp);
-        if (ret < 0) {
+        found_str = qemu_rbd_next_tok(name_len, p,
+                                      '@', "object name", &p, &local_err);
+        if (local_err) {
+            goto done;
+        }
+        qemu_rbd_unescape(found_str);
+        g_strlcpy(name, found_str, name_len);
+
+        found_str = qemu_rbd_next_tok(snap_len, p,
+                                      ':', "snap name", &p, &local_err);
+        if (local_err) {
             goto done;
         }
-        ret = qemu_rbd_next_tok(snap, snap_len, p,
-                                ':', "snap name", &p, errp);
-        qemu_rbd_unescape(snap);
+        qemu_rbd_unescape(found_str);
+        g_strlcpy(snap, found_str, snap_len);
     } else {
-        ret = qemu_rbd_next_tok(name, name_len, p,
-                                ':', "object name", &p, errp);
+        found_str = qemu_rbd_next_tok(name_len, p,
+                                      ':', "object name", &p, &local_err);
+        if (local_err) {
+            goto done;
+        }
+        qemu_rbd_unescape(found_str);
+        g_strlcpy(name, found_str, name_len);
     }
-    qemu_rbd_unescape(name);
-    if (ret < 0 || !p) {
+    if (!p) {
         goto done;
     }
 
-    ret = qemu_rbd_next_tok(conf, conf_len, p,
-                            '\0', "configuration", &p, errp);
+    found_str = qemu_rbd_next_tok(conf_len, p,
+                                  '\0', "configuration", &p, &local_err);
+    if (local_err) {
+        goto done;
+    }
+    g_strlcpy(conf, found_str, conf_len);
 
 done:
+    if (local_err) {
+        ret = -EINVAL;
+        error_propagate(errp, local_err);
+    }
     g_free(buf);
     return ret;
 }
@@ -256,17 +286,18 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
                              Error **errp)
 {
     char *p, *buf;
-    char name[RBD_MAX_CONF_NAME_SIZE];
-    char value[RBD_MAX_CONF_VAL_SIZE];
+    char *name;
+    char *value;
+    Error *local_err = NULL;
     int ret = 0;
 
     buf = g_strdup(conf);
     p = buf;
 
     while (p) {
-        ret = qemu_rbd_next_tok(name, sizeof(name), p,
-                                '=', "conf option name", &p, errp);
-        if (ret < 0) {
+        name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
+                                 '=', "conf option name", &p, &local_err);
+        if (local_err) {
             break;
         }
         qemu_rbd_unescape(name);
@@ -277,9 +308,9 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
             break;
         }
 
-        ret = qemu_rbd_next_tok(value, sizeof(value), p,
-                                ':', "conf option value", &p, errp);
-        if (ret < 0) {
+        value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p,
+                                  ':', "conf option value", &p, &local_err);
+        if (local_err) {
             break;
         }
         qemu_rbd_unescape(value);
@@ -307,10 +338,74 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
         }
     }
 
+    if (local_err) {
+        error_propagate(errp, local_err);
+        ret = -EINVAL;
+    }
     g_free(buf);
     return ret;
 }
 
+static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
+{
+    if (LIBRBD_USE_IOVEC) {
+        RBDAIOCB *acb = rcb->acb;
+        iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
+                   acb->qiov->size - offs);
+    } else {
+        memset(rcb->buf + offs, 0, rcb->size - offs);
+    }
+}
+
+static QemuOptsList runtime_opts = {
+    .name = "rbd",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = "filename",
+            .type = QEMU_OPT_STRING,
+            .help = "Specification of the rbd image",
+        },
+        {
+            .name = "password-secret",
+            .type = QEMU_OPT_STRING,
+            .help = "ID of secret providing the password",
+        },
+        {
+            .name = "conf",
+            .type = QEMU_OPT_STRING,
+            .help = "Rados config file location",
+        },
+        {
+            .name = "pool",
+            .type = QEMU_OPT_STRING,
+            .help = "Rados pool name",
+        },
+        {
+            .name = "image",
+            .type = QEMU_OPT_STRING,
+            .help = "Image name in the pool",
+        },
+        {
+            .name = "snapshot",
+            .type = QEMU_OPT_STRING,
+            .help = "Ceph snapshot name",
+        },
+        {
+            /* maps to 'id' in rados_create() */
+            .name = "user",
+            .type = QEMU_OPT_STRING,
+            .help = "Rados id name",
+        },
+        {
+            .name = "keyvalue-pairs",
+            .type = QEMU_OPT_STRING,
+            .help = "Legacy rados key/value option parameters",
+        },
+        { /* end of list */ }
+    },
+};
+
 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 {
     Error *local_err = NULL;
@@ -366,45 +461,44 @@ static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
         rados_conf_read_file(cluster, NULL);
     } else if (conf[0] != '\0' &&
                qemu_rbd_set_conf(cluster, conf, true, &local_err) < 0) {
-        rados_shutdown(cluster);
         error_propagate(errp, local_err);
-        return -EIO;
+        ret = -EIO;
+        goto shutdown;
     }
 
     if (conf[0] != '\0' &&
         qemu_rbd_set_conf(cluster, conf, false, &local_err) < 0) {
-        rados_shutdown(cluster);
         error_propagate(errp, local_err);
-        return -EIO;
+        ret = -EIO;
+        goto shutdown;
     }
 
     if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
-        rados_shutdown(cluster);
-        return -EIO;
+        ret = -EIO;
+        goto shutdown;
     }
 
     ret = rados_connect(cluster);
     if (ret < 0) {
         error_setg_errno(errp, -ret, "error connecting");
-        rados_shutdown(cluster);
-        return ret;
+        goto shutdown;
     }
 
     ret = rados_ioctx_create(cluster, pool, &io_ctx);
     if (ret < 0) {
         error_setg_errno(errp, -ret, "error opening pool %s", pool);
-        rados_shutdown(cluster);
-        return ret;
+        goto shutdown;
     }
 
     ret = rbd_create(io_ctx, name, bytes, &obj_order);
-    rados_ioctx_destroy(io_ctx);
-    rados_shutdown(cluster);
     if (ret < 0) {
         error_setg_errno(errp, -ret, "error rbd create");
-        return ret;
     }
 
+    rados_ioctx_destroy(io_ctx);
+
+shutdown:
+    rados_shutdown(cluster);
     return ret;
 }
 
@@ -428,11 +522,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
         }
     } else {
         if (r < 0) {
-            memset(rcb->buf, 0, rcb->size);
+            qemu_rbd_memset(rcb, 0);
             acb->ret = r;
             acb->error = 1;
         } else if (r < rcb->size) {
-            memset(rcb->buf + r, 0, rcb->size - r);
+            qemu_rbd_memset(rcb, r);
             if (!acb->error) {
                 acb->ret = rcb->size;
             }
@@ -443,34 +537,18 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
     g_free(rcb);
 
-    if (acb->cmd == RBD_AIO_READ) {
-        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+    if (!LIBRBD_USE_IOVEC) {
+        if (acb->cmd == RBD_AIO_READ) {
+            qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+        }
+        qemu_vfree(acb->bounce);
     }
-    qemu_vfree(acb->bounce);
+
     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
     qemu_aio_unref(acb);
 }
 
-/* TODO Convert to fine grained options */
-static QemuOptsList runtime_opts = {
-    .name = "rbd",
-    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
-    .desc = {
-        {
-            .name = "filename",
-            .type = QEMU_OPT_STRING,
-            .help = "Specification of the rbd image",
-        },
-        {
-            .name = "password-secret",
-            .type = QEMU_OPT_STRING,
-            .help = "ID of secret providing the password",
-        },
-        { /* end of list */ }
-    },
-};
-
 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                          Error **errp)
 {
@@ -602,7 +680,6 @@ static const AIOCBInfo rbd_aiocb_info = {
 static void rbd_finish_bh(void *opaque)
 {
     RADOSCB *rcb = opaque;
-    qemu_bh_delete(rcb->acb->bh);
     qemu_rbd_complete_aio(rcb);
 }
 
@@ -621,9 +698,8 @@ static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
     rcb->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
 
-    acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
-                         rbd_finish_bh, rcb);
-    qemu_bh_schedule(acb->bh);
+    aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
+                            rbd_finish_bh, rcb);
 }
 
 static int rbd_aio_discard_wrapper(rbd_image_t image,
@@ -649,9 +725,9 @@ static int rbd_aio_flush_wrapper(rbd_image_t image,
 }
 
 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
-                                 int64_t sector_num,
+                                 int64_t off,
                                  QEMUIOVector *qiov,
-                                 int nb_sectors,
+                                 int64_t size,
                                  BlockCompletionFunc *cb,
                                  void *opaque,
                                  RBDAIOCmd cmd)
@@ -659,8 +735,6 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     RBDAIOCB *acb;
     RADOSCB *rcb = NULL;
     rbd_completion_t c;
-    int64_t off, size;
-    char *buf;
     int r;
 
     BDRVRBDState *s = bs->opaque;
@@ -668,31 +742,30 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
     acb->cmd = cmd;
     acb->qiov = qiov;
-    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
-        acb->bounce = NULL;
-    } else {
-        acb->bounce = qemu_try_blockalign(bs, qiov->size);
-        if (acb->bounce == NULL) {
-            goto failed;
+    assert(!qiov || qiov->size == size);
+
+    rcb = g_new(RADOSCB, 1);
+
+    if (!LIBRBD_USE_IOVEC) {
+        if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
+            acb->bounce = NULL;
+        } else {
+            acb->bounce = qemu_try_blockalign(bs, qiov->size);
+            if (acb->bounce == NULL) {
+                goto failed;
+            }
+        }
+        if (cmd == RBD_AIO_WRITE) {
+            qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
         }
+        rcb->buf = acb->bounce;
     }
+
     acb->ret = 0;
     acb->error = 0;
     acb->s = s;
-    acb->bh = NULL;
-
-    if (cmd == RBD_AIO_WRITE) {
-        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
-    }
-
-    buf = acb->bounce;
 
-    off = sector_num * BDRV_SECTOR_SIZE;
-    size = nb_sectors * BDRV_SECTOR_SIZE;
-
-    rcb = g_new(RADOSCB, 1);
     rcb->acb = acb;
-    rcb->buf = buf;
     rcb->s = acb->s;
     rcb->size = size;
     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -702,10 +775,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
     switch (cmd) {
     case RBD_AIO_WRITE:
-        r = rbd_aio_write(s->image, off, size, buf, c);
+#ifdef LIBRBD_SUPPORTS_IOVEC
+            r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#else
+            r = rbd_aio_write(s->image, off, size, rcb->buf, c);
+#endif
         break;
     case RBD_AIO_READ:
-        r = rbd_aio_read(s->image, off, size, buf, c);
+#ifdef LIBRBD_SUPPORTS_IOVEC
+            r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#else
+            r = rbd_aio_read(s->image, off, size, rcb->buf, c);
+#endif
         break;
     case RBD_AIO_DISCARD:
         r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -720,14 +801,16 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     if (r < 0) {
         goto failed_completion;
     }
-
     return &acb->common;
 
 failed_completion:
     rbd_aio_release(c);
 failed:
     g_free(rcb);
-    qemu_vfree(acb->bounce);
+    if (!LIBRBD_USE_IOVEC) {
+        qemu_vfree(acb->bounce);
+    }
+
     qemu_aio_unref(acb);
     return NULL;
 }
@@ -739,7 +822,8 @@ static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                       BlockCompletionFunc *cb,
                                       void *opaque)
 {
-    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+    return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
+                         (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
                          RBD_AIO_READ);
 }
 
@@ -750,7 +834,8 @@ static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                        BlockCompletionFunc *cb,
                                        void *opaque)
 {
-    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+    return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
+                         (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
                          RBD_AIO_WRITE);
 }
 
@@ -883,10 +968,8 @@ static int qemu_rbd_snap_rollback(BlockDriverState *bs,
                                   const char *snapshot_name)
 {
     BDRVRBDState *s = bs->opaque;
-    int r;
 
-    r = rbd_snap_rollback(s->image, snapshot_name);
-    return r;
+    return rbd_snap_rollback(s->image, snapshot_name);
 }
 
 static int qemu_rbd_snap_list(BlockDriverState *bs,
@@ -933,13 +1016,13 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
 }
 
 #ifdef LIBRBD_SUPPORTS_DISCARD
-static BlockAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
-                                        int64_t sector_num,
-                                        int nb_sectors,
-                                        BlockCompletionFunc *cb,
-                                        void *opaque)
+static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
+                                         int64_t offset,
+                                         int count,
+                                         BlockCompletionFunc *cb,
+                                         void *opaque)
 {
-    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
+    return rbd_start_aio(bs, offset, NULL, count, cb, opaque,
                          RBD_AIO_DISCARD);
 }
 #endif
@@ -1003,7 +1086,7 @@ static BlockDriver bdrv_rbd = {
 #endif
 
 #ifdef LIBRBD_SUPPORTS_DISCARD
-    .bdrv_aio_discard       = qemu_rbd_aio_discard,
+    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
 #endif
 
     .bdrv_snapshot_create   = qemu_rbd_snap_create,
This page took 0.040738 seconds and 4 git commands to generate.