]> Git Repo - qemu.git/blob - posix-aio-compat.c
audio/sdlaudio: remove unused variable
[qemu.git] / posix-aio-compat.c
1 /*
2  * QEMU posix-aio emulation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <[email protected]>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  */
13
14 #include <sys/ioctl.h>
15 #include <sys/types.h>
16 #include <pthread.h>
17 #include <unistd.h>
18 #include <errno.h>
19 #include <time.h>
20 #include <signal.h>
21 #include <string.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24
25 #include "qemu-queue.h"
26 #include "osdep.h"
27 #include "qemu-common.h"
28 #include "block_int.h"
29
30 #include "block/raw-posix-aio.h"
31
32
33 struct qemu_paiocb {
34     BlockDriverAIOCB common;
35     int aio_fildes;
36     union {
37         struct iovec *aio_iov;
38         void *aio_ioctl_buf;
39     };
40     int aio_niov;
41     size_t aio_nbytes;
42 #define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
43     int ev_signo;
44     off_t aio_offset;
45
46     QTAILQ_ENTRY(qemu_paiocb) node;
47     int aio_type;
48     ssize_t ret;
49     int active;
50     struct qemu_paiocb *next;
51
52     int async_context_id;
53 };
54
55 typedef struct PosixAioState {
56     int rfd, wfd;
57     struct qemu_paiocb *first_aio;
58 } PosixAioState;
59
60
61 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
62 static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
63 static pthread_t thread_id;
64 static pthread_attr_t attr;
65 static int max_threads = 64;
66 static int cur_threads = 0;
67 static int idle_threads = 0;
68 static QTAILQ_HEAD(, qemu_paiocb) request_list;
69
70 #ifdef CONFIG_PREADV
71 static int preadv_present = 1;
72 #else
73 static int preadv_present = 0;
74 #endif
75
76 static void die2(int err, const char *what)
77 {
78     fprintf(stderr, "%s failed: %s\n", what, strerror(err));
79     abort();
80 }
81
82 static void die(const char *what)
83 {
84     die2(errno, what);
85 }
86
87 static void mutex_lock(pthread_mutex_t *mutex)
88 {
89     int ret = pthread_mutex_lock(mutex);
90     if (ret) die2(ret, "pthread_mutex_lock");
91 }
92
93 static void mutex_unlock(pthread_mutex_t *mutex)
94 {
95     int ret = pthread_mutex_unlock(mutex);
96     if (ret) die2(ret, "pthread_mutex_unlock");
97 }
98
99 static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
100                            struct timespec *ts)
101 {
102     int ret = pthread_cond_timedwait(cond, mutex, ts);
103     if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
104     return ret;
105 }
106
107 static void cond_signal(pthread_cond_t *cond)
108 {
109     int ret = pthread_cond_signal(cond);
110     if (ret) die2(ret, "pthread_cond_signal");
111 }
112
113 static void thread_create(pthread_t *thread, pthread_attr_t *attr,
114                           void *(*start_routine)(void*), void *arg)
115 {
116     int ret = pthread_create(thread, attr, start_routine, arg);
117     if (ret) die2(ret, "pthread_create");
118 }
119
120 static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
121 {
122         int ret;
123
124         ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
125         if (ret == -1)
126                 return -errno;
127
128         /*
129          * This looks weird, but the aio code only consideres a request
130          * successfull if it has written the number full number of bytes.
131          *
132          * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
133          * so in fact we return the ioctl command here to make posix_aio_read()
134          * happy..
135          */
136         return aiocb->aio_nbytes;
137 }
138
139 static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
140 {
141     int ret;
142
143     ret = qemu_fdatasync(aiocb->aio_fildes);
144     if (ret == -1)
145         return -errno;
146     return 0;
147 }
148
149 #ifdef CONFIG_PREADV
150
151 static ssize_t
152 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
153 {
154     return preadv(fd, iov, nr_iov, offset);
155 }
156
157 static ssize_t
158 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
159 {
160     return pwritev(fd, iov, nr_iov, offset);
161 }
162
163 #else
164
165 static ssize_t
166 qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
167 {
168     return -ENOSYS;
169 }
170
171 static ssize_t
172 qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
173 {
174     return -ENOSYS;
175 }
176
177 #endif
178
179 static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
180 {
181     size_t offset = 0;
182     ssize_t len;
183
184     do {
185         if (aiocb->aio_type & QEMU_AIO_WRITE)
186             len = qemu_pwritev(aiocb->aio_fildes,
187                                aiocb->aio_iov,
188                                aiocb->aio_niov,
189                                aiocb->aio_offset + offset);
190          else
191             len = qemu_preadv(aiocb->aio_fildes,
192                               aiocb->aio_iov,
193                               aiocb->aio_niov,
194                               aiocb->aio_offset + offset);
195     } while (len == -1 && errno == EINTR);
196
197     if (len == -1)
198         return -errno;
199     return len;
200 }
201
202 static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
203 {
204     ssize_t offset = 0;
205     ssize_t len;
206
207     while (offset < aiocb->aio_nbytes) {
208          if (aiocb->aio_type & QEMU_AIO_WRITE)
209              len = pwrite(aiocb->aio_fildes,
210                           (const char *)buf + offset,
211                           aiocb->aio_nbytes - offset,
212                           aiocb->aio_offset + offset);
213          else
214              len = pread(aiocb->aio_fildes,
215                          buf + offset,
216                          aiocb->aio_nbytes - offset,
217                          aiocb->aio_offset + offset);
218
219          if (len == -1 && errno == EINTR)
220              continue;
221          else if (len == -1) {
222              offset = -errno;
223              break;
224          } else if (len == 0)
225              break;
226
227          offset += len;
228     }
229
230     return offset;
231 }
232
233 static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
234 {
235     ssize_t nbytes;
236     char *buf;
237
238     if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
239         /*
240          * If there is just a single buffer, and it is properly aligned
241          * we can just use plain pread/pwrite without any problems.
242          */
243         if (aiocb->aio_niov == 1)
244              return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
245
246         /*
247          * We have more than one iovec, and all are properly aligned.
248          *
249          * Try preadv/pwritev first and fall back to linearizing the
250          * buffer if it's not supported.
251          */
252         if (preadv_present) {
253             nbytes = handle_aiocb_rw_vector(aiocb);
254             if (nbytes == aiocb->aio_nbytes)
255                 return nbytes;
256             if (nbytes < 0 && nbytes != -ENOSYS)
257                 return nbytes;
258             preadv_present = 0;
259         }
260
261         /*
262          * XXX(hch): short read/write.  no easy way to handle the reminder
263          * using these interfaces.  For now retry using plain
264          * pread/pwrite?
265          */
266     }
267
268     /*
269      * Ok, we have to do it the hard way, copy all segments into
270      * a single aligned buffer.
271      */
272     buf = qemu_memalign(512, aiocb->aio_nbytes);
273     if (aiocb->aio_type & QEMU_AIO_WRITE) {
274         char *p = buf;
275         int i;
276
277         for (i = 0; i < aiocb->aio_niov; ++i) {
278             memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
279             p += aiocb->aio_iov[i].iov_len;
280         }
281     }
282
283     nbytes = handle_aiocb_rw_linear(aiocb, buf);
284     if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
285         char *p = buf;
286         size_t count = aiocb->aio_nbytes, copy;
287         int i;
288
289         for (i = 0; i < aiocb->aio_niov && count; ++i) {
290             copy = count;
291             if (copy > aiocb->aio_iov[i].iov_len)
292                 copy = aiocb->aio_iov[i].iov_len;
293             memcpy(aiocb->aio_iov[i].iov_base, p, copy);
294             p     += copy;
295             count -= copy;
296         }
297     }
298     qemu_vfree(buf);
299
300     return nbytes;
301 }
302
303 static void *aio_thread(void *unused)
304 {
305     pid_t pid;
306
307     pid = getpid();
308
309     while (1) {
310         struct qemu_paiocb *aiocb;
311         ssize_t ret = 0;
312         qemu_timeval tv;
313         struct timespec ts;
314
315         qemu_gettimeofday(&tv);
316         ts.tv_sec = tv.tv_sec + 10;
317         ts.tv_nsec = 0;
318
319         mutex_lock(&lock);
320
321         while (QTAILQ_EMPTY(&request_list) &&
322                !(ret == ETIMEDOUT)) {
323             ret = cond_timedwait(&cond, &lock, &ts);
324         }
325
326         if (QTAILQ_EMPTY(&request_list))
327             break;
328
329         aiocb = QTAILQ_FIRST(&request_list);
330         QTAILQ_REMOVE(&request_list, aiocb, node);
331         aiocb->active = 1;
332         idle_threads--;
333         mutex_unlock(&lock);
334
335         switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
336         case QEMU_AIO_READ:
337         case QEMU_AIO_WRITE:
338                 ret = handle_aiocb_rw(aiocb);
339                 break;
340         case QEMU_AIO_FLUSH:
341                 ret = handle_aiocb_flush(aiocb);
342                 break;
343         case QEMU_AIO_IOCTL:
344                 ret = handle_aiocb_ioctl(aiocb);
345                 break;
346         default:
347                 fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
348                 ret = -EINVAL;
349                 break;
350         }
351
352         mutex_lock(&lock);
353         aiocb->ret = ret;
354         idle_threads++;
355         mutex_unlock(&lock);
356
357         if (kill(pid, aiocb->ev_signo)) die("kill failed");
358     }
359
360     idle_threads--;
361     cur_threads--;
362     mutex_unlock(&lock);
363
364     return NULL;
365 }
366
367 static void spawn_thread(void)
368 {
369     sigset_t set, oldset;
370
371     cur_threads++;
372     idle_threads++;
373
374     /* block all signals */
375     if (sigfillset(&set)) die("sigfillset");
376     if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
377
378     thread_create(&thread_id, &attr, aio_thread, NULL);
379
380     if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
381 }
382
383 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
384 {
385     aiocb->ret = -EINPROGRESS;
386     aiocb->active = 0;
387     mutex_lock(&lock);
388     if (idle_threads == 0 && cur_threads < max_threads)
389         spawn_thread();
390     QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
391     mutex_unlock(&lock);
392     cond_signal(&cond);
393 }
394
395 static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
396 {
397     ssize_t ret;
398
399     mutex_lock(&lock);
400     ret = aiocb->ret;
401     mutex_unlock(&lock);
402
403     return ret;
404 }
405
406 static int qemu_paio_error(struct qemu_paiocb *aiocb)
407 {
408     ssize_t ret = qemu_paio_return(aiocb);
409
410     if (ret < 0)
411         ret = -ret;
412     else
413         ret = 0;
414
415     return ret;
416 }
417
418 static int posix_aio_process_queue(void *opaque)
419 {
420     PosixAioState *s = opaque;
421     struct qemu_paiocb *acb, **pacb;
422     int ret;
423     int result = 0;
424     int async_context_id = get_async_context_id();
425
426     for(;;) {
427         pacb = &s->first_aio;
428         for(;;) {
429             acb = *pacb;
430             if (!acb)
431                 return result;
432
433             /* we're only interested in requests in the right context */
434             if (acb->async_context_id != async_context_id) {
435                 pacb = &acb->next;
436                 continue;
437             }
438
439             ret = qemu_paio_error(acb);
440             if (ret == ECANCELED) {
441                 /* remove the request */
442                 *pacb = acb->next;
443                 qemu_aio_release(acb);
444                 result = 1;
445             } else if (ret != EINPROGRESS) {
446                 /* end of aio */
447                 if (ret == 0) {
448                     ret = qemu_paio_return(acb);
449                     if (ret == acb->aio_nbytes)
450                         ret = 0;
451                     else
452                         ret = -EINVAL;
453                 } else {
454                     ret = -ret;
455                 }
456                 /* remove the request */
457                 *pacb = acb->next;
458                 /* call the callback */
459                 acb->common.cb(acb->common.opaque, ret);
460                 qemu_aio_release(acb);
461                 result = 1;
462                 break;
463             } else {
464                 pacb = &acb->next;
465             }
466         }
467     }
468
469     return result;
470 }
471
472 static void posix_aio_read(void *opaque)
473 {
474     PosixAioState *s = opaque;
475     ssize_t len;
476
477     /* read all bytes from signal pipe */
478     for (;;) {
479         char bytes[16];
480
481         len = read(s->rfd, bytes, sizeof(bytes));
482         if (len == -1 && errno == EINTR)
483             continue; /* try again */
484         if (len == sizeof(bytes))
485             continue; /* more to read */
486         break;
487     }
488
489     posix_aio_process_queue(s);
490 }
491
492 static int posix_aio_flush(void *opaque)
493 {
494     PosixAioState *s = opaque;
495     return !!s->first_aio;
496 }
497
498 static PosixAioState *posix_aio_state;
499
500 static void aio_signal_handler(int signum)
501 {
502     if (posix_aio_state) {
503         char byte = 0;
504         ssize_t ret;
505
506         ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
507         if (ret < 0 && errno != EAGAIN)
508             die("write()");
509     }
510
511     qemu_service_io();
512 }
513
514 static void paio_remove(struct qemu_paiocb *acb)
515 {
516     struct qemu_paiocb **pacb;
517
518     /* remove the callback from the queue */
519     pacb = &posix_aio_state->first_aio;
520     for(;;) {
521         if (*pacb == NULL) {
522             fprintf(stderr, "paio_remove: aio request not found!\n");
523             break;
524         } else if (*pacb == acb) {
525             *pacb = acb->next;
526             qemu_aio_release(acb);
527             break;
528         }
529         pacb = &(*pacb)->next;
530     }
531 }
532
533 static void paio_cancel(BlockDriverAIOCB *blockacb)
534 {
535     struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
536     int active = 0;
537
538     mutex_lock(&lock);
539     if (!acb->active) {
540         QTAILQ_REMOVE(&request_list, acb, node);
541         acb->ret = -ECANCELED;
542     } else if (acb->ret == -EINPROGRESS) {
543         active = 1;
544     }
545     mutex_unlock(&lock);
546
547     if (active) {
548         /* fail safe: if the aio could not be canceled, we wait for
549            it */
550         while (qemu_paio_error(acb) == EINPROGRESS)
551             ;
552     }
553
554     paio_remove(acb);
555 }
556
557 static AIOPool raw_aio_pool = {
558     .aiocb_size         = sizeof(struct qemu_paiocb),
559     .cancel             = paio_cancel,
560 };
561
562 BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
563         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
564         BlockDriverCompletionFunc *cb, void *opaque, int type)
565 {
566     struct qemu_paiocb *acb;
567
568     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
569     if (!acb)
570         return NULL;
571     acb->aio_type = type;
572     acb->aio_fildes = fd;
573     acb->ev_signo = SIGUSR2;
574     acb->async_context_id = get_async_context_id();
575
576     if (qiov) {
577         acb->aio_iov = qiov->iov;
578         acb->aio_niov = qiov->niov;
579     }
580     acb->aio_nbytes = nb_sectors * 512;
581     acb->aio_offset = sector_num * 512;
582
583     acb->next = posix_aio_state->first_aio;
584     posix_aio_state->first_aio = acb;
585
586     qemu_paio_submit(acb);
587     return &acb->common;
588 }
589
590 BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
591         unsigned long int req, void *buf,
592         BlockDriverCompletionFunc *cb, void *opaque)
593 {
594     struct qemu_paiocb *acb;
595
596     acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
597     if (!acb)
598         return NULL;
599     acb->aio_type = QEMU_AIO_IOCTL;
600     acb->aio_fildes = fd;
601     acb->ev_signo = SIGUSR2;
602     acb->aio_offset = 0;
603     acb->aio_ioctl_buf = buf;
604     acb->aio_ioctl_cmd = req;
605
606     acb->next = posix_aio_state->first_aio;
607     posix_aio_state->first_aio = acb;
608
609     qemu_paio_submit(acb);
610     return &acb->common;
611 }
612
613 int paio_init(void)
614 {
615     struct sigaction act;
616     PosixAioState *s;
617     int fds[2];
618     int ret;
619
620     if (posix_aio_state)
621         return 0;
622
623     s = qemu_malloc(sizeof(PosixAioState));
624
625     sigfillset(&act.sa_mask);
626     act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
627     act.sa_handler = aio_signal_handler;
628     sigaction(SIGUSR2, &act, NULL);
629
630     s->first_aio = NULL;
631     if (qemu_pipe(fds) == -1) {
632         fprintf(stderr, "failed to create pipe\n");
633         return -1;
634     }
635
636     s->rfd = fds[0];
637     s->wfd = fds[1];
638
639     fcntl(s->rfd, F_SETFL, O_NONBLOCK);
640     fcntl(s->wfd, F_SETFL, O_NONBLOCK);
641
642     qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
643         posix_aio_process_queue, s);
644
645     ret = pthread_attr_init(&attr);
646     if (ret)
647         die2(ret, "pthread_attr_init");
648
649     ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
650     if (ret)
651         die2(ret, "pthread_attr_setdetachstate");
652
653     QTAILQ_INIT(&request_list);
654
655     posix_aio_state = s;
656     return 0;
657 }
This page took 0.059764 seconds and 4 git commands to generate.