#include "block/raw-posix-aio.h"
+static void do_spawn_thread(void);
struct qemu_paiocb {
BlockDriverAIOCB common;
int aio_niov;
size_t aio_nbytes;
#define aio_ioctl_cmd aio_nbytes /* for QEMU_AIO_IOCTL */
- int ev_signo;
off_t aio_offset;
QTAILQ_ENTRY(qemu_paiocb) node;
ssize_t ret;
int active;
struct qemu_paiocb *next;
-
- int async_context_id;
};
typedef struct PosixAioState {
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
+static int new_threads = 0; /* backlog of threads we need to create */
+static int pending_threads = 0; /* threads created but not running yet */
+static QEMUBH *new_thread_bh;
static QTAILQ_HEAD(, qemu_paiocb) request_list;
#ifdef CONFIG_PREADV
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
- size_t offset = 0;
ssize_t len;
do {
len = qemu_pwritev(aiocb->aio_fildes,
aiocb->aio_iov,
aiocb->aio_niov,
- aiocb->aio_offset + offset);
+ aiocb->aio_offset);
else
len = qemu_preadv(aiocb->aio_fildes,
aiocb->aio_iov,
aiocb->aio_niov,
- aiocb->aio_offset + offset);
+ aiocb->aio_offset);
} while (len == -1 && errno == EINTR);
if (len == -1)
return len;
}
+/*
+ * Reads/writes the data to/from a given linear buffer.
+ *
+ * Returns the number of bytes handled or -errno in case of an error. Short
+ * reads are only returned if the end of the file is reached.
+ */
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
ssize_t offset = 0;
return nbytes;
}
+static void posix_aio_notify_event(void);
+
static void *aio_thread(void *unused)
{
- pid_t pid;
-
- pid = getpid();
+ mutex_lock(&lock);
+ pending_threads--;
+ mutex_unlock(&lock);
+ do_spawn_thread();
while (1) {
struct qemu_paiocb *aiocb;
switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
case QEMU_AIO_READ:
+ ret = handle_aiocb_rw(aiocb);
+ if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
+ /* A short read means that we have reached EOF. Pad the buffer
+ * with zeros for bytes after EOF. */
+ QEMUIOVector qiov;
+
+ qemu_iovec_init_external(&qiov, aiocb->aio_iov,
+ aiocb->aio_niov);
+ qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
+
+ ret = aiocb->aio_nbytes;
+ }
+ break;
case QEMU_AIO_WRITE:
ret = handle_aiocb_rw(aiocb);
break;
aiocb->ret = ret;
mutex_unlock(&lock);
- if (kill(pid, aiocb->ev_signo)) die("kill failed");
+ posix_aio_notify_event();
}
cur_threads--;
return NULL;
}
-static void spawn_thread(void)
+static void do_spawn_thread(void)
{
sigset_t set, oldset;
- cur_threads++;
+ mutex_lock(&lock);
+ if (!new_threads) {
+ mutex_unlock(&lock);
+ return;
+ }
+
+ new_threads--;
+ pending_threads++;
+
+ mutex_unlock(&lock);
/* block all signals */
if (sigfillset(&set)) die("sigfillset");
if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
+static void spawn_thread_bh_fn(void *opaque)
+{
+ do_spawn_thread();
+}
+
+static void spawn_thread(void)
+{
+ cur_threads++;
+ new_threads++;
+ /* If there are threads being created, they will spawn new workers, so
+ * we don't spend time creating many threads in a loop holding a mutex or
+ * starving the current vcpu.
+ *
+ * If there are no idle threads, ask the main thread to create one, so we
+ * inherit the correct affinity instead of the vcpu affinity.
+ */
+ if (!pending_threads) {
+ qemu_bh_schedule(new_thread_bh);
+ }
+}
+
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
aiocb->ret = -EINPROGRESS;
struct qemu_paiocb *acb, **pacb;
int ret;
int result = 0;
- int async_context_id = get_async_context_id();
for(;;) {
pacb = &s->first_aio;
if (!acb)
return result;
- /* we're only interested in requests in the right context */
- if (acb->async_context_id != async_context_id) {
- pacb = &acb->next;
- continue;
- }
-
ret = qemu_paio_error(acb);
if (ret == ECANCELED) {
/* remove the request */
static PosixAioState *posix_aio_state;
-static void aio_signal_handler(int signum)
+static void posix_aio_notify_event(void)
{
- if (posix_aio_state) {
- char byte = 0;
- ssize_t ret;
-
- ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
- if (ret < 0 && errno != EAGAIN)
- die("write()");
- }
+ char byte = 0;
+ ssize_t ret;
- qemu_service_io();
+ ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
+ if (ret < 0 && errno != EAGAIN)
+ die("write()");
}
static void paio_remove(struct qemu_paiocb *acb)
return NULL;
acb->aio_type = type;
acb->aio_fildes = fd;
- acb->ev_signo = SIGUSR2;
- acb->async_context_id = get_async_context_id();
if (qiov) {
acb->aio_iov = qiov->iov;
return NULL;
acb->aio_type = QEMU_AIO_IOCTL;
acb->aio_fildes = fd;
- acb->ev_signo = SIGUSR2;
- acb->async_context_id = get_async_context_id();
acb->aio_offset = 0;
acb->aio_ioctl_buf = buf;
acb->aio_ioctl_cmd = req;
int paio_init(void)
{
- struct sigaction act;
PosixAioState *s;
int fds[2];
int ret;
if (posix_aio_state)
return 0;
- s = qemu_malloc(sizeof(PosixAioState));
-
- sigfillset(&act.sa_mask);
- act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
- act.sa_handler = aio_signal_handler;
- sigaction(SIGUSR2, &act, NULL);
+ s = g_malloc(sizeof(PosixAioState));
s->first_aio = NULL;
if (qemu_pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe\n");
+ g_free(s);
return -1;
}
die2(ret, "pthread_attr_setdetachstate");
QTAILQ_INIT(&request_list);
+ new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
posix_aio_state = s;
return 0;