]> Git Repo - qemu.git/blob - block/sheepdog.c
monitor: split off monitor_data_init()
[qemu.git] / block / sheepdog.c
1 /*
2  * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version
6  * 2 as published by the Free Software Foundation.
7  *
8  * You should have received a copy of the GNU General Public License
9  * along with this program. If not, see <http://www.gnu.org/licenses/>.
10  *
11  * Contributions after 2012-01-13 are licensed under the terms of the
12  * GNU GPL, version 2 or (at your option) any later version.
13  */
14
15 #include "qemu-common.h"
16 #include "qemu/uri.h"
17 #include "qemu/error-report.h"
18 #include "qemu/sockets.h"
19 #include "block/block_int.h"
20 #include "qemu/bitops.h"
21
22 #define SD_PROTO_VER 0x01
23
24 #define SD_DEFAULT_ADDR "localhost"
25 #define SD_DEFAULT_PORT 7000
26
27 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
28 #define SD_OP_READ_OBJ       0x02
29 #define SD_OP_WRITE_OBJ      0x03
30 /* 0x04 is used internally by Sheepdog */
31 #define SD_OP_DISCARD_OBJ    0x05
32
33 #define SD_OP_NEW_VDI        0x11
34 #define SD_OP_LOCK_VDI       0x12
35 #define SD_OP_RELEASE_VDI    0x13
36 #define SD_OP_GET_VDI_INFO   0x14
37 #define SD_OP_READ_VDIS      0x15
38 #define SD_OP_FLUSH_VDI      0x16
39 #define SD_OP_DEL_VDI        0x17
40
41 #define SD_FLAG_CMD_WRITE    0x01
42 #define SD_FLAG_CMD_COW      0x02
43 #define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
44 #define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
45
46 #define SD_RES_SUCCESS       0x00 /* Success */
47 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
48 #define SD_RES_NO_OBJ        0x02 /* No object found */
49 #define SD_RES_EIO           0x03 /* I/O error */
50 #define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
51 #define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
52 #define SD_RES_SYSTEM_ERROR  0x06 /* System error */
53 #define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
54 #define SD_RES_NO_VDI        0x08 /* No vdi found */
55 #define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
56 #define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
57 #define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
58 #define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
59 #define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
60 #define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
61 #define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
62 #define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
63 #define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
64 #define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
65 #define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
66 #define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
67 #define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
68 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
69 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
70 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
71 #define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
72 #define SD_RES_READONLY      0x1A /* Object is read-only */
73
74 /*
75  * Object ID rules
76  *
77  *  0 - 19 (20 bits): data object space
78  * 20 - 31 (12 bits): reserved data object space
79  * 32 - 55 (24 bits): vdi object space
80  * 56 - 59 ( 4 bits): reserved vdi object space
81  * 60 - 63 ( 4 bits): object type identifier space
82  */
83
84 #define VDI_SPACE_SHIFT   32
85 #define VDI_BIT (UINT64_C(1) << 63)
86 #define VMSTATE_BIT (UINT64_C(1) << 62)
87 #define MAX_DATA_OBJS (UINT64_C(1) << 20)
88 #define MAX_CHILDREN 1024
89 #define SD_MAX_VDI_LEN 256
90 #define SD_MAX_VDI_TAG_LEN 256
91 #define SD_NR_VDIS   (1U << 24)
92 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
93 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
94
95 #define SD_INODE_SIZE (sizeof(SheepdogInode))
96 #define CURRENT_VDI_ID 0
97
98 typedef struct SheepdogReq {
99     uint8_t proto_ver;
100     uint8_t opcode;
101     uint16_t flags;
102     uint32_t epoch;
103     uint32_t id;
104     uint32_t data_length;
105     uint32_t opcode_specific[8];
106 } SheepdogReq;
107
108 typedef struct SheepdogRsp {
109     uint8_t proto_ver;
110     uint8_t opcode;
111     uint16_t flags;
112     uint32_t epoch;
113     uint32_t id;
114     uint32_t data_length;
115     uint32_t result;
116     uint32_t opcode_specific[7];
117 } SheepdogRsp;
118
119 typedef struct SheepdogObjReq {
120     uint8_t proto_ver;
121     uint8_t opcode;
122     uint16_t flags;
123     uint32_t epoch;
124     uint32_t id;
125     uint32_t data_length;
126     uint64_t oid;
127     uint64_t cow_oid;
128     uint32_t copies;
129     uint32_t rsvd;
130     uint64_t offset;
131 } SheepdogObjReq;
132
133 typedef struct SheepdogObjRsp {
134     uint8_t proto_ver;
135     uint8_t opcode;
136     uint16_t flags;
137     uint32_t epoch;
138     uint32_t id;
139     uint32_t data_length;
140     uint32_t result;
141     uint32_t copies;
142     uint32_t pad[6];
143 } SheepdogObjRsp;
144
145 typedef struct SheepdogVdiReq {
146     uint8_t proto_ver;
147     uint8_t opcode;
148     uint16_t flags;
149     uint32_t epoch;
150     uint32_t id;
151     uint32_t data_length;
152     uint64_t vdi_size;
153     uint32_t vdi_id;
154     uint32_t copies;
155     uint32_t snapid;
156     uint32_t pad[3];
157 } SheepdogVdiReq;
158
159 typedef struct SheepdogVdiRsp {
160     uint8_t proto_ver;
161     uint8_t opcode;
162     uint16_t flags;
163     uint32_t epoch;
164     uint32_t id;
165     uint32_t data_length;
166     uint32_t result;
167     uint32_t rsvd;
168     uint32_t vdi_id;
169     uint32_t pad[5];
170 } SheepdogVdiRsp;
171
172 typedef struct SheepdogInode {
173     char name[SD_MAX_VDI_LEN];
174     char tag[SD_MAX_VDI_TAG_LEN];
175     uint64_t ctime;
176     uint64_t snap_ctime;
177     uint64_t vm_clock_nsec;
178     uint64_t vdi_size;
179     uint64_t vm_state_size;
180     uint16_t copy_policy;
181     uint8_t nr_copies;
182     uint8_t block_size_shift;
183     uint32_t snap_id;
184     uint32_t vdi_id;
185     uint32_t parent_vdi_id;
186     uint32_t child_vdi_id[MAX_CHILDREN];
187     uint32_t data_vdi_id[MAX_DATA_OBJS];
188 } SheepdogInode;
189
190 /*
191  * 64 bit FNV-1a non-zero initial basis
192  */
193 #define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
194
195 /*
196  * 64 bit Fowler/Noll/Vo FNV-1a hash code
197  */
198 static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
199 {
200     unsigned char *bp = buf;
201     unsigned char *be = bp + len;
202     while (bp < be) {
203         hval ^= (uint64_t) *bp++;
204         hval += (hval << 1) + (hval << 4) + (hval << 5) +
205             (hval << 7) + (hval << 8) + (hval << 40);
206     }
207     return hval;
208 }
209
210 static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
211 {
212     return inode->vdi_id == inode->data_vdi_id[idx];
213 }
214
215 static inline bool is_data_obj(uint64_t oid)
216 {
217     return !(VDI_BIT & oid);
218 }
219
220 static inline uint64_t data_oid_to_idx(uint64_t oid)
221 {
222     return oid & (MAX_DATA_OBJS - 1);
223 }
224
225 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
226 {
227     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
228 }
229
230 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
231 {
232     return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
233 }
234
235 static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
236 {
237     return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
238 }
239
240 static inline bool is_snapshot(struct SheepdogInode *inode)
241 {
242     return !!inode->snap_ctime;
243 }
244
245 #undef DPRINTF
246 #ifdef DEBUG_SDOG
247 #define DPRINTF(fmt, args...)                                       \
248     do {                                                            \
249         fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
250     } while (0)
251 #else
252 #define DPRINTF(fmt, args...)
253 #endif
254
255 typedef struct SheepdogAIOCB SheepdogAIOCB;
256
257 typedef struct AIOReq {
258     SheepdogAIOCB *aiocb;
259     unsigned int iov_offset;
260
261     uint64_t oid;
262     uint64_t base_oid;
263     uint64_t offset;
264     unsigned int data_len;
265     uint8_t flags;
266     uint32_t id;
267
268     QLIST_ENTRY(AIOReq) aio_siblings;
269 } AIOReq;
270
271 enum AIOCBState {
272     AIOCB_WRITE_UDATA,
273     AIOCB_READ_UDATA,
274     AIOCB_FLUSH_CACHE,
275     AIOCB_DISCARD_OBJ,
276 };
277
278 struct SheepdogAIOCB {
279     BlockDriverAIOCB common;
280
281     QEMUIOVector *qiov;
282
283     int64_t sector_num;
284     int nb_sectors;
285
286     int ret;
287     enum AIOCBState aiocb_type;
288
289     Coroutine *coroutine;
290     void (*aio_done_func)(SheepdogAIOCB *);
291
292     bool canceled;
293     int nr_pending;
294 };
295
296 typedef struct BDRVSheepdogState {
297     SheepdogInode inode;
298
299     uint32_t min_dirty_data_idx;
300     uint32_t max_dirty_data_idx;
301
302     char name[SD_MAX_VDI_LEN];
303     bool is_snapshot;
304     uint32_t cache_flags;
305     bool discard_supported;
306
307     char *host_spec;
308     bool is_unix;
309     int fd;
310
311     CoMutex lock;
312     Coroutine *co_send;
313     Coroutine *co_recv;
314
315     uint32_t aioreq_seq_num;
316     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
317     QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
318 } BDRVSheepdogState;
319
320 static const char * sd_strerror(int err)
321 {
322     int i;
323
324     static const struct {
325         int err;
326         const char *desc;
327     } errors[] = {
328         {SD_RES_SUCCESS, "Success"},
329         {SD_RES_UNKNOWN, "Unknown error"},
330         {SD_RES_NO_OBJ, "No object found"},
331         {SD_RES_EIO, "I/O error"},
332         {SD_RES_VDI_EXIST, "VDI exists already"},
333         {SD_RES_INVALID_PARMS, "Invalid parameters"},
334         {SD_RES_SYSTEM_ERROR, "System error"},
335         {SD_RES_VDI_LOCKED, "VDI is already locked"},
336         {SD_RES_NO_VDI, "No vdi found"},
337         {SD_RES_NO_BASE_VDI, "No base VDI found"},
338         {SD_RES_VDI_READ, "Failed read the requested VDI"},
339         {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
340         {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
341         {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
342         {SD_RES_NO_TAG, "Failed to find the requested tag"},
343         {SD_RES_STARTUP, "The system is still booting"},
344         {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
345         {SD_RES_SHUTDOWN, "The system is shutting down"},
346         {SD_RES_NO_MEM, "Out of memory on the server"},
347         {SD_RES_FULL_VDI, "We already have the maximum vdis"},
348         {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
349         {SD_RES_NO_SPACE, "Server has no space for new objects"},
350         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
351         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
352         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
353         {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
354         {SD_RES_READONLY, "Object is read-only"},
355     };
356
357     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
358         if (errors[i].err == err) {
359             return errors[i].desc;
360         }
361     }
362
363     return "Invalid error code";
364 }
365
366 /*
367  * Sheepdog I/O handling:
368  *
369  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
370  *    link the requests to the inflight_list in the
371  *    BDRVSheepdogState.  The function exits without waiting for
372  *    receiving the response.
373  *
374  * 2. We receive the response in aio_read_response, the fd handler to
375  *    the sheepdog connection.  If metadata update is needed, we send
376  *    the write request to the vdi object in sd_write_done, the write
377  *    completion function.  We switch back to sd_co_readv/writev after
378  *    all the requests belonging to the AIOCB are finished.
379  */
380
381 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
382                                     uint64_t oid, unsigned int data_len,
383                                     uint64_t offset, uint8_t flags,
384                                     uint64_t base_oid, unsigned int iov_offset)
385 {
386     AIOReq *aio_req;
387
388     aio_req = g_malloc(sizeof(*aio_req));
389     aio_req->aiocb = acb;
390     aio_req->iov_offset = iov_offset;
391     aio_req->oid = oid;
392     aio_req->base_oid = base_oid;
393     aio_req->offset = offset;
394     aio_req->data_len = data_len;
395     aio_req->flags = flags;
396     aio_req->id = s->aioreq_seq_num++;
397
398     acb->nr_pending++;
399     return aio_req;
400 }
401
402 static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
403 {
404     SheepdogAIOCB *acb = aio_req->aiocb;
405
406     QLIST_REMOVE(aio_req, aio_siblings);
407     g_free(aio_req);
408
409     acb->nr_pending--;
410 }
411
412 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
413 {
414     if (!acb->canceled) {
415         qemu_coroutine_enter(acb->coroutine, NULL);
416     }
417     qemu_aio_release(acb);
418 }
419
420 static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
421 {
422     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
423
424     /*
425      * Sheepdog cannot cancel the requests which are already sent to
426      * the servers, so we just complete the request with -EIO here.
427      */
428     acb->ret = -EIO;
429     qemu_coroutine_enter(acb->coroutine, NULL);
430     acb->canceled = true;
431 }
432
433 static const AIOCBInfo sd_aiocb_info = {
434     .aiocb_size = sizeof(SheepdogAIOCB),
435     .cancel = sd_aio_cancel,
436 };
437
438 static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
439                                    int64_t sector_num, int nb_sectors)
440 {
441     SheepdogAIOCB *acb;
442
443     acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
444
445     acb->qiov = qiov;
446
447     acb->sector_num = sector_num;
448     acb->nb_sectors = nb_sectors;
449
450     acb->aio_done_func = NULL;
451     acb->canceled = false;
452     acb->coroutine = qemu_coroutine_self();
453     acb->ret = 0;
454     acb->nr_pending = 0;
455     return acb;
456 }
457
458 static int connect_to_sdog(BDRVSheepdogState *s)
459 {
460     int fd;
461     Error *err = NULL;
462
463     if (s->is_unix) {
464         fd = unix_connect(s->host_spec, &err);
465     } else {
466         fd = inet_connect(s->host_spec, &err);
467
468         if (err == NULL) {
469             int ret = socket_set_nodelay(fd);
470             if (ret < 0) {
471                 error_report("%s", strerror(errno));
472             }
473         }
474     }
475
476     if (err != NULL) {
477         qerror_report_err(err);
478         error_free(err);
479     } else {
480         qemu_set_nonblock(fd);
481     }
482
483     return fd;
484 }
485
486 static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
487                                     unsigned int *wlen)
488 {
489     int ret;
490
491     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
492     if (ret < sizeof(*hdr)) {
493         error_report("failed to send a req, %s", strerror(errno));
494         return ret;
495     }
496
497     ret = qemu_co_send(sockfd, data, *wlen);
498     if (ret < *wlen) {
499         error_report("failed to send a req, %s", strerror(errno));
500     }
501
502     return ret;
503 }
504
505 static void restart_co_req(void *opaque)
506 {
507     Coroutine *co = opaque;
508
509     qemu_coroutine_enter(co, NULL);
510 }
511
512 typedef struct SheepdogReqCo {
513     int sockfd;
514     SheepdogReq *hdr;
515     void *data;
516     unsigned int *wlen;
517     unsigned int *rlen;
518     int ret;
519     bool finished;
520 } SheepdogReqCo;
521
522 static coroutine_fn void do_co_req(void *opaque)
523 {
524     int ret;
525     Coroutine *co;
526     SheepdogReqCo *srco = opaque;
527     int sockfd = srco->sockfd;
528     SheepdogReq *hdr = srco->hdr;
529     void *data = srco->data;
530     unsigned int *wlen = srco->wlen;
531     unsigned int *rlen = srco->rlen;
532
533     co = qemu_coroutine_self();
534     qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, co);
535
536     ret = send_co_req(sockfd, hdr, data, wlen);
537     if (ret < 0) {
538         goto out;
539     }
540
541     qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co);
542
543     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
544     if (ret < sizeof(*hdr)) {
545         error_report("failed to get a rsp, %s", strerror(errno));
546         ret = -errno;
547         goto out;
548     }
549
550     if (*rlen > hdr->data_length) {
551         *rlen = hdr->data_length;
552     }
553
554     if (*rlen) {
555         ret = qemu_co_recv(sockfd, data, *rlen);
556         if (ret < *rlen) {
557             error_report("failed to get the data, %s", strerror(errno));
558             ret = -errno;
559             goto out;
560         }
561     }
562     ret = 0;
563 out:
564     /* there is at most one request for this sockfd, so it is safe to
565      * set each handler to NULL. */
566     qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL);
567
568     srco->ret = ret;
569     srco->finished = true;
570 }
571
572 static int do_req(int sockfd, SheepdogReq *hdr, void *data,
573                   unsigned int *wlen, unsigned int *rlen)
574 {
575     Coroutine *co;
576     SheepdogReqCo srco = {
577         .sockfd = sockfd,
578         .hdr = hdr,
579         .data = data,
580         .wlen = wlen,
581         .rlen = rlen,
582         .ret = 0,
583         .finished = false,
584     };
585
586     if (qemu_in_coroutine()) {
587         do_co_req(&srco);
588     } else {
589         co = qemu_coroutine_create(do_co_req);
590         qemu_coroutine_enter(co, &srco);
591         while (!srco.finished) {
592             qemu_aio_wait();
593         }
594     }
595
596     return srco.ret;
597 }
598
599 static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
600                            struct iovec *iov, int niov, bool create,
601                            enum AIOCBState aiocb_type);
602 static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
603
604
605 static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
606 {
607     AIOReq *aio_req;
608
609     QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
610         if (aio_req->oid == oid) {
611             return aio_req;
612         }
613     }
614
615     return NULL;
616 }
617
618 /*
619  * This function searchs pending requests to the object `oid', and
620  * sends them.
621  */
622 static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
623 {
624     AIOReq *aio_req;
625     SheepdogAIOCB *acb;
626     int ret;
627
628     while ((aio_req = find_pending_req(s, oid)) != NULL) {
629         acb = aio_req->aiocb;
630         /* move aio_req from pending list to inflight one */
631         QLIST_REMOVE(aio_req, aio_siblings);
632         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
633         ret = add_aio_request(s, aio_req, acb->qiov->iov,
634                               acb->qiov->niov, false, acb->aiocb_type);
635         if (ret < 0) {
636             error_report("add_aio_request is failed");
637             free_aio_req(s, aio_req);
638             if (!acb->nr_pending) {
639                 sd_finish_aiocb(acb);
640             }
641         }
642     }
643 }
644
645 /*
646  * Receive responses of the I/O requests.
647  *
648  * This function is registered as a fd handler, and called from the
649  * main loop when s->fd is ready for reading responses.
650  */
651 static void coroutine_fn aio_read_response(void *opaque)
652 {
653     SheepdogObjRsp rsp;
654     BDRVSheepdogState *s = opaque;
655     int fd = s->fd;
656     int ret;
657     AIOReq *aio_req = NULL;
658     SheepdogAIOCB *acb;
659     uint64_t idx;
660
661     if (QLIST_EMPTY(&s->inflight_aio_head)) {
662         goto out;
663     }
664
665     /* read a header */
666     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
667     if (ret < 0) {
668         error_report("failed to get the header, %s", strerror(errno));
669         goto out;
670     }
671
672     /* find the right aio_req from the inflight aio list */
673     QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
674         if (aio_req->id == rsp.id) {
675             break;
676         }
677     }
678     if (!aio_req) {
679         error_report("cannot find aio_req %x", rsp.id);
680         goto out;
681     }
682
683     acb = aio_req->aiocb;
684
685     switch (acb->aiocb_type) {
686     case AIOCB_WRITE_UDATA:
687         /* this coroutine context is no longer suitable for co_recv
688          * because we may send data to update vdi objects */
689         s->co_recv = NULL;
690         if (!is_data_obj(aio_req->oid)) {
691             break;
692         }
693         idx = data_oid_to_idx(aio_req->oid);
694
695         if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
696             /*
697              * If the object is newly created one, we need to update
698              * the vdi object (metadata object).  min_dirty_data_idx
699              * and max_dirty_data_idx are changed to include updated
700              * index between them.
701              */
702             if (rsp.result == SD_RES_SUCCESS) {
703                 s->inode.data_vdi_id[idx] = s->inode.vdi_id;
704                 s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
705                 s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
706             }
707             /*
708              * Some requests may be blocked because simultaneous
709              * create requests are not allowed, so we search the
710              * pending requests here.
711              */
712             send_pending_req(s, aio_req->oid);
713         }
714         break;
715     case AIOCB_READ_UDATA:
716         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
717                             aio_req->iov_offset, rsp.data_length);
718         if (ret < 0) {
719             error_report("failed to get the data, %s", strerror(errno));
720             goto out;
721         }
722         break;
723     case AIOCB_FLUSH_CACHE:
724         if (rsp.result == SD_RES_INVALID_PARMS) {
725             DPRINTF("disable cache since the server doesn't support it\n");
726             s->cache_flags = SD_FLAG_CMD_DIRECT;
727             rsp.result = SD_RES_SUCCESS;
728         }
729         break;
730     case AIOCB_DISCARD_OBJ:
731         switch (rsp.result) {
732         case SD_RES_INVALID_PARMS:
733             error_report("sheep(%s) doesn't support discard command",
734                          s->host_spec);
735             rsp.result = SD_RES_SUCCESS;
736             s->discard_supported = false;
737             break;
738         case SD_RES_SUCCESS:
739             idx = data_oid_to_idx(aio_req->oid);
740             s->inode.data_vdi_id[idx] = 0;
741             break;
742         default:
743             break;
744         }
745     }
746
747     switch (rsp.result) {
748     case SD_RES_SUCCESS:
749         break;
750     case SD_RES_READONLY:
751         ret = resend_aioreq(s, aio_req);
752         if (ret == SD_RES_SUCCESS) {
753             goto out;
754         }
755         /* fall through */
756     default:
757         acb->ret = -EIO;
758         error_report("%s", sd_strerror(rsp.result));
759         break;
760     }
761
762     free_aio_req(s, aio_req);
763     if (!acb->nr_pending) {
764         /*
765          * We've finished all requests which belong to the AIOCB, so
766          * we can switch back to sd_co_readv/writev now.
767          */
768         acb->aio_done_func(acb);
769     }
770 out:
771     s->co_recv = NULL;
772 }
773
774 static void co_read_response(void *opaque)
775 {
776     BDRVSheepdogState *s = opaque;
777
778     if (!s->co_recv) {
779         s->co_recv = qemu_coroutine_create(aio_read_response);
780     }
781
782     qemu_coroutine_enter(s->co_recv, opaque);
783 }
784
785 static void co_write_request(void *opaque)
786 {
787     BDRVSheepdogState *s = opaque;
788
789     qemu_coroutine_enter(s->co_send, NULL);
790 }
791
792 /*
793  * Return a socket discriptor to read/write objects.
794  *
795  * We cannot use this discriptor for other operations because
796  * the block driver may be on waiting response from the server.
797  */
798 static int get_sheep_fd(BDRVSheepdogState *s)
799 {
800     int fd;
801
802     fd = connect_to_sdog(s);
803     if (fd < 0) {
804         return fd;
805     }
806
807     qemu_aio_set_fd_handler(fd, co_read_response, NULL, s);
808     return fd;
809 }
810
811 static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
812                         char *vdi, uint32_t *snapid, char *tag)
813 {
814     URI *uri;
815     QueryParams *qp = NULL;
816     int ret = 0;
817
818     uri = uri_parse(filename);
819     if (!uri) {
820         return -EINVAL;
821     }
822
823     /* transport */
824     if (!strcmp(uri->scheme, "sheepdog")) {
825         s->is_unix = false;
826     } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
827         s->is_unix = false;
828     } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
829         s->is_unix = true;
830     } else {
831         ret = -EINVAL;
832         goto out;
833     }
834
835     if (uri->path == NULL || !strcmp(uri->path, "/")) {
836         ret = -EINVAL;
837         goto out;
838     }
839     pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
840
841     qp = query_params_parse(uri->query);
842     if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
843         ret = -EINVAL;
844         goto out;
845     }
846
847     if (s->is_unix) {
848         /* sheepdog+unix:///vdiname?socket=path */
849         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
850             ret = -EINVAL;
851             goto out;
852         }
853         s->host_spec = g_strdup(qp->p[0].value);
854     } else {
855         /* sheepdog[+tcp]://[host:port]/vdiname */
856         s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
857                                        uri->port ?: SD_DEFAULT_PORT);
858     }
859
860     /* snapshot tag */
861     if (uri->fragment) {
862         *snapid = strtoul(uri->fragment, NULL, 10);
863         if (*snapid == 0) {
864             pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
865         }
866     } else {
867         *snapid = CURRENT_VDI_ID; /* search current vdi */
868     }
869
870 out:
871     if (qp) {
872         query_params_free(qp);
873     }
874     uri_free(uri);
875     return ret;
876 }
877
878 /*
879  * Parse a filename (old syntax)
880  *
881  * filename must be one of the following formats:
882  *   1. [vdiname]
883  *   2. [vdiname]:[snapid]
884  *   3. [vdiname]:[tag]
885  *   4. [hostname]:[port]:[vdiname]
886  *   5. [hostname]:[port]:[vdiname]:[snapid]
887  *   6. [hostname]:[port]:[vdiname]:[tag]
888  *
889  * You can boot from the snapshot images by specifying `snapid` or
890  * `tag'.
891  *
892  * You can run VMs outside the Sheepdog cluster by specifying
893  * `hostname' and `port' (experimental).
894  */
895 static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
896                          char *vdi, uint32_t *snapid, char *tag)
897 {
898     char *p, *q, *uri;
899     const char *host_spec, *vdi_spec;
900     int nr_sep, ret;
901
902     strstart(filename, "sheepdog:", (const char **)&filename);
903     p = q = g_strdup(filename);
904
905     /* count the number of separators */
906     nr_sep = 0;
907     while (*p) {
908         if (*p == ':') {
909             nr_sep++;
910         }
911         p++;
912     }
913     p = q;
914
915     /* use the first two tokens as host_spec. */
916     if (nr_sep >= 2) {
917         host_spec = p;
918         p = strchr(p, ':');
919         p++;
920         p = strchr(p, ':');
921         *p++ = '\0';
922     } else {
923         host_spec = "";
924     }
925
926     vdi_spec = p;
927
928     p = strchr(vdi_spec, ':');
929     if (p) {
930         *p++ = '#';
931     }
932
933     uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
934
935     ret = sd_parse_uri(s, uri, vdi, snapid, tag);
936
937     g_free(q);
938     g_free(uri);
939
940     return ret;
941 }
942
943 static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
944                          uint32_t snapid, const char *tag, uint32_t *vid,
945                          bool lock)
946 {
947     int ret, fd;
948     SheepdogVdiReq hdr;
949     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
950     unsigned int wlen, rlen = 0;
951     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
952
953     fd = connect_to_sdog(s);
954     if (fd < 0) {
955         return fd;
956     }
957
958     /* This pair of strncpy calls ensures that the buffer is zero-filled,
959      * which is desirable since we'll soon be sending those bytes, and
960      * don't want the send_req to read uninitialized data.
961      */
962     strncpy(buf, filename, SD_MAX_VDI_LEN);
963     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
964
965     memset(&hdr, 0, sizeof(hdr));
966     if (lock) {
967         hdr.opcode = SD_OP_LOCK_VDI;
968     } else {
969         hdr.opcode = SD_OP_GET_VDI_INFO;
970     }
971     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
972     hdr.proto_ver = SD_PROTO_VER;
973     hdr.data_length = wlen;
974     hdr.snapid = snapid;
975     hdr.flags = SD_FLAG_CMD_WRITE;
976
977     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
978     if (ret) {
979         goto out;
980     }
981
982     if (rsp->result != SD_RES_SUCCESS) {
983         error_report("cannot get vdi info, %s, %s %d %s",
984                      sd_strerror(rsp->result), filename, snapid, tag);
985         if (rsp->result == SD_RES_NO_VDI) {
986             ret = -ENOENT;
987         } else {
988             ret = -EIO;
989         }
990         goto out;
991     }
992     *vid = rsp->vdi_id;
993
994     ret = 0;
995 out:
996     closesocket(fd);
997     return ret;
998 }
999
1000 static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1001                            struct iovec *iov, int niov, bool create,
1002                            enum AIOCBState aiocb_type)
1003 {
1004     int nr_copies = s->inode.nr_copies;
1005     SheepdogObjReq hdr;
1006     unsigned int wlen = 0;
1007     int ret;
1008     uint64_t oid = aio_req->oid;
1009     unsigned int datalen = aio_req->data_len;
1010     uint64_t offset = aio_req->offset;
1011     uint8_t flags = aio_req->flags;
1012     uint64_t old_oid = aio_req->base_oid;
1013
1014     if (!nr_copies) {
1015         error_report("bug");
1016     }
1017
1018     memset(&hdr, 0, sizeof(hdr));
1019
1020     switch (aiocb_type) {
1021     case AIOCB_FLUSH_CACHE:
1022         hdr.opcode = SD_OP_FLUSH_VDI;
1023         break;
1024     case AIOCB_READ_UDATA:
1025         hdr.opcode = SD_OP_READ_OBJ;
1026         hdr.flags = flags;
1027         break;
1028     case AIOCB_WRITE_UDATA:
1029         if (create) {
1030             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1031         } else {
1032             hdr.opcode = SD_OP_WRITE_OBJ;
1033         }
1034         wlen = datalen;
1035         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1036         break;
1037     case AIOCB_DISCARD_OBJ:
1038         hdr.opcode = SD_OP_DISCARD_OBJ;
1039         break;
1040     }
1041
1042     if (s->cache_flags) {
1043         hdr.flags |= s->cache_flags;
1044     }
1045
1046     hdr.oid = oid;
1047     hdr.cow_oid = old_oid;
1048     hdr.copies = s->inode.nr_copies;
1049
1050     hdr.data_length = datalen;
1051     hdr.offset = offset;
1052
1053     hdr.id = aio_req->id;
1054
1055     qemu_co_mutex_lock(&s->lock);
1056     s->co_send = qemu_coroutine_self();
1057     qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, s);
1058     socket_set_cork(s->fd, 1);
1059
1060     /* send a header */
1061     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1062     if (ret < 0) {
1063         qemu_co_mutex_unlock(&s->lock);
1064         error_report("failed to send a req, %s", strerror(errno));
1065         return -errno;
1066     }
1067
1068     if (wlen) {
1069         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1070         if (ret < 0) {
1071             qemu_co_mutex_unlock(&s->lock);
1072             error_report("failed to send a data, %s", strerror(errno));
1073             return -errno;
1074         }
1075     }
1076
1077     socket_set_cork(s->fd, 0);
1078     qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s);
1079     qemu_co_mutex_unlock(&s->lock);
1080
1081     return 0;
1082 }
1083
1084 static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
1085                              unsigned int datalen, uint64_t offset,
1086                              bool write, bool create, uint32_t cache_flags)
1087 {
1088     SheepdogObjReq hdr;
1089     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1090     unsigned int wlen, rlen;
1091     int ret;
1092
1093     memset(&hdr, 0, sizeof(hdr));
1094
1095     if (write) {
1096         wlen = datalen;
1097         rlen = 0;
1098         hdr.flags = SD_FLAG_CMD_WRITE;
1099         if (create) {
1100             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1101         } else {
1102             hdr.opcode = SD_OP_WRITE_OBJ;
1103         }
1104     } else {
1105         wlen = 0;
1106         rlen = datalen;
1107         hdr.opcode = SD_OP_READ_OBJ;
1108     }
1109
1110     hdr.flags |= cache_flags;
1111
1112     hdr.oid = oid;
1113     hdr.data_length = datalen;
1114     hdr.offset = offset;
1115     hdr.copies = copies;
1116
1117     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1118     if (ret) {
1119         error_report("failed to send a request to the sheep");
1120         return ret;
1121     }
1122
1123     switch (rsp->result) {
1124     case SD_RES_SUCCESS:
1125         return 0;
1126     default:
1127         error_report("%s", sd_strerror(rsp->result));
1128         return -EIO;
1129     }
1130 }
1131
1132 static int read_object(int fd, char *buf, uint64_t oid, int copies,
1133                        unsigned int datalen, uint64_t offset,
1134                        uint32_t cache_flags)
1135 {
1136     return read_write_object(fd, buf, oid, copies, datalen, offset, false,
1137                              false, cache_flags);
1138 }
1139
1140 static int write_object(int fd, char *buf, uint64_t oid, int copies,
1141                         unsigned int datalen, uint64_t offset, bool create,
1142                         uint32_t cache_flags)
1143 {
1144     return read_write_object(fd, buf, oid, copies, datalen, offset, true,
1145                              create, cache_flags);
1146 }
1147
1148 /* update inode with the latest state */
1149 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1150 {
1151     SheepdogInode *inode;
1152     int ret = 0, fd;
1153     uint32_t vid = 0;
1154
1155     fd = connect_to_sdog(s);
1156     if (fd < 0) {
1157         return -EIO;
1158     }
1159
1160     inode = g_malloc(sizeof(s->inode));
1161
1162     ret = find_vdi_name(s, s->name, snapid, tag, &vid, false);
1163     if (ret) {
1164         goto out;
1165     }
1166
1167     ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid),
1168                       s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
1169     if (ret < 0) {
1170         goto out;
1171     }
1172
1173     if (inode->vdi_id != s->inode.vdi_id) {
1174         memcpy(&s->inode, inode, sizeof(s->inode));
1175     }
1176
1177 out:
1178     g_free(inode);
1179     closesocket(fd);
1180
1181     return ret;
1182 }
1183
1184 static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1185 {
1186     SheepdogAIOCB *acb = aio_req->aiocb;
1187     bool create = false;
1188     int ret;
1189
1190     ret = reload_inode(s, 0, "");
1191     if (ret < 0) {
1192         return ret;
1193     }
1194
1195     aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
1196                                    data_oid_to_idx(aio_req->oid));
1197
1198     /* check whether this request becomes a CoW one */
1199     if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
1200         int idx = data_oid_to_idx(aio_req->oid);
1201         AIOReq *areq;
1202
1203         if (s->inode.data_vdi_id[idx] == 0) {
1204             create = true;
1205             goto out;
1206         }
1207         if (is_data_obj_writable(&s->inode, idx)) {
1208             goto out;
1209         }
1210
1211         /* link to the pending list if there is another CoW request to
1212          * the same object */
1213         QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
1214             if (areq != aio_req && areq->oid == aio_req->oid) {
1215                 DPRINTF("simultaneous CoW to %" PRIx64 "\n", aio_req->oid);
1216                 QLIST_REMOVE(aio_req, aio_siblings);
1217                 QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
1218                 return SD_RES_SUCCESS;
1219             }
1220         }
1221
1222         aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1223         aio_req->flags |= SD_FLAG_CMD_COW;
1224         create = true;
1225     }
1226 out:
1227     return add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1228                            create, acb->aiocb_type);
1229 }
1230
1231 /* TODO Convert to fine grained options */
1232 static QemuOptsList runtime_opts = {
1233     .name = "sheepdog",
1234     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1235     .desc = {
1236         {
1237             .name = "filename",
1238             .type = QEMU_OPT_STRING,
1239             .help = "URL to the sheepdog image",
1240         },
1241         { /* end of list */ }
1242     },
1243 };
1244
1245 static int sd_open(BlockDriverState *bs, QDict *options, int flags)
1246 {
1247     int ret, fd;
1248     uint32_t vid = 0;
1249     BDRVSheepdogState *s = bs->opaque;
1250     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1251     uint32_t snapid;
1252     char *buf = NULL;
1253     QemuOpts *opts;
1254     Error *local_err = NULL;
1255     const char *filename;
1256
1257     opts = qemu_opts_create_nofail(&runtime_opts);
1258     qemu_opts_absorb_qdict(opts, options, &local_err);
1259     if (error_is_set(&local_err)) {
1260         qerror_report_err(local_err);
1261         error_free(local_err);
1262         ret = -EINVAL;
1263         goto out;
1264     }
1265
1266     filename = qemu_opt_get(opts, "filename");
1267
1268     QLIST_INIT(&s->inflight_aio_head);
1269     QLIST_INIT(&s->pending_aio_head);
1270     s->fd = -1;
1271
1272     memset(vdi, 0, sizeof(vdi));
1273     memset(tag, 0, sizeof(tag));
1274
1275     if (strstr(filename, "://")) {
1276         ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1277     } else {
1278         ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1279     }
1280     if (ret < 0) {
1281         goto out;
1282     }
1283     s->fd = get_sheep_fd(s);
1284     if (s->fd < 0) {
1285         ret = s->fd;
1286         goto out;
1287     }
1288
1289     ret = find_vdi_name(s, vdi, snapid, tag, &vid, true);
1290     if (ret) {
1291         goto out;
1292     }
1293
1294     /*
1295      * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1296      * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1297      */
1298     s->cache_flags = SD_FLAG_CMD_CACHE;
1299     if (flags & BDRV_O_NOCACHE) {
1300         s->cache_flags = SD_FLAG_CMD_DIRECT;
1301     }
1302     s->discard_supported = true;
1303
1304     if (snapid || tag[0] != '\0') {
1305         DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1306         s->is_snapshot = true;
1307     }
1308
1309     fd = connect_to_sdog(s);
1310     if (fd < 0) {
1311         ret = fd;
1312         goto out;
1313     }
1314
1315     buf = g_malloc(SD_INODE_SIZE);
1316     ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
1317                       s->cache_flags);
1318
1319     closesocket(fd);
1320
1321     if (ret) {
1322         goto out;
1323     }
1324
1325     memcpy(&s->inode, buf, sizeof(s->inode));
1326     s->min_dirty_data_idx = UINT32_MAX;
1327     s->max_dirty_data_idx = 0;
1328
1329     bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1330     pstrcpy(s->name, sizeof(s->name), vdi);
1331     qemu_co_mutex_init(&s->lock);
1332     qemu_opts_del(opts);
1333     g_free(buf);
1334     return 0;
1335 out:
1336     qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
1337     if (s->fd >= 0) {
1338         closesocket(s->fd);
1339     }
1340     qemu_opts_del(opts);
1341     g_free(buf);
1342     return ret;
1343 }
1344
1345 static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
1346                         uint32_t base_vid, uint32_t *vdi_id, int snapshot)
1347 {
1348     SheepdogVdiReq hdr;
1349     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1350     int fd, ret;
1351     unsigned int wlen, rlen = 0;
1352     char buf[SD_MAX_VDI_LEN];
1353
1354     fd = connect_to_sdog(s);
1355     if (fd < 0) {
1356         return fd;
1357     }
1358
1359     /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1360      * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1361      */
1362     memset(buf, 0, sizeof(buf));
1363     pstrcpy(buf, sizeof(buf), filename);
1364
1365     memset(&hdr, 0, sizeof(hdr));
1366     hdr.opcode = SD_OP_NEW_VDI;
1367     hdr.vdi_id = base_vid;
1368
1369     wlen = SD_MAX_VDI_LEN;
1370
1371     hdr.flags = SD_FLAG_CMD_WRITE;
1372     hdr.snapid = snapshot;
1373
1374     hdr.data_length = wlen;
1375     hdr.vdi_size = vdi_size;
1376
1377     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1378
1379     closesocket(fd);
1380
1381     if (ret) {
1382         return ret;
1383     }
1384
1385     if (rsp->result != SD_RES_SUCCESS) {
1386         error_report("%s, %s", sd_strerror(rsp->result), filename);
1387         return -EIO;
1388     }
1389
1390     if (vdi_id) {
1391         *vdi_id = rsp->vdi_id;
1392     }
1393
1394     return 0;
1395 }
1396
1397 static int sd_prealloc(const char *filename)
1398 {
1399     BlockDriverState *bs = NULL;
1400     uint32_t idx, max_idx;
1401     int64_t vdi_size;
1402     void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
1403     int ret;
1404
1405     ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR);
1406     if (ret < 0) {
1407         goto out;
1408     }
1409
1410     vdi_size = bdrv_getlength(bs);
1411     if (vdi_size < 0) {
1412         ret = vdi_size;
1413         goto out;
1414     }
1415     max_idx = DIV_ROUND_UP(vdi_size, SD_DATA_OBJ_SIZE);
1416
1417     for (idx = 0; idx < max_idx; idx++) {
1418         /*
1419          * The created image can be a cloned image, so we need to read
1420          * a data from the source image.
1421          */
1422         ret = bdrv_pread(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1423         if (ret < 0) {
1424             goto out;
1425         }
1426         ret = bdrv_pwrite(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1427         if (ret < 0) {
1428             goto out;
1429         }
1430     }
1431 out:
1432     if (bs) {
1433         bdrv_delete(bs);
1434     }
1435     g_free(buf);
1436
1437     return ret;
1438 }
1439
1440 static int sd_create(const char *filename, QEMUOptionParameter *options)
1441 {
1442     int ret = 0;
1443     uint32_t vid = 0, base_vid = 0;
1444     int64_t vdi_size = 0;
1445     char *backing_file = NULL;
1446     BDRVSheepdogState *s;
1447     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1448     uint32_t snapid;
1449     bool prealloc = false;
1450
1451     s = g_malloc0(sizeof(BDRVSheepdogState));
1452
1453     memset(vdi, 0, sizeof(vdi));
1454     memset(tag, 0, sizeof(tag));
1455     if (strstr(filename, "://")) {
1456         ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1457     } else {
1458         ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1459     }
1460     if (ret < 0) {
1461         goto out;
1462     }
1463
1464     while (options && options->name) {
1465         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
1466             vdi_size = options->value.n;
1467         } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
1468             backing_file = options->value.s;
1469         } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
1470             if (!options->value.s || !strcmp(options->value.s, "off")) {
1471                 prealloc = false;
1472             } else if (!strcmp(options->value.s, "full")) {
1473                 prealloc = true;
1474             } else {
1475                 error_report("Invalid preallocation mode: '%s'",
1476                              options->value.s);
1477                 ret = -EINVAL;
1478                 goto out;
1479             }
1480         }
1481         options++;
1482     }
1483
1484     if (vdi_size > SD_MAX_VDI_SIZE) {
1485         error_report("too big image size");
1486         ret = -EINVAL;
1487         goto out;
1488     }
1489
1490     if (backing_file) {
1491         BlockDriverState *bs;
1492         BDRVSheepdogState *s;
1493         BlockDriver *drv;
1494
1495         /* Currently, only Sheepdog backing image is supported. */
1496         drv = bdrv_find_protocol(backing_file, true);
1497         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1498             error_report("backing_file must be a sheepdog image");
1499             ret = -EINVAL;
1500             goto out;
1501         }
1502
1503         ret = bdrv_file_open(&bs, backing_file, NULL, 0);
1504         if (ret < 0) {
1505             goto out;
1506         }
1507
1508         s = bs->opaque;
1509
1510         if (!is_snapshot(&s->inode)) {
1511             error_report("cannot clone from a non snapshot vdi");
1512             bdrv_delete(bs);
1513             ret = -EINVAL;
1514             goto out;
1515         }
1516
1517         base_vid = s->inode.vdi_id;
1518         bdrv_delete(bs);
1519     }
1520
1521     ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0);
1522     if (!prealloc || ret) {
1523         goto out;
1524     }
1525
1526     ret = sd_prealloc(filename);
1527 out:
1528     g_free(s);
1529     return ret;
1530 }
1531
1532 static void sd_close(BlockDriverState *bs)
1533 {
1534     BDRVSheepdogState *s = bs->opaque;
1535     SheepdogVdiReq hdr;
1536     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1537     unsigned int wlen, rlen = 0;
1538     int fd, ret;
1539
1540     DPRINTF("%s\n", s->name);
1541
1542     fd = connect_to_sdog(s);
1543     if (fd < 0) {
1544         return;
1545     }
1546
1547     memset(&hdr, 0, sizeof(hdr));
1548
1549     hdr.opcode = SD_OP_RELEASE_VDI;
1550     hdr.vdi_id = s->inode.vdi_id;
1551     wlen = strlen(s->name) + 1;
1552     hdr.data_length = wlen;
1553     hdr.flags = SD_FLAG_CMD_WRITE;
1554
1555     ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1556
1557     closesocket(fd);
1558
1559     if (!ret && rsp->result != SD_RES_SUCCESS &&
1560         rsp->result != SD_RES_VDI_NOT_LOCKED) {
1561         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1562     }
1563
1564     qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
1565     closesocket(s->fd);
1566     g_free(s->host_spec);
1567 }
1568
1569 static int64_t sd_getlength(BlockDriverState *bs)
1570 {
1571     BDRVSheepdogState *s = bs->opaque;
1572
1573     return s->inode.vdi_size;
1574 }
1575
1576 static int sd_truncate(BlockDriverState *bs, int64_t offset)
1577 {
1578     BDRVSheepdogState *s = bs->opaque;
1579     int ret, fd;
1580     unsigned int datalen;
1581
1582     if (offset < s->inode.vdi_size) {
1583         error_report("shrinking is not supported");
1584         return -EINVAL;
1585     } else if (offset > SD_MAX_VDI_SIZE) {
1586         error_report("too big image size");
1587         return -EINVAL;
1588     }
1589
1590     fd = connect_to_sdog(s);
1591     if (fd < 0) {
1592         return fd;
1593     }
1594
1595     /* we don't need to update entire object */
1596     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1597     s->inode.vdi_size = offset;
1598     ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
1599                        s->inode.nr_copies, datalen, 0, false, s->cache_flags);
1600     close(fd);
1601
1602     if (ret < 0) {
1603         error_report("failed to update an inode.");
1604     }
1605
1606     return ret;
1607 }
1608
1609 /*
1610  * This function is called after writing data objects.  If we need to
1611  * update metadata, this sends a write request to the vdi object.
1612  * Otherwise, this switches back to sd_co_readv/writev.
1613  */
1614 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1615 {
1616     int ret;
1617     BDRVSheepdogState *s = acb->common.bs->opaque;
1618     struct iovec iov;
1619     AIOReq *aio_req;
1620     uint32_t offset, data_len, mn, mx;
1621
1622     mn = s->min_dirty_data_idx;
1623     mx = s->max_dirty_data_idx;
1624     if (mn <= mx) {
1625         /* we need to update the vdi object. */
1626         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1627             mn * sizeof(s->inode.data_vdi_id[0]);
1628         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1629
1630         s->min_dirty_data_idx = UINT32_MAX;
1631         s->max_dirty_data_idx = 0;
1632
1633         iov.iov_base = &s->inode;
1634         iov.iov_len = sizeof(s->inode);
1635         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1636                                 data_len, offset, 0, 0, offset);
1637         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1638         ret = add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
1639         if (ret) {
1640             free_aio_req(s, aio_req);
1641             acb->ret = -EIO;
1642             goto out;
1643         }
1644
1645         acb->aio_done_func = sd_finish_aiocb;
1646         acb->aiocb_type = AIOCB_WRITE_UDATA;
1647         return;
1648     }
1649 out:
1650     sd_finish_aiocb(acb);
1651 }
1652
1653 /* Delete current working VDI on the snapshot chain */
1654 static bool sd_delete(BDRVSheepdogState *s)
1655 {
1656     unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
1657     SheepdogVdiReq hdr = {
1658         .opcode = SD_OP_DEL_VDI,
1659         .vdi_id = s->inode.vdi_id,
1660         .data_length = wlen,
1661         .flags = SD_FLAG_CMD_WRITE,
1662     };
1663     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1664     int fd, ret;
1665
1666     fd = connect_to_sdog(s);
1667     if (fd < 0) {
1668         return false;
1669     }
1670
1671     ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1672     closesocket(fd);
1673     if (ret) {
1674         return false;
1675     }
1676     switch (rsp->result) {
1677     case SD_RES_NO_VDI:
1678         error_report("%s was already deleted", s->name);
1679         /* fall through */
1680     case SD_RES_SUCCESS:
1681         break;
1682     default:
1683         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1684         return false;
1685     }
1686
1687     return true;
1688 }
1689
1690 /*
1691  * Create a writable VDI from a snapshot
1692  */
1693 static int sd_create_branch(BDRVSheepdogState *s)
1694 {
1695     int ret, fd;
1696     uint32_t vid;
1697     char *buf;
1698     bool deleted;
1699
1700     DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
1701
1702     buf = g_malloc(SD_INODE_SIZE);
1703
1704     /*
1705      * Even If deletion fails, we will just create extra snapshot based on
1706      * the workding VDI which was supposed to be deleted. So no need to
1707      * false bail out.
1708      */
1709     deleted = sd_delete(s);
1710     ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
1711                        !deleted);
1712     if (ret) {
1713         goto out;
1714     }
1715
1716     DPRINTF("%" PRIx32 " is created.\n", vid);
1717
1718     fd = connect_to_sdog(s);
1719     if (fd < 0) {
1720         ret = fd;
1721         goto out;
1722     }
1723
1724     ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1725                       SD_INODE_SIZE, 0, s->cache_flags);
1726
1727     closesocket(fd);
1728
1729     if (ret < 0) {
1730         goto out;
1731     }
1732
1733     memcpy(&s->inode, buf, sizeof(s->inode));
1734
1735     s->is_snapshot = false;
1736     ret = 0;
1737     DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
1738
1739 out:
1740     g_free(buf);
1741
1742     return ret;
1743 }
1744
1745 /*
1746  * Send I/O requests to the server.
1747  *
1748  * This function sends requests to the server, links the requests to
1749  * the inflight_list in BDRVSheepdogState, and exits without
1750  * waiting the response.  The responses are received in the
1751  * `aio_read_response' function which is called from the main loop as
1752  * a fd handler.
1753  *
1754  * Returns 1 when we need to wait a response, 0 when there is no sent
1755  * request and -errno in error cases.
1756  */
1757 static int coroutine_fn sd_co_rw_vector(void *p)
1758 {
1759     SheepdogAIOCB *acb = p;
1760     int ret = 0;
1761     unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
1762     unsigned long idx = acb->sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE;
1763     uint64_t oid;
1764     uint64_t offset = (acb->sector_num * BDRV_SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
1765     BDRVSheepdogState *s = acb->common.bs->opaque;
1766     SheepdogInode *inode = &s->inode;
1767     AIOReq *aio_req;
1768
1769     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
1770         /*
1771          * In the case we open the snapshot VDI, Sheepdog creates the
1772          * writable VDI when we do a write operation first.
1773          */
1774         ret = sd_create_branch(s);
1775         if (ret) {
1776             acb->ret = -EIO;
1777             goto out;
1778         }
1779     }
1780
1781     /*
1782      * Make sure we don't free the aiocb before we are done with all requests.
1783      * This additional reference is dropped at the end of this function.
1784      */
1785     acb->nr_pending++;
1786
1787     while (done != total) {
1788         uint8_t flags = 0;
1789         uint64_t old_oid = 0;
1790         bool create = false;
1791
1792         oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
1793
1794         len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
1795
1796         switch (acb->aiocb_type) {
1797         case AIOCB_READ_UDATA:
1798             if (!inode->data_vdi_id[idx]) {
1799                 qemu_iovec_memset(acb->qiov, done, 0, len);
1800                 goto done;
1801             }
1802             break;
1803         case AIOCB_WRITE_UDATA:
1804             if (!inode->data_vdi_id[idx]) {
1805                 create = true;
1806             } else if (!is_data_obj_writable(inode, idx)) {
1807                 /* Copy-On-Write */
1808                 create = true;
1809                 old_oid = oid;
1810                 flags = SD_FLAG_CMD_COW;
1811             }
1812             break;
1813         case AIOCB_DISCARD_OBJ:
1814             /*
1815              * We discard the object only when the whole object is
1816              * 1) allocated 2) trimmed. Otherwise, simply skip it.
1817              */
1818             if (len != SD_DATA_OBJ_SIZE || inode->data_vdi_id[idx] == 0) {
1819                 goto done;
1820             }
1821             break;
1822         default:
1823             break;
1824         }
1825
1826         if (create) {
1827             DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
1828                     inode->vdi_id, oid,
1829                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
1830             oid = vid_to_data_oid(inode->vdi_id, idx);
1831             DPRINTF("new oid %" PRIx64 "\n", oid);
1832         }
1833
1834         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
1835
1836         if (create) {
1837             AIOReq *areq;
1838             QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
1839                 if (areq->oid == oid) {
1840                     /*
1841                      * Sheepdog cannot handle simultaneous create
1842                      * requests to the same object.  So we cannot send
1843                      * the request until the previous request
1844                      * finishes.
1845                      */
1846                     aio_req->flags = 0;
1847                     aio_req->base_oid = 0;
1848                     QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
1849                                       aio_siblings);
1850                     goto done;
1851                 }
1852             }
1853         }
1854
1855         QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1856         ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1857                               create, acb->aiocb_type);
1858         if (ret < 0) {
1859             error_report("add_aio_request is failed");
1860             free_aio_req(s, aio_req);
1861             acb->ret = -EIO;
1862             goto out;
1863         }
1864     done:
1865         offset = 0;
1866         idx++;
1867         done += len;
1868     }
1869 out:
1870     if (!--acb->nr_pending) {
1871         return acb->ret;
1872     }
1873     return 1;
1874 }
1875
1876 static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
1877                         int nb_sectors, QEMUIOVector *qiov)
1878 {
1879     SheepdogAIOCB *acb;
1880     int ret;
1881
1882     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
1883         ret = sd_truncate(bs, (sector_num + nb_sectors) * BDRV_SECTOR_SIZE);
1884         if (ret < 0) {
1885             return ret;
1886         }
1887         bs->total_sectors = sector_num + nb_sectors;
1888     }
1889
1890     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
1891     acb->aio_done_func = sd_write_done;
1892     acb->aiocb_type = AIOCB_WRITE_UDATA;
1893
1894     ret = sd_co_rw_vector(acb);
1895     if (ret <= 0) {
1896         qemu_aio_release(acb);
1897         return ret;
1898     }
1899
1900     qemu_coroutine_yield();
1901
1902     return acb->ret;
1903 }
1904
1905 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
1906                        int nb_sectors, QEMUIOVector *qiov)
1907 {
1908     SheepdogAIOCB *acb;
1909     int ret;
1910
1911     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
1912     acb->aiocb_type = AIOCB_READ_UDATA;
1913     acb->aio_done_func = sd_finish_aiocb;
1914
1915     ret = sd_co_rw_vector(acb);
1916     if (ret <= 0) {
1917         qemu_aio_release(acb);
1918         return ret;
1919     }
1920
1921     qemu_coroutine_yield();
1922
1923     return acb->ret;
1924 }
1925
1926 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
1927 {
1928     BDRVSheepdogState *s = bs->opaque;
1929     SheepdogAIOCB *acb;
1930     AIOReq *aio_req;
1931     int ret;
1932
1933     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
1934         return 0;
1935     }
1936
1937     acb = sd_aio_setup(bs, NULL, 0, 0);
1938     acb->aiocb_type = AIOCB_FLUSH_CACHE;
1939     acb->aio_done_func = sd_finish_aiocb;
1940
1941     aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1942                             0, 0, 0, 0, 0);
1943     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1944     ret = add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
1945     if (ret < 0) {
1946         error_report("add_aio_request is failed");
1947         free_aio_req(s, aio_req);
1948         qemu_aio_release(acb);
1949         return ret;
1950     }
1951
1952     qemu_coroutine_yield();
1953     return acb->ret;
1954 }
1955
1956 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
1957 {
1958     BDRVSheepdogState *s = bs->opaque;
1959     int ret, fd;
1960     uint32_t new_vid;
1961     SheepdogInode *inode;
1962     unsigned int datalen;
1963
1964     DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
1965             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
1966             s->name, sn_info->vm_state_size, s->is_snapshot);
1967
1968     if (s->is_snapshot) {
1969         error_report("You can't create a snapshot of a snapshot VDI, "
1970                      "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
1971
1972         return -EINVAL;
1973     }
1974
1975     DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
1976
1977     s->inode.vm_state_size = sn_info->vm_state_size;
1978     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
1979     /* It appears that inode.tag does not require a NUL terminator,
1980      * which means this use of strncpy is ok.
1981      */
1982     strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
1983     /* we don't need to update entire object */
1984     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1985
1986     /* refresh inode. */
1987     fd = connect_to_sdog(s);
1988     if (fd < 0) {
1989         ret = fd;
1990         goto cleanup;
1991     }
1992
1993     ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
1994                        s->inode.nr_copies, datalen, 0, false, s->cache_flags);
1995     if (ret < 0) {
1996         error_report("failed to write snapshot's inode.");
1997         goto cleanup;
1998     }
1999
2000     ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
2001                        1);
2002     if (ret < 0) {
2003         error_report("failed to create inode for snapshot. %s",
2004                      strerror(errno));
2005         goto cleanup;
2006     }
2007
2008     inode = (SheepdogInode *)g_malloc(datalen);
2009
2010     ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
2011                       s->inode.nr_copies, datalen, 0, s->cache_flags);
2012
2013     if (ret < 0) {
2014         error_report("failed to read new inode info. %s", strerror(errno));
2015         goto cleanup;
2016     }
2017
2018     memcpy(&s->inode, inode, datalen);
2019     DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2020             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2021
2022 cleanup:
2023     closesocket(fd);
2024     return ret;
2025 }
2026
2027 /*
2028  * We implement rollback(loadvm) operation to the specified snapshot by
2029  * 1) switch to the snapshot
2030  * 2) rely on sd_create_branch to delete working VDI and
2031  * 3) create a new working VDI based on the speicified snapshot
2032  */
2033 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2034 {
2035     BDRVSheepdogState *s = bs->opaque;
2036     BDRVSheepdogState *old_s;
2037     char tag[SD_MAX_VDI_TAG_LEN];
2038     uint32_t snapid = 0;
2039     int ret = 0;
2040
2041     old_s = g_malloc(sizeof(BDRVSheepdogState));
2042
2043     memcpy(old_s, s, sizeof(BDRVSheepdogState));
2044
2045     snapid = strtoul(snapshot_id, NULL, 10);
2046     if (snapid) {
2047         tag[0] = 0;
2048     } else {
2049         pstrcpy(tag, sizeof(tag), snapshot_id);
2050     }
2051
2052     ret = reload_inode(s, snapid, tag);
2053     if (ret) {
2054         goto out;
2055     }
2056
2057     ret = sd_create_branch(s);
2058     if (ret) {
2059         goto out;
2060     }
2061
2062     g_free(old_s);
2063
2064     return 0;
2065 out:
2066     /* recover bdrv_sd_state */
2067     memcpy(s, old_s, sizeof(BDRVSheepdogState));
2068     g_free(old_s);
2069
2070     error_report("failed to open. recover old bdrv_sd_state.");
2071
2072     return ret;
2073 }
2074
2075 static int sd_snapshot_delete(BlockDriverState *bs, const char *snapshot_id)
2076 {
2077     /* FIXME: Delete specified snapshot id.  */
2078     return 0;
2079 }
2080
2081 static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2082 {
2083     BDRVSheepdogState *s = bs->opaque;
2084     SheepdogReq req;
2085     int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2086     QEMUSnapshotInfo *sn_tab = NULL;
2087     unsigned wlen, rlen;
2088     int found = 0;
2089     static SheepdogInode inode;
2090     unsigned long *vdi_inuse;
2091     unsigned int start_nr;
2092     uint64_t hval;
2093     uint32_t vid;
2094
2095     vdi_inuse = g_malloc(max);
2096
2097     fd = connect_to_sdog(s);
2098     if (fd < 0) {
2099         ret = fd;
2100         goto out;
2101     }
2102
2103     rlen = max;
2104     wlen = 0;
2105
2106     memset(&req, 0, sizeof(req));
2107
2108     req.opcode = SD_OP_READ_VDIS;
2109     req.data_length = max;
2110
2111     ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen);
2112
2113     closesocket(fd);
2114     if (ret) {
2115         goto out;
2116     }
2117
2118     sn_tab = g_malloc0(nr * sizeof(*sn_tab));
2119
2120     /* calculate a vdi id with hash function */
2121     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2122     start_nr = hval & (SD_NR_VDIS - 1);
2123
2124     fd = connect_to_sdog(s);
2125     if (fd < 0) {
2126         ret = fd;
2127         goto out;
2128     }
2129
2130     for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2131         if (!test_bit(vid, vdi_inuse)) {
2132             break;
2133         }
2134
2135         /* we don't need to read entire object */
2136         ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
2137                           0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2138                           s->cache_flags);
2139
2140         if (ret) {
2141             continue;
2142         }
2143
2144         if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2145             sn_tab[found].date_sec = inode.snap_ctime >> 32;
2146             sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2147             sn_tab[found].vm_state_size = inode.vm_state_size;
2148             sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2149
2150             snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
2151                      inode.snap_id);
2152             pstrcpy(sn_tab[found].name,
2153                     MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2154                     inode.tag);
2155             found++;
2156         }
2157     }
2158
2159     closesocket(fd);
2160 out:
2161     *psn_tab = sn_tab;
2162
2163     g_free(vdi_inuse);
2164
2165     if (ret < 0) {
2166         return ret;
2167     }
2168
2169     return found;
2170 }
2171
2172 static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2173                                 int64_t pos, int size, int load)
2174 {
2175     bool create;
2176     int fd, ret = 0, remaining = size;
2177     unsigned int data_len;
2178     uint64_t vmstate_oid;
2179     uint64_t offset;
2180     uint32_t vdi_index;
2181     uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2182
2183     fd = connect_to_sdog(s);
2184     if (fd < 0) {
2185         return fd;
2186     }
2187
2188     while (remaining) {
2189         vdi_index = pos / SD_DATA_OBJ_SIZE;
2190         offset = pos % SD_DATA_OBJ_SIZE;
2191
2192         data_len = MIN(remaining, SD_DATA_OBJ_SIZE - offset);
2193
2194         vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2195
2196         create = (offset == 0);
2197         if (load) {
2198             ret = read_object(fd, (char *)data, vmstate_oid,
2199                               s->inode.nr_copies, data_len, offset,
2200                               s->cache_flags);
2201         } else {
2202             ret = write_object(fd, (char *)data, vmstate_oid,
2203                                s->inode.nr_copies, data_len, offset, create,
2204                                s->cache_flags);
2205         }
2206
2207         if (ret < 0) {
2208             error_report("failed to save vmstate %s", strerror(errno));
2209             goto cleanup;
2210         }
2211
2212         pos += data_len;
2213         data += data_len;
2214         remaining -= data_len;
2215     }
2216     ret = size;
2217 cleanup:
2218     closesocket(fd);
2219     return ret;
2220 }
2221
2222 static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2223                            int64_t pos)
2224 {
2225     BDRVSheepdogState *s = bs->opaque;
2226     void *buf;
2227     int ret;
2228
2229     buf = qemu_blockalign(bs, qiov->size);
2230     qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2231     ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2232     qemu_vfree(buf);
2233
2234     return ret;
2235 }
2236
2237 static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
2238                            int64_t pos, int size)
2239 {
2240     BDRVSheepdogState *s = bs->opaque;
2241
2242     return do_load_save_vmstate(s, data, pos, size, 1);
2243 }
2244
2245
2246 static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
2247                                       int nb_sectors)
2248 {
2249     SheepdogAIOCB *acb;
2250     QEMUIOVector dummy;
2251     BDRVSheepdogState *s = bs->opaque;
2252     int ret;
2253
2254     if (!s->discard_supported) {
2255             return 0;
2256     }
2257
2258     acb = sd_aio_setup(bs, &dummy, sector_num, nb_sectors);
2259     acb->aiocb_type = AIOCB_DISCARD_OBJ;
2260     acb->aio_done_func = sd_finish_aiocb;
2261
2262     ret = sd_co_rw_vector(acb);
2263     if (ret <= 0) {
2264         qemu_aio_release(acb);
2265         return ret;
2266     }
2267
2268     qemu_coroutine_yield();
2269
2270     return acb->ret;
2271 }
2272
2273 static coroutine_fn int
2274 sd_co_is_allocated(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2275                    int *pnum)
2276 {
2277     BDRVSheepdogState *s = bs->opaque;
2278     SheepdogInode *inode = &s->inode;
2279     unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
2280                   end = DIV_ROUND_UP((sector_num + nb_sectors) *
2281                                      BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
2282     unsigned long idx;
2283     int ret = 1;
2284
2285     for (idx = start; idx < end; idx++) {
2286         if (inode->data_vdi_id[idx] == 0) {
2287             break;
2288         }
2289     }
2290     if (idx == start) {
2291         /* Get the longest length of unallocated sectors */
2292         ret = 0;
2293         for (idx = start + 1; idx < end; idx++) {
2294             if (inode->data_vdi_id[idx] != 0) {
2295                 break;
2296             }
2297         }
2298     }
2299
2300     *pnum = (idx - start) * SD_DATA_OBJ_SIZE / BDRV_SECTOR_SIZE;
2301     if (*pnum > nb_sectors) {
2302         *pnum = nb_sectors;
2303     }
2304     return ret;
2305 }
2306
2307 static QEMUOptionParameter sd_create_options[] = {
2308     {
2309         .name = BLOCK_OPT_SIZE,
2310         .type = OPT_SIZE,
2311         .help = "Virtual disk size"
2312     },
2313     {
2314         .name = BLOCK_OPT_BACKING_FILE,
2315         .type = OPT_STRING,
2316         .help = "File name of a base image"
2317     },
2318     {
2319         .name = BLOCK_OPT_PREALLOC,
2320         .type = OPT_STRING,
2321         .help = "Preallocation mode (allowed values: off, full)"
2322     },
2323     { NULL }
2324 };
2325
2326 static BlockDriver bdrv_sheepdog = {
2327     .format_name    = "sheepdog",
2328     .protocol_name  = "sheepdog",
2329     .instance_size  = sizeof(BDRVSheepdogState),
2330     .bdrv_file_open = sd_open,
2331     .bdrv_close     = sd_close,
2332     .bdrv_create    = sd_create,
2333     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2334     .bdrv_getlength = sd_getlength,
2335     .bdrv_truncate  = sd_truncate,
2336
2337     .bdrv_co_readv  = sd_co_readv,
2338     .bdrv_co_writev = sd_co_writev,
2339     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2340     .bdrv_co_discard = sd_co_discard,
2341     .bdrv_co_is_allocated = sd_co_is_allocated,
2342
2343     .bdrv_snapshot_create   = sd_snapshot_create,
2344     .bdrv_snapshot_goto     = sd_snapshot_goto,
2345     .bdrv_snapshot_delete   = sd_snapshot_delete,
2346     .bdrv_snapshot_list     = sd_snapshot_list,
2347
2348     .bdrv_save_vmstate  = sd_save_vmstate,
2349     .bdrv_load_vmstate  = sd_load_vmstate,
2350
2351     .create_options = sd_create_options,
2352 };
2353
2354 static BlockDriver bdrv_sheepdog_tcp = {
2355     .format_name    = "sheepdog",
2356     .protocol_name  = "sheepdog+tcp",
2357     .instance_size  = sizeof(BDRVSheepdogState),
2358     .bdrv_file_open = sd_open,
2359     .bdrv_close     = sd_close,
2360     .bdrv_create    = sd_create,
2361     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2362     .bdrv_getlength = sd_getlength,
2363     .bdrv_truncate  = sd_truncate,
2364
2365     .bdrv_co_readv  = sd_co_readv,
2366     .bdrv_co_writev = sd_co_writev,
2367     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2368     .bdrv_co_discard = sd_co_discard,
2369     .bdrv_co_is_allocated = sd_co_is_allocated,
2370
2371     .bdrv_snapshot_create   = sd_snapshot_create,
2372     .bdrv_snapshot_goto     = sd_snapshot_goto,
2373     .bdrv_snapshot_delete   = sd_snapshot_delete,
2374     .bdrv_snapshot_list     = sd_snapshot_list,
2375
2376     .bdrv_save_vmstate  = sd_save_vmstate,
2377     .bdrv_load_vmstate  = sd_load_vmstate,
2378
2379     .create_options = sd_create_options,
2380 };
2381
2382 static BlockDriver bdrv_sheepdog_unix = {
2383     .format_name    = "sheepdog",
2384     .protocol_name  = "sheepdog+unix",
2385     .instance_size  = sizeof(BDRVSheepdogState),
2386     .bdrv_file_open = sd_open,
2387     .bdrv_close     = sd_close,
2388     .bdrv_create    = sd_create,
2389     .bdrv_has_zero_init = bdrv_has_zero_init_1,
2390     .bdrv_getlength = sd_getlength,
2391     .bdrv_truncate  = sd_truncate,
2392
2393     .bdrv_co_readv  = sd_co_readv,
2394     .bdrv_co_writev = sd_co_writev,
2395     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2396     .bdrv_co_discard = sd_co_discard,
2397     .bdrv_co_is_allocated = sd_co_is_allocated,
2398
2399     .bdrv_snapshot_create   = sd_snapshot_create,
2400     .bdrv_snapshot_goto     = sd_snapshot_goto,
2401     .bdrv_snapshot_delete   = sd_snapshot_delete,
2402     .bdrv_snapshot_list     = sd_snapshot_list,
2403
2404     .bdrv_save_vmstate  = sd_save_vmstate,
2405     .bdrv_load_vmstate  = sd_load_vmstate,
2406
2407     .create_options = sd_create_options,
2408 };
2409
2410 static void bdrv_sheepdog_init(void)
2411 {
2412     bdrv_register(&bdrv_sheepdog);
2413     bdrv_register(&bdrv_sheepdog_tcp);
2414     bdrv_register(&bdrv_sheepdog_unix);
2415 }
2416 block_init(bdrv_sheepdog_init);
This page took 0.158655 seconds and 4 git commands to generate.