#include <stdlib.h>
#include <stdio.h>
-#include "sys-queue.h"
+#include "qemu-queue.h"
#include "osdep.h"
#include "qemu-common.h"
#include "block_int.h"
int aio_fildes;
union {
struct iovec *aio_iov;
- void *aio_ioctl_buf;
+ void *aio_ioctl_buf;
};
int aio_niov;
size_t aio_nbytes;
int ev_signo;
off_t aio_offset;
- TAILQ_ENTRY(qemu_paiocb) node;
+ QTAILQ_ENTRY(qemu_paiocb) node;
int aio_type;
ssize_t ret;
int active;
struct qemu_paiocb *next;
+
+ int async_context_id;
};
typedef struct PosixAioState {
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
-static TAILQ_HEAD(, qemu_paiocb) request_list;
+static QTAILQ_HEAD(, qemu_paiocb) request_list;
#ifdef CONFIG_PREADV
static int preadv_present = 1;
if (ret) die2(ret, "pthread_create");
}
-static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
- int ret;
-
- ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
- if (ret == -1)
- return -errno;
-
- /*
- * This looks weird, but the aio code only consideres a request
- * successfull if it has written the number full number of bytes.
- *
- * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
- * so in fact we return the ioctl command here to make posix_aio_read()
- * happy..
- */
- return aiocb->aio_nbytes;
+ int ret;
+
+ ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
+ if (ret == -1)
+ return -errno;
+
+ /*
+ * This looks weird, but the aio code only considers a request
+ * successful if it has written the full number of bytes.
+ *
+ * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
+ * so in fact we return the ioctl command here to make posix_aio_read()
+ * happy..
+ */
+ return aiocb->aio_nbytes;
}
-static size_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
int ret;
#endif
-static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
size_t offset = 0;
ssize_t len;
return len;
}
-static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
+static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
- size_t offset = 0;
- size_t len;
+ ssize_t offset = 0;
+ ssize_t len;
while (offset < aiocb->aio_nbytes) {
if (aiocb->aio_type & QEMU_AIO_WRITE)
return offset;
}
-static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
+static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
- size_t nbytes;
+ ssize_t nbytes;
char *buf;
if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
* Try preadv/pwritev first and fall back to linearizing the
* buffer if it's not supported.
*/
- if (preadv_present) {
+ if (preadv_present) {
nbytes = handle_aiocb_rw_vector(aiocb);
if (nbytes == aiocb->aio_nbytes)
- return nbytes;
+ return nbytes;
if (nbytes < 0 && nbytes != -ENOSYS)
return nbytes;
preadv_present = 0;
static void *aio_thread(void *unused)
{
pid_t pid;
- sigset_t set;
pid = getpid();
- /* block all signals */
- if (sigfillset(&set)) die("sigfillset");
- if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
-
while (1) {
struct qemu_paiocb *aiocb;
- size_t ret = 0;
+ ssize_t ret = 0;
qemu_timeval tv;
struct timespec ts;
mutex_lock(&lock);
- while (TAILQ_EMPTY(&request_list) &&
+ while (QTAILQ_EMPTY(&request_list) &&
!(ret == ETIMEDOUT)) {
ret = cond_timedwait(&cond, &lock, &ts);
}
- if (TAILQ_EMPTY(&request_list))
+ if (QTAILQ_EMPTY(&request_list))
break;
- aiocb = TAILQ_FIRST(&request_list);
- TAILQ_REMOVE(&request_list, aiocb, node);
+ aiocb = QTAILQ_FIRST(&request_list);
+ QTAILQ_REMOVE(&request_list, aiocb, node);
aiocb->active = 1;
idle_threads--;
mutex_unlock(&lock);
switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
case QEMU_AIO_READ:
case QEMU_AIO_WRITE:
- ret = handle_aiocb_rw(aiocb);
- break;
+ ret = handle_aiocb_rw(aiocb);
+ break;
case QEMU_AIO_FLUSH:
- ret = handle_aiocb_flush(aiocb);
- break;
+ ret = handle_aiocb_flush(aiocb);
+ break;
case QEMU_AIO_IOCTL:
- ret = handle_aiocb_ioctl(aiocb);
- break;
- default:
- fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
- ret = -EINVAL;
- break;
- }
+ ret = handle_aiocb_ioctl(aiocb);
+ break;
+ default:
+ fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
+ ret = -EINVAL;
+ break;
+ }
mutex_lock(&lock);
aiocb->ret = ret;
static void spawn_thread(void)
{
+ sigset_t set, oldset;
+
cur_threads++;
idle_threads++;
+
+ /* block all signals */
+ if (sigfillset(&set)) die("sigfillset");
+ if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
+
thread_create(&thread_id, &attr, aio_thread, NULL);
+
+ if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
mutex_lock(&lock);
if (idle_threads == 0 && cur_threads < max_threads)
spawn_thread();
- TAILQ_INSERT_TAIL(&request_list, aiocb, node);
+ QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
mutex_unlock(&lock);
cond_signal(&cond);
}
return ret;
}
-static void posix_aio_read(void *opaque)
+static int posix_aio_process_queue(void *opaque)
{
PosixAioState *s = opaque;
struct qemu_paiocb *acb, **pacb;
int ret;
- ssize_t len;
-
- /* read all bytes from signal pipe */
- for (;;) {
- char bytes[16];
-
- len = read(s->rfd, bytes, sizeof(bytes));
- if (len == -1 && errno == EINTR)
- continue; /* try again */
- if (len == sizeof(bytes))
- continue; /* more to read */
- break;
- }
+ int result = 0;
+ int async_context_id = get_async_context_id();
for(;;) {
pacb = &s->first_aio;
for(;;) {
acb = *pacb;
if (!acb)
- goto the_end;
+ return result;
+
+ /* we're only interested in requests in the right context */
+ if (acb->async_context_id != async_context_id) {
+ pacb = &acb->next;
+ continue;
+ }
+
ret = qemu_paio_error(acb);
if (ret == ECANCELED) {
/* remove the request */
*pacb = acb->next;
qemu_aio_release(acb);
+ result = 1;
} else if (ret != EINPROGRESS) {
/* end of aio */
if (ret == 0) {
/* call the callback */
acb->common.cb(acb->common.opaque, ret);
qemu_aio_release(acb);
+ result = 1;
break;
} else {
pacb = &acb->next;
}
}
}
- the_end: ;
+
+ return result;
+}
+
+static void posix_aio_read(void *opaque)
+{
+ PosixAioState *s = opaque;
+ ssize_t len;
+
+ /* read all bytes from signal pipe */
+ for (;;) {
+ char bytes[16];
+
+ len = read(s->rfd, bytes, sizeof(bytes));
+ if (len == -1 && errno == EINTR)
+ continue; /* try again */
+ if (len == sizeof(bytes))
+ continue; /* more to read */
+ break;
+ }
+
+ posix_aio_process_queue(s);
}
static int posix_aio_flush(void *opaque)
{
if (posix_aio_state) {
char byte = 0;
+ ssize_t ret;
- write(posix_aio_state->wfd, &byte, sizeof(byte));
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN)
+ die("write()");
}
qemu_service_io();
mutex_lock(&lock);
if (!acb->active) {
- TAILQ_REMOVE(&request_list, acb, node);
+ QTAILQ_REMOVE(&request_list, acb, node);
acb->ret = -ECANCELED;
} else if (acb->ret == -EINPROGRESS) {
active = 1;
.cancel = paio_cancel,
};
-BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
+BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
BlockDriverCompletionFunc *cb, void *opaque, int type)
{
acb->aio_type = type;
acb->aio_fildes = fd;
acb->ev_signo = SIGUSR2;
+ acb->async_context_id = get_async_context_id();
+
if (qiov) {
acb->aio_iov = qiov->iov;
acb->aio_niov = qiov->niov;
return &acb->common;
}
-void *paio_init(void)
+int paio_init(void)
{
struct sigaction act;
PosixAioState *s;
int ret;
if (posix_aio_state)
- return posix_aio_state;
+ return 0;
s = qemu_malloc(sizeof(PosixAioState));
sigaction(SIGUSR2, &act, NULL);
s->first_aio = NULL;
- if (pipe(fds) == -1) {
+ if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
- return NULL;
+ return -1;
}
s->rfd = fds[0];
fcntl(s->rfd, F_SETFL, O_NONBLOCK);
fcntl(s->wfd, F_SETFL, O_NONBLOCK);
- qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
+ qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
+ posix_aio_process_queue, s);
ret = pthread_attr_init(&attr);
if (ret)
if (ret)
die2(ret, "pthread_attr_setdetachstate");
- TAILQ_INIT(&request_list);
+ QTAILQ_INIT(&request_list);
posix_aio_state = s;
-
- return posix_aio_state;
+ return 0;
}