X-Git-Url: https://repo.jachan.dev/qemu.git/blobdiff_plain/501a7ce7270955be151c442c27620fa7af2f3ce5..96f8f23a1e900796494a54e8b56610e1a7db2a89:/aio-win32.c diff --git a/aio-win32.c b/aio-win32.c index f5ea027f8c..d81313b00b 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -22,17 +22,83 @@ struct AioHandler { EventNotifier *e; + IOHandler *io_read; + IOHandler *io_write; EventNotifierHandler *io_notify; - AioFlushEventNotifierHandler *io_flush; GPollFD pfd; int deleted; + void *opaque; QLIST_ENTRY(AioHandler) node; }; +void aio_set_fd_handler(AioContext *ctx, + int fd, + IOHandler *io_read, + IOHandler *io_write, + void *opaque) +{ + /* fd is a SOCKET in our case */ + AioHandler *node; + + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->pfd.fd == fd && !node->deleted) { + break; + } + } + + /* Are we deleting the fd handler? */ + if (!io_read && !io_write) { + if (node) { + /* If the lock is held, just mark the node as deleted */ + if (ctx->walking_handlers) { + node->deleted = 1; + node->pfd.revents = 0; + } else { + /* Otherwise, delete it for real. We can't just mark it as + * deleted because deleted nodes are only cleaned up after + * releasing the walking_handlers lock. + */ + QLIST_REMOVE(node, node); + g_free(node); + } + } + } else { + HANDLE event; + + if (node == NULL) { + /* Alloc and insert if it's not already there */ + node = g_malloc0(sizeof(AioHandler)); + node->pfd.fd = fd; + QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + } + + node->pfd.events = 0; + if (node->io_read) { + node->pfd.events |= G_IO_IN; + } + if (node->io_write) { + node->pfd.events |= G_IO_OUT; + } + + node->e = &ctx->notifier; + + /* Update handler with latest information */ + node->opaque = opaque; + node->io_read = io_read; + node->io_write = io_write; + + event = event_notifier_get_handle(&ctx->notifier); + WSAEventSelect(node->pfd.fd, event, + FD_READ | FD_ACCEPT | FD_CLOSE | + FD_CONNECT | FD_WRITE | FD_OOB); + } + + aio_notify(ctx); +} + void aio_set_event_notifier(AioContext *ctx, EventNotifier *e, - EventNotifierHandler *io_notify, - AioFlushEventNotifierHandler *io_flush) + EventNotifierHandler *io_notify) { AioHandler *node; @@ -73,12 +139,48 @@ void aio_set_event_notifier(AioContext *ctx, } /* Update handler with latest information */ node->io_notify = io_notify; - node->io_flush = io_flush; } aio_notify(ctx); } +bool aio_prepare(AioContext *ctx) +{ + static struct timeval tv0; + AioHandler *node; + bool have_select_revents = false; + fd_set rfds, wfds; + + /* fill fd sets */ + FD_ZERO(&rfds); + FD_ZERO(&wfds); + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + if (node->io_read) { + FD_SET ((SOCKET)node->pfd.fd, &rfds); + } + if (node->io_write) { + FD_SET ((SOCKET)node->pfd.fd, &wfds); + } + } + + if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { + QLIST_FOREACH(node, &ctx->aio_handlers, node) { + node->pfd.revents = 0; + if (FD_ISSET(node->pfd.fd, &rfds)) { + node->pfd.revents |= G_IO_IN; + have_select_revents = true; + } + + if (FD_ISSET(node->pfd.fd, &wfds)) { + node->pfd.revents |= G_IO_OUT; + have_select_revents = true; + } + } + } + + return have_select_revents; +} + bool aio_pending(AioContext *ctx) { AioHandler *node; @@ -87,46 +189,66 @@ bool aio_pending(AioContext *ctx) if (node->pfd.revents && node->io_notify) { return true; } + + if ((node->pfd.revents & G_IO_IN) && node->io_read) { + return true; + } + if ((node->pfd.revents & G_IO_OUT) && node->io_write) { + return true; + } } return false; } -bool aio_poll(AioContext *ctx, bool blocking) +static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) { AioHandler *node; - HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; - bool busy, progress; - int count; - - progress = false; - - /* - * If there are callbacks left that have been queued, we need to call then. - * Do not call select in this case, because it is possible that the caller - * does not need a complete flush (as is the case for qemu_aio_wait loops). - */ - if (aio_bh_poll(ctx)) { - blocking = false; - progress = true; - } + bool progress = false; /* - * Then dispatch any pending callbacks from the GSource. - * - * We have to walk very carefully in case qemu_aio_set_fd_handler is + * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ node = QLIST_FIRST(&ctx->aio_handlers); while (node) { AioHandler *tmp; + int revents = node->pfd.revents; ctx->walking_handlers++; - if (node->pfd.revents && node->io_notify) { + if (!node->deleted && + (revents || event_notifier_get_handle(node->e) == event) && + node->io_notify) { node->pfd.revents = 0; node->io_notify(node->e); - progress = true; + + /* aio_notify() does not count as progress */ + if (node->e != &ctx->notifier) { + progress = true; + } + } + + if (!node->deleted && + (node->io_read || node->io_write)) { + node->pfd.revents = 0; + if ((revents & G_IO_IN) && node->io_read) { + node->io_read(node->opaque); + progress = true; + } + if ((revents & G_IO_OUT) && node->io_write) { + node->io_write(node->opaque); + progress = true; + } + + /* if the next select() will return an event, we have progressed */ + if (event == event_notifier_get_handle(&ctx->notifier)) { + WSANETWORKEVENTS ev; + WSAEnumNetworkEvents(node->pfd.fd, event, &ev); + if (ev.lNetworkEvents) { + progress = true; + } + } } tmp = node; @@ -140,79 +262,92 @@ bool aio_poll(AioContext *ctx, bool blocking) } } - if (progress && !blocking) { - return true; + return progress; +} + +bool aio_dispatch(AioContext *ctx) +{ + bool progress; + + progress = aio_bh_poll(ctx); + progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); + progress |= timerlistgroup_run_timers(&ctx->tlg); + return progress; +} + +bool aio_poll(AioContext *ctx, bool blocking) +{ + AioHandler *node; + HANDLE events[MAXIMUM_WAIT_OBJECTS + 1]; + bool was_dispatching, progress, have_select_revents, first; + int count; + int timeout; + + have_select_revents = aio_prepare(ctx); + if (have_select_revents) { + blocking = false; } + was_dispatching = ctx->dispatching; + progress = false; + + /* aio_notify can avoid the expensive event_notifier_set if + * everything (file descriptors, bottom halves, timers) will + * be re-evaluated before the next blocking poll(). This is + * already true when aio_poll is called with blocking == false; + * if blocking == true, it is only true after poll() returns. + * + * If we're in a nested event loop, ctx->dispatching might be true. + * In that case we can restore it just before returning, but we + * have to clear it now. + */ + aio_set_dispatching(ctx, !blocking); + ctx->walking_handlers++; /* fill fd sets */ - busy = false; count = 0; QLIST_FOREACH(node, &ctx->aio_handlers, node) { - /* If there aren't pending AIO operations, don't invoke callbacks. - * Otherwise, if there are no AIO requests, qemu_aio_wait() would - * wait indefinitely. - */ - if (!node->deleted && node->io_flush) { - if (node->io_flush(node->e) == 0) { - continue; - } - busy = true; - } if (!node->deleted && node->io_notify) { events[count++] = event_notifier_get_handle(node->e); } } ctx->walking_handlers--; - - /* No AIO operations? Get us out of here */ - if (!busy) { - return progress; - } + first = true; /* wait until next event */ while (count > 0) { - int timeout = blocking ? INFINITE : 0; - int ret = WaitForMultipleObjects(count, events, FALSE, timeout); + HANDLE event; + int ret; + + timeout = blocking + ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; + ret = WaitForMultipleObjects(count, events, FALSE, timeout); + aio_set_dispatching(ctx, true); + + if (first && aio_bh_poll(ctx)) { + progress = true; + } + first = false; /* if we have any signaled events, dispatch event */ - if ((DWORD) (ret - WAIT_OBJECT_0) >= count) { + event = NULL; + if ((DWORD) (ret - WAIT_OBJECT_0) < count) { + event = events[ret - WAIT_OBJECT_0]; + events[ret - WAIT_OBJECT_0] = events[--count]; + } else if (!have_select_revents) { break; } + have_select_revents = false; blocking = false; - /* we have to walk very carefully in case - * qemu_aio_set_fd_handler is called while we're walking */ - node = QLIST_FIRST(&ctx->aio_handlers); - while (node) { - AioHandler *tmp; - - ctx->walking_handlers++; - - if (!node->deleted && - event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] && - node->io_notify) { - node->io_notify(node->e); - progress = true; - } - - tmp = node; - node = QLIST_NEXT(node, node); - - ctx->walking_handlers--; - - if (!ctx->walking_handlers && tmp->deleted) { - QLIST_REMOVE(tmp, node); - g_free(tmp); - } - } - - /* Try again, but only call each handler once. */ - events[ret - WAIT_OBJECT_0] = events[--count]; + progress |= aio_dispatch_handlers(ctx, event); } + progress |= timerlistgroup_run_timers(&ctx->tlg); + + aio_set_dispatching(ctx, was_dispatching); return progress; }