]> Git Repo - qemu.git/blobdiff - block/rbd.c
vmdk: Implement .bdrv_get_info()
[qemu.git] / block / rbd.c
index 737bab16cc1002d06abe88119adcc24a89fb3462..dbc79f45251c863f956ec05c28bd84d79a42a431 100644 (file)
@@ -14,8 +14,8 @@
 #include <inttypes.h>
 
 #include "qemu-common.h"
-#include "qemu-error.h"
-#include "block_int.h"
+#include "qemu/error-report.h"
+#include "block/block_int.h"
 
 #include <rbd/librbd.h>
 
@@ -63,7 +63,8 @@
 typedef enum {
     RBD_AIO_READ,
     RBD_AIO_WRITE,
-    RBD_AIO_DISCARD
+    RBD_AIO_DISCARD,
+    RBD_AIO_FLUSH
 } RBDAIOCmd;
 
 typedef struct RBDAIOCB {
@@ -94,19 +95,13 @@ typedef struct RADOSCB {
 #define RBD_FD_WRITE 1
 
 typedef struct BDRVRBDState {
-    int fds[2];
     rados_t cluster;
     rados_ioctx_t io_ctx;
     rbd_image_t image;
     char name[RBD_MAX_IMAGE_NAME_SIZE];
-    int qemu_aio_count;
     char *snap;
-    int event_reader_pos;
-    RADOSCB *event_rcb;
 } BDRVRBDState;
 
-static void rbd_aio_bh_cb(void *opaque);
-
 static int qemu_rbd_next_tok(char *dst, int dst_len,
                              char *src, char delim,
                              const char *name,
@@ -287,7 +282,8 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
     return ret;
 }
 
-static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
+static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options,
+                           Error **errp)
 {
     int64_t bytes = 0;
     int64_t objsize;
@@ -368,9 +364,8 @@ static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
 }
 
 /*
- * This aio completion is being called from qemu_rbd_aio_event_reader()
- * and runs in qemu context. It schedules a bh, but just in case the aio
- * was not cancelled before.
+ * This aio completion is being called from rbd_finish_bh() and runs in qemu
+ * BH context.
  */
 static void qemu_rbd_complete_aio(RADOSCB *rcb)
 {
@@ -379,8 +374,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
     r = rcb->ret;
 
-    if (acb->cmd == RBD_AIO_WRITE ||
-        acb->cmd == RBD_AIO_DISCARD) {
+    if (acb->cmd != RBD_AIO_READ) {
         if (r < 0) {
             acb->ret = r;
             acb->error = 1;
@@ -401,47 +395,37 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
             acb->ret = r;
         }
     }
-    /* Note that acb->bh can be NULL in case where the aio was cancelled */
-    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
-    qemu_bh_schedule(acb->bh);
-    g_free(rcb);
-}
 
-/*
- * aio fd read handler. It runs in the qemu context and calls the
- * completion handling of completed rados aio operations.
- */
-static void qemu_rbd_aio_event_reader(void *opaque)
-{
-    BDRVRBDState *s = opaque;
+    g_free(rcb);
 
-    ssize_t ret;
+    if (acb->cmd == RBD_AIO_READ) {
+        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    acb->status = 0;
 
-    do {
-        char *p = (char *)&s->event_rcb;
-
-        /* now read the rcb pointer that was sent from a non qemu thread */
-        ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
-                   sizeof(s->event_rcb) - s->event_reader_pos);
-        if (ret > 0) {
-            s->event_reader_pos += ret;
-            if (s->event_reader_pos == sizeof(s->event_rcb)) {
-                s->event_reader_pos = 0;
-                qemu_rbd_complete_aio(s->event_rcb);
-                s->qemu_aio_count--;
-            }
-        }
-    } while (ret < 0 && errno == EINTR);
+    if (!acb->cancelled) {
+        qemu_aio_release(acb);
+    }
 }
 
-static int qemu_rbd_aio_flush_cb(void *opaque)
-{
-    BDRVRBDState *s = opaque;
-
-    return (s->qemu_aio_count > 0);
-}
+/* TODO Convert to fine grained options */
+static QemuOptsList runtime_opts = {
+    .name = "rbd",
+    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
+    .desc = {
+        {
+            .name = "filename",
+            .type = QEMU_OPT_STRING,
+            .help = "Specification of the rbd image",
+        },
+        { /* end of list */ }
+    },
+};
 
-static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
+static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
+                         Error **errp)
 {
     BDRVRBDState *s = bs->opaque;
     char pool[RBD_MAX_POOL_NAME_SIZE];
@@ -449,20 +433,35 @@ static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
     char conf[RBD_MAX_CONF_SIZE];
     char clientname_buf[RBD_MAX_CONF_SIZE];
     char *clientname;
+    QemuOpts *opts;
+    Error *local_err = NULL;
+    const char *filename;
     int r;
 
+    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        qerror_report_err(local_err);
+        error_free(local_err);
+        qemu_opts_del(opts);
+        return -EINVAL;
+    }
+
+    filename = qemu_opt_get(opts, "filename");
+
     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                            snap_buf, sizeof(snap_buf),
                            s->name, sizeof(s->name),
                            conf, sizeof(conf)) < 0) {
-        return -EINVAL;
+        r = -EINVAL;
+        goto failed_opts;
     }
 
     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
     r = rados_create(&s->cluster, clientname);
     if (r < 0) {
         error_report("error initializing");
-        return r;
+        goto failed_opts;
     }
 
     s->snap = NULL;
@@ -516,27 +515,16 @@ static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
 
     bs->read_only = (s->snap != NULL);
 
-    s->event_reader_pos = 0;
-    r = qemu_pipe(s->fds);
-    if (r < 0) {
-        error_report("error opening eventfd");
-        goto failed;
-    }
-    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
-    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
-    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
-                            NULL, qemu_rbd_aio_flush_cb, s);
-
-
+    qemu_opts_del(opts);
     return 0;
 
-failed:
-    rbd_close(s->image);
 failed_open:
     rados_ioctx_destroy(s->io_ctx);
 failed_shutdown:
     rados_shutdown(s->cluster);
     g_free(s->snap);
+failed_opts:
+    qemu_opts_del(opts);
     return r;
 }
 
@@ -544,10 +532,6 @@ static void qemu_rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
-    close(s->fds[0]);
-    close(s->fds[1]);
-    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
-
     rbd_close(s->image);
     rados_ioctx_destroy(s->io_ctx);
     g_free(s->snap);
@@ -575,34 +559,11 @@ static const AIOCBInfo rbd_aiocb_info = {
     .cancel = qemu_rbd_aio_cancel,
 };
 
-static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
+static void rbd_finish_bh(void *opaque)
 {
-    int ret = 0;
-    while (1) {
-        fd_set wfd;
-        int fd = s->fds[RBD_FD_WRITE];
-
-        /* send the op pointer to the qemu thread that is responsible
-           for the aio/op completion. Must do it in a qemu thread context */
-        ret = write(fd, (void *)&rcb, sizeof(rcb));
-        if (ret >= 0) {
-            break;
-        }
-        if (errno == EINTR) {
-            continue;
-        }
-        if (errno != EAGAIN) {
-            break;
-        }
-
-        FD_ZERO(&wfd);
-        FD_SET(fd, &wfd);
-        do {
-            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
-        } while (ret < 0 && errno == EINTR);
-    }
-
-    return ret;
+    RADOSCB *rcb = opaque;
+    qemu_bh_delete(rcb->acb->bh);
+    qemu_rbd_complete_aio(rcb);
 }
 
 /*
@@ -610,40 +571,18 @@ static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
  *
  * Note: this function is being called from a non qemu thread so
  * we need to be careful about what we do here. Generally we only
- * write to the block notification pipe, and do the rest of the
- * io completion handling from qemu_rbd_aio_event_reader() which
- * runs in a qemu context.
+ * schedule a BH, and do the rest of the io completion handling
+ * from rbd_finish_bh() which runs in a qemu context.
  */
 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
 {
-    int ret;
+    RBDAIOCB *acb = rcb->acb;
+
     rcb->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
-    ret = qemu_rbd_send_pipe(rcb->s, rcb);
-    if (ret < 0) {
-        error_report("failed writing to acb->s->fds");
-        g_free(rcb);
-    }
-}
-
-/* Callback when all queued rbd_aio requests are complete */
-
-static void rbd_aio_bh_cb(void *opaque)
-{
-    RBDAIOCB *acb = opaque;
-
-    if (acb->cmd == RBD_AIO_READ) {
-        qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
-    }
-    qemu_vfree(acb->bounce);
-    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-    acb->status = 0;
 
-    if (!acb->cancelled) {
-        qemu_aio_release(acb);
-    }
+    acb->bh = qemu_bh_new(rbd_finish_bh, rcb);
+    qemu_bh_schedule(acb->bh);
 }
 
 static int rbd_aio_discard_wrapper(rbd_image_t image,
@@ -658,6 +597,16 @@ static int rbd_aio_discard_wrapper(rbd_image_t image,
 #endif
 }
 
+static int rbd_aio_flush_wrapper(rbd_image_t image,
+                                 rbd_completion_t comp)
+{
+#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
+    return rbd_aio_flush(image, comp);
+#else
+    return -ENOTSUP;
+#endif
+}
+
 static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
                                        int64_t sector_num,
                                        QEMUIOVector *qiov,
@@ -678,7 +627,7 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
     acb->cmd = cmd;
     acb->qiov = qiov;
-    if (cmd == RBD_AIO_DISCARD) {
+    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
         acb->bounce = NULL;
     } else {
         acb->bounce = qemu_blockalign(bs, qiov->size);
@@ -699,8 +648,6 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     off = sector_num * BDRV_SECTOR_SIZE;
     size = nb_sectors * BDRV_SECTOR_SIZE;
 
-    s->qemu_aio_count++; /* All the RADOSCB */
-
     rcb = g_malloc(sizeof(RADOSCB));
     rcb->done = 0;
     rcb->acb = acb;
@@ -722,6 +669,9 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
     case RBD_AIO_DISCARD:
         r = rbd_aio_discard_wrapper(s->image, off, size, c);
         break;
+    case RBD_AIO_FLUSH:
+        r = rbd_aio_flush_wrapper(s->image, c);
+        break;
     default:
         r = -EINVAL;
     }
@@ -734,7 +684,6 @@ static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 failed:
     g_free(rcb);
-    s->qemu_aio_count--;
     qemu_aio_release(acb);
     return NULL;
 }
@@ -761,6 +710,16 @@ static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                          RBD_AIO_WRITE);
 }
 
+#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
+static BlockDriverAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
+                                            BlockDriverCompletionFunc *cb,
+                                            void *opaque)
+{
+    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
+}
+
+#else
+
 static int qemu_rbd_co_flush(BlockDriverState *bs)
 {
 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
@@ -771,6 +730,7 @@ static int qemu_rbd_co_flush(BlockDriverState *bs)
     return 0;
 #endif
 }
+#endif
 
 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 {
@@ -847,12 +807,31 @@ static int qemu_rbd_snap_create(BlockDriverState *bs,
 }
 
 static int qemu_rbd_snap_remove(BlockDriverState *bs,
-                                const char *snapshot_name)
+                                const char *snapshot_id,
+                                const char *snapshot_name,
+                                Error **errp)
 {
     BDRVRBDState *s = bs->opaque;
     int r;
 
+    if (!snapshot_name) {
+        error_setg(errp, "rbd need a valid snapshot name");
+        return -EINVAL;
+    }
+
+    /* If snapshot_id is specified, it must be equal to name, see
+       qemu_rbd_snap_list() */
+    if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
+        error_setg(errp,
+                   "rbd do not support snapshot id, it should be NULL or "
+                   "equal to snapshot name");
+        return -EINVAL;
+    }
+
     r = rbd_snap_remove(s->image, snapshot_name);
+    if (r < 0) {
+        error_setg_errno(errp, -r, "Failed to remove the snapshot");
+    }
     return r;
 }
 
@@ -878,7 +857,7 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
     do {
         snaps = g_malloc(sizeof(*snaps) * max_snaps);
         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
-        if (snap_count < 0) {
+        if (snap_count <= 0) {
             g_free(snaps);
         }
     } while (snap_count == -ERANGE);
@@ -902,6 +881,7 @@ static int qemu_rbd_snap_list(BlockDriverState *bs,
         sn_info->vm_clock_nsec = 0;
     }
     rbd_snap_list_end(snaps);
+    g_free(snaps);
 
  done:
     *psn_tab = sn_tab;
@@ -937,9 +917,11 @@ static QEMUOptionParameter qemu_rbd_create_options[] = {
 static BlockDriver bdrv_rbd = {
     .format_name        = "rbd",
     .instance_size      = sizeof(BDRVRBDState),
+    .bdrv_needs_filename = true,
     .bdrv_file_open     = qemu_rbd_open,
     .bdrv_close         = qemu_rbd_close,
     .bdrv_create        = qemu_rbd_create,
+    .bdrv_has_zero_init = bdrv_has_zero_init_1,
     .bdrv_get_info      = qemu_rbd_getinfo,
     .create_options     = qemu_rbd_create_options,
     .bdrv_getlength     = qemu_rbd_getlength,
@@ -948,7 +930,12 @@ static BlockDriver bdrv_rbd = {
 
     .bdrv_aio_readv         = qemu_rbd_aio_readv,
     .bdrv_aio_writev        = qemu_rbd_aio_writev,
+
+#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
+    .bdrv_aio_flush         = qemu_rbd_aio_flush,
+#else
     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
+#endif
 
 #ifdef LIBRBD_SUPPORTS_DISCARD
     .bdrv_aio_discard       = qemu_rbd_aio_discard,
This page took 0.036995 seconds and 4 git commands to generate.