]> Git Repo - qemu.git/blob - block/sheepdog.c
qcow2: Implement .bdrv_inactivate
[qemu.git] / block / sheepdog.c
1 /*
2  * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version
6  * 2 as published by the Free Software Foundation.
7  *
8  * You should have received a copy of the GNU General Public License
9  * along with this program. If not, see <http://www.gnu.org/licenses/>.
10  *
11  * Contributions after 2012-01-13 are licensed under the terms of the
12  * GNU GPL, version 2 or (at your option) any later version.
13  */
14
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/uri.h"
18 #include "qemu/error-report.h"
19 #include "qemu/sockets.h"
20 #include "block/block_int.h"
21 #include "qemu/bitops.h"
22
23 #define SD_PROTO_VER 0x01
24
25 #define SD_DEFAULT_ADDR "localhost"
26 #define SD_DEFAULT_PORT 7000
27
28 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
29 #define SD_OP_READ_OBJ       0x02
30 #define SD_OP_WRITE_OBJ      0x03
31 /* 0x04 is used internally by Sheepdog */
32
33 #define SD_OP_NEW_VDI        0x11
34 #define SD_OP_LOCK_VDI       0x12
35 #define SD_OP_RELEASE_VDI    0x13
36 #define SD_OP_GET_VDI_INFO   0x14
37 #define SD_OP_READ_VDIS      0x15
38 #define SD_OP_FLUSH_VDI      0x16
39 #define SD_OP_DEL_VDI        0x17
40 #define SD_OP_GET_CLUSTER_DEFAULT   0x18
41
42 #define SD_FLAG_CMD_WRITE    0x01
43 #define SD_FLAG_CMD_COW      0x02
44 #define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
45 #define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
46
47 #define SD_RES_SUCCESS       0x00 /* Success */
48 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
49 #define SD_RES_NO_OBJ        0x02 /* No object found */
50 #define SD_RES_EIO           0x03 /* I/O error */
51 #define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
52 #define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
53 #define SD_RES_SYSTEM_ERROR  0x06 /* System error */
54 #define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
55 #define SD_RES_NO_VDI        0x08 /* No vdi found */
56 #define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
57 #define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
58 #define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
59 #define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
60 #define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
61 #define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
62 #define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
63 #define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
64 #define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
65 #define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
66 #define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
67 #define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
68 #define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
69 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
70 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
71 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
72 #define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
73 #define SD_RES_READONLY      0x1A /* Object is read-only */
74
75 /*
76  * Object ID rules
77  *
78  *  0 - 19 (20 bits): data object space
79  * 20 - 31 (12 bits): reserved data object space
80  * 32 - 55 (24 bits): vdi object space
81  * 56 - 59 ( 4 bits): reserved vdi object space
82  * 60 - 63 ( 4 bits): object type identifier space
83  */
84
85 #define VDI_SPACE_SHIFT   32
86 #define VDI_BIT (UINT64_C(1) << 63)
87 #define VMSTATE_BIT (UINT64_C(1) << 62)
88 #define MAX_DATA_OBJS (UINT64_C(1) << 20)
89 #define MAX_CHILDREN 1024
90 #define SD_MAX_VDI_LEN 256
91 #define SD_MAX_VDI_TAG_LEN 256
92 #define SD_NR_VDIS   (1U << 24)
93 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
94 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
95 #define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
96 /*
97  * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
98  * (SD_EC_MAX_STRIP - 1) for parity strips
99  *
100  * SD_MAX_COPIES is sum of number of data strips and parity strips.
101  */
102 #define SD_EC_MAX_STRIP 16
103 #define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
104
105 #define SD_INODE_SIZE (sizeof(SheepdogInode))
106 #define CURRENT_VDI_ID 0
107
108 #define LOCK_TYPE_NORMAL 0
109 #define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
110
111 typedef struct SheepdogReq {
112     uint8_t proto_ver;
113     uint8_t opcode;
114     uint16_t flags;
115     uint32_t epoch;
116     uint32_t id;
117     uint32_t data_length;
118     uint32_t opcode_specific[8];
119 } SheepdogReq;
120
121 typedef struct SheepdogRsp {
122     uint8_t proto_ver;
123     uint8_t opcode;
124     uint16_t flags;
125     uint32_t epoch;
126     uint32_t id;
127     uint32_t data_length;
128     uint32_t result;
129     uint32_t opcode_specific[7];
130 } SheepdogRsp;
131
132 typedef struct SheepdogObjReq {
133     uint8_t proto_ver;
134     uint8_t opcode;
135     uint16_t flags;
136     uint32_t epoch;
137     uint32_t id;
138     uint32_t data_length;
139     uint64_t oid;
140     uint64_t cow_oid;
141     uint8_t copies;
142     uint8_t copy_policy;
143     uint8_t reserved[6];
144     uint64_t offset;
145 } SheepdogObjReq;
146
147 typedef struct SheepdogObjRsp {
148     uint8_t proto_ver;
149     uint8_t opcode;
150     uint16_t flags;
151     uint32_t epoch;
152     uint32_t id;
153     uint32_t data_length;
154     uint32_t result;
155     uint8_t copies;
156     uint8_t copy_policy;
157     uint8_t reserved[2];
158     uint32_t pad[6];
159 } SheepdogObjRsp;
160
161 typedef struct SheepdogVdiReq {
162     uint8_t proto_ver;
163     uint8_t opcode;
164     uint16_t flags;
165     uint32_t epoch;
166     uint32_t id;
167     uint32_t data_length;
168     uint64_t vdi_size;
169     uint32_t base_vdi_id;
170     uint8_t copies;
171     uint8_t copy_policy;
172     uint8_t store_policy;
173     uint8_t block_size_shift;
174     uint32_t snapid;
175     uint32_t type;
176     uint32_t pad[2];
177 } SheepdogVdiReq;
178
179 typedef struct SheepdogVdiRsp {
180     uint8_t proto_ver;
181     uint8_t opcode;
182     uint16_t flags;
183     uint32_t epoch;
184     uint32_t id;
185     uint32_t data_length;
186     uint32_t result;
187     uint32_t rsvd;
188     uint32_t vdi_id;
189     uint32_t pad[5];
190 } SheepdogVdiRsp;
191
192 typedef struct SheepdogClusterRsp {
193     uint8_t proto_ver;
194     uint8_t opcode;
195     uint16_t flags;
196     uint32_t epoch;
197     uint32_t id;
198     uint32_t data_length;
199     uint32_t result;
200     uint8_t nr_copies;
201     uint8_t copy_policy;
202     uint8_t block_size_shift;
203     uint8_t __pad1;
204     uint32_t __pad2[6];
205 } SheepdogClusterRsp;
206
207 typedef struct SheepdogInode {
208     char name[SD_MAX_VDI_LEN];
209     char tag[SD_MAX_VDI_TAG_LEN];
210     uint64_t ctime;
211     uint64_t snap_ctime;
212     uint64_t vm_clock_nsec;
213     uint64_t vdi_size;
214     uint64_t vm_state_size;
215     uint16_t copy_policy;
216     uint8_t nr_copies;
217     uint8_t block_size_shift;
218     uint32_t snap_id;
219     uint32_t vdi_id;
220     uint32_t parent_vdi_id;
221     uint32_t child_vdi_id[MAX_CHILDREN];
222     uint32_t data_vdi_id[MAX_DATA_OBJS];
223 } SheepdogInode;
224
225 #define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
226
227 /*
228  * 64 bit FNV-1a non-zero initial basis
229  */
230 #define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
231
232 /*
233  * 64 bit Fowler/Noll/Vo FNV-1a hash code
234  */
235 static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
236 {
237     unsigned char *bp = buf;
238     unsigned char *be = bp + len;
239     while (bp < be) {
240         hval ^= (uint64_t) *bp++;
241         hval += (hval << 1) + (hval << 4) + (hval << 5) +
242             (hval << 7) + (hval << 8) + (hval << 40);
243     }
244     return hval;
245 }
246
247 static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
248 {
249     return inode->vdi_id == inode->data_vdi_id[idx];
250 }
251
252 static inline bool is_data_obj(uint64_t oid)
253 {
254     return !(VDI_BIT & oid);
255 }
256
257 static inline uint64_t data_oid_to_idx(uint64_t oid)
258 {
259     return oid & (MAX_DATA_OBJS - 1);
260 }
261
262 static inline uint32_t oid_to_vid(uint64_t oid)
263 {
264     return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
265 }
266
267 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
268 {
269     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
270 }
271
272 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
273 {
274     return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
275 }
276
277 static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
278 {
279     return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
280 }
281
282 static inline bool is_snapshot(struct SheepdogInode *inode)
283 {
284     return !!inode->snap_ctime;
285 }
286
287 #undef DPRINTF
288 #ifdef DEBUG_SDOG
289 #define DPRINTF(fmt, args...)                                       \
290     do {                                                            \
291         fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
292     } while (0)
293 #else
294 #define DPRINTF(fmt, args...)
295 #endif
296
297 typedef struct SheepdogAIOCB SheepdogAIOCB;
298
299 typedef struct AIOReq {
300     SheepdogAIOCB *aiocb;
301     unsigned int iov_offset;
302
303     uint64_t oid;
304     uint64_t base_oid;
305     uint64_t offset;
306     unsigned int data_len;
307     uint8_t flags;
308     uint32_t id;
309     bool create;
310
311     QLIST_ENTRY(AIOReq) aio_siblings;
312 } AIOReq;
313
314 enum AIOCBState {
315     AIOCB_WRITE_UDATA,
316     AIOCB_READ_UDATA,
317     AIOCB_FLUSH_CACHE,
318     AIOCB_DISCARD_OBJ,
319 };
320
321 #define AIOCBOverlapping(x, y)                                 \
322     (!(x->max_affect_data_idx < y->min_affect_data_idx          \
323        || y->max_affect_data_idx < x->min_affect_data_idx))
324
325 struct SheepdogAIOCB {
326     BlockAIOCB common;
327
328     QEMUIOVector *qiov;
329
330     int64_t sector_num;
331     int nb_sectors;
332
333     int ret;
334     enum AIOCBState aiocb_type;
335
336     Coroutine *coroutine;
337     void (*aio_done_func)(SheepdogAIOCB *);
338
339     bool cancelable;
340     int nr_pending;
341
342     uint32_t min_affect_data_idx;
343     uint32_t max_affect_data_idx;
344
345     /*
346      * The difference between affect_data_idx and dirty_data_idx:
347      * affect_data_idx represents range of index of all request types.
348      * dirty_data_idx represents range of index updated by COW requests.
349      * dirty_data_idx is used for updating an inode object.
350      */
351     uint32_t min_dirty_data_idx;
352     uint32_t max_dirty_data_idx;
353
354     QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
355 };
356
357 typedef struct BDRVSheepdogState {
358     BlockDriverState *bs;
359     AioContext *aio_context;
360
361     SheepdogInode inode;
362
363     char name[SD_MAX_VDI_LEN];
364     bool is_snapshot;
365     uint32_t cache_flags;
366     bool discard_supported;
367
368     char *host_spec;
369     bool is_unix;
370     int fd;
371
372     CoMutex lock;
373     Coroutine *co_send;
374     Coroutine *co_recv;
375
376     uint32_t aioreq_seq_num;
377
378     /* Every aio request must be linked to either of these queues. */
379     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
380     QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
381
382     CoQueue overlapping_queue;
383     QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
384 } BDRVSheepdogState;
385
386 typedef struct BDRVSheepdogReopenState {
387     int fd;
388     int cache_flags;
389 } BDRVSheepdogReopenState;
390
391 static const char * sd_strerror(int err)
392 {
393     int i;
394
395     static const struct {
396         int err;
397         const char *desc;
398     } errors[] = {
399         {SD_RES_SUCCESS, "Success"},
400         {SD_RES_UNKNOWN, "Unknown error"},
401         {SD_RES_NO_OBJ, "No object found"},
402         {SD_RES_EIO, "I/O error"},
403         {SD_RES_VDI_EXIST, "VDI exists already"},
404         {SD_RES_INVALID_PARMS, "Invalid parameters"},
405         {SD_RES_SYSTEM_ERROR, "System error"},
406         {SD_RES_VDI_LOCKED, "VDI is already locked"},
407         {SD_RES_NO_VDI, "No vdi found"},
408         {SD_RES_NO_BASE_VDI, "No base VDI found"},
409         {SD_RES_VDI_READ, "Failed read the requested VDI"},
410         {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
411         {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
412         {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
413         {SD_RES_NO_TAG, "Failed to find the requested tag"},
414         {SD_RES_STARTUP, "The system is still booting"},
415         {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
416         {SD_RES_SHUTDOWN, "The system is shutting down"},
417         {SD_RES_NO_MEM, "Out of memory on the server"},
418         {SD_RES_FULL_VDI, "We already have the maximum vdis"},
419         {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
420         {SD_RES_NO_SPACE, "Server has no space for new objects"},
421         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
422         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
423         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
424         {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
425         {SD_RES_READONLY, "Object is read-only"},
426     };
427
428     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
429         if (errors[i].err == err) {
430             return errors[i].desc;
431         }
432     }
433
434     return "Invalid error code";
435 }
436
437 /*
438  * Sheepdog I/O handling:
439  *
440  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
441  *    link the requests to the inflight_list in the
442  *    BDRVSheepdogState.  The function exits without waiting for
443  *    receiving the response.
444  *
445  * 2. We receive the response in aio_read_response, the fd handler to
446  *    the sheepdog connection.  If metadata update is needed, we send
447  *    the write request to the vdi object in sd_write_done, the write
448  *    completion function.  We switch back to sd_co_readv/writev after
449  *    all the requests belonging to the AIOCB are finished.
450  */
451
452 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
453                                     uint64_t oid, unsigned int data_len,
454                                     uint64_t offset, uint8_t flags, bool create,
455                                     uint64_t base_oid, unsigned int iov_offset)
456 {
457     AIOReq *aio_req;
458
459     aio_req = g_malloc(sizeof(*aio_req));
460     aio_req->aiocb = acb;
461     aio_req->iov_offset = iov_offset;
462     aio_req->oid = oid;
463     aio_req->base_oid = base_oid;
464     aio_req->offset = offset;
465     aio_req->data_len = data_len;
466     aio_req->flags = flags;
467     aio_req->id = s->aioreq_seq_num++;
468     aio_req->create = create;
469
470     acb->nr_pending++;
471     return aio_req;
472 }
473
474 static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
475 {
476     SheepdogAIOCB *acb = aio_req->aiocb;
477
478     acb->cancelable = false;
479     QLIST_REMOVE(aio_req, aio_siblings);
480     g_free(aio_req);
481
482     acb->nr_pending--;
483 }
484
485 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
486 {
487     qemu_coroutine_enter(acb->coroutine, NULL);
488     qemu_aio_unref(acb);
489 }
490
491 /*
492  * Check whether the specified acb can be canceled
493  *
494  * We can cancel aio when any request belonging to the acb is:
495  *  - Not processed by the sheepdog server.
496  *  - Not linked to the inflight queue.
497  */
498 static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
499 {
500     BDRVSheepdogState *s = acb->common.bs->opaque;
501     AIOReq *aioreq;
502
503     if (!acb->cancelable) {
504         return false;
505     }
506
507     QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
508         if (aioreq->aiocb == acb) {
509             return false;
510         }
511     }
512
513     return true;
514 }
515
516 static void sd_aio_cancel(BlockAIOCB *blockacb)
517 {
518     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
519     BDRVSheepdogState *s = acb->common.bs->opaque;
520     AIOReq *aioreq, *next;
521
522     if (sd_acb_cancelable(acb)) {
523         /* Remove outstanding requests from failed queue.  */
524         QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
525                            next) {
526             if (aioreq->aiocb == acb) {
527                 free_aio_req(s, aioreq);
528             }
529         }
530
531         assert(acb->nr_pending == 0);
532         if (acb->common.cb) {
533             acb->common.cb(acb->common.opaque, -ECANCELED);
534         }
535         sd_finish_aiocb(acb);
536     }
537 }
538
539 static const AIOCBInfo sd_aiocb_info = {
540     .aiocb_size     = sizeof(SheepdogAIOCB),
541     .cancel_async   = sd_aio_cancel,
542 };
543
544 static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
545                                    int64_t sector_num, int nb_sectors)
546 {
547     SheepdogAIOCB *acb;
548     uint32_t object_size;
549     BDRVSheepdogState *s = bs->opaque;
550
551     object_size = (UINT32_C(1) << s->inode.block_size_shift);
552
553     acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
554
555     acb->qiov = qiov;
556
557     acb->sector_num = sector_num;
558     acb->nb_sectors = nb_sectors;
559
560     acb->aio_done_func = NULL;
561     acb->cancelable = true;
562     acb->coroutine = qemu_coroutine_self();
563     acb->ret = 0;
564     acb->nr_pending = 0;
565
566     acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
567     acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
568                               acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
569
570     acb->min_dirty_data_idx = UINT32_MAX;
571     acb->max_dirty_data_idx = 0;
572
573     return acb;
574 }
575
576 /* Return -EIO in case of error, file descriptor on success */
577 static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
578 {
579     int fd;
580
581     if (s->is_unix) {
582         fd = unix_connect(s->host_spec, errp);
583     } else {
584         fd = inet_connect(s->host_spec, errp);
585
586         if (fd >= 0) {
587             int ret = socket_set_nodelay(fd);
588             if (ret < 0) {
589                 error_report("%s", strerror(errno));
590             }
591         }
592     }
593
594     if (fd >= 0) {
595         qemu_set_nonblock(fd);
596     } else {
597         fd = -EIO;
598     }
599
600     return fd;
601 }
602
603 /* Return 0 on success and -errno in case of error */
604 static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
605                                     unsigned int *wlen)
606 {
607     int ret;
608
609     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
610     if (ret != sizeof(*hdr)) {
611         error_report("failed to send a req, %s", strerror(errno));
612         ret = -socket_error();
613         return ret;
614     }
615
616     ret = qemu_co_send(sockfd, data, *wlen);
617     if (ret != *wlen) {
618         ret = -socket_error();
619         error_report("failed to send a req, %s", strerror(errno));
620     }
621
622     return ret;
623 }
624
625 static void restart_co_req(void *opaque)
626 {
627     Coroutine *co = opaque;
628
629     qemu_coroutine_enter(co, NULL);
630 }
631
632 typedef struct SheepdogReqCo {
633     int sockfd;
634     AioContext *aio_context;
635     SheepdogReq *hdr;
636     void *data;
637     unsigned int *wlen;
638     unsigned int *rlen;
639     int ret;
640     bool finished;
641 } SheepdogReqCo;
642
643 static coroutine_fn void do_co_req(void *opaque)
644 {
645     int ret;
646     Coroutine *co;
647     SheepdogReqCo *srco = opaque;
648     int sockfd = srco->sockfd;
649     SheepdogReq *hdr = srco->hdr;
650     void *data = srco->data;
651     unsigned int *wlen = srco->wlen;
652     unsigned int *rlen = srco->rlen;
653
654     co = qemu_coroutine_self();
655     aio_set_fd_handler(srco->aio_context, sockfd, false,
656                        NULL, restart_co_req, co);
657
658     ret = send_co_req(sockfd, hdr, data, wlen);
659     if (ret < 0) {
660         goto out;
661     }
662
663     aio_set_fd_handler(srco->aio_context, sockfd, false,
664                        restart_co_req, NULL, co);
665
666     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
667     if (ret != sizeof(*hdr)) {
668         error_report("failed to get a rsp, %s", strerror(errno));
669         ret = -errno;
670         goto out;
671     }
672
673     if (*rlen > hdr->data_length) {
674         *rlen = hdr->data_length;
675     }
676
677     if (*rlen) {
678         ret = qemu_co_recv(sockfd, data, *rlen);
679         if (ret != *rlen) {
680             error_report("failed to get the data, %s", strerror(errno));
681             ret = -errno;
682             goto out;
683         }
684     }
685     ret = 0;
686 out:
687     /* there is at most one request for this sockfd, so it is safe to
688      * set each handler to NULL. */
689     aio_set_fd_handler(srco->aio_context, sockfd, false,
690                        NULL, NULL, NULL);
691
692     srco->ret = ret;
693     srco->finished = true;
694 }
695
696 /*
697  * Send the request to the sheep in a synchronous manner.
698  *
699  * Return 0 on success, -errno in case of error.
700  */
701 static int do_req(int sockfd, AioContext *aio_context, SheepdogReq *hdr,
702                   void *data, unsigned int *wlen, unsigned int *rlen)
703 {
704     Coroutine *co;
705     SheepdogReqCo srco = {
706         .sockfd = sockfd,
707         .aio_context = aio_context,
708         .hdr = hdr,
709         .data = data,
710         .wlen = wlen,
711         .rlen = rlen,
712         .ret = 0,
713         .finished = false,
714     };
715
716     if (qemu_in_coroutine()) {
717         do_co_req(&srco);
718     } else {
719         co = qemu_coroutine_create(do_co_req);
720         qemu_coroutine_enter(co, &srco);
721         while (!srco.finished) {
722             aio_poll(aio_context, true);
723         }
724     }
725
726     return srco.ret;
727 }
728
729 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
730                                          struct iovec *iov, int niov,
731                                          enum AIOCBState aiocb_type);
732 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
733 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
734 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
735 static void co_write_request(void *opaque);
736
737 static coroutine_fn void reconnect_to_sdog(void *opaque)
738 {
739     BDRVSheepdogState *s = opaque;
740     AIOReq *aio_req, *next;
741
742     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
743                        NULL, NULL);
744     close(s->fd);
745     s->fd = -1;
746
747     /* Wait for outstanding write requests to be completed. */
748     while (s->co_send != NULL) {
749         co_write_request(opaque);
750     }
751
752     /* Try to reconnect the sheepdog server every one second. */
753     while (s->fd < 0) {
754         Error *local_err = NULL;
755         s->fd = get_sheep_fd(s, &local_err);
756         if (s->fd < 0) {
757             DPRINTF("Wait for connection to be established\n");
758             error_report_err(local_err);
759             co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
760                             1000000000ULL);
761         }
762     };
763
764     /*
765      * Now we have to resend all the request in the inflight queue.  However,
766      * resend_aioreq() can yield and newly created requests can be added to the
767      * inflight queue before the coroutine is resumed.  To avoid mixing them, we
768      * have to move all the inflight requests to the failed queue before
769      * resend_aioreq() is called.
770      */
771     QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
772         QLIST_REMOVE(aio_req, aio_siblings);
773         QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
774     }
775
776     /* Resend all the failed aio requests. */
777     while (!QLIST_EMPTY(&s->failed_aio_head)) {
778         aio_req = QLIST_FIRST(&s->failed_aio_head);
779         QLIST_REMOVE(aio_req, aio_siblings);
780         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
781         resend_aioreq(s, aio_req);
782     }
783 }
784
785 /*
786  * Receive responses of the I/O requests.
787  *
788  * This function is registered as a fd handler, and called from the
789  * main loop when s->fd is ready for reading responses.
790  */
791 static void coroutine_fn aio_read_response(void *opaque)
792 {
793     SheepdogObjRsp rsp;
794     BDRVSheepdogState *s = opaque;
795     int fd = s->fd;
796     int ret;
797     AIOReq *aio_req = NULL;
798     SheepdogAIOCB *acb;
799     uint64_t idx;
800
801     /* read a header */
802     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
803     if (ret != sizeof(rsp)) {
804         error_report("failed to get the header, %s", strerror(errno));
805         goto err;
806     }
807
808     /* find the right aio_req from the inflight aio list */
809     QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
810         if (aio_req->id == rsp.id) {
811             break;
812         }
813     }
814     if (!aio_req) {
815         error_report("cannot find aio_req %x", rsp.id);
816         goto err;
817     }
818
819     acb = aio_req->aiocb;
820
821     switch (acb->aiocb_type) {
822     case AIOCB_WRITE_UDATA:
823         /* this coroutine context is no longer suitable for co_recv
824          * because we may send data to update vdi objects */
825         s->co_recv = NULL;
826         if (!is_data_obj(aio_req->oid)) {
827             break;
828         }
829         idx = data_oid_to_idx(aio_req->oid);
830
831         if (aio_req->create) {
832             /*
833              * If the object is newly created one, we need to update
834              * the vdi object (metadata object).  min_dirty_data_idx
835              * and max_dirty_data_idx are changed to include updated
836              * index between them.
837              */
838             if (rsp.result == SD_RES_SUCCESS) {
839                 s->inode.data_vdi_id[idx] = s->inode.vdi_id;
840                 acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
841                 acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
842             }
843         }
844         break;
845     case AIOCB_READ_UDATA:
846         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
847                             aio_req->iov_offset, rsp.data_length);
848         if (ret != rsp.data_length) {
849             error_report("failed to get the data, %s", strerror(errno));
850             goto err;
851         }
852         break;
853     case AIOCB_FLUSH_CACHE:
854         if (rsp.result == SD_RES_INVALID_PARMS) {
855             DPRINTF("disable cache since the server doesn't support it\n");
856             s->cache_flags = SD_FLAG_CMD_DIRECT;
857             rsp.result = SD_RES_SUCCESS;
858         }
859         break;
860     case AIOCB_DISCARD_OBJ:
861         switch (rsp.result) {
862         case SD_RES_INVALID_PARMS:
863             error_report("sheep(%s) doesn't support discard command",
864                          s->host_spec);
865             rsp.result = SD_RES_SUCCESS;
866             s->discard_supported = false;
867             break;
868         default:
869             break;
870         }
871     }
872
873     switch (rsp.result) {
874     case SD_RES_SUCCESS:
875         break;
876     case SD_RES_READONLY:
877         if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
878             ret = reload_inode(s, 0, "");
879             if (ret < 0) {
880                 goto err;
881             }
882         }
883         if (is_data_obj(aio_req->oid)) {
884             aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
885                                            data_oid_to_idx(aio_req->oid));
886         } else {
887             aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
888         }
889         resend_aioreq(s, aio_req);
890         goto out;
891     default:
892         acb->ret = -EIO;
893         error_report("%s", sd_strerror(rsp.result));
894         break;
895     }
896
897     free_aio_req(s, aio_req);
898     if (!acb->nr_pending) {
899         /*
900          * We've finished all requests which belong to the AIOCB, so
901          * we can switch back to sd_co_readv/writev now.
902          */
903         acb->aio_done_func(acb);
904     }
905 out:
906     s->co_recv = NULL;
907     return;
908 err:
909     s->co_recv = NULL;
910     reconnect_to_sdog(opaque);
911 }
912
913 static void co_read_response(void *opaque)
914 {
915     BDRVSheepdogState *s = opaque;
916
917     if (!s->co_recv) {
918         s->co_recv = qemu_coroutine_create(aio_read_response);
919     }
920
921     qemu_coroutine_enter(s->co_recv, opaque);
922 }
923
924 static void co_write_request(void *opaque)
925 {
926     BDRVSheepdogState *s = opaque;
927
928     qemu_coroutine_enter(s->co_send, NULL);
929 }
930
931 /*
932  * Return a socket descriptor to read/write objects.
933  *
934  * We cannot use this descriptor for other operations because
935  * the block driver may be on waiting response from the server.
936  */
937 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
938 {
939     int fd;
940
941     fd = connect_to_sdog(s, errp);
942     if (fd < 0) {
943         return fd;
944     }
945
946     aio_set_fd_handler(s->aio_context, fd, false,
947                        co_read_response, NULL, s);
948     return fd;
949 }
950
951 static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
952                         char *vdi, uint32_t *snapid, char *tag)
953 {
954     URI *uri;
955     QueryParams *qp = NULL;
956     int ret = 0;
957
958     uri = uri_parse(filename);
959     if (!uri) {
960         return -EINVAL;
961     }
962
963     /* transport */
964     if (!strcmp(uri->scheme, "sheepdog")) {
965         s->is_unix = false;
966     } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
967         s->is_unix = false;
968     } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
969         s->is_unix = true;
970     } else {
971         ret = -EINVAL;
972         goto out;
973     }
974
975     if (uri->path == NULL || !strcmp(uri->path, "/")) {
976         ret = -EINVAL;
977         goto out;
978     }
979     pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
980
981     qp = query_params_parse(uri->query);
982     if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
983         ret = -EINVAL;
984         goto out;
985     }
986
987     if (s->is_unix) {
988         /* sheepdog+unix:///vdiname?socket=path */
989         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
990             ret = -EINVAL;
991             goto out;
992         }
993         s->host_spec = g_strdup(qp->p[0].value);
994     } else {
995         /* sheepdog[+tcp]://[host:port]/vdiname */
996         s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
997                                        uri->port ?: SD_DEFAULT_PORT);
998     }
999
1000     /* snapshot tag */
1001     if (uri->fragment) {
1002         *snapid = strtoul(uri->fragment, NULL, 10);
1003         if (*snapid == 0) {
1004             pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
1005         }
1006     } else {
1007         *snapid = CURRENT_VDI_ID; /* search current vdi */
1008     }
1009
1010 out:
1011     if (qp) {
1012         query_params_free(qp);
1013     }
1014     uri_free(uri);
1015     return ret;
1016 }
1017
1018 /*
1019  * Parse a filename (old syntax)
1020  *
1021  * filename must be one of the following formats:
1022  *   1. [vdiname]
1023  *   2. [vdiname]:[snapid]
1024  *   3. [vdiname]:[tag]
1025  *   4. [hostname]:[port]:[vdiname]
1026  *   5. [hostname]:[port]:[vdiname]:[snapid]
1027  *   6. [hostname]:[port]:[vdiname]:[tag]
1028  *
1029  * You can boot from the snapshot images by specifying `snapid` or
1030  * `tag'.
1031  *
1032  * You can run VMs outside the Sheepdog cluster by specifying
1033  * `hostname' and `port' (experimental).
1034  */
1035 static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
1036                          char *vdi, uint32_t *snapid, char *tag)
1037 {
1038     char *p, *q, *uri;
1039     const char *host_spec, *vdi_spec;
1040     int nr_sep, ret;
1041
1042     strstart(filename, "sheepdog:", (const char **)&filename);
1043     p = q = g_strdup(filename);
1044
1045     /* count the number of separators */
1046     nr_sep = 0;
1047     while (*p) {
1048         if (*p == ':') {
1049             nr_sep++;
1050         }
1051         p++;
1052     }
1053     p = q;
1054
1055     /* use the first two tokens as host_spec. */
1056     if (nr_sep >= 2) {
1057         host_spec = p;
1058         p = strchr(p, ':');
1059         p++;
1060         p = strchr(p, ':');
1061         *p++ = '\0';
1062     } else {
1063         host_spec = "";
1064     }
1065
1066     vdi_spec = p;
1067
1068     p = strchr(vdi_spec, ':');
1069     if (p) {
1070         *p++ = '#';
1071     }
1072
1073     uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1074
1075     ret = sd_parse_uri(s, uri, vdi, snapid, tag);
1076
1077     g_free(q);
1078     g_free(uri);
1079
1080     return ret;
1081 }
1082
1083 static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1084                          uint32_t snapid, const char *tag, uint32_t *vid,
1085                          bool lock, Error **errp)
1086 {
1087     int ret, fd;
1088     SheepdogVdiReq hdr;
1089     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1090     unsigned int wlen, rlen = 0;
1091     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1092
1093     fd = connect_to_sdog(s, errp);
1094     if (fd < 0) {
1095         return fd;
1096     }
1097
1098     /* This pair of strncpy calls ensures that the buffer is zero-filled,
1099      * which is desirable since we'll soon be sending those bytes, and
1100      * don't want the send_req to read uninitialized data.
1101      */
1102     strncpy(buf, filename, SD_MAX_VDI_LEN);
1103     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1104
1105     memset(&hdr, 0, sizeof(hdr));
1106     if (lock) {
1107         hdr.opcode = SD_OP_LOCK_VDI;
1108         hdr.type = LOCK_TYPE_NORMAL;
1109     } else {
1110         hdr.opcode = SD_OP_GET_VDI_INFO;
1111     }
1112     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1113     hdr.proto_ver = SD_PROTO_VER;
1114     hdr.data_length = wlen;
1115     hdr.snapid = snapid;
1116     hdr.flags = SD_FLAG_CMD_WRITE;
1117
1118     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1119     if (ret) {
1120         error_setg_errno(errp, -ret, "cannot get vdi info");
1121         goto out;
1122     }
1123
1124     if (rsp->result != SD_RES_SUCCESS) {
1125         error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1126                    sd_strerror(rsp->result), filename, snapid, tag);
1127         if (rsp->result == SD_RES_NO_VDI) {
1128             ret = -ENOENT;
1129         } else if (rsp->result == SD_RES_VDI_LOCKED) {
1130             ret = -EBUSY;
1131         } else {
1132             ret = -EIO;
1133         }
1134         goto out;
1135     }
1136     *vid = rsp->vdi_id;
1137
1138     ret = 0;
1139 out:
1140     closesocket(fd);
1141     return ret;
1142 }
1143
1144 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1145                                          struct iovec *iov, int niov,
1146                                          enum AIOCBState aiocb_type)
1147 {
1148     int nr_copies = s->inode.nr_copies;
1149     SheepdogObjReq hdr;
1150     unsigned int wlen = 0;
1151     int ret;
1152     uint64_t oid = aio_req->oid;
1153     unsigned int datalen = aio_req->data_len;
1154     uint64_t offset = aio_req->offset;
1155     uint8_t flags = aio_req->flags;
1156     uint64_t old_oid = aio_req->base_oid;
1157     bool create = aio_req->create;
1158
1159     if (!nr_copies) {
1160         error_report("bug");
1161     }
1162
1163     memset(&hdr, 0, sizeof(hdr));
1164
1165     switch (aiocb_type) {
1166     case AIOCB_FLUSH_CACHE:
1167         hdr.opcode = SD_OP_FLUSH_VDI;
1168         break;
1169     case AIOCB_READ_UDATA:
1170         hdr.opcode = SD_OP_READ_OBJ;
1171         hdr.flags = flags;
1172         break;
1173     case AIOCB_WRITE_UDATA:
1174         if (create) {
1175             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1176         } else {
1177             hdr.opcode = SD_OP_WRITE_OBJ;
1178         }
1179         wlen = datalen;
1180         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1181         break;
1182     case AIOCB_DISCARD_OBJ:
1183         hdr.opcode = SD_OP_WRITE_OBJ;
1184         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1185         s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1186         offset = offsetof(SheepdogInode,
1187                           data_vdi_id[data_oid_to_idx(oid)]);
1188         oid = vid_to_vdi_oid(s->inode.vdi_id);
1189         wlen = datalen = sizeof(uint32_t);
1190         break;
1191     }
1192
1193     if (s->cache_flags) {
1194         hdr.flags |= s->cache_flags;
1195     }
1196
1197     hdr.oid = oid;
1198     hdr.cow_oid = old_oid;
1199     hdr.copies = s->inode.nr_copies;
1200
1201     hdr.data_length = datalen;
1202     hdr.offset = offset;
1203
1204     hdr.id = aio_req->id;
1205
1206     qemu_co_mutex_lock(&s->lock);
1207     s->co_send = qemu_coroutine_self();
1208     aio_set_fd_handler(s->aio_context, s->fd, false,
1209                        co_read_response, co_write_request, s);
1210     socket_set_cork(s->fd, 1);
1211
1212     /* send a header */
1213     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1214     if (ret != sizeof(hdr)) {
1215         error_report("failed to send a req, %s", strerror(errno));
1216         goto out;
1217     }
1218
1219     if (wlen) {
1220         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1221         if (ret != wlen) {
1222             error_report("failed to send a data, %s", strerror(errno));
1223         }
1224     }
1225 out:
1226     socket_set_cork(s->fd, 0);
1227     aio_set_fd_handler(s->aio_context, s->fd, false,
1228                        co_read_response, NULL, s);
1229     s->co_send = NULL;
1230     qemu_co_mutex_unlock(&s->lock);
1231 }
1232
1233 static int read_write_object(int fd, AioContext *aio_context, char *buf,
1234                              uint64_t oid, uint8_t copies,
1235                              unsigned int datalen, uint64_t offset,
1236                              bool write, bool create, uint32_t cache_flags)
1237 {
1238     SheepdogObjReq hdr;
1239     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1240     unsigned int wlen, rlen;
1241     int ret;
1242
1243     memset(&hdr, 0, sizeof(hdr));
1244
1245     if (write) {
1246         wlen = datalen;
1247         rlen = 0;
1248         hdr.flags = SD_FLAG_CMD_WRITE;
1249         if (create) {
1250             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1251         } else {
1252             hdr.opcode = SD_OP_WRITE_OBJ;
1253         }
1254     } else {
1255         wlen = 0;
1256         rlen = datalen;
1257         hdr.opcode = SD_OP_READ_OBJ;
1258     }
1259
1260     hdr.flags |= cache_flags;
1261
1262     hdr.oid = oid;
1263     hdr.data_length = datalen;
1264     hdr.offset = offset;
1265     hdr.copies = copies;
1266
1267     ret = do_req(fd, aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1268     if (ret) {
1269         error_report("failed to send a request to the sheep");
1270         return ret;
1271     }
1272
1273     switch (rsp->result) {
1274     case SD_RES_SUCCESS:
1275         return 0;
1276     default:
1277         error_report("%s", sd_strerror(rsp->result));
1278         return -EIO;
1279     }
1280 }
1281
1282 static int read_object(int fd, AioContext *aio_context, char *buf,
1283                        uint64_t oid, uint8_t copies,
1284                        unsigned int datalen, uint64_t offset,
1285                        uint32_t cache_flags)
1286 {
1287     return read_write_object(fd, aio_context, buf, oid, copies,
1288                              datalen, offset, false,
1289                              false, cache_flags);
1290 }
1291
1292 static int write_object(int fd, AioContext *aio_context, char *buf,
1293                         uint64_t oid, uint8_t copies,
1294                         unsigned int datalen, uint64_t offset, bool create,
1295                         uint32_t cache_flags)
1296 {
1297     return read_write_object(fd, aio_context, buf, oid, copies,
1298                              datalen, offset, true,
1299                              create, cache_flags);
1300 }
1301
1302 /* update inode with the latest state */
1303 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1304 {
1305     Error *local_err = NULL;
1306     SheepdogInode *inode;
1307     int ret = 0, fd;
1308     uint32_t vid = 0;
1309
1310     fd = connect_to_sdog(s, &local_err);
1311     if (fd < 0) {
1312         error_report_err(local_err);
1313         return -EIO;
1314     }
1315
1316     inode = g_malloc(SD_INODE_HEADER_SIZE);
1317
1318     ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1319     if (ret) {
1320         error_report_err(local_err);
1321         goto out;
1322     }
1323
1324     ret = read_object(fd, s->aio_context, (char *)inode, vid_to_vdi_oid(vid),
1325                       s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1326                       s->cache_flags);
1327     if (ret < 0) {
1328         goto out;
1329     }
1330
1331     if (inode->vdi_id != s->inode.vdi_id) {
1332         memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1333     }
1334
1335 out:
1336     g_free(inode);
1337     closesocket(fd);
1338
1339     return ret;
1340 }
1341
1342 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1343 {
1344     SheepdogAIOCB *acb = aio_req->aiocb;
1345
1346     aio_req->create = false;
1347
1348     /* check whether this request becomes a CoW one */
1349     if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1350         int idx = data_oid_to_idx(aio_req->oid);
1351
1352         if (is_data_obj_writable(&s->inode, idx)) {
1353             goto out;
1354         }
1355
1356         if (s->inode.data_vdi_id[idx]) {
1357             aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1358             aio_req->flags |= SD_FLAG_CMD_COW;
1359         }
1360         aio_req->create = true;
1361     }
1362 out:
1363     if (is_data_obj(aio_req->oid)) {
1364         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1365                         acb->aiocb_type);
1366     } else {
1367         struct iovec iov;
1368         iov.iov_base = &s->inode;
1369         iov.iov_len = sizeof(s->inode);
1370         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1371     }
1372 }
1373
1374 static void sd_detach_aio_context(BlockDriverState *bs)
1375 {
1376     BDRVSheepdogState *s = bs->opaque;
1377
1378     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1379                        NULL, NULL);
1380 }
1381
1382 static void sd_attach_aio_context(BlockDriverState *bs,
1383                                   AioContext *new_context)
1384 {
1385     BDRVSheepdogState *s = bs->opaque;
1386
1387     s->aio_context = new_context;
1388     aio_set_fd_handler(new_context, s->fd, false,
1389                        co_read_response, NULL, s);
1390 }
1391
1392 /* TODO Convert to fine grained options */
1393 static QemuOptsList runtime_opts = {
1394     .name = "sheepdog",
1395     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1396     .desc = {
1397         {
1398             .name = "filename",
1399             .type = QEMU_OPT_STRING,
1400             .help = "URL to the sheepdog image",
1401         },
1402         { /* end of list */ }
1403     },
1404 };
1405
1406 static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1407                    Error **errp)
1408 {
1409     int ret, fd;
1410     uint32_t vid = 0;
1411     BDRVSheepdogState *s = bs->opaque;
1412     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1413     uint32_t snapid;
1414     char *buf = NULL;
1415     QemuOpts *opts;
1416     Error *local_err = NULL;
1417     const char *filename;
1418
1419     s->bs = bs;
1420     s->aio_context = bdrv_get_aio_context(bs);
1421
1422     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1423     qemu_opts_absorb_qdict(opts, options, &local_err);
1424     if (local_err) {
1425         error_propagate(errp, local_err);
1426         ret = -EINVAL;
1427         goto out;
1428     }
1429
1430     filename = qemu_opt_get(opts, "filename");
1431
1432     QLIST_INIT(&s->inflight_aio_head);
1433     QLIST_INIT(&s->failed_aio_head);
1434     QLIST_INIT(&s->inflight_aiocb_head);
1435     s->fd = -1;
1436
1437     memset(vdi, 0, sizeof(vdi));
1438     memset(tag, 0, sizeof(tag));
1439
1440     if (strstr(filename, "://")) {
1441         ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1442     } else {
1443         ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1444     }
1445     if (ret < 0) {
1446         error_setg(errp, "Can't parse filename");
1447         goto out;
1448     }
1449     s->fd = get_sheep_fd(s, errp);
1450     if (s->fd < 0) {
1451         ret = s->fd;
1452         goto out;
1453     }
1454
1455     ret = find_vdi_name(s, vdi, snapid, tag, &vid, true, errp);
1456     if (ret) {
1457         goto out;
1458     }
1459
1460     /*
1461      * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1462      * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1463      */
1464     s->cache_flags = SD_FLAG_CMD_CACHE;
1465     if (flags & BDRV_O_NOCACHE) {
1466         s->cache_flags = SD_FLAG_CMD_DIRECT;
1467     }
1468     s->discard_supported = true;
1469
1470     if (snapid || tag[0] != '\0') {
1471         DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1472         s->is_snapshot = true;
1473     }
1474
1475     fd = connect_to_sdog(s, errp);
1476     if (fd < 0) {
1477         ret = fd;
1478         goto out;
1479     }
1480
1481     buf = g_malloc(SD_INODE_SIZE);
1482     ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
1483                       0, SD_INODE_SIZE, 0, s->cache_flags);
1484
1485     closesocket(fd);
1486
1487     if (ret) {
1488         error_setg(errp, "Can't read snapshot inode");
1489         goto out;
1490     }
1491
1492     memcpy(&s->inode, buf, sizeof(s->inode));
1493
1494     bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1495     pstrcpy(s->name, sizeof(s->name), vdi);
1496     qemu_co_mutex_init(&s->lock);
1497     qemu_co_queue_init(&s->overlapping_queue);
1498     qemu_opts_del(opts);
1499     g_free(buf);
1500     return 0;
1501 out:
1502     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1503                        false, NULL, NULL, NULL);
1504     if (s->fd >= 0) {
1505         closesocket(s->fd);
1506     }
1507     qemu_opts_del(opts);
1508     g_free(buf);
1509     return ret;
1510 }
1511
1512 static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1513                              Error **errp)
1514 {
1515     BDRVSheepdogState *s = state->bs->opaque;
1516     BDRVSheepdogReopenState *re_s;
1517     int ret = 0;
1518
1519     re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1520
1521     re_s->cache_flags = SD_FLAG_CMD_CACHE;
1522     if (state->flags & BDRV_O_NOCACHE) {
1523         re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1524     }
1525
1526     re_s->fd = get_sheep_fd(s, errp);
1527     if (re_s->fd < 0) {
1528         ret = re_s->fd;
1529         return ret;
1530     }
1531
1532     return ret;
1533 }
1534
1535 static void sd_reopen_commit(BDRVReopenState *state)
1536 {
1537     BDRVSheepdogReopenState *re_s = state->opaque;
1538     BDRVSheepdogState *s = state->bs->opaque;
1539
1540     if (s->fd) {
1541         aio_set_fd_handler(s->aio_context, s->fd, false,
1542                            NULL, NULL, NULL);
1543         closesocket(s->fd);
1544     }
1545
1546     s->fd = re_s->fd;
1547     s->cache_flags = re_s->cache_flags;
1548
1549     g_free(state->opaque);
1550     state->opaque = NULL;
1551
1552     return;
1553 }
1554
1555 static void sd_reopen_abort(BDRVReopenState *state)
1556 {
1557     BDRVSheepdogReopenState *re_s = state->opaque;
1558     BDRVSheepdogState *s = state->bs->opaque;
1559
1560     if (re_s == NULL) {
1561         return;
1562     }
1563
1564     if (re_s->fd) {
1565         aio_set_fd_handler(s->aio_context, re_s->fd, false,
1566                            NULL, NULL, NULL);
1567         closesocket(re_s->fd);
1568     }
1569
1570     g_free(state->opaque);
1571     state->opaque = NULL;
1572
1573     return;
1574 }
1575
1576 static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1577                         Error **errp)
1578 {
1579     SheepdogVdiReq hdr;
1580     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1581     int fd, ret;
1582     unsigned int wlen, rlen = 0;
1583     char buf[SD_MAX_VDI_LEN];
1584
1585     fd = connect_to_sdog(s, errp);
1586     if (fd < 0) {
1587         return fd;
1588     }
1589
1590     /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1591      * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1592      */
1593     memset(buf, 0, sizeof(buf));
1594     pstrcpy(buf, sizeof(buf), s->name);
1595
1596     memset(&hdr, 0, sizeof(hdr));
1597     hdr.opcode = SD_OP_NEW_VDI;
1598     hdr.base_vdi_id = s->inode.vdi_id;
1599
1600     wlen = SD_MAX_VDI_LEN;
1601
1602     hdr.flags = SD_FLAG_CMD_WRITE;
1603     hdr.snapid = snapshot;
1604
1605     hdr.data_length = wlen;
1606     hdr.vdi_size = s->inode.vdi_size;
1607     hdr.copy_policy = s->inode.copy_policy;
1608     hdr.copies = s->inode.nr_copies;
1609     hdr.block_size_shift = s->inode.block_size_shift;
1610
1611     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1612
1613     closesocket(fd);
1614
1615     if (ret) {
1616         error_setg_errno(errp, -ret, "create failed");
1617         return ret;
1618     }
1619
1620     if (rsp->result != SD_RES_SUCCESS) {
1621         error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1622         return -EIO;
1623     }
1624
1625     if (vdi_id) {
1626         *vdi_id = rsp->vdi_id;
1627     }
1628
1629     return 0;
1630 }
1631
1632 static int sd_prealloc(const char *filename, Error **errp)
1633 {
1634     BlockDriverState *bs = NULL;
1635     BDRVSheepdogState *base = NULL;
1636     unsigned long buf_size;
1637     uint32_t idx, max_idx;
1638     uint32_t object_size;
1639     int64_t vdi_size;
1640     void *buf = NULL;
1641     int ret;
1642
1643     ret = bdrv_open(&bs, filename, NULL, NULL, BDRV_O_RDWR | BDRV_O_PROTOCOL,
1644                     errp);
1645     if (ret < 0) {
1646         goto out_with_err_set;
1647     }
1648
1649     vdi_size = bdrv_getlength(bs);
1650     if (vdi_size < 0) {
1651         ret = vdi_size;
1652         goto out;
1653     }
1654
1655     base = bs->opaque;
1656     object_size = (UINT32_C(1) << base->inode.block_size_shift);
1657     buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1658     buf = g_malloc0(buf_size);
1659
1660     max_idx = DIV_ROUND_UP(vdi_size, buf_size);
1661
1662     for (idx = 0; idx < max_idx; idx++) {
1663         /*
1664          * The created image can be a cloned image, so we need to read
1665          * a data from the source image.
1666          */
1667         ret = bdrv_pread(bs, idx * buf_size, buf, buf_size);
1668         if (ret < 0) {
1669             goto out;
1670         }
1671         ret = bdrv_pwrite(bs, idx * buf_size, buf, buf_size);
1672         if (ret < 0) {
1673             goto out;
1674         }
1675     }
1676
1677 out:
1678     if (ret < 0) {
1679         error_setg_errno(errp, -ret, "Can't pre-allocate");
1680     }
1681 out_with_err_set:
1682     if (bs) {
1683         bdrv_unref(bs);
1684     }
1685     g_free(buf);
1686
1687     return ret;
1688 }
1689
1690 /*
1691  * Sheepdog support two kinds of redundancy, full replication and erasure
1692  * coding.
1693  *
1694  * # create a fully replicated vdi with x copies
1695  * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1696  *
1697  * # create a erasure coded vdi with x data strips and y parity strips
1698  * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1699  */
1700 static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
1701 {
1702     struct SheepdogInode *inode = &s->inode;
1703     const char *n1, *n2;
1704     long copy, parity;
1705     char p[10];
1706
1707     pstrcpy(p, sizeof(p), opt);
1708     n1 = strtok(p, ":");
1709     n2 = strtok(NULL, ":");
1710
1711     if (!n1) {
1712         return -EINVAL;
1713     }
1714
1715     copy = strtol(n1, NULL, 10);
1716     if (copy > SD_MAX_COPIES || copy < 1) {
1717         return -EINVAL;
1718     }
1719     if (!n2) {
1720         inode->copy_policy = 0;
1721         inode->nr_copies = copy;
1722         return 0;
1723     }
1724
1725     if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1726         return -EINVAL;
1727     }
1728
1729     parity = strtol(n2, NULL, 10);
1730     if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1731         return -EINVAL;
1732     }
1733
1734     /*
1735      * 4 bits for parity and 4 bits for data.
1736      * We have to compress upper data bits because it can't represent 16
1737      */
1738     inode->copy_policy = ((copy / 2) << 4) + parity;
1739     inode->nr_copies = copy + parity;
1740
1741     return 0;
1742 }
1743
1744 static int parse_block_size_shift(BDRVSheepdogState *s, QemuOpts *opt)
1745 {
1746     struct SheepdogInode *inode = &s->inode;
1747     uint64_t object_size;
1748     int obj_order;
1749
1750     object_size = qemu_opt_get_size_del(opt, BLOCK_OPT_OBJECT_SIZE, 0);
1751     if (object_size) {
1752         if ((object_size - 1) & object_size) {    /* not a power of 2? */
1753             return -EINVAL;
1754         }
1755         obj_order = ctz32(object_size);
1756         if (obj_order < 20 || obj_order > 31) {
1757             return -EINVAL;
1758         }
1759         inode->block_size_shift = (uint8_t)obj_order;
1760     }
1761
1762     return 0;
1763 }
1764
1765 static int sd_create(const char *filename, QemuOpts *opts,
1766                      Error **errp)
1767 {
1768     int ret = 0;
1769     uint32_t vid = 0;
1770     char *backing_file = NULL;
1771     char *buf = NULL;
1772     BDRVSheepdogState *s;
1773     char tag[SD_MAX_VDI_TAG_LEN];
1774     uint32_t snapid;
1775     uint64_t max_vdi_size;
1776     bool prealloc = false;
1777
1778     s = g_new0(BDRVSheepdogState, 1);
1779
1780     memset(tag, 0, sizeof(tag));
1781     if (strstr(filename, "://")) {
1782         ret = sd_parse_uri(s, filename, s->name, &snapid, tag);
1783     } else {
1784         ret = parse_vdiname(s, filename, s->name, &snapid, tag);
1785     }
1786     if (ret < 0) {
1787         error_setg(errp, "Can't parse filename");
1788         goto out;
1789     }
1790
1791     s->inode.vdi_size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
1792                                  BDRV_SECTOR_SIZE);
1793     backing_file = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FILE);
1794     buf = qemu_opt_get_del(opts, BLOCK_OPT_PREALLOC);
1795     if (!buf || !strcmp(buf, "off")) {
1796         prealloc = false;
1797     } else if (!strcmp(buf, "full")) {
1798         prealloc = true;
1799     } else {
1800         error_setg(errp, "Invalid preallocation mode: '%s'", buf);
1801         ret = -EINVAL;
1802         goto out;
1803     }
1804
1805     g_free(buf);
1806     buf = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
1807     if (buf) {
1808         ret = parse_redundancy(s, buf);
1809         if (ret < 0) {
1810             error_setg(errp, "Invalid redundancy mode: '%s'", buf);
1811             goto out;
1812         }
1813     }
1814     ret = parse_block_size_shift(s, opts);
1815     if (ret < 0) {
1816         error_setg(errp, "Invalid object_size."
1817                          " obect_size needs to be power of 2"
1818                          " and be limited from 2^20 to 2^31");
1819         goto out;
1820     }
1821
1822     if (backing_file) {
1823         BlockDriverState *bs;
1824         BDRVSheepdogState *base;
1825         BlockDriver *drv;
1826
1827         /* Currently, only Sheepdog backing image is supported. */
1828         drv = bdrv_find_protocol(backing_file, true, NULL);
1829         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1830             error_setg(errp, "backing_file must be a sheepdog image");
1831             ret = -EINVAL;
1832             goto out;
1833         }
1834
1835         bs = NULL;
1836         ret = bdrv_open(&bs, backing_file, NULL, NULL, BDRV_O_PROTOCOL, errp);
1837         if (ret < 0) {
1838             goto out;
1839         }
1840
1841         base = bs->opaque;
1842
1843         if (!is_snapshot(&base->inode)) {
1844             error_setg(errp, "cannot clone from a non snapshot vdi");
1845             bdrv_unref(bs);
1846             ret = -EINVAL;
1847             goto out;
1848         }
1849         s->inode.vdi_id = base->inode.vdi_id;
1850         bdrv_unref(bs);
1851     }
1852
1853     s->aio_context = qemu_get_aio_context();
1854
1855     /* if block_size_shift is not specified, get cluster default value */
1856     if (s->inode.block_size_shift == 0) {
1857         SheepdogVdiReq hdr;
1858         SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
1859         Error *local_err = NULL;
1860         int fd;
1861         unsigned int wlen = 0, rlen = 0;
1862
1863         fd = connect_to_sdog(s, &local_err);
1864         if (fd < 0) {
1865             error_report_err(local_err);
1866             ret = -EIO;
1867             goto out;
1868         }
1869
1870         memset(&hdr, 0, sizeof(hdr));
1871         hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
1872         hdr.proto_ver = SD_PROTO_VER;
1873
1874         ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1875                      NULL, &wlen, &rlen);
1876         closesocket(fd);
1877         if (ret) {
1878             error_setg_errno(errp, -ret, "failed to get cluster default");
1879             goto out;
1880         }
1881         if (rsp->result == SD_RES_SUCCESS) {
1882             s->inode.block_size_shift = rsp->block_size_shift;
1883         } else {
1884             s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
1885         }
1886     }
1887
1888     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1889
1890     if (s->inode.vdi_size > max_vdi_size) {
1891         error_setg(errp, "An image is too large."
1892                          " The maximum image size is %"PRIu64 "GB",
1893                          max_vdi_size / 1024 / 1024 / 1024);
1894         ret = -EINVAL;
1895         goto out;
1896     }
1897
1898     ret = do_sd_create(s, &vid, 0, errp);
1899     if (ret) {
1900         goto out;
1901     }
1902
1903     if (prealloc) {
1904         ret = sd_prealloc(filename, errp);
1905     }
1906 out:
1907     g_free(backing_file);
1908     g_free(buf);
1909     g_free(s);
1910     return ret;
1911 }
1912
1913 static void sd_close(BlockDriverState *bs)
1914 {
1915     Error *local_err = NULL;
1916     BDRVSheepdogState *s = bs->opaque;
1917     SheepdogVdiReq hdr;
1918     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1919     unsigned int wlen, rlen = 0;
1920     int fd, ret;
1921
1922     DPRINTF("%s\n", s->name);
1923
1924     fd = connect_to_sdog(s, &local_err);
1925     if (fd < 0) {
1926         error_report_err(local_err);
1927         return;
1928     }
1929
1930     memset(&hdr, 0, sizeof(hdr));
1931
1932     hdr.opcode = SD_OP_RELEASE_VDI;
1933     hdr.type = LOCK_TYPE_NORMAL;
1934     hdr.base_vdi_id = s->inode.vdi_id;
1935     wlen = strlen(s->name) + 1;
1936     hdr.data_length = wlen;
1937     hdr.flags = SD_FLAG_CMD_WRITE;
1938
1939     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
1940                  s->name, &wlen, &rlen);
1941
1942     closesocket(fd);
1943
1944     if (!ret && rsp->result != SD_RES_SUCCESS &&
1945         rsp->result != SD_RES_VDI_NOT_LOCKED) {
1946         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1947     }
1948
1949     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1950                        false, NULL, NULL, NULL);
1951     closesocket(s->fd);
1952     g_free(s->host_spec);
1953 }
1954
1955 static int64_t sd_getlength(BlockDriverState *bs)
1956 {
1957     BDRVSheepdogState *s = bs->opaque;
1958
1959     return s->inode.vdi_size;
1960 }
1961
1962 static int sd_truncate(BlockDriverState *bs, int64_t offset)
1963 {
1964     Error *local_err = NULL;
1965     BDRVSheepdogState *s = bs->opaque;
1966     int ret, fd;
1967     unsigned int datalen;
1968     uint64_t max_vdi_size;
1969
1970     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
1971     if (offset < s->inode.vdi_size) {
1972         error_report("shrinking is not supported");
1973         return -EINVAL;
1974     } else if (offset > max_vdi_size) {
1975         error_report("too big image size");
1976         return -EINVAL;
1977     }
1978
1979     fd = connect_to_sdog(s, &local_err);
1980     if (fd < 0) {
1981         error_report_err(local_err);
1982         return fd;
1983     }
1984
1985     /* we don't need to update entire object */
1986     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1987     s->inode.vdi_size = offset;
1988     ret = write_object(fd, s->aio_context, (char *)&s->inode,
1989                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
1990                        datalen, 0, false, s->cache_flags);
1991     close(fd);
1992
1993     if (ret < 0) {
1994         error_report("failed to update an inode.");
1995     }
1996
1997     return ret;
1998 }
1999
2000 /*
2001  * This function is called after writing data objects.  If we need to
2002  * update metadata, this sends a write request to the vdi object.
2003  * Otherwise, this switches back to sd_co_readv/writev.
2004  */
2005 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2006 {
2007     BDRVSheepdogState *s = acb->common.bs->opaque;
2008     struct iovec iov;
2009     AIOReq *aio_req;
2010     uint32_t offset, data_len, mn, mx;
2011
2012     mn = acb->min_dirty_data_idx;
2013     mx = acb->max_dirty_data_idx;
2014     if (mn <= mx) {
2015         /* we need to update the vdi object. */
2016         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2017             mn * sizeof(s->inode.data_vdi_id[0]);
2018         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2019
2020         acb->min_dirty_data_idx = UINT32_MAX;
2021         acb->max_dirty_data_idx = 0;
2022
2023         iov.iov_base = &s->inode;
2024         iov.iov_len = sizeof(s->inode);
2025         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2026                                 data_len, offset, 0, false, 0, offset);
2027         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2028         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2029
2030         acb->aio_done_func = sd_finish_aiocb;
2031         acb->aiocb_type = AIOCB_WRITE_UDATA;
2032         return;
2033     }
2034
2035     sd_finish_aiocb(acb);
2036 }
2037
2038 /* Delete current working VDI on the snapshot chain */
2039 static bool sd_delete(BDRVSheepdogState *s)
2040 {
2041     Error *local_err = NULL;
2042     unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2043     SheepdogVdiReq hdr = {
2044         .opcode = SD_OP_DEL_VDI,
2045         .base_vdi_id = s->inode.vdi_id,
2046         .data_length = wlen,
2047         .flags = SD_FLAG_CMD_WRITE,
2048     };
2049     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2050     int fd, ret;
2051
2052     fd = connect_to_sdog(s, &local_err);
2053     if (fd < 0) {
2054         error_report_err(local_err);
2055         return false;
2056     }
2057
2058     ret = do_req(fd, s->aio_context, (SheepdogReq *)&hdr,
2059                  s->name, &wlen, &rlen);
2060     closesocket(fd);
2061     if (ret) {
2062         return false;
2063     }
2064     switch (rsp->result) {
2065     case SD_RES_NO_VDI:
2066         error_report("%s was already deleted", s->name);
2067         /* fall through */
2068     case SD_RES_SUCCESS:
2069         break;
2070     default:
2071         error_report("%s, %s", sd_strerror(rsp->result), s->name);
2072         return false;
2073     }
2074
2075     return true;
2076 }
2077
2078 /*
2079  * Create a writable VDI from a snapshot
2080  */
2081 static int sd_create_branch(BDRVSheepdogState *s)
2082 {
2083     Error *local_err = NULL;
2084     int ret, fd;
2085     uint32_t vid;
2086     char *buf;
2087     bool deleted;
2088
2089     DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2090
2091     buf = g_malloc(SD_INODE_SIZE);
2092
2093     /*
2094      * Even If deletion fails, we will just create extra snapshot based on
2095      * the working VDI which was supposed to be deleted. So no need to
2096      * false bail out.
2097      */
2098     deleted = sd_delete(s);
2099     ret = do_sd_create(s, &vid, !deleted, &local_err);
2100     if (ret) {
2101         error_report_err(local_err);
2102         goto out;
2103     }
2104
2105     DPRINTF("%" PRIx32 " is created.\n", vid);
2106
2107     fd = connect_to_sdog(s, &local_err);
2108     if (fd < 0) {
2109         error_report_err(local_err);
2110         ret = fd;
2111         goto out;
2112     }
2113
2114     ret = read_object(fd, s->aio_context, buf, vid_to_vdi_oid(vid),
2115                       s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2116
2117     closesocket(fd);
2118
2119     if (ret < 0) {
2120         goto out;
2121     }
2122
2123     memcpy(&s->inode, buf, sizeof(s->inode));
2124
2125     s->is_snapshot = false;
2126     ret = 0;
2127     DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2128
2129 out:
2130     g_free(buf);
2131
2132     return ret;
2133 }
2134
2135 /*
2136  * Send I/O requests to the server.
2137  *
2138  * This function sends requests to the server, links the requests to
2139  * the inflight_list in BDRVSheepdogState, and exits without
2140  * waiting the response.  The responses are received in the
2141  * `aio_read_response' function which is called from the main loop as
2142  * a fd handler.
2143  *
2144  * Returns 1 when we need to wait a response, 0 when there is no sent
2145  * request and -errno in error cases.
2146  */
2147 static int coroutine_fn sd_co_rw_vector(void *p)
2148 {
2149     SheepdogAIOCB *acb = p;
2150     int ret = 0;
2151     unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2152     unsigned long idx;
2153     uint32_t object_size;
2154     uint64_t oid;
2155     uint64_t offset;
2156     BDRVSheepdogState *s = acb->common.bs->opaque;
2157     SheepdogInode *inode = &s->inode;
2158     AIOReq *aio_req;
2159
2160     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2161         /*
2162          * In the case we open the snapshot VDI, Sheepdog creates the
2163          * writable VDI when we do a write operation first.
2164          */
2165         ret = sd_create_branch(s);
2166         if (ret) {
2167             acb->ret = -EIO;
2168             goto out;
2169         }
2170     }
2171
2172     object_size = (UINT32_C(1) << inode->block_size_shift);
2173     idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2174     offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2175
2176     /*
2177      * Make sure we don't free the aiocb before we are done with all requests.
2178      * This additional reference is dropped at the end of this function.
2179      */
2180     acb->nr_pending++;
2181
2182     while (done != total) {
2183         uint8_t flags = 0;
2184         uint64_t old_oid = 0;
2185         bool create = false;
2186
2187         oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2188
2189         len = MIN(total - done, object_size - offset);
2190
2191         switch (acb->aiocb_type) {
2192         case AIOCB_READ_UDATA:
2193             if (!inode->data_vdi_id[idx]) {
2194                 qemu_iovec_memset(acb->qiov, done, 0, len);
2195                 goto done;
2196             }
2197             break;
2198         case AIOCB_WRITE_UDATA:
2199             if (!inode->data_vdi_id[idx]) {
2200                 create = true;
2201             } else if (!is_data_obj_writable(inode, idx)) {
2202                 /* Copy-On-Write */
2203                 create = true;
2204                 old_oid = oid;
2205                 flags = SD_FLAG_CMD_COW;
2206             }
2207             break;
2208         case AIOCB_DISCARD_OBJ:
2209             /*
2210              * We discard the object only when the whole object is
2211              * 1) allocated 2) trimmed. Otherwise, simply skip it.
2212              */
2213             if (len != object_size || inode->data_vdi_id[idx] == 0) {
2214                 goto done;
2215             }
2216             break;
2217         default:
2218             break;
2219         }
2220
2221         if (create) {
2222             DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2223                     inode->vdi_id, oid,
2224                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2225             oid = vid_to_data_oid(inode->vdi_id, idx);
2226             DPRINTF("new oid %" PRIx64 "\n", oid);
2227         }
2228
2229         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2230                                 old_oid,
2231                                 acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2232                                 0 : done);
2233         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2234
2235         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2236                         acb->aiocb_type);
2237     done:
2238         offset = 0;
2239         idx++;
2240         done += len;
2241     }
2242 out:
2243     if (!--acb->nr_pending) {
2244         return acb->ret;
2245     }
2246     return 1;
2247 }
2248
2249 static bool check_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
2250 {
2251     SheepdogAIOCB *cb;
2252
2253     QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
2254         if (AIOCBOverlapping(aiocb, cb)) {
2255             return true;
2256         }
2257     }
2258
2259     QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
2260     return false;
2261 }
2262
2263 static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2264                         int nb_sectors, QEMUIOVector *qiov)
2265 {
2266     SheepdogAIOCB *acb;
2267     int ret;
2268     int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2269     BDRVSheepdogState *s = bs->opaque;
2270
2271     if (offset > s->inode.vdi_size) {
2272         ret = sd_truncate(bs, offset);
2273         if (ret < 0) {
2274             return ret;
2275         }
2276     }
2277
2278     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2279     acb->aio_done_func = sd_write_done;
2280     acb->aiocb_type = AIOCB_WRITE_UDATA;
2281
2282 retry:
2283     if (check_overlapping_aiocb(s, acb)) {
2284         qemu_co_queue_wait(&s->overlapping_queue);
2285         goto retry;
2286     }
2287
2288     ret = sd_co_rw_vector(acb);
2289     if (ret <= 0) {
2290         QLIST_REMOVE(acb, aiocb_siblings);
2291         qemu_co_queue_restart_all(&s->overlapping_queue);
2292         qemu_aio_unref(acb);
2293         return ret;
2294     }
2295
2296     qemu_coroutine_yield();
2297
2298     QLIST_REMOVE(acb, aiocb_siblings);
2299     qemu_co_queue_restart_all(&s->overlapping_queue);
2300
2301     return acb->ret;
2302 }
2303
2304 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2305                        int nb_sectors, QEMUIOVector *qiov)
2306 {
2307     SheepdogAIOCB *acb;
2308     int ret;
2309     BDRVSheepdogState *s = bs->opaque;
2310
2311     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2312     acb->aiocb_type = AIOCB_READ_UDATA;
2313     acb->aio_done_func = sd_finish_aiocb;
2314
2315 retry:
2316     if (check_overlapping_aiocb(s, acb)) {
2317         qemu_co_queue_wait(&s->overlapping_queue);
2318         goto retry;
2319     }
2320
2321     ret = sd_co_rw_vector(acb);
2322     if (ret <= 0) {
2323         QLIST_REMOVE(acb, aiocb_siblings);
2324         qemu_co_queue_restart_all(&s->overlapping_queue);
2325         qemu_aio_unref(acb);
2326         return ret;
2327     }
2328
2329     qemu_coroutine_yield();
2330
2331     QLIST_REMOVE(acb, aiocb_siblings);
2332     qemu_co_queue_restart_all(&s->overlapping_queue);
2333     return acb->ret;
2334 }
2335
2336 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2337 {
2338     BDRVSheepdogState *s = bs->opaque;
2339     SheepdogAIOCB *acb;
2340     AIOReq *aio_req;
2341
2342     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2343         return 0;
2344     }
2345
2346     acb = sd_aio_setup(bs, NULL, 0, 0);
2347     acb->aiocb_type = AIOCB_FLUSH_CACHE;
2348     acb->aio_done_func = sd_finish_aiocb;
2349
2350     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2351                             0, 0, 0, false, 0, 0);
2352     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2353     add_aio_request(s, aio_req, NULL, 0, acb->aiocb_type);
2354
2355     qemu_coroutine_yield();
2356     return acb->ret;
2357 }
2358
2359 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2360 {
2361     Error *local_err = NULL;
2362     BDRVSheepdogState *s = bs->opaque;
2363     int ret, fd;
2364     uint32_t new_vid;
2365     SheepdogInode *inode;
2366     unsigned int datalen;
2367
2368     DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2369             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2370             s->name, sn_info->vm_state_size, s->is_snapshot);
2371
2372     if (s->is_snapshot) {
2373         error_report("You can't create a snapshot of a snapshot VDI, "
2374                      "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2375
2376         return -EINVAL;
2377     }
2378
2379     DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2380
2381     s->inode.vm_state_size = sn_info->vm_state_size;
2382     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2383     /* It appears that inode.tag does not require a NUL terminator,
2384      * which means this use of strncpy is ok.
2385      */
2386     strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2387     /* we don't need to update entire object */
2388     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2389     inode = g_malloc(datalen);
2390
2391     /* refresh inode. */
2392     fd = connect_to_sdog(s, &local_err);
2393     if (fd < 0) {
2394         error_report_err(local_err);
2395         ret = fd;
2396         goto cleanup;
2397     }
2398
2399     ret = write_object(fd, s->aio_context, (char *)&s->inode,
2400                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2401                        datalen, 0, false, s->cache_flags);
2402     if (ret < 0) {
2403         error_report("failed to write snapshot's inode.");
2404         goto cleanup;
2405     }
2406
2407     ret = do_sd_create(s, &new_vid, 1, &local_err);
2408     if (ret < 0) {
2409         error_reportf_err(local_err,
2410                           "failed to create inode for snapshot: ");
2411         goto cleanup;
2412     }
2413
2414     ret = read_object(fd, s->aio_context, (char *)inode,
2415                       vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2416                       s->cache_flags);
2417
2418     if (ret < 0) {
2419         error_report("failed to read new inode info. %s", strerror(errno));
2420         goto cleanup;
2421     }
2422
2423     memcpy(&s->inode, inode, datalen);
2424     DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2425             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2426
2427 cleanup:
2428     g_free(inode);
2429     closesocket(fd);
2430     return ret;
2431 }
2432
2433 /*
2434  * We implement rollback(loadvm) operation to the specified snapshot by
2435  * 1) switch to the snapshot
2436  * 2) rely on sd_create_branch to delete working VDI and
2437  * 3) create a new working VDI based on the specified snapshot
2438  */
2439 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2440 {
2441     BDRVSheepdogState *s = bs->opaque;
2442     BDRVSheepdogState *old_s;
2443     char tag[SD_MAX_VDI_TAG_LEN];
2444     uint32_t snapid = 0;
2445     int ret = 0;
2446
2447     old_s = g_new(BDRVSheepdogState, 1);
2448
2449     memcpy(old_s, s, sizeof(BDRVSheepdogState));
2450
2451     snapid = strtoul(snapshot_id, NULL, 10);
2452     if (snapid) {
2453         tag[0] = 0;
2454     } else {
2455         pstrcpy(tag, sizeof(tag), snapshot_id);
2456     }
2457
2458     ret = reload_inode(s, snapid, tag);
2459     if (ret) {
2460         goto out;
2461     }
2462
2463     ret = sd_create_branch(s);
2464     if (ret) {
2465         goto out;
2466     }
2467
2468     g_free(old_s);
2469
2470     return 0;
2471 out:
2472     /* recover bdrv_sd_state */
2473     memcpy(s, old_s, sizeof(BDRVSheepdogState));
2474     g_free(old_s);
2475
2476     error_report("failed to open. recover old bdrv_sd_state.");
2477
2478     return ret;
2479 }
2480
2481 static int sd_snapshot_delete(BlockDriverState *bs,
2482                               const char *snapshot_id,
2483                               const char *name,
2484                               Error **errp)
2485 {
2486     /* FIXME: Delete specified snapshot id.  */
2487     return 0;
2488 }
2489
2490 static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2491 {
2492     Error *local_err = NULL;
2493     BDRVSheepdogState *s = bs->opaque;
2494     SheepdogReq req;
2495     int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2496     QEMUSnapshotInfo *sn_tab = NULL;
2497     unsigned wlen, rlen;
2498     int found = 0;
2499     static SheepdogInode inode;
2500     unsigned long *vdi_inuse;
2501     unsigned int start_nr;
2502     uint64_t hval;
2503     uint32_t vid;
2504
2505     vdi_inuse = g_malloc(max);
2506
2507     fd = connect_to_sdog(s, &local_err);
2508     if (fd < 0) {
2509         error_report_err(local_err);
2510         ret = fd;
2511         goto out;
2512     }
2513
2514     rlen = max;
2515     wlen = 0;
2516
2517     memset(&req, 0, sizeof(req));
2518
2519     req.opcode = SD_OP_READ_VDIS;
2520     req.data_length = max;
2521
2522     ret = do_req(fd, s->aio_context, (SheepdogReq *)&req,
2523                  vdi_inuse, &wlen, &rlen);
2524
2525     closesocket(fd);
2526     if (ret) {
2527         goto out;
2528     }
2529
2530     sn_tab = g_new0(QEMUSnapshotInfo, nr);
2531
2532     /* calculate a vdi id with hash function */
2533     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2534     start_nr = hval & (SD_NR_VDIS - 1);
2535
2536     fd = connect_to_sdog(s, &local_err);
2537     if (fd < 0) {
2538         error_report_err(local_err);
2539         ret = fd;
2540         goto out;
2541     }
2542
2543     for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2544         if (!test_bit(vid, vdi_inuse)) {
2545             break;
2546         }
2547
2548         /* we don't need to read entire object */
2549         ret = read_object(fd, s->aio_context, (char *)&inode,
2550                           vid_to_vdi_oid(vid),
2551                           0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2552                           s->cache_flags);
2553
2554         if (ret) {
2555             continue;
2556         }
2557
2558         if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2559             sn_tab[found].date_sec = inode.snap_ctime >> 32;
2560             sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2561             sn_tab[found].vm_state_size = inode.vm_state_size;
2562             sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2563
2564             snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2565                      "%" PRIu32, inode.snap_id);
2566             pstrcpy(sn_tab[found].name,
2567                     MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2568                     inode.tag);
2569             found++;
2570         }
2571     }
2572
2573     closesocket(fd);
2574 out:
2575     *psn_tab = sn_tab;
2576
2577     g_free(vdi_inuse);
2578
2579     if (ret < 0) {
2580         return ret;
2581     }
2582
2583     return found;
2584 }
2585
2586 static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2587                                 int64_t pos, int size, int load)
2588 {
2589     Error *local_err = NULL;
2590     bool create;
2591     int fd, ret = 0, remaining = size;
2592     unsigned int data_len;
2593     uint64_t vmstate_oid;
2594     uint64_t offset;
2595     uint32_t vdi_index;
2596     uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2597     uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
2598
2599     fd = connect_to_sdog(s, &local_err);
2600     if (fd < 0) {
2601         error_report_err(local_err);
2602         return fd;
2603     }
2604
2605     while (remaining) {
2606         vdi_index = pos / object_size;
2607         offset = pos % object_size;
2608
2609         data_len = MIN(remaining, object_size - offset);
2610
2611         vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2612
2613         create = (offset == 0);
2614         if (load) {
2615             ret = read_object(fd, s->aio_context, (char *)data, vmstate_oid,
2616                               s->inode.nr_copies, data_len, offset,
2617                               s->cache_flags);
2618         } else {
2619             ret = write_object(fd, s->aio_context, (char *)data, vmstate_oid,
2620                                s->inode.nr_copies, data_len, offset, create,
2621                                s->cache_flags);
2622         }
2623
2624         if (ret < 0) {
2625             error_report("failed to save vmstate %s", strerror(errno));
2626             goto cleanup;
2627         }
2628
2629         pos += data_len;
2630         data += data_len;
2631         remaining -= data_len;
2632     }
2633     ret = size;
2634 cleanup:
2635     closesocket(fd);
2636     return ret;
2637 }
2638
2639 static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2640                            int64_t pos)
2641 {
2642     BDRVSheepdogState *s = bs->opaque;
2643     void *buf;
2644     int ret;
2645
2646     buf = qemu_blockalign(bs, qiov->size);
2647     qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2648     ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2649     qemu_vfree(buf);
2650
2651     return ret;
2652 }
2653
2654 static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
2655                            int64_t pos, int size)
2656 {
2657     BDRVSheepdogState *s = bs->opaque;
2658
2659     return do_load_save_vmstate(s, data, pos, size, 1);
2660 }
2661
2662
2663 static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
2664                                       int nb_sectors)
2665 {
2666     SheepdogAIOCB *acb;
2667     BDRVSheepdogState *s = bs->opaque;
2668     int ret;
2669     QEMUIOVector discard_iov;
2670     struct iovec iov;
2671     uint32_t zero = 0;
2672
2673     if (!s->discard_supported) {
2674             return 0;
2675     }
2676
2677     memset(&discard_iov, 0, sizeof(discard_iov));
2678     memset(&iov, 0, sizeof(iov));
2679     iov.iov_base = &zero;
2680     iov.iov_len = sizeof(zero);
2681     discard_iov.iov = &iov;
2682     discard_iov.niov = 1;
2683     acb = sd_aio_setup(bs, &discard_iov, sector_num, nb_sectors);
2684     acb->aiocb_type = AIOCB_DISCARD_OBJ;
2685     acb->aio_done_func = sd_finish_aiocb;
2686
2687 retry:
2688     if (check_overlapping_aiocb(s, acb)) {
2689         qemu_co_queue_wait(&s->overlapping_queue);
2690         goto retry;
2691     }
2692
2693     ret = sd_co_rw_vector(acb);
2694     if (ret <= 0) {
2695         QLIST_REMOVE(acb, aiocb_siblings);
2696         qemu_co_queue_restart_all(&s->overlapping_queue);
2697         qemu_aio_unref(acb);
2698         return ret;
2699     }
2700
2701     qemu_coroutine_yield();
2702
2703     QLIST_REMOVE(acb, aiocb_siblings);
2704     qemu_co_queue_restart_all(&s->overlapping_queue);
2705
2706     return acb->ret;
2707 }
2708
2709 static coroutine_fn int64_t
2710 sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2711                        int *pnum)
2712 {
2713     BDRVSheepdogState *s = bs->opaque;
2714     SheepdogInode *inode = &s->inode;
2715     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2716     uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
2717     unsigned long start = offset / object_size,
2718                   end = DIV_ROUND_UP((sector_num + nb_sectors) *
2719                                      BDRV_SECTOR_SIZE, object_size);
2720     unsigned long idx;
2721     int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
2722
2723     for (idx = start; idx < end; idx++) {
2724         if (inode->data_vdi_id[idx] == 0) {
2725             break;
2726         }
2727     }
2728     if (idx == start) {
2729         /* Get the longest length of unallocated sectors */
2730         ret = 0;
2731         for (idx = start + 1; idx < end; idx++) {
2732             if (inode->data_vdi_id[idx] != 0) {
2733                 break;
2734             }
2735         }
2736     }
2737
2738     *pnum = (idx - start) * object_size / BDRV_SECTOR_SIZE;
2739     if (*pnum > nb_sectors) {
2740         *pnum = nb_sectors;
2741     }
2742     return ret;
2743 }
2744
2745 static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
2746 {
2747     BDRVSheepdogState *s = bs->opaque;
2748     SheepdogInode *inode = &s->inode;
2749     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2750     unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
2751     uint64_t size = 0;
2752
2753     for (i = 0; i < last; i++) {
2754         if (inode->data_vdi_id[i] == 0) {
2755             continue;
2756         }
2757         size += object_size;
2758     }
2759     return size;
2760 }
2761
2762 static QemuOptsList sd_create_opts = {
2763     .name = "sheepdog-create-opts",
2764     .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
2765     .desc = {
2766         {
2767             .name = BLOCK_OPT_SIZE,
2768             .type = QEMU_OPT_SIZE,
2769             .help = "Virtual disk size"
2770         },
2771         {
2772             .name = BLOCK_OPT_BACKING_FILE,
2773             .type = QEMU_OPT_STRING,
2774             .help = "File name of a base image"
2775         },
2776         {
2777             .name = BLOCK_OPT_PREALLOC,
2778             .type = QEMU_OPT_STRING,
2779             .help = "Preallocation mode (allowed values: off, full)"
2780         },
2781         {
2782             .name = BLOCK_OPT_REDUNDANCY,
2783             .type = QEMU_OPT_STRING,
2784             .help = "Redundancy of the image"
2785         },
2786         {
2787             .name = BLOCK_OPT_OBJECT_SIZE,
2788             .type = QEMU_OPT_SIZE,
2789             .help = "Object size of the image"
2790         },
2791         { /* end of list */ }
2792     }
2793 };
2794
2795 static BlockDriver bdrv_sheepdog = {
2796     .format_name    = "sheepdog",
2797     .protocol_name  = "sheepdog",
2798     .instance_size  = sizeof(BDRVSheepdogState),
2799     .bdrv_needs_filename = true,
2800     .bdrv_file_open = sd_open,
2801     .bdrv_reopen_prepare    = sd_reopen_prepare,
2802     .bdrv_reopen_commit     = sd_reopen_commit,
2803     .bdrv_reopen_abort      = sd_reopen_abort,
2804     .bdrv_close     = sd_close,
2805     .bdrv_create    = sd_create,
2806     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2807     .bdrv_getlength = sd_getlength,
2808     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2809     .bdrv_truncate  = sd_truncate,
2810
2811     .bdrv_co_readv  = sd_co_readv,
2812     .bdrv_co_writev = sd_co_writev,
2813     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2814     .bdrv_co_discard = sd_co_discard,
2815     .bdrv_co_get_block_status = sd_co_get_block_status,
2816
2817     .bdrv_snapshot_create   = sd_snapshot_create,
2818     .bdrv_snapshot_goto     = sd_snapshot_goto,
2819     .bdrv_snapshot_delete   = sd_snapshot_delete,
2820     .bdrv_snapshot_list     = sd_snapshot_list,
2821
2822     .bdrv_save_vmstate  = sd_save_vmstate,
2823     .bdrv_load_vmstate  = sd_load_vmstate,
2824
2825     .bdrv_detach_aio_context = sd_detach_aio_context,
2826     .bdrv_attach_aio_context = sd_attach_aio_context,
2827
2828     .create_opts    = &sd_create_opts,
2829 };
2830
2831 static BlockDriver bdrv_sheepdog_tcp = {
2832     .format_name    = "sheepdog",
2833     .protocol_name  = "sheepdog+tcp",
2834     .instance_size  = sizeof(BDRVSheepdogState),
2835     .bdrv_needs_filename = true,
2836     .bdrv_file_open = sd_open,
2837     .bdrv_reopen_prepare    = sd_reopen_prepare,
2838     .bdrv_reopen_commit     = sd_reopen_commit,
2839     .bdrv_reopen_abort      = sd_reopen_abort,
2840     .bdrv_close     = sd_close,
2841     .bdrv_create    = sd_create,
2842     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2843     .bdrv_getlength = sd_getlength,
2844     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2845     .bdrv_truncate  = sd_truncate,
2846
2847     .bdrv_co_readv  = sd_co_readv,
2848     .bdrv_co_writev = sd_co_writev,
2849     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2850     .bdrv_co_discard = sd_co_discard,
2851     .bdrv_co_get_block_status = sd_co_get_block_status,
2852
2853     .bdrv_snapshot_create   = sd_snapshot_create,
2854     .bdrv_snapshot_goto     = sd_snapshot_goto,
2855     .bdrv_snapshot_delete   = sd_snapshot_delete,
2856     .bdrv_snapshot_list     = sd_snapshot_list,
2857
2858     .bdrv_save_vmstate  = sd_save_vmstate,
2859     .bdrv_load_vmstate  = sd_load_vmstate,
2860
2861     .bdrv_detach_aio_context = sd_detach_aio_context,
2862     .bdrv_attach_aio_context = sd_attach_aio_context,
2863
2864     .create_opts    = &sd_create_opts,
2865 };
2866
2867 static BlockDriver bdrv_sheepdog_unix = {
2868     .format_name    = "sheepdog",
2869     .protocol_name  = "sheepdog+unix",
2870     .instance_size  = sizeof(BDRVSheepdogState),
2871     .bdrv_needs_filename = true,
2872     .bdrv_file_open = sd_open,
2873     .bdrv_reopen_prepare    = sd_reopen_prepare,
2874     .bdrv_reopen_commit     = sd_reopen_commit,
2875     .bdrv_reopen_abort      = sd_reopen_abort,
2876     .bdrv_close     = sd_close,
2877     .bdrv_create    = sd_create,
2878     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2879     .bdrv_getlength = sd_getlength,
2880     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
2881     .bdrv_truncate  = sd_truncate,
2882
2883     .bdrv_co_readv  = sd_co_readv,
2884     .bdrv_co_writev = sd_co_writev,
2885     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2886     .bdrv_co_discard = sd_co_discard,
2887     .bdrv_co_get_block_status = sd_co_get_block_status,
2888
2889     .bdrv_snapshot_create   = sd_snapshot_create,
2890     .bdrv_snapshot_goto     = sd_snapshot_goto,
2891     .bdrv_snapshot_delete   = sd_snapshot_delete,
2892     .bdrv_snapshot_list     = sd_snapshot_list,
2893
2894     .bdrv_save_vmstate  = sd_save_vmstate,
2895     .bdrv_load_vmstate  = sd_load_vmstate,
2896
2897     .bdrv_detach_aio_context = sd_detach_aio_context,
2898     .bdrv_attach_aio_context = sd_attach_aio_context,
2899
2900     .create_opts    = &sd_create_opts,
2901 };
2902
2903 static void bdrv_sheepdog_init(void)
2904 {
2905     bdrv_register(&bdrv_sheepdog);
2906     bdrv_register(&bdrv_sheepdog_tcp);
2907     bdrv_register(&bdrv_sheepdog_unix);
2908 }
2909 block_init(bdrv_sheepdog_init);
This page took 0.179774 seconds and 4 git commands to generate.