* GNU GPL, version 2 or (at your option) any later version.
*/
+#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
+#include "qapi/error.h"
+#include "qemu/rcu_queue.h"
/*
 * One entry of an AioContext's aio_handlers list.  An entry describes
 * either a socket fd (see aio_set_fd_handler) or an EventNotifier (see
 * aio_set_event_notifier).
 *
 * NOTE(review): the callback fields io_read, io_write and io_notify are
 * used by the functions in this file but are not shown in this excerpt.
 */
struct AioHandler {
/* Event notifier this entry was registered for; compared in aio_set_event_notifier */
EventNotifier *e;
/* fd (socket or event handle), requested events and returned revents */
GPollFD pfd;
/* Set to 1 when the entry is removed while the list is being walked; freed later */
int deleted;
/* Caller-supplied pointer stored by aio_set_fd_handler */
void *opaque;
/* Copy of the is_external argument; handed to aio_node_check() before waiting */
+ bool is_external;
/* Linkage in ctx->aio_handlers */
QLIST_ENTRY(AioHandler) node;
};
/*
 * Register, update or remove the handler for socket @fd in @ctx.
 *
 * When both @io_read and @io_write are NULL the existing handler (if
 * any) is removed: it is only marked deleted if the list_lock count is
 * non-zero, otherwise it is unlinked and freed immediately.  In the
 * other case a handler is allocated when none exists for @fd, and its
 * callbacks, @opaque and @is_external are (re)assigned.
 *
 * The handler list is modified with ctx->list_lock held, and
 * aio_notify() is called before returning.  @io_poll is not referenced
 * in the lines shown here.
 *
 * NOTE(review): this is a patch excerpt; several context lines (closing
 * braces, the declaration of 'event', part of the allocation path) are
 * elided.
 */
void aio_set_fd_handler(AioContext *ctx,
int fd,
+ bool is_external,
IOHandler *io_read,
IOHandler *io_write,
+ AioPollFn *io_poll,
void *opaque)
{
/* fd is a SOCKET in our case */
AioHandler *node;
+ qemu_lockcnt_lock(&ctx->list_lock);
/* Look for a live (non-deleted) handler already registered for fd */
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->pfd.fd == fd && !node->deleted) {
break;
/* Are we deleting the fd handler? */
if (!io_read && !io_write) {
if (node) {
- /* If the lock is held, just mark the node as deleted */
- if (ctx->walking_handlers) {
+ /* If aio_poll is in progress, just mark the node as deleted */
+ if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
- * releasing the walking_handlers lock.
+ * releasing the list_lock.
*/
QLIST_REMOVE(node, node);
g_free(node);
/* Alloc and insert if it's not already there */
node = g_new0(AioHandler, 1);
node->pfd.fd = fd;
- QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+ QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
}
/* (Re)assign the handler's callbacks and attributes from the arguments */
node->pfd.events = 0;
node->opaque = opaque;
node->io_read = io_read;
node->io_write = io_write;
+ node->is_external = is_external;
/* Have the socket signal the context notifier's event handle for the listed network events */
event = event_notifier_get_handle(&ctx->notifier);
WSAEventSelect(node->pfd.fd, event,
FD_CONNECT | FD_WRITE | FD_OOB);
}
+ qemu_lockcnt_unlock(&ctx->list_lock);
/* Presumably wakes the context so it re-reads the handler list -- confirm in aio_notify() */
aio_notify(ctx);
}
/*
 * Stub: accepts poll begin/end callbacks for @fd but does nothing with
 * them in this implementation.
 */
+void aio_set_fd_poll(AioContext *ctx, int fd,
+ IOHandler *io_poll_begin,
+ IOHandler *io_poll_end)
+{
+ /* Not implemented: all arguments are ignored */
+}
+
/*
 * Register, update or remove the handler for EventNotifier @e in @ctx.
 *
 * A live (non-deleted) handler for @e is looked up first.  On removal
 * its GPollFD is detached from ctx->source and the handler is either
 * marked deleted (list_lock count non-zero) or unlinked and freed.  On
 * registration the handler records @e, the notifier's handle as
 * pfd.fd, G_IO_IN as the requested events and @is_external, is
 * inserted at the list head and its GPollFD is added to ctx->source;
 * @io_notify is then stored as the callback.
 *
 * The list is modified with ctx->list_lock held, and aio_notify() is
 * called before returning.  @io_poll is not referenced in the lines
 * shown here.
 *
 * NOTE(review): this is a patch excerpt; the condition selecting the
 * removal path (presumably io_notify == NULL) and the allocation of a
 * new handler are on elided lines.
 */
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e,
- EventNotifierHandler *io_notify)
+ bool is_external,
+ EventNotifierHandler *io_notify,
+ AioPollFn *io_poll)
{
AioHandler *node;
+ qemu_lockcnt_lock(&ctx->list_lock);
/* Look for a live (non-deleted) handler already registered for e */
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (node->e == e && !node->deleted) {
break;
if (node) {
/* Stop polling the handler's handle before dropping the handler */
g_source_remove_poll(&ctx->source, &node->pfd);
- /* If the lock is held, just mark the node as deleted */
- if (ctx->walking_handlers) {
+ /* If aio_poll is in progress, just mark the node as deleted */
+ if (qemu_lockcnt_count(&ctx->list_lock)) {
node->deleted = 1;
node->pfd.revents = 0;
} else {
/* Otherwise, delete it for real. We can't just mark it as
* deleted because deleted nodes are only cleaned up after
- * releasing the walking_handlers lock.
+ * releasing the list_lock.
*/
QLIST_REMOVE(node, node);
g_free(node);
/* NOTE(review): lines elided here; the assignments below belong to the path that creates a new handler */
node->e = e;
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
node->pfd.events = G_IO_IN;
- QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+ node->is_external = is_external;
+ QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
g_source_add_poll(&ctx->source, &node->pfd);
}
node->io_notify = io_notify;
}
+ qemu_lockcnt_unlock(&ctx->list_lock);
/* Presumably wakes the context so it re-reads the handler list -- confirm in aio_notify() */
aio_notify(ctx);
}
/*
 * Stub: accepts poll begin/end callbacks for @notifier but does nothing
 * with them in this implementation.
 */
+void aio_set_event_notifier_poll(AioContext *ctx,
+ EventNotifier *notifier,
+ EventNotifierHandler *io_poll_begin,
+ EventNotifierHandler *io_poll_end)
+{
+ /* Not implemented: all arguments are ignored */
+}
+
/*
 * Probe the registered sockets with select() and record the outcome in
 * each handler's pfd.revents.
 *
 * The timeout passed to select() is the static (hence zero-initialized)
 * timeval tv0.  Handlers with an io_read callback are added to the read
 * set; when select() reports at least one ready socket, every handler's
 * revents is reset and G_IO_IN is set for sockets found in the read
 * set.  The list is walked with the list_lock count raised.
 *
 * Returns have_select_revents.
 *
 * NOTE(review): this is a patch excerpt; the declaration of 'node', the
 * write-set handling and the place where have_select_revents becomes
 * true are on elided lines.
 */
bool aio_prepare(AioContext *ctx)
{
static struct timeval tv0;
bool have_select_revents = false;
fd_set rfds, wfds;
+ /*
+ * We have to walk very carefully in case aio_set_fd_handler is
+ * called while we're walking.
+ */
+ qemu_lockcnt_inc(&ctx->list_lock);
+
/* fill fd sets */
FD_ZERO(&rfds);
FD_ZERO(&wfds);
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
if (node->io_read) {
FD_SET ((SOCKET)node->pfd.fd, &rfds);
}
}
/* tv0 is all zeroes, so this select() is given a zero timeout */
if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
node->pfd.revents = 0;
if (FD_ISSET(node->pfd.fd, &rfds)) {
node->pfd.revents |= G_IO_IN;
}
}
+ qemu_lockcnt_dec(&ctx->list_lock);
return have_select_revents;
}
/*
 * Report whether any handler in @ctx has recorded events that one of
 * its callbacks would consume.
 *
 * A handler counts as pending when it has any revents and an io_notify
 * callback, G_IO_IN and an io_read callback, or G_IO_OUT and an
 * io_write callback.  In the new ('+') version the walk is done with
 * the list_lock count raised, so the loop breaks out and returns
 * 'result' instead of returning from inside the loop.
 */
bool aio_pending(AioContext *ctx)
{
AioHandler *node;
+ bool result = false;
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+ /*
+ * We have to walk very carefully in case aio_set_fd_handler is
+ * called while we're walking.
+ */
+ qemu_lockcnt_inc(&ctx->list_lock);
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
/* Event-notifier handler with any recorded event */
if (node->pfd.revents && node->io_notify) {
- return true;
+ result = true;
+ break;
}
/* Socket handler that is readable and has a read callback */
if ((node->pfd.revents & G_IO_IN) && node->io_read) {
- return true;
+ result = true;
+ break;
}
/* Socket handler that is writable and has a write callback */
if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
- return true;
+ result = true;
+ break;
}
}
- return false;
+ qemu_lockcnt_dec(&ctx->list_lock);
+ return result;
}
/*
 * Walk the handler list of @ctx, dispatch handlers that have recorded
 * events or whose notifier handle equals @event, and reclaim handlers
 * that were marked deleted.
 *
 * In the new ('+') version the walk runs with the list_lock count
 * raised.  A deleted node is unlinked and freed only when
 * qemu_lockcnt_dec_if_lock() succeeds; the count is then restored with
 * qemu_lockcnt_inc_and_unlock().
 *
 * Returns 'progress'.
 *
 * NOTE(review): this is a patch excerpt; the code that invokes the
 * callbacks and sets 'progress' is on elided lines.
 */
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
AioHandler *node;
bool progress = false;
+ AioHandler *tmp;
+
+ qemu_lockcnt_inc(&ctx->list_lock);
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
- node = QLIST_FIRST(&ctx->aio_handlers);
- while (node) {
- AioHandler *tmp;
+ QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents = node->pfd.revents;
- ctx->walking_handlers++;
-
/* Notifier handlers fire on recorded events or when their handle is the signaled one */
if (!node->deleted &&
(revents || event_notifier_get_handle(node->e) == event) &&
node->io_notify) {
}
}
- tmp = node;
- node = QLIST_NEXT(node, node);
-
- ctx->walking_handlers--;
-
- if (!ctx->walking_handlers && tmp->deleted) {
- QLIST_REMOVE(tmp, node);
- g_free(tmp);
/* Free a deleted node only if the lock could be taken while dropping our count */
+ if (node->deleted) {
+ if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
+ QLIST_REMOVE(node, node);
+ g_free(node);
+ qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
+ }
}
}
+ qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
/*
 * Run the pending work of @ctx: bottom halves via aio_bh_poll(), fd and
 * notifier handlers via aio_dispatch_handlers(), and timers via
 * timerlistgroup_run_timers().
 *
 * In the new ('+') version the handler step runs only when
 * @dispatch_fds is true.  No specific event is passed to it
 * (INVALID_HANDLE_VALUE).
 *
 * Returns the OR of the progress values reported by the steps that ran.
 */
-bool aio_dispatch(AioContext *ctx)
+bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{
bool progress;
progress = aio_bh_poll(ctx);
- progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
+ if (dispatch_fds) {
+ progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
+ }
progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress;
}
/*
 * NOTE(review): the signature of this function is elided from the
 * excerpt; judging by the comments below it is presumably aio_poll(),
 * taking 'ctx' and 'blocking' -- confirm against the full file.
 *
 * Visible behavior: with the context acquired, and with ctx->notify_me
 * raised by 2 around the wait when 'blocking' is set, the function
 * calls aio_prepare(), collects the event handles of live handlers that
 * have io_notify and pass aio_node_check(), and waits on them with
 * WaitForMultipleObjects().  After the first wait it calls
 * aio_notify_accept() and aio_bh_poll(); it then runs timers and
 * returns 'progress'.  The loop structure, the timeout computation and
 * the event dispatch are on elided lines.
 */
{
AioHandler *node;
HANDLE events[MAXIMUM_WAIT_OBJECTS + 1];
- bool was_dispatching, progress, have_select_revents, first;
+ bool progress, have_select_revents, first;
int count;
int timeout;
aio_context_acquire(ctx);
- was_dispatching = ctx->dispatching;
progress = false;
/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
* already true when aio_poll is called with blocking == false;
- * if blocking == true, it is only true after poll() returns.
- *
- * If we're in a nested event loop, ctx->dispatching might be true.
- * In that case we can restore it just before returning, but we
- * have to clear it now.
+ * if blocking == true, it is only true after poll() returns,
+ * so disable the optimization now.
*/
- aio_set_dispatching(ctx, !blocking);
+ if (blocking) {
+ atomic_add(&ctx->notify_me, 2);
+ }
+ qemu_lockcnt_inc(&ctx->list_lock);
have_select_revents = aio_prepare(ctx);
- ctx->walking_handlers++;
-
/* fill fd sets */
count = 0;
/* NOTE(review): no bound check on count against the size of events[] is visible here -- confirm */
- QLIST_FOREACH(node, &ctx->aio_handlers, node) {
- if (!node->deleted && node->io_notify) {
+ QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
+ if (!node->deleted && node->io_notify
+ && aio_node_check(ctx, node->is_external)) {
events[count++] = event_notifier_get_handle(node->e);
}
}
- ctx->walking_handlers--;
+ qemu_lockcnt_dec(&ctx->list_lock);
first = true;
/* ctx->notifier is always registered. */
aio_context_release(ctx);
}
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
/* Undo the notify_me increment made before the wait */
+ if (blocking) {
+ assert(first);
+ atomic_sub(&ctx->notify_me, 2);
+ }
if (timeout) {
aio_context_acquire(ctx);
}
- aio_set_dispatching(ctx, true);
- if (first && aio_bh_poll(ctx)) {
- progress = true;
/* Bottom halves are polled only after the first wait */
+ if (first) {
+ aio_notify_accept(ctx);
+ progress |= aio_bh_poll(ctx);
+ first = false;
}
- first = false;
/* if we have any signaled events, dispatch event */
event = NULL;
progress |= timerlistgroup_run_timers(&ctx->tlg);
- aio_set_dispatching(ctx, was_dispatching);
aio_context_release(ctx);
return progress;
}
+
/* Setup hook for @ctx; the body is empty, so nothing is done here. */
+void aio_context_setup(AioContext *ctx)
+{
+}
+
/*
 * Always reports failure through @errp: polling is not implemented
 * here, so @max_ns, @grow and @shrink are ignored and @ctx is left
 * untouched.
 */
+void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
+ int64_t grow, int64_t shrink, Error **errp)
+{
+ error_setg(errp, "AioContext polling is not implemented on Windows");
+}