*/
#include "qemu/osdep.h"
-#include "qemu-common.h"
#include "block/block.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#ifdef CONFIG_EPOLL_CREATE1
-/* The fd number threashold to switch to epoll */
+/* The fd number threshold to switch to epoll */
#define EPOLL_ENABLE_THRESHOLD 64
static void aio_epoll_disable(AioContext *ctx)
{
- ctx->epoll_available = false;
- if (!ctx->epoll_enabled) {
+ ctx->epoll_enabled = false;
+ if (!ctx->epoll_available) {
return;
}
- ctx->epoll_enabled = false;
+ ctx->epoll_available = false;
close(ctx->epollfd);
}
}
if (timeout <= 0 || ret > 0) {
ret = epoll_wait(ctx->epollfd, events,
- sizeof(events) / sizeof(events[0]),
+ ARRAY_SIZE(events),
timeout);
        if (ret <= 0) {
            goto out;
        }
+static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
+{
+ /* If the GSource is in the process of being destroyed then
+ * g_source_remove_poll() causes an assertion failure. Skip
+ * removal in that case, because glib cleans up its state during
+ * destruction anyway.
+ */
+ if (!g_source_is_destroyed(&ctx->source)) {
+ g_source_remove_poll(&ctx->source, &node->pfd);
+ }
+
+ /* If a read is in progress, just mark the node as deleted */
+ if (qemu_lockcnt_count(&ctx->list_lock)) {
+ node->deleted = 1;
+ node->pfd.revents = 0;
+ return false;
+ }
+ /* Otherwise, delete it for real. We can't just mark it as
+ * deleted because deleted nodes are only cleaned up while
+ * no one is walking the handlers list.
+ */
+ QLIST_REMOVE(node, node);
+ return true;
+}
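
/* Editorial sketch, not part of this patch: nodes marked ->deleted above
 * are freed later by whoever walks the handlers list, once it can be sure
 * no other reader is left.  The dispatch loop in this file reaps them
 * roughly as below (example_reap_deleted is a hypothetical name);
 * qemu_lockcnt_dec_if_lock() takes the list lock only when the reader
 * count would drop to zero, so reaping never blocks concurrent readers.
 */
static void example_reap_deleted(AioContext *ctx, AioHandler *node)
{
    if (node->deleted) {
        if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
            QLIST_REMOVE(node, node);
            g_free(node);
            qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
        }
    }
}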
+
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
AioHandler *node;
+ AioHandler *new_node = NULL;
bool is_new = false;
bool deleted = false;
+ int poll_disable_change;
qemu_lockcnt_lock(&ctx->list_lock);
qemu_lockcnt_unlock(&ctx->list_lock);
return;
}
+        /* Clear events so that the fd is unregistered from the ctx epoll. */
+        node->pfd.events = 0;
- g_source_remove_poll(&ctx->source, &node->pfd);
-
- /* If the lock is held, just mark the node as deleted */
- if (qemu_lockcnt_count(&ctx->list_lock)) {
- node->deleted = 1;
- node->pfd.revents = 0;
- } else {
- /* Otherwise, delete it for real. We can't just mark it as
- * deleted because deleted nodes are only cleaned up while
- * no one is walking the handlers list.
- */
- QLIST_REMOVE(node, node);
- deleted = true;
- }
-
- if (!node->io_poll) {
- ctx->poll_disable_cnt--;
- }
+        /* Deleting a handler that lacks io_poll removes its contribution
+         * to poll_disable_cnt.
+         */
+        poll_disable_change = -!node->io_poll;
} else {
+        /* Net change in the count of handlers lacking io_poll: +1 if the
+         * new callbacks lack io_poll, -1 if the old handler lacked it.
+         */
+        poll_disable_change = !io_poll - (node && !node->io_poll);
if (node == NULL) {
- /* Alloc and insert if it's not already there */
- node = g_new0(AioHandler, 1);
- node->pfd.fd = fd;
- QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
-
- g_source_add_poll(&ctx->source, &node->pfd);
is_new = true;
+ }
+ /* Alloc and insert if it's not already there */
+ new_node = g_new0(AioHandler, 1);
- ctx->poll_disable_cnt += !io_poll;
+ /* Update handler with latest information */
+ new_node->io_read = io_read;
+ new_node->io_write = io_write;
+ new_node->io_poll = io_poll;
+ new_node->opaque = opaque;
+ new_node->is_external = is_external;
+
+ if (is_new) {
+ new_node->pfd.fd = fd;
} else {
- ctx->poll_disable_cnt += !io_poll - !node->io_poll;
+ new_node->pfd = node->pfd;
}
+ g_source_add_poll(&ctx->source, &new_node->pfd);
- /* Update handler with latest information */
- node->io_read = io_read;
- node->io_write = io_write;
- node->io_poll = io_poll;
- node->opaque = opaque;
- node->is_external = is_external;
+ new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
+ new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
- node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
- node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
+ QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
+ }
+ if (node) {
+ deleted = aio_remove_fd_handler(ctx, node);
}
- aio_epoll_update(ctx, node, is_new);
+ /* No need to order poll_disable_cnt writes against other updates;
+ * the counter is only used to avoid wasting time and latency on
+ * iterated polling when the system call will be ultimately necessary.
+     * Changing handlers is a rare event, and a little wasted polling until
+     * the aio_notify() below takes effect is not an issue.
+ */
+ atomic_set(&ctx->poll_disable_cnt,
+ atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
+
+ if (new_node) {
+ aio_epoll_update(ctx, new_node, is_new);
+ } else if (node) {
+ /* Unregister deleted fd_handler */
+ aio_epoll_update(ctx, node, false);
+ }
qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
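
/* Usage sketch, with hypothetical names, not part of this patch: callers
 * pass real callbacks to register an fd and all-NULL callbacks to
 * unregister it, which takes the deletion path above.
 */
static void my_read_cb(void *opaque)
{
    /* consume data from the fd remembered in opaque ... */
}

static void example_register(AioContext *ctx, int sockfd, void *state)
{
    /* watch sockfd for readability; no write or poll callbacks */
    aio_set_fd_handler(ctx, sockfd, true, my_read_cb, NULL, NULL, state);
}

static void example_unregister(AioContext *ctx, int sockfd)
{
    /* all-NULL callbacks delete the handler */
    aio_set_fd_handler(ctx, sockfd, true, NULL, NULL, NULL, NULL);
}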
AioHandler *node, *tmp;
bool progress = false;
- /*
- * We have to walk very carefully in case aio_set_fd_handler is
- * called while we're walking.
- */
- qemu_lockcnt_inc(&ctx->list_lock);
-
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents;
(revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
node->io_read) {
- aio_context_acquire(ctx);
node->io_read(node->opaque);
- aio_context_release(ctx);
/* aio_notify() does not count as progress */
if (node->opaque != &ctx->notifier) {
(revents & (G_IO_OUT | G_IO_ERR)) &&
aio_node_check(ctx, node->is_external) &&
node->io_write) {
- aio_context_acquire(ctx);
node->io_write(node->opaque);
- aio_context_release(ctx);
progress = true;
}
}
}
- qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
-/*
- * Note that dispatch_fds == false has the side-effect of post-poning the
- * freeing of deleted handlers.
- */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+void aio_dispatch(AioContext *ctx)
{
- bool progress;
-
- /*
- * If there are callbacks left that have been queued, we need to call them.
- * Do not call select in this case, because it is possible that the caller
- * does not need a complete flush (as is the case for aio_poll loops).
- */
- progress = aio_bh_poll(ctx);
-
- if (dispatch_fds) {
- progress |= aio_dispatch_handlers(ctx);
- }
-
- /* Run our timers */
- progress |= timerlistgroup_run_timers(&ctx->tlg);
+ qemu_lockcnt_inc(&ctx->list_lock);
+ aio_bh_poll(ctx);
+ aio_dispatch_handlers(ctx);
+ qemu_lockcnt_dec(&ctx->list_lock);
- return progress;
+ timerlistgroup_run_timers(&ctx->tlg);
}
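
/* For context: the matching glib integration lives in util/async.c; with
 * the bool return gone, the GSource dispatch callback that drives this
 * function reduces to roughly the following:
 *
 *     static gboolean aio_ctx_dispatch(GSource *source,
 *                                      GSourceFunc callback,
 *                                      gpointer user_data)
 *     {
 *         AioContext *ctx = (AioContext *) source;
 *
 *         assert(callback == NULL);
 *         aio_dispatch(ctx);
 *         return true;
 *     }
 */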
/* These thread-local variables are used only in a small part of aio_poll
npfd++;
}
-static bool run_poll_handlers_once(AioContext *ctx)
+static bool run_poll_handlers_once(AioContext *ctx, int64_t *timeout)
{
bool progress = false;
AioHandler *node;
if (!node->deleted && node->io_poll &&
aio_node_check(ctx, node->is_external) &&
node->io_poll(node->opaque)) {
- progress = true;
+            /*
+             * Polling was successful; exit try_poll_mode immediately to
+             * adjust the next polling time.
+             */
+ *timeout = 0;
+ if (node->opaque != &ctx->notifier) {
+ progress = true;
+ }
}
/* Caller handles freeing deleted nodes. Don't do it here. */
*
* Returns: true if progress was made, false otherwise
*/
-static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
+static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
bool progress;
- int64_t end_time;
+ int64_t start_time, elapsed_time;
assert(ctx->notify_me);
assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
- assert(ctx->poll_disable_cnt == 0);
- trace_run_poll_handlers_begin(ctx, max_ns);
-
- end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;
+ trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
+ start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
do {
- progress = run_poll_handlers_once(ctx);
- } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);
-
- trace_run_poll_handlers_end(ctx, progress);
+ progress = run_poll_handlers_once(ctx, timeout);
+ elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
+ max_ns = qemu_soonest_timeout(*timeout, max_ns);
+ assert(!(max_ns && progress));
+ } while (elapsed_time < max_ns && !atomic_read(&ctx->poll_disable_cnt));
+
+ /* If time has passed with no successful polling, adjust *timeout to
+ * keep the same ending time.
+ */
+ if (*timeout != -1) {
+ *timeout -= MIN(*timeout, elapsed_time);
+ }
+ trace_run_poll_handlers_end(ctx, progress, *timeout);
return progress;
}
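
/* Sketch of an io_poll callback, with hypothetical device names, not part
 * of this patch.  The contract relied on above: poll cheaply with no
 * system call, process any pending work, and return true only when
 * progress was made so the loop can reset *timeout.
 */
static bool my_device_poll(void *opaque)
{
    MyDevice *dev = opaque;                 /* hypothetical type */

    if (!ring_has_entries(&dev->ring)) {    /* hypothetical helper */
        return false;                       /* nothing to do yet */
    }
    my_device_process_ring(dev);            /* hypothetical helper */
    return true;                            /* progress was made */
}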
/* try_poll_mode:
* @ctx: the AioContext
- * @blocking: busy polling is only attempted when blocking is true
+ * @timeout: timeout for blocking wait, computed by the caller and updated if
+ * polling succeeds.
*
* ctx->notify_me must be non-zero so this function can detect aio_notify().
*
*
* Returns: true if progress was made, false otherwise
*/
-static bool try_poll_mode(AioContext *ctx, bool blocking)
+static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
- if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) {
- /* See qemu_soonest_timeout() uint64_t hack */
- int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
- (uint64_t)ctx->poll_ns);
+ int64_t max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
- if (max_ns) {
- poll_set_started(ctx, true);
+ if (max_ns && !atomic_read(&ctx->poll_disable_cnt)) {
+ poll_set_started(ctx, true);
- if (run_poll_handlers(ctx, max_ns)) {
- return true;
- }
+ if (run_poll_handlers(ctx, max_ns, timeout)) {
+ return true;
}
}
/* Even if we don't run busy polling, try polling once in case it can make
* progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
*/
- return run_poll_handlers_once(ctx);
+ return run_poll_handlers_once(ctx, timeout);
}
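
/* For reference, the helper used above: qemu_soonest_timeout() from
 * include/qemu/timer.h picks the smaller of two timeouts while treating
 * -1 ("block forever") as the largest value by comparing as unsigned,
 * roughly:
 *
 *     static inline int64_t qemu_soonest_timeout(int64_t timeout1,
 *                                                int64_t timeout2)
 *     {
 *         return ((uint64_t)timeout1 < (uint64_t)timeout2)
 *                ? timeout1 : timeout2;
 *     }
 */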
bool aio_poll(AioContext *ctx, bool blocking)
int64_t timeout;
int64_t start = 0;
+ assert(in_aio_context_home_thread(ctx));
+
/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}
- aio_context_acquire(ctx);
- progress = try_poll_mode(ctx, blocking);
- aio_context_release(ctx);
+ timeout = blocking ? aio_compute_timeout(ctx) : 0;
+ progress = try_poll_mode(ctx, &timeout);
+ assert(!(timeout && progress));
- if (!progress) {
+ /* If polling is allowed, non-blocking aio_poll does not need the
+ * system call---a single round of run_poll_handlers_once suffices.
+ */
+ if (timeout || atomic_read(&ctx->poll_disable_cnt)) {
assert(npfd == 0);
/* fill pollfds */
}
}
- timeout = blocking ? aio_compute_timeout(ctx) : 0;
-
/* wait until next event */
if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
AioHandler epoll_handler;
if (blocking) {
atomic_sub(&ctx->notify_me, 2);
+ aio_notify_accept(ctx);
}
/* Adjust polling time */
}
}
- aio_notify_accept(ctx);
-
/* if we have any readable fds, dispatch event */
if (ret > 0) {
for (i = 0; i < npfd; i++) {
}
npfd = 0;
- qemu_lockcnt_dec(&ctx->list_lock);
- /* Run dispatch even if there were no readable fds to run timers */
- if (aio_dispatch(ctx, ret > 0)) {
- progress = true;
+ progress |= aio_bh_poll(ctx);
+
+ if (ret > 0) {
+ progress |= aio_dispatch_handlers(ctx);
}
+ qemu_lockcnt_dec(&ctx->list_lock);
+
+ progress |= timerlistgroup_run_timers(&ctx->tlg);
+
return progress;
}
void aio_context_setup(AioContext *ctx)
{
- /* TODO remove this in final patch submission */
- if (getenv("QEMU_AIO_POLL_MAX_NS")) {
- fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has "
- "been replaced with -object iothread,poll-max-ns=NUM\n");
- exit(1);
- }
-
#ifdef CONFIG_EPOLL_CREATE1
assert(!ctx->epollfd);
ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
#endif
}
+void aio_context_destroy(AioContext *ctx)
+{
+#ifdef CONFIG_EPOLL_CREATE1
+ aio_epoll_disable(ctx);
+#endif
+}
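
/* Sketch of the expected call site: the GSource finalize hook in
 * util/async.c is the natural place to call this, so the epoll fd opened
 * in aio_context_setup() is closed exactly once when the context goes
 * away, roughly:
 *
 *     static void aio_ctx_finalize(GSource *source)
 *     {
 *         AioContext *ctx = (AioContext *) source;
 *         ...
 *         aio_context_destroy(ctx);
 *     }
 */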
+
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink, Error **errp)
{