2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66 #define RBD_MAX_OPT_LEN 1024
68 #define RBD_SNAP_HEAD_NAME "-"
70 #define RBD_IMAGE_ID_LEN_MAX 64
71 #define RBD_OBJ_PREFIX_LEN_MAX 64
74 * An RBD device name will be "rbd#", where the "rbd" comes from
75 * RBD_DRV_NAME above, and # is a unique integer identifier.
76 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
77 * enough to hold all possible device names.
79 #define DEV_NAME_LEN 32
80 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
82 #define RBD_READ_ONLY_DEFAULT false
85 * block device image metadata (in-memory version)
87 struct rbd_image_header {
88 /* These four fields never change for a given rbd image */
95 /* The remaining fields need to be updated occasionally */
97 struct ceph_snap_context *snapc;
109 * an instance of the client. multiple devices may share an rbd client.
112 struct ceph_client *client;
114 struct list_head node;
118 * a request completion status
120 struct rbd_req_status {
127 * a collection of requests
129 struct rbd_req_coll {
133 struct rbd_req_status status[0];
137 * a single io request
140 struct request *rq; /* blk layer request */
141 struct bio *bio; /* cloned bio */
142 struct page **pages; /* list of used pages */
145 struct rbd_req_coll *coll;
152 struct list_head node;
170 int dev_id; /* blkdev unique id */
172 int major; /* blkdev assigned major */
173 struct gendisk *disk; /* blkdev's gendisk and rq */
175 u32 image_format; /* Either 1 or 2 */
176 struct rbd_options rbd_opts;
177 struct rbd_client *rbd_client;
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
181 spinlock_t lock; /* queue lock */
183 struct rbd_image_header header;
187 size_t image_name_len;
192 struct ceph_osd_event *watch_event;
193 struct ceph_osd_request *watch_request;
195 /* protects updating the header */
196 struct rw_semaphore header_rwsem;
198 struct rbd_mapping mapping;
200 struct list_head node;
202 /* list of snapshots */
203 struct list_head snaps;
209 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
211 static LIST_HEAD(rbd_dev_list); /* devices */
212 static DEFINE_SPINLOCK(rbd_dev_list_lock);
214 static LIST_HEAD(rbd_client_list); /* clients */
215 static DEFINE_SPINLOCK(rbd_client_list_lock);
217 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
218 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
220 static void rbd_dev_release(struct device *dev);
221 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
223 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
225 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
228 static struct bus_attribute rbd_bus_attrs[] = {
229 __ATTR(add, S_IWUSR, NULL, rbd_add),
230 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
234 static struct bus_type rbd_bus_type = {
236 .bus_attrs = rbd_bus_attrs,
239 static void rbd_root_dev_release(struct device *dev)
243 static struct device rbd_root_dev = {
245 .release = rbd_root_dev_release,
249 #define rbd_assert(expr) \
250 if (unlikely(!(expr))) { \
251 printk(KERN_ERR "\nAssertion failure in %s() " \
253 "\trbd_assert(%s);\n\n", \
254 __func__, __LINE__, #expr); \
257 #else /* !RBD_DEBUG */
258 # define rbd_assert(expr) ((void) 0)
259 #endif /* !RBD_DEBUG */
261 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
263 return get_device(&rbd_dev->dev);
266 static void rbd_put_dev(struct rbd_device *rbd_dev)
268 put_device(&rbd_dev->dev);
271 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
273 static int rbd_open(struct block_device *bdev, fmode_t mode)
275 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
277 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
280 rbd_get_dev(rbd_dev);
281 set_device_ro(bdev, rbd_dev->mapping.read_only);
286 static int rbd_release(struct gendisk *disk, fmode_t mode)
288 struct rbd_device *rbd_dev = disk->private_data;
290 rbd_put_dev(rbd_dev);
295 static const struct block_device_operations rbd_bd_ops = {
296 .owner = THIS_MODULE,
298 .release = rbd_release,
302 * Initialize an rbd client instance.
305 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
307 struct rbd_client *rbdc;
310 dout("rbd_client_create\n");
311 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
315 kref_init(&rbdc->kref);
316 INIT_LIST_HEAD(&rbdc->node);
318 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
320 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
321 if (IS_ERR(rbdc->client))
323 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
325 ret = ceph_open_session(rbdc->client);
329 spin_lock(&rbd_client_list_lock);
330 list_add_tail(&rbdc->node, &rbd_client_list);
331 spin_unlock(&rbd_client_list_lock);
333 mutex_unlock(&ctl_mutex);
335 dout("rbd_client_create created %p\n", rbdc);
339 ceph_destroy_client(rbdc->client);
341 mutex_unlock(&ctl_mutex);
345 ceph_destroy_options(ceph_opts);
350 * Find a ceph client with specific addr and configuration. If
351 * found, bump its reference count.
353 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
355 struct rbd_client *client_node;
358 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
361 spin_lock(&rbd_client_list_lock);
362 list_for_each_entry(client_node, &rbd_client_list, node) {
363 if (!ceph_compare_options(ceph_opts, client_node->client)) {
364 kref_get(&client_node->kref);
369 spin_unlock(&rbd_client_list_lock);
371 return found ? client_node : NULL;
381 /* string args above */
384 /* Boolean args above */
388 static match_table_t rbd_opts_tokens = {
390 /* string args above */
391 {Opt_read_only, "mapping.read_only"},
392 {Opt_read_only, "ro"}, /* Alternate spelling */
393 {Opt_read_write, "read_write"},
394 {Opt_read_write, "rw"}, /* Alternate spelling */
395 /* Boolean args above */
399 static int parse_rbd_opts_token(char *c, void *private)
401 struct rbd_options *rbd_opts = private;
402 substring_t argstr[MAX_OPT_ARGS];
403 int token, intval, ret;
405 token = match_token(c, rbd_opts_tokens, argstr);
409 if (token < Opt_last_int) {
410 ret = match_int(&argstr[0], &intval);
412 pr_err("bad mount option arg (not int) "
416 dout("got int token %d val %d\n", token, intval);
417 } else if (token > Opt_last_int && token < Opt_last_string) {
418 dout("got string token %d val %s\n", token,
420 } else if (token > Opt_last_string && token < Opt_last_bool) {
421 dout("got Boolean token %d\n", token);
423 dout("got token %d\n", token);
428 rbd_opts->read_only = true;
431 rbd_opts->read_only = false;
441 * Get a ceph client with specific addr and configuration, if one does
442 * not exist create it.
444 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
445 size_t mon_addr_len, char *options)
447 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
448 struct ceph_options *ceph_opts;
449 struct rbd_client *rbdc;
451 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
453 ceph_opts = ceph_parse_options(options, mon_addr,
454 mon_addr + mon_addr_len,
455 parse_rbd_opts_token, rbd_opts);
456 if (IS_ERR(ceph_opts))
457 return PTR_ERR(ceph_opts);
459 rbdc = rbd_client_find(ceph_opts);
461 /* using an existing client */
462 ceph_destroy_options(ceph_opts);
464 rbdc = rbd_client_create(ceph_opts);
466 return PTR_ERR(rbdc);
468 rbd_dev->rbd_client = rbdc;
474 * Destroy ceph client
476 * Caller must hold rbd_client_list_lock.
478 static void rbd_client_release(struct kref *kref)
480 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
482 dout("rbd_release_client %p\n", rbdc);
483 spin_lock(&rbd_client_list_lock);
484 list_del(&rbdc->node);
485 spin_unlock(&rbd_client_list_lock);
487 ceph_destroy_client(rbdc->client);
492 * Drop reference to ceph client node. If it's not referenced anymore, release
495 static void rbd_put_client(struct rbd_device *rbd_dev)
497 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
498 rbd_dev->rbd_client = NULL;
502 * Destroy requests collection
504 static void rbd_coll_release(struct kref *kref)
506 struct rbd_req_coll *coll =
507 container_of(kref, struct rbd_req_coll, kref);
509 dout("rbd_coll_release %p\n", coll);
513 static bool rbd_image_format_valid(u32 image_format)
515 return image_format == 1 || image_format == 2;
518 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
523 /* The header has to start with the magic rbd header text */
524 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
528 * The size of a snapshot header has to fit in a size_t, and
529 * that limits the number of snapshots.
531 snap_count = le32_to_cpu(ondisk->snap_count);
532 size = SIZE_MAX - sizeof (struct ceph_snap_context);
533 if (snap_count > size / sizeof (__le64))
537 * Not only that, but the size of the entire the snapshot
538 * header must also be representable in a size_t.
540 size -= snap_count * sizeof (__le64);
541 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
548 * Create a new header structure, translate header format from the on-disk
551 static int rbd_header_from_disk(struct rbd_image_header *header,
552 struct rbd_image_header_ondisk *ondisk)
559 memset(header, 0, sizeof (*header));
561 snap_count = le32_to_cpu(ondisk->snap_count);
563 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
564 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
565 if (!header->object_prefix)
567 memcpy(header->object_prefix, ondisk->object_prefix, len);
568 header->object_prefix[len] = '\0';
571 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
573 /* Save a copy of the snapshot names */
575 if (snap_names_len > (u64) SIZE_MAX)
577 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
578 if (!header->snap_names)
581 * Note that rbd_dev_v1_header_read() guarantees
582 * the ondisk buffer we're working with has
583 * snap_names_len bytes beyond the end of the
584 * snapshot id array, this memcpy() is safe.
586 memcpy(header->snap_names, &ondisk->snaps[snap_count],
589 /* Record each snapshot's size */
591 size = snap_count * sizeof (*header->snap_sizes);
592 header->snap_sizes = kmalloc(size, GFP_KERNEL);
593 if (!header->snap_sizes)
595 for (i = 0; i < snap_count; i++)
596 header->snap_sizes[i] =
597 le64_to_cpu(ondisk->snaps[i].image_size);
599 WARN_ON(ondisk->snap_names_len);
600 header->snap_names = NULL;
601 header->snap_sizes = NULL;
604 header->features = 0; /* No features support in v1 images */
605 header->obj_order = ondisk->options.order;
606 header->crypt_type = ondisk->options.crypt_type;
607 header->comp_type = ondisk->options.comp_type;
609 /* Allocate and fill in the snapshot context */
611 header->image_size = le64_to_cpu(ondisk->image_size);
612 size = sizeof (struct ceph_snap_context);
613 size += snap_count * sizeof (header->snapc->snaps[0]);
614 header->snapc = kzalloc(size, GFP_KERNEL);
618 atomic_set(&header->snapc->nref, 1);
619 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
620 header->snapc->num_snaps = snap_count;
621 for (i = 0; i < snap_count; i++)
622 header->snapc->snaps[i] =
623 le64_to_cpu(ondisk->snaps[i].id);
628 kfree(header->snap_sizes);
629 header->snap_sizes = NULL;
630 kfree(header->snap_names);
631 header->snap_names = NULL;
632 kfree(header->object_prefix);
633 header->object_prefix = NULL;
638 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
641 struct rbd_snap *snap;
643 list_for_each_entry(snap, &rbd_dev->snaps, node) {
644 if (!strcmp(snap_name, snap->name)) {
645 rbd_dev->mapping.snap_id = snap->id;
646 rbd_dev->mapping.size = snap->size;
647 rbd_dev->mapping.features = snap->features;
656 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
660 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
661 sizeof (RBD_SNAP_HEAD_NAME))) {
662 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
663 rbd_dev->mapping.size = rbd_dev->header.image_size;
664 rbd_dev->mapping.features = rbd_dev->header.features;
665 rbd_dev->mapping.snap_exists = false;
666 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
669 ret = snap_by_name(rbd_dev, snap_name);
672 rbd_dev->mapping.snap_exists = true;
673 rbd_dev->mapping.read_only = true;
675 rbd_dev->mapping.snap_name = snap_name;
680 static void rbd_header_free(struct rbd_image_header *header)
682 kfree(header->object_prefix);
683 header->object_prefix = NULL;
684 kfree(header->snap_sizes);
685 header->snap_sizes = NULL;
686 kfree(header->snap_names);
687 header->snap_names = NULL;
688 ceph_put_snap_context(header->snapc);
689 header->snapc = NULL;
692 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
698 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
701 segment = offset >> rbd_dev->header.obj_order;
702 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
703 rbd_dev->header.object_prefix, segment);
704 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
705 pr_err("error formatting segment name for #%llu (%d)\n",
714 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
716 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
718 return offset & (segment_size - 1);
721 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
722 u64 offset, u64 length)
724 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
726 offset &= segment_size - 1;
728 rbd_assert(length <= U64_MAX - offset);
729 if (offset + length > segment_size)
730 length = segment_size - offset;
735 static int rbd_get_num_segments(struct rbd_image_header *header,
743 if (len - 1 > U64_MAX - ofs)
746 start_seg = ofs >> header->obj_order;
747 end_seg = (ofs + len - 1) >> header->obj_order;
749 return end_seg - start_seg + 1;
753 * returns the size of an object in the image
755 static u64 rbd_obj_bytes(struct rbd_image_header *header)
757 return 1 << header->obj_order;
764 static void bio_chain_put(struct bio *chain)
770 chain = chain->bi_next;
776 * zeros a bio chain, starting at specific offset
778 static void zero_bio_chain(struct bio *chain, int start_ofs)
787 bio_for_each_segment(bv, chain, i) {
788 if (pos + bv->bv_len > start_ofs) {
789 int remainder = max(start_ofs - pos, 0);
790 buf = bvec_kmap_irq(bv, &flags);
791 memset(buf + remainder, 0,
792 bv->bv_len - remainder);
793 bvec_kunmap_irq(buf, &flags);
798 chain = chain->bi_next;
803 * bio_chain_clone - clone a chain of bios up to a certain length.
804 * might return a bio_pair that will need to be released.
806 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
807 struct bio_pair **bp,
808 int len, gfp_t gfpmask)
810 struct bio *old_chain = *old;
811 struct bio *new_chain = NULL;
816 bio_pair_release(*bp);
820 while (old_chain && (total < len)) {
823 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
826 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
828 if (total + old_chain->bi_size > len) {
832 * this split can only happen with a single paged bio,
833 * split_bio will BUG_ON if this is not the case
835 dout("bio_chain_clone split! total=%d remaining=%d"
837 total, len - total, old_chain->bi_size);
839 /* split the bio. We'll release it either in the next
840 call, or it will have to be released outside */
841 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
845 __bio_clone(tmp, &bp->bio1);
849 __bio_clone(tmp, old_chain);
850 *next = old_chain->bi_next;
860 old_chain = old_chain->bi_next;
862 total += tmp->bi_size;
865 rbd_assert(total == len);
872 dout("bio_chain_clone with err\n");
873 bio_chain_put(new_chain);
878 * helpers for osd request op vectors.
880 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
881 int opcode, u32 payload_len)
883 struct ceph_osd_req_op *ops;
885 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
892 * op extent offset and length will be set later on
893 * in calc_raw_layout()
895 ops[0].payload_len = payload_len;
900 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
905 static void rbd_coll_end_req_index(struct request *rq,
906 struct rbd_req_coll *coll,
910 struct request_queue *q;
913 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
914 coll, index, ret, (unsigned long long) len);
920 blk_end_request(rq, ret, len);
926 spin_lock_irq(q->queue_lock);
927 coll->status[index].done = 1;
928 coll->status[index].rc = ret;
929 coll->status[index].bytes = len;
930 max = min = coll->num_done;
931 while (max < coll->total && coll->status[max].done)
934 for (i = min; i<max; i++) {
935 __blk_end_request(rq, coll->status[i].rc,
936 coll->status[i].bytes);
938 kref_put(&coll->kref, rbd_coll_release);
940 spin_unlock_irq(q->queue_lock);
943 static void rbd_coll_end_req(struct rbd_request *req,
946 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
950 * Send ceph osd request
952 static int rbd_do_request(struct request *rq,
953 struct rbd_device *rbd_dev,
954 struct ceph_snap_context *snapc,
956 const char *object_name, u64 ofs, u64 len,
961 struct ceph_osd_req_op *ops,
962 struct rbd_req_coll *coll,
964 void (*rbd_cb)(struct ceph_osd_request *req,
965 struct ceph_msg *msg),
966 struct ceph_osd_request **linger_req,
969 struct ceph_osd_request *req;
970 struct ceph_file_layout *layout;
973 struct timespec mtime = CURRENT_TIME;
974 struct rbd_request *req_data;
975 struct ceph_osd_request_head *reqhead;
976 struct ceph_osd_client *osdc;
978 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
981 rbd_coll_end_req_index(rq, coll, coll_index,
987 req_data->coll = coll;
988 req_data->coll_index = coll_index;
991 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
992 (unsigned long long) ofs, (unsigned long long) len);
994 osdc = &rbd_dev->rbd_client->client->osdc;
995 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
996 false, GFP_NOIO, pages, bio);
1002 req->r_callback = rbd_cb;
1005 req_data->bio = bio;
1006 req_data->pages = pages;
1007 req_data->len = len;
1009 req->r_priv = req_data;
1011 reqhead = req->r_request->front.iov_base;
1012 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1014 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1015 req->r_oid_len = strlen(req->r_oid);
1017 layout = &req->r_file_layout;
1018 memset(layout, 0, sizeof(*layout));
1019 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 layout->fl_stripe_count = cpu_to_le32(1);
1021 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1022 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1023 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1025 rbd_assert(ret == 0);
1027 ceph_osdc_build_request(req, ofs, &len,
1031 req->r_oid, req->r_oid_len);
1034 ceph_osdc_set_request_linger(osdc, req);
1038 ret = ceph_osdc_start_request(osdc, req, false);
1043 ret = ceph_osdc_wait_request(osdc, req);
1045 *ver = le64_to_cpu(req->r_reassert_version.version);
1046 dout("reassert_ver=%llu\n",
1047 (unsigned long long)
1048 le64_to_cpu(req->r_reassert_version.version));
1049 ceph_osdc_put_request(req);
1054 bio_chain_put(req_data->bio);
1055 ceph_osdc_put_request(req);
1057 rbd_coll_end_req(req_data, ret, len);
1063 * Ceph osd op callback
1065 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1067 struct rbd_request *req_data = req->r_priv;
1068 struct ceph_osd_reply_head *replyhead;
1069 struct ceph_osd_op *op;
1075 replyhead = msg->front.iov_base;
1076 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1077 op = (void *)(replyhead + 1);
1078 rc = le32_to_cpu(replyhead->result);
1079 bytes = le64_to_cpu(op->extent.length);
1080 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1082 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1083 (unsigned long long) bytes, read_op, (int) rc);
1085 if (rc == -ENOENT && read_op) {
1086 zero_bio_chain(req_data->bio, 0);
1088 } else if (rc == 0 && read_op && bytes < req_data->len) {
1089 zero_bio_chain(req_data->bio, bytes);
1090 bytes = req_data->len;
1093 rbd_coll_end_req(req_data, rc, bytes);
1096 bio_chain_put(req_data->bio);
1098 ceph_osdc_put_request(req);
/*
 * Completion callback for requests whose only cleanup is dropping
 * the osd request reference (e.g. watch notify acknowledgements).
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1108 * Do a synchronous ceph osd operation
1110 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1111 struct ceph_snap_context *snapc,
1114 struct ceph_osd_req_op *ops,
1115 const char *object_name,
1116 u64 ofs, u64 inbound_size,
1118 struct ceph_osd_request **linger_req,
1122 struct page **pages;
1125 rbd_assert(ops != NULL);
1127 num_pages = calc_pages_for(ofs, inbound_size);
1128 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1130 return PTR_ERR(pages);
1132 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1133 object_name, ofs, inbound_size, NULL,
1143 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1144 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1147 ceph_release_page_vector(pages, num_pages);
1152 * Do an asynchronous ceph osd operation
1154 static int rbd_do_op(struct request *rq,
1155 struct rbd_device *rbd_dev,
1156 struct ceph_snap_context *snapc,
1158 int opcode, int flags,
1161 struct rbd_req_coll *coll,
1168 struct ceph_osd_req_op *ops;
1171 seg_name = rbd_segment_name(rbd_dev, ofs);
1174 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1175 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1177 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1180 ops = rbd_create_rw_ops(1, opcode, payload_len);
1184 /* we've taken care of segment sizes earlier when we
1185 cloned the bios. We should never have a segment
1186 truncated at this point */
1187 rbd_assert(seg_len == len);
1189 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1190 seg_name, seg_ofs, seg_len,
1196 rbd_req_cb, 0, NULL);
1198 rbd_destroy_ops(ops);
1205 * Request async osd write
1207 static int rbd_req_write(struct request *rq,
1208 struct rbd_device *rbd_dev,
1209 struct ceph_snap_context *snapc,
1212 struct rbd_req_coll *coll,
1215 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1217 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1218 ofs, len, bio, coll, coll_index);
1222 * Request async osd read
1224 static int rbd_req_read(struct request *rq,
1225 struct rbd_device *rbd_dev,
1229 struct rbd_req_coll *coll,
1232 return rbd_do_op(rq, rbd_dev, NULL,
1236 ofs, len, bio, coll, coll_index);
1240 * Request sync osd read
1242 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1244 const char *object_name,
1249 struct ceph_osd_req_op *ops;
1252 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1256 ret = rbd_req_sync_op(rbd_dev, NULL,
1259 ops, object_name, ofs, len, buf, NULL, ver);
1260 rbd_destroy_ops(ops);
1266 * Request sync osd watch
1268 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1272 struct ceph_osd_req_op *ops;
1275 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1279 ops[0].watch.ver = cpu_to_le64(ver);
1280 ops[0].watch.cookie = notify_id;
1281 ops[0].watch.flag = 0;
1283 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1284 rbd_dev->header_name, 0, 0, NULL,
1289 rbd_simple_req_cb, 0, NULL);
1291 rbd_destroy_ops(ops);
1295 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1297 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1304 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1305 rbd_dev->header_name, (unsigned long long) notify_id,
1306 (unsigned int) opcode);
1307 rc = rbd_refresh_header(rbd_dev, &hver);
1309 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1310 " update snaps: %d\n", rbd_dev->major, rc);
1312 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1316 * Request sync osd watch
1318 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1320 struct ceph_osd_req_op *ops;
1321 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1324 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1328 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1329 (void *)rbd_dev, &rbd_dev->watch_event);
1333 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1334 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1335 ops[0].watch.flag = 1;
1337 ret = rbd_req_sync_op(rbd_dev, NULL,
1339 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1341 rbd_dev->header_name,
1343 &rbd_dev->watch_request, NULL);
1348 rbd_destroy_ops(ops);
1352 ceph_osdc_cancel_event(rbd_dev->watch_event);
1353 rbd_dev->watch_event = NULL;
1355 rbd_destroy_ops(ops);
1360 * Request sync osd unwatch
1362 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1364 struct ceph_osd_req_op *ops;
1367 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1371 ops[0].watch.ver = 0;
1372 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1373 ops[0].watch.flag = 0;
1375 ret = rbd_req_sync_op(rbd_dev, NULL,
1377 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1379 rbd_dev->header_name,
1380 0, 0, NULL, NULL, NULL);
1383 rbd_destroy_ops(ops);
1384 ceph_osdc_cancel_event(rbd_dev->watch_event);
1385 rbd_dev->watch_event = NULL;
1390 * Synchronous osd object method call
1392 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1393 const char *object_name,
1394 const char *class_name,
1395 const char *method_name,
1396 const char *outbound,
1397 size_t outbound_size,
1399 size_t inbound_size,
1403 struct ceph_osd_req_op *ops;
1404 int class_name_len = strlen(class_name);
1405 int method_name_len = strlen(method_name);
1410 * Any input parameters required by the method we're calling
1411 * will be sent along with the class and method names as
1412 * part of the message payload. That data and its size are
1413 * supplied via the indata and indata_len fields (named from
1414 * the perspective of the server side) in the OSD request
1417 payload_size = class_name_len + method_name_len + outbound_size;
1418 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1422 ops[0].cls.class_name = class_name;
1423 ops[0].cls.class_len = (__u8) class_name_len;
1424 ops[0].cls.method_name = method_name;
1425 ops[0].cls.method_len = (__u8) method_name_len;
1426 ops[0].cls.argc = 0;
1427 ops[0].cls.indata = outbound;
1428 ops[0].cls.indata_len = outbound_size;
1430 ret = rbd_req_sync_op(rbd_dev, NULL,
1433 object_name, 0, inbound_size, inbound,
1436 rbd_destroy_ops(ops);
1438 dout("cls_exec returned %d\n", ret);
1442 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1444 struct rbd_req_coll *coll =
1445 kzalloc(sizeof(struct rbd_req_coll) +
1446 sizeof(struct rbd_req_status) * num_reqs,
1451 coll->total = num_reqs;
1452 kref_init(&coll->kref);
1457 * block device queue callback
1459 static void rbd_rq_fn(struct request_queue *q)
1461 struct rbd_device *rbd_dev = q->queuedata;
1463 struct bio_pair *bp = NULL;
1465 while ((rq = blk_fetch_request(q))) {
1467 struct bio *rq_bio, *next_bio = NULL;
1472 int num_segs, cur_seg = 0;
1473 struct rbd_req_coll *coll;
1474 struct ceph_snap_context *snapc;
1476 dout("fetched request\n");
1478 /* filter out block requests we don't understand */
1479 if ((rq->cmd_type != REQ_TYPE_FS)) {
1480 __blk_end_request_all(rq, 0);
1484 /* deduce our operation (read, write) */
1485 do_write = (rq_data_dir(rq) == WRITE);
1487 size = blk_rq_bytes(rq);
1488 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1490 if (do_write && rbd_dev->mapping.read_only) {
1491 __blk_end_request_all(rq, -EROFS);
1495 spin_unlock_irq(q->queue_lock);
1497 down_read(&rbd_dev->header_rwsem);
1499 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1500 !rbd_dev->mapping.snap_exists) {
1501 up_read(&rbd_dev->header_rwsem);
1502 dout("request for non-existent snapshot");
1503 spin_lock_irq(q->queue_lock);
1504 __blk_end_request_all(rq, -ENXIO);
1508 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1510 up_read(&rbd_dev->header_rwsem);
1512 dout("%s 0x%x bytes at 0x%llx\n",
1513 do_write ? "write" : "read",
1514 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1516 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1517 if (num_segs <= 0) {
1518 spin_lock_irq(q->queue_lock);
1519 __blk_end_request_all(rq, num_segs);
1520 ceph_put_snap_context(snapc);
1523 coll = rbd_alloc_coll(num_segs);
1525 spin_lock_irq(q->queue_lock);
1526 __blk_end_request_all(rq, -ENOMEM);
1527 ceph_put_snap_context(snapc);
1532 /* a bio clone to be passed down to OSD req */
1533 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1534 op_size = rbd_segment_length(rbd_dev, ofs, size);
1535 kref_get(&coll->kref);
1536 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1537 op_size, GFP_ATOMIC);
1539 rbd_coll_end_req_index(rq, coll, cur_seg,
1545 /* init OSD command: write or read */
1547 rbd_req_write(rq, rbd_dev,
1553 rbd_req_read(rq, rbd_dev,
1554 rbd_dev->mapping.snap_id,
1566 kref_put(&coll->kref, rbd_coll_release);
1569 bio_pair_release(bp);
1570 spin_lock_irq(q->queue_lock);
1572 ceph_put_snap_context(snapc);
1577 * a queue callback. Makes sure that we don't create a bio that spans across
1578 * multiple osd objects. One exception would be with a single page bios,
1579 * which we handle later at bio_chain_clone
1581 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1582 struct bio_vec *bvec)
1584 struct rbd_device *rbd_dev = q->queuedata;
1585 unsigned int chunk_sectors;
1587 unsigned int bio_sectors;
1590 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1591 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1592 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1594 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1595 + bio_sectors)) << SECTOR_SHIFT;
1597 max = 0; /* bio_add cannot handle a negative return */
1598 if (max <= bvec->bv_len && bio_sectors == 0)
1599 return bvec->bv_len;
1603 static void rbd_free_disk(struct rbd_device *rbd_dev)
1605 struct gendisk *disk = rbd_dev->disk;
1610 if (disk->flags & GENHD_FL_UP)
1613 blk_cleanup_queue(disk->queue);
1618 * Read the complete header for the given rbd device.
1620 * Returns a pointer to a dynamically-allocated buffer containing
1621 * the complete and validated header. Caller can pass the address
1622 * of a variable that will be filled in with the version of the
1623 * header object at the time it was read.
1625 * Returns a pointer-coded errno if a failure occurs.
1627 static struct rbd_image_header_ondisk *
1628 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1630 struct rbd_image_header_ondisk *ondisk = NULL;
1637 * The complete header will include an array of its 64-bit
1638 * snapshot ids, followed by the names of those snapshots as
1639 * a contiguous block of NUL-terminated strings. Note that
1640 * the number of snapshots could change by the time we read
1641 * it in, in which case we re-read it.
/* Size the buffer for the fixed header plus one entry per snapshot. */
1648 size = sizeof (*ondisk);
1649 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1651 ondisk = kmalloc(size, GFP_KERNEL);
1653 return ERR_PTR(-ENOMEM);
/* Synchronously read the header object from the OSD. */
1655 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1656 rbd_dev->header_name,
1658 (char *) ondisk, version);
/* A short read means we got less than the size we asked for. */
1662 if (WARN_ON((size_t) ret < size)) {
/* NOTE(review): "size" is a size_t; %zu (not %zd) would be the
 * exactly-matching specifier -- confirm against the declaration. */
1664 pr_warning("short header read for image %s"
1665 " (want %zd got %d)\n",
1666 rbd_dev->image_name, size, ret);
/* Sanity-check magic/fields before trusting the on-disk data. */
1669 if (!rbd_dev_ondisk_valid(ondisk)) {
1671 pr_warning("invalid header for image %s\n",
1672 rbd_dev->image_name);
/* Re-read if the snapshot count changed while we were reading. */
1676 names_size = le64_to_cpu(ondisk->snap_names_len);
1677 want_count = snap_count;
1678 snap_count = le32_to_cpu(ondisk->snap_count);
1679 } while (snap_count != want_count);
1686 return ERR_PTR(ret);
1690 * reload the ondisk the header
1692 static int rbd_read_header(struct rbd_device *rbd_dev,
1693 struct rbd_image_header *header)
1695 struct rbd_image_header_ondisk *ondisk;
/* Fetch the raw v1 on-disk header (pointer-coded errno on failure). */
1699 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1701 return PTR_ERR(ondisk);
/* Convert little-endian on-disk layout into the in-core header. */
1702 ret = rbd_header_from_disk(header, ondisk);
1704 header->obj_version = ver;
/*
 * Remove (and unregister) every snapshot device on this rbd device's
 * snaps list.  Uses the _safe iterator because entries are deleted.
 */
1710 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1712 struct rbd_snap *snap;
1713 struct rbd_snap *next;
1715 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1716 __rbd_remove_snap_dev(snap);
1720 * only read the first part of the ondisk header, without the snaps info
1722 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1725 struct rbd_image_header h;
/* Read a fresh copy of the header into the local "h". */
1727 ret = rbd_read_header(rbd_dev, &h);
/* Exclusive access while swapping the new header fields in. */
1731 down_write(&rbd_dev->header_rwsem);
/* Only the base image (no snapshot mapped) can change size. */
1734 if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1735 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1737 if (size != (sector_t) rbd_dev->mapping.size) {
1738 dout("setting size to %llu sectors",
1739 (unsigned long long) size);
1740 rbd_dev->mapping.size = (u64) size;
1741 set_capacity(rbd_dev->disk, size);
/* Release the old snapshot metadata before installing the new. */
1745 /* rbd_dev->header.object_prefix shouldn't change */
1746 kfree(rbd_dev->header.snap_sizes);
1747 kfree(rbd_dev->header.snap_names);
1748 /* osd requests may still refer to snapc */
1749 ceph_put_snap_context(rbd_dev->header.snapc);
1752 *hver = h.obj_version;
1753 rbd_dev->header.obj_version = h.obj_version;
1754 rbd_dev->header.image_size = h.image_size;
1755 rbd_dev->header.snapc = h.snapc;
1756 rbd_dev->header.snap_names = h.snap_names;
1757 rbd_dev->header.snap_sizes = h.snap_sizes;
1758 /* Free the extra copy of the object prefix */
1759 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1760 kfree(h.object_prefix);
/* Reconcile and (re)register snapshot devices under the new context. */
1762 ret = rbd_dev_snaps_update(rbd_dev);
1764 ret = rbd_dev_snaps_register(rbd_dev);
1766 up_write(&rbd_dev->header_rwsem);
/*
 * Locked wrapper around __rbd_refresh_header(); serializes header
 * refreshes under ctl_mutex.
 */
1771 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1775 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1776 ret = __rbd_refresh_header(rbd_dev, hver);
1777 mutex_unlock(&ctl_mutex);
/*
 * Allocate and configure the gendisk and request queue for this
 * rbd device, and set the initial capacity from the current mapping.
 */
1782 static int rbd_init_disk(struct rbd_device *rbd_dev)
1784 struct gendisk *disk;
1785 struct request_queue *q;
1788 /* create gendisk info */
1789 disk = alloc_disk(RBD_MINORS_PER_MAJOR)(
1793 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1795 disk->major = rbd_dev->major;
1796 disk->first_minor = 0;
1797 disk->fops = &rbd_bd_ops;
1798 disk->private_data = rbd_dev;
/* Request-based queue; rbd_rq_fn drains requests under rbd_dev->lock. */
1801 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1805 /* We use the default size, but let's be explicit about it. */
1806 blk_queue_physical_block_size(q, SECTOR_SIZE);
1808 /* set io sizes to object size */
1809 segment_size = rbd_obj_bytes(&rbd_dev->header);
1810 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1811 blk_queue_max_segment_size(q, segment_size);
1812 blk_queue_io_min(q, segment_size);
1813 blk_queue_io_opt(q, segment_size);
/* Keep bios from spanning RADOS objects (see rbd_merge_bvec). */
1815 blk_queue_merge_bvec(q, rbd_merge_bvec);
1818 q->queuedata = rbd_dev;
1820 rbd_dev->disk = disk;
/* mapping.size is in bytes; capacity is in 512-byte sectors. */
1822 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* Map a struct device embedded in an rbd_device back to its container. */
1835 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1837 return container_of(dev, struct rbd_device, dev);
/*
 * sysfs "size" attribute: mapped capacity in bytes (gendisk capacity
 * in sectors scaled by SECTOR_SIZE), read under the header semaphore.
 */
1840 static ssize_t rbd_size_show(struct device *dev,
1841 struct device_attribute *attr, char *buf)
1843 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1846 down_read(&rbd_dev->header_rwsem);
1847 size = get_capacity(rbd_dev->disk);
1848 up_read(&rbd_dev->header_rwsem);
1850 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1854 * Note this shows the features for whatever's mapped, which is not
1855 * necessarily the base image.
1857 static ssize_t rbd_features_show(struct device *dev,
1858 struct device_attribute *attr, char *buf)
1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Feature bits of the current mapping, as a fixed-width hex mask. */
1862 return sprintf(buf, "0x%016llx\n",
1863 (unsigned long long) rbd_dev->mapping.features);
/* sysfs "major" attribute: the device's block major number. */
1866 static ssize_t rbd_major_show(struct device *dev,
1867 struct device_attribute *attr, char *buf)
1869 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1871 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: this node's ceph client instance id. */
1874 static ssize_t rbd_client_id_show(struct device *dev,
1875 struct device_attribute *attr, char *buf)
1877 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879 return sprintf(buf, "client%lld\n",
1880 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool" attribute: name of the rados pool holding the image. */
1883 static ssize_t rbd_pool_show(struct device *dev,
1884 struct device_attribute *attr, char *buf)
1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id" attribute: numeric id of the image's rados pool. */
1891 static ssize_t rbd_pool_id_show(struct device *dev,
1892 struct device_attribute *attr, char *buf)
1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1896 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name" attribute: the user-visible rbd image name. */
1899 static ssize_t rbd_name_show(struct device *dev,
1900 struct device_attribute *attr, char *buf)
1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1904 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "image_id" attribute: internal image id (empty for format 1). */
1907 static ssize_t rbd_image_id_show(struct device *dev,
1908 struct device_attribute *attr, char *buf)
1910 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1912 return sprintf(buf, "%s\n", rbd_dev->image_id);
1916 * Shows the name of the currently-mapped snapshot (or
1917 * RBD_SNAP_HEAD_NAME for the base image).
1919 static ssize_t rbd_snap_show(struct device *dev,
1920 struct device_attribute *attr,
1923 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1925 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
/*
 * sysfs "refresh" store handler: re-read the image header on demand.
 * Returns the write size on success so the write is consumed in full.
 */
1928 static ssize_t rbd_image_refresh(struct device *dev,
1929 struct device_attribute *attr,
1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1936 ret = rbd_refresh_header(rbd_dev, NULL);
1938 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is a write-only trigger (see rbd_image_refresh()).
 */
1941 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1942 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1943 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1944 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1945 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1946 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1947 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1948 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1949 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1950 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
/* Attribute table, group, and device type tying the sysfs files above
 * to the rbd device object created in rbd_bus_add_dev(). */
1952 static struct attribute *rbd_attrs[] = {
1953 &dev_attr_size.attr,
1954 &dev_attr_features.attr,
1955 &dev_attr_major.attr,
1956 &dev_attr_client_id.attr,
1957 &dev_attr_pool.attr,
1958 &dev_attr_pool_id.attr,
1959 &dev_attr_name.attr,
1960 &dev_attr_image_id.attr,
1961 &dev_attr_current_snap.attr,
1962 &dev_attr_refresh.attr,
1966 static struct attribute_group rbd_attr_group = {
1970 static const struct attribute_group *rbd_attr_groups[] = {
/* Release is a no-op here; rbd_dev lifetime is managed elsewhere. */
1975 static void rbd_sysfs_dev_release(struct device *dev)
1979 static struct device_type rbd_device_type = {
1981 .groups = rbd_attr_groups,
1982 .release = rbd_sysfs_dev_release,
/* sysfs snapshot "snap_size" attribute: snapshot size in bytes. */
1990 static ssize_t rbd_snap_size_show(struct device *dev,
1991 struct device_attribute *attr,
1994 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1996 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs snapshot "snap_id" attribute: numeric snapshot id. */
1999 static ssize_t rbd_snap_id_show(struct device *dev,
2000 struct device_attribute *attr,
2003 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2005 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs snapshot "snap_features" attribute: feature bits in hex. */
2008 static ssize_t rbd_snap_features_show(struct device *dev,
2009 struct device_attribute *attr,
2012 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2014 return sprintf(buf, "0x%016llx\n",
2015 (unsigned long long) snap->features);
/* Per-snapshot sysfs attributes, their group, the release handler,
 * and the device type used by rbd_register_snap_dev(). */
2018 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2019 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2020 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2022 static struct attribute *rbd_snap_attrs[] = {
2023 &dev_attr_snap_size.attr,
2024 &dev_attr_snap_id.attr,
2025 &dev_attr_snap_features.attr,
2029 static struct attribute_group rbd_snap_attr_group = {
2030 .attrs = rbd_snap_attrs,
/* Device-model release callback: frees the rbd_snap when the last
 * reference to its embedded struct device is dropped. */
2033 static void rbd_snap_dev_release(struct device *dev)
2035 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2040 static const struct attribute_group *rbd_snap_attr_groups[] = {
2041 &rbd_snap_attr_group,
2045 static struct device_type rbd_snap_device_type = {
2046 .groups = rbd_snap_attr_groups,
2047 .release = rbd_snap_dev_release,
/*
 * Report whether a snapshot's device has been registered.  Having the
 * snap device type set and being registered must agree (asserted).
 */
2050 static bool rbd_snap_registered(struct rbd_snap *snap)
2052 bool ret = snap->dev.type == &rbd_snap_device_type;
2053 bool reg = device_is_registered(&snap->dev);
/* ret and reg must match: XOR of (!ret) and reg is true iff equal. */
2055 rbd_assert(!ret ^ reg);
/*
 * Unlink a snapshot from its device's snaps list and unregister its
 * sysfs device if it was ever registered.
 */
2060 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2062 list_del(&snap->node);
2063 if (device_is_registered(&snap->dev))
2064 device_unregister(&snap->dev);
/*
 * Register the sysfs device for one snapshot, named "snap_<name>",
 * parented under the given rbd device.
 */
2067 static int rbd_register_snap_dev(struct rbd_snap *snap,
2068 struct device *parent)
2070 struct device *dev = &snap->dev;
2073 dev->type = &rbd_snap_device_type;
2074 dev->parent = parent;
2075 dev->release = rbd_snap_dev_release;
2076 dev_set_name(dev, "snap_%s", snap->name);
2077 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2079 ret = device_register(dev);
/*
 * Allocate and initialize an rbd_snap from the given name, id, size,
 * and features.  Returns the new snap or a pointer-coded errno
 * (-ENOMEM on allocation failure).  The caller owns the result.
 */
2084 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2085 const char *snap_name,
2086 u64 snap_id, u64 snap_size,
2089 struct rbd_snap *snap;
2092 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2094 return ERR_PTR(-ENOMEM);
/* The snap keeps its own copy of the name. */
2097 snap->name = kstrdup(snap_name, GFP_KERNEL);
2102 snap->size = snap_size;
2103 snap->features = snap_features;
2111 return ERR_PTR(ret);
/*
 * Format 1: look up size, features, and name of the snapshot at
 * position "which" in the snapshot context.  Names are stored as
 * consecutive NUL-terminated strings, so we walk to the right one.
 */
2114 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2115 u64 *snap_size, u64 *snap_features)
2119 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2121 *snap_size = rbd_dev->header.snap_sizes[which];
2122 *snap_features = 0; /* No features for v1 */
2124 /* Skip over names until we find the one we are looking for */
2126 snap_name = rbd_dev->header.snap_names;
2128 snap_name += strlen(snap_name) + 1;
2134 * Get the size and object order for an image snapshot, or if
2135 * snap_id is CEPH_NOSNAP, gets this information for the base
2138 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2139 u8 *order, u64 *snap_size)
2141 __le64 snapid = cpu_to_le64(snap_id);
/* Reply layout of the "get_size" class method: order then size. */
2146 } __attribute__ ((packed)) size_buf = { 0 };
/* Invoke the rbd class method "get_size" on the header object. */
2148 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2150 (char *) &snapid, sizeof (snapid),
2151 (char *) &size_buf, sizeof (size_buf),
2152 CEPH_OSD_FLAG_READ, NULL);
2153 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2157 *order = size_buf.order;
2158 *snap_size = le64_to_cpu(size_buf.size);
2160 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2161 (unsigned long long) snap_id, (unsigned int) *order,
2162 (unsigned long long) *snap_size);
/* Fetch the base image's object order and size (snap_id CEPH_NOSNAP). */
2167 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2169 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2170 &rbd_dev->header.obj_order,
2171 &rbd_dev->header.image_size);
/*
 * Fetch the object prefix (a.k.a. block name) for a format 2 image
 * via the "get_object_prefix" class method, decode the encoded
 * string reply, and store it in the in-core header.
 */
2174 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2180 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2184 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2185 "rbd", "get_object_prefix",
2187 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2188 CEPH_OSD_FLAG_READ, NULL);
2189 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* The reply is a length-prefixed string; extract an allocated copy. */
2194 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2195 p + RBD_OBJ_PREFIX_LEN_MAX,
2198 if (IS_ERR(rbd_dev->header.object_prefix)) {
2199 ret = PTR_ERR(rbd_dev->header.object_prefix);
2200 rbd_dev->header.object_prefix = NULL;
2202 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Fetch the feature bits for the given snapshot (or the base image
 * when snap_id is CEPH_NOSNAP) via the "get_features" class method.
 * The reply also carries the incompatible-feature mask, logged below.
 */
2211 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2214 __le64 snapid = cpu_to_le64(snap_id);
2218 } features_buf = { 0 };
2221 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2222 "rbd", "get_features",
2223 (char *) &snapid, sizeof (snapid),
2224 (char *) &features_buf, sizeof (features_buf),
2225 CEPH_OSD_FLAG_READ, NULL);
2226 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2229 *snap_features = le64_to_cpu(features_buf.features);
2231 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2232 (unsigned long long) snap_id,
2233 (unsigned long long) *snap_features,
2234 (unsigned long long) le64_to_cpu(features_buf.incompat));
/* Fetch the base image's feature bits into the in-core header. */
2239 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2241 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2242 &rbd_dev->header.features);
/*
 * Fetch and decode the snapshot context for a format 2 image via the
 * "get_snapcontext" class method, building a ceph_snap_context and
 * installing it in the in-core header.  Also returns the header
 * object version through *ver.
 */
2245 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2254 struct ceph_snap_context *snapc;
2258 * We'll need room for the seq value (maximum snapshot id),
2259 * snapshot count, and array of that many snapshot ids.
2260 * For now we have a fixed upper limit on the number we're
2261 * prepared to receive.
2263 size = sizeof (__le64) + sizeof (__le32) +
2264 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2265 reply_buf = kzalloc(size, GFP_KERNEL);
2269 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2270 "rbd", "get_snapcontext",
2273 CEPH_OSD_FLAG_READ, ver);
2274 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* Bounds-checked decode of seq and count from the reply buffer. */
2280 end = (char *) reply_buf + size;
2281 ceph_decode_64_safe(&p, end, seq, out);
2282 ceph_decode_32_safe(&p, end, snap_count, out);
2285 * Make sure the reported number of snapshot ids wouldn't go
2286 * beyond the end of our buffer. But before checking that,
2287 * make sure the computed size of the snapshot context we
2288 * allocate is representable in a size_t.
2290 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2295 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
/* Allocate the context with its flexible snaps[] array inline. */
2298 size = sizeof (struct ceph_snap_context) +
2299 snap_count * sizeof (snapc->snaps[0]);
2300 snapc = kmalloc(size, GFP_KERNEL);
2306 atomic_set(&snapc->nref, 1);
2308 snapc->num_snaps = snap_count;
2309 for (i = 0; i < snap_count; i++)
2310 snapc->snaps[i] = ceph_decode_64(&p);
2312 rbd_dev->header.snapc = snapc;
2314 dout(" snap context seq = %llu, snap_count = %u\n",
2315 (unsigned long long) seq, (unsigned int) snap_count);
/*
 * Fetch the name of the snapshot at position "which" in the snapshot
 * context via the "get_snapshot_name" class method.  Returns a
 * dynamically-allocated copy of the name (caller frees), or a
 * pointer-coded errno on failure.
 */
2323 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2331 size_t snap_name_len;
2334 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2335 reply_buf = kmalloc(size, GFP_KERNEL);
2337 return ERR_PTR(-ENOMEM);
/* The request payload is the little-endian snapshot id. */
2339 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2340 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2341 "rbd", "get_snapshot_name",
2342 (char *) &snap_id, sizeof (snap_id),
2344 CEPH_OSD_FLAG_READ, NULL);
2345 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2350 end = (char *) reply_buf + size;
/* Decode the length-prefixed name into a fresh allocation. */
2352 snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2354 if (IS_ERR(snap_name)) {
2355 ret = PTR_ERR(snap_name);
2358 dout(" snap_id 0x%016llx snap_name = %s\n",
2359 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2367 return ERR_PTR(ret);
/*
 * Format 2 equivalent of rbd_dev_v1_snap_info(): fill in size and
 * features for snapshot "which" and return its (allocated) name,
 * or a pointer-coded errno on failure.
 */
2370 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2371 u64 *snap_size, u64 *snap_features)
2377 snap_id = rbd_dev->header.snapc->snaps[which];
2378 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2380 return ERR_PTR(ret);
2381 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2383 return ERR_PTR(ret);
2385 return rbd_dev_v2_snap_name(rbd_dev, which);
/*
 * Format-dispatching wrapper: route the snapshot-info lookup to the
 * v1 or v2 implementation based on the probed image format.
 */
2388 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2389 u64 *snap_size, u64 *snap_features)
2391 if (rbd_dev->image_format == 1)
2392 return rbd_dev_v1_snap_info(rbd_dev, which,
2393 snap_size, snap_features);
2394 if (rbd_dev->image_format == 2)
2395 return rbd_dev_v2_snap_info(rbd_dev, which,
2396 snap_size, snap_features);
2397 return ERR_PTR(-EINVAL);
2401 * Scan the rbd device's current snapshot list and compare it to the
2402 * newly-received snapshot context. Remove any existing snapshots
2403 * not present in the new snapshot context. Add a new snapshot for
2404 * any snaphots in the snapshot context not in the current list.
2405 * And verify there are no changes to snapshots we already know
2408 * Assumes the snapshots in the snapshot context are sorted by
2409 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2410 * are also maintained in that order.)
2412 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2414 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2415 const u32 snap_count = snapc->num_snaps;
2416 struct list_head *head = &rbd_dev->snaps;
2417 struct list_head *links = head->next;
2420 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
/* Merge-walk: advance through context entries and list entries in
 * parallel until both are exhausted. */
2421 while (index < snap_count || links != head) {
2423 struct rbd_snap *snap;
2426 u64 snap_features = 0;
/* CEPH_NOSNAP marks "no more context entries" for the merge. */
2428 snap_id = index < snap_count ? snapc->snaps[index]
2430 snap = links != head ? list_entry(links, struct rbd_snap, node)
2432 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2434 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2435 struct list_head *next = links->next;
2437 /* Existing snapshot not in the new snap context */
/* If the removed snapshot is what's mapped, mark it gone. */
2439 if (rbd_dev->mapping.snap_id == snap->id)
2440 rbd_dev->mapping.snap_exists = false;
2441 __rbd_remove_snap_dev(snap);
2442 dout("%ssnap id %llu has been removed\n",
2443 rbd_dev->mapping.snap_id == snap->id ?
2445 (unsigned long long) snap->id);
2447 /* Done with this list entry; advance */
/* Fetch size/features/name for the context entry at "index". */
2453 snap_name = rbd_dev_snap_info(rbd_dev, index,
2454 &snap_size, &snap_features);
2455 if (IS_ERR(snap_name))
2456 return PTR_ERR(snap_name);
/* NOTE(review): this prints snap_count where the entry index
 * looks intended -- confirm against upstream. */
2458 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2459 (unsigned long long) snap_id);
2460 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2461 struct rbd_snap *new_snap;
2463 /* We haven't seen this snapshot before */
2465 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2466 snap_id, snap_size, snap_features);
2467 if (IS_ERR(new_snap)) {
2468 int err = PTR_ERR(new_snap);
2470 dout(" failed to add dev, error %d\n", err);
2475 /* New goes before existing, or at end of list */
/* NOTE(review): the " at end\n" alternative embeds a second
 * newline in this format -- doubled blank line in debug output. */
2477 dout(" added dev%s\n", snap ? "" : " at end\n");
2479 list_add_tail(&new_snap->node, &snap->node);
2481 list_add_tail(&new_snap->node, head);
2483 /* Already have this one */
2485 dout(" already present\n");
/* Known snapshots must not change size, name, or features. */
2487 rbd_assert(snap->size == snap_size);
2488 rbd_assert(!strcmp(snap->name, snap_name));
2489 rbd_assert(snap->features == snap_features);
2491 /* Done with this list entry; advance */
2493 links = links->next;
2496 /* Advance to the next entry in the snapshot context */
2500 dout("%s: done\n", __func__);
2506 * Scan the list of snapshots and register the devices for any that
2507 * have not already been registered.
2509 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2511 struct rbd_snap *snap;
2514 dout("%s called\n", __func__);
/* The parent rbd device must already be in sysfs. */
2515 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2518 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2519 if (!rbd_snap_registered(snap)) {
2520 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2525 dout("%s: returning %d\n", __func__, ret);
/*
 * Register the rbd device on the rbd bus in sysfs, named by its
 * numeric dev_id, under ctl_mutex.
 */
2530 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2535 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2537 dev = &rbd_dev->dev;
2538 dev->bus = &rbd_bus_type;
2539 dev->type = &rbd_device_type;
2540 dev->parent = &rbd_root_dev;
2541 dev->release = rbd_dev_release;
2542 dev_set_name(dev, "%d", rbd_dev->dev_id);
2543 ret = device_register(dev);
2545 mutex_unlock(&ctl_mutex);
/* Remove the rbd device from sysfs; triggers release-path cleanup. */
2550 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2552 device_unregister(&rbd_dev->dev);
/*
 * Set up the header-object watch.  -ERANGE indicates our header is
 * stale, so refresh it and retry the watch until it sticks.
 */
2555 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2560 ret = rbd_req_sync_watch(rbd_dev);
2561 if (ret == -ERANGE) {
2562 rc = rbd_refresh_header(rbd_dev, NULL);
2566 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
2571 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2574 * Get a unique rbd identifier for the given new rbd_dev, and add
2575 * the rbd_dev to the global list. The minimum rbd id is 1.
2577 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2579 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2581 spin_lock(&rbd_dev_list_lock);
2582 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2583 spin_unlock(&rbd_dev_list_lock);
2584 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2585 (unsigned long long) rbd_dev->dev_id);
2589 * Remove an rbd_dev from the global list, and record that its
2590 * identifier is no longer in use.
2592 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2594 struct list_head *tmp;
2595 int rbd_id = rbd_dev->dev_id;
2598 rbd_assert(rbd_id > 0);
2600 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2601 (unsigned long long) rbd_dev->dev_id);
2602 spin_lock(&rbd_dev_list_lock);
2603 list_del_init(&rbd_dev->node);
2606 * If the id being "put" is not the current maximum, there
2607 * is nothing special we need to do.
2609 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2610 spin_unlock(&rbd_dev_list_lock);
2615 * We need to update the current maximum id. Search the
2616 * list to find out what it is. We're more likely to find
2617 * the maximum at the end, so search the list backward.
2620 list_for_each_prev(tmp, &rbd_dev_list) {
2621 struct rbd_device *rbd_dev;
2623 rbd_dev = list_entry(tmp, struct rbd_device, node);
/* NOTE(review): this compares the released rbd_id, not the list
 * entry's id, against max_id -- confirm against upstream. */
2624 if (rbd_id > max_id)
2627 spin_unlock(&rbd_dev_list_lock);
2630 * The max id could have been updated by rbd_dev_id_get(), in
2631 * which case it now accurately reflects the new maximum.
2632 * Be careful not to overwrite the maximum value in that
2635 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2636 dout(" max dev id has been reset\n");
2640 * Skips over white space at *buf, and updates *buf to point to the
2641 * first found non-space character (if any). Returns the length of
2642 * the token (string of non-white space characters) found. Note
2643 * that *buf must be terminated with '\0'.
2645 static inline size_t next_token(const char **buf)
2648 * These are the characters that produce nonzero for
2649 * isspace() in the "C" and "POSIX" locales.
2651 const char *spaces = " \f\n\r\t\v";
2653 *buf += strspn(*buf, spaces); /* Find start of token */
2655 return strcspn(*buf, spaces); /* Return token length */
2659 * Finds the next token in *buf, and if the provided token buffer is
2660 * big enough, copies the found token into it. The result, if
2661 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2662 * must be terminated with '\0' on entry.
2664 * Returns the length of the token found (not including the '\0').
2665 * Return value will be 0 if no token is found, and it will be >=
2666 * token_size if the token would not fit.
2668 * The *buf pointer will be updated to point beyond the end of the
2669 * found token. Note that this occurs even if the token buffer is
2670 * too small to hold it.
2672 static inline size_t copy_token(const char **buf,
2678 len = next_token(buf);
/* Only copy when the token (plus its NUL) fits the caller's buffer. */
2679 if (len < token_size) {
2680 memcpy(token, *buf, len);
2681 *(token + len) = '\0';
2689 * Finds the next token in *buf, dynamically allocates a buffer big
2690 * enough to hold a copy of it, and copies the token into the new
2691 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2692 * that a duplicate buffer is created even for a zero-length token.
2694 * Returns a pointer to the newly-allocated duplicate, or a null
2695 * pointer if memory for the duplicate was not available. If
2696 * the lenp argument is a non-null pointer, the length of the token
2697 * (not including the '\0') is returned in *lenp.
2699 * If successful, the *buf pointer will be updated to point beyond
2700 * the end of the found token.
2702 * Note: uses GFP_KERNEL for allocation.
2704 static inline char *dup_token(const char **buf, size_t *lenp)
2709 len = next_token(buf);
/* +1 leaves room for the terminating NUL. */
2710 dup = kmalloc(len + 1, GFP_KERNEL);
2714 memcpy(dup, *buf, len);
2715 *(dup + len) = '\0';
2725 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2726 * rbd_md_name, and name fields of the given rbd_dev, based on the
2727 * list of monitor addresses and other options provided via
2728 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2729 * copy of the snapshot name to map if successful, or a
2730 * pointer-coded error otherwise.
2732 * Note: rbd_dev is assumed to have been initially zero-filled.
2734 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2736 const char **mon_addrs,
2737 size_t *mon_addrs_size,
2739 size_t options_size)
2742 char *err_ptr = ERR_PTR(-EINVAL);
2745 /* The first four tokens are required */
/* Token 1: monitor address list (returned by reference, not copied). */
2747 len = next_token(&buf);
2750 *mon_addrs_size = len + 1;
/* Token 2: the options string, copied into the caller's buffer. */
2755 len = copy_token(&buf, options, options_size);
2756 if (!len || len >= options_size)
/* From here on, failures are allocation failures. */
2759 err_ptr = ERR_PTR(-ENOMEM);
/* Token 3: pool name. */
2760 rbd_dev->pool_name = dup_token(&buf, NULL);
2761 if (!rbd_dev->pool_name)
/* Token 4: image name (length recorded for later object naming). */
2764 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2765 if (!rbd_dev->image_name)
2768 /* Snapshot name is optional */
2769 len = next_token(&buf);
2771 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2772 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2774 snap_name = kmalloc(len + 1, GFP_KERNEL);
2777 memcpy(snap_name, buf, len);
2778 *(snap_name + len) = '\0';
2780 dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
/* Error path: undo the allocations made above. */
2785 kfree(rbd_dev->image_name);
2786 rbd_dev->image_name = NULL;
2787 rbd_dev->image_name_len = 0;
2788 kfree(rbd_dev->pool_name);
2789 rbd_dev->pool_name = NULL;
2795 * An rbd format 2 image has a unique identifier, distinct from the
2796 * name given to it by the user. Internally, that identifier is
2797 * what's used to specify the names of objects related to the image.
2799 * A special "rbd id" object is used to map an rbd image name to its
2800 * id. If that object doesn't exist, then there is no v2 rbd image
2801 * with the supplied name.
2803 * This function will record the given rbd_dev's image_id field if
2804 * it can be determined, and in that case will return 0. If any
2805 * errors occur a negative errno will be returned and the rbd_dev's
2806 * image_id field will be unchanged (and should be NULL).
2808 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2817 * First, see if the format 2 image id file exists, and if
2818 * so, get the image's persistent id from it.
/* Build the id object name: RBD_ID_PREFIX + image name. */
2820 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2821 object_name = kmalloc(size, GFP_NOIO);
2824 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2825 dout("rbd id object name is %s\n", object_name);
2827 /* Response will be an encoded string, which includes a length */
2829 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2830 response = kzalloc(size, GFP_NOIO);
2836 ret = rbd_req_sync_exec(rbd_dev, object_name,
2839 response, RBD_IMAGE_ID_LEN_MAX,
2840 CEPH_OSD_FLAG_READ, NULL);
2841 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* Decode the length-prefixed id string into a fresh allocation. */
2846 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2847 p + RBD_IMAGE_ID_LEN_MAX,
2848 &rbd_dev->image_id_len,
2850 if (IS_ERR(rbd_dev->image_id)) {
2851 ret = PTR_ERR(rbd_dev->image_id);
2852 rbd_dev->image_id = NULL;
2854 dout("image_id is %s\n", rbd_dev->image_id);
/*
 * Probe an image assuming format 1: set an empty image id, build the
 * "<image_name><RBD_SUFFIX>" header object name, and read the header.
 * On failure all allocations made here are undone.
 */
2863 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2868 /* Version 1 images have no id; empty string is used */
2870 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2871 if (!rbd_dev->image_id)
2873 rbd_dev->image_id_len = 0;
2875 /* Record the header object name for this rbd image. */
2877 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2878 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2879 if (!rbd_dev->header_name) {
2883 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2885 /* Populate rbd image metadata */
2887 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2890 rbd_dev->image_format = 1;
2892 dout("discovered version 1 image, header name is %s\n",
2893 rbd_dev->header_name);
/* Error path: release the header name and image id set above. */
2898 kfree(rbd_dev->header_name);
2899 rbd_dev->header_name = NULL;
2900 kfree(rbd_dev->image_id);
2901 rbd_dev->image_id = NULL;
/*
 * Probe a format 2 image (image id already determined): build the
 * "<RBD_HEADER_PREFIX><image_id>" header object name, then fetch
 * size/order, object prefix, features, and the snapshot context.
 */
2906 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2913 * Image id was filled in by the caller. Record the header
2914 * object name for this rbd image.
2916 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2917 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2918 if (!rbd_dev->header_name)
2920 sprintf(rbd_dev->header_name, "%s%s",
2921 RBD_HEADER_PREFIX, rbd_dev->image_id);
2923 /* Get the size and object order for the image */
2925 ret = rbd_dev_v2_image_size(rbd_dev);
2929 /* Get the object prefix (a.k.a. block_name) for the image */
2931 ret = rbd_dev_v2_object_prefix(rbd_dev);
2935 /* Get the features for the image */
2937 ret = rbd_dev_v2_features(rbd_dev);
2941 /* crypto and compression type aren't (yet) supported for v2 images */
2943 rbd_dev->header.crypt_type = 0;
2944 rbd_dev->header.comp_type = 0;
2946 /* Get the snapshot context, plus the header version */
2948 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2951 rbd_dev->header.obj_version = ver;
2953 rbd_dev->image_format = 2;
2955 dout("discovered version 2 image, header name is %s\n",
2956 rbd_dev->header_name);
/* Error path: undo the header name and object prefix allocations. */
2960 kfree(rbd_dev->header_name);
2961 rbd_dev->header_name = NULL;
2962 kfree(rbd_dev->header.object_prefix);
2963 rbd_dev->header.object_prefix = NULL;
2969 * Probe for the existence of the header object for the given rbd
2970 * device. For format 2 images this includes determining the image
2973 static int rbd_dev_probe(struct rbd_device *rbd_dev)
2978 * Get the id from the image id object. If it's not a
2979 * format 2 image, we'll get ENOENT back, and we'll assume
2980 * it's a format 1 image.
2982 ret = rbd_dev_image_id(rbd_dev);
2984 ret = rbd_dev_v1_probe(rbd_dev);
2986 ret = rbd_dev_v2_probe(rbd_dev);
2988 dout("probe failed, returning %d\n", ret);
/*
 * /sys/bus/rbd/add handler: parse the user-supplied spec, connect a
 * ceph client, probe the image, allocate an id and block major, set
 * up the gendisk, register sysfs devices and the header watch, then
 * announce the disk.  Errors unwind in reverse order of setup.
 */
2993 static ssize_t rbd_add(struct bus_type *bus,
2998 struct rbd_device *rbd_dev = NULL;
2999 const char *mon_addrs = NULL;
3000 size_t mon_addrs_size = 0;
3001 struct ceph_osd_client *osdc;
/* Pin the module while a device exists; dropped on the error path. */
3005 if (!try_module_get(THIS_MODULE))
3008 options = kmalloc(count, GFP_KERNEL);
3011 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3015 /* static rbd_device initialization */
3016 spin_lock_init(&rbd_dev->lock);
3017 INIT_LIST_HEAD(&rbd_dev->node);
3018 INIT_LIST_HEAD(&rbd_dev->snaps);
3019 init_rwsem(&rbd_dev->header_rwsem);
3021 /* parse add command */
3022 snap_name = rbd_add_parse_args(rbd_dev, buf,
3023 &mon_addrs, &mon_addrs_size, options, count);
3024 if (IS_ERR(snap_name)) {
3025 rc = PTR_ERR(snap_name);
/* Connect (or share) a ceph client for the given monitors/options. */
3029 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* Resolve the pool name to its numeric id via the osd map. */
3034 osdc = &rbd_dev->rbd_client->client->osdc;
3035 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3037 goto err_out_client;
3038 rbd_dev->pool_id = rc;
3040 rc = rbd_dev_probe(rbd_dev);
3042 goto err_out_client;
3043 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3045 /* no need to lock here, as rbd_dev is not registered yet */
3046 rc = rbd_dev_snaps_update(rbd_dev);
3048 goto err_out_header;
3050 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3052 goto err_out_header;
3054 /* generate unique id: find highest unique id, add one */
3055 rbd_dev_id_get(rbd_dev);
3057 /* Fill in the device name, now that we have its id. */
3058 BUILD_BUG_ON(DEV_NAME_LEN
3059 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3060 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3062 /* Get our block major device number. */
/* Passing 0 asks the block layer to assign a free major. */
3064 rc = register_blkdev(0, rbd_dev->name);
3067 rbd_dev->major = rc;
3069 /* Set up the blkdev mapping. */
3071 rc = rbd_init_disk(rbd_dev);
3073 goto err_out_blkdev;
3075 rc = rbd_bus_add_dev(rbd_dev);
3080 * At this point cleanup in the event of an error is the job
3081 * of the sysfs code (initiated by rbd_bus_del_dev()).
3084 down_write(&rbd_dev->header_rwsem);
3085 rc = rbd_dev_snaps_register(rbd_dev);
3086 up_write(&rbd_dev->header_rwsem);
3090 rc = rbd_init_watch_dev(rbd_dev);
3094 /* Everything's ready. Announce the disk to the world. */
3096 add_disk(rbd_dev->disk);
3098 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3099 (unsigned long long) rbd_dev->mapping.size);
/* Error unwinding, in reverse order of the setup steps above. */
3104 /* this will also clean up rest of rbd_dev stuff */
3106 rbd_bus_del_dev(rbd_dev);
3111 rbd_free_disk(rbd_dev);
3113 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3115 rbd_dev_id_put(rbd_dev);
3117 rbd_header_free(&rbd_dev->header);
3119 kfree(rbd_dev->header_name);
3120 rbd_put_client(rbd_dev);
3121 kfree(rbd_dev->image_id);
3123 kfree(rbd_dev->mapping.snap_name);
3124 kfree(rbd_dev->image_name);
3125 kfree(rbd_dev->pool_name);
3130 dout("Error adding device %s\n", buf);
3131 module_put(THIS_MODULE);
3133 return (ssize_t) rc;
3136 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
/*
 * Look up an rbd_device by its numeric id on the global device list.
 * Takes and releases rbd_dev_list_lock around the walk; the visible
 * early unlock on a match implies the matching device is returned
 * (the return statements themselves are missing from this extract,
 * as is the NULL return for the no-match case).
 */
3138 struct list_head *tmp;
3139 struct rbd_device *rbd_dev;
3141 spin_lock(&rbd_dev_list_lock);
3142 list_for_each(tmp, &rbd_dev_list) {
3143 rbd_dev = list_entry(tmp, struct rbd_device, node);
3144 if (rbd_dev->dev_id == dev_id) {
/* Found it: drop the lock before returning the device. */
3145 spin_unlock(&rbd_dev_list_lock);
/* No match: drop the lock (and presumably return NULL). */
3149 spin_unlock(&rbd_dev_list_lock);
3153 static void rbd_dev_release(struct device *dev)
/*
 * Device-model release callback: final teardown of an rbd_device once
 * its embedded struct device refcount drops to zero.  Mirrors the
 * resource acquisitions done in rbd_add(): stop watching the header,
 * drop the ceph client, free the disk/blkdev, free header fields and
 * name strings, release the id, and drop the module reference.
 *
 * NOTE(review): closing braces and some statements are missing from
 * this extract; comments describe only the visible calls.
 */
3155 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Cancel the lingering watch request, if one was registered. */
3157 if (rbd_dev->watch_request) {
3158 struct ceph_client *client = rbd_dev->rbd_client->client;
3160 ceph_osdc_unregister_linger_request(&client->osdc,
3161 rbd_dev->watch_request);
/* Tell the OSD we are no longer watching the header object. */
3163 if (rbd_dev->watch_event)
3164 rbd_req_sync_unwatch(rbd_dev);
3166 rbd_put_client(rbd_dev);
3168 /* clean up and free blkdev */
3169 rbd_free_disk(rbd_dev);
3170 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3172 /* release allocated disk header fields */
3173 rbd_header_free(&rbd_dev->header);
3175 /* done with the id, and with the rbd_dev */
3176 kfree(rbd_dev->mapping.snap_name);
3177 kfree(rbd_dev->image_id);
3178 kfree(rbd_dev->header_name);
3179 kfree(rbd_dev->pool_name);
3180 kfree(rbd_dev->image_name);
3181 rbd_dev_id_put(rbd_dev);
3184 /* release module ref */
3185 module_put(THIS_MODULE);
3188 static ssize_t rbd_remove(struct bus_type *bus,
/*
 * sysfs bus "remove" store handler: parse a device id from @buf, find
 * the matching rbd_device, and tear it down (snapshots first, then the
 * device itself via rbd_bus_del_dev()).
 *
 * NOTE(review): strict_strtoul() is the deprecated predecessor of
 * kstrtoul(); a modernization pass should switch it, but the
 * surrounding error-check lines are missing from this extract so the
 * change is not made here.
 */
3192 struct rbd_device *rbd_dev = NULL;
3197 rc = strict_strtoul(buf, 10, &ul);
3201 /* convert to int; abort if we lost anything in the conversion */
3202 target_id = (int) ul;
3203 if (target_id != ul)
/* ctl_mutex serializes add/remove against other control operations;
 * SINGLE_DEPTH_NESTING appeases lockdep for this nested acquisition. */
3206 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3208 rbd_dev = __rbd_get_dev(target_id);
3214 __rbd_remove_all_snaps(rbd_dev);
3215 rbd_bus_del_dev(rbd_dev);
3218 mutex_unlock(&ctl_mutex);
3224 * create control files in sysfs
3227 static int rbd_sysfs_init(void)
/*
 * Register the rbd root device and bus with the driver model so the
 * add/remove control files appear in sysfs.  On bus_register()
 * failure the root device registration is rolled back.  Returns 0 on
 * success or a negative errno (error-check lines are missing from
 * this extract).
 */
3231 ret = device_register(&rbd_root_dev);
3235 ret = bus_register(&rbd_bus_type);
/* Undo the root-device registration if the bus failed to register. */
3237 device_unregister(&rbd_root_dev);
3242 static void rbd_sysfs_cleanup(void)
/*
 * Tear down the sysfs control interface: unregister the bus first,
 * then the root device — the reverse of rbd_sysfs_init().
 */
3244 bus_unregister(&rbd_bus_type);
3245 device_unregister(&rbd_root_dev);
3248 int __init rbd_init(void)
/*
 * Module entry point: set up the sysfs control files and announce the
 * driver.  Returns 0 on success or the rbd_sysfs_init() errno (the
 * error-check and return lines are missing from this extract).
 */
3252 rc = rbd_sysfs_init();
3255 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3259 void __exit rbd_exit(void)
/* Module exit point: remove the sysfs control interface. */
3261 rbd_sysfs_cleanup();
/* Register module entry/exit points and module metadata. */
3264 module_init(rbd_init);
3265 module_exit(rbd_exit);
3269 MODULE_DESCRIPTION("rados block device");
3271 /* following authorship retained from original osdblk.c */
3274 MODULE_LICENSE("GPL");