]> Git Repo - qemu.git/blob - block/nbd-client.c
Merge remote-tracking branch 'remotes/cohuck/tags/s390x-20140520' into staging
[qemu.git] / block / nbd-client.c
1 /*
2  * QEMU Block driver for  NBD
3  *
4  * Copyright (C) 2008 Bull S.A.S.
5  *     Author: Laurent Vivier <[email protected]>
6  *
7  * Some parts:
8  *    Copyright (C) 2007 Anthony Liguori <[email protected]>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28
29 #include "nbd-client.h"
30 #include "qemu/sockets.h"
31
32 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
33 #define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
34
35 static void nbd_recv_coroutines_enter_all(NbdClientSession *s)
36 {
37     int i;
38
39     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
40         if (s->recv_coroutine[i]) {
41             qemu_coroutine_enter(s->recv_coroutine[i], NULL);
42         }
43     }
44 }
45
46 static void nbd_teardown_connection(NbdClientSession *client)
47 {
48     /* finish any pending coroutines */
49     shutdown(client->sock, 2);
50     nbd_recv_coroutines_enter_all(client);
51
52     qemu_aio_set_fd_handler(client->sock, NULL, NULL, NULL);
53     closesocket(client->sock);
54     client->sock = -1;
55 }
56
57 static void nbd_reply_ready(void *opaque)
58 {
59     NbdClientSession *s = opaque;
60     uint64_t i;
61     int ret;
62
63     if (s->reply.handle == 0) {
64         /* No reply already in flight.  Fetch a header.  It is possible
65          * that another thread has done the same thing in parallel, so
66          * the socket is not readable anymore.
67          */
68         ret = nbd_receive_reply(s->sock, &s->reply);
69         if (ret == -EAGAIN) {
70             return;
71         }
72         if (ret < 0) {
73             s->reply.handle = 0;
74             goto fail;
75         }
76     }
77
78     /* There's no need for a mutex on the receive side, because the
79      * handler acts as a synchronization point and ensures that only
80      * one coroutine is called until the reply finishes.  */
81     i = HANDLE_TO_INDEX(s, s->reply.handle);
82     if (i >= MAX_NBD_REQUESTS) {
83         goto fail;
84     }
85
86     if (s->recv_coroutine[i]) {
87         qemu_coroutine_enter(s->recv_coroutine[i], NULL);
88         return;
89     }
90
91 fail:
92     nbd_teardown_connection(s);
93 }
94
95 static void nbd_restart_write(void *opaque)
96 {
97     NbdClientSession *s = opaque;
98
99     qemu_coroutine_enter(s->send_coroutine, NULL);
100 }
101
102 static int nbd_co_send_request(NbdClientSession *s,
103     struct nbd_request *request,
104     QEMUIOVector *qiov, int offset)
105 {
106     int rc, ret;
107
108     qemu_co_mutex_lock(&s->send_mutex);
109     s->send_coroutine = qemu_coroutine_self();
110     qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, s);
111     if (qiov) {
112         if (!s->is_unix) {
113             socket_set_cork(s->sock, 1);
114         }
115         rc = nbd_send_request(s->sock, request);
116         if (rc >= 0) {
117             ret = qemu_co_sendv(s->sock, qiov->iov, qiov->niov,
118                                 offset, request->len);
119             if (ret != request->len) {
120                 rc = -EIO;
121             }
122         }
123         if (!s->is_unix) {
124             socket_set_cork(s->sock, 0);
125         }
126     } else {
127         rc = nbd_send_request(s->sock, request);
128     }
129     qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, s);
130     s->send_coroutine = NULL;
131     qemu_co_mutex_unlock(&s->send_mutex);
132     return rc;
133 }
134
135 static void nbd_co_receive_reply(NbdClientSession *s,
136     struct nbd_request *request, struct nbd_reply *reply,
137     QEMUIOVector *qiov, int offset)
138 {
139     int ret;
140
141     /* Wait until we're woken up by the read handler.  TODO: perhaps
142      * peek at the next reply and avoid yielding if it's ours?  */
143     qemu_coroutine_yield();
144     *reply = s->reply;
145     if (reply->handle != request->handle) {
146         reply->error = EIO;
147     } else {
148         if (qiov && reply->error == 0) {
149             ret = qemu_co_recvv(s->sock, qiov->iov, qiov->niov,
150                                 offset, request->len);
151             if (ret != request->len) {
152                 reply->error = EIO;
153             }
154         }
155
156         /* Tell the read handler to read another header.  */
157         s->reply.handle = 0;
158     }
159 }
160
161 static void nbd_coroutine_start(NbdClientSession *s,
162    struct nbd_request *request)
163 {
164     int i;
165
166     /* Poor man semaphore.  The free_sema is locked when no other request
167      * can be accepted, and unlocked after receiving one reply.  */
168     if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
169         qemu_co_mutex_lock(&s->free_sema);
170         assert(s->in_flight < MAX_NBD_REQUESTS);
171     }
172     s->in_flight++;
173
174     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
175         if (s->recv_coroutine[i] == NULL) {
176             s->recv_coroutine[i] = qemu_coroutine_self();
177             break;
178         }
179     }
180
181     assert(i < MAX_NBD_REQUESTS);
182     request->handle = INDEX_TO_HANDLE(s, i);
183 }
184
185 static void nbd_coroutine_end(NbdClientSession *s,
186     struct nbd_request *request)
187 {
188     int i = HANDLE_TO_INDEX(s, request->handle);
189     s->recv_coroutine[i] = NULL;
190     if (s->in_flight-- == MAX_NBD_REQUESTS) {
191         qemu_co_mutex_unlock(&s->free_sema);
192     }
193 }
194
195 static int nbd_co_readv_1(NbdClientSession *client, int64_t sector_num,
196                           int nb_sectors, QEMUIOVector *qiov,
197                           int offset)
198 {
199     struct nbd_request request = { .type = NBD_CMD_READ };
200     struct nbd_reply reply;
201     ssize_t ret;
202
203     request.from = sector_num * 512;
204     request.len = nb_sectors * 512;
205
206     nbd_coroutine_start(client, &request);
207     ret = nbd_co_send_request(client, &request, NULL, 0);
208     if (ret < 0) {
209         reply.error = -ret;
210     } else {
211         nbd_co_receive_reply(client, &request, &reply, qiov, offset);
212     }
213     nbd_coroutine_end(client, &request);
214     return -reply.error;
215
216 }
217
218 static int nbd_co_writev_1(NbdClientSession *client, int64_t sector_num,
219                            int nb_sectors, QEMUIOVector *qiov,
220                            int offset)
221 {
222     struct nbd_request request = { .type = NBD_CMD_WRITE };
223     struct nbd_reply reply;
224     ssize_t ret;
225
226     if (!bdrv_enable_write_cache(client->bs) &&
227         (client->nbdflags & NBD_FLAG_SEND_FUA)) {
228         request.type |= NBD_CMD_FLAG_FUA;
229     }
230
231     request.from = sector_num * 512;
232     request.len = nb_sectors * 512;
233
234     nbd_coroutine_start(client, &request);
235     ret = nbd_co_send_request(client, &request, qiov, offset);
236     if (ret < 0) {
237         reply.error = -ret;
238     } else {
239         nbd_co_receive_reply(client, &request, &reply, NULL, 0);
240     }
241     nbd_coroutine_end(client, &request);
242     return -reply.error;
243 }
244
245 /* qemu-nbd has a limit of slightly less than 1M per request.  Try to
246  * remain aligned to 4K. */
247 #define NBD_MAX_SECTORS 2040
248
249 int nbd_client_session_co_readv(NbdClientSession *client, int64_t sector_num,
250     int nb_sectors, QEMUIOVector *qiov)
251 {
252     int offset = 0;
253     int ret;
254     while (nb_sectors > NBD_MAX_SECTORS) {
255         ret = nbd_co_readv_1(client, sector_num,
256                              NBD_MAX_SECTORS, qiov, offset);
257         if (ret < 0) {
258             return ret;
259         }
260         offset += NBD_MAX_SECTORS * 512;
261         sector_num += NBD_MAX_SECTORS;
262         nb_sectors -= NBD_MAX_SECTORS;
263     }
264     return nbd_co_readv_1(client, sector_num, nb_sectors, qiov, offset);
265 }
266
267 int nbd_client_session_co_writev(NbdClientSession *client, int64_t sector_num,
268                                  int nb_sectors, QEMUIOVector *qiov)
269 {
270     int offset = 0;
271     int ret;
272     while (nb_sectors > NBD_MAX_SECTORS) {
273         ret = nbd_co_writev_1(client, sector_num,
274                               NBD_MAX_SECTORS, qiov, offset);
275         if (ret < 0) {
276             return ret;
277         }
278         offset += NBD_MAX_SECTORS * 512;
279         sector_num += NBD_MAX_SECTORS;
280         nb_sectors -= NBD_MAX_SECTORS;
281     }
282     return nbd_co_writev_1(client, sector_num, nb_sectors, qiov, offset);
283 }
284
285 int nbd_client_session_co_flush(NbdClientSession *client)
286 {
287     struct nbd_request request = { .type = NBD_CMD_FLUSH };
288     struct nbd_reply reply;
289     ssize_t ret;
290
291     if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
292         return 0;
293     }
294
295     if (client->nbdflags & NBD_FLAG_SEND_FUA) {
296         request.type |= NBD_CMD_FLAG_FUA;
297     }
298
299     request.from = 0;
300     request.len = 0;
301
302     nbd_coroutine_start(client, &request);
303     ret = nbd_co_send_request(client, &request, NULL, 0);
304     if (ret < 0) {
305         reply.error = -ret;
306     } else {
307         nbd_co_receive_reply(client, &request, &reply, NULL, 0);
308     }
309     nbd_coroutine_end(client, &request);
310     return -reply.error;
311 }
312
313 int nbd_client_session_co_discard(NbdClientSession *client, int64_t sector_num,
314     int nb_sectors)
315 {
316     struct nbd_request request = { .type = NBD_CMD_TRIM };
317     struct nbd_reply reply;
318     ssize_t ret;
319
320     if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
321         return 0;
322     }
323     request.from = sector_num * 512;
324     request.len = nb_sectors * 512;
325
326     nbd_coroutine_start(client, &request);
327     ret = nbd_co_send_request(client, &request, NULL, 0);
328     if (ret < 0) {
329         reply.error = -ret;
330     } else {
331         nbd_co_receive_reply(client, &request, &reply, NULL, 0);
332     }
333     nbd_coroutine_end(client, &request);
334     return -reply.error;
335
336 }
337
338 void nbd_client_session_close(NbdClientSession *client)
339 {
340     struct nbd_request request = {
341         .type = NBD_CMD_DISC,
342         .from = 0,
343         .len = 0
344     };
345
346     if (!client->bs) {
347         return;
348     }
349     if (client->sock == -1) {
350         return;
351     }
352
353     nbd_send_request(client->sock, &request);
354
355     nbd_teardown_connection(client);
356     client->bs = NULL;
357 }
358
359 int nbd_client_session_init(NbdClientSession *client, BlockDriverState *bs,
360     int sock, const char *export)
361 {
362     int ret;
363
364     /* NBD handshake */
365     logout("session init %s\n", export);
366     qemu_set_block(sock);
367     ret = nbd_receive_negotiate(sock, export,
368                                 &client->nbdflags, &client->size,
369                                 &client->blocksize);
370     if (ret < 0) {
371         logout("Failed to negotiate with the NBD server\n");
372         closesocket(sock);
373         return ret;
374     }
375
376     qemu_co_mutex_init(&client->send_mutex);
377     qemu_co_mutex_init(&client->free_sema);
378     client->bs = bs;
379     client->sock = sock;
380
381     /* Now that we're connected, set the socket to be non-blocking and
382      * kick the reply mechanism.  */
383     qemu_set_nonblock(sock);
384     qemu_aio_set_fd_handler(sock, nbd_reply_ready, NULL, client);
385
386     logout("Established connection with NBD server\n");
387     return 0;
388 }
This page took 0.046657 seconds and 4 git commands to generate.