#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
-#include "qemu/queue.h"
+#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
IOHandler *io_read;
IOHandler *io_write;
AioPollFn *io_poll;
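+ /* Invoked from poll_set_started() when busy polling begins or ends */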
+ IOHandler *io_poll_begin;
+ IOHandler *io_poll_end;
int deleted;
void *opaque;
bool is_external;
AioHandler *node;
struct epoll_event event;
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int r;
if (node->deleted || !node->pfd.events) {
continue;
bool is_new = false;
bool deleted = false;
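+ /* Writers serialize on ctx->list_lock; concurrent walkers only hold the
+  * lockcnt count, so node removal below may have to be deferred until
+  * they finish.
+  */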
+ qemu_lockcnt_lock(&ctx->list_lock);
+
node = find_aio_handler(ctx, fd);
/* Are we deleting the fd handler? */
if (!io_read && !io_write && !io_poll) {
if (node == NULL) {
+ qemu_lockcnt_unlock(&ctx->list_lock);
return;
}
g_source_remove_poll(&ctx->source, &node->pfd);
- /* If the lock is held, just mark the node as deleted */
+ /* If a reader is still walking the handler list, just mark the node as
+  * deleted */
- if (ctx->walking_handlers) {
+ if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
- * deleted because deleted nodes are only cleaned up after
- * releasing the walking_handlers lock.
+ * deleted because deleted nodes are only cleaned up while
+ * no one is walking the handlers list.
*/
QLIST_REMOVE(node, node);
deleted = true;
/* Alloc and insert if it's not already there */
node = g_new0(AioHandler, 1);
node->pfd.fd = fd;
- QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+ QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
is_new = true;
}
aio_epoll_update(ctx, node, is_new);
+ qemu_lockcnt_unlock(&ctx->list_lock);
aio_notify(ctx);
if (deleted) {
}
}
+void aio_set_fd_poll(AioContext *ctx, int fd,
+ IOHandler *io_poll_begin,
+ IOHandler *io_poll_end)
+{
+ AioHandler *node = find_aio_handler(ctx, fd);
+
+ if (!node) {
+ return;
+ }
+
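+ /* These hooks are run by poll_set_started() when the AioContext enters
+  * or leaves busy-poll mode.
+  */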
+ node->io_poll_begin = io_poll_begin;
+ node->io_poll_end = io_poll_end;
+}
+
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
bool is_external,
(IOHandler *)io_read, NULL, io_poll, notifier);
}
+void aio_set_event_notifier_poll(AioContext *ctx,
+ EventNotifier *notifier,
+ EventNotifierHandler *io_poll_begin,
+ EventNotifierHandler *io_poll_end)
+{
+ aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
+ (IOHandler *)io_poll_begin,
+ (IOHandler *)io_poll_end);
+}
+
+static void poll_set_started(AioContext *ctx, bool started)
+{
+ AioHandler *node;
+
+ if (started == ctx->poll_started) {
+ return;
+ }
+
+ ctx->poll_started = started;
+
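+ /* Walk the handler list under the lockcnt count and let each handler
+  * know that busy polling is starting or stopping.
+  */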
+ qemu_lockcnt_inc(&ctx->list_lock);
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
+ IOHandler *fn;
+
+ if (node->deleted) {
+ continue;
+ }
+
+ if (started) {
+ fn = node->io_poll_begin;
+ } else {
+ fn = node->io_poll_end;
+ }
+
+ if (fn) {
+ fn(node->opaque);
+ }
+ }
+ qemu_lockcnt_dec(&ctx->list_lock);
+}
+
+
bool aio_prepare(AioContext *ctx)
{
+ /* Poll mode cannot be used with glib's event loop, so disable it. */
+ poll_set_started(ctx, false);
+
return false;
}
bool aio_pending(AioContext *ctx)
{
AioHandler *node;
+ bool result = false;
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ /*
+ * We have to walk very carefully in case aio_set_fd_handler is
+ * called while we're walking.
+ */
+ qemu_lockcnt_inc(&ctx->list_lock);
+
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
int revents;
revents = node->pfd.revents & node->pfd.events;
if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
aio_node_check(ctx, node->is_external)) {
- return true;
+ result = true;
+ break;
}
if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
aio_node_check(ctx, node->is_external)) {
- return true;
+ result = true;
+ break;
}
}
+ qemu_lockcnt_dec(&ctx->list_lock);
- return false;
+ return result;
}
-/*
- * Note that dispatch_fds == false has the side-effect of post-poning the
- * freeing of deleted handlers.
- */
-bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+static bool aio_dispatch_handlers(AioContext *ctx)
{
- AioHandler *node = NULL;
+ AioHandler *node, *tmp;
bool progress = false;
- /*
- * If there are callbacks left that have been queued, we need to call them.
- * Do not call select in this case, because it is possible that the caller
- * does not need a complete flush (as is the case for aio_poll loops).
- */
- if (aio_bh_poll(ctx)) {
- progress = true;
- }
-
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
- if (dispatch_fds) {
- node = QLIST_FIRST(&ctx->aio_handlers);
- }
- while (node) {
- AioHandler *tmp;
- int revents;
+ qemu_lockcnt_inc(&ctx->list_lock);
- ctx->walking_handlers++;
+ QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
+ int revents;
revents = node->pfd.revents & node->pfd.events;
node->pfd.revents = 0;
progress = true;
}
- tmp = node;
- node = QLIST_NEXT(node, node);
+ if (node->deleted) {
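+ /* Free the node only if dropping our count lets us take the list lock
+  * exclusively, i.e. no other walker is in flight; otherwise it stays
+  * marked deleted for a later pass.
+  */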
+ if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
+ QLIST_REMOVE(node, node);
+ g_free(node);
+ qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
+ }
+ }
+ }
+
+ qemu_lockcnt_dec(&ctx->list_lock);
+ return progress;
+}
- ctx->walking_handlers--;
+/*
+ * Note that dispatch_fds == false has the side-effect of postponing the
+ * freeing of deleted handlers.
+ */
+bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
+{
+ bool progress;
- if (!ctx->walking_handlers && tmp->deleted) {
- QLIST_REMOVE(tmp, node);
- g_free(tmp);
- }
+ /*
+ * If there are callbacks left that have been queued, we need to call them.
+ * Do not call select in this case, because it is possible that the caller
+ * does not need a complete flush (as is the case for aio_poll loops).
+ */
+ progress = aio_bh_poll(ctx);
+
+ if (dispatch_fds) {
+ progress |= aio_dispatch_handlers(ctx);
}
/* Run our timers */
npfd++;
}
+static bool run_poll_handlers_once(AioContext *ctx)
+{
+ bool progress = false;
+ AioHandler *node;
+
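+ /* Invoke each handler's io_poll() callback once; any callback that finds
+  * work to do counts as progress.
+  */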
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
+ if (!node->deleted && node->io_poll &&
+ node->io_poll(node->opaque)) {
+ progress = true;
+ }
+
+ /* Caller handles freeing deleted nodes. Don't do it here. */
+ }
+
+ return progress;
+}
+
/* run_poll_handlers:
* @ctx: the AioContext
* @max_ns: maximum time to poll for, in nanoseconds
* Note that ctx->notify_me must be non-zero so this function can detect
* aio_notify().
*
- * Note that the caller must have incremented ctx->walking_handlers.
+ * Note that the caller must have incremented ctx->list_lock.
*
* Returns: true if progress was made, false otherwise
*/
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
{
- bool progress = false;
+ bool progress;
int64_t end_time;
assert(ctx->notify_me);
- assert(ctx->walking_handlers > 0);
+ assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
assert(ctx->poll_disable_cnt == 0);
trace_run_poll_handlers_begin(ctx, max_ns);
end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;
do {
- AioHandler *node;
-
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (!node->deleted && node->io_poll &&
- node->io_poll(node->opaque)) {
- progress = true;
- }
-
- /* Caller handles freeing deleted nodes. Don't do it here. */
- }
+ progress = run_poll_handlers_once(ctx);
} while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);
trace_run_poll_handlers_end(ctx, progress);
/* try_poll_mode:
* @ctx: the AioContext
- * @blocking: polling is only attempted when blocking is true
+ * @blocking: busy polling is only attempted when blocking is true
*
- * If blocking is true then ctx->notify_me must be non-zero so this function
- * can detect aio_notify().
+ * ctx->notify_me must be non-zero so this function can detect aio_notify().
*
- * Note that the caller must have incremented ctx->walking_handlers.
+ * Note that the caller must have incremented ctx->list_lock.
*
* Returns: true if progress was made, false otherwise
*/
if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) {
/* See qemu_soonest_timeout() uint64_t hack */
int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
- (uint64_t)ctx->poll_max_ns);
+ (uint64_t)ctx->poll_ns);
if (max_ns) {
+ poll_set_started(ctx, true);
+
if (run_poll_handlers(ctx, max_ns)) {
return true;
}
}
}
- return false;
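+ /* Make sure handlers' io_poll_end() callbacks have run before we fall
+  * back to ppoll(2)/epoll_wait(2).
+  */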
+ poll_set_started(ctx, false);
+
+ /* Even if we don't run busy polling, try polling once in case it can make
+ * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
+ */
+ return run_poll_handlers_once(ctx);
}
bool aio_poll(AioContext *ctx, bool blocking)
int ret = 0;
bool progress;
int64_t timeout;
+ int64_t start = 0;
aio_context_acquire(ctx);
progress = false;
atomic_add(&ctx->notify_me, 2);
}
- ctx->walking_handlers++;
+ qemu_lockcnt_inc(&ctx->list_lock);
+
+ if (ctx->poll_max_ns) {
+ start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+ }
if (try_poll_mode(ctx, blocking)) {
progress = true;
/* fill pollfds */
if (!aio_epoll_enabled(ctx)) {
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->pfd.events
&& aio_node_check(ctx, node->is_external)) {
add_pollfd(node);
atomic_sub(&ctx->notify_me, 2);
}
+ /* Adjust polling time */
+ if (ctx->poll_max_ns) {
+ int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
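+ /* block_ns covers this whole aio_poll() iteration: the busy-poll phase
+  * in try_poll_mode() plus any blocking ppoll()/epoll_wait().
+  */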
+
+ if (block_ns <= ctx->poll_ns) {
+ /* This is the sweet spot, no adjustment needed */
+ } else if (block_ns > ctx->poll_max_ns) {
+ /* We'd have to poll for too long, poll less */
+ int64_t old = ctx->poll_ns;
+
+ if (ctx->poll_shrink) {
+ ctx->poll_ns /= ctx->poll_shrink;
+ } else {
+ ctx->poll_ns = 0;
+ }
+
+ trace_poll_shrink(ctx, old, ctx->poll_ns);
+ } else if (ctx->poll_ns < ctx->poll_max_ns &&
+ block_ns < ctx->poll_max_ns) {
+ /* There is room to grow, poll longer */
+ int64_t old = ctx->poll_ns;
+ int64_t grow = ctx->poll_grow;
+
+ if (grow == 0) {
+ grow = 2;
+ }
+
+ if (ctx->poll_ns) {
+ ctx->poll_ns *= grow;
+ } else {
+ ctx->poll_ns = 4000; /* start polling at 4 microseconds */
+ }
+
+ if (ctx->poll_ns > ctx->poll_max_ns) {
+ ctx->poll_ns = ctx->poll_max_ns;
+ }
+
+ trace_poll_grow(ctx, old, ctx->poll_ns);
+ }
+ }
+
aio_notify_accept(ctx);
/* if we have any readable fds, dispatch event */
}
npfd = 0;
- ctx->walking_handlers--;
+ qemu_lockcnt_dec(&ctx->list_lock);
/* Run dispatch even if there were no readable fds to run timers */
if (aio_dispatch(ctx, ret > 0)) {
#endif
}
-void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, Error **errp)
+void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
+ int64_t grow, int64_t shrink, Error **errp)
{
- /* No thread synchronization here, it doesn't matter if an incorrect poll
- * timeout is used once.
+ /* No thread synchronization here, it doesn't matter if an incorrect value
+ * is used once.
*/
ctx->poll_max_ns = max_ns;
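+ /* Reset the self-tuned window so tuning restarts under the new limits. */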
+ ctx->poll_ns = 0;
+ ctx->poll_grow = grow;
+ ctx->poll_shrink = shrink;
aio_notify(ctx);
}