aio: remove process_queue callback and qemu_aio_process_queue
[qemu.git] / block / sheepdog.c
1 /*
2  * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License version
6  * 2 as published by the Free Software Foundation.
7  *
8  * You should have received a copy of the GNU General Public License
9  * along with this program. If not, see <http://www.gnu.org/licenses/>.
10  *
11  * Contributions after 2012-01-13 are licensed under the terms of the
12  * GNU GPL, version 2 or (at your option) any later version.
13  */
14
15 #include "qemu-common.h"
16 #include "qemu-error.h"
17 #include "qemu_socket.h"
18 #include "block_int.h"
19 #include "bitops.h"
20
21 #define SD_PROTO_VER 0x01
22
23 #define SD_DEFAULT_ADDR "localhost"
24 #define SD_DEFAULT_PORT "7000"
25
26 #define SD_OP_CREATE_AND_WRITE_OBJ  0x01
27 #define SD_OP_READ_OBJ       0x02
28 #define SD_OP_WRITE_OBJ      0x03
29
30 #define SD_OP_NEW_VDI        0x11
31 #define SD_OP_LOCK_VDI       0x12
32 #define SD_OP_RELEASE_VDI    0x13
33 #define SD_OP_GET_VDI_INFO   0x14
34 #define SD_OP_READ_VDIS      0x15
35 #define SD_OP_FLUSH_VDI      0x16
36
37 #define SD_FLAG_CMD_WRITE    0x01
38 #define SD_FLAG_CMD_COW      0x02
39 #define SD_FLAG_CMD_CACHE    0x04
40
41 #define SD_RES_SUCCESS       0x00 /* Success */
42 #define SD_RES_UNKNOWN       0x01 /* Unknown error */
43 #define SD_RES_NO_OBJ        0x02 /* No object found */
44 #define SD_RES_EIO           0x03 /* I/O error */
45 #define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
46 #define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
47 #define SD_RES_SYSTEM_ERROR  0x06 /* System error */
48 #define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
49 #define SD_RES_NO_VDI        0x08 /* No vdi found */
50 #define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
51 #define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
52 #define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
53 #define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
54 #define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
55 #define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
56 #define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
57 #define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
58 #define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
59 #define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
60 #define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
61 #define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
62 #define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
63 #define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
64 #define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
65 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
66
67 /*
68  * Object ID rules
69  *
70  *  0 - 19 (20 bits): data object space
71  * 20 - 31 (12 bits): reserved data object space
72  * 32 - 55 (24 bits): vdi object space
73  * 56 - 59 ( 4 bits): reserved vdi object space
74  * 60 - 63 ( 4 bits): object type identifier space
75  */
76
77 #define VDI_SPACE_SHIFT   32
78 #define VDI_BIT (UINT64_C(1) << 63)
79 #define VMSTATE_BIT (UINT64_C(1) << 62)
80 #define MAX_DATA_OBJS (UINT64_C(1) << 20)
81 #define MAX_CHILDREN 1024
82 #define SD_MAX_VDI_LEN 256
83 #define SD_MAX_VDI_TAG_LEN 256
84 #define SD_NR_VDIS   (1U << 24)
85 #define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
86 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
87 #define SECTOR_SIZE 512
88
89 #define SD_INODE_SIZE (sizeof(SheepdogInode))
90 #define CURRENT_VDI_ID 0
91
92 typedef struct SheepdogReq {
93     uint8_t proto_ver;
94     uint8_t opcode;
95     uint16_t flags;
96     uint32_t epoch;
97     uint32_t id;
98     uint32_t data_length;
99     uint32_t opcode_specific[8];
100 } SheepdogReq;
101
102 typedef struct SheepdogRsp {
103     uint8_t proto_ver;
104     uint8_t opcode;
105     uint16_t flags;
106     uint32_t epoch;
107     uint32_t id;
108     uint32_t data_length;
109     uint32_t result;
110     uint32_t opcode_specific[7];
111 } SheepdogRsp;
112
113 typedef struct SheepdogObjReq {
114     uint8_t proto_ver;
115     uint8_t opcode;
116     uint16_t flags;
117     uint32_t epoch;
118     uint32_t id;
119     uint32_t data_length;
120     uint64_t oid;
121     uint64_t cow_oid;
122     uint32_t copies;
123     uint32_t rsvd;
124     uint64_t offset;
125 } SheepdogObjReq;
126
127 typedef struct SheepdogObjRsp {
128     uint8_t proto_ver;
129     uint8_t opcode;
130     uint16_t flags;
131     uint32_t epoch;
132     uint32_t id;
133     uint32_t data_length;
134     uint32_t result;
135     uint32_t copies;
136     uint32_t pad[6];
137 } SheepdogObjRsp;
138
139 typedef struct SheepdogVdiReq {
140     uint8_t proto_ver;
141     uint8_t opcode;
142     uint16_t flags;
143     uint32_t epoch;
144     uint32_t id;
145     uint32_t data_length;
146     uint64_t vdi_size;
147     uint32_t base_vdi_id;
148     uint32_t copies;
149     uint32_t snapid;
150     uint32_t pad[3];
151 } SheepdogVdiReq;
152
153 typedef struct SheepdogVdiRsp {
154     uint8_t proto_ver;
155     uint8_t opcode;
156     uint16_t flags;
157     uint32_t epoch;
158     uint32_t id;
159     uint32_t data_length;
160     uint32_t result;
161     uint32_t rsvd;
162     uint32_t vdi_id;
163     uint32_t pad[5];
164 } SheepdogVdiRsp;
165
166 typedef struct SheepdogInode {
167     char name[SD_MAX_VDI_LEN];
168     char tag[SD_MAX_VDI_TAG_LEN];
169     uint64_t ctime;
170     uint64_t snap_ctime;
171     uint64_t vm_clock_nsec;
172     uint64_t vdi_size;
173     uint64_t vm_state_size;
174     uint16_t copy_policy;
175     uint8_t nr_copies;
176     uint8_t block_size_shift;
177     uint32_t snap_id;
178     uint32_t vdi_id;
179     uint32_t parent_vdi_id;
180     uint32_t child_vdi_id[MAX_CHILDREN];
181     uint32_t data_vdi_id[MAX_DATA_OBJS];
182 } SheepdogInode;
183
184 /*
185  * 64 bit FNV-1a non-zero initial basis
186  */
187 #define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
188
189 /*
190  * 64 bit Fowler/Noll/Vo FNV-1a hash code
191  */
192 static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
193 {
194     unsigned char *bp = buf;
195     unsigned char *be = bp + len;
196     while (bp < be) {
197         hval ^= (uint64_t) *bp++;
198         hval += (hval << 1) + (hval << 4) + (hval << 5) +
199             (hval << 7) + (hval << 8) + (hval << 40);
200     }
201     return hval;
202 }
203
204 static inline int is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
205 {
206     return inode->vdi_id == inode->data_vdi_id[idx];
207 }
208
209 static inline int is_data_obj(uint64_t oid)
210 {
211     return !(VDI_BIT & oid);
212 }
213
214 static inline uint64_t data_oid_to_idx(uint64_t oid)
215 {
216     return oid & (MAX_DATA_OBJS - 1);
217 }
218
219 static inline uint64_t vid_to_vdi_oid(uint32_t vid)
220 {
221     return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
222 }
223
224 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
225 {
226     return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
227 }
228
229 static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
230 {
231     return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
232 }
233
234 static inline int is_snapshot(struct SheepdogInode *inode)
235 {
236     return !!inode->snap_ctime;
237 }
238
239 #undef dprintf
240 #ifdef DEBUG_SDOG
241 #define dprintf(fmt, args...)                                       \
242     do {                                                            \
243         fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
244     } while (0)
245 #else
246 #define dprintf(fmt, args...)
247 #endif
248
249 typedef struct SheepdogAIOCB SheepdogAIOCB;
250
251 typedef struct AIOReq {
252     SheepdogAIOCB *aiocb;
253     unsigned int iov_offset;
254
255     uint64_t oid;
256     uint64_t base_oid;
257     uint64_t offset;
258     unsigned int data_len;
259     uint8_t flags;
260     uint32_t id;
261
262     QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
263     QLIST_ENTRY(AIOReq) aioreq_siblings;
264 } AIOReq;
265
266 enum AIOCBState {
267     AIOCB_WRITE_UDATA,
268     AIOCB_READ_UDATA,
269 };
270
271 struct SheepdogAIOCB {
272     BlockDriverAIOCB common;
273
274     QEMUIOVector *qiov;
275
276     int64_t sector_num;
277     int nb_sectors;
278
279     int ret;
280     enum AIOCBState aiocb_type;
281
282     Coroutine *coroutine;
283     void (*aio_done_func)(SheepdogAIOCB *);
284
285     int canceled;
286
287     QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
288 };
289
290 typedef struct BDRVSheepdogState {
291     SheepdogInode inode;
292
293     uint32_t min_dirty_data_idx;
294     uint32_t max_dirty_data_idx;
295
296     char name[SD_MAX_VDI_LEN];
297     int is_snapshot;
298     uint8_t cache_enabled;
299
300     char *addr;
301     char *port;
302     int fd;
303     int flush_fd;
304
305     CoMutex lock;
306     Coroutine *co_send;
307     Coroutine *co_recv;
308
309     uint32_t aioreq_seq_num;
310     QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
311 } BDRVSheepdogState;
312
313 static const char * sd_strerror(int err)
314 {
315     int i;
316
317     static const struct {
318         int err;
319         const char *desc;
320     } errors[] = {
321         {SD_RES_SUCCESS, "Success"},
322         {SD_RES_UNKNOWN, "Unknown error"},
323         {SD_RES_NO_OBJ, "No object found"},
324         {SD_RES_EIO, "I/O error"},
325         {SD_RES_VDI_EXIST, "VDI exists already"},
326         {SD_RES_INVALID_PARMS, "Invalid parameters"},
327         {SD_RES_SYSTEM_ERROR, "System error"},
328         {SD_RES_VDI_LOCKED, "VDI is already locked"},
329         {SD_RES_NO_VDI, "No vdi found"},
330         {SD_RES_NO_BASE_VDI, "No base VDI found"},
331         {SD_RES_VDI_READ, "Failed read the requested VDI"},
332         {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
333         {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
334         {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
335         {SD_RES_NO_TAG, "Failed to find the requested tag"},
336         {SD_RES_STARTUP, "The system is still booting"},
337         {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
338         {SD_RES_SHUTDOWN, "The system is shutting down"},
339         {SD_RES_NO_MEM, "Out of memory on the server"},
340         {SD_RES_FULL_VDI, "We already have the maximum vdis"},
341         {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
342         {SD_RES_NO_SPACE, "Server has no space for new objects"},
343         {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
344         {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
345         {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
346     };
347
348     for (i = 0; i < ARRAY_SIZE(errors); ++i) {
349         if (errors[i].err == err) {
350             return errors[i].desc;
351         }
352     }
353
354     return "Invalid error code";
355 }
356
357 /*
358  * Sheepdog I/O handling:
359  *
360  * 1. In sd_co_rw_vector, we send the I/O requests to the server and
361  *    link the requests to the outstanding_list in the
362  *    BDRVSheepdogState.  The function exits without waiting for
363  *    receiving the response.
364  *
365  * 2. We receive the response in aio_read_response, the fd handler to
366  *    the sheepdog connection.  If metadata update is needed, we send
367  *    the write request to the vdi object in sd_write_done, the write
368  *    completion function.  We switch back to sd_co_readv/writev after
369  *    all the requests belonging to the AIOCB are finished.
370  */
371
372 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
373                                     uint64_t oid, unsigned int data_len,
374                                     uint64_t offset, uint8_t flags,
375                                     uint64_t base_oid, unsigned int iov_offset)
376 {
377     AIOReq *aio_req;
378
379     aio_req = g_malloc(sizeof(*aio_req));
380     aio_req->aiocb = acb;
381     aio_req->iov_offset = iov_offset;
382     aio_req->oid = oid;
383     aio_req->base_oid = base_oid;
384     aio_req->offset = offset;
385     aio_req->data_len = data_len;
386     aio_req->flags = flags;
387     aio_req->id = s->aioreq_seq_num++;
388
389     QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
390                       outstanding_aio_siblings);
391     QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
392
393     return aio_req;
394 }
395
396 static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
397 {
398     SheepdogAIOCB *acb = aio_req->aiocb;
399     QLIST_REMOVE(aio_req, outstanding_aio_siblings);
400     QLIST_REMOVE(aio_req, aioreq_siblings);
401     g_free(aio_req);
402
403     return !QLIST_EMPTY(&acb->aioreq_head);
404 }
405
406 static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
407 {
408     if (!acb->canceled) {
409         qemu_coroutine_enter(acb->coroutine, NULL);
410     }
411     qemu_aio_release(acb);
412 }
413
414 static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
415 {
416     SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
417
418     /*
419      * Sheepdog cannot cancel the requests which are already sent to
420      * the servers, so we just complete the request with -EIO here.
421      */
422     acb->ret = -EIO;
423     qemu_coroutine_enter(acb->coroutine, NULL);
424     acb->canceled = 1;
425 }
426
427 static AIOPool sd_aio_pool = {
428     .aiocb_size = sizeof(SheepdogAIOCB),
429     .cancel = sd_aio_cancel,
430 };
431
432 static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
433                                    int64_t sector_num, int nb_sectors,
434                                    BlockDriverCompletionFunc *cb, void *opaque)
435 {
436     SheepdogAIOCB *acb;
437
438     acb = qemu_aio_get(&sd_aio_pool, bs, cb, opaque);
439
440     acb->qiov = qiov;
441
442     acb->sector_num = sector_num;
443     acb->nb_sectors = nb_sectors;
444
445     acb->aio_done_func = NULL;
446     acb->canceled = 0;
447     acb->coroutine = qemu_coroutine_self();
448     acb->ret = 0;
449     QLIST_INIT(&acb->aioreq_head);
450     return acb;
451 }
452
453 static int connect_to_sdog(const char *addr, const char *port)
454 {
455     char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
456     int fd, ret;
457     struct addrinfo hints, *res, *res0;
458
459     if (!addr) {
460         addr = SD_DEFAULT_ADDR;
461         port = SD_DEFAULT_PORT;
462     }
463
464     memset(&hints, 0, sizeof(hints));
465     hints.ai_socktype = SOCK_STREAM;
466
467     ret = getaddrinfo(addr, port, &hints, &res0);
468     if (ret) {
469         error_report("unable to get address info %s, %s",
470                      addr, strerror(errno));
471         return -1;
472     }
473
474     for (res = res0; res; res = res->ai_next) {
475         ret = getnameinfo(res->ai_addr, res->ai_addrlen, hbuf, sizeof(hbuf),
476                           sbuf, sizeof(sbuf), NI_NUMERICHOST | NI_NUMERICSERV);
477         if (ret) {
478             continue;
479         }
480
481         fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
482         if (fd < 0) {
483             continue;
484         }
485
486     reconnect:
487         ret = connect(fd, res->ai_addr, res->ai_addrlen);
488         if (ret < 0) {
489             if (errno == EINTR) {
490                 goto reconnect;
491             }
492             break;
493         }
494
495         dprintf("connected to %s:%s\n", addr, port);
496         goto success;
497     }
498     fd = -1;
499     error_report("failed connect to %s:%s", addr, port);
500 success:
501     freeaddrinfo(res0);
502     return fd;
503 }
504
505 static int send_req(int sockfd, SheepdogReq *hdr, void *data,
506                     unsigned int *wlen)
507 {
508     int ret;
509
510     ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0);
511     if (ret < sizeof(*hdr)) {
512         error_report("failed to send a req, %s", strerror(errno));
513         return ret;
514     }
515
516     ret = qemu_send_full(sockfd, data, *wlen, 0);
517     if (ret < *wlen) {
518         error_report("failed to send a req, %s", strerror(errno));
519     }
520
521     return ret;
522 }
523
524 static int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
525                        unsigned int *wlen)
526 {
527     int ret;
528
529     ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
530     if (ret < sizeof(*hdr)) {
531         error_report("failed to send a req, %s", strerror(errno));
532         return ret;
533     }
534
535     ret = qemu_co_send(sockfd, data, *wlen);
536     if (ret < *wlen) {
537         error_report("failed to send a req, %s", strerror(errno));
538     }
539
540     return ret;
541 }
542 static int do_req(int sockfd, SheepdogReq *hdr, void *data,
543                   unsigned int *wlen, unsigned int *rlen)
544 {
545     int ret;
546
547     socket_set_block(sockfd);
548     ret = send_req(sockfd, hdr, data, wlen);
549     if (ret < 0) {
550         goto out;
551     }
552
553     ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0);
554     if (ret < sizeof(*hdr)) {
555         error_report("failed to get a rsp, %s", strerror(errno));
556         goto out;
557     }
558
559     if (*rlen > hdr->data_length) {
560         *rlen = hdr->data_length;
561     }
562
563     if (*rlen) {
564         ret = qemu_recv_full(sockfd, data, *rlen, 0);
565         if (ret < *rlen) {
566             error_report("failed to get the data, %s", strerror(errno));
567             goto out;
568         }
569     }
570     ret = 0;
571 out:
572     socket_set_nonblock(sockfd);
573     return ret;
574 }
575
576 static int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
577                      unsigned int *wlen, unsigned int *rlen)
578 {
579     int ret;
580
581     socket_set_block(sockfd);
582     ret = send_co_req(sockfd, hdr, data, wlen);
583     if (ret < 0) {
584         goto out;
585     }
586
587     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
588     if (ret < sizeof(*hdr)) {
589         error_report("failed to get a rsp, %s", strerror(errno));
590         goto out;
591     }
592
593     if (*rlen > hdr->data_length) {
594         *rlen = hdr->data_length;
595     }
596
597     if (*rlen) {
598         ret = qemu_co_recv(sockfd, data, *rlen);
599         if (ret < *rlen) {
600             error_report("failed to get the data, %s", strerror(errno));
601             goto out;
602         }
603     }
604     ret = 0;
605 out:
606     socket_set_nonblock(sockfd);
607     return ret;
608 }
609
610 static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
611                            struct iovec *iov, int niov, int create,
612                            enum AIOCBState aiocb_type);
613
614 /*
615  * This function searchs pending requests to the object `oid', and
616  * sends them.
617  */
618 static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
619 {
620     AIOReq *aio_req, *next;
621     SheepdogAIOCB *acb;
622     int ret;
623
624     QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
625                        outstanding_aio_siblings, next) {
626         if (id == aio_req->id) {
627             continue;
628         }
629         if (aio_req->oid != oid) {
630             continue;
631         }
632
633         acb = aio_req->aiocb;
634         ret = add_aio_request(s, aio_req, acb->qiov->iov,
635                               acb->qiov->niov, 0, acb->aiocb_type);
636         if (ret < 0) {
637             error_report("add_aio_request is failed");
638             free_aio_req(s, aio_req);
639             if (QLIST_EMPTY(&acb->aioreq_head)) {
640                 sd_finish_aiocb(acb);
641             }
642         }
643     }
644 }
645
646 /*
647  * Receive responses of the I/O requests.
648  *
649  * This function is registered as a fd handler, and called from the
650  * main loop when s->fd is ready for reading responses.
651  */
652 static void coroutine_fn aio_read_response(void *opaque)
653 {
654     SheepdogObjRsp rsp;
655     BDRVSheepdogState *s = opaque;
656     int fd = s->fd;
657     int ret;
658     AIOReq *aio_req = NULL;
659     SheepdogAIOCB *acb;
660     int rest;
661     unsigned long idx;
662
663     if (QLIST_EMPTY(&s->outstanding_aio_head)) {
664         goto out;
665     }
666
667     /* read a header */
668     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
669     if (ret < 0) {
670         error_report("failed to get the header, %s", strerror(errno));
671         goto out;
672     }
673
674     /* find the right aio_req from the outstanding_aio list */
675     QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
676         if (aio_req->id == rsp.id) {
677             break;
678         }
679     }
680     if (!aio_req) {
681         error_report("cannot find aio_req %x", rsp.id);
682         goto out;
683     }
684
685     acb = aio_req->aiocb;
686
687     switch (acb->aiocb_type) {
688     case AIOCB_WRITE_UDATA:
689         /* this coroutine context is no longer suitable for co_recv
690          * because we may send data to update vdi objects */
691         s->co_recv = NULL;
692         if (!is_data_obj(aio_req->oid)) {
693             break;
694         }
695         idx = data_oid_to_idx(aio_req->oid);
696
697         if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
698             /*
699              * If the object is newly created one, we need to update
700              * the vdi object (metadata object).  min_dirty_data_idx
701              * and max_dirty_data_idx are changed to include updated
702              * index between them.
703              */
704             s->inode.data_vdi_id[idx] = s->inode.vdi_id;
705             s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
706             s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
707
708             /*
709              * Some requests may be blocked because simultaneous
710              * create requests are not allowed, so we search the
711              * pending requests here.
712              */
713             send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx), rsp.id);
714         }
715         break;
716     case AIOCB_READ_UDATA:
717         ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
718                             aio_req->iov_offset);
719         if (ret < 0) {
720             error_report("failed to get the data, %s", strerror(errno));
721             goto out;
722         }
723         break;
724     }
725
726     if (rsp.result != SD_RES_SUCCESS) {
727         acb->ret = -EIO;
728         error_report("%s", sd_strerror(rsp.result));
729     }
730
731     rest = free_aio_req(s, aio_req);
732     if (!rest) {
733         /*
734          * We've finished all requests which belong to the AIOCB, so
735          * we can switch back to sd_co_readv/writev now.
736          */
737         acb->aio_done_func(acb);
738     }
739 out:
740     s->co_recv = NULL;
741 }
742
743 static void co_read_response(void *opaque)
744 {
745     BDRVSheepdogState *s = opaque;
746
747     if (!s->co_recv) {
748         s->co_recv = qemu_coroutine_create(aio_read_response);
749     }
750
751     qemu_coroutine_enter(s->co_recv, opaque);
752 }
753
754 static void co_write_request(void *opaque)
755 {
756     BDRVSheepdogState *s = opaque;
757
758     qemu_coroutine_enter(s->co_send, NULL);
759 }
760
761 static int aio_flush_request(void *opaque)
762 {
763     BDRVSheepdogState *s = opaque;
764
765     return !QLIST_EMPTY(&s->outstanding_aio_head);
766 }
767
768 static int set_nodelay(int fd)
769 {
770     int ret, opt;
771
772     opt = 1;
773     ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&opt, sizeof(opt));
774     return ret;
775 }
776
777 /*
778  * Return a socket discriptor to read/write objects.
779  *
780  * We cannot use this discriptor for other operations because
781  * the block driver may be on waiting response from the server.
782  */
783 static int get_sheep_fd(BDRVSheepdogState *s)
784 {
785     int ret, fd;
786
787     fd = connect_to_sdog(s->addr, s->port);
788     if (fd < 0) {
789         error_report("%s", strerror(errno));
790         return -1;
791     }
792
793     socket_set_nonblock(fd);
794
795     ret = set_nodelay(fd);
796     if (ret) {
797         error_report("%s", strerror(errno));
798         closesocket(fd);
799         return -1;
800     }
801
802     qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s);
803     return fd;
804 }
805
806 /*
807  * Parse a filename
808  *
809  * filename must be one of the following formats:
810  *   1. [vdiname]
811  *   2. [vdiname]:[snapid]
812  *   3. [vdiname]:[tag]
813  *   4. [hostname]:[port]:[vdiname]
814  *   5. [hostname]:[port]:[vdiname]:[snapid]
815  *   6. [hostname]:[port]:[vdiname]:[tag]
816  *
817  * You can boot from the snapshot images by specifying `snapid` or
818  * `tag'.
819  *
820  * You can run VMs outside the Sheepdog cluster by specifying
821  * `hostname' and `port' (experimental).
822  */
823 static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
824                          char *vdi, uint32_t *snapid, char *tag)
825 {
826     char *p, *q;
827     int nr_sep;
828
829     p = q = g_strdup(filename);
830
831     /* count the number of separators */
832     nr_sep = 0;
833     while (*p) {
834         if (*p == ':') {
835             nr_sep++;
836         }
837         p++;
838     }
839     p = q;
840
841     /* use the first two tokens as hostname and port number. */
842     if (nr_sep >= 2) {
843         s->addr = p;
844         p = strchr(p, ':');
845         *p++ = '\0';
846
847         s->port = p;
848         p = strchr(p, ':');
849         *p++ = '\0';
850     } else {
851         s->addr = NULL;
852         s->port = 0;
853     }
854
855     strncpy(vdi, p, SD_MAX_VDI_LEN);
856
857     p = strchr(vdi, ':');
858     if (p) {
859         *p++ = '\0';
860         *snapid = strtoul(p, NULL, 10);
861         if (*snapid == 0) {
862             strncpy(tag, p, SD_MAX_VDI_TAG_LEN);
863         }
864     } else {
865         *snapid = CURRENT_VDI_ID; /* search current vdi */
866     }
867
868     if (s->addr == NULL) {
869         g_free(q);
870     }
871
872     return 0;
873 }
874
875 static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
876                          char *tag, uint32_t *vid, int for_snapshot)
877 {
878     int ret, fd;
879     SheepdogVdiReq hdr;
880     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
881     unsigned int wlen, rlen = 0;
882     char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
883
884     fd = connect_to_sdog(s->addr, s->port);
885     if (fd < 0) {
886         return -1;
887     }
888
889     memset(buf, 0, sizeof(buf));
890     strncpy(buf, filename, SD_MAX_VDI_LEN);
891     strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
892
893     memset(&hdr, 0, sizeof(hdr));
894     if (for_snapshot) {
895         hdr.opcode = SD_OP_GET_VDI_INFO;
896     } else {
897         hdr.opcode = SD_OP_LOCK_VDI;
898     }
899     wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
900     hdr.proto_ver = SD_PROTO_VER;
901     hdr.data_length = wlen;
902     hdr.snapid = snapid;
903     hdr.flags = SD_FLAG_CMD_WRITE;
904
905     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
906     if (ret) {
907         ret = -1;
908         goto out;
909     }
910
911     if (rsp->result != SD_RES_SUCCESS) {
912         error_report("cannot get vdi info, %s, %s %d %s",
913                      sd_strerror(rsp->result), filename, snapid, tag);
914         ret = -1;
915         goto out;
916     }
917     *vid = rsp->vdi_id;
918
919     ret = 0;
920 out:
921     closesocket(fd);
922     return ret;
923 }
924
925 static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
926                            struct iovec *iov, int niov, int create,
927                            enum AIOCBState aiocb_type)
928 {
929     int nr_copies = s->inode.nr_copies;
930     SheepdogObjReq hdr;
931     unsigned int wlen;
932     int ret;
933     uint64_t oid = aio_req->oid;
934     unsigned int datalen = aio_req->data_len;
935     uint64_t offset = aio_req->offset;
936     uint8_t flags = aio_req->flags;
937     uint64_t old_oid = aio_req->base_oid;
938
939     if (!nr_copies) {
940         error_report("bug");
941     }
942
943     memset(&hdr, 0, sizeof(hdr));
944
945     if (aiocb_type == AIOCB_READ_UDATA) {
946         wlen = 0;
947         hdr.opcode = SD_OP_READ_OBJ;
948         hdr.flags = flags;
949     } else if (create) {
950         wlen = datalen;
951         hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
952         hdr.flags = SD_FLAG_CMD_WRITE | flags;
953     } else {
954         wlen = datalen;
955         hdr.opcode = SD_OP_WRITE_OBJ;
956         hdr.flags = SD_FLAG_CMD_WRITE | flags;
957     }
958
959     if (s->cache_enabled) {
960         hdr.flags |= SD_FLAG_CMD_CACHE;
961     }
962
963     hdr.oid = oid;
964     hdr.cow_oid = old_oid;
965     hdr.copies = s->inode.nr_copies;
966
967     hdr.data_length = datalen;
968     hdr.offset = offset;
969
970     hdr.id = aio_req->id;
971
972     qemu_co_mutex_lock(&s->lock);
973     s->co_send = qemu_coroutine_self();
974     qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
975                             aio_flush_request, s);
976     socket_set_cork(s->fd, 1);
977
978     /* send a header */
979     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
980     if (ret < 0) {
981         qemu_co_mutex_unlock(&s->lock);
982         error_report("failed to send a req, %s", strerror(errno));
983         return -EIO;
984     }
985
986     if (wlen) {
987         ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
988         if (ret < 0) {
989             qemu_co_mutex_unlock(&s->lock);
990             error_report("failed to send a data, %s", strerror(errno));
991             return -EIO;
992         }
993     }
994
995     socket_set_cork(s->fd, 0);
996     qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
997                             aio_flush_request, s);
998     qemu_co_mutex_unlock(&s->lock);
999
1000     return 0;
1001 }
1002
1003 static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
1004                              unsigned int datalen, uint64_t offset,
1005                              int write, int create, uint8_t cache)
1006 {
1007     SheepdogObjReq hdr;
1008     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1009     unsigned int wlen, rlen;
1010     int ret;
1011
1012     memset(&hdr, 0, sizeof(hdr));
1013
1014     if (write) {
1015         wlen = datalen;
1016         rlen = 0;
1017         hdr.flags = SD_FLAG_CMD_WRITE;
1018         if (create) {
1019             hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1020         } else {
1021             hdr.opcode = SD_OP_WRITE_OBJ;
1022         }
1023     } else {
1024         wlen = 0;
1025         rlen = datalen;
1026         hdr.opcode = SD_OP_READ_OBJ;
1027     }
1028
1029     if (cache) {
1030         hdr.flags |= SD_FLAG_CMD_CACHE;
1031     }
1032
1033     hdr.oid = oid;
1034     hdr.data_length = datalen;
1035     hdr.offset = offset;
1036     hdr.copies = copies;
1037
1038     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1039     if (ret) {
1040         error_report("failed to send a request to the sheep");
1041         return -1;
1042     }
1043
1044     switch (rsp->result) {
1045     case SD_RES_SUCCESS:
1046         return 0;
1047     default:
1048         error_report("%s", sd_strerror(rsp->result));
1049         return -1;
1050     }
1051 }
1052
1053 static int read_object(int fd, char *buf, uint64_t oid, int copies,
1054                        unsigned int datalen, uint64_t offset, uint8_t cache)
1055 {
1056     return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0,
1057                              cache);
1058 }
1059
1060 static int write_object(int fd, char *buf, uint64_t oid, int copies,
1061                         unsigned int datalen, uint64_t offset, int create,
1062                         uint8_t cache)
1063 {
1064     return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create,
1065                              cache);
1066 }
1067
1068 static int sd_open(BlockDriverState *bs, const char *filename, int flags)
1069 {
1070     int ret, fd;
1071     uint32_t vid = 0;
1072     BDRVSheepdogState *s = bs->opaque;
1073     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1074     uint32_t snapid;
1075     char *buf = NULL;
1076
1077     strstart(filename, "sheepdog:", (const char **)&filename);
1078
1079     QLIST_INIT(&s->outstanding_aio_head);
1080     s->fd = -1;
1081
1082     memset(vdi, 0, sizeof(vdi));
1083     memset(tag, 0, sizeof(tag));
1084     if (parse_vdiname(s, filename, vdi, &snapid, tag) < 0) {
1085         goto out;
1086     }
1087     s->fd = get_sheep_fd(s);
1088     if (s->fd < 0) {
1089         goto out;
1090     }
1091
1092     ret = find_vdi_name(s, vdi, snapid, tag, &vid, 0);
1093     if (ret) {
1094         goto out;
1095     }
1096
1097     if (flags & BDRV_O_CACHE_WB) {
1098         s->cache_enabled = 1;
1099         s->flush_fd = connect_to_sdog(s->addr, s->port);
1100         if (s->flush_fd < 0) {
1101             error_report("failed to connect");
1102             goto out;
1103         }
1104     }
1105
1106     if (snapid) {
1107         dprintf("%" PRIx32 " snapshot inode was open.\n", vid);
1108         s->is_snapshot = 1;
1109     }
1110
1111     fd = connect_to_sdog(s->addr, s->port);
1112     if (fd < 0) {
1113         error_report("failed to connect");
1114         goto out;
1115     }
1116
1117     buf = g_malloc(SD_INODE_SIZE);
1118     ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
1119                       s->cache_enabled);
1120
1121     closesocket(fd);
1122
1123     if (ret) {
1124         goto out;
1125     }
1126
1127     memcpy(&s->inode, buf, sizeof(s->inode));
1128     s->min_dirty_data_idx = UINT32_MAX;
1129     s->max_dirty_data_idx = 0;
1130
1131     bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
1132     strncpy(s->name, vdi, sizeof(s->name));
1133     qemu_co_mutex_init(&s->lock);
1134     g_free(buf);
1135     return 0;
1136 out:
1137     qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
1138     if (s->fd >= 0) {
1139         closesocket(s->fd);
1140     }
1141     g_free(buf);
1142     return -1;
1143 }
1144
1145 static int do_sd_create(char *filename, int64_t vdi_size,
1146                         uint32_t base_vid, uint32_t *vdi_id, int snapshot,
1147                         const char *addr, const char *port)
1148 {
1149     SheepdogVdiReq hdr;
1150     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1151     int fd, ret;
1152     unsigned int wlen, rlen = 0;
1153     char buf[SD_MAX_VDI_LEN];
1154
1155     fd = connect_to_sdog(addr, port);
1156     if (fd < 0) {
1157         return -EIO;
1158     }
1159
1160     memset(buf, 0, sizeof(buf));
1161     strncpy(buf, filename, SD_MAX_VDI_LEN);
1162
1163     memset(&hdr, 0, sizeof(hdr));
1164     hdr.opcode = SD_OP_NEW_VDI;
1165     hdr.base_vdi_id = base_vid;
1166
1167     wlen = SD_MAX_VDI_LEN;
1168
1169     hdr.flags = SD_FLAG_CMD_WRITE;
1170     hdr.snapid = snapshot;
1171
1172     hdr.data_length = wlen;
1173     hdr.vdi_size = vdi_size;
1174
1175     ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1176
1177     closesocket(fd);
1178
1179     if (ret) {
1180         return -EIO;
1181     }
1182
1183     if (rsp->result != SD_RES_SUCCESS) {
1184         error_report("%s, %s", sd_strerror(rsp->result), filename);
1185         return -EIO;
1186     }
1187
1188     if (vdi_id) {
1189         *vdi_id = rsp->vdi_id;
1190     }
1191
1192     return 0;
1193 }
1194
1195 static int sd_prealloc(const char *filename)
1196 {
1197     BlockDriverState *bs = NULL;
1198     uint32_t idx, max_idx;
1199     int64_t vdi_size;
1200     void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
1201     int ret;
1202
1203     ret = bdrv_file_open(&bs, filename, BDRV_O_RDWR);
1204     if (ret < 0) {
1205         goto out;
1206     }
1207
1208     vdi_size = bdrv_getlength(bs);
1209     if (vdi_size < 0) {
1210         ret = vdi_size;
1211         goto out;
1212     }
1213     max_idx = DIV_ROUND_UP(vdi_size, SD_DATA_OBJ_SIZE);
1214
1215     for (idx = 0; idx < max_idx; idx++) {
1216         /*
1217          * The created image can be a cloned image, so we need to read
1218          * a data from the source image.
1219          */
1220         ret = bdrv_pread(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1221         if (ret < 0) {
1222             goto out;
1223         }
1224         ret = bdrv_pwrite(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1225         if (ret < 0) {
1226             goto out;
1227         }
1228     }
1229 out:
1230     if (bs) {
1231         bdrv_delete(bs);
1232     }
1233     g_free(buf);
1234
1235     return ret;
1236 }
1237
1238 static int sd_create(const char *filename, QEMUOptionParameter *options)
1239 {
1240     int ret;
1241     uint32_t vid = 0, base_vid = 0;
1242     int64_t vdi_size = 0;
1243     char *backing_file = NULL;
1244     BDRVSheepdogState s;
1245     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1246     uint32_t snapid;
1247     int prealloc = 0;
1248     const char *vdiname;
1249
1250     strstart(filename, "sheepdog:", &vdiname);
1251
1252     memset(&s, 0, sizeof(s));
1253     memset(vdi, 0, sizeof(vdi));
1254     memset(tag, 0, sizeof(tag));
1255     if (parse_vdiname(&s, vdiname, vdi, &snapid, tag) < 0) {
1256         error_report("invalid filename");
1257         return -EINVAL;
1258     }
1259
1260     while (options && options->name) {
1261         if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
1262             vdi_size = options->value.n;
1263         } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
1264             backing_file = options->value.s;
1265         } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
1266             if (!options->value.s || !strcmp(options->value.s, "off")) {
1267                 prealloc = 0;
1268             } else if (!strcmp(options->value.s, "full")) {
1269                 prealloc = 1;
1270             } else {
1271                 error_report("Invalid preallocation mode: '%s'",
1272                              options->value.s);
1273                 return -EINVAL;
1274             }
1275         }
1276         options++;
1277     }
1278
1279     if (vdi_size > SD_MAX_VDI_SIZE) {
1280         error_report("too big image size");
1281         return -EINVAL;
1282     }
1283
1284     if (backing_file) {
1285         BlockDriverState *bs;
1286         BDRVSheepdogState *s;
1287         BlockDriver *drv;
1288
1289         /* Currently, only Sheepdog backing image is supported. */
1290         drv = bdrv_find_protocol(backing_file);
1291         if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1292             error_report("backing_file must be a sheepdog image");
1293             return -EINVAL;
1294         }
1295
1296         ret = bdrv_file_open(&bs, backing_file, 0);
1297         if (ret < 0)
1298             return -EIO;
1299
1300         s = bs->opaque;
1301
1302         if (!is_snapshot(&s->inode)) {
1303             error_report("cannot clone from a non snapshot vdi");
1304             bdrv_delete(bs);
1305             return -EINVAL;
1306         }
1307
1308         base_vid = s->inode.vdi_id;
1309         bdrv_delete(bs);
1310     }
1311
1312     ret = do_sd_create(vdi, vdi_size, base_vid, &vid, 0, s.addr, s.port);
1313     if (!prealloc || ret) {
1314         return ret;
1315     }
1316
1317     return sd_prealloc(filename);
1318 }
1319
1320 static void sd_close(BlockDriverState *bs)
1321 {
1322     BDRVSheepdogState *s = bs->opaque;
1323     SheepdogVdiReq hdr;
1324     SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1325     unsigned int wlen, rlen = 0;
1326     int fd, ret;
1327
1328     dprintf("%s\n", s->name);
1329
1330     fd = connect_to_sdog(s->addr, s->port);
1331     if (fd < 0) {
1332         return;
1333     }
1334
1335     memset(&hdr, 0, sizeof(hdr));
1336
1337     hdr.opcode = SD_OP_RELEASE_VDI;
1338     wlen = strlen(s->name) + 1;
1339     hdr.data_length = wlen;
1340     hdr.flags = SD_FLAG_CMD_WRITE;
1341
1342     ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1343
1344     closesocket(fd);
1345
1346     if (!ret && rsp->result != SD_RES_SUCCESS &&
1347         rsp->result != SD_RES_VDI_NOT_LOCKED) {
1348         error_report("%s, %s", sd_strerror(rsp->result), s->name);
1349     }
1350
1351     qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
1352     closesocket(s->fd);
1353     if (s->cache_enabled) {
1354         closesocket(s->flush_fd);
1355     }
1356     g_free(s->addr);
1357 }
1358
1359 static int64_t sd_getlength(BlockDriverState *bs)
1360 {
1361     BDRVSheepdogState *s = bs->opaque;
1362
1363     return s->inode.vdi_size;
1364 }
1365
1366 static int sd_truncate(BlockDriverState *bs, int64_t offset)
1367 {
1368     BDRVSheepdogState *s = bs->opaque;
1369     int ret, fd;
1370     unsigned int datalen;
1371
1372     if (offset < s->inode.vdi_size) {
1373         error_report("shrinking is not supported");
1374         return -EINVAL;
1375     } else if (offset > SD_MAX_VDI_SIZE) {
1376         error_report("too big image size");
1377         return -EINVAL;
1378     }
1379
1380     fd = connect_to_sdog(s->addr, s->port);
1381     if (fd < 0) {
1382         return -EIO;
1383     }
1384
1385     /* we don't need to update entire object */
1386     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1387     s->inode.vdi_size = offset;
1388     ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
1389                        s->inode.nr_copies, datalen, 0, 0, s->cache_enabled);
1390     close(fd);
1391
1392     if (ret < 0) {
1393         error_report("failed to update an inode.");
1394         return -EIO;
1395     }
1396
1397     return 0;
1398 }
1399
1400 /*
1401  * This function is called after writing data objects.  If we need to
1402  * update metadata, this sends a write request to the vdi object.
1403  * Otherwise, this switches back to sd_co_readv/writev.
1404  */
1405 static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1406 {
1407     int ret;
1408     BDRVSheepdogState *s = acb->common.bs->opaque;
1409     struct iovec iov;
1410     AIOReq *aio_req;
1411     uint32_t offset, data_len, mn, mx;
1412
1413     mn = s->min_dirty_data_idx;
1414     mx = s->max_dirty_data_idx;
1415     if (mn <= mx) {
1416         /* we need to update the vdi object. */
1417         offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1418             mn * sizeof(s->inode.data_vdi_id[0]);
1419         data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1420
1421         s->min_dirty_data_idx = UINT32_MAX;
1422         s->max_dirty_data_idx = 0;
1423
1424         iov.iov_base = &s->inode;
1425         iov.iov_len = sizeof(s->inode);
1426         aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1427                                 data_len, offset, 0, 0, offset);
1428         ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
1429         if (ret) {
1430             free_aio_req(s, aio_req);
1431             acb->ret = -EIO;
1432             goto out;
1433         }
1434
1435         acb->aio_done_func = sd_finish_aiocb;
1436         acb->aiocb_type = AIOCB_WRITE_UDATA;
1437         return;
1438     }
1439 out:
1440     sd_finish_aiocb(acb);
1441 }
1442
1443 /*
1444  * Create a writable VDI from a snapshot
1445  */
1446 static int sd_create_branch(BDRVSheepdogState *s)
1447 {
1448     int ret, fd;
1449     uint32_t vid;
1450     char *buf;
1451
1452     dprintf("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
1453
1454     buf = g_malloc(SD_INODE_SIZE);
1455
1456     ret = do_sd_create(s->name, s->inode.vdi_size, s->inode.vdi_id, &vid, 1,
1457                        s->addr, s->port);
1458     if (ret) {
1459         goto out;
1460     }
1461
1462     dprintf("%" PRIx32 " is created.\n", vid);
1463
1464     fd = connect_to_sdog(s->addr, s->port);
1465     if (fd < 0) {
1466         error_report("failed to connect");
1467         goto out;
1468     }
1469
1470     ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1471                       SD_INODE_SIZE, 0, s->cache_enabled);
1472
1473     closesocket(fd);
1474
1475     if (ret < 0) {
1476         goto out;
1477     }
1478
1479     memcpy(&s->inode, buf, sizeof(s->inode));
1480
1481     s->is_snapshot = 0;
1482     ret = 0;
1483     dprintf("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
1484
1485 out:
1486     g_free(buf);
1487
1488     return ret;
1489 }
1490
1491 /*
1492  * Send I/O requests to the server.
1493  *
1494  * This function sends requests to the server, links the requests to
1495  * the outstanding_list in BDRVSheepdogState, and exits without
1496  * waiting the response.  The responses are received in the
1497  * `aio_read_response' function which is called from the main loop as
1498  * a fd handler.
1499  *
1500  * Returns 1 when we need to wait a response, 0 when there is no sent
1501  * request and -errno in error cases.
1502  */
1503 static int coroutine_fn sd_co_rw_vector(void *p)
1504 {
1505     SheepdogAIOCB *acb = p;
1506     int ret = 0;
1507     unsigned long len, done = 0, total = acb->nb_sectors * SECTOR_SIZE;
1508     unsigned long idx = acb->sector_num * SECTOR_SIZE / SD_DATA_OBJ_SIZE;
1509     uint64_t oid;
1510     uint64_t offset = (acb->sector_num * SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
1511     BDRVSheepdogState *s = acb->common.bs->opaque;
1512     SheepdogInode *inode = &s->inode;
1513     AIOReq *aio_req;
1514
1515     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
1516         /*
1517          * In the case we open the snapshot VDI, Sheepdog creates the
1518          * writable VDI when we do a write operation first.
1519          */
1520         ret = sd_create_branch(s);
1521         if (ret) {
1522             acb->ret = -EIO;
1523             goto out;
1524         }
1525     }
1526
1527     while (done != total) {
1528         uint8_t flags = 0;
1529         uint64_t old_oid = 0;
1530         int create = 0;
1531
1532         oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
1533
1534         len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
1535
1536         if (!inode->data_vdi_id[idx]) {
1537             if (acb->aiocb_type == AIOCB_READ_UDATA) {
1538                 goto done;
1539             }
1540
1541             create = 1;
1542         } else if (acb->aiocb_type == AIOCB_WRITE_UDATA
1543                    && !is_data_obj_writable(inode, idx)) {
1544             /* Copy-On-Write */
1545             create = 1;
1546             old_oid = oid;
1547             flags = SD_FLAG_CMD_COW;
1548         }
1549
1550         if (create) {
1551             dprintf("update ino (%" PRIu32") %" PRIu64 " %" PRIu64
1552                     " %" PRIu64 "\n", inode->vdi_id, oid,
1553                     vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
1554             oid = vid_to_data_oid(inode->vdi_id, idx);
1555             dprintf("new oid %lx\n", oid);
1556         }
1557
1558         aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
1559
1560         if (create) {
1561             AIOReq *areq;
1562             QLIST_FOREACH(areq, &s->outstanding_aio_head,
1563                           outstanding_aio_siblings) {
1564                 if (areq == aio_req) {
1565                     continue;
1566                 }
1567                 if (areq->oid == oid) {
1568                     /*
1569                      * Sheepdog cannot handle simultaneous create
1570                      * requests to the same object.  So we cannot send
1571                      * the request until the previous request
1572                      * finishes.
1573                      */
1574                     aio_req->flags = 0;
1575                     aio_req->base_oid = 0;
1576                     goto done;
1577                 }
1578             }
1579         }
1580
1581         ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1582                               create, acb->aiocb_type);
1583         if (ret < 0) {
1584             error_report("add_aio_request is failed");
1585             free_aio_req(s, aio_req);
1586             acb->ret = -EIO;
1587             goto out;
1588         }
1589     done:
1590         offset = 0;
1591         idx++;
1592         done += len;
1593     }
1594 out:
1595     if (QLIST_EMPTY(&acb->aioreq_head)) {
1596         return acb->ret;
1597     }
1598     return 1;
1599 }
1600
1601 static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
1602                         int nb_sectors, QEMUIOVector *qiov)
1603 {
1604     SheepdogAIOCB *acb;
1605     int ret;
1606
1607     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
1608         /* TODO: shouldn't block here */
1609         if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
1610             return -EIO;
1611         }
1612         bs->total_sectors = sector_num + nb_sectors;
1613     }
1614
1615     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
1616     acb->aio_done_func = sd_write_done;
1617     acb->aiocb_type = AIOCB_WRITE_UDATA;
1618
1619     ret = sd_co_rw_vector(acb);
1620     if (ret <= 0) {
1621         qemu_aio_release(acb);
1622         return ret;
1623     }
1624
1625     qemu_coroutine_yield();
1626
1627     return acb->ret;
1628 }
1629
1630 static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
1631                        int nb_sectors, QEMUIOVector *qiov)
1632 {
1633     SheepdogAIOCB *acb;
1634     int i, ret;
1635
1636     acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
1637     acb->aiocb_type = AIOCB_READ_UDATA;
1638     acb->aio_done_func = sd_finish_aiocb;
1639
1640     /*
1641      * TODO: we can do better; we don't need to initialize
1642      * blindly.
1643      */
1644     for (i = 0; i < qiov->niov; i++) {
1645         memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
1646     }
1647
1648     ret = sd_co_rw_vector(acb);
1649     if (ret <= 0) {
1650         qemu_aio_release(acb);
1651         return ret;
1652     }
1653
1654     qemu_coroutine_yield();
1655
1656     return acb->ret;
1657 }
1658
1659 static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
1660 {
1661     BDRVSheepdogState *s = bs->opaque;
1662     SheepdogObjReq hdr = { 0 };
1663     SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1664     SheepdogInode *inode = &s->inode;
1665     int ret;
1666     unsigned int wlen = 0, rlen = 0;
1667
1668     if (!s->cache_enabled) {
1669         return 0;
1670     }
1671
1672     hdr.opcode = SD_OP_FLUSH_VDI;
1673     hdr.oid = vid_to_vdi_oid(inode->vdi_id);
1674
1675     ret = do_co_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
1676     if (ret) {
1677         error_report("failed to send a request to the sheep");
1678         return ret;
1679     }
1680
1681     if (rsp->result != SD_RES_SUCCESS) {
1682         error_report("%s", sd_strerror(rsp->result));
1683         return -EIO;
1684     }
1685
1686     return 0;
1687 }
1688
1689 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
1690 {
1691     BDRVSheepdogState *s = bs->opaque;
1692     int ret, fd;
1693     uint32_t new_vid;
1694     SheepdogInode *inode;
1695     unsigned int datalen;
1696
1697     dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
1698             "is_snapshot %d\n", sn_info->name, sn_info->id_str,
1699             s->name, sn_info->vm_state_size, s->is_snapshot);
1700
1701     if (s->is_snapshot) {
1702         error_report("You can't create a snapshot of a snapshot VDI, "
1703                      "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
1704
1705         return -EINVAL;
1706     }
1707
1708     dprintf("%s %s\n", sn_info->name, sn_info->id_str);
1709
1710     s->inode.vm_state_size = sn_info->vm_state_size;
1711     s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
1712     strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
1713     /* we don't need to update entire object */
1714     datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1715
1716     /* refresh inode. */
1717     fd = connect_to_sdog(s->addr, s->port);
1718     if (fd < 0) {
1719         ret = -EIO;
1720         goto cleanup;
1721     }
1722
1723     ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
1724                        s->inode.nr_copies, datalen, 0, 0, s->cache_enabled);
1725     if (ret < 0) {
1726         error_report("failed to write snapshot's inode.");
1727         ret = -EIO;
1728         goto cleanup;
1729     }
1730
1731     ret = do_sd_create(s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid, 1,
1732                        s->addr, s->port);
1733     if (ret < 0) {
1734         error_report("failed to create inode for snapshot. %s",
1735                      strerror(errno));
1736         ret = -EIO;
1737         goto cleanup;
1738     }
1739
1740     inode = (SheepdogInode *)g_malloc(datalen);
1741
1742     ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
1743                       s->inode.nr_copies, datalen, 0, s->cache_enabled);
1744
1745     if (ret < 0) {
1746         error_report("failed to read new inode info. %s", strerror(errno));
1747         ret = -EIO;
1748         goto cleanup;
1749     }
1750
1751     memcpy(&s->inode, inode, datalen);
1752     dprintf("s->inode: name %s snap_id %x oid %x\n",
1753             s->inode.name, s->inode.snap_id, s->inode.vdi_id);
1754
1755 cleanup:
1756     closesocket(fd);
1757     return ret;
1758 }
1759
1760 static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
1761 {
1762     BDRVSheepdogState *s = bs->opaque;
1763     BDRVSheepdogState *old_s;
1764     char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1765     char *buf = NULL;
1766     uint32_t vid;
1767     uint32_t snapid = 0;
1768     int ret = -ENOENT, fd;
1769
1770     old_s = g_malloc(sizeof(BDRVSheepdogState));
1771
1772     memcpy(old_s, s, sizeof(BDRVSheepdogState));
1773
1774     memset(vdi, 0, sizeof(vdi));
1775     strncpy(vdi, s->name, sizeof(vdi));
1776
1777     memset(tag, 0, sizeof(tag));
1778     snapid = strtoul(snapshot_id, NULL, 10);
1779     if (!snapid) {
1780         strncpy(tag, s->name, sizeof(tag));
1781     }
1782
1783     ret = find_vdi_name(s, vdi, snapid, tag, &vid, 1);
1784     if (ret) {
1785         error_report("Failed to find_vdi_name");
1786         ret = -ENOENT;
1787         goto out;
1788     }
1789
1790     fd = connect_to_sdog(s->addr, s->port);
1791     if (fd < 0) {
1792         error_report("failed to connect");
1793         goto out;
1794     }
1795
1796     buf = g_malloc(SD_INODE_SIZE);
1797     ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1798                       SD_INODE_SIZE, 0, s->cache_enabled);
1799
1800     closesocket(fd);
1801
1802     if (ret) {
1803         ret = -ENOENT;
1804         goto out;
1805     }
1806
1807     memcpy(&s->inode, buf, sizeof(s->inode));
1808
1809     if (!s->inode.vm_state_size) {
1810         error_report("Invalid snapshot");
1811         ret = -ENOENT;
1812         goto out;
1813     }
1814
1815     s->is_snapshot = 1;
1816
1817     g_free(buf);
1818     g_free(old_s);
1819
1820     return 0;
1821 out:
1822     /* recover bdrv_sd_state */
1823     memcpy(s, old_s, sizeof(BDRVSheepdogState));
1824     g_free(buf);
1825     g_free(old_s);
1826
1827     error_report("failed to open. recover old bdrv_sd_state.");
1828
1829     return ret;
1830 }
1831
1832 static int sd_snapshot_delete(BlockDriverState *bs, const char *snapshot_id)
1833 {
1834     /* FIXME: Delete specified snapshot id.  */
1835     return 0;
1836 }
1837
1838 static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
1839 {
1840     BDRVSheepdogState *s = bs->opaque;
1841     SheepdogReq req;
1842     int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
1843     QEMUSnapshotInfo *sn_tab = NULL;
1844     unsigned wlen, rlen;
1845     int found = 0;
1846     static SheepdogInode inode;
1847     unsigned long *vdi_inuse;
1848     unsigned int start_nr;
1849     uint64_t hval;
1850     uint32_t vid;
1851
1852     vdi_inuse = g_malloc(max);
1853
1854     fd = connect_to_sdog(s->addr, s->port);
1855     if (fd < 0) {
1856         goto out;
1857     }
1858
1859     rlen = max;
1860     wlen = 0;
1861
1862     memset(&req, 0, sizeof(req));
1863
1864     req.opcode = SD_OP_READ_VDIS;
1865     req.data_length = max;
1866
1867     ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen);
1868
1869     closesocket(fd);
1870     if (ret) {
1871         goto out;
1872     }
1873
1874     sn_tab = g_malloc0(nr * sizeof(*sn_tab));
1875
1876     /* calculate a vdi id with hash function */
1877     hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
1878     start_nr = hval & (SD_NR_VDIS - 1);
1879
1880     fd = connect_to_sdog(s->addr, s->port);
1881     if (fd < 0) {
1882         error_report("failed to connect");
1883         goto out;
1884     }
1885
1886     for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
1887         if (!test_bit(vid, vdi_inuse)) {
1888             break;
1889         }
1890
1891         /* we don't need to read entire object */
1892         ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
1893                           0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
1894                           s->cache_enabled);
1895
1896         if (ret) {
1897             continue;
1898         }
1899
1900         if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
1901             sn_tab[found].date_sec = inode.snap_ctime >> 32;
1902             sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
1903             sn_tab[found].vm_state_size = inode.vm_state_size;
1904             sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
1905
1906             snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
1907                      inode.snap_id);
1908             strncpy(sn_tab[found].name, inode.tag,
1909                     MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)));
1910             found++;
1911         }
1912     }
1913
1914     closesocket(fd);
1915 out:
1916     *psn_tab = sn_tab;
1917
1918     g_free(vdi_inuse);
1919
1920     return found;
1921 }
1922
1923 static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
1924                                 int64_t pos, int size, int load)
1925 {
1926     int fd, create;
1927     int ret = 0;
1928     unsigned int data_len;
1929     uint64_t vmstate_oid;
1930     uint32_t vdi_index;
1931     uint64_t offset;
1932
1933     fd = connect_to_sdog(s->addr, s->port);
1934     if (fd < 0) {
1935         ret = -EIO;
1936         goto cleanup;
1937     }
1938
1939     while (size) {
1940         vdi_index = pos / SD_DATA_OBJ_SIZE;
1941         offset = pos % SD_DATA_OBJ_SIZE;
1942
1943         data_len = MIN(size, SD_DATA_OBJ_SIZE);
1944
1945         vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index);
1946
1947         create = (offset == 0);
1948         if (load) {
1949             ret = read_object(fd, (char *)data, vmstate_oid,
1950                               s->inode.nr_copies, data_len, offset,
1951                               s->cache_enabled);
1952         } else {
1953             ret = write_object(fd, (char *)data, vmstate_oid,
1954                                s->inode.nr_copies, data_len, offset, create,
1955                                s->cache_enabled);
1956         }
1957
1958         if (ret < 0) {
1959             error_report("failed to save vmstate %s", strerror(errno));
1960             ret = -EIO;
1961             goto cleanup;
1962         }
1963
1964         pos += data_len;
1965         size -= data_len;
1966         ret += data_len;
1967     }
1968 cleanup:
1969     closesocket(fd);
1970     return ret;
1971 }
1972
1973 static int sd_save_vmstate(BlockDriverState *bs, const uint8_t *data,
1974                            int64_t pos, int size)
1975 {
1976     BDRVSheepdogState *s = bs->opaque;
1977
1978     return do_load_save_vmstate(s, (uint8_t *)data, pos, size, 0);
1979 }
1980
1981 static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
1982                            int64_t pos, int size)
1983 {
1984     BDRVSheepdogState *s = bs->opaque;
1985
1986     return do_load_save_vmstate(s, data, pos, size, 1);
1987 }
1988
1989
1990 static QEMUOptionParameter sd_create_options[] = {
1991     {
1992         .name = BLOCK_OPT_SIZE,
1993         .type = OPT_SIZE,
1994         .help = "Virtual disk size"
1995     },
1996     {
1997         .name = BLOCK_OPT_BACKING_FILE,
1998         .type = OPT_STRING,
1999         .help = "File name of a base image"
2000     },
2001     {
2002         .name = BLOCK_OPT_PREALLOC,
2003         .type = OPT_STRING,
2004         .help = "Preallocation mode (allowed values: off, full)"
2005     },
2006     { NULL }
2007 };
2008
2009 BlockDriver bdrv_sheepdog = {
2010     .format_name    = "sheepdog",
2011     .protocol_name  = "sheepdog",
2012     .instance_size  = sizeof(BDRVSheepdogState),
2013     .bdrv_file_open = sd_open,
2014     .bdrv_close     = sd_close,
2015     .bdrv_create    = sd_create,
2016     .bdrv_getlength = sd_getlength,
2017     .bdrv_truncate  = sd_truncate,
2018
2019     .bdrv_co_readv  = sd_co_readv,
2020     .bdrv_co_writev = sd_co_writev,
2021     .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2022
2023     .bdrv_snapshot_create   = sd_snapshot_create,
2024     .bdrv_snapshot_goto     = sd_snapshot_goto,
2025     .bdrv_snapshot_delete   = sd_snapshot_delete,
2026     .bdrv_snapshot_list     = sd_snapshot_list,
2027
2028     .bdrv_save_vmstate  = sd_save_vmstate,
2029     .bdrv_load_vmstate  = sd_load_vmstate,
2030
2031     .create_options = sd_create_options,
2032 };
2033
2034 static void bdrv_sheepdog_init(void)
2035 {
2036     bdrv_register(&bdrv_sheepdog);
2037 }
2038 block_init(bdrv_sheepdog_init);
This page took 0.13927 seconds and 4 git commands to generate.