]> Git Repo - qemu.git/blob - block/rbd.c
block/rbd: add all the currently supported runtime_opts
[qemu.git] / block / rbd.c
1 /*
2  * QEMU Block driver for RADOS (Ceph)
3  *
4  * Copyright (C) 2010-2011 Christian Brunner <[email protected]>,
5  *                         Josh Durgin <[email protected]>
6  *
7  * This work is licensed under the terms of the GNU GPL, version 2.  See
8  * the COPYING file in the top-level directory.
9  *
10  * Contributions after 2012-01-13 are licensed under the terms of the
11  * GNU GPL, version 2 or (at your option) any later version.
12  */
13
14 #include "qemu/osdep.h"
15
16 #include "qapi/error.h"
17 #include "qemu/error-report.h"
18 #include "block/block_int.h"
19 #include "crypto/secret.h"
20 #include "qemu/cutils.h"
21
22 #include <rbd/librbd.h>
23
24 /*
25  * When specifying the image filename use:
26  *
27  * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
28  *
29  * poolname must be the name of an existing rados pool.
30  *
31  * devicename is the name of the rbd image.
32  *
33  * Each option given is used to configure rados, and may be any valid
34  * Ceph option, "id", or "conf".
35  *
36  * The "id" option indicates what user we should authenticate as to
37  * the Ceph cluster.  If it is excluded we will use the Ceph default
38  * (normally 'admin').
39  *
40  * The "conf" option specifies a Ceph configuration file to read.  If
41  * it is not specified, we will read from the default Ceph locations
42  * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
43  * file, specify conf=/dev/null.
44  *
45  * Configuration values containing :, @, or = can be escaped with a
46  * leading "\".
47  */
48
49 /* rbd_aio_discard added in 0.1.2 */
50 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 2)
51 #define LIBRBD_SUPPORTS_DISCARD
52 #else
53 #undef LIBRBD_SUPPORTS_DISCARD
54 #endif
55
56 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
57
58 #define RBD_MAX_CONF_NAME_SIZE 128
59 #define RBD_MAX_CONF_VAL_SIZE 512
60 #define RBD_MAX_CONF_SIZE 1024
61 #define RBD_MAX_POOL_NAME_SIZE 128
62 #define RBD_MAX_SNAP_NAME_SIZE 128
63 #define RBD_MAX_SNAPS 100
64
65 /* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
66 #ifdef LIBRBD_SUPPORTS_IOVEC
67 #define LIBRBD_USE_IOVEC 1
68 #else
69 #define LIBRBD_USE_IOVEC 0
70 #endif
71
72 typedef enum {
73     RBD_AIO_READ,
74     RBD_AIO_WRITE,
75     RBD_AIO_DISCARD,
76     RBD_AIO_FLUSH
77 } RBDAIOCmd;
78
79 typedef struct RBDAIOCB {
80     BlockAIOCB common;
81     int64_t ret;
82     QEMUIOVector *qiov;
83     char *bounce;
84     RBDAIOCmd cmd;
85     int error;
86     struct BDRVRBDState *s;
87 } RBDAIOCB;
88
89 typedef struct RADOSCB {
90     RBDAIOCB *acb;
91     struct BDRVRBDState *s;
92     int64_t size;
93     char *buf;
94     int64_t ret;
95 } RADOSCB;
96
97 typedef struct BDRVRBDState {
98     rados_t cluster;
99     rados_ioctx_t io_ctx;
100     rbd_image_t image;
101     char name[RBD_MAX_IMAGE_NAME_SIZE];
102     char *snap;
103 } BDRVRBDState;
104
105 static char *qemu_rbd_next_tok(int max_len,
106                                char *src, char delim,
107                                const char *name,
108                                char **p, Error **errp)
109 {
110     int l;
111     char *end;
112
113     *p = NULL;
114
115     if (delim != '\0') {
116         for (end = src; *end; ++end) {
117             if (*end == delim) {
118                 break;
119             }
120             if (*end == '\\' && end[1] != '\0') {
121                 end++;
122             }
123         }
124         if (*end == delim) {
125             *p = end + 1;
126             *end = '\0';
127         }
128     }
129     l = strlen(src);
130     if (l >= max_len) {
131         error_setg(errp, "%s too long", name);
132         return NULL;
133     } else if (l == 0) {
134         error_setg(errp, "%s too short", name);
135         return NULL;
136     }
137
138     return src;
139 }
140
141 static void qemu_rbd_unescape(char *src)
142 {
143     char *p;
144
145     for (p = src; *src; ++src, ++p) {
146         if (*src == '\\' && src[1] != '\0') {
147             src++;
148         }
149         *p = *src;
150     }
151     *p = '\0';
152 }
153
154 static int qemu_rbd_parsename(const char *filename,
155                               char *pool, int pool_len,
156                               char *snap, int snap_len,
157                               char *name, int name_len,
158                               char *conf, int conf_len,
159                               Error **errp)
160 {
161     const char *start;
162     char *p, *buf;
163     int ret = 0;
164     char *found_str;
165     Error *local_err = NULL;
166
167     if (!strstart(filename, "rbd:", &start)) {
168         error_setg(errp, "File name must start with 'rbd:'");
169         return -EINVAL;
170     }
171
172     buf = g_strdup(start);
173     p = buf;
174     *snap = '\0';
175     *conf = '\0';
176
177     found_str = qemu_rbd_next_tok(pool_len, p,
178                                   '/', "pool name", &p, &local_err);
179     if (local_err) {
180         goto done;
181     }
182     if (!p) {
183         ret = -EINVAL;
184         error_setg(errp, "Pool name is required");
185         goto done;
186     }
187     qemu_rbd_unescape(found_str);
188     g_strlcpy(pool, found_str, pool_len);
189
190     if (strchr(p, '@')) {
191         found_str = qemu_rbd_next_tok(name_len, p,
192                                       '@', "object name", &p, &local_err);
193         if (local_err) {
194             goto done;
195         }
196         qemu_rbd_unescape(found_str);
197         g_strlcpy(name, found_str, name_len);
198
199         found_str = qemu_rbd_next_tok(snap_len, p,
200                                       ':', "snap name", &p, &local_err);
201         if (local_err) {
202             goto done;
203         }
204         qemu_rbd_unescape(found_str);
205         g_strlcpy(snap, found_str, snap_len);
206     } else {
207         found_str = qemu_rbd_next_tok(name_len, p,
208                                       ':', "object name", &p, &local_err);
209         if (local_err) {
210             goto done;
211         }
212         qemu_rbd_unescape(found_str);
213         g_strlcpy(name, found_str, name_len);
214     }
215     if (!p) {
216         goto done;
217     }
218
219     found_str = qemu_rbd_next_tok(conf_len, p,
220                                   '\0', "configuration", &p, &local_err);
221     if (local_err) {
222         goto done;
223     }
224     g_strlcpy(conf, found_str, conf_len);
225
226 done:
227     if (local_err) {
228         ret = -EINVAL;
229         error_propagate(errp, local_err);
230     }
231     g_free(buf);
232     return ret;
233 }
234
235 static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
236 {
237     const char *p = conf;
238
239     while (*p) {
240         int len;
241         const char *end = strchr(p, ':');
242
243         if (end) {
244             len = end - p;
245         } else {
246             len = strlen(p);
247         }
248
249         if (strncmp(p, "id=", 3) == 0) {
250             len -= 3;
251             strncpy(clientname, p + 3, len);
252             clientname[len] = '\0';
253             return clientname;
254         }
255         if (end == NULL) {
256             break;
257         }
258         p = end + 1;
259     }
260     return NULL;
261 }
262
263
264 static int qemu_rbd_set_auth(rados_t cluster, const char *secretid,
265                              Error **errp)
266 {
267     if (secretid == 0) {
268         return 0;
269     }
270
271     gchar *secret = qcrypto_secret_lookup_as_base64(secretid,
272                                                     errp);
273     if (!secret) {
274         return -1;
275     }
276
277     rados_conf_set(cluster, "key", secret);
278     g_free(secret);
279
280     return 0;
281 }
282
283
284 static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
285                              bool only_read_conf_file,
286                              Error **errp)
287 {
288     char *p, *buf;
289     char *name;
290     char *value;
291     Error *local_err = NULL;
292     int ret = 0;
293
294     buf = g_strdup(conf);
295     p = buf;
296
297     while (p) {
298         name = qemu_rbd_next_tok(RBD_MAX_CONF_NAME_SIZE, p,
299                                  '=', "conf option name", &p, &local_err);
300         if (local_err) {
301             break;
302         }
303         qemu_rbd_unescape(name);
304
305         if (!p) {
306             error_setg(errp, "conf option %s has no value", name);
307             ret = -EINVAL;
308             break;
309         }
310
311         value = qemu_rbd_next_tok(RBD_MAX_CONF_VAL_SIZE, p,
312                                   ':', "conf option value", &p, &local_err);
313         if (local_err) {
314             break;
315         }
316         qemu_rbd_unescape(value);
317
318         if (strcmp(name, "conf") == 0) {
319             /* read the conf file alone, so it doesn't override more
320                specific settings for a particular device */
321             if (only_read_conf_file) {
322                 ret = rados_conf_read_file(cluster, value);
323                 if (ret < 0) {
324                     error_setg_errno(errp, -ret, "error reading conf file %s",
325                                      value);
326                     break;
327                 }
328             }
329         } else if (strcmp(name, "id") == 0) {
330             /* ignore, this is parsed by qemu_rbd_parse_clientname() */
331         } else if (!only_read_conf_file) {
332             ret = rados_conf_set(cluster, name, value);
333             if (ret < 0) {
334                 error_setg_errno(errp, -ret, "invalid conf option %s", name);
335                 ret = -EINVAL;
336                 break;
337             }
338         }
339     }
340
341     if (local_err) {
342         error_propagate(errp, local_err);
343         ret = -EINVAL;
344     }
345     g_free(buf);
346     return ret;
347 }
348
349 static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
350 {
351     if (LIBRBD_USE_IOVEC) {
352         RBDAIOCB *acb = rcb->acb;
353         iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
354                    acb->qiov->size - offs);
355     } else {
356         memset(rcb->buf + offs, 0, rcb->size - offs);
357     }
358 }
359
360 static QemuOptsList runtime_opts = {
361     .name = "rbd",
362     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
363     .desc = {
364         {
365             .name = "filename",
366             .type = QEMU_OPT_STRING,
367             .help = "Specification of the rbd image",
368         },
369         {
370             .name = "password-secret",
371             .type = QEMU_OPT_STRING,
372             .help = "ID of secret providing the password",
373         },
374         {
375             .name = "conf",
376             .type = QEMU_OPT_STRING,
377             .help = "Rados config file location",
378         },
379         {
380             .name = "pool",
381             .type = QEMU_OPT_STRING,
382             .help = "Rados pool name",
383         },
384         {
385             .name = "image",
386             .type = QEMU_OPT_STRING,
387             .help = "Image name in the pool",
388         },
389         {
390             .name = "snapshot",
391             .type = QEMU_OPT_STRING,
392             .help = "Ceph snapshot name",
393         },
394         {
395             /* maps to 'id' in rados_create() */
396             .name = "user",
397             .type = QEMU_OPT_STRING,
398             .help = "Rados id name",
399         },
400         {
401             .name = "keyvalue-pairs",
402             .type = QEMU_OPT_STRING,
403             .help = "Legacy rados key/value option parameters",
404         },
405         { /* end of list */ }
406     },
407 };
408
409 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
410 {
411     Error *local_err = NULL;
412     int64_t bytes = 0;
413     int64_t objsize;
414     int obj_order = 0;
415     char pool[RBD_MAX_POOL_NAME_SIZE];
416     char name[RBD_MAX_IMAGE_NAME_SIZE];
417     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
418     char conf[RBD_MAX_CONF_SIZE];
419     char clientname_buf[RBD_MAX_CONF_SIZE];
420     char *clientname;
421     const char *secretid;
422     rados_t cluster;
423     rados_ioctx_t io_ctx;
424     int ret;
425
426     secretid = qemu_opt_get(opts, "password-secret");
427
428     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
429                            snap_buf, sizeof(snap_buf),
430                            name, sizeof(name),
431                            conf, sizeof(conf), &local_err) < 0) {
432         error_propagate(errp, local_err);
433         return -EINVAL;
434     }
435
436     /* Read out options */
437     bytes = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
438                      BDRV_SECTOR_SIZE);
439     objsize = qemu_opt_get_size_del(opts, BLOCK_OPT_CLUSTER_SIZE, 0);
440     if (objsize) {
441         if ((objsize - 1) & objsize) {    /* not a power of 2? */
442             error_setg(errp, "obj size needs to be power of 2");
443             return -EINVAL;
444         }
445         if (objsize < 4096) {
446             error_setg(errp, "obj size too small");
447             return -EINVAL;
448         }
449         obj_order = ctz32(objsize);
450     }
451
452     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
453     ret = rados_create(&cluster, clientname);
454     if (ret < 0) {
455         error_setg_errno(errp, -ret, "error initializing");
456         return ret;
457     }
458
459     if (strstr(conf, "conf=") == NULL) {
460         /* try default location, but ignore failure */
461         rados_conf_read_file(cluster, NULL);
462     } else if (conf[0] != '\0' &&
463                qemu_rbd_set_conf(cluster, conf, true, &local_err) < 0) {
464         error_propagate(errp, local_err);
465         ret = -EIO;
466         goto shutdown;
467     }
468
469     if (conf[0] != '\0' &&
470         qemu_rbd_set_conf(cluster, conf, false, &local_err) < 0) {
471         error_propagate(errp, local_err);
472         ret = -EIO;
473         goto shutdown;
474     }
475
476     if (qemu_rbd_set_auth(cluster, secretid, errp) < 0) {
477         ret = -EIO;
478         goto shutdown;
479     }
480
481     ret = rados_connect(cluster);
482     if (ret < 0) {
483         error_setg_errno(errp, -ret, "error connecting");
484         goto shutdown;
485     }
486
487     ret = rados_ioctx_create(cluster, pool, &io_ctx);
488     if (ret < 0) {
489         error_setg_errno(errp, -ret, "error opening pool %s", pool);
490         goto shutdown;
491     }
492
493     ret = rbd_create(io_ctx, name, bytes, &obj_order);
494     if (ret < 0) {
495         error_setg_errno(errp, -ret, "error rbd create");
496     }
497
498     rados_ioctx_destroy(io_ctx);
499
500 shutdown:
501     rados_shutdown(cluster);
502     return ret;
503 }
504
505 /*
506  * This aio completion is being called from rbd_finish_bh() and runs in qemu
507  * BH context.
508  */
509 static void qemu_rbd_complete_aio(RADOSCB *rcb)
510 {
511     RBDAIOCB *acb = rcb->acb;
512     int64_t r;
513
514     r = rcb->ret;
515
516     if (acb->cmd != RBD_AIO_READ) {
517         if (r < 0) {
518             acb->ret = r;
519             acb->error = 1;
520         } else if (!acb->error) {
521             acb->ret = rcb->size;
522         }
523     } else {
524         if (r < 0) {
525             qemu_rbd_memset(rcb, 0);
526             acb->ret = r;
527             acb->error = 1;
528         } else if (r < rcb->size) {
529             qemu_rbd_memset(rcb, r);
530             if (!acb->error) {
531                 acb->ret = rcb->size;
532             }
533         } else if (!acb->error) {
534             acb->ret = r;
535         }
536     }
537
538     g_free(rcb);
539
540     if (!LIBRBD_USE_IOVEC) {
541         if (acb->cmd == RBD_AIO_READ) {
542             qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
543         }
544         qemu_vfree(acb->bounce);
545     }
546
547     acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
548
549     qemu_aio_unref(acb);
550 }
551
552 static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
553                          Error **errp)
554 {
555     BDRVRBDState *s = bs->opaque;
556     char pool[RBD_MAX_POOL_NAME_SIZE];
557     char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
558     char conf[RBD_MAX_CONF_SIZE];
559     char clientname_buf[RBD_MAX_CONF_SIZE];
560     char *clientname;
561     const char *secretid;
562     QemuOpts *opts;
563     Error *local_err = NULL;
564     const char *filename;
565     int r;
566
567     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
568     qemu_opts_absorb_qdict(opts, options, &local_err);
569     if (local_err) {
570         error_propagate(errp, local_err);
571         qemu_opts_del(opts);
572         return -EINVAL;
573     }
574
575     filename = qemu_opt_get(opts, "filename");
576     secretid = qemu_opt_get(opts, "password-secret");
577
578     if (qemu_rbd_parsename(filename, pool, sizeof(pool),
579                            snap_buf, sizeof(snap_buf),
580                            s->name, sizeof(s->name),
581                            conf, sizeof(conf), errp) < 0) {
582         r = -EINVAL;
583         goto failed_opts;
584     }
585
586     clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
587     r = rados_create(&s->cluster, clientname);
588     if (r < 0) {
589         error_setg_errno(errp, -r, "error initializing");
590         goto failed_opts;
591     }
592
593     s->snap = NULL;
594     if (snap_buf[0] != '\0') {
595         s->snap = g_strdup(snap_buf);
596     }
597
598     if (strstr(conf, "conf=") == NULL) {
599         /* try default location, but ignore failure */
600         rados_conf_read_file(s->cluster, NULL);
601     } else if (conf[0] != '\0') {
602         r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
603         if (r < 0) {
604             goto failed_shutdown;
605         }
606     }
607
608     if (conf[0] != '\0') {
609         r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
610         if (r < 0) {
611             goto failed_shutdown;
612         }
613     }
614
615     if (qemu_rbd_set_auth(s->cluster, secretid, errp) < 0) {
616         r = -EIO;
617         goto failed_shutdown;
618     }
619
620     /*
621      * Fallback to more conservative semantics if setting cache
622      * options fails. Ignore errors from setting rbd_cache because the
623      * only possible error is that the option does not exist, and
624      * librbd defaults to no caching. If write through caching cannot
625      * be set up, fall back to no caching.
626      */
627     if (flags & BDRV_O_NOCACHE) {
628         rados_conf_set(s->cluster, "rbd_cache", "false");
629     } else {
630         rados_conf_set(s->cluster, "rbd_cache", "true");
631     }
632
633     r = rados_connect(s->cluster);
634     if (r < 0) {
635         error_setg_errno(errp, -r, "error connecting");
636         goto failed_shutdown;
637     }
638
639     r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
640     if (r < 0) {
641         error_setg_errno(errp, -r, "error opening pool %s", pool);
642         goto failed_shutdown;
643     }
644
645     r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
646     if (r < 0) {
647         error_setg_errno(errp, -r, "error reading header from %s", s->name);
648         goto failed_open;
649     }
650
651     bs->read_only = (s->snap != NULL);
652
653     qemu_opts_del(opts);
654     return 0;
655
656 failed_open:
657     rados_ioctx_destroy(s->io_ctx);
658 failed_shutdown:
659     rados_shutdown(s->cluster);
660     g_free(s->snap);
661 failed_opts:
662     qemu_opts_del(opts);
663     return r;
664 }
665
666 static void qemu_rbd_close(BlockDriverState *bs)
667 {
668     BDRVRBDState *s = bs->opaque;
669
670     rbd_close(s->image);
671     rados_ioctx_destroy(s->io_ctx);
672     g_free(s->snap);
673     rados_shutdown(s->cluster);
674 }
675
676 static const AIOCBInfo rbd_aiocb_info = {
677     .aiocb_size = sizeof(RBDAIOCB),
678 };
679
680 static void rbd_finish_bh(void *opaque)
681 {
682     RADOSCB *rcb = opaque;
683     qemu_rbd_complete_aio(rcb);
684 }
685
686 /*
687  * This is the callback function for rbd_aio_read and _write
688  *
689  * Note: this function is being called from a non qemu thread so
690  * we need to be careful about what we do here. Generally we only
691  * schedule a BH, and do the rest of the io completion handling
692  * from rbd_finish_bh() which runs in a qemu context.
693  */
694 static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
695 {
696     RBDAIOCB *acb = rcb->acb;
697
698     rcb->ret = rbd_aio_get_return_value(c);
699     rbd_aio_release(c);
700
701     aio_bh_schedule_oneshot(bdrv_get_aio_context(acb->common.bs),
702                             rbd_finish_bh, rcb);
703 }
704
705 static int rbd_aio_discard_wrapper(rbd_image_t image,
706                                    uint64_t off,
707                                    uint64_t len,
708                                    rbd_completion_t comp)
709 {
710 #ifdef LIBRBD_SUPPORTS_DISCARD
711     return rbd_aio_discard(image, off, len, comp);
712 #else
713     return -ENOTSUP;
714 #endif
715 }
716
717 static int rbd_aio_flush_wrapper(rbd_image_t image,
718                                  rbd_completion_t comp)
719 {
720 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
721     return rbd_aio_flush(image, comp);
722 #else
723     return -ENOTSUP;
724 #endif
725 }
726
727 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
728                                  int64_t off,
729                                  QEMUIOVector *qiov,
730                                  int64_t size,
731                                  BlockCompletionFunc *cb,
732                                  void *opaque,
733                                  RBDAIOCmd cmd)
734 {
735     RBDAIOCB *acb;
736     RADOSCB *rcb = NULL;
737     rbd_completion_t c;
738     int r;
739
740     BDRVRBDState *s = bs->opaque;
741
742     acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
743     acb->cmd = cmd;
744     acb->qiov = qiov;
745     assert(!qiov || qiov->size == size);
746
747     rcb = g_new(RADOSCB, 1);
748
749     if (!LIBRBD_USE_IOVEC) {
750         if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
751             acb->bounce = NULL;
752         } else {
753             acb->bounce = qemu_try_blockalign(bs, qiov->size);
754             if (acb->bounce == NULL) {
755                 goto failed;
756             }
757         }
758         if (cmd == RBD_AIO_WRITE) {
759             qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
760         }
761         rcb->buf = acb->bounce;
762     }
763
764     acb->ret = 0;
765     acb->error = 0;
766     acb->s = s;
767
768     rcb->acb = acb;
769     rcb->s = acb->s;
770     rcb->size = size;
771     r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
772     if (r < 0) {
773         goto failed;
774     }
775
776     switch (cmd) {
777     case RBD_AIO_WRITE:
778 #ifdef LIBRBD_SUPPORTS_IOVEC
779             r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
780 #else
781             r = rbd_aio_write(s->image, off, size, rcb->buf, c);
782 #endif
783         break;
784     case RBD_AIO_READ:
785 #ifdef LIBRBD_SUPPORTS_IOVEC
786             r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
787 #else
788             r = rbd_aio_read(s->image, off, size, rcb->buf, c);
789 #endif
790         break;
791     case RBD_AIO_DISCARD:
792         r = rbd_aio_discard_wrapper(s->image, off, size, c);
793         break;
794     case RBD_AIO_FLUSH:
795         r = rbd_aio_flush_wrapper(s->image, c);
796         break;
797     default:
798         r = -EINVAL;
799     }
800
801     if (r < 0) {
802         goto failed_completion;
803     }
804     return &acb->common;
805
806 failed_completion:
807     rbd_aio_release(c);
808 failed:
809     g_free(rcb);
810     if (!LIBRBD_USE_IOVEC) {
811         qemu_vfree(acb->bounce);
812     }
813
814     qemu_aio_unref(acb);
815     return NULL;
816 }
817
818 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
819                                       int64_t sector_num,
820                                       QEMUIOVector *qiov,
821                                       int nb_sectors,
822                                       BlockCompletionFunc *cb,
823                                       void *opaque)
824 {
825     return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
826                          (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
827                          RBD_AIO_READ);
828 }
829
830 static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
831                                        int64_t sector_num,
832                                        QEMUIOVector *qiov,
833                                        int nb_sectors,
834                                        BlockCompletionFunc *cb,
835                                        void *opaque)
836 {
837     return rbd_start_aio(bs, sector_num << BDRV_SECTOR_BITS, qiov,
838                          (int64_t) nb_sectors << BDRV_SECTOR_BITS, cb, opaque,
839                          RBD_AIO_WRITE);
840 }
841
842 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
843 static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
844                                       BlockCompletionFunc *cb,
845                                       void *opaque)
846 {
847     return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
848 }
849
850 #else
851
852 static int qemu_rbd_co_flush(BlockDriverState *bs)
853 {
854 #if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
855     /* rbd_flush added in 0.1.1 */
856     BDRVRBDState *s = bs->opaque;
857     return rbd_flush(s->image);
858 #else
859     return 0;
860 #endif
861 }
862 #endif
863
864 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
865 {
866     BDRVRBDState *s = bs->opaque;
867     rbd_image_info_t info;
868     int r;
869
870     r = rbd_stat(s->image, &info, sizeof(info));
871     if (r < 0) {
872         return r;
873     }
874
875     bdi->cluster_size = info.obj_size;
876     return 0;
877 }
878
879 static int64_t qemu_rbd_getlength(BlockDriverState *bs)
880 {
881     BDRVRBDState *s = bs->opaque;
882     rbd_image_info_t info;
883     int r;
884
885     r = rbd_stat(s->image, &info, sizeof(info));
886     if (r < 0) {
887         return r;
888     }
889
890     return info.size;
891 }
892
893 static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
894 {
895     BDRVRBDState *s = bs->opaque;
896     int r;
897
898     r = rbd_resize(s->image, offset);
899     if (r < 0) {
900         return r;
901     }
902
903     return 0;
904 }
905
906 static int qemu_rbd_snap_create(BlockDriverState *bs,
907                                 QEMUSnapshotInfo *sn_info)
908 {
909     BDRVRBDState *s = bs->opaque;
910     int r;
911
912     if (sn_info->name[0] == '\0') {
913         return -EINVAL; /* we need a name for rbd snapshots */
914     }
915
916     /*
917      * rbd snapshots are using the name as the user controlled unique identifier
918      * we can't use the rbd snapid for that purpose, as it can't be set
919      */
920     if (sn_info->id_str[0] != '\0' &&
921         strcmp(sn_info->id_str, sn_info->name) != 0) {
922         return -EINVAL;
923     }
924
925     if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
926         return -ERANGE;
927     }
928
929     r = rbd_snap_create(s->image, sn_info->name);
930     if (r < 0) {
931         error_report("failed to create snap: %s", strerror(-r));
932         return r;
933     }
934
935     return 0;
936 }
937
938 static int qemu_rbd_snap_remove(BlockDriverState *bs,
939                                 const char *snapshot_id,
940                                 const char *snapshot_name,
941                                 Error **errp)
942 {
943     BDRVRBDState *s = bs->opaque;
944     int r;
945
946     if (!snapshot_name) {
947         error_setg(errp, "rbd need a valid snapshot name");
948         return -EINVAL;
949     }
950
951     /* If snapshot_id is specified, it must be equal to name, see
952        qemu_rbd_snap_list() */
953     if (snapshot_id && strcmp(snapshot_id, snapshot_name)) {
954         error_setg(errp,
955                    "rbd do not support snapshot id, it should be NULL or "
956                    "equal to snapshot name");
957         return -EINVAL;
958     }
959
960     r = rbd_snap_remove(s->image, snapshot_name);
961     if (r < 0) {
962         error_setg_errno(errp, -r, "Failed to remove the snapshot");
963     }
964     return r;
965 }
966
967 static int qemu_rbd_snap_rollback(BlockDriverState *bs,
968                                   const char *snapshot_name)
969 {
970     BDRVRBDState *s = bs->opaque;
971
972     return rbd_snap_rollback(s->image, snapshot_name);
973 }
974
975 static int qemu_rbd_snap_list(BlockDriverState *bs,
976                               QEMUSnapshotInfo **psn_tab)
977 {
978     BDRVRBDState *s = bs->opaque;
979     QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
980     int i, snap_count;
981     rbd_snap_info_t *snaps;
982     int max_snaps = RBD_MAX_SNAPS;
983
984     do {
985         snaps = g_new(rbd_snap_info_t, max_snaps);
986         snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
987         if (snap_count <= 0) {
988             g_free(snaps);
989         }
990     } while (snap_count == -ERANGE);
991
992     if (snap_count <= 0) {
993         goto done;
994     }
995
996     sn_tab = g_new0(QEMUSnapshotInfo, snap_count);
997
998     for (i = 0; i < snap_count; i++) {
999         const char *snap_name = snaps[i].name;
1000
1001         sn_info = sn_tab + i;
1002         pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
1003         pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
1004
1005         sn_info->vm_state_size = snaps[i].size;
1006         sn_info->date_sec = 0;
1007         sn_info->date_nsec = 0;
1008         sn_info->vm_clock_nsec = 0;
1009     }
1010     rbd_snap_list_end(snaps);
1011     g_free(snaps);
1012
1013  done:
1014     *psn_tab = sn_tab;
1015     return snap_count;
1016 }
1017
1018 #ifdef LIBRBD_SUPPORTS_DISCARD
1019 static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
1020                                          int64_t offset,
1021                                          int count,
1022                                          BlockCompletionFunc *cb,
1023                                          void *opaque)
1024 {
1025     return rbd_start_aio(bs, offset, NULL, count, cb, opaque,
1026                          RBD_AIO_DISCARD);
1027 }
1028 #endif
1029
1030 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1031 static void qemu_rbd_invalidate_cache(BlockDriverState *bs,
1032                                       Error **errp)
1033 {
1034     BDRVRBDState *s = bs->opaque;
1035     int r = rbd_invalidate_cache(s->image);
1036     if (r < 0) {
1037         error_setg_errno(errp, -r, "Failed to invalidate the cache");
1038     }
1039 }
1040 #endif
1041
1042 static QemuOptsList qemu_rbd_create_opts = {
1043     .name = "rbd-create-opts",
1044     .head = QTAILQ_HEAD_INITIALIZER(qemu_rbd_create_opts.head),
1045     .desc = {
1046         {
1047             .name = BLOCK_OPT_SIZE,
1048             .type = QEMU_OPT_SIZE,
1049             .help = "Virtual disk size"
1050         },
1051         {
1052             .name = BLOCK_OPT_CLUSTER_SIZE,
1053             .type = QEMU_OPT_SIZE,
1054             .help = "RBD object size"
1055         },
1056         {
1057             .name = "password-secret",
1058             .type = QEMU_OPT_STRING,
1059             .help = "ID of secret providing the password",
1060         },
1061         { /* end of list */ }
1062     }
1063 };
1064
1065 static BlockDriver bdrv_rbd = {
1066     .format_name        = "rbd",
1067     .instance_size      = sizeof(BDRVRBDState),
1068     .bdrv_needs_filename = true,
1069     .bdrv_file_open     = qemu_rbd_open,
1070     .bdrv_close         = qemu_rbd_close,
1071     .bdrv_create        = qemu_rbd_create,
1072     .bdrv_has_zero_init = bdrv_has_zero_init_1,
1073     .bdrv_get_info      = qemu_rbd_getinfo,
1074     .create_opts        = &qemu_rbd_create_opts,
1075     .bdrv_getlength     = qemu_rbd_getlength,
1076     .bdrv_truncate      = qemu_rbd_truncate,
1077     .protocol_name      = "rbd",
1078
1079     .bdrv_aio_readv         = qemu_rbd_aio_readv,
1080     .bdrv_aio_writev        = qemu_rbd_aio_writev,
1081
1082 #ifdef LIBRBD_SUPPORTS_AIO_FLUSH
1083     .bdrv_aio_flush         = qemu_rbd_aio_flush,
1084 #else
1085     .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
1086 #endif
1087
1088 #ifdef LIBRBD_SUPPORTS_DISCARD
1089     .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
1090 #endif
1091
1092     .bdrv_snapshot_create   = qemu_rbd_snap_create,
1093     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,
1094     .bdrv_snapshot_list     = qemu_rbd_snap_list,
1095     .bdrv_snapshot_goto     = qemu_rbd_snap_rollback,
1096 #ifdef LIBRBD_SUPPORTS_INVALIDATE
1097     .bdrv_invalidate_cache  = qemu_rbd_invalidate_cache,
1098 #endif
1099 };
1100
1101 static void bdrv_rbd_init(void)
1102 {
1103     bdrv_register(&bdrv_rbd);
1104 }
1105
1106 block_init(bdrv_rbd_init);
This page took 0.085415 seconds and 4 git commands to generate.