* This work is licensed under the terms of the GNU GPL, version 2. See
* the COPYING file in the top-level directory.
*
+ * Contributions after 2012-01-13 are licensed under the terms of the
+ * GNU GPL, version 2 or (at your option) any later version.
*/
#include <inttypes.h>
#include "qemu-common.h"
#include "qemu-error.h"
-
#include "block_int.h"
#include <rbd/librbd.h>
-
-
/*
* When specifying the image filename use:
*
* rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
*
- * poolname must be the name of an existing rados pool
+ * poolname must be the name of an existing rados pool.
*
- * devicename is the basename for all objects used to
- * emulate the raw device.
+ * devicename is the name of the rbd image.
*
- * Each option given is used to configure rados, and may be
- * any Ceph option, or "conf". The "conf" option specifies
- * a Ceph configuration file to read.
+ * Each option given is used to configure rados, and may be any valid
+ * Ceph option, "id", or "conf".
*
- * Metadata information (image size, ...) is stored in an
- * object with the name "devicename.rbd".
+ * The "id" option indicates what user we should authenticate as to
+ * the Ceph cluster. If it is excluded we will use the Ceph default
+ * (normally 'admin').
*
- * The raw device is split into 4MB sized objects by default.
- * The sequencenumber is encoded in a 12 byte long hex-string,
- * and is attached to the devicename, separated by a dot.
- * e.g. "devicename.1234567890ab"
+ * The "conf" option specifies a Ceph configuration file to read. If
+ * it is not specified, we will read from the default Ceph locations
+ * (e.g., /etc/ceph/ceph.conf). To avoid reading _any_ configuration
+ * file, specify conf=/dev/null.
*
+ * Configuration values containing :, @, or = can be escaped with a
+ * leading "\".
*/
+/* rbd_aio_discard added in 0.1.2 */
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
+#define LIBRBD_SUPPORTS_DISCARD
+#else
+#undef LIBRBD_SUPPORTS_DISCARD
+#endif
+
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
#define RBD_MAX_CONF_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
#define RBD_MAX_SNAPS 100
+typedef enum {
+ RBD_AIO_READ,
+ RBD_AIO_WRITE,
+ RBD_AIO_DISCARD
+} RBDAIOCmd;
+
typedef struct RBDAIOCB {
BlockDriverAIOCB common;
QEMUBH *bh;
int ret;
QEMUIOVector *qiov;
char *bounce;
- int write;
+ RBDAIOCmd cmd;
int64_t sector_num;
int error;
struct BDRVRBDState *s;
*p = NULL;
if (delim != '\0') {
- end = strchr(src, delim);
- if (end) {
+ for (end = src; *end; ++end) {
+ if (*end == delim) {
+ break;
+ }
+ if (*end == '\\' && end[1] != '\0') {
+ end++;
+ }
+ }
+ if (*end == delim) {
*p = end + 1;
*end = '\0';
}
return 0;
}
+static void qemu_rbd_unescape(char *src)
+{
+ char *p;
+
+ for (p = src; *src; ++src, ++p) {
+ if (*src == '\\' && src[1] != '\0') {
+ src++;
+ }
+ *p = *src;
+ }
+ *p = '\0';
+}
+
static int qemu_rbd_parsename(const char *filename,
char *pool, int pool_len,
char *snap, int snap_len,
return -EINVAL;
}
- buf = qemu_strdup(start);
+ buf = g_strdup(start);
p = buf;
*snap = '\0';
*conf = '\0';
ret = -EINVAL;
goto done;
}
+ qemu_rbd_unescape(pool);
if (strchr(p, '@')) {
ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
goto done;
}
ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
+ qemu_rbd_unescape(snap);
} else {
ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
}
+ qemu_rbd_unescape(name);
if (ret < 0 || !p) {
goto done;
}
ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);
done:
- qemu_free(buf);
+ g_free(buf);
return ret;
}
+static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
+{
+ const char *p = conf;
+
+ while (*p) {
+ int len;
+ const char *end = strchr(p, ':');
+
+ if (end) {
+ len = end - p;
+ } else {
+ len = strlen(p);
+ }
+
+ if (strncmp(p, "id=", 3) == 0) {
+ len -= 3;
+ strncpy(clientname, p + 3, len);
+ clientname[len] = '\0';
+ return clientname;
+ }
+ if (end == NULL) {
+ break;
+ }
+ p = end + 1;
+ }
+ return NULL;
+}
+
static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
{
char *p, *buf;
char value[RBD_MAX_CONF_VAL_SIZE];
int ret = 0;
- buf = qemu_strdup(conf);
+ buf = g_strdup(conf);
p = buf;
while (p) {
if (ret < 0) {
break;
}
+ qemu_rbd_unescape(name);
if (!p) {
error_report("conf option %s has no value", name);
if (ret < 0) {
break;
}
+ qemu_rbd_unescape(value);
- if (strcmp(name, "conf")) {
- ret = rados_conf_set(cluster, name, value);
+ if (strcmp(name, "conf") == 0) {
+ ret = rados_conf_read_file(cluster, value);
if (ret < 0) {
- error_report("invalid conf option %s", name);
- ret = -EINVAL;
+ error_report("error reading conf file %s", value);
break;
}
+ } else if (strcmp(name, "id") == 0) {
+ /* ignore, this is parsed by qemu_rbd_parse_clientname() */
} else {
- ret = rados_conf_read_file(cluster, value);
+ ret = rados_conf_set(cluster, name, value);
if (ret < 0) {
- error_report("error reading conf file %s", value);
+ error_report("invalid conf option %s", name);
+ ret = -EINVAL;
break;
}
}
}
- qemu_free(buf);
+ g_free(buf);
return ret;
}
char name[RBD_MAX_IMAGE_NAME_SIZE];
char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
char conf[RBD_MAX_CONF_SIZE];
+ char clientname_buf[RBD_MAX_CONF_SIZE];
+ char *clientname;
rados_t cluster;
rados_ioctx_t io_ctx;
int ret;
options++;
}
- if (rados_create(&cluster, NULL) < 0) {
+ clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
+ if (rados_create(&cluster, clientname) < 0) {
error_report("error initializing");
return -EIO;
}
if (strstr(conf, "conf=") == NULL) {
- if (rados_conf_read_file(cluster, NULL) < 0) {
- error_report("error reading config file");
- rados_shutdown(cluster);
- return -EIO;
- }
+ /* try default location, but ignore failure */
+ rados_conf_read_file(cluster, NULL);
}
if (conf[0] != '\0' &&
r = rcb->ret;
- if (acb->write) {
+ if (acb->cmd == RBD_AIO_WRITE ||
+ acb->cmd == RBD_AIO_DISCARD) {
if (r < 0) {
acb->ret = r;
acb->error = 1;
acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
qemu_bh_schedule(acb->bh);
done:
- qemu_free(rcb);
+ g_free(rcb);
}
/*
char *p = (char *)&s->event_rcb;
/* now read the rcb pointer that was sent from a non qemu thread */
- if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
- sizeof(s->event_rcb) - s->event_reader_pos)) > 0) {
- if (ret > 0) {
- s->event_reader_pos += ret;
- if (s->event_reader_pos == sizeof(s->event_rcb)) {
- s->event_reader_pos = 0;
- qemu_rbd_complete_aio(s->event_rcb);
- s->qemu_aio_count--;
- }
+ ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
+ sizeof(s->event_rcb) - s->event_reader_pos);
+ if (ret > 0) {
+ s->event_reader_pos += ret;
+ if (s->event_reader_pos == sizeof(s->event_rcb)) {
+ s->event_reader_pos = 0;
+ qemu_rbd_complete_aio(s->event_rcb);
+ s->qemu_aio_count--;
}
}
} while (ret < 0 && errno == EINTR);
char pool[RBD_MAX_POOL_NAME_SIZE];
char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
char conf[RBD_MAX_CONF_SIZE];
+ char clientname_buf[RBD_MAX_CONF_SIZE];
+ char *clientname;
int r;
if (qemu_rbd_parsename(filename, pool, sizeof(pool),
conf, sizeof(conf)) < 0) {
return -EINVAL;
}
- s->snap = NULL;
- if (snap_buf[0] != '\0') {
- s->snap = qemu_strdup(snap_buf);
- }
- r = rados_create(&s->cluster, NULL);
+ clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
+ r = rados_create(&s->cluster, clientname);
if (r < 0) {
error_report("error initializing");
return r;
}
+ s->snap = NULL;
+ if (snap_buf[0] != '\0') {
+ s->snap = g_strdup(snap_buf);
+ }
+
if (strstr(conf, "conf=") == NULL) {
- r = rados_conf_read_file(s->cluster, NULL);
- if (r < 0) {
- error_report("error reading config file");
- rados_shutdown(s->cluster);
- return r;
- }
+ /* try default location, but ignore failure */
+ rados_conf_read_file(s->cluster, NULL);
}
if (conf[0] != '\0') {
r = qemu_rbd_set_conf(s->cluster, conf);
if (r < 0) {
error_report("error setting config options");
- rados_shutdown(s->cluster);
- return r;
+ goto failed_shutdown;
}
}
r = rados_connect(s->cluster);
if (r < 0) {
error_report("error connecting");
- rados_shutdown(s->cluster);
- return r;
+ goto failed_shutdown;
}
r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
if (r < 0) {
error_report("error opening pool %s", pool);
- rados_shutdown(s->cluster);
- return r;
+ goto failed_shutdown;
}
r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
if (r < 0) {
error_report("error reading header from %s", s->name);
- rados_ioctx_destroy(s->io_ctx);
- rados_shutdown(s->cluster);
- return r;
+ goto failed_open;
}
bs->read_only = (s->snap != NULL);
fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
- NULL, qemu_rbd_aio_flush_cb, NULL, s);
+ NULL, qemu_rbd_aio_flush_cb, s);
return 0;
failed:
rbd_close(s->image);
+failed_open:
rados_ioctx_destroy(s->io_ctx);
+failed_shutdown:
rados_shutdown(s->cluster);
+ g_free(s->snap);
return r;
}
close(s->fds[0]);
close(s->fds[1]);
- qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
- NULL);
+ qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL);
rbd_close(s->image);
rados_ioctx_destroy(s->io_ctx);
- qemu_free(s->snap);
+ g_free(s->snap);
rados_shutdown(s->cluster);
}
ret = qemu_rbd_send_pipe(rcb->s, rcb);
if (ret < 0) {
error_report("failed writing to acb->s->fds");
- qemu_free(rcb);
+ g_free(rcb);
}
}
{
RBDAIOCB *acb = opaque;
- if (!acb->write) {
+ if (acb->cmd == RBD_AIO_READ) {
qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
}
qemu_vfree(acb->bounce);
qemu_aio_release(acb);
}
-static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
- int64_t sector_num,
- QEMUIOVector *qiov,
- int nb_sectors,
- BlockDriverCompletionFunc *cb,
- void *opaque, int write)
+static int rbd_aio_discard_wrapper(rbd_image_t image,
+ uint64_t off,
+ uint64_t len,
+ rbd_completion_t comp)
+{
+#ifdef LIBRBD_SUPPORTS_DISCARD
+ return rbd_aio_discard(image, off, len, comp);
+#else
+ return -ENOTSUP;
+#endif
+}
+
+static BlockDriverAIOCB *rbd_start_aio(BlockDriverState *bs,
+ int64_t sector_num,
+ QEMUIOVector *qiov,
+ int nb_sectors,
+ BlockDriverCompletionFunc *cb,
+ void *opaque,
+ RBDAIOCmd cmd)
{
RBDAIOCB *acb;
RADOSCB *rcb;
BDRVRBDState *s = bs->opaque;
acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
- if (!acb) {
- return NULL;
- }
- acb->write = write;
+ acb->cmd = cmd;
acb->qiov = qiov;
- acb->bounce = qemu_blockalign(bs, qiov->size);
+ if (cmd == RBD_AIO_DISCARD) {
+ acb->bounce = NULL;
+ } else {
+ acb->bounce = qemu_blockalign(bs, qiov->size);
+ }
acb->ret = 0;
acb->error = 0;
acb->s = s;
acb->cancelled = 0;
acb->bh = NULL;
- if (write) {
+ if (cmd == RBD_AIO_WRITE) {
qemu_iovec_to_buffer(acb->qiov, acb->bounce);
}
s->qemu_aio_count++; /* All the RADOSCB */
- rcb = qemu_malloc(sizeof(RADOSCB));
+ rcb = g_malloc(sizeof(RADOSCB));
rcb->done = 0;
rcb->acb = acb;
rcb->buf = buf;
goto failed;
}
- if (write) {
+ switch (cmd) {
+ case RBD_AIO_WRITE:
r = rbd_aio_write(s->image, off, size, buf, c);
- } else {
+ break;
+ case RBD_AIO_READ:
r = rbd_aio_read(s->image, off, size, buf, c);
+ break;
+ case RBD_AIO_DISCARD:
+ r = rbd_aio_discard_wrapper(s->image, off, size, c);
+ break;
+ default:
+ r = -EINVAL;
}
if (r < 0) {
return &acb->common;
failed:
- qemu_free(rcb);
+ g_free(rcb);
s->qemu_aio_count--;
qemu_aio_release(acb);
return NULL;
BlockDriverCompletionFunc *cb,
void *opaque)
{
- return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
+ return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+ RBD_AIO_READ);
}
static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
BlockDriverCompletionFunc *cb,
void *opaque)
{
- return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
+ return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
+ RBD_AIO_WRITE);
+}
+
+static int qemu_rbd_co_flush(BlockDriverState *bs)
+{
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
+ /* rbd_flush added in 0.1.1 */
+ BDRVRBDState *s = bs->opaque;
+ return rbd_flush(s->image);
+#else
+ return 0;
+#endif
}
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
return 0;
}
+static int qemu_rbd_snap_remove(BlockDriverState *bs,
+ const char *snapshot_name)
+{
+ BDRVRBDState *s = bs->opaque;
+ int r;
+
+ r = rbd_snap_remove(s->image, snapshot_name);
+ return r;
+}
+
+static int qemu_rbd_snap_rollback(BlockDriverState *bs,
+ const char *snapshot_name)
+{
+ BDRVRBDState *s = bs->opaque;
+ int r;
+
+ r = rbd_snap_rollback(s->image, snapshot_name);
+ return r;
+}
+
static int qemu_rbd_snap_list(BlockDriverState *bs,
QEMUSnapshotInfo **psn_tab)
{
int max_snaps = RBD_MAX_SNAPS;
do {
- snaps = qemu_malloc(sizeof(*snaps) * max_snaps);
+ snaps = g_malloc(sizeof(*snaps) * max_snaps);
snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
if (snap_count < 0) {
- qemu_free(snaps);
+ g_free(snaps);
}
} while (snap_count == -ERANGE);
if (snap_count <= 0) {
- return snap_count;
+ goto done;
}
- sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
+ sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
for (i = 0; i < snap_count; i++) {
const char *snap_name = snaps[i].name;
}
rbd_snap_list_end(snaps);
+ done:
*psn_tab = sn_tab;
return snap_count;
}
+#ifdef LIBRBD_SUPPORTS_DISCARD
+static BlockDriverAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
+ int64_t sector_num,
+ int nb_sectors,
+ BlockDriverCompletionFunc *cb,
+ void *opaque)
+{
+ return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
+ RBD_AIO_DISCARD);
+}
+#endif
+
static QEMUOptionParameter qemu_rbd_create_options[] = {
{
.name = BLOCK_OPT_SIZE,
.bdrv_truncate = qemu_rbd_truncate,
.protocol_name = "rbd",
- .bdrv_aio_readv = qemu_rbd_aio_readv,
- .bdrv_aio_writev = qemu_rbd_aio_writev,
+ .bdrv_aio_readv = qemu_rbd_aio_readv,
+ .bdrv_aio_writev = qemu_rbd_aio_writev,
+ .bdrv_co_flush_to_disk = qemu_rbd_co_flush,
+
+#ifdef LIBRBD_SUPPORTS_DISCARD
+ .bdrv_aio_discard = qemu_rbd_aio_discard,
+#endif
- .bdrv_snapshot_create = qemu_rbd_snap_create,
- .bdrv_snapshot_list = qemu_rbd_snap_list,
+ .bdrv_snapshot_create = qemu_rbd_snap_create,
+ .bdrv_snapshot_delete = qemu_rbd_snap_remove,
+ .bdrv_snapshot_list = qemu_rbd_snap_list,
+ .bdrv_snapshot_goto = qemu_rbd_snap_rollback,
};
static void bdrv_rbd_init(void)