]> Git Repo - qemu.git/blob - block/archipelago.c
iotests: Send the correct fd in socket_scm_helper
[qemu.git] / block / archipelago.c
1 /*
2  * QEMU Block driver for Archipelago
3  *
4  * Copyright (C) 2014 Chrysostomos Nanakos <[email protected]>
5  *
6  * This work is licensed under the terms of the GNU GPL, version 2 or later.
7  * See the COPYING file in the top-level directory.
8  *
9  */
10
11 /*
12  * VM Image on Archipelago volume is specified like this:
13  *
14  * file.driver=archipelago,file.volume=<volumename>
15  * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
16  * [,file.segment=<segment_name>]]
17  *
18  * or
19  *
20  * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
21  * segment=<segment_name>]]
22  *
23  * 'archipelago' is the protocol.
24  *
25  * 'mport' is the port number on which mapperd is listening. This is optional
26  * and if not specified, QEMU will make Archipelago to use the default port.
27  *
28  * 'vport' is the port number on which vlmcd is listening. This is optional
29  * and if not specified, QEMU will make Archipelago to use the default port.
30  *
31  * 'segment' is the name of the shared memory segment Archipelago stack
32  * is using. This is optional and if not specified, QEMU will make Archipelago
33  * to use the default value, 'archipelago'.
34  *
35  * Examples:
36  *
37  * file.driver=archipelago,file.volume=my_vm_volume
38  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
39  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
40  *  file.vport=1234
41  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
42  *  file.vport=1234,file.segment=my_segment
43  *
44  * or
45  *
46  * file=archipelago:my_vm_volume
47  * file=archipelago:my_vm_volume/mport=123
48  * file=archipelago:my_vm_volume/mport=123:vport=1234
49  * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
50  *
51  */
52
53 #include "qemu-common.h"
54 #include "block/block_int.h"
55 #include "qemu/error-report.h"
56 #include "qemu/thread.h"
57 #include "qapi/qmp/qint.h"
58 #include "qapi/qmp/qstring.h"
59 #include "qapi/qmp/qjson.h"
60 #include "qemu/atomic.h"
61
62 #include <inttypes.h>
63 #include <xseg/xseg.h>
64 #include <xseg/protocol.h>
65
66 #define MAX_REQUEST_SIZE    524288
67
68 #define ARCHIPELAGO_OPT_VOLUME      "volume"
69 #define ARCHIPELAGO_OPT_SEGMENT     "segment"
70 #define ARCHIPELAGO_OPT_MPORT       "mport"
71 #define ARCHIPELAGO_OPT_VPORT       "vport"
72 #define ARCHIPELAGO_DFL_MPORT       1001
73 #define ARCHIPELAGO_DFL_VPORT       501
74
75 #define archipelagolog(fmt, ...) \
76     do {                         \
77         fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
78     } while (0)
79
80 typedef enum {
81     ARCHIP_OP_READ,
82     ARCHIP_OP_WRITE,
83     ARCHIP_OP_FLUSH,
84     ARCHIP_OP_VOLINFO,
85     ARCHIP_OP_TRUNCATE,
86 } ARCHIPCmd;
87
88 typedef struct ArchipelagoAIOCB {
89     BlockDriverAIOCB common;
90     QEMUBH *bh;
91     struct BDRVArchipelagoState *s;
92     QEMUIOVector *qiov;
93     ARCHIPCmd cmd;
94     bool cancelled;
95     int status;
96     int64_t size;
97     int64_t ret;
98 } ArchipelagoAIOCB;
99
100 typedef struct BDRVArchipelagoState {
101     ArchipelagoAIOCB *event_acb;
102     char *volname;
103     char *segment_name;
104     uint64_t size;
105     /* Archipelago specific */
106     struct xseg *xseg;
107     struct xseg_port *port;
108     xport srcport;
109     xport sport;
110     xport mportno;
111     xport vportno;
112     QemuMutex archip_mutex;
113     QemuCond archip_cond;
114     bool is_signaled;
115     /* Request handler specific */
116     QemuThread request_th;
117     QemuCond request_cond;
118     QemuMutex request_mutex;
119     bool th_is_signaled;
120     bool stopping;
121 } BDRVArchipelagoState;
122
123 typedef struct ArchipelagoSegmentedRequest {
124     size_t count;
125     size_t total;
126     int ref;
127     int failed;
128 } ArchipelagoSegmentedRequest;
129
130 typedef struct AIORequestData {
131     const char *volname;
132     off_t offset;
133     size_t size;
134     uint64_t bufidx;
135     int ret;
136     int op;
137     ArchipelagoAIOCB *aio_cb;
138     ArchipelagoSegmentedRequest *segreq;
139 } AIORequestData;
140
141 static void qemu_archipelago_complete_aio(void *opaque);
142
143 static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
144 {
145     if (xseg && (sport != srcport)) {
146         xseg_init_local_signal(xseg, srcport);
147         sport = srcport;
148     }
149 }
150
151 static void archipelago_finish_aiocb(AIORequestData *reqdata)
152 {
153     if (reqdata->aio_cb->ret != reqdata->segreq->total) {
154         reqdata->aio_cb->ret = -EIO;
155     } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
156         reqdata->aio_cb->ret = 0;
157     }
158     reqdata->aio_cb->bh = aio_bh_new(
159                         bdrv_get_aio_context(reqdata->aio_cb->common.bs),
160                         qemu_archipelago_complete_aio, reqdata
161                         );
162     qemu_bh_schedule(reqdata->aio_cb->bh);
163 }
164
165 static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
166                       struct xseg_request *expected_req)
167 {
168     struct xseg_request *req;
169     xseg_prepare_wait(xseg, srcport);
170     void *psd = xseg_get_signal_desc(xseg, port);
171     while (1) {
172         req = xseg_receive(xseg, srcport, X_NONBLOCK);
173         if (req) {
174             if (req != expected_req) {
175                 archipelagolog("Unknown received request\n");
176                 xseg_put_request(xseg, req, srcport);
177             } else if (!(req->state & XS_SERVED)) {
178                 return -1;
179             } else {
180                 break;
181             }
182         }
183         xseg_wait_signal(xseg, psd, 100000UL);
184     }
185     xseg_cancel_wait(xseg, srcport);
186     return 0;
187 }
188
189 static void xseg_request_handler(void *state)
190 {
191     BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
192     void *psd = xseg_get_signal_desc(s->xseg, s->port);
193     qemu_mutex_lock(&s->request_mutex);
194
195     while (!s->stopping) {
196         struct xseg_request *req;
197         void *data;
198         xseg_prepare_wait(s->xseg, s->srcport);
199         req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
200         if (req) {
201             AIORequestData *reqdata;
202             ArchipelagoSegmentedRequest *segreq;
203             xseg_get_req_data(s->xseg, req, (void **)&reqdata);
204
205             switch (reqdata->op) {
206             case ARCHIP_OP_READ:
207                 data = xseg_get_data(s->xseg, req);
208                 segreq = reqdata->segreq;
209                 segreq->count += req->serviced;
210
211                 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
212                                     data,
213                                     req->serviced);
214
215                 xseg_put_request(s->xseg, req, s->srcport);
216
217                 if (atomic_fetch_dec(&segreq->ref) == 1) {
218                     if (!segreq->failed) {
219                         reqdata->aio_cb->ret = segreq->count;
220                         archipelago_finish_aiocb(reqdata);
221                         g_free(segreq);
222                     } else {
223                         g_free(segreq);
224                         g_free(reqdata);
225                     }
226                 } else {
227                     g_free(reqdata);
228                 }
229                 break;
230             case ARCHIP_OP_WRITE:
231             case ARCHIP_OP_FLUSH:
232                 segreq = reqdata->segreq;
233                 segreq->count += req->serviced;
234                 xseg_put_request(s->xseg, req, s->srcport);
235
236                 if (atomic_fetch_dec(&segreq->ref) == 1) {
237                     if (!segreq->failed) {
238                         reqdata->aio_cb->ret = segreq->count;
239                         archipelago_finish_aiocb(reqdata);
240                         g_free(segreq);
241                     } else {
242                         g_free(segreq);
243                         g_free(reqdata);
244                     }
245                 } else {
246                     g_free(reqdata);
247                 }
248                 break;
249             case ARCHIP_OP_VOLINFO:
250             case ARCHIP_OP_TRUNCATE:
251                 s->is_signaled = true;
252                 qemu_cond_signal(&s->archip_cond);
253                 break;
254             }
255         } else {
256             xseg_wait_signal(s->xseg, psd, 100000UL);
257         }
258         xseg_cancel_wait(s->xseg, s->srcport);
259     }
260
261     s->th_is_signaled = true;
262     qemu_cond_signal(&s->request_cond);
263     qemu_mutex_unlock(&s->request_mutex);
264     qemu_thread_exit(NULL);
265 }
266
267 static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
268 {
269     if (xseg_initialize()) {
270         archipelagolog("Cannot initialize XSEG\n");
271         goto err_exit;
272     }
273
274     s->xseg = xseg_join("posix", s->segment_name,
275                         "posixfd", NULL);
276     if (!s->xseg) {
277         archipelagolog("Cannot join XSEG shared memory segment\n");
278         goto err_exit;
279     }
280     s->port = xseg_bind_dynport(s->xseg);
281     s->srcport = s->port->portno;
282     init_local_signal(s->xseg, s->sport, s->srcport);
283     return 0;
284
285 err_exit:
286     return -1;
287 }
288
289 static int qemu_archipelago_init(BDRVArchipelagoState *s)
290 {
291     int ret;
292
293     ret = qemu_archipelago_xseg_init(s);
294     if (ret < 0) {
295         error_report("Cannot initialize XSEG. Aborting...\n");
296         goto err_exit;
297     }
298
299     qemu_cond_init(&s->archip_cond);
300     qemu_mutex_init(&s->archip_mutex);
301     qemu_cond_init(&s->request_cond);
302     qemu_mutex_init(&s->request_mutex);
303     s->th_is_signaled = false;
304     qemu_thread_create(&s->request_th, "xseg_io_th",
305                        (void *) xseg_request_handler,
306                        (void *) s, QEMU_THREAD_JOINABLE);
307
308 err_exit:
309     return ret;
310 }
311
312 static void qemu_archipelago_complete_aio(void *opaque)
313 {
314     AIORequestData *reqdata = (AIORequestData *) opaque;
315     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
316
317     qemu_bh_delete(aio_cb->bh);
318     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
319     aio_cb->status = 0;
320
321     if (!aio_cb->cancelled) {
322         qemu_aio_release(aio_cb);
323     }
324     g_free(reqdata);
325 }
326
327 static void xseg_find_port(char *pstr, const char *needle, xport *aport)
328 {
329     const char *a;
330     char *endptr = NULL;
331     unsigned long port;
332     if (strstart(pstr, needle, &a)) {
333         if (strlen(a) > 0) {
334             port = strtoul(a, &endptr, 10);
335             if (strlen(endptr)) {
336                 *aport = -2;
337                 return;
338             }
339             *aport = (xport) port;
340         }
341     }
342 }
343
344 static void xseg_find_segment(char *pstr, const char *needle,
345                               char **segment_name)
346 {
347     const char *a;
348     if (strstart(pstr, needle, &a)) {
349         if (strlen(a) > 0) {
350             *segment_name = g_strdup(a);
351         }
352     }
353 }
354
355 static void parse_filename_opts(const char *filename, Error **errp,
356                                 char **volume, char **segment_name,
357                                 xport *mport, xport *vport)
358 {
359     const char *start;
360     char *tokens[4], *ds;
361     int idx;
362     xport lmport = NoPort, lvport = NoPort;
363
364     strstart(filename, "archipelago:", &start);
365
366     ds = g_strdup(start);
367     tokens[0] = strtok(ds, "/");
368     tokens[1] = strtok(NULL, ":");
369     tokens[2] = strtok(NULL, ":");
370     tokens[3] = strtok(NULL, "\0");
371
372     if (!strlen(tokens[0])) {
373         error_setg(errp, "volume name must be specified first");
374         g_free(ds);
375         return;
376     }
377
378     for (idx = 1; idx < 4; idx++) {
379         if (tokens[idx] != NULL) {
380             if (strstart(tokens[idx], "mport=", NULL)) {
381                 xseg_find_port(tokens[idx], "mport=", &lmport);
382             }
383             if (strstart(tokens[idx], "vport=", NULL)) {
384                 xseg_find_port(tokens[idx], "vport=", &lvport);
385             }
386             if (strstart(tokens[idx], "segment=", NULL)) {
387                 xseg_find_segment(tokens[idx], "segment=", segment_name);
388             }
389         }
390     }
391
392     if ((lmport == -2) || (lvport == -2)) {
393         error_setg(errp, "mport and/or vport must be set");
394         g_free(ds);
395         return;
396     }
397     *volume = g_strdup(tokens[0]);
398     *mport = lmport;
399     *vport = lvport;
400     g_free(ds);
401 }
402
403 static void archipelago_parse_filename(const char *filename, QDict *options,
404                                        Error **errp)
405 {
406     const char *start;
407     char *volume = NULL, *segment_name = NULL;
408     xport mport = NoPort, vport = NoPort;
409
410     if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
411             || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
412             || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
413             || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
414         error_setg(errp, "volume/mport/vport/segment and a file name may not"
415                          " be specified at the same time");
416         return;
417     }
418
419     if (!strstart(filename, "archipelago:", &start)) {
420         error_setg(errp, "File name must start with 'archipelago:'");
421         return;
422     }
423
424     if (!strlen(start) || strstart(start, "/", NULL)) {
425         error_setg(errp, "volume name must be specified");
426         return;
427     }
428
429     parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
430
431     if (volume) {
432         qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
433         g_free(volume);
434     }
435     if (segment_name) {
436         qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
437                   qstring_from_str(segment_name));
438         g_free(segment_name);
439     }
440     if (mport != NoPort) {
441         qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
442     }
443     if (vport != NoPort) {
444         qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
445     }
446 }
447
448 static QemuOptsList archipelago_runtime_opts = {
449     .name = "archipelago",
450     .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
451     .desc = {
452         {
453             .name = ARCHIPELAGO_OPT_VOLUME,
454             .type = QEMU_OPT_STRING,
455             .help = "Name of the volume image",
456         },
457         {
458             .name = ARCHIPELAGO_OPT_SEGMENT,
459             .type = QEMU_OPT_STRING,
460             .help = "Name of the Archipelago shared memory segment",
461         },
462         {
463             .name = ARCHIPELAGO_OPT_MPORT,
464             .type = QEMU_OPT_NUMBER,
465             .help = "Archipelago mapperd port number"
466         },
467         {
468             .name = ARCHIPELAGO_OPT_VPORT,
469             .type = QEMU_OPT_NUMBER,
470             .help = "Archipelago vlmcd port number"
471
472         },
473         { /* end of list */ }
474     },
475 };
476
477 static int qemu_archipelago_open(BlockDriverState *bs,
478                                  QDict *options,
479                                  int bdrv_flags,
480                                  Error **errp)
481 {
482     int ret = 0;
483     const char *volume, *segment_name;
484     QemuOpts *opts;
485     Error *local_err = NULL;
486     BDRVArchipelagoState *s = bs->opaque;
487
488     opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
489     qemu_opts_absorb_qdict(opts, options, &local_err);
490     if (local_err) {
491         error_propagate(errp, local_err);
492         ret = -EINVAL;
493         goto err_exit;
494     }
495
496     s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
497                                      ARCHIPELAGO_DFL_MPORT);
498     s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
499                                      ARCHIPELAGO_DFL_VPORT);
500
501     segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
502     if (segment_name == NULL) {
503         s->segment_name = g_strdup("archipelago");
504     } else {
505         s->segment_name = g_strdup(segment_name);
506     }
507
508     volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
509     if (volume == NULL) {
510         error_setg(errp, "archipelago block driver requires the 'volume'"
511                    " option");
512         ret = -EINVAL;
513         goto err_exit;
514     }
515     s->volname = g_strdup(volume);
516
517     /* Initialize XSEG, join shared memory segment */
518     ret = qemu_archipelago_init(s);
519     if (ret < 0) {
520         error_setg(errp, "cannot initialize XSEG and join shared "
521                    "memory segment");
522         goto err_exit;
523     }
524
525     qemu_opts_del(opts);
526     return 0;
527
528 err_exit:
529     g_free(s->volname);
530     g_free(s->segment_name);
531     qemu_opts_del(opts);
532     return ret;
533 }
534
535 static void qemu_archipelago_close(BlockDriverState *bs)
536 {
537     int r, targetlen;
538     char *target;
539     struct xseg_request *req;
540     BDRVArchipelagoState *s = bs->opaque;
541
542     s->stopping = true;
543
544     qemu_mutex_lock(&s->request_mutex);
545     while (!s->th_is_signaled) {
546         qemu_cond_wait(&s->request_cond,
547                        &s->request_mutex);
548     }
549     qemu_mutex_unlock(&s->request_mutex);
550     qemu_thread_join(&s->request_th);
551     qemu_cond_destroy(&s->request_cond);
552     qemu_mutex_destroy(&s->request_mutex);
553
554     qemu_cond_destroy(&s->archip_cond);
555     qemu_mutex_destroy(&s->archip_mutex);
556
557     targetlen = strlen(s->volname);
558     req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
559     if (!req) {
560         archipelagolog("Cannot get XSEG request\n");
561         goto err_exit;
562     }
563     r = xseg_prep_request(s->xseg, req, targetlen, 0);
564     if (r < 0) {
565         xseg_put_request(s->xseg, req, s->srcport);
566         archipelagolog("Cannot prepare XSEG close request\n");
567         goto err_exit;
568     }
569
570     target = xseg_get_target(s->xseg, req);
571     memcpy(target, s->volname, targetlen);
572     req->size = req->datalen;
573     req->offset = 0;
574     req->op = X_CLOSE;
575
576     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
577     if (p == NoPort) {
578         xseg_put_request(s->xseg, req, s->srcport);
579         archipelagolog("Cannot submit XSEG close request\n");
580         goto err_exit;
581     }
582
583     xseg_signal(s->xseg, p);
584     wait_reply(s->xseg, s->srcport, s->port, req);
585
586     xseg_put_request(s->xseg, req, s->srcport);
587
588 err_exit:
589     g_free(s->volname);
590     g_free(s->segment_name);
591     xseg_quit_local_signal(s->xseg, s->srcport);
592     xseg_leave_dynport(s->xseg, s->port);
593     xseg_leave(s->xseg);
594 }
595
596 static int qemu_archipelago_create_volume(Error **errp, const char *volname,
597                                           char *segment_name,
598                                           uint64_t size, xport mportno,
599                                           xport vportno)
600 {
601     int ret, targetlen;
602     struct xseg *xseg = NULL;
603     struct xseg_request *req;
604     struct xseg_request_clone *xclone;
605     struct xseg_port *port;
606     xport srcport = NoPort, sport = NoPort;
607     char *target;
608
609     /* Try default values if none has been set */
610     if (mportno == (xport) -1) {
611         mportno = ARCHIPELAGO_DFL_MPORT;
612     }
613
614     if (vportno == (xport) -1) {
615         vportno = ARCHIPELAGO_DFL_VPORT;
616     }
617
618     if (xseg_initialize()) {
619         error_setg(errp, "Cannot initialize XSEG");
620         return -1;
621     }
622
623     xseg = xseg_join("posix", segment_name,
624                      "posixfd", NULL);
625
626     if (!xseg) {
627         error_setg(errp, "Cannot join XSEG shared memory segment");
628         return -1;
629     }
630
631     port = xseg_bind_dynport(xseg);
632     srcport = port->portno;
633     init_local_signal(xseg, sport, srcport);
634
635     req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
636     if (!req) {
637         error_setg(errp, "Cannot get XSEG request");
638         return -1;
639     }
640
641     targetlen = strlen(volname);
642     ret = xseg_prep_request(xseg, req, targetlen,
643                             sizeof(struct xseg_request_clone));
644     if (ret < 0) {
645         error_setg(errp, "Cannot prepare XSEG request");
646         goto err_exit;
647     }
648
649     target = xseg_get_target(xseg, req);
650     if (!target) {
651         error_setg(errp, "Cannot get XSEG target.\n");
652         goto err_exit;
653     }
654     memcpy(target, volname, targetlen);
655     xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
656     memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
657     xclone->targetlen = 0;
658     xclone->size = size;
659     req->offset = 0;
660     req->size = req->datalen;
661     req->op = X_CLONE;
662
663     xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
664     if (p == NoPort) {
665         error_setg(errp, "Could not submit XSEG request");
666         goto err_exit;
667     }
668     xseg_signal(xseg, p);
669
670     ret = wait_reply(xseg, srcport, port, req);
671     if (ret < 0) {
672         error_setg(errp, "wait_reply() error.");
673     }
674
675     xseg_put_request(xseg, req, srcport);
676     xseg_quit_local_signal(xseg, srcport);
677     xseg_leave_dynport(xseg, port);
678     xseg_leave(xseg);
679     return ret;
680
681 err_exit:
682     xseg_put_request(xseg, req, srcport);
683     xseg_quit_local_signal(xseg, srcport);
684     xseg_leave_dynport(xseg, port);
685     xseg_leave(xseg);
686     return -1;
687 }
688
689 static int qemu_archipelago_create(const char *filename,
690                                    QemuOpts *options,
691                                    Error **errp)
692 {
693     int ret = 0;
694     uint64_t total_size = 0;
695     char *volname = NULL, *segment_name = NULL;
696     const char *start;
697     xport mport = NoPort, vport = NoPort;
698
699     if (!strstart(filename, "archipelago:", &start)) {
700         error_setg(errp, "File name must start with 'archipelago:'");
701         return -1;
702     }
703
704     if (!strlen(start) || strstart(start, "/", NULL)) {
705         error_setg(errp, "volume name must be specified");
706         return -1;
707     }
708
709     parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
710                         &vport);
711     total_size = qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0);
712
713     if (segment_name == NULL) {
714         segment_name = g_strdup("archipelago");
715     }
716
717     /* Create an Archipelago volume */
718     ret = qemu_archipelago_create_volume(errp, volname, segment_name,
719                                          total_size, mport,
720                                          vport);
721
722     g_free(volname);
723     g_free(segment_name);
724     return ret;
725 }
726
727 static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
728 {
729     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
730     aio_cb->cancelled = true;
731     while (aio_cb->status == -EINPROGRESS) {
732         aio_poll(bdrv_get_aio_context(aio_cb->common.bs), true);
733     }
734     qemu_aio_release(aio_cb);
735 }
736
737 static const AIOCBInfo archipelago_aiocb_info = {
738     .aiocb_size = sizeof(ArchipelagoAIOCB),
739     .cancel = qemu_archipelago_aio_cancel,
740 };
741
742 static int archipelago_submit_request(BDRVArchipelagoState *s,
743                                         uint64_t bufidx,
744                                         size_t count,
745                                         off_t offset,
746                                         ArchipelagoAIOCB *aio_cb,
747                                         ArchipelagoSegmentedRequest *segreq,
748                                         int op)
749 {
750     int ret, targetlen;
751     char *target;
752     void *data = NULL;
753     struct xseg_request *req;
754     AIORequestData *reqdata = g_new(AIORequestData, 1);
755
756     targetlen = strlen(s->volname);
757     req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
758     if (!req) {
759         archipelagolog("Cannot get XSEG request\n");
760         goto err_exit2;
761     }
762     ret = xseg_prep_request(s->xseg, req, targetlen, count);
763     if (ret < 0) {
764         archipelagolog("Cannot prepare XSEG request\n");
765         goto err_exit;
766     }
767     target = xseg_get_target(s->xseg, req);
768     if (!target) {
769         archipelagolog("Cannot get XSEG target\n");
770         goto err_exit;
771     }
772     memcpy(target, s->volname, targetlen);
773     req->size = count;
774     req->offset = offset;
775
776     switch (op) {
777     case ARCHIP_OP_READ:
778         req->op = X_READ;
779         break;
780     case ARCHIP_OP_WRITE:
781         req->op = X_WRITE;
782         break;
783     case ARCHIP_OP_FLUSH:
784         req->op = X_FLUSH;
785         break;
786     }
787     reqdata->volname = s->volname;
788     reqdata->offset = offset;
789     reqdata->size = count;
790     reqdata->bufidx = bufidx;
791     reqdata->aio_cb = aio_cb;
792     reqdata->segreq = segreq;
793     reqdata->op = op;
794
795     xseg_set_req_data(s->xseg, req, reqdata);
796     if (op == ARCHIP_OP_WRITE) {
797         data = xseg_get_data(s->xseg, req);
798         if (!data) {
799             archipelagolog("Cannot get XSEG data\n");
800             goto err_exit;
801         }
802         qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
803     }
804
805     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
806     if (p == NoPort) {
807         archipelagolog("Could not submit XSEG request\n");
808         goto err_exit;
809     }
810     xseg_signal(s->xseg, p);
811     return 0;
812
813 err_exit:
814     g_free(reqdata);
815     xseg_put_request(s->xseg, req, s->srcport);
816     return -EIO;
817 err_exit2:
818     g_free(reqdata);
819     return -EIO;
820 }
821
822 static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
823                                         size_t count,
824                                         off_t offset,
825                                         ArchipelagoAIOCB *aio_cb,
826                                         int op)
827 {
828     int ret, segments_nr;
829     size_t pos = 0;
830     ArchipelagoSegmentedRequest *segreq;
831
832     segreq = g_new0(ArchipelagoSegmentedRequest, 1);
833
834     if (op == ARCHIP_OP_FLUSH) {
835         segments_nr = 1;
836     } else {
837         segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
838                       ((count % MAX_REQUEST_SIZE) ? 1 : 0);
839     }
840     segreq->total = count;
841     atomic_mb_set(&segreq->ref, segments_nr);
842
843     while (segments_nr > 1) {
844         ret = archipelago_submit_request(s, pos,
845                                             MAX_REQUEST_SIZE,
846                                             offset + pos,
847                                             aio_cb, segreq, op);
848
849         if (ret < 0) {
850             goto err_exit;
851         }
852         count -= MAX_REQUEST_SIZE;
853         pos += MAX_REQUEST_SIZE;
854         segments_nr--;
855     }
856     ret = archipelago_submit_request(s, pos, count, offset + pos,
857                                      aio_cb, segreq, op);
858
859     if (ret < 0) {
860         goto err_exit;
861     }
862     return 0;
863
864 err_exit:
865     segreq->failed = 1;
866     if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
867         g_free(segreq);
868     }
869     return ret;
870 }
871
872 static BlockDriverAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
873                                                  int64_t sector_num,
874                                                  QEMUIOVector *qiov,
875                                                  int nb_sectors,
876                                                  BlockDriverCompletionFunc *cb,
877                                                  void *opaque,
878                                                  int op)
879 {
880     ArchipelagoAIOCB *aio_cb;
881     BDRVArchipelagoState *s = bs->opaque;
882     int64_t size, off;
883     int ret;
884
885     aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
886     aio_cb->cmd = op;
887     aio_cb->qiov = qiov;
888
889     aio_cb->ret = 0;
890     aio_cb->s = s;
891     aio_cb->cancelled = false;
892     aio_cb->status = -EINPROGRESS;
893
894     off = sector_num * BDRV_SECTOR_SIZE;
895     size = nb_sectors * BDRV_SECTOR_SIZE;
896     aio_cb->size = size;
897
898     ret = archipelago_aio_segmented_rw(s, size, off,
899                                        aio_cb, op);
900     if (ret < 0) {
901         goto err_exit;
902     }
903     return &aio_cb->common;
904
905 err_exit:
906     error_report("qemu_archipelago_aio_rw(): I/O Error\n");
907     qemu_aio_release(aio_cb);
908     return NULL;
909 }
910
911 static BlockDriverAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
912         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
913         BlockDriverCompletionFunc *cb, void *opaque)
914 {
915     return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
916                                    opaque, ARCHIP_OP_READ);
917 }
918
919 static BlockDriverAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
920         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
921         BlockDriverCompletionFunc *cb, void *opaque)
922 {
923     return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
924                                    opaque, ARCHIP_OP_WRITE);
925 }
926
927 static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
928 {
929     uint64_t size;
930     int ret, targetlen;
931     struct xseg_request *req;
932     struct xseg_reply_info *xinfo;
933     AIORequestData *reqdata = g_new(AIORequestData, 1);
934
935     const char *volname = s->volname;
936     targetlen = strlen(volname);
937     req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
938     if (!req) {
939         archipelagolog("Cannot get XSEG request\n");
940         goto err_exit2;
941     }
942     ret = xseg_prep_request(s->xseg, req, targetlen,
943                             sizeof(struct xseg_reply_info));
944     if (ret < 0) {
945         archipelagolog("Cannot prepare XSEG request\n");
946         goto err_exit;
947     }
948     char *target = xseg_get_target(s->xseg, req);
949     if (!target) {
950         archipelagolog("Cannot get XSEG target\n");
951         goto err_exit;
952     }
953     memcpy(target, volname, targetlen);
954     req->size = req->datalen;
955     req->offset = 0;
956     req->op = X_INFO;
957
958     reqdata->op = ARCHIP_OP_VOLINFO;
959     reqdata->volname = volname;
960     xseg_set_req_data(s->xseg, req, reqdata);
961
962     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
963     if (p == NoPort) {
964         archipelagolog("Cannot submit XSEG request\n");
965         goto err_exit;
966     }
967     xseg_signal(s->xseg, p);
968     qemu_mutex_lock(&s->archip_mutex);
969     while (!s->is_signaled) {
970         qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
971     }
972     s->is_signaled = false;
973     qemu_mutex_unlock(&s->archip_mutex);
974
975     xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
976     size = xinfo->size;
977     xseg_put_request(s->xseg, req, s->srcport);
978     g_free(reqdata);
979     s->size = size;
980     return size;
981
982 err_exit:
983     xseg_put_request(s->xseg, req, s->srcport);
984 err_exit2:
985     g_free(reqdata);
986     return -EIO;
987 }
988
989 static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
990 {
991     int64_t ret;
992     BDRVArchipelagoState *s = bs->opaque;
993
994     ret = archipelago_volume_info(s);
995     return ret;
996 }
997
998 static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
999 {
1000     int ret, targetlen;
1001     struct xseg_request *req;
1002     BDRVArchipelagoState *s = bs->opaque;
1003     AIORequestData *reqdata = g_new(AIORequestData, 1);
1004
1005     const char *volname = s->volname;
1006     targetlen = strlen(volname);
1007     req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
1008     if (!req) {
1009         archipelagolog("Cannot get XSEG request\n");
1010         return err_exit2;
1011     }
1012
1013     ret = xseg_prep_request(s->xseg, req, targetlen, 0);
1014     if (ret < 0) {
1015         archipelagolog("Cannot prepare XSEG request\n");
1016         goto err_exit;
1017     }
1018     char *target = xseg_get_target(s->xseg, req);
1019     if (!target) {
1020         archipelagolog("Cannot get XSEG target\n");
1021         goto err_exit;
1022     }
1023     memcpy(target, volname, targetlen);
1024     req->offset = offset;
1025     req->op = X_TRUNCATE;
1026
1027     reqdata->op = ARCHIP_OP_TRUNCATE;
1028     reqdata->volname = volname;
1029
1030     xseg_set_req_data(s->xseg, req, reqdata);
1031
1032     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1033     if (p == NoPort) {
1034         archipelagolog("Cannot submit XSEG request\n");
1035         goto err_exit;
1036     }
1037
1038     xseg_signal(s->xseg, p);
1039     qemu_mutex_lock(&s->archip_mutex);
1040     while (!s->is_signaled) {
1041         qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1042     }
1043     s->is_signaled = false;
1044     qemu_mutex_unlock(&s->archip_mutex);
1045     xseg_put_request(s->xseg, req, s->srcport);
1046     g_free(reqdata);
1047     return 0;
1048
1049 err_exit:
1050     xseg_put_request(s->xseg, req, s->srcport);
1051 err_exit2:
1052     g_free(reqdata);
1053     return -EIO;
1054 }
1055
1056 static QemuOptsList qemu_archipelago_create_opts = {
1057     .name = "archipelago-create-opts",
1058     .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1059     .desc = {
1060         {
1061             .name = BLOCK_OPT_SIZE,
1062             .type = QEMU_OPT_SIZE,
1063             .help = "Virtual disk size"
1064         },
1065         { /* end of list */ }
1066     }
1067 };
1068
1069 static BlockDriverAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1070         BlockDriverCompletionFunc *cb, void *opaque)
1071 {
1072     return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1073                                    ARCHIP_OP_FLUSH);
1074 }
1075
1076 static BlockDriver bdrv_archipelago = {
1077     .format_name         = "archipelago",
1078     .protocol_name       = "archipelago",
1079     .instance_size       = sizeof(BDRVArchipelagoState),
1080     .bdrv_parse_filename = archipelago_parse_filename,
1081     .bdrv_file_open      = qemu_archipelago_open,
1082     .bdrv_close          = qemu_archipelago_close,
1083     .bdrv_create         = qemu_archipelago_create,
1084     .bdrv_getlength      = qemu_archipelago_getlength,
1085     .bdrv_truncate       = qemu_archipelago_truncate,
1086     .bdrv_aio_readv      = qemu_archipelago_aio_readv,
1087     .bdrv_aio_writev     = qemu_archipelago_aio_writev,
1088     .bdrv_aio_flush      = qemu_archipelago_aio_flush,
1089     .bdrv_has_zero_init  = bdrv_has_zero_init_1,
1090     .create_opts         = &qemu_archipelago_create_opts,
1091 };
1092
1093 static void bdrv_archipelago_init(void)
1094 {
1095     bdrv_register(&bdrv_archipelago);
1096 }
1097
1098 block_init(bdrv_archipelago_init);
This page took 0.083207 seconds and 4 git commands to generate.