]> Git Repo - qemu.git/blob - block/sheepdog.c
Merge remote-tracking branch 'remotes/xtensa/tags/20181030-xtensa' into staging
[qemu.git] / block / sheepdog.c
1 /*
2  * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version
6  * 2 as published by the Free Software Foundation.
7  *
8  * You should have received a copy of the GNU General Public License
9  * along with this program. If not, see <http://www.gnu.org/licenses/>.
10  *
11  * Contributions after 2012-01-13 are licensed under the terms of the
12  * GNU GPL, version 2 or (at your option) any later version.
13  */
14
15 #include "qemu/osdep.h"
16 #include "qapi/error.h"
17 #include "qapi/qapi-visit-sockets.h"
18 #include "qapi/qapi-visit-block-core.h"
19 #include "qapi/qmp/qdict.h"
20 #include "qapi/qobject-input-visitor.h"
21 #include "qapi/qobject-output-visitor.h"
22 #include "qemu/uri.h"
23 #include "qemu/error-report.h"
24 #include "qemu/option.h"
25 #include "qemu/sockets.h"
26 #include "block/block_int.h"
27 #include "block/qdict.h"
28 #include "sysemu/block-backend.h"
29 #include "qemu/bitops.h"
30 #include "qemu/cutils.h"
31
32 #define SD_PROTO_VER 0x01
33
34 #define SD_DEFAULT_ADDR "localhost"
35 #define SD_DEFAULT_PORT 7000
36
37 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
38 #define SD_OP_READ_OBJ       0x02
39 #define SD_OP_WRITE_OBJ      0x03
40 /* 0x04 is used internally by Sheepdog */
41
42 #define SD_OP_NEW_VDI        0x11
43 #define SD_OP_LOCK_VDI       0x12
44 #define SD_OP_RELEASE_VDI    0x13
45 #define SD_OP_GET_VDI_INFO   0x14
46 #define SD_OP_READ_VDIS      0x15
47 #define SD_OP_FLUSH_VDI      0x16
48 #define SD_OP_DEL_VDI        0x17
49 #define SD_OP_GET_CLUSTER_DEFAULT   0x18
50
51 #define SD_FLAG_CMD_WRITE    0x01
52 #define SD_FLAG_CMD_COW      0x02
53 #define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
54 #define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
55
56 #define SD_RES_SUCCESS       0x00 /* Success */
57 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
58 #define SD_RES_NO_OBJ        0x02 /* No object found */
59 #define SD_RES_EIO           0x03 /* I/O error */
60 #define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
61 #define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
62 #define SD_RES_SYSTEM_ERROR  0x06 /* System error */
63 #define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
64 #define SD_RES_NO_VDI        0x08 /* No vdi found */
65 #define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
66 #define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
67 #define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
68 #define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
69 #define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
70 #define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
71 #define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
72 #define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
73 #define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
74 #define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
75 #define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
76 #define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
77 #define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
78 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
79 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
80 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
81 #define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
82 #define SD_RES_READONLY      0x1A /* Object is read-only */
83
84 /*
85  * Object ID rules
86  *
87  *  0 - 19 (20 bits): data object space
88  * 20 - 31 (12 bits): reserved data object space
89  * 32 - 55 (24 bits): vdi object space
90  * 56 - 59 ( 4 bits): reserved vdi object space
91  * 60 - 63 ( 4 bits): object type identifier space
92  */
93
94 #define VDI_SPACE_SHIFT   32
95 #define VDI_BIT (UINT64_C(1) << 63)
96 #define VMSTATE_BIT (UINT64_C(1) << 62)
97 #define MAX_DATA_OBJS (UINT64_C(1) << 20)
98 #define MAX_CHILDREN 1024
99 #define SD_MAX_VDI_LEN 256
100 #define SD_MAX_VDI_TAG_LEN 256
101 #define SD_NR_VDIS   (1U << 24)
102 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
103 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
104 #define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
105 /*
106  * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
107  * (SD_EC_MAX_STRIP - 1) for parity strips
108  *
109  * SD_MAX_COPIES is sum of number of data strips and parity strips.
110  */
111 #define SD_EC_MAX_STRIP 16
112 #define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
113
114 #define SD_INODE_SIZE (sizeof(SheepdogInode))
115 #define CURRENT_VDI_ID 0
116
117 #define LOCK_TYPE_NORMAL 0
118 #define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
119
120 typedef struct SheepdogReq {
121     uint8_t proto_ver;
122     uint8_t opcode;
123     uint16_t flags;
124     uint32_t epoch;
125     uint32_t id;
126     uint32_t data_length;
127     uint32_t opcode_specific[8];
128 } SheepdogReq;
129
130 typedef struct SheepdogRsp {
131     uint8_t proto_ver;
132     uint8_t opcode;
133     uint16_t flags;
134     uint32_t epoch;
135     uint32_t id;
136     uint32_t data_length;
137     uint32_t result;
138     uint32_t opcode_specific[7];
139 } SheepdogRsp;
140
141 typedef struct SheepdogObjReq {
142     uint8_t proto_ver;
143     uint8_t opcode;
144     uint16_t flags;
145     uint32_t epoch;
146     uint32_t id;
147     uint32_t data_length;
148     uint64_t oid;
149     uint64_t cow_oid;
150     uint8_t copies;
151     uint8_t copy_policy;
152     uint8_t reserved[6];
153     uint64_t offset;
154 } SheepdogObjReq;
155
156 typedef struct SheepdogObjRsp {
157     uint8_t proto_ver;
158     uint8_t opcode;
159     uint16_t flags;
160     uint32_t epoch;
161     uint32_t id;
162     uint32_t data_length;
163     uint32_t result;
164     uint8_t copies;
165     uint8_t copy_policy;
166     uint8_t reserved[2];
167     uint32_t pad[6];
168 } SheepdogObjRsp;
169
170 typedef struct SheepdogVdiReq {
171     uint8_t proto_ver;
172     uint8_t opcode;
173     uint16_t flags;
174     uint32_t epoch;
175     uint32_t id;
176     uint32_t data_length;
177     uint64_t vdi_size;
178     uint32_t base_vdi_id;
179     uint8_t copies;
180     uint8_t copy_policy;
181     uint8_t store_policy;
182     uint8_t block_size_shift;
183     uint32_t snapid;
184     uint32_t type;
185     uint32_t pad[2];
186 } SheepdogVdiReq;
187
188 typedef struct SheepdogVdiRsp {
189     uint8_t proto_ver;
190     uint8_t opcode;
191     uint16_t flags;
192     uint32_t epoch;
193     uint32_t id;
194     uint32_t data_length;
195     uint32_t result;
196     uint32_t rsvd;
197     uint32_t vdi_id;
198     uint32_t pad[5];
199 } SheepdogVdiRsp;
200
201 typedef struct SheepdogClusterRsp {
202     uint8_t proto_ver;
203     uint8_t opcode;
204     uint16_t flags;
205     uint32_t epoch;
206     uint32_t id;
207     uint32_t data_length;
208     uint32_t result;
209     uint8_t nr_copies;
210     uint8_t copy_policy;
211     uint8_t block_size_shift;
212     uint8_t __pad1;
213     uint32_t __pad2[6];
214 } SheepdogClusterRsp;
215
216 typedef struct SheepdogInode {
217     char name[SD_MAX_VDI_LEN];
218     char tag[SD_MAX_VDI_TAG_LEN];
219     uint64_t ctime;
220     uint64_t snap_ctime;
221     uint64_t vm_clock_nsec;
222     uint64_t vdi_size;
223     uint64_t vm_state_size;
224     uint16_t copy_policy;
225     uint8_t nr_copies;
226     uint8_t block_size_shift;
227     uint32_t snap_id;
228     uint32_t vdi_id;
229     uint32_t parent_vdi_id;
230     uint32_t child_vdi_id[MAX_CHILDREN];
231     uint32_t data_vdi_id[MAX_DATA_OBJS];
232 } SheepdogInode;
233
234 #define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
235
236 /*
237  * 64 bit FNV-1a non-zero initial basis
238  */
239 #define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
240
241 /*
242  * 64 bit Fowler/Noll/Vo FNV-1a hash code
243  */
244 static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
245 {
246     unsigned char *bp = buf;
247     unsigned char *be = bp + len;
248     while (bp < be) {
249         hval ^= (uint64_t) *bp++;
250         hval += (hval << 1) + (hval << 4) + (hval << 5) +
251             (hval << 7) + (hval << 8) + (hval << 40);
252     }
253     return hval;
254 }
255
256 static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
257 {
258     return inode->vdi_id == inode->data_vdi_id[idx];
259 }
260
261 static inline bool is_data_obj(uint64_t oid)
262 {
263     return !(VDI_BIT & oid);
264 }
265
266 static inline uint64_t data_oid_to_idx(uint64_t oid)
267 {
268     return oid & (MAX_DATA_OBJS - 1);
269 }
270
271 static inline uint32_t oid_to_vid(uint64_t oid)
272 {
273     return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
274 }
275
276 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
277 {
278     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
279 }
280
281 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
282 {
283     return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
284 }
285
286 static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
287 {
288     return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
289 }
290
291 static inline bool is_snapshot(struct SheepdogInode *inode)
292 {
293     return !!inode->snap_ctime;
294 }
295
296 static inline size_t count_data_objs(const struct SheepdogInode *inode)
297 {
298     return DIV_ROUND_UP(inode->vdi_size,
299                         (1UL << inode->block_size_shift));
300 }
301
302 #undef DPRINTF
303 #ifdef DEBUG_SDOG
304 #define DEBUG_SDOG_PRINT 1
305 #else
306 #define DEBUG_SDOG_PRINT 0
307 #endif
308 #define DPRINTF(fmt, args...)                                           \
309     do {                                                                \
310         if (DEBUG_SDOG_PRINT) {                                         \
311             fprintf(stderr, "%s %d: " fmt, __func__, __LINE__, ##args); \
312         }                                                               \
313     } while (0)
314
315 typedef struct SheepdogAIOCB SheepdogAIOCB;
316 typedef struct BDRVSheepdogState BDRVSheepdogState;
317
318 typedef struct AIOReq {
319     SheepdogAIOCB *aiocb;
320     unsigned int iov_offset;
321
322     uint64_t oid;
323     uint64_t base_oid;
324     uint64_t offset;
325     unsigned int data_len;
326     uint8_t flags;
327     uint32_t id;
328     bool create;
329
330     QLIST_ENTRY(AIOReq) aio_siblings;
331 } AIOReq;
332
333 enum AIOCBState {
334     AIOCB_WRITE_UDATA,
335     AIOCB_READ_UDATA,
336     AIOCB_FLUSH_CACHE,
337     AIOCB_DISCARD_OBJ,
338 };
339
340 #define AIOCBOverlapping(x, y)                                 \
341     (!(x->max_affect_data_idx < y->min_affect_data_idx          \
342        || y->max_affect_data_idx < x->min_affect_data_idx))
343
344 struct SheepdogAIOCB {
345     BDRVSheepdogState *s;
346
347     QEMUIOVector *qiov;
348
349     int64_t sector_num;
350     int nb_sectors;
351
352     int ret;
353     enum AIOCBState aiocb_type;
354
355     Coroutine *coroutine;
356     int nr_pending;
357
358     uint32_t min_affect_data_idx;
359     uint32_t max_affect_data_idx;
360
361     /*
362      * The difference between affect_data_idx and dirty_data_idx:
363      * affect_data_idx represents range of index of all request types.
364      * dirty_data_idx represents range of index updated by COW requests.
365      * dirty_data_idx is used for updating an inode object.
366      */
367     uint32_t min_dirty_data_idx;
368     uint32_t max_dirty_data_idx;
369
370     QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
371 };
372
373 struct BDRVSheepdogState {
374     BlockDriverState *bs;
375     AioContext *aio_context;
376
377     SheepdogInode inode;
378
379     char name[SD_MAX_VDI_LEN];
380     bool is_snapshot;
381     uint32_t cache_flags;
382     bool discard_supported;
383
384     SocketAddress *addr;
385     int fd;
386
387     CoMutex lock;
388     Coroutine *co_send;
389     Coroutine *co_recv;
390
391     uint32_t aioreq_seq_num;
392
393     /* Every aio request must be linked to either of these queues. */
394     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
395     QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
396
397     CoMutex queue_lock;
398     CoQueue overlapping_queue;
399     QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
400 };
401
402 typedef struct BDRVSheepdogReopenState {
403     int fd;
404     int cache_flags;
405 } BDRVSheepdogReopenState;
406
407 static const char *sd_strerror(int err)
408 {
409     int i;
410
411     static const struct {
412         int err;
413         const char *desc;
414     } errors[] = {
415         {SD_RES_SUCCESS, "Success"},
416         {SD_RES_UNKNOWN, "Unknown error"},
417         {SD_RES_NO_OBJ, "No object found"},
418         {SD_RES_EIO, "I/O error"},
419         {SD_RES_VDI_EXIST, "VDI exists already"},
420         {SD_RES_INVALID_PARMS, "Invalid parameters"},
421         {SD_RES_SYSTEM_ERROR, "System error"},
422         {SD_RES_VDI_LOCKED, "VDI is already locked"},
423         {SD_RES_NO_VDI, "No vdi found"},
424         {SD_RES_NO_BASE_VDI, "No base VDI found"},
425         {SD_RES_VDI_READ, "Failed read the requested VDI"},
426         {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
427         {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
428         {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
429         {SD_RES_NO_TAG, "Failed to find the requested tag"},
430         {SD_RES_STARTUP, "The system is still booting"},
431         {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
432         {SD_RES_SHUTDOWN, "The system is shutting down"},
433         {SD_RES_NO_MEM, "Out of memory on the server"},
434         {SD_RES_FULL_VDI, "We already have the maximum vdis"},
435         {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
436         {SD_RES_NO_SPACE, "Server has no space for new objects"},
437         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
438         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
439         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
440         {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
441         {SD_RES_READONLY, "Object is read-only"},
442     };
443
444     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
445         if (errors[i].err == err) {
446             return errors[i].desc;
447         }
448     }
449
450     return "Invalid error code";
451 }
452
453 /*
454  * Sheepdog I/O handling:
455  *
456  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
457  *    link the requests to the inflight_list in the
458  *    BDRVSheepdogState.  The function yields while waiting for
459  *    receiving the response.
460  *
461  * 2. We receive the response in aio_read_response, the fd handler to
462  *    the sheepdog connection.  We switch back to sd_co_readv/sd_writev
463  *    after all the requests belonging to the AIOCB are finished.  If
464  *    needed, sd_co_writev will send another requests for the vdi object.
465  */
466
467 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
468                                     uint64_t oid, unsigned int data_len,
469                                     uint64_t offset, uint8_t flags, bool create,
470                                     uint64_t base_oid, unsigned int iov_offset)
471 {
472     AIOReq *aio_req;
473
474     aio_req = g_malloc(sizeof(*aio_req));
475     aio_req->aiocb = acb;
476     aio_req->iov_offset = iov_offset;
477     aio_req->oid = oid;
478     aio_req->base_oid = base_oid;
479     aio_req->offset = offset;
480     aio_req->data_len = data_len;
481     aio_req->flags = flags;
482     aio_req->id = s->aioreq_seq_num++;
483     aio_req->create = create;
484
485     acb->nr_pending++;
486     return aio_req;
487 }
488
489 static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
490 {
491     SheepdogAIOCB *cb;
492
493 retry:
494     QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
495         if (AIOCBOverlapping(acb, cb)) {
496             qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock);
497             goto retry;
498         }
499     }
500 }
501
502 static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
503                          QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
504                          int type)
505 {
506     uint32_t object_size;
507
508     object_size = (UINT32_C(1) << s->inode.block_size_shift);
509
510     acb->s = s;
511
512     acb->qiov = qiov;
513
514     acb->sector_num = sector_num;
515     acb->nb_sectors = nb_sectors;
516
517     acb->coroutine = qemu_coroutine_self();
518     acb->ret = 0;
519     acb->nr_pending = 0;
520
521     acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
522     acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
523                               acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
524
525     acb->min_dirty_data_idx = UINT32_MAX;
526     acb->max_dirty_data_idx = 0;
527     acb->aiocb_type = type;
528
529     if (type == AIOCB_FLUSH_CACHE) {
530         return;
531     }
532
533     qemu_co_mutex_lock(&s->queue_lock);
534     wait_for_overlapping_aiocb(s, acb);
535     QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
536     qemu_co_mutex_unlock(&s->queue_lock);
537 }
538
539 static SocketAddress *sd_server_config(QDict *options, Error **errp)
540 {
541     QDict *server = NULL;
542     Visitor *iv = NULL;
543     SocketAddress *saddr = NULL;
544     Error *local_err = NULL;
545
546     qdict_extract_subqdict(options, &server, "server.");
547
548     iv = qobject_input_visitor_new_flat_confused(server, errp);
549     if (!iv) {
550         goto done;
551     }
552
553     visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
554     if (local_err) {
555         error_propagate(errp, local_err);
556         goto done;
557     }
558
559 done:
560     visit_free(iv);
561     qobject_unref(server);
562     return saddr;
563 }
564
565 /* Return -EIO in case of error, file descriptor on success */
566 static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
567 {
568     int fd;
569
570     fd = socket_connect(s->addr, errp);
571
572     if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
573         int ret = socket_set_nodelay(fd);
574         if (ret < 0) {
575             warn_report("can't set TCP_NODELAY: %s", strerror(errno));
576         }
577     }
578
579     if (fd >= 0) {
580         qemu_set_nonblock(fd);
581     } else {
582         fd = -EIO;
583     }
584
585     return fd;
586 }
587
588 /* Return 0 on success and -errno in case of error */
589 static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
590                                     unsigned int *wlen)
591 {
592     int ret;
593
594     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
595     if (ret != sizeof(*hdr)) {
596         error_report("failed to send a req, %s", strerror(errno));
597         return -errno;
598     }
599
600     ret = qemu_co_send(sockfd, data, *wlen);
601     if (ret != *wlen) {
602         error_report("failed to send a req, %s", strerror(errno));
603         return -errno;
604     }
605
606     return ret;
607 }
608
609 typedef struct SheepdogReqCo {
610     int sockfd;
611     BlockDriverState *bs;
612     AioContext *aio_context;
613     SheepdogReq *hdr;
614     void *data;
615     unsigned int *wlen;
616     unsigned int *rlen;
617     int ret;
618     bool finished;
619     Coroutine *co;
620 } SheepdogReqCo;
621
622 static void restart_co_req(void *opaque)
623 {
624     SheepdogReqCo *srco = opaque;
625
626     aio_co_wake(srco->co);
627 }
628
629 static coroutine_fn void do_co_req(void *opaque)
630 {
631     int ret;
632     SheepdogReqCo *srco = opaque;
633     int sockfd = srco->sockfd;
634     SheepdogReq *hdr = srco->hdr;
635     void *data = srco->data;
636     unsigned int *wlen = srco->wlen;
637     unsigned int *rlen = srco->rlen;
638
639     srco->co = qemu_coroutine_self();
640     aio_set_fd_handler(srco->aio_context, sockfd, false,
641                        NULL, restart_co_req, NULL, srco);
642
643     ret = send_co_req(sockfd, hdr, data, wlen);
644     if (ret < 0) {
645         goto out;
646     }
647
648     aio_set_fd_handler(srco->aio_context, sockfd, false,
649                        restart_co_req, NULL, NULL, srco);
650
651     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
652     if (ret != sizeof(*hdr)) {
653         error_report("failed to get a rsp, %s", strerror(errno));
654         ret = -errno;
655         goto out;
656     }
657
658     if (*rlen > hdr->data_length) {
659         *rlen = hdr->data_length;
660     }
661
662     if (*rlen) {
663         ret = qemu_co_recv(sockfd, data, *rlen);
664         if (ret != *rlen) {
665             error_report("failed to get the data, %s", strerror(errno));
666             ret = -errno;
667             goto out;
668         }
669     }
670     ret = 0;
671 out:
672     /* there is at most one request for this sockfd, so it is safe to
673      * set each handler to NULL. */
674     aio_set_fd_handler(srco->aio_context, sockfd, false,
675                        NULL, NULL, NULL, NULL);
676
677     srco->co = NULL;
678     srco->ret = ret;
679     /* Set srco->finished before reading bs->wakeup.  */
680     atomic_mb_set(&srco->finished, true);
681     if (srco->bs) {
682         bdrv_wakeup(srco->bs);
683     }
684 }
685
686 /*
687  * Send the request to the sheep in a synchronous manner.
688  *
689  * Return 0 on success, -errno in case of error.
690  */
691 static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
692                   void *data, unsigned int *wlen, unsigned int *rlen)
693 {
694     Coroutine *co;
695     SheepdogReqCo srco = {
696         .sockfd = sockfd,
697         .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
698         .bs = bs,
699         .hdr = hdr,
700         .data = data,
701         .wlen = wlen,
702         .rlen = rlen,
703         .ret = 0,
704         .finished = false,
705     };
706
707     if (qemu_in_coroutine()) {
708         do_co_req(&srco);
709     } else {
710         co = qemu_coroutine_create(do_co_req, &srco);
711         if (bs) {
712             bdrv_coroutine_enter(bs, co);
713             BDRV_POLL_WHILE(bs, !srco.finished);
714         } else {
715             qemu_coroutine_enter(co);
716             while (!srco.finished) {
717                 aio_poll(qemu_get_aio_context(), true);
718             }
719         }
720     }
721
722     return srco.ret;
723 }
724
725 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
726                                          struct iovec *iov, int niov,
727                                          enum AIOCBState aiocb_type);
728 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
729 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
730 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
731 static void co_write_request(void *opaque);
732
733 static coroutine_fn void reconnect_to_sdog(void *opaque)
734 {
735     BDRVSheepdogState *s = opaque;
736     AIOReq *aio_req, *next;
737
738     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
739                        NULL, NULL, NULL);
740     close(s->fd);
741     s->fd = -1;
742
743     /* Wait for outstanding write requests to be completed. */
744     while (s->co_send != NULL) {
745         co_write_request(opaque);
746     }
747
748     /* Try to reconnect the sheepdog server every one second. */
749     while (s->fd < 0) {
750         Error *local_err = NULL;
751         s->fd = get_sheep_fd(s, &local_err);
752         if (s->fd < 0) {
753             DPRINTF("Wait for connection to be established\n");
754             error_report_err(local_err);
755             qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000000ULL);
756         }
757     };
758
759     /*
760      * Now we have to resend all the request in the inflight queue.  However,
761      * resend_aioreq() can yield and newly created requests can be added to the
762      * inflight queue before the coroutine is resumed.  To avoid mixing them, we
763      * have to move all the inflight requests to the failed queue before
764      * resend_aioreq() is called.
765      */
766     qemu_co_mutex_lock(&s->queue_lock);
767     QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
768         QLIST_REMOVE(aio_req, aio_siblings);
769         QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
770     }
771
772     /* Resend all the failed aio requests. */
773     while (!QLIST_EMPTY(&s->failed_aio_head)) {
774         aio_req = QLIST_FIRST(&s->failed_aio_head);
775         QLIST_REMOVE(aio_req, aio_siblings);
776         qemu_co_mutex_unlock(&s->queue_lock);
777         resend_aioreq(s, aio_req);
778         qemu_co_mutex_lock(&s->queue_lock);
779     }
780     qemu_co_mutex_unlock(&s->queue_lock);
781 }
782
783 /*
784  * Receive responses of the I/O requests.
785  *
786  * This function is registered as a fd handler, and called from the
787  * main loop when s->fd is ready for reading responses.
788  */
789 static void coroutine_fn aio_read_response(void *opaque)
790 {
791     SheepdogObjRsp rsp;
792     BDRVSheepdogState *s = opaque;
793     int fd = s->fd;
794     int ret;
795     AIOReq *aio_req = NULL;
796     SheepdogAIOCB *acb;
797     uint64_t idx;
798
799     /* read a header */
800     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
801     if (ret != sizeof(rsp)) {
802         error_report("failed to get the header, %s", strerror(errno));
803         goto err;
804     }
805
806     /* find the right aio_req from the inflight aio list */
807     QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
808         if (aio_req->id == rsp.id) {
809             break;
810         }
811     }
812     if (!aio_req) {
813         error_report("cannot find aio_req %x", rsp.id);
814         goto err;
815     }
816
817     acb = aio_req->aiocb;
818
819     switch (acb->aiocb_type) {
820     case AIOCB_WRITE_UDATA:
821         if (!is_data_obj(aio_req->oid)) {
822             break;
823         }
824         idx = data_oid_to_idx(aio_req->oid);
825
826         if (aio_req->create) {
827             /*
828              * If the object is newly created one, we need to update
829              * the vdi object (metadata object).  min_dirty_data_idx
830              * and max_dirty_data_idx are changed to include updated
831              * index between them.
832              */
833             if (rsp.result == SD_RES_SUCCESS) {
834                 s->inode.data_vdi_id[idx] = s->inode.vdi_id;
835                 acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
836                 acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
837             }
838         }
839         break;
840     case AIOCB_READ_UDATA:
841         ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
842                             aio_req->iov_offset, rsp.data_length);
843         if (ret != rsp.data_length) {
844             error_report("failed to get the data, %s", strerror(errno));
845             goto err;
846         }
847         break;
848     case AIOCB_FLUSH_CACHE:
849         if (rsp.result == SD_RES_INVALID_PARMS) {
850             DPRINTF("disable cache since the server doesn't support it\n");
851             s->cache_flags = SD_FLAG_CMD_DIRECT;
852             rsp.result = SD_RES_SUCCESS;
853         }
854         break;
855     case AIOCB_DISCARD_OBJ:
856         switch (rsp.result) {
857         case SD_RES_INVALID_PARMS:
858             error_report("server doesn't support discard command");
859             rsp.result = SD_RES_SUCCESS;
860             s->discard_supported = false;
861             break;
862         default:
863             break;
864         }
865     }
866
867     /* No more data for this aio_req (reload_inode below uses its own file
868      * descriptor handler which doesn't use co_recv).
869     */
870     s->co_recv = NULL;
871
872     qemu_co_mutex_lock(&s->queue_lock);
873     QLIST_REMOVE(aio_req, aio_siblings);
874     qemu_co_mutex_unlock(&s->queue_lock);
875
876     switch (rsp.result) {
877     case SD_RES_SUCCESS:
878         break;
879     case SD_RES_READONLY:
880         if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
881             ret = reload_inode(s, 0, "");
882             if (ret < 0) {
883                 goto err;
884             }
885         }
886         if (is_data_obj(aio_req->oid)) {
887             aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
888                                            data_oid_to_idx(aio_req->oid));
889         } else {
890             aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
891         }
892         resend_aioreq(s, aio_req);
893         return;
894     default:
895         acb->ret = -EIO;
896         error_report("%s", sd_strerror(rsp.result));
897         break;
898     }
899
900     g_free(aio_req);
901
902     if (!--acb->nr_pending) {
903         /*
904          * We've finished all requests which belong to the AIOCB, so
905          * we can switch back to sd_co_readv/writev now.
906          */
907         aio_co_wake(acb->coroutine);
908     }
909
910     return;
911
912 err:
913     reconnect_to_sdog(opaque);
914 }
915
916 static void co_read_response(void *opaque)
917 {
918     BDRVSheepdogState *s = opaque;
919
920     if (!s->co_recv) {
921         s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
922     }
923
924     aio_co_enter(s->aio_context, s->co_recv);
925 }
926
927 static void co_write_request(void *opaque)
928 {
929     BDRVSheepdogState *s = opaque;
930
931     aio_co_wake(s->co_send);
932 }
933
934 /*
935  * Return a socket descriptor to read/write objects.
936  *
937  * We cannot use this descriptor for other operations because
938  * the block driver may be on waiting response from the server.
939  */
940 static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
941 {
942     int fd;
943
944     fd = connect_to_sdog(s, errp);
945     if (fd < 0) {
946         return fd;
947     }
948
949     aio_set_fd_handler(s->aio_context, fd, false,
950                        co_read_response, NULL, NULL, s);
951     return fd;
952 }
953
954 /*
955  * Parse numeric snapshot ID in @str
956  * If @str can't be parsed as number, return false.
957  * Else, if the number is zero or too large, set *@snapid to zero and
958  * return true.
959  * Else, set *@snapid to the number and return true.
960  */
961 static bool sd_parse_snapid(const char *str, uint32_t *snapid)
962 {
963     unsigned long ul;
964     int ret;
965
966     ret = qemu_strtoul(str, NULL, 10, &ul);
967     if (ret == -ERANGE) {
968         ul = ret = 0;
969     }
970     if (ret) {
971         return false;
972     }
973     if (ul > UINT32_MAX) {
974         ul = 0;
975     }
976
977     *snapid = ul;
978     return true;
979 }
980
981 static bool sd_parse_snapid_or_tag(const char *str,
982                                    uint32_t *snapid, char tag[])
983 {
984     if (!sd_parse_snapid(str, snapid)) {
985         *snapid = 0;
986         if (g_strlcpy(tag, str, SD_MAX_VDI_TAG_LEN) >= SD_MAX_VDI_TAG_LEN) {
987             return false;
988         }
989     } else if (!*snapid) {
990         return false;
991     } else {
992         tag[0] = 0;
993     }
994     return true;
995 }
996
997 typedef struct {
998     const char *path;           /* non-null iff transport is tcp */
999     const char *host;           /* valid when transport is tcp */
1000     int port;                   /* valid when transport is tcp */
1001     char vdi[SD_MAX_VDI_LEN];
1002     char tag[SD_MAX_VDI_TAG_LEN];
1003     uint32_t snap_id;
1004     /* Remainder is only for sd_config_done() */
1005     URI *uri;
1006     QueryParams *qp;
1007 } SheepdogConfig;
1008
1009 static void sd_config_done(SheepdogConfig *cfg)
1010 {
1011     if (cfg->qp) {
1012         query_params_free(cfg->qp);
1013     }
1014     uri_free(cfg->uri);
1015 }
1016
1017 static void sd_parse_uri(SheepdogConfig *cfg, const char *filename,
1018                          Error **errp)
1019 {
1020     Error *err = NULL;
1021     QueryParams *qp = NULL;
1022     bool is_unix;
1023     URI *uri;
1024
1025     memset(cfg, 0, sizeof(*cfg));
1026
1027     cfg->uri = uri = uri_parse(filename);
1028     if (!uri) {
1029         error_setg(&err, "invalid URI '%s'", filename);
1030         goto out;
1031     }
1032
1033     /* transport */
1034     if (!g_strcmp0(uri->scheme, "sheepdog")) {
1035         is_unix = false;
1036     } else if (!g_strcmp0(uri->scheme, "sheepdog+tcp")) {
1037         is_unix = false;
1038     } else if (!g_strcmp0(uri->scheme, "sheepdog+unix")) {
1039         is_unix = true;
1040     } else {
1041         error_setg(&err, "URI scheme must be 'sheepdog', 'sheepdog+tcp',"
1042                    " or 'sheepdog+unix'");
1043         goto out;
1044     }
1045
1046     if (uri->path == NULL || !strcmp(uri->path, "/")) {
1047         error_setg(&err, "missing file path in URI");
1048         goto out;
1049     }
1050     if (g_strlcpy(cfg->vdi, uri->path + 1, SD_MAX_VDI_LEN)
1051         >= SD_MAX_VDI_LEN) {
1052         error_setg(&err, "VDI name is too long");
1053         goto out;
1054     }
1055
1056     cfg->qp = qp = query_params_parse(uri->query);
1057
1058     if (is_unix) {
1059         /* sheepdog+unix:///vdiname?socket=path */
1060         if (uri->server || uri->port) {
1061             error_setg(&err, "URI scheme %s doesn't accept a server address",
1062                        uri->scheme);
1063             goto out;
1064         }
1065         if (!qp->n) {
1066             error_setg(&err,
1067                        "URI scheme %s requires query parameter 'socket'",
1068                        uri->scheme);
1069             goto out;
1070         }
1071         if (qp->n != 1 || strcmp(qp->p[0].name, "socket")) {
1072             error_setg(&err, "unexpected query parameters");
1073             goto out;
1074         }
1075         cfg->path = qp->p[0].value;
1076     } else {
1077         /* sheepdog[+tcp]://[host:port]/vdiname */
1078         if (qp->n) {
1079             error_setg(&err, "unexpected query parameters");
1080             goto out;
1081         }
1082         cfg->host = uri->server;
1083         cfg->port = uri->port;
1084     }
1085
1086     /* snapshot tag */
1087     if (uri->fragment) {
1088         if (!sd_parse_snapid_or_tag(uri->fragment,
1089                                     &cfg->snap_id, cfg->tag)) {
1090             error_setg(&err, "'%s' is not a valid snapshot ID",
1091                        uri->fragment);
1092             goto out;
1093         }
1094     } else {
1095         cfg->snap_id = CURRENT_VDI_ID; /* search current vdi */
1096     }
1097
1098 out:
1099     if (err) {
1100         error_propagate(errp, err);
1101         sd_config_done(cfg);
1102     }
1103 }
1104
1105 /*
1106  * Parse a filename (old syntax)
1107  *
1108  * filename must be one of the following formats:
1109  *   1. [vdiname]
1110  *   2. [vdiname]:[snapid]
1111  *   3. [vdiname]:[tag]
1112  *   4. [hostname]:[port]:[vdiname]
1113  *   5. [hostname]:[port]:[vdiname]:[snapid]
1114  *   6. [hostname]:[port]:[vdiname]:[tag]
1115  *
1116  * You can boot from the snapshot images by specifying `snapid` or
1117  * `tag'.
1118  *
1119  * You can run VMs outside the Sheepdog cluster by specifying
1120  * `hostname' and `port' (experimental).
1121  */
1122 static void parse_vdiname(SheepdogConfig *cfg, const char *filename,
1123                           Error **errp)
1124 {
1125     Error *err = NULL;
1126     char *p, *q, *uri;
1127     const char *host_spec, *vdi_spec;
1128     int nr_sep;
1129
1130     strstart(filename, "sheepdog:", &filename);
1131     p = q = g_strdup(filename);
1132
1133     /* count the number of separators */
1134     nr_sep = 0;
1135     while (*p) {
1136         if (*p == ':') {
1137             nr_sep++;
1138         }
1139         p++;
1140     }
1141     p = q;
1142
1143     /* use the first two tokens as host_spec. */
1144     if (nr_sep >= 2) {
1145         host_spec = p;
1146         p = strchr(p, ':');
1147         p++;
1148         p = strchr(p, ':');
1149         *p++ = '\0';
1150     } else {
1151         host_spec = "";
1152     }
1153
1154     vdi_spec = p;
1155
1156     p = strchr(vdi_spec, ':');
1157     if (p) {
1158         *p++ = '#';
1159     }
1160
1161     uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1162
1163     /*
1164      * FIXME We to escape URI meta-characters, e.g. "x?y=z"
1165      * produces "sheepdog://x?y=z".  Because of that ...
1166      */
1167     sd_parse_uri(cfg, uri, &err);
1168     if (err) {
1169         /*
1170          * ... this can fail, but the error message is misleading.
1171          * Replace it by the traditional useless one until the
1172          * escaping is fixed.
1173          */
1174         error_free(err);
1175         error_setg(errp, "Can't parse filename");
1176     }
1177
1178     g_free(q);
1179     g_free(uri);
1180 }
1181
1182 static void sd_parse_filename(const char *filename, QDict *options,
1183                               Error **errp)
1184 {
1185     Error *err = NULL;
1186     SheepdogConfig cfg;
1187     char buf[32];
1188
1189     if (strstr(filename, "://")) {
1190         sd_parse_uri(&cfg, filename, &err);
1191     } else {
1192         parse_vdiname(&cfg, filename, &err);
1193     }
1194     if (err) {
1195         error_propagate(errp, err);
1196         return;
1197     }
1198
1199     if (cfg.path) {
1200         qdict_set_default_str(options, "server.path", cfg.path);
1201         qdict_set_default_str(options, "server.type", "unix");
1202     } else {
1203         qdict_set_default_str(options, "server.type", "inet");
1204         qdict_set_default_str(options, "server.host",
1205                               cfg.host ?: SD_DEFAULT_ADDR);
1206         snprintf(buf, sizeof(buf), "%d", cfg.port ?: SD_DEFAULT_PORT);
1207         qdict_set_default_str(options, "server.port", buf);
1208     }
1209     qdict_set_default_str(options, "vdi", cfg.vdi);
1210     qdict_set_default_str(options, "tag", cfg.tag);
1211     if (cfg.snap_id) {
1212         snprintf(buf, sizeof(buf), "%d", cfg.snap_id);
1213         qdict_set_default_str(options, "snap-id", buf);
1214     }
1215
1216     sd_config_done(&cfg);
1217 }
1218
1219 static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1220                          uint32_t snapid, const char *tag, uint32_t *vid,
1221                          bool lock, Error **errp)
1222 {
1223     int ret, fd;
1224     SheepdogVdiReq hdr;
1225     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1226     unsigned int wlen, rlen = 0;
1227     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1228
1229     fd = connect_to_sdog(s, errp);
1230     if (fd < 0) {
1231         return fd;
1232     }
1233
1234     /* This pair of strncpy calls ensures that the buffer is zero-filled,
1235      * which is desirable since we'll soon be sending those bytes, and
1236      * don't want the send_req to read uninitialized data.
1237      */
1238     strncpy(buf, filename, SD_MAX_VDI_LEN);
1239     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1240
1241     memset(&hdr, 0, sizeof(hdr));
1242     if (lock) {
1243         hdr.opcode = SD_OP_LOCK_VDI;
1244         hdr.type = LOCK_TYPE_NORMAL;
1245     } else {
1246         hdr.opcode = SD_OP_GET_VDI_INFO;
1247     }
1248     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1249     hdr.proto_ver = SD_PROTO_VER;
1250     hdr.data_length = wlen;
1251     hdr.snapid = snapid;
1252     hdr.flags = SD_FLAG_CMD_WRITE;
1253
1254     ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1255     if (ret) {
1256         error_setg_errno(errp, -ret, "cannot get vdi info");
1257         goto out;
1258     }
1259
1260     if (rsp->result != SD_RES_SUCCESS) {
1261         error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1262                    sd_strerror(rsp->result), filename, snapid, tag);
1263         if (rsp->result == SD_RES_NO_VDI) {
1264             ret = -ENOENT;
1265         } else if (rsp->result == SD_RES_VDI_LOCKED) {
1266             ret = -EBUSY;
1267         } else {
1268             ret = -EIO;
1269         }
1270         goto out;
1271     }
1272     *vid = rsp->vdi_id;
1273
1274     ret = 0;
1275 out:
1276     closesocket(fd);
1277     return ret;
1278 }
1279
1280 static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1281                                          struct iovec *iov, int niov,
1282                                          enum AIOCBState aiocb_type)
1283 {
1284     int nr_copies = s->inode.nr_copies;
1285     SheepdogObjReq hdr;
1286     unsigned int wlen = 0;
1287     int ret;
1288     uint64_t oid = aio_req->oid;
1289     unsigned int datalen = aio_req->data_len;
1290     uint64_t offset = aio_req->offset;
1291     uint8_t flags = aio_req->flags;
1292     uint64_t old_oid = aio_req->base_oid;
1293     bool create = aio_req->create;
1294
1295     qemu_co_mutex_lock(&s->queue_lock);
1296     QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1297     qemu_co_mutex_unlock(&s->queue_lock);
1298
1299     if (!nr_copies) {
1300         error_report("bug");
1301     }
1302
1303     memset(&hdr, 0, sizeof(hdr));
1304
1305     switch (aiocb_type) {
1306     case AIOCB_FLUSH_CACHE:
1307         hdr.opcode = SD_OP_FLUSH_VDI;
1308         break;
1309     case AIOCB_READ_UDATA:
1310         hdr.opcode = SD_OP_READ_OBJ;
1311         hdr.flags = flags;
1312         break;
1313     case AIOCB_WRITE_UDATA:
1314         if (create) {
1315             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1316         } else {
1317             hdr.opcode = SD_OP_WRITE_OBJ;
1318         }
1319         wlen = datalen;
1320         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1321         break;
1322     case AIOCB_DISCARD_OBJ:
1323         hdr.opcode = SD_OP_WRITE_OBJ;
1324         hdr.flags = SD_FLAG_CMD_WRITE | flags;
1325         s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
1326         offset = offsetof(SheepdogInode,
1327                           data_vdi_id[data_oid_to_idx(oid)]);
1328         oid = vid_to_vdi_oid(s->inode.vdi_id);
1329         wlen = datalen = sizeof(uint32_t);
1330         break;
1331     }
1332
1333     if (s->cache_flags) {
1334         hdr.flags |= s->cache_flags;
1335     }
1336
1337     hdr.oid = oid;
1338     hdr.cow_oid = old_oid;
1339     hdr.copies = s->inode.nr_copies;
1340
1341     hdr.data_length = datalen;
1342     hdr.offset = offset;
1343
1344     hdr.id = aio_req->id;
1345
1346     qemu_co_mutex_lock(&s->lock);
1347     s->co_send = qemu_coroutine_self();
1348     aio_set_fd_handler(s->aio_context, s->fd, false,
1349                        co_read_response, co_write_request, NULL, s);
1350     socket_set_cork(s->fd, 1);
1351
1352     /* send a header */
1353     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1354     if (ret != sizeof(hdr)) {
1355         error_report("failed to send a req, %s", strerror(errno));
1356         goto out;
1357     }
1358
1359     if (wlen) {
1360         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1361         if (ret != wlen) {
1362             error_report("failed to send a data, %s", strerror(errno));
1363         }
1364     }
1365 out:
1366     socket_set_cork(s->fd, 0);
1367     aio_set_fd_handler(s->aio_context, s->fd, false,
1368                        co_read_response, NULL, NULL, s);
1369     s->co_send = NULL;
1370     qemu_co_mutex_unlock(&s->lock);
1371 }
1372
1373 static int read_write_object(int fd, BlockDriverState *bs, char *buf,
1374                              uint64_t oid, uint8_t copies,
1375                              unsigned int datalen, uint64_t offset,
1376                              bool write, bool create, uint32_t cache_flags)
1377 {
1378     SheepdogObjReq hdr;
1379     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1380     unsigned int wlen, rlen;
1381     int ret;
1382
1383     memset(&hdr, 0, sizeof(hdr));
1384
1385     if (write) {
1386         wlen = datalen;
1387         rlen = 0;
1388         hdr.flags = SD_FLAG_CMD_WRITE;
1389         if (create) {
1390             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1391         } else {
1392             hdr.opcode = SD_OP_WRITE_OBJ;
1393         }
1394     } else {
1395         wlen = 0;
1396         rlen = datalen;
1397         hdr.opcode = SD_OP_READ_OBJ;
1398     }
1399
1400     hdr.flags |= cache_flags;
1401
1402     hdr.oid = oid;
1403     hdr.data_length = datalen;
1404     hdr.offset = offset;
1405     hdr.copies = copies;
1406
1407     ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1408     if (ret) {
1409         error_report("failed to send a request to the sheep");
1410         return ret;
1411     }
1412
1413     switch (rsp->result) {
1414     case SD_RES_SUCCESS:
1415         return 0;
1416     default:
1417         error_report("%s", sd_strerror(rsp->result));
1418         return -EIO;
1419     }
1420 }
1421
1422 static int read_object(int fd, BlockDriverState *bs, char *buf,
1423                        uint64_t oid, uint8_t copies,
1424                        unsigned int datalen, uint64_t offset,
1425                        uint32_t cache_flags)
1426 {
1427     return read_write_object(fd, bs, buf, oid, copies,
1428                              datalen, offset, false,
1429                              false, cache_flags);
1430 }
1431
1432 static int write_object(int fd, BlockDriverState *bs, char *buf,
1433                         uint64_t oid, uint8_t copies,
1434                         unsigned int datalen, uint64_t offset, bool create,
1435                         uint32_t cache_flags)
1436 {
1437     return read_write_object(fd, bs, buf, oid, copies,
1438                              datalen, offset, true,
1439                              create, cache_flags);
1440 }
1441
1442 /* update inode with the latest state */
1443 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1444 {
1445     Error *local_err = NULL;
1446     SheepdogInode *inode;
1447     int ret = 0, fd;
1448     uint32_t vid = 0;
1449
1450     fd = connect_to_sdog(s, &local_err);
1451     if (fd < 0) {
1452         error_report_err(local_err);
1453         return -EIO;
1454     }
1455
1456     inode = g_malloc(SD_INODE_HEADER_SIZE);
1457
1458     ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1459     if (ret) {
1460         error_report_err(local_err);
1461         goto out;
1462     }
1463
1464     ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
1465                       s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1466                       s->cache_flags);
1467     if (ret < 0) {
1468         goto out;
1469     }
1470
1471     if (inode->vdi_id != s->inode.vdi_id) {
1472         memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1473     }
1474
1475 out:
1476     g_free(inode);
1477     closesocket(fd);
1478
1479     return ret;
1480 }
1481
1482 static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1483 {
1484     SheepdogAIOCB *acb = aio_req->aiocb;
1485
1486     aio_req->create = false;
1487
1488     /* check whether this request becomes a CoW one */
1489     if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1490         int idx = data_oid_to_idx(aio_req->oid);
1491
1492         if (is_data_obj_writable(&s->inode, idx)) {
1493             goto out;
1494         }
1495
1496         if (s->inode.data_vdi_id[idx]) {
1497             aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1498             aio_req->flags |= SD_FLAG_CMD_COW;
1499         }
1500         aio_req->create = true;
1501     }
1502 out:
1503     if (is_data_obj(aio_req->oid)) {
1504         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1505                         acb->aiocb_type);
1506     } else {
1507         struct iovec iov;
1508         iov.iov_base = &s->inode;
1509         iov.iov_len = sizeof(s->inode);
1510         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1511     }
1512 }
1513
1514 static void sd_detach_aio_context(BlockDriverState *bs)
1515 {
1516     BDRVSheepdogState *s = bs->opaque;
1517
1518     aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1519                        NULL, NULL, NULL);
1520 }
1521
1522 static void sd_attach_aio_context(BlockDriverState *bs,
1523                                   AioContext *new_context)
1524 {
1525     BDRVSheepdogState *s = bs->opaque;
1526
1527     s->aio_context = new_context;
1528     aio_set_fd_handler(new_context, s->fd, false,
1529                        co_read_response, NULL, NULL, s);
1530 }
1531
1532 static QemuOptsList runtime_opts = {
1533     .name = "sheepdog",
1534     .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1535     .desc = {
1536         {
1537             .name = "vdi",
1538             .type = QEMU_OPT_STRING,
1539         },
1540         {
1541             .name = "snap-id",
1542             .type = QEMU_OPT_NUMBER,
1543         },
1544         {
1545             .name = "tag",
1546             .type = QEMU_OPT_STRING,
1547         },
1548         { /* end of list */ }
1549     },
1550 };
1551
1552 static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1553                    Error **errp)
1554 {
1555     int ret, fd;
1556     uint32_t vid = 0;
1557     BDRVSheepdogState *s = bs->opaque;
1558     const char *vdi, *snap_id_str, *tag;
1559     uint64_t snap_id;
1560     char *buf = NULL;
1561     QemuOpts *opts;
1562     Error *local_err = NULL;
1563
1564     s->bs = bs;
1565     s->aio_context = bdrv_get_aio_context(bs);
1566
1567     opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1568     qemu_opts_absorb_qdict(opts, options, &local_err);
1569     if (local_err) {
1570         error_propagate(errp, local_err);
1571         ret = -EINVAL;
1572         goto err_no_fd;
1573     }
1574
1575     s->addr = sd_server_config(options, errp);
1576     if (!s->addr) {
1577         ret = -EINVAL;
1578         goto err_no_fd;
1579     }
1580
1581     vdi = qemu_opt_get(opts, "vdi");
1582     snap_id_str = qemu_opt_get(opts, "snap-id");
1583     snap_id = qemu_opt_get_number(opts, "snap-id", CURRENT_VDI_ID);
1584     tag = qemu_opt_get(opts, "tag");
1585
1586     if (!vdi) {
1587         error_setg(errp, "parameter 'vdi' is missing");
1588         ret = -EINVAL;
1589         goto err_no_fd;
1590     }
1591     if (strlen(vdi) >= SD_MAX_VDI_LEN) {
1592         error_setg(errp, "value of parameter 'vdi' is too long");
1593         ret = -EINVAL;
1594         goto err_no_fd;
1595     }
1596
1597     if (snap_id > UINT32_MAX) {
1598         snap_id = 0;
1599     }
1600     if (snap_id_str && !snap_id) {
1601         error_setg(errp, "'snap-id=%s' is not a valid snapshot ID",
1602                    snap_id_str);
1603         ret = -EINVAL;
1604         goto err_no_fd;
1605     }
1606
1607     if (!tag) {
1608         tag = "";
1609     }
1610     if (strlen(tag) >= SD_MAX_VDI_TAG_LEN) {
1611         error_setg(errp, "value of parameter 'tag' is too long");
1612         ret = -EINVAL;
1613         goto err_no_fd;
1614     }
1615
1616     QLIST_INIT(&s->inflight_aio_head);
1617     QLIST_INIT(&s->failed_aio_head);
1618     QLIST_INIT(&s->inflight_aiocb_head);
1619
1620     s->fd = get_sheep_fd(s, errp);
1621     if (s->fd < 0) {
1622         ret = s->fd;
1623         goto err_no_fd;
1624     }
1625
1626     ret = find_vdi_name(s, vdi, (uint32_t)snap_id, tag, &vid, true, errp);
1627     if (ret) {
1628         goto err;
1629     }
1630
1631     /*
1632      * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1633      * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1634      */
1635     s->cache_flags = SD_FLAG_CMD_CACHE;
1636     if (flags & BDRV_O_NOCACHE) {
1637         s->cache_flags = SD_FLAG_CMD_DIRECT;
1638     }
1639     s->discard_supported = true;
1640
1641     if (snap_id || tag[0]) {
1642         DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1643         s->is_snapshot = true;
1644     }
1645
1646     fd = connect_to_sdog(s, errp);
1647     if (fd < 0) {
1648         ret = fd;
1649         goto err;
1650     }
1651
1652     buf = g_malloc(SD_INODE_SIZE);
1653     ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
1654                       0, SD_INODE_SIZE, 0, s->cache_flags);
1655
1656     closesocket(fd);
1657
1658     if (ret) {
1659         error_setg(errp, "Can't read snapshot inode");
1660         goto err;
1661     }
1662
1663     memcpy(&s->inode, buf, sizeof(s->inode));
1664
1665     bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1666     pstrcpy(s->name, sizeof(s->name), vdi);
1667     qemu_co_mutex_init(&s->lock);
1668     qemu_co_mutex_init(&s->queue_lock);
1669     qemu_co_queue_init(&s->overlapping_queue);
1670     qemu_opts_del(opts);
1671     g_free(buf);
1672     return 0;
1673
1674 err:
1675     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1676                        false, NULL, NULL, NULL, NULL);
1677     closesocket(s->fd);
1678 err_no_fd:
1679     qemu_opts_del(opts);
1680     g_free(buf);
1681     return ret;
1682 }
1683
1684 static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1685                              Error **errp)
1686 {
1687     BDRVSheepdogState *s = state->bs->opaque;
1688     BDRVSheepdogReopenState *re_s;
1689     int ret = 0;
1690
1691     re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1692
1693     re_s->cache_flags = SD_FLAG_CMD_CACHE;
1694     if (state->flags & BDRV_O_NOCACHE) {
1695         re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1696     }
1697
1698     re_s->fd = get_sheep_fd(s, errp);
1699     if (re_s->fd < 0) {
1700         ret = re_s->fd;
1701         return ret;
1702     }
1703
1704     return ret;
1705 }
1706
1707 static void sd_reopen_commit(BDRVReopenState *state)
1708 {
1709     BDRVSheepdogReopenState *re_s = state->opaque;
1710     BDRVSheepdogState *s = state->bs->opaque;
1711
1712     if (s->fd) {
1713         aio_set_fd_handler(s->aio_context, s->fd, false,
1714                            NULL, NULL, NULL, NULL);
1715         closesocket(s->fd);
1716     }
1717
1718     s->fd = re_s->fd;
1719     s->cache_flags = re_s->cache_flags;
1720
1721     g_free(state->opaque);
1722     state->opaque = NULL;
1723
1724     return;
1725 }
1726
1727 static void sd_reopen_abort(BDRVReopenState *state)
1728 {
1729     BDRVSheepdogReopenState *re_s = state->opaque;
1730     BDRVSheepdogState *s = state->bs->opaque;
1731
1732     if (re_s == NULL) {
1733         return;
1734     }
1735
1736     if (re_s->fd) {
1737         aio_set_fd_handler(s->aio_context, re_s->fd, false,
1738                            NULL, NULL, NULL, NULL);
1739         closesocket(re_s->fd);
1740     }
1741
1742     g_free(state->opaque);
1743     state->opaque = NULL;
1744
1745     return;
1746 }
1747
1748 static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1749                         Error **errp)
1750 {
1751     SheepdogVdiReq hdr;
1752     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1753     int fd, ret;
1754     unsigned int wlen, rlen = 0;
1755     char buf[SD_MAX_VDI_LEN];
1756
1757     fd = connect_to_sdog(s, errp);
1758     if (fd < 0) {
1759         return fd;
1760     }
1761
1762     /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1763      * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1764      */
1765     memset(buf, 0, sizeof(buf));
1766     pstrcpy(buf, sizeof(buf), s->name);
1767
1768     memset(&hdr, 0, sizeof(hdr));
1769     hdr.opcode = SD_OP_NEW_VDI;
1770     hdr.base_vdi_id = s->inode.vdi_id;
1771
1772     wlen = SD_MAX_VDI_LEN;
1773
1774     hdr.flags = SD_FLAG_CMD_WRITE;
1775     hdr.snapid = snapshot;
1776
1777     hdr.data_length = wlen;
1778     hdr.vdi_size = s->inode.vdi_size;
1779     hdr.copy_policy = s->inode.copy_policy;
1780     hdr.copies = s->inode.nr_copies;
1781     hdr.block_size_shift = s->inode.block_size_shift;
1782
1783     ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1784
1785     closesocket(fd);
1786
1787     if (ret) {
1788         error_setg_errno(errp, -ret, "create failed");
1789         return ret;
1790     }
1791
1792     if (rsp->result != SD_RES_SUCCESS) {
1793         error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1794         return -EIO;
1795     }
1796
1797     if (vdi_id) {
1798         *vdi_id = rsp->vdi_id;
1799     }
1800
1801     return 0;
1802 }
1803
1804 static int sd_prealloc(BlockDriverState *bs, int64_t old_size, int64_t new_size,
1805                        Error **errp)
1806 {
1807     BlockBackend *blk = NULL;
1808     BDRVSheepdogState *base = bs->opaque;
1809     unsigned long buf_size;
1810     uint32_t idx, max_idx;
1811     uint32_t object_size;
1812     void *buf = NULL;
1813     int ret;
1814
1815     blk = blk_new(BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE | BLK_PERM_RESIZE,
1816                   BLK_PERM_ALL);
1817
1818     ret = blk_insert_bs(blk, bs, errp);
1819     if (ret < 0) {
1820         goto out_with_err_set;
1821     }
1822
1823     blk_set_allow_write_beyond_eof(blk, true);
1824
1825     object_size = (UINT32_C(1) << base->inode.block_size_shift);
1826     buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1827     buf = g_malloc0(buf_size);
1828
1829     max_idx = DIV_ROUND_UP(new_size, buf_size);
1830
1831     for (idx = old_size / buf_size; idx < max_idx; idx++) {
1832         /*
1833          * The created image can be a cloned image, so we need to read
1834          * a data from the source image.
1835          */
1836         ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1837         if (ret < 0) {
1838             goto out;
1839         }
1840         ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1841         if (ret < 0) {
1842             goto out;
1843         }
1844     }
1845
1846     ret = 0;
1847 out:
1848     if (ret < 0) {
1849         error_setg_errno(errp, -ret, "Can't pre-allocate");
1850     }
1851 out_with_err_set:
1852     blk_unref(blk);
1853     g_free(buf);
1854
1855     return ret;
1856 }
1857
1858 static int sd_create_prealloc(BlockdevOptionsSheepdog *location, int64_t size,
1859                               Error **errp)
1860 {
1861     BlockDriverState *bs;
1862     Visitor *v;
1863     QObject *obj = NULL;
1864     QDict *qdict;
1865     Error *local_err = NULL;
1866     int ret;
1867
1868     v = qobject_output_visitor_new(&obj);
1869     visit_type_BlockdevOptionsSheepdog(v, NULL, &location, &local_err);
1870     visit_free(v);
1871
1872     if (local_err) {
1873         error_propagate(errp, local_err);
1874         qobject_unref(obj);
1875         return -EINVAL;
1876     }
1877
1878     qdict = qobject_to(QDict, obj);
1879     qdict_flatten(qdict);
1880
1881     qdict_put_str(qdict, "driver", "sheepdog");
1882
1883     bs = bdrv_open(NULL, NULL, qdict, BDRV_O_PROTOCOL | BDRV_O_RDWR, errp);
1884     if (bs == NULL) {
1885         ret = -EIO;
1886         goto fail;
1887     }
1888
1889     ret = sd_prealloc(bs, 0, size, errp);
1890 fail:
1891     bdrv_unref(bs);
1892     qobject_unref(qdict);
1893     return ret;
1894 }
1895
1896 static int parse_redundancy(BDRVSheepdogState *s, SheepdogRedundancy *opt)
1897 {
1898     struct SheepdogInode *inode = &s->inode;
1899
1900     switch (opt->type) {
1901     case SHEEPDOG_REDUNDANCY_TYPE_FULL:
1902         if (opt->u.full.copies > SD_MAX_COPIES || opt->u.full.copies < 1) {
1903             return -EINVAL;
1904         }
1905         inode->copy_policy = 0;
1906         inode->nr_copies = opt->u.full.copies;
1907         return 0;
1908
1909     case SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED:
1910     {
1911         int64_t copy = opt->u.erasure_coded.data_strips;
1912         int64_t parity = opt->u.erasure_coded.parity_strips;
1913
1914         if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1915             return -EINVAL;
1916         }
1917
1918         if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1919             return -EINVAL;
1920         }
1921
1922         /*
1923          * 4 bits for parity and 4 bits for data.
1924          * We have to compress upper data bits because it can't represent 16
1925          */
1926         inode->copy_policy = ((copy / 2) << 4) + parity;
1927         inode->nr_copies = copy + parity;
1928         return 0;
1929     }
1930
1931     default:
1932         g_assert_not_reached();
1933     }
1934
1935     return -EINVAL;
1936 }
1937
1938 /*
1939  * Sheepdog support two kinds of redundancy, full replication and erasure
1940  * coding.
1941  *
1942  * # create a fully replicated vdi with x copies
1943  * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1944  *
1945  * # create a erasure coded vdi with x data strips and y parity strips
1946  * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1947  */
1948 static SheepdogRedundancy *parse_redundancy_str(const char *opt)
1949 {
1950     SheepdogRedundancy *redundancy;
1951     const char *n1, *n2;
1952     long copy, parity;
1953     char p[10];
1954     int ret;
1955
1956     pstrcpy(p, sizeof(p), opt);
1957     n1 = strtok(p, ":");
1958     n2 = strtok(NULL, ":");
1959
1960     if (!n1) {
1961         return NULL;
1962     }
1963
1964     ret = qemu_strtol(n1, NULL, 10, &copy);
1965     if (ret < 0) {
1966         return NULL;
1967     }
1968
1969     redundancy = g_new0(SheepdogRedundancy, 1);
1970     if (!n2) {
1971         *redundancy = (SheepdogRedundancy) {
1972             .type               = SHEEPDOG_REDUNDANCY_TYPE_FULL,
1973             .u.full.copies      = copy,
1974         };
1975     } else {
1976         ret = qemu_strtol(n2, NULL, 10, &parity);
1977         if (ret < 0) {
1978             g_free(redundancy);
1979             return NULL;
1980         }
1981
1982         *redundancy = (SheepdogRedundancy) {
1983             .type               = SHEEPDOG_REDUNDANCY_TYPE_ERASURE_CODED,
1984             .u.erasure_coded    = {
1985                 .data_strips    = copy,
1986                 .parity_strips  = parity,
1987             },
1988         };
1989     }
1990
1991     return redundancy;
1992 }
1993
1994 static int parse_block_size_shift(BDRVSheepdogState *s,
1995                                   BlockdevCreateOptionsSheepdog *opts)
1996 {
1997     struct SheepdogInode *inode = &s->inode;
1998     uint64_t object_size;
1999     int obj_order;
2000
2001     if (opts->has_object_size) {
2002         object_size = opts->object_size;
2003
2004         if ((object_size - 1) & object_size) {    /* not a power of 2? */
2005             return -EINVAL;
2006         }
2007         obj_order = ctz32(object_size);
2008         if (obj_order < 20 || obj_order > 31) {
2009             return -EINVAL;
2010         }
2011         inode->block_size_shift = (uint8_t)obj_order;
2012     }
2013
2014     return 0;
2015 }
2016
2017 static int sd_co_create(BlockdevCreateOptions *options, Error **errp)
2018 {
2019     BlockdevCreateOptionsSheepdog *opts = &options->u.sheepdog;
2020     int ret = 0;
2021     uint32_t vid = 0;
2022     char *backing_file = NULL;
2023     char *buf = NULL;
2024     BDRVSheepdogState *s;
2025     uint64_t max_vdi_size;
2026     bool prealloc = false;
2027
2028     assert(options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2029
2030     s = g_new0(BDRVSheepdogState, 1);
2031
2032     /* Steal SocketAddress from QAPI, set NULL to prevent double free */
2033     s->addr = opts->location->server;
2034     opts->location->server = NULL;
2035
2036     if (strlen(opts->location->vdi) >= sizeof(s->name)) {
2037         error_setg(errp, "'vdi' string too long");
2038         ret = -EINVAL;
2039         goto out;
2040     }
2041     pstrcpy(s->name, sizeof(s->name), opts->location->vdi);
2042
2043     s->inode.vdi_size = opts->size;
2044     backing_file = opts->backing_file;
2045
2046     if (!opts->has_preallocation) {
2047         opts->preallocation = PREALLOC_MODE_OFF;
2048     }
2049     switch (opts->preallocation) {
2050     case PREALLOC_MODE_OFF:
2051         prealloc = false;
2052         break;
2053     case PREALLOC_MODE_FULL:
2054         prealloc = true;
2055         break;
2056     default:
2057         error_setg(errp, "Preallocation mode not supported for Sheepdog");
2058         ret = -EINVAL;
2059         goto out;
2060     }
2061
2062     if (opts->has_redundancy) {
2063         ret = parse_redundancy(s, opts->redundancy);
2064         if (ret < 0) {
2065             error_setg(errp, "Invalid redundancy mode");
2066             goto out;
2067         }
2068     }
2069     ret = parse_block_size_shift(s, opts);
2070     if (ret < 0) {
2071         error_setg(errp, "Invalid object_size."
2072                          " obect_size needs to be power of 2"
2073                          " and be limited from 2^20 to 2^31");
2074         goto out;
2075     }
2076
2077     if (opts->has_backing_file) {
2078         BlockBackend *blk;
2079         BDRVSheepdogState *base;
2080         BlockDriver *drv;
2081
2082         /* Currently, only Sheepdog backing image is supported. */
2083         drv = bdrv_find_protocol(opts->backing_file, true, NULL);
2084         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
2085             error_setg(errp, "backing_file must be a sheepdog image");
2086             ret = -EINVAL;
2087             goto out;
2088         }
2089
2090         blk = blk_new_open(opts->backing_file, NULL, NULL,
2091                            BDRV_O_PROTOCOL, errp);
2092         if (blk == NULL) {
2093             ret = -EIO;
2094             goto out;
2095         }
2096
2097         base = blk_bs(blk)->opaque;
2098
2099         if (!is_snapshot(&base->inode)) {
2100             error_setg(errp, "cannot clone from a non snapshot vdi");
2101             blk_unref(blk);
2102             ret = -EINVAL;
2103             goto out;
2104         }
2105         s->inode.vdi_id = base->inode.vdi_id;
2106         blk_unref(blk);
2107     }
2108
2109     s->aio_context = qemu_get_aio_context();
2110
2111     /* if block_size_shift is not specified, get cluster default value */
2112     if (s->inode.block_size_shift == 0) {
2113         SheepdogVdiReq hdr;
2114         SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
2115         int fd;
2116         unsigned int wlen = 0, rlen = 0;
2117
2118         fd = connect_to_sdog(s, errp);
2119         if (fd < 0) {
2120             ret = fd;
2121             goto out;
2122         }
2123
2124         memset(&hdr, 0, sizeof(hdr));
2125         hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
2126         hdr.proto_ver = SD_PROTO_VER;
2127
2128         ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
2129                      NULL, &wlen, &rlen);
2130         closesocket(fd);
2131         if (ret) {
2132             error_setg_errno(errp, -ret, "failed to get cluster default");
2133             goto out;
2134         }
2135         if (rsp->result == SD_RES_SUCCESS) {
2136             s->inode.block_size_shift = rsp->block_size_shift;
2137         } else {
2138             s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
2139         }
2140     }
2141
2142     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2143
2144     if (s->inode.vdi_size > max_vdi_size) {
2145         error_setg(errp, "An image is too large."
2146                          " The maximum image size is %"PRIu64 "GB",
2147                          max_vdi_size / 1024 / 1024 / 1024);
2148         ret = -EINVAL;
2149         goto out;
2150     }
2151
2152     ret = do_sd_create(s, &vid, 0, errp);
2153     if (ret) {
2154         goto out;
2155     }
2156
2157     if (prealloc) {
2158         ret = sd_create_prealloc(opts->location, opts->size, errp);
2159     }
2160 out:
2161     g_free(backing_file);
2162     g_free(buf);
2163     g_free(s->addr);
2164     g_free(s);
2165     return ret;
2166 }
2167
2168 static int coroutine_fn sd_co_create_opts(const char *filename, QemuOpts *opts,
2169                                           Error **errp)
2170 {
2171     BlockdevCreateOptions *create_options = NULL;
2172     QDict *qdict, *location_qdict;
2173     Visitor *v;
2174     char *redundancy;
2175     Error *local_err = NULL;
2176     int ret;
2177
2178     redundancy = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
2179
2180     qdict = qemu_opts_to_qdict(opts, NULL);
2181     qdict_put_str(qdict, "driver", "sheepdog");
2182
2183     location_qdict = qdict_new();
2184     qdict_put(qdict, "location", location_qdict);
2185
2186     sd_parse_filename(filename, location_qdict, &local_err);
2187     if (local_err) {
2188         error_propagate(errp, local_err);
2189         ret = -EINVAL;
2190         goto fail;
2191     }
2192
2193     qdict_flatten(qdict);
2194
2195     /* Change legacy command line options into QMP ones */
2196     static const QDictRenames opt_renames[] = {
2197         { BLOCK_OPT_BACKING_FILE,       "backing-file" },
2198         { BLOCK_OPT_OBJECT_SIZE,        "object-size" },
2199         { NULL, NULL },
2200     };
2201
2202     if (!qdict_rename_keys(qdict, opt_renames, errp)) {
2203         ret = -EINVAL;
2204         goto fail;
2205     }
2206
2207     /* Get the QAPI object */
2208     v = qobject_input_visitor_new_flat_confused(qdict, errp);
2209     if (!v) {
2210         ret = -EINVAL;
2211         goto fail;
2212     }
2213
2214     visit_type_BlockdevCreateOptions(v, NULL, &create_options, &local_err);
2215     visit_free(v);
2216
2217     if (local_err) {
2218         error_propagate(errp, local_err);
2219         ret = -EINVAL;
2220         goto fail;
2221     }
2222
2223     assert(create_options->driver == BLOCKDEV_DRIVER_SHEEPDOG);
2224     create_options->u.sheepdog.size =
2225         ROUND_UP(create_options->u.sheepdog.size, BDRV_SECTOR_SIZE);
2226
2227     if (redundancy) {
2228         create_options->u.sheepdog.has_redundancy = true;
2229         create_options->u.sheepdog.redundancy =
2230             parse_redundancy_str(redundancy);
2231         if (create_options->u.sheepdog.redundancy == NULL) {
2232             error_setg(errp, "Invalid redundancy mode");
2233             ret = -EINVAL;
2234             goto fail;
2235         }
2236     }
2237
2238     ret = sd_co_create(create_options, errp);
2239 fail:
2240     qapi_free_BlockdevCreateOptions(create_options);
2241     qobject_unref(qdict);
2242     g_free(redundancy);
2243     return ret;
2244 }
2245
2246 static void sd_close(BlockDriverState *bs)
2247 {
2248     Error *local_err = NULL;
2249     BDRVSheepdogState *s = bs->opaque;
2250     SheepdogVdiReq hdr;
2251     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2252     unsigned int wlen, rlen = 0;
2253     int fd, ret;
2254
2255     DPRINTF("%s\n", s->name);
2256
2257     fd = connect_to_sdog(s, &local_err);
2258     if (fd < 0) {
2259         error_report_err(local_err);
2260         return;
2261     }
2262
2263     memset(&hdr, 0, sizeof(hdr));
2264
2265     hdr.opcode = SD_OP_RELEASE_VDI;
2266     hdr.type = LOCK_TYPE_NORMAL;
2267     hdr.base_vdi_id = s->inode.vdi_id;
2268     wlen = strlen(s->name) + 1;
2269     hdr.data_length = wlen;
2270     hdr.flags = SD_FLAG_CMD_WRITE;
2271
2272     ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2273                  s->name, &wlen, &rlen);
2274
2275     closesocket(fd);
2276
2277     if (!ret && rsp->result != SD_RES_SUCCESS &&
2278         rsp->result != SD_RES_VDI_NOT_LOCKED) {
2279         error_report("%s, %s", sd_strerror(rsp->result), s->name);
2280     }
2281
2282     aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
2283                        false, NULL, NULL, NULL, NULL);
2284     closesocket(s->fd);
2285     qapi_free_SocketAddress(s->addr);
2286 }
2287
2288 static int64_t sd_getlength(BlockDriverState *bs)
2289 {
2290     BDRVSheepdogState *s = bs->opaque;
2291
2292     return s->inode.vdi_size;
2293 }
2294
2295 static int coroutine_fn sd_co_truncate(BlockDriverState *bs, int64_t offset,
2296                                        PreallocMode prealloc, Error **errp)
2297 {
2298     BDRVSheepdogState *s = bs->opaque;
2299     int ret, fd;
2300     unsigned int datalen;
2301     uint64_t max_vdi_size;
2302     int64_t old_size = s->inode.vdi_size;
2303
2304     if (prealloc != PREALLOC_MODE_OFF && prealloc != PREALLOC_MODE_FULL) {
2305         error_setg(errp, "Unsupported preallocation mode '%s'",
2306                    PreallocMode_str(prealloc));
2307         return -ENOTSUP;
2308     }
2309
2310     max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2311     if (offset < old_size) {
2312         error_setg(errp, "shrinking is not supported");
2313         return -EINVAL;
2314     } else if (offset > max_vdi_size) {
2315         error_setg(errp, "too big image size");
2316         return -EINVAL;
2317     }
2318
2319     fd = connect_to_sdog(s, errp);
2320     if (fd < 0) {
2321         return fd;
2322     }
2323
2324     /* we don't need to update entire object */
2325     datalen = SD_INODE_HEADER_SIZE;
2326     s->inode.vdi_size = offset;
2327     ret = write_object(fd, s->bs, (char *)&s->inode,
2328                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2329                        datalen, 0, false, s->cache_flags);
2330     close(fd);
2331
2332     if (ret < 0) {
2333         error_setg_errno(errp, -ret, "failed to update an inode");
2334         return ret;
2335     }
2336
2337     if (prealloc == PREALLOC_MODE_FULL) {
2338         ret = sd_prealloc(bs, old_size, offset, errp);
2339         if (ret < 0) {
2340             return ret;
2341         }
2342     }
2343
2344     return 0;
2345 }
2346
2347 /*
2348  * This function is called after writing data objects.  If we need to
2349  * update metadata, this sends a write request to the vdi object.
2350  */
2351 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2352 {
2353     BDRVSheepdogState *s = acb->s;
2354     struct iovec iov;
2355     AIOReq *aio_req;
2356     uint32_t offset, data_len, mn, mx;
2357
2358     mn = acb->min_dirty_data_idx;
2359     mx = acb->max_dirty_data_idx;
2360     if (mn <= mx) {
2361         /* we need to update the vdi object. */
2362         ++acb->nr_pending;
2363         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2364             mn * sizeof(s->inode.data_vdi_id[0]);
2365         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2366
2367         acb->min_dirty_data_idx = UINT32_MAX;
2368         acb->max_dirty_data_idx = 0;
2369
2370         iov.iov_base = &s->inode;
2371         iov.iov_len = sizeof(s->inode);
2372         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2373                                 data_len, offset, 0, false, 0, offset);
2374         add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2375         if (--acb->nr_pending) {
2376             qemu_coroutine_yield();
2377         }
2378     }
2379 }
2380
2381 /* Delete current working VDI on the snapshot chain */
2382 static bool sd_delete(BDRVSheepdogState *s)
2383 {
2384     Error *local_err = NULL;
2385     unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2386     SheepdogVdiReq hdr = {
2387         .opcode = SD_OP_DEL_VDI,
2388         .base_vdi_id = s->inode.vdi_id,
2389         .data_length = wlen,
2390         .flags = SD_FLAG_CMD_WRITE,
2391     };
2392     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2393     int fd, ret;
2394
2395     fd = connect_to_sdog(s, &local_err);
2396     if (fd < 0) {
2397         error_report_err(local_err);
2398         return false;
2399     }
2400
2401     ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2402                  s->name, &wlen, &rlen);
2403     closesocket(fd);
2404     if (ret) {
2405         return false;
2406     }
2407     switch (rsp->result) {
2408     case SD_RES_NO_VDI:
2409         error_report("%s was already deleted", s->name);
2410         /* fall through */
2411     case SD_RES_SUCCESS:
2412         break;
2413     default:
2414         error_report("%s, %s", sd_strerror(rsp->result), s->name);
2415         return false;
2416     }
2417
2418     return true;
2419 }
2420
2421 /*
2422  * Create a writable VDI from a snapshot
2423  */
2424 static int sd_create_branch(BDRVSheepdogState *s)
2425 {
2426     Error *local_err = NULL;
2427     int ret, fd;
2428     uint32_t vid;
2429     char *buf;
2430     bool deleted;
2431
2432     DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2433
2434     buf = g_malloc(SD_INODE_SIZE);
2435
2436     /*
2437      * Even If deletion fails, we will just create extra snapshot based on
2438      * the working VDI which was supposed to be deleted. So no need to
2439      * false bail out.
2440      */
2441     deleted = sd_delete(s);
2442     ret = do_sd_create(s, &vid, !deleted, &local_err);
2443     if (ret) {
2444         error_report_err(local_err);
2445         goto out;
2446     }
2447
2448     DPRINTF("%" PRIx32 " is created.\n", vid);
2449
2450     fd = connect_to_sdog(s, &local_err);
2451     if (fd < 0) {
2452         error_report_err(local_err);
2453         ret = fd;
2454         goto out;
2455     }
2456
2457     ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
2458                       s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2459
2460     closesocket(fd);
2461
2462     if (ret < 0) {
2463         goto out;
2464     }
2465
2466     memcpy(&s->inode, buf, sizeof(s->inode));
2467
2468     s->is_snapshot = false;
2469     ret = 0;
2470     DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2471
2472 out:
2473     g_free(buf);
2474
2475     return ret;
2476 }
2477
2478 /*
2479  * Send I/O requests to the server.
2480  *
2481  * This function sends requests to the server, links the requests to
2482  * the inflight_list in BDRVSheepdogState, and exits without
2483  * waiting the response.  The responses are received in the
2484  * `aio_read_response' function which is called from the main loop as
2485  * a fd handler.
2486  *
2487  * Returns 1 when we need to wait a response, 0 when there is no sent
2488  * request and -errno in error cases.
2489  */
2490 static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
2491 {
2492     int ret = 0;
2493     unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2494     unsigned long idx;
2495     uint32_t object_size;
2496     uint64_t oid;
2497     uint64_t offset;
2498     BDRVSheepdogState *s = acb->s;
2499     SheepdogInode *inode = &s->inode;
2500     AIOReq *aio_req;
2501
2502     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2503         /*
2504          * In the case we open the snapshot VDI, Sheepdog creates the
2505          * writable VDI when we do a write operation first.
2506          */
2507         ret = sd_create_branch(s);
2508         if (ret) {
2509             acb->ret = -EIO;
2510             return;
2511         }
2512     }
2513
2514     object_size = (UINT32_C(1) << inode->block_size_shift);
2515     idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2516     offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2517
2518     /*
2519      * Make sure we don't free the aiocb before we are done with all requests.
2520      * This additional reference is dropped at the end of this function.
2521      */
2522     acb->nr_pending++;
2523
2524     while (done != total) {
2525         uint8_t flags = 0;
2526         uint64_t old_oid = 0;
2527         bool create = false;
2528
2529         oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2530
2531         len = MIN(total - done, object_size - offset);
2532
2533         switch (acb->aiocb_type) {
2534         case AIOCB_READ_UDATA:
2535             if (!inode->data_vdi_id[idx]) {
2536                 qemu_iovec_memset(acb->qiov, done, 0, len);
2537                 goto done;
2538             }
2539             break;
2540         case AIOCB_WRITE_UDATA:
2541             if (!inode->data_vdi_id[idx]) {
2542                 create = true;
2543             } else if (!is_data_obj_writable(inode, idx)) {
2544                 /* Copy-On-Write */
2545                 create = true;
2546                 old_oid = oid;
2547                 flags = SD_FLAG_CMD_COW;
2548             }
2549             break;
2550         case AIOCB_DISCARD_OBJ:
2551             /*
2552              * We discard the object only when the whole object is
2553              * 1) allocated 2) trimmed. Otherwise, simply skip it.
2554              */
2555             if (len != object_size || inode->data_vdi_id[idx] == 0) {
2556                 goto done;
2557             }
2558             break;
2559         default:
2560             break;
2561         }
2562
2563         if (create) {
2564             DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2565                     inode->vdi_id, oid,
2566                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2567             oid = vid_to_data_oid(inode->vdi_id, idx);
2568             DPRINTF("new oid %" PRIx64 "\n", oid);
2569         }
2570
2571         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2572                                 old_oid,
2573                                 acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2574                                 0 : done);
2575         add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2576                         acb->aiocb_type);
2577     done:
2578         offset = 0;
2579         idx++;
2580         done += len;
2581     }
2582     if (--acb->nr_pending) {
2583         qemu_coroutine_yield();
2584     }
2585 }
2586
2587 static void sd_aio_complete(SheepdogAIOCB *acb)
2588 {
2589     BDRVSheepdogState *s;
2590     if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
2591         return;
2592     }
2593
2594     s = acb->s;
2595     qemu_co_mutex_lock(&s->queue_lock);
2596     QLIST_REMOVE(acb, aiocb_siblings);
2597     qemu_co_queue_restart_all(&s->overlapping_queue);
2598     qemu_co_mutex_unlock(&s->queue_lock);
2599 }
2600
2601 static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2602                                      int nb_sectors, QEMUIOVector *qiov,
2603                                      int flags)
2604 {
2605     SheepdogAIOCB acb;
2606     int ret;
2607     int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2608     BDRVSheepdogState *s = bs->opaque;
2609
2610     assert(!flags);
2611     if (offset > s->inode.vdi_size) {
2612         ret = sd_co_truncate(bs, offset, PREALLOC_MODE_OFF, NULL);
2613         if (ret < 0) {
2614             return ret;
2615         }
2616     }
2617
2618     sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
2619     sd_co_rw_vector(&acb);
2620     sd_write_done(&acb);
2621     sd_aio_complete(&acb);
2622
2623     return acb.ret;
2624 }
2625
2626 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2627                        int nb_sectors, QEMUIOVector *qiov)
2628 {
2629     SheepdogAIOCB acb;
2630     BDRVSheepdogState *s = bs->opaque;
2631
2632     sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
2633     sd_co_rw_vector(&acb);
2634     sd_aio_complete(&acb);
2635
2636     return acb.ret;
2637 }
2638
2639 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2640 {
2641     BDRVSheepdogState *s = bs->opaque;
2642     SheepdogAIOCB acb;
2643     AIOReq *aio_req;
2644
2645     if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2646         return 0;
2647     }
2648
2649     sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);
2650
2651     acb.nr_pending++;
2652     aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
2653                             0, 0, 0, false, 0, 0);
2654     add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);
2655
2656     if (--acb.nr_pending) {
2657         qemu_coroutine_yield();
2658     }
2659
2660     sd_aio_complete(&acb);
2661     return acb.ret;
2662 }
2663
2664 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2665 {
2666     Error *local_err = NULL;
2667     BDRVSheepdogState *s = bs->opaque;
2668     int ret, fd;
2669     uint32_t new_vid;
2670     SheepdogInode *inode;
2671     unsigned int datalen;
2672
2673     DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2674             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2675             s->name, sn_info->vm_state_size, s->is_snapshot);
2676
2677     if (s->is_snapshot) {
2678         error_report("You can't create a snapshot of a snapshot VDI, "
2679                      "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2680
2681         return -EINVAL;
2682     }
2683
2684     DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2685
2686     s->inode.vm_state_size = sn_info->vm_state_size;
2687     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2688     /* It appears that inode.tag does not require a NUL terminator,
2689      * which means this use of strncpy is ok.
2690      */
2691     strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2692     /* we don't need to update entire object */
2693     datalen = SD_INODE_HEADER_SIZE;
2694     inode = g_malloc(datalen);
2695
2696     /* refresh inode. */
2697     fd = connect_to_sdog(s, &local_err);
2698     if (fd < 0) {
2699         error_report_err(local_err);
2700         ret = fd;
2701         goto cleanup;
2702     }
2703
2704     ret = write_object(fd, s->bs, (char *)&s->inode,
2705                        vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2706                        datalen, 0, false, s->cache_flags);
2707     if (ret < 0) {
2708         error_report("failed to write snapshot's inode.");
2709         goto cleanup;
2710     }
2711
2712     ret = do_sd_create(s, &new_vid, 1, &local_err);
2713     if (ret < 0) {
2714         error_reportf_err(local_err,
2715                           "failed to create inode for snapshot: ");
2716         goto cleanup;
2717     }
2718
2719     ret = read_object(fd, s->bs, (char *)inode,
2720                       vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2721                       s->cache_flags);
2722
2723     if (ret < 0) {
2724         error_report("failed to read new inode info. %s", strerror(errno));
2725         goto cleanup;
2726     }
2727
2728     memcpy(&s->inode, inode, datalen);
2729     DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2730             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2731
2732 cleanup:
2733     g_free(inode);
2734     closesocket(fd);
2735     return ret;
2736 }
2737
2738 /*
2739  * We implement rollback(loadvm) operation to the specified snapshot by
2740  * 1) switch to the snapshot
2741  * 2) rely on sd_create_branch to delete working VDI and
2742  * 3) create a new working VDI based on the specified snapshot
2743  */
2744 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2745 {
2746     BDRVSheepdogState *s = bs->opaque;
2747     BDRVSheepdogState *old_s;
2748     char tag[SD_MAX_VDI_TAG_LEN];
2749     uint32_t snapid = 0;
2750     int ret;
2751
2752     if (!sd_parse_snapid_or_tag(snapshot_id, &snapid, tag)) {
2753         return -EINVAL;
2754     }
2755
2756     old_s = g_new(BDRVSheepdogState, 1);
2757
2758     memcpy(old_s, s, sizeof(BDRVSheepdogState));
2759
2760     ret = reload_inode(s, snapid, tag);
2761     if (ret) {
2762         goto out;
2763     }
2764
2765     ret = sd_create_branch(s);
2766     if (ret) {
2767         goto out;
2768     }
2769
2770     g_free(old_s);
2771
2772     return 0;
2773 out:
2774     /* recover bdrv_sd_state */
2775     memcpy(s, old_s, sizeof(BDRVSheepdogState));
2776     g_free(old_s);
2777
2778     error_report("failed to open. recover old bdrv_sd_state.");
2779
2780     return ret;
2781 }
2782
2783 #define NR_BATCHED_DISCARD 128
2784
2785 static int remove_objects(BDRVSheepdogState *s, Error **errp)
2786 {
2787     int fd, i = 0, nr_objs = 0;
2788     int ret;
2789     SheepdogInode *inode = &s->inode;
2790
2791     fd = connect_to_sdog(s, errp);
2792     if (fd < 0) {
2793         return fd;
2794     }
2795
2796     nr_objs = count_data_objs(inode);
2797     while (i < nr_objs) {
2798         int start_idx, nr_filled_idx;
2799
2800         while (i < nr_objs && !inode->data_vdi_id[i]) {
2801             i++;
2802         }
2803         start_idx = i;
2804
2805         nr_filled_idx = 0;
2806         while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2807             if (inode->data_vdi_id[i]) {
2808                 inode->data_vdi_id[i] = 0;
2809                 nr_filled_idx++;
2810             }
2811
2812             i++;
2813         }
2814
2815         ret = write_object(fd, s->bs,
2816                            (char *)&inode->data_vdi_id[start_idx],
2817                            vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2818                            (i - start_idx) * sizeof(uint32_t),
2819                            offsetof(struct SheepdogInode,
2820                                     data_vdi_id[start_idx]),
2821                            false, s->cache_flags);
2822         if (ret < 0) {
2823             error_setg(errp, "Failed to discard snapshot inode");
2824             goto out;
2825         }
2826     }
2827
2828     ret = 0;
2829 out:
2830     closesocket(fd);
2831     return ret;
2832 }
2833
2834 static int sd_snapshot_delete(BlockDriverState *bs,
2835                               const char *snapshot_id,
2836                               const char *name,
2837                               Error **errp)
2838 {
2839     /*
2840      * FIXME should delete the snapshot matching both @snapshot_id and
2841      * @name, but @name not used here
2842      */
2843     unsigned long snap_id = 0;
2844     char snap_tag[SD_MAX_VDI_TAG_LEN];
2845     int fd, ret;
2846     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2847     BDRVSheepdogState *s = bs->opaque;
2848     unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2849     uint32_t vid;
2850     SheepdogVdiReq hdr = {
2851         .opcode = SD_OP_DEL_VDI,
2852         .data_length = wlen,
2853         .flags = SD_FLAG_CMD_WRITE,
2854     };
2855     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2856
2857     ret = remove_objects(s, errp);
2858     if (ret) {
2859         return ret;
2860     }
2861
2862     memset(buf, 0, sizeof(buf));
2863     memset(snap_tag, 0, sizeof(snap_tag));
2864     pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2865     /* TODO Use sd_parse_snapid() once this mess is cleaned up */
2866     ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2867     if (ret || snap_id > UINT32_MAX) {
2868         /*
2869          * FIXME Since qemu_strtoul() returns -EINVAL when
2870          * @snapshot_id is null, @snapshot_id is mandatory.  Correct
2871          * would be to require at least one of @snapshot_id and @name.
2872          */
2873         error_setg(errp, "Invalid snapshot ID: %s",
2874                          snapshot_id ? snapshot_id : "<null>");
2875         return -EINVAL;
2876     }
2877
2878     if (snap_id) {
2879         hdr.snapid = (uint32_t) snap_id;
2880     } else {
2881         /* FIXME I suspect we should use @name here */
2882         /* FIXME don't truncate silently */
2883         pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2884         pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2885     }
2886
2887     ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true, errp);
2888     if (ret) {
2889         return ret;
2890     }
2891
2892     fd = connect_to_sdog(s, errp);
2893     if (fd < 0) {
2894         return fd;
2895     }
2896
2897     ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2898                  buf, &wlen, &rlen);
2899     closesocket(fd);
2900     if (ret) {
2901         error_setg_errno(errp, -ret, "Couldn't send request to server");
2902         return ret;
2903     }
2904
2905     switch (rsp->result) {
2906     case SD_RES_NO_VDI:
2907         error_setg(errp, "Can't find the snapshot");
2908         return -ENOENT;
2909     case SD_RES_SUCCESS:
2910         break;
2911     default:
2912         error_setg(errp, "%s", sd_strerror(rsp->result));
2913         return -EIO;
2914     }
2915
2916     return 0;
2917 }
2918
2919 static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2920 {
2921     Error *local_err = NULL;
2922     BDRVSheepdogState *s = bs->opaque;
2923     SheepdogReq req;
2924     int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2925     QEMUSnapshotInfo *sn_tab = NULL;
2926     unsigned wlen, rlen;
2927     int found = 0;
2928     SheepdogInode *inode;
2929     unsigned long *vdi_inuse;
2930     unsigned int start_nr;
2931     uint64_t hval;
2932     uint32_t vid;
2933
2934     vdi_inuse = g_malloc(max);
2935     inode = g_malloc(SD_INODE_HEADER_SIZE);
2936
2937     fd = connect_to_sdog(s, &local_err);
2938     if (fd < 0) {
2939         error_report_err(local_err);
2940         ret = fd;
2941         goto out;
2942     }
2943
2944     rlen = max;
2945     wlen = 0;
2946
2947     memset(&req, 0, sizeof(req));
2948
2949     req.opcode = SD_OP_READ_VDIS;
2950     req.data_length = max;
2951
2952     ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
2953
2954     closesocket(fd);
2955     if (ret) {
2956         goto out;
2957     }
2958
2959     sn_tab = g_new0(QEMUSnapshotInfo, nr);
2960
2961     /* calculate a vdi id with hash function */
2962     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2963     start_nr = hval & (SD_NR_VDIS - 1);
2964
2965     fd = connect_to_sdog(s, &local_err);
2966     if (fd < 0) {
2967         error_report_err(local_err);
2968         ret = fd;
2969         goto out;
2970     }
2971
2972     for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2973         if (!test_bit(vid, vdi_inuse)) {
2974             break;
2975         }
2976
2977         /* we don't need to read entire object */
2978         ret = read_object(fd, s->bs, (char *)inode,
2979                           vid_to_vdi_oid(vid),
2980                           0, SD_INODE_HEADER_SIZE, 0,
2981                           s->cache_flags);
2982
2983         if (ret) {
2984             continue;
2985         }
2986
2987         if (!strcmp(inode->name, s->name) && is_snapshot(inode)) {
2988             sn_tab[found].date_sec = inode->snap_ctime >> 32;
2989             sn_tab[found].date_nsec = inode->snap_ctime & 0xffffffff;
2990             sn_tab[found].vm_state_size = inode->vm_state_size;
2991             sn_tab[found].vm_clock_nsec = inode->vm_clock_nsec;
2992
2993             snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2994                      "%" PRIu32, inode->snap_id);
2995             pstrcpy(sn_tab[found].name,
2996                     MIN(sizeof(sn_tab[found].name), sizeof(inode->tag)),
2997                     inode->tag);
2998             found++;
2999         }
3000     }
3001
3002     closesocket(fd);
3003 out:
3004     *psn_tab = sn_tab;
3005
3006     g_free(vdi_inuse);
3007     g_free(inode);
3008
3009     if (ret < 0) {
3010         return ret;
3011     }
3012
3013     return found;
3014 }
3015
3016 static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
3017                                 int64_t pos, int size, int load)
3018 {
3019     Error *local_err = NULL;
3020     bool create;
3021     int fd, ret = 0, remaining = size;
3022     unsigned int data_len;
3023     uint64_t vmstate_oid;
3024     uint64_t offset;
3025     uint32_t vdi_index;
3026     uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
3027     uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
3028
3029     fd = connect_to_sdog(s, &local_err);
3030     if (fd < 0) {
3031         error_report_err(local_err);
3032         return fd;
3033     }
3034
3035     while (remaining) {
3036         vdi_index = pos / object_size;
3037         offset = pos % object_size;
3038
3039         data_len = MIN(remaining, object_size - offset);
3040
3041         vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
3042
3043         create = (offset == 0);
3044         if (load) {
3045             ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
3046                               s->inode.nr_copies, data_len, offset,
3047                               s->cache_flags);
3048         } else {
3049             ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
3050                                s->inode.nr_copies, data_len, offset, create,
3051                                s->cache_flags);
3052         }
3053
3054         if (ret < 0) {
3055             error_report("failed to save vmstate %s", strerror(errno));
3056             goto cleanup;
3057         }
3058
3059         pos += data_len;
3060         data += data_len;
3061         remaining -= data_len;
3062     }
3063     ret = size;
3064 cleanup:
3065     closesocket(fd);
3066     return ret;
3067 }
3068
3069 static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3070                            int64_t pos)
3071 {
3072     BDRVSheepdogState *s = bs->opaque;
3073     void *buf;
3074     int ret;
3075
3076     buf = qemu_blockalign(bs, qiov->size);
3077     qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
3078     ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
3079     qemu_vfree(buf);
3080
3081     return ret;
3082 }
3083
3084 static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
3085                            int64_t pos)
3086 {
3087     BDRVSheepdogState *s = bs->opaque;
3088     void *buf;
3089     int ret;
3090
3091     buf = qemu_blockalign(bs, qiov->size);
3092     ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
3093     qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
3094     qemu_vfree(buf);
3095
3096     return ret;
3097 }
3098
3099
3100 static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
3101                                       int bytes)
3102 {
3103     SheepdogAIOCB acb;
3104     BDRVSheepdogState *s = bs->opaque;
3105     QEMUIOVector discard_iov;
3106     struct iovec iov;
3107     uint32_t zero = 0;
3108
3109     if (!s->discard_supported) {
3110         return 0;
3111     }
3112
3113     memset(&discard_iov, 0, sizeof(discard_iov));
3114     memset(&iov, 0, sizeof(iov));
3115     iov.iov_base = &zero;
3116     iov.iov_len = sizeof(zero);
3117     discard_iov.iov = &iov;
3118     discard_iov.niov = 1;
3119     if (!QEMU_IS_ALIGNED(offset | bytes, BDRV_SECTOR_SIZE)) {
3120         return -ENOTSUP;
3121     }
3122     sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
3123                  bytes >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
3124     sd_co_rw_vector(&acb);
3125     sd_aio_complete(&acb);
3126
3127     return acb.ret;
3128 }
3129
3130 static coroutine_fn int
3131 sd_co_block_status(BlockDriverState *bs, bool want_zero, int64_t offset,
3132                    int64_t bytes, int64_t *pnum, int64_t *map,
3133                    BlockDriverState **file)
3134 {
3135     BDRVSheepdogState *s = bs->opaque;
3136     SheepdogInode *inode = &s->inode;
3137     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3138     unsigned long start = offset / object_size,
3139                   end = DIV_ROUND_UP(offset + bytes, object_size);
3140     unsigned long idx;
3141     *map = offset;
3142     int ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
3143
3144     for (idx = start; idx < end; idx++) {
3145         if (inode->data_vdi_id[idx] == 0) {
3146             break;
3147         }
3148     }
3149     if (idx == start) {
3150         /* Get the longest length of unallocated sectors */
3151         ret = 0;
3152         for (idx = start + 1; idx < end; idx++) {
3153             if (inode->data_vdi_id[idx] != 0) {
3154                 break;
3155             }
3156         }
3157     }
3158
3159     *pnum = (idx - start) * object_size;
3160     if (*pnum > bytes) {
3161         *pnum = bytes;
3162     }
3163     if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
3164         *file = bs;
3165     }
3166     return ret;
3167 }
3168
3169 static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
3170 {
3171     BDRVSheepdogState *s = bs->opaque;
3172     SheepdogInode *inode = &s->inode;
3173     uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3174     unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
3175     uint64_t size = 0;
3176
3177     for (i = 0; i < last; i++) {
3178         if (inode->data_vdi_id[i] == 0) {
3179             continue;
3180         }
3181         size += object_size;
3182     }
3183     return size;
3184 }
3185
3186 static QemuOptsList sd_create_opts = {
3187     .name = "sheepdog-create-opts",
3188     .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
3189     .desc = {
3190         {
3191             .name = BLOCK_OPT_SIZE,
3192             .type = QEMU_OPT_SIZE,
3193             .help = "Virtual disk size"
3194         },
3195         {
3196             .name = BLOCK_OPT_BACKING_FILE,
3197             .type = QEMU_OPT_STRING,
3198             .help = "File name of a base image"
3199         },
3200         {
3201             .name = BLOCK_OPT_PREALLOC,
3202             .type = QEMU_OPT_STRING,
3203             .help = "Preallocation mode (allowed values: off, full)"
3204         },
3205         {
3206             .name = BLOCK_OPT_REDUNDANCY,
3207             .type = QEMU_OPT_STRING,
3208             .help = "Redundancy of the image"
3209         },
3210         {
3211             .name = BLOCK_OPT_OBJECT_SIZE,
3212             .type = QEMU_OPT_SIZE,
3213             .help = "Object size of the image"
3214         },
3215         { /* end of list */ }
3216     }
3217 };
3218
3219 static BlockDriver bdrv_sheepdog = {
3220     .format_name                  = "sheepdog",
3221     .protocol_name                = "sheepdog",
3222     .instance_size                = sizeof(BDRVSheepdogState),
3223     .bdrv_parse_filename          = sd_parse_filename,
3224     .bdrv_file_open               = sd_open,
3225     .bdrv_reopen_prepare          = sd_reopen_prepare,
3226     .bdrv_reopen_commit           = sd_reopen_commit,
3227     .bdrv_reopen_abort            = sd_reopen_abort,
3228     .bdrv_close                   = sd_close,
3229     .bdrv_co_create               = sd_co_create,
3230     .bdrv_co_create_opts          = sd_co_create_opts,
3231     .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3232     .bdrv_getlength               = sd_getlength,
3233     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3234     .bdrv_co_truncate             = sd_co_truncate,
3235
3236     .bdrv_co_readv                = sd_co_readv,
3237     .bdrv_co_writev               = sd_co_writev,
3238     .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3239     .bdrv_co_pdiscard             = sd_co_pdiscard,
3240     .bdrv_co_block_status         = sd_co_block_status,
3241
3242     .bdrv_snapshot_create         = sd_snapshot_create,
3243     .bdrv_snapshot_goto           = sd_snapshot_goto,
3244     .bdrv_snapshot_delete         = sd_snapshot_delete,
3245     .bdrv_snapshot_list           = sd_snapshot_list,
3246
3247     .bdrv_save_vmstate            = sd_save_vmstate,
3248     .bdrv_load_vmstate            = sd_load_vmstate,
3249
3250     .bdrv_detach_aio_context      = sd_detach_aio_context,
3251     .bdrv_attach_aio_context      = sd_attach_aio_context,
3252
3253     .create_opts                  = &sd_create_opts,
3254 };
3255
3256 static BlockDriver bdrv_sheepdog_tcp = {
3257     .format_name                  = "sheepdog",
3258     .protocol_name                = "sheepdog+tcp",
3259     .instance_size                = sizeof(BDRVSheepdogState),
3260     .bdrv_parse_filename          = sd_parse_filename,
3261     .bdrv_file_open               = sd_open,
3262     .bdrv_reopen_prepare          = sd_reopen_prepare,
3263     .bdrv_reopen_commit           = sd_reopen_commit,
3264     .bdrv_reopen_abort            = sd_reopen_abort,
3265     .bdrv_close                   = sd_close,
3266     .bdrv_co_create               = sd_co_create,
3267     .bdrv_co_create_opts          = sd_co_create_opts,
3268     .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3269     .bdrv_getlength               = sd_getlength,
3270     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3271     .bdrv_co_truncate             = sd_co_truncate,
3272
3273     .bdrv_co_readv                = sd_co_readv,
3274     .bdrv_co_writev               = sd_co_writev,
3275     .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3276     .bdrv_co_pdiscard             = sd_co_pdiscard,
3277     .bdrv_co_block_status         = sd_co_block_status,
3278
3279     .bdrv_snapshot_create         = sd_snapshot_create,
3280     .bdrv_snapshot_goto           = sd_snapshot_goto,
3281     .bdrv_snapshot_delete         = sd_snapshot_delete,
3282     .bdrv_snapshot_list           = sd_snapshot_list,
3283
3284     .bdrv_save_vmstate            = sd_save_vmstate,
3285     .bdrv_load_vmstate            = sd_load_vmstate,
3286
3287     .bdrv_detach_aio_context      = sd_detach_aio_context,
3288     .bdrv_attach_aio_context      = sd_attach_aio_context,
3289
3290     .create_opts                  = &sd_create_opts,
3291 };
3292
3293 static BlockDriver bdrv_sheepdog_unix = {
3294     .format_name                  = "sheepdog",
3295     .protocol_name                = "sheepdog+unix",
3296     .instance_size                = sizeof(BDRVSheepdogState),
3297     .bdrv_parse_filename          = sd_parse_filename,
3298     .bdrv_file_open               = sd_open,
3299     .bdrv_reopen_prepare          = sd_reopen_prepare,
3300     .bdrv_reopen_commit           = sd_reopen_commit,
3301     .bdrv_reopen_abort            = sd_reopen_abort,
3302     .bdrv_close                   = sd_close,
3303     .bdrv_co_create               = sd_co_create,
3304     .bdrv_co_create_opts          = sd_co_create_opts,
3305     .bdrv_has_zero_init           = bdrv_has_zero_init_1,
3306     .bdrv_getlength               = sd_getlength,
3307     .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3308     .bdrv_co_truncate             = sd_co_truncate,
3309
3310     .bdrv_co_readv                = sd_co_readv,
3311     .bdrv_co_writev               = sd_co_writev,
3312     .bdrv_co_flush_to_disk        = sd_co_flush_to_disk,
3313     .bdrv_co_pdiscard             = sd_co_pdiscard,
3314     .bdrv_co_block_status         = sd_co_block_status,
3315
3316     .bdrv_snapshot_create         = sd_snapshot_create,
3317     .bdrv_snapshot_goto           = sd_snapshot_goto,
3318     .bdrv_snapshot_delete         = sd_snapshot_delete,
3319     .bdrv_snapshot_list           = sd_snapshot_list,
3320
3321     .bdrv_save_vmstate            = sd_save_vmstate,
3322     .bdrv_load_vmstate            = sd_load_vmstate,
3323
3324     .bdrv_detach_aio_context      = sd_detach_aio_context,
3325     .bdrv_attach_aio_context      = sd_attach_aio_context,
3326
3327     .create_opts                  = &sd_create_opts,
3328 };
3329
3330 static void bdrv_sheepdog_init(void)
3331 {
3332     bdrv_register(&bdrv_sheepdog);
3333     bdrv_register(&bdrv_sheepdog_tcp);
3334     bdrv_register(&bdrv_sheepdog_unix);
3335 }
3336 block_init(bdrv_sheepdog_init);
This page took 0.213322 seconds and 4 git commands to generate.