GPollFD pfd;
IOHandler *io_read;
IOHandler *io_write;
- AioFlushHandler *io_flush;
int deleted;
- int pollfds_idx;
void *opaque;
QLIST_ENTRY(AioHandler) node;
};
int fd,
IOHandler *io_read,
IOHandler *io_write,
- AioFlushHandler *io_flush,
void *opaque)
{
AioHandler *node;
} else {
if (node == NULL) {
/* Alloc and insert if it's not already there */
- node = g_malloc0(sizeof(AioHandler));
+ node = g_new0(AioHandler, 1);
node->pfd.fd = fd;
QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
/* Update handler with latest information */
node->io_read = io_read;
node->io_write = io_write;
- node->io_flush = io_flush;
node->opaque = opaque;
- node->pollfds_idx = -1;
- node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP : 0);
- node->pfd.events |= (io_write ? G_IO_OUT : 0);
+ node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
+ node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
}
aio_notify(ctx);
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
- EventNotifierHandler *io_read,
- AioFlushEventNotifierHandler *io_flush)
+ EventNotifierHandler *io_read)
{ /* forwards to aio_set_fd_handler() using the notifier's file descriptor */
aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
- (IOHandler *)io_read, NULL,
- (AioFlushHandler *)io_flush, notifier);
+ (IOHandler *)io_read, NULL, notifier); /* NULL: no write handler; the notifier itself is passed as the opaque */
+}
+/*
+ * Always returns false in this implementation; @ctx is not examined.
+ */
+bool aio_prepare(AioContext *ctx)
+{
+ return false;
}
bool aio_pending(AioContext *ctx) /* scans ctx->aio_handlers; part of the body is elided in this view */
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
int revents;
- /*
- * FIXME: right now we cannot get G_IO_HUP and G_IO_ERR because
- * main-loop.c is still select based (due to the slirp legacy).
- * If main-loop.c ever switches to poll, G_IO_ERR should be
- * tested too. Dispatching G_IO_ERR to both handlers should be
- * okay, since handlers need to be ready for spurious wakeups.
- */
revents = node->pfd.revents & node->pfd.events; /* keep only the events this handler asked for */
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) { /* input, hang-up or error, and a read handler is installed */
return true;
return false;
}
-static bool aio_dispatch(AioContext *ctx)
+bool aio_dispatch(AioContext *ctx)
{
AioHandler *node;
bool progress = false; /* becomes true once a callback that counts as progress has run; this is the return value */
/*
- * We have to walk very carefully in case qemu_aio_set_fd_handler is
+ * If there are callbacks left that have been queued, we need to call them.
+ * Do not call select in this case, because it is possible that the caller
+ * does not need a complete flush (as is the case for aio_poll loops).
+ */
+ if (aio_bh_poll(ctx)) {
+ progress = true;
+ }
+
+ /*
+ * We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
node = QLIST_FIRST(&ctx->aio_handlers);
revents = node->pfd.revents & node->pfd.events; /* keep only the events this handler asked for */
node->pfd.revents = 0; /* reset the stored result now that it has been copied into revents */
- /* See comment in aio_pending. */
if (!node->deleted &&
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
node->io_read) {
node->io_read(node->opaque);
- progress = true;
+
+ /* aio_notify() does not count as progress */
+ if (node->opaque != &ctx->notifier) {
+ progress = true;
+ }
}
if (!node->deleted &&
(revents & (G_IO_OUT | G_IO_ERR)) &&
g_free(tmp);
}
}
+
+ /* Run our timers */
+ progress |= timerlistgroup_run_timers(&ctx->tlg);
+
return progress;
}
+/* These thread-local variables are used only in a small part of aio_poll
+ * around the call to the poll() system call. In particular they are not
+ * used while aio_poll is performing callbacks, which makes it much easier
+ * to think about reentrancy!
+ *
+ * Stack-allocated arrays would be perfect but they have size limitations;
+ * heap allocation is expensive enough that we want to reuse arrays across
+ * calls to aio_poll(). And because poll() has to be called without holding
+ * any lock, the arrays cannot be stored in AioContext. Thread-local data
+ * has none of the disadvantages of these three options.
+ */
+static __thread GPollFD *pollfds; /* array handed to qemu_poll_ns() by aio_poll() */
+static __thread AioHandler **nodes; /* nodes[i] is the handler that pollfds[i] was built from */
+static __thread unsigned npfd, nalloc; /* npfd: entries in use; nalloc: entries allocated in both arrays */
+static __thread Notifier pollfds_cleanup_notifier; /* set to pollfds_cleanup() and registered with qemu_thread_atexit_add() on first allocation */
+/*
+ * Notifier callback that add_pollfd() registers through
+ * qemu_thread_atexit_add(): frees the calling thread's pollfds and nodes
+ * arrays and resets nalloc.  Both arguments are ignored.
+ */
+static void pollfds_cleanup(Notifier *n, void *unused)
+{
+ g_assert(npfd == 0); /* no entries may still be in use when the arrays are freed */
+ g_free(pollfds);
+ g_free(nodes);
+ nalloc = 0;
+}
+/*
+ * Append @node to the thread-local poll set: nodes[] records the handler and
+ * pollfds[] receives a fresh GPollFD carrying its fd and requested events.
+ * Both arrays grow together, starting at 8 entries and doubling when full;
+ * the first allocation also registers pollfds_cleanup() with
+ * qemu_thread_atexit_add().
+ */
+static void add_pollfd(AioHandler *node)
+{
+ if (npfd == nalloc) {
+ if (nalloc == 0) {
+ pollfds_cleanup_notifier.notify = pollfds_cleanup;
+ qemu_thread_atexit_add(&pollfds_cleanup_notifier);
+ nalloc = 8;
+ } else {
+ g_assert(nalloc <= INT_MAX); /* the doubling below must not wrap the unsigned counter */
+ nalloc *= 2;
+ }
+ pollfds = g_renew(GPollFD, pollfds, nalloc);
+ nodes = g_renew(AioHandler *, nodes, nalloc);
+ }
+ nodes[npfd] = node;
+ pollfds[npfd] = (GPollFD) { /* members not named here (revents) start out zeroed */
+ .fd = node->pfd.fd,
+ .events = node->pfd.events,
+ };
+ npfd++;
+}
+/*
+ * Poll the file descriptors of @ctx's handlers once and then run
+ * aio_dispatch().  With @blocking false the poll timeout is 0; otherwise it
+ * comes from aio_compute_timeout().  The context is acquired on entry and
+ * released before returning, and is additionally dropped around the poll
+ * whenever the timeout is non-zero.  Returns true only if aio_dispatch()
+ * reported progress.
+ */
bool aio_poll(AioContext *ctx, bool blocking)
{
AioHandler *node;
- int ret;
- bool busy, progress;
+ int i, ret;
+ bool progress;
+ int64_t timeout;
+ aio_context_acquire(ctx);
progress = false;
- /*
- * If there are callbacks left that have been queued, we need to call them.
- * Do not call select in this case, because it is possible that the caller
- * does not need a complete flush (as is the case for qemu_aio_wait loops).
+ /* aio_notify can avoid the expensive event_notifier_set if
+ * everything (file descriptors, bottom halves, timers) will
+ * be re-evaluated before the next blocking poll(). This is
+ * already true when aio_poll is called with blocking == false;
+ * if blocking == true, it is only true after poll() returns,
+ * so disable the optimization now.
*/
- if (aio_bh_poll(ctx)) {
- blocking = false;
- progress = true;
- }
-
- if (aio_dispatch(ctx)) {
- progress = true;
- }
-
- if (progress && !blocking) {
- return true;
+ if (blocking) {
+ atomic_add(&ctx->notify_me, 2);
}
ctx->walking_handlers++;
- g_array_set_size(ctx->pollfds, 0);
+ assert(npfd == 0); /* the thread-local arrays must be empty on entry */
/* fill pollfds */
- busy = false;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- node->pollfds_idx = -1;
-
- /* If there aren't pending AIO operations, don't invoke callbacks.
- * Otherwise, if there are no AIO requests, qemu_aio_wait() would
- * wait indefinitely.
- */
- if (!node->deleted && node->io_flush) {
- if (node->io_flush(node->opaque) == 0) {
- continue;
- }
- busy = true;
- }
if (!node->deleted && node->pfd.events) {
- GPollFD pfd = {
- .fd = node->pfd.fd,
- .events = node->pfd.events,
- };
- node->pollfds_idx = ctx->pollfds->len;
- g_array_append_val(ctx->pollfds, pfd);
+ add_pollfd(node);
}
}
- ctx->walking_handlers--;
+ timeout = blocking ? aio_compute_timeout(ctx) : 0; /* non-blocking callers poll with a zero timeout */
- /* No AIO operations? Get us out of here */
- if (!busy) {
- return progress;
+ /* wait until next event */
+ if (timeout) {
+ aio_context_release(ctx);
+ }
+ ret = qemu_poll_ns((GPollFD *)pollfds, npfd, timeout);
+ if (blocking) {
+ atomic_sub(&ctx->notify_me, 2);
+ }
+ if (timeout) {
+ aio_context_acquire(ctx);
}
- /* wait until next event */
- ret = g_poll((GPollFD *)ctx->pollfds->data,
- ctx->pollfds->len,
- blocking ? -1 : 0);
+ aio_notify_accept(ctx);
/* if we have any readable fds, dispatch event */
if (ret > 0) {
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (node->pollfds_idx != -1) {
- GPollFD *pfd = &g_array_index(ctx->pollfds, GPollFD,
- node->pollfds_idx);
- node->pfd.revents = pfd->revents;
- }
- }
- if (aio_dispatch(ctx)) {
- progress = true;
+ for (i = 0; i < npfd; i++) {
+ nodes[i]->pfd.revents = pollfds[i].revents; /* hand each poll result to the handler it was built from */
}
}
- assert(progress || busy);
- return true;
+ npfd = 0; /* empty the thread-local arrays again for the next call */
+ ctx->walking_handlers--;
+
+ /* Run dispatch even if there were no readable fds to run timers */
+ if (aio_dispatch(ctx)) {
+ progress = true;
+ }
+
+ aio_context_release(ctx);
+
+ return progress;
}