]> Git Repo - qemu.git/blob - posix-aio-compat.c
posix-aio-compat: Removed unused offset variable
[qemu.git] / posix-aio-compat.c
1 /*
2  * QEMU posix-aio emulation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <[email protected]>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  */
13
14 #include <sys/ioctl.h>
15 #include <sys/types.h>
16 #include <pthread.h>
17 #include <unistd.h>
18 #include <errno.h>
19 #include <time.h>
20 #include <string.h>
21 #include <stdlib.h>
22 #include <stdio.h>
23
24 #include "qemu-queue.h"
25 #include "osdep.h"
26 #include "sysemu.h"
27 #include "qemu-common.h"
28 #include "trace.h"
29 #include "block_int.h"
30
31 #include "block/raw-posix-aio.h"
32
33 static void do_spawn_thread(void);
34
35 struct qemu_paiocb {
36     BlockDriverAIOCB common;
37     int aio_fildes;
38     union {
39         struct iovec *aio_iov;
40         void *aio_ioctl_buf;
41     };
42     int aio_niov;
43     size_t aio_nbytes;
44 #define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
45     int ev_signo;
46     off_t aio_offset;
47
48     QTAILQ_ENTRY(qemu_paiocb) node;
49     int aio_type;
50     ssize_t ret;
51     int active;
52     struct qemu_paiocb *next;
53 };
54
55 typedef struct PosixAioState {
56     int rfd, wfd;
57     struct qemu_paiocb *first_aio;
58 } PosixAioState;
59
60
61 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
63 static pthread_t thread_id;
64 static pthread_attr_t attr;
65 static int max_threads = 64;
66 static int cur_threads = 0;
67 static int idle_threads = 0;
68 static int new_threads = 0;     /* backlog of threads we need to create */
69 static int pending_threads = 0; /* threads created but not running yet */
70 static QEMUBH *new_thread_bh;
71 static QTAILQ_HEAD(, qemu_paiocb) request_list;
72
73 #ifdef CONFIG_PREADV
74 static int preadv_present = 1;
75 #else
76 static int preadv_present = 0;
77 #endif
78
79 static void die2(int err, const char *what)
80 {
81     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
82     abort();
83 }
84
85 static void die(const char *what)
86 {
87     die2(errno, what);
88 }
89
90 static void mutex_lock(pthread_mutex_t *mutex)
91 {
92     int ret = pthread_mutex_lock(mutex);
93     if (ret) die2(ret, "pthread_mutex_lock");
94 }
95
96 static void mutex_unlock(pthread_mutex_t *mutex)
97 {
98     int ret = pthread_mutex_unlock(mutex);
99     if (ret) die2(ret, "pthread_mutex_unlock");
100 }
101
102 static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
103                            struct timespec *ts)
104 {
105     int ret = pthread_cond_timedwait(cond, mutex, ts);
106     if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
107     return ret;
108 }
109
110 static void cond_signal(pthread_cond_t *cond)
111 {
112     int ret = pthread_cond_signal(cond);
113     if (ret) die2(ret, "pthread_cond_signal");
114 }
115
116 static void thread_create(pthread_t *thread, pthread_attr_t *attr,
117                           void *(*start_routine)(void*), void *arg)
118 {
119     int ret = pthread_create(thread, attr, start_routine, arg);
120     if (ret) die2(ret, "pthread_create");
121 }
122
123 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
124 {
125     int ret;
126
127     ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
128     if (ret == -1)
129         return -errno;
130
131     /*
132      * This looks weird, but the aio code only consideres a request
133      * successful if it has written the number full number of bytes.
134      *
135      * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
136      * so in fact we return the ioctl command here to make posix_aio_read()
137      * happy..
138      */
139     return aiocb->aio_nbytes;
140 }
141
142 static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
143 {
144     int ret;
145
146     ret = qemu_fdatasync(aiocb->aio_fildes);
147     if (ret == -1)
148         return -errno;
149     return 0;
150 }
151
152 #ifdef CONFIG_PREADV
153
154 static ssize_t
155 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
156 {
157     return preadv(fd, iov, nr_iov, offset);
158 }
159
160 static ssize_t
161 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
162 {
163     return pwritev(fd, iov, nr_iov, offset);
164 }
165
166 #else
167
168 static ssize_t
169 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
170 {
171     return -ENOSYS;
172 }
173
174 static ssize_t
175 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
176 {
177     return -ENOSYS;
178 }
179
180 #endif
181
182 static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
183 {
184     ssize_t len;
185
186     do {
187         if (aiocb->aio_type & QEMU_AIO_WRITE)
188             len = qemu_pwritev(aiocb->aio_fildes,
189                                aiocb->aio_iov,
190                                aiocb->aio_niov,
191                                aiocb->aio_offset);
192          else
193             len = qemu_preadv(aiocb->aio_fildes,
194                               aiocb->aio_iov,
195                               aiocb->aio_niov,
196                               aiocb->aio_offset);
197     } while (len == -1 && errno == EINTR);
198
199     if (len == -1)
200         return -errno;
201     return len;
202 }
203
204 /*
205  * Read/writes the data to/from a given linear buffer.
206  *
207  * Returns the number of bytes handles or -errno in case of an error. Short
208  * reads are only returned if the end of the file is reached.
209  */
210 static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
211 {
212     ssize_t offset = 0;
213     ssize_t len;
214
215     while (offset < aiocb->aio_nbytes) {
216          if (aiocb->aio_type & QEMU_AIO_WRITE)
217              len = pwrite(aiocb->aio_fildes,
218                           (const char *)buf + offset,
219                           aiocb->aio_nbytes - offset,
220                           aiocb->aio_offset + offset);
221          else
222              len = pread(aiocb->aio_fildes,
223                          buf + offset,
224                          aiocb->aio_nbytes - offset,
225                          aiocb->aio_offset + offset);
226
227          if (len == -1 && errno == EINTR)
228              continue;
229          else if (len == -1) {
230              offset = -errno;
231              break;
232          } else if (len == 0)
233              break;
234
235          offset += len;
236     }
237
238     return offset;
239 }
240
241 static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
242 {
243     ssize_t nbytes;
244     char *buf;
245
246     if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
247         /*
248          * If there is just a single buffer, and it is properly aligned
249          * we can just use plain pread/pwrite without any problems.
250          */
251         if (aiocb->aio_niov == 1)
252              return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
253
254         /*
255          * We have more than one iovec, and all are properly aligned.
256          *
257          * Try preadv/pwritev first and fall back to linearizing the
258          * buffer if it's not supported.
259          */
260         if (preadv_present) {
261             nbytes = handle_aiocb_rw_vector(aiocb);
262             if (nbytes == aiocb->aio_nbytes)
263                 return nbytes;
264             if (nbytes < 0 && nbytes != -ENOSYS)
265                 return nbytes;
266             preadv_present = 0;
267         }
268
269         /*
270          * XXX(hch): short read/write.  no easy way to handle the reminder
271          * using these interfaces.  For now retry using plain
272          * pread/pwrite?
273          */
274     }
275
276     /*
277      * Ok, we have to do it the hard way, copy all segments into
278      * a single aligned buffer.
279      */
280     buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
281     if (aiocb->aio_type & QEMU_AIO_WRITE) {
282         char *p = buf;
283         int i;
284
285         for (i = 0; i < aiocb->aio_niov; ++i) {
286             memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
287             p += aiocb->aio_iov[i].iov_len;
288         }
289     }
290
291     nbytes = handle_aiocb_rw_linear(aiocb, buf);
292     if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
293         char *p = buf;
294         size_t count = aiocb->aio_nbytes, copy;
295         int i;
296
297         for (i = 0; i < aiocb->aio_niov && count; ++i) {
298             copy = count;
299             if (copy > aiocb->aio_iov[i].iov_len)
300                 copy = aiocb->aio_iov[i].iov_len;
301             memcpy(aiocb->aio_iov[i].iov_base, p, copy);
302             p     += copy;
303             count -= copy;
304         }
305     }
306     qemu_vfree(buf);
307
308     return nbytes;
309 }
310
311 static void *aio_thread(void *unused)
312 {
313     pid_t pid;
314
315     pid = getpid();
316
317     mutex_lock(&lock);
318     pending_threads--;
319     mutex_unlock(&lock);
320     do_spawn_thread();
321
322     while (1) {
323         struct qemu_paiocb *aiocb;
324         ssize_t ret = 0;
325         qemu_timeval tv;
326         struct timespec ts;
327
328         qemu_gettimeofday(&tv);
329         ts.tv_sec = tv.tv_sec + 10;
330         ts.tv_nsec = 0;
331
332         mutex_lock(&lock);
333
334         while (QTAILQ_EMPTY(&request_list) &&
335                !(ret == ETIMEDOUT)) {
336             idle_threads++;
337             ret = cond_timedwait(&cond, &lock, &ts);
338             idle_threads--;
339         }
340
341         if (QTAILQ_EMPTY(&request_list))
342             break;
343
344         aiocb = QTAILQ_FIRST(&request_list);
345         QTAILQ_REMOVE(&request_list, aiocb, node);
346         aiocb->active = 1;
347         mutex_unlock(&lock);
348
349         switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
350         case QEMU_AIO_READ:
351             ret = handle_aiocb_rw(aiocb);
352             if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
353                 /* A short read means that we have reached EOF. Pad the buffer
354                  * with zeros for bytes after EOF. */
355                 QEMUIOVector qiov;
356
357                 qemu_iovec_init_external(&qiov, aiocb->aio_iov,
358                                          aiocb->aio_niov);
359                 qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
360
361                 ret = aiocb->aio_nbytes;
362             }
363             break;
364         case QEMU_AIO_WRITE:
365             ret = handle_aiocb_rw(aiocb);
366             break;
367         case QEMU_AIO_FLUSH:
368             ret = handle_aiocb_flush(aiocb);
369             break;
370         case QEMU_AIO_IOCTL:
371             ret = handle_aiocb_ioctl(aiocb);
372             break;
373         default:
374             fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
375             ret = -EINVAL;
376             break;
377         }
378
379         mutex_lock(&lock);
380         aiocb->ret = ret;
381         mutex_unlock(&lock);
382
383         if (kill(pid, aiocb->ev_signo)) die("kill failed");
384     }
385
386     cur_threads--;
387     mutex_unlock(&lock);
388
389     return NULL;
390 }
391
392 static void do_spawn_thread(void)
393 {
394     sigset_t set, oldset;
395
396     mutex_lock(&lock);
397     if (!new_threads) {
398         mutex_unlock(&lock);
399         return;
400     }
401
402     new_threads--;
403     pending_threads++;
404
405     mutex_unlock(&lock);
406
407     /* block all signals */
408     if (sigfillset(&set)) die("sigfillset");
409     if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
410
411     thread_create(&thread_id, &attr, aio_thread, NULL);
412
413     if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
414 }
415
416 static void spawn_thread_bh_fn(void *opaque)
417 {
418     do_spawn_thread();
419 }
420
421 static void spawn_thread(void)
422 {
423     cur_threads++;
424     new_threads++;
425     /* If there are threads being created, they will spawn new workers, so
426      * we don't spend time creating many threads in a loop holding a mutex or
427      * starving the current vcpu.
428      *
429      * If there are no idle threads, ask the main thread to create one, so we
430      * inherit the correct affinity instead of the vcpu affinity.
431      */
432     if (!pending_threads) {
433         qemu_bh_schedule(new_thread_bh);
434     }
435 }
436
437 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
438 {
439     aiocb->ret = -EINPROGRESS;
440     aiocb->active = 0;
441     mutex_lock(&lock);
442     if (idle_threads == 0 && cur_threads < max_threads)
443         spawn_thread();
444     QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
445     mutex_unlock(&lock);
446     cond_signal(&cond);
447 }
448
449 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
450 {
451     ssize_t ret;
452
453     mutex_lock(&lock);
454     ret = aiocb->ret;
455     mutex_unlock(&lock);
456
457     return ret;
458 }
459
460 static int qemu_paio_error(struct qemu_paiocb *aiocb)
461 {
462     ssize_t ret = qemu_paio_return(aiocb);
463
464     if (ret < 0)
465         ret = -ret;
466     else
467         ret = 0;
468
469     return ret;
470 }
471
472 static int posix_aio_process_queue(void *opaque)
473 {
474     PosixAioState *s = opaque;
475     struct qemu_paiocb *acb, **pacb;
476     int ret;
477     int result = 0;
478
479     for(;;) {
480         pacb = &s->first_aio;
481         for(;;) {
482             acb = *pacb;
483             if (!acb)
484                 return result;
485
486             ret = qemu_paio_error(acb);
487             if (ret == ECANCELED) {
488                 /* remove the request */
489                 *pacb = acb->next;
490                 qemu_aio_release(acb);
491                 result = 1;
492             } else if (ret != EINPROGRESS) {
493                 /* end of aio */
494                 if (ret == 0) {
495                     ret = qemu_paio_return(acb);
496                     if (ret == acb->aio_nbytes)
497                         ret = 0;
498                     else
499                         ret = -EINVAL;
500                 } else {
501                     ret = -ret;
502                 }
503
504                 trace_paio_complete(acb, acb->common.opaque, ret);
505
506                 /* remove the request */
507                 *pacb = acb->next;
508                 /* call the callback */
509                 acb->common.cb(acb->common.opaque, ret);
510                 qemu_aio_release(acb);
511                 result = 1;
512                 break;
513             } else {
514                 pacb = &acb->next;
515             }
516         }
517     }
518
519     return result;
520 }
521
522 static void posix_aio_read(void *opaque)
523 {
524     PosixAioState *s = opaque;
525     ssize_t len;
526
527     /* read all bytes from signal pipe */
528     for (;;) {
529         char bytes[16];
530
531         len = read(s->rfd, bytes, sizeof(bytes));
532         if (len == -1 && errno == EINTR)
533             continue; /* try again */
534         if (len == sizeof(bytes))
535             continue; /* more to read */
536         break;
537     }
538
539     posix_aio_process_queue(s);
540 }
541
542 static int posix_aio_flush(void *opaque)
543 {
544     PosixAioState *s = opaque;
545     return !!s->first_aio;
546 }
547
548 static PosixAioState *posix_aio_state;
549
550 static void aio_signal_handler(int signum)
551 {
552     if (posix_aio_state) {
553         char byte = 0;
554         ssize_t ret;
555
556         ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
557         if (ret < 0 && errno != EAGAIN)
558             die("write()");
559     }
560
561     qemu_service_io();
562 }
563
564 static void paio_remove(struct qemu_paiocb *acb)
565 {
566     struct qemu_paiocb **pacb;
567
568     /* remove the callback from the queue */
569     pacb = &posix_aio_state->first_aio;
570     for(;;) {
571         if (*pacb == NULL) {
572             fprintf(stderr, "paio_remove: aio request not found!\n");
573             break;
574         } else if (*pacb == acb) {
575             *pacb = acb->next;
576             qemu_aio_release(acb);
577             break;
578         }
579         pacb = &(*pacb)->next;
580     }
581 }
582
583 static void paio_cancel(BlockDriverAIOCB *blockacb)
584 {
585     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
586     int active = 0;
587
588     trace_paio_cancel(acb, acb->common.opaque);
589
590     mutex_lock(&lock);
591     if (!acb->active) {
592         QTAILQ_REMOVE(&request_list, acb, node);
593         acb->ret = -ECANCELED;
594     } else if (acb->ret == -EINPROGRESS) {
595         active = 1;
596     }
597     mutex_unlock(&lock);
598
599     if (active) {
600         /* fail safe: if the aio could not be canceled, we wait for
601            it */
602         while (qemu_paio_error(acb) == EINPROGRESS)
603             ;
604     }
605
606     paio_remove(acb);
607 }
608
609 static AIOPool raw_aio_pool = {
610     .aiocb_size         = sizeof(struct qemu_paiocb),
611     .cancel             = paio_cancel,
612 };
613
614 BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
615         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
616         BlockDriverCompletionFunc *cb, void *opaque, int type)
617 {
618     struct qemu_paiocb *acb;
619
620     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
621     if (!acb)
622         return NULL;
623     acb->aio_type = type;
624     acb->aio_fildes = fd;
625     acb->ev_signo = SIGUSR2;
626
627     if (qiov) {
628         acb->aio_iov = qiov->iov;
629         acb->aio_niov = qiov->niov;
630     }
631     acb->aio_nbytes = nb_sectors * 512;
632     acb->aio_offset = sector_num * 512;
633
634     acb->next = posix_aio_state->first_aio;
635     posix_aio_state->first_aio = acb;
636
637     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
638     qemu_paio_submit(acb);
639     return &acb->common;
640 }
641
642 BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
643         unsigned long int req, void *buf,
644         BlockDriverCompletionFunc *cb, void *opaque)
645 {
646     struct qemu_paiocb *acb;
647
648     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
649     if (!acb)
650         return NULL;
651     acb->aio_type = QEMU_AIO_IOCTL;
652     acb->aio_fildes = fd;
653     acb->ev_signo = SIGUSR2;
654     acb->aio_offset = 0;
655     acb->aio_ioctl_buf = buf;
656     acb->aio_ioctl_cmd = req;
657
658     acb->next = posix_aio_state->first_aio;
659     posix_aio_state->first_aio = acb;
660
661     qemu_paio_submit(acb);
662     return &acb->common;
663 }
664
665 int paio_init(void)
666 {
667     struct sigaction act;
668     PosixAioState *s;
669     int fds[2];
670     int ret;
671
672     if (posix_aio_state)
673         return 0;
674
675     s = g_malloc(sizeof(PosixAioState));
676
677     sigfillset(&act.sa_mask);
678     act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
679     act.sa_handler = aio_signal_handler;
680     sigaction(SIGUSR2, &act, NULL);
681
682     s->first_aio = NULL;
683     if (qemu_pipe(fds) == -1) {
684         fprintf(stderr, "failed to create pipe\n");
685         return -1;
686     }
687
688     s->rfd = fds[0];
689     s->wfd = fds[1];
690
691     fcntl(s->rfd, F_SETFL, O_NONBLOCK);
692     fcntl(s->wfd, F_SETFL, O_NONBLOCK);
693
694     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
695         posix_aio_process_queue, s);
696
697     ret = pthread_attr_init(&attr);
698     if (ret)
699         die2(ret, "pthread_attr_init");
700
701     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
702     if (ret)
703         die2(ret, "pthread_attr_setdetachstate");
704
705     QTAILQ_INIT(&request_list);
706     new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
707
708     posix_aio_state = s;
709     return 0;
710 }
This page took 0.063649 seconds and 4 git commands to generate.