/*
 * QEMU Block driver for NBD
 *
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <[email protected]>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <[email protected]>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "nbd-client.h"

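/* Request handles encode the index of the slot used in s->recv_coroutine[]:
 * the index is XORed with the BlockDriverState pointer, and since XOR is its
 * own inverse, HANDLE_TO_INDEX(bs, INDEX_TO_HANDLE(bs, i)) == i, so a reply's
 * handle maps straight back to the slot that issued the request.  */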
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))

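/* Re-enter every coroutine that is waiting for a reply; used during
 * connection teardown so that pending requests can notice the error
 * and fail.  */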
static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i]) {
            qemu_coroutine_enter(s->recv_coroutine[i]);
        }
    }
}

static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    if (!client->ioc) { /* Already closed */
        return;
    }

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    nbd_recv_coroutines_enter_all(client);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}

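/* Read handler for the NBD socket: fetch a reply header if none is pending,
 * then enter the coroutine that owns the reply's handle so it can pick up
 * its data.  A protocol error tears the connection down.  */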
static void nbd_reply_ready(void *opaque)
{
    BlockDriverState *bs = opaque;
    NBDClientSession *s = nbd_get_client_session(bs);
    uint64_t i;
    int ret;

    if (!s->ioc) { /* Already closed */
        return;
    }

    if (s->reply.handle == 0) {
        /* No reply already in flight.  Fetch a header.  It is possible
         * that another thread has done the same thing in parallel, so
         * the socket is not readable anymore.
         */
        ret = nbd_receive_reply(s->ioc, &s->reply);
        if (ret == -EAGAIN) {
            return;
        }
        if (ret < 0) {
            s->reply.handle = 0;
            goto fail;
        }
    }

    /* There's no need for a mutex on the receive side, because the
     * handler acts as a synchronization point and ensures that only
     * one coroutine is called until the reply finishes.  */
    i = HANDLE_TO_INDEX(s, s->reply.handle);
    if (i >= MAX_NBD_REQUESTS) {
        goto fail;
    }

    if (s->recv_coroutine[i]) {
        qemu_coroutine_enter(s->recv_coroutine[i]);
        return;
    }

fail:
    nbd_teardown_connection(bs);
}

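/* Write handler: re-enter the coroutine that is blocked in
 * nbd_co_send_request() once the socket becomes writable again.  */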
static void nbd_restart_write(void *opaque)
{
    BlockDriverState *bs = opaque;

    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
}

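/* Claim a free request slot under send_mutex, assign the request a handle,
 * and send the request header (plus the payload for writes, with the socket
 * corked so that header and data go out together).  Returns a negative errno
 * value on failure, e.g. -EPIPE if the connection is already gone.  */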
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    AioContext *aio_context;
    int rc, ret, i;

    qemu_co_mutex_lock(&s->send_mutex);

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i] == NULL) {
            s->recv_coroutine[i] = qemu_coroutine_self();
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);
    request->handle = INDEX_TO_HANDLE(s, i);

    if (!s->ioc) {
        qemu_co_mutex_unlock(&s->send_mutex);
        return -EPIPE;
    }

    s->send_coroutine = qemu_coroutine_self();
    aio_context = bdrv_get_aio_context(bs);

    aio_set_fd_handler(aio_context, s->sioc->fd, false,
                       nbd_reply_ready, nbd_restart_write, bs);
    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0) {
            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
                               false);
            if (ret != request->len) {
                rc = -EIO;
            }
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }
    aio_set_fd_handler(aio_context, s->sioc->fd, false,
                       nbd_reply_ready, NULL, bs);
    s->send_coroutine = NULL;
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

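/* Wait for the reply that matches @request.  On success the payload of a
 * read is copied into @qiov; on a handle mismatch, a closed connection, or
 * a short payload read, the reply's error field is set to EIO.  */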
static void nbd_co_receive_reply(NBDClientSession *s,
                                 NBDRequest *request,
                                 NBDReply *reply,
                                 QEMUIOVector *qiov)
{
    int ret;

    /* Wait until we're woken up by the read handler.  TODO: perhaps
     * peek at the next reply and avoid yielding if it's ours?  */
    qemu_coroutine_yield();
    *reply = s->reply;
    if (reply->handle != request->handle ||
        !s->ioc) {
        reply->error = EIO;
    } else {
        if (qiov && reply->error == 0) {
            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
                               true);
            if (ret != request->len) {
                reply->error = EIO;
            }
        }

        /* Tell the read handler to read another header.  */
        s->reply.handle = 0;
    }
}

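/* Throttle the number of requests in flight to MAX_NBD_REQUESTS: a coroutine
 * that cannot get a slot waits on free_sema until another request completes
 * in nbd_coroutine_end().  */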
static void nbd_coroutine_start(NBDClientSession *s,
                                NBDRequest *request)
{
    /* Poor man's semaphore.  The free_sema is locked when no other request
     * can be accepted, and unlocked after receiving one reply.  */
    if (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema);
        assert(s->in_flight < MAX_NBD_REQUESTS);
    }
    s->in_flight++;

    /* s->recv_coroutine[i] is set as soon as we get the send_mutex.  */
}

static void nbd_coroutine_end(NBDClientSession *s,
                              NBDRequest *request)
{
    int i = HANDLE_TO_INDEX(s, request->handle);
    s->recv_coroutine[i] = NULL;
    if (s->in_flight-- == MAX_NBD_REQUESTS) {
        qemu_co_queue_next(&s->free_sema);
    }
}

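/* The public I/O entry points below all follow the same pattern: reserve a
 * slot with nbd_coroutine_start(), send the request, wait for the matching
 * reply, release the slot, and return the negated NBD error code.  */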
int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };
    NBDReply reply;
    ssize_t ret;

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
    } else {
        nbd_co_receive_reply(client, &request, &reply, qiov);
    }
    nbd_coroutine_end(client, &request);
    return -reply.error;
}

int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };
    NBDReply reply;
    ssize_t ret;

    if (flags & BDRV_REQ_FUA) {
        assert(client->nbdflags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, qiov);
    if (ret < 0) {
        reply.error = -ret;
    } else {
        nbd_co_receive_reply(client, &request, &reply, NULL);
    }
    nbd_coroutine_end(client, &request);
    return -reply.error;
}

int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int count, BdrvRequestFlags flags)
{
    ssize_t ret;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = count,
    };
    NBDReply reply;

    if (!(client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->nbdflags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
    } else {
        nbd_co_receive_reply(client, &request, &reply, NULL);
    }
    nbd_coroutine_end(client, &request);
    return -reply.error;
}

int nbd_client_co_flush(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_FLUSH };
    NBDReply reply;
    ssize_t ret;

    if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
    } else {
        nbd_co_receive_reply(client, &request, &reply, NULL);
    }
    nbd_coroutine_end(client, &request);
    return -reply.error;
}

int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = count,
    };
    NBDReply reply;
    ssize_t ret;

    if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
        return 0;
    }

    nbd_coroutine_start(client, &request);
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        reply.error = -ret;
    } else {
        nbd_co_receive_reply(client, &request, &reply, NULL);
    }
    nbd_coroutine_end(client, &request);
    return -reply.error;
}

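/* The read handler must follow the BlockDriverState when it moves to a
 * different AioContext: it is unregistered here and re-registered in
 * nbd_client_attach_aio_context().  */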
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    aio_set_fd_handler(bdrv_get_aio_context(bs),
                       nbd_get_client_session(bs)->sioc->fd,
                       false, NULL, NULL, NULL);
}

void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
                       false, nbd_reply_ready, NULL, bs);
}

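/* Tell the server we are going away (NBD_CMD_DISC), then tear down the
 * connection.  */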
void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (client->ioc == NULL) {
        return;
    }

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}

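/* Perform the NBD handshake on @sioc (blocking), record the negotiated
 * flags and export size, then switch the socket to non-blocking mode and
 * install the reply handler in the current AioContext.  */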
int nbd_client_init(BlockDriverState *bs,
                    QIOChannelSocket *sioc,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /* NBD handshake */
    logout("session init %s\n", export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
                                &client->nbdflags,
                                tlscreds, hostname,
                                &client->ioc,
                                &client->size, errp);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        return ret;
    }
    if (client->nbdflags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
    }

    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);
    client->sioc = sioc;
    object_ref(OBJECT(client->sioc));

    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.  */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);

    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;
}