]> Git Repo - qemu.git/blob - aio-posix.c
aio: Introduce aio_context_setup
[qemu.git] / aio-posix.c
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <[email protected]>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  * Contributions after 2012-01-13 are licensed under the terms of the
13  * GNU GPL, version 2 or (at your option) any later version.
14  */
15
16 #include "qemu-common.h"
17 #include "block/block.h"
18 #include "qemu/queue.h"
19 #include "qemu/sockets.h"
20
21 struct AioHandler
22 {
23     GPollFD pfd;
24     IOHandler *io_read;
25     IOHandler *io_write;
26     int deleted;
27     void *opaque;
28     bool is_external;
29     QLIST_ENTRY(AioHandler) node;
30 };
31
32 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
33 {
34     AioHandler *node;
35
36     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
37         if (node->pfd.fd == fd)
38             if (!node->deleted)
39                 return node;
40     }
41
42     return NULL;
43 }
44
45 void aio_set_fd_handler(AioContext *ctx,
46                         int fd,
47                         bool is_external,
48                         IOHandler *io_read,
49                         IOHandler *io_write,
50                         void *opaque)
51 {
52     AioHandler *node;
53
54     node = find_aio_handler(ctx, fd);
55
56     /* Are we deleting the fd handler? */
57     if (!io_read && !io_write) {
58         if (node) {
59             g_source_remove_poll(&ctx->source, &node->pfd);
60
61             /* If the lock is held, just mark the node as deleted */
62             if (ctx->walking_handlers) {
63                 node->deleted = 1;
64                 node->pfd.revents = 0;
65             } else {
66                 /* Otherwise, delete it for real.  We can't just mark it as
67                  * deleted because deleted nodes are only cleaned up after
68                  * releasing the walking_handlers lock.
69                  */
70                 QLIST_REMOVE(node, node);
71                 g_free(node);
72             }
73         }
74     } else {
75         if (node == NULL) {
76             /* Alloc and insert if it's not already there */
77             node = g_new0(AioHandler, 1);
78             node->pfd.fd = fd;
79             QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
80
81             g_source_add_poll(&ctx->source, &node->pfd);
82         }
83         /* Update handler with latest information */
84         node->io_read = io_read;
85         node->io_write = io_write;
86         node->opaque = opaque;
87         node->is_external = is_external;
88
89         node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
90         node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
91     }
92
93     aio_notify(ctx);
94 }
95
96 void aio_set_event_notifier(AioContext *ctx,
97                             EventNotifier *notifier,
98                             bool is_external,
99                             EventNotifierHandler *io_read)
100 {
101     aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
102                        is_external, (IOHandler *)io_read, NULL, notifier);
103 }
104
105 bool aio_prepare(AioContext *ctx)
106 {
107     return false;
108 }
109
110 bool aio_pending(AioContext *ctx)
111 {
112     AioHandler *node;
113
114     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
115         int revents;
116
117         revents = node->pfd.revents & node->pfd.events;
118         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
119             return true;
120         }
121         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
122             return true;
123         }
124     }
125
126     return false;
127 }
128
129 bool aio_dispatch(AioContext *ctx)
130 {
131     AioHandler *node;
132     bool progress = false;
133
134     /*
135      * If there are callbacks left that have been queued, we need to call them.
136      * Do not call select in this case, because it is possible that the caller
137      * does not need a complete flush (as is the case for aio_poll loops).
138      */
139     if (aio_bh_poll(ctx)) {
140         progress = true;
141     }
142
143     /*
144      * We have to walk very carefully in case aio_set_fd_handler is
145      * called while we're walking.
146      */
147     node = QLIST_FIRST(&ctx->aio_handlers);
148     while (node) {
149         AioHandler *tmp;
150         int revents;
151
152         ctx->walking_handlers++;
153
154         revents = node->pfd.revents & node->pfd.events;
155         node->pfd.revents = 0;
156
157         if (!node->deleted &&
158             (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
159             node->io_read) {
160             node->io_read(node->opaque);
161
162             /* aio_notify() does not count as progress */
163             if (node->opaque != &ctx->notifier) {
164                 progress = true;
165             }
166         }
167         if (!node->deleted &&
168             (revents & (G_IO_OUT | G_IO_ERR)) &&
169             node->io_write) {
170             node->io_write(node->opaque);
171             progress = true;
172         }
173
174         tmp = node;
175         node = QLIST_NEXT(node, node);
176
177         ctx->walking_handlers--;
178
179         if (!ctx->walking_handlers && tmp->deleted) {
180             QLIST_REMOVE(tmp, node);
181             g_free(tmp);
182         }
183     }
184
185     /* Run our timers */
186     progress |= timerlistgroup_run_timers(&ctx->tlg);
187
188     return progress;
189 }
190
191 /* These thread-local variables are used only in a small part of aio_poll
192  * around the call to the poll() system call.  In particular they are not
193  * used while aio_poll is performing callbacks, which makes it much easier
194  * to think about reentrancy!
195  *
196  * Stack-allocated arrays would be perfect but they have size limitations;
197  * heap allocation is expensive enough that we want to reuse arrays across
198  * calls to aio_poll().  And because poll() has to be called without holding
199  * any lock, the arrays cannot be stored in AioContext.  Thread-local data
200  * has none of the disadvantages of these three options.
201  */
202 static __thread GPollFD *pollfds;
203 static __thread AioHandler **nodes;
204 static __thread unsigned npfd, nalloc;
205 static __thread Notifier pollfds_cleanup_notifier;
206
207 static void pollfds_cleanup(Notifier *n, void *unused)
208 {
209     g_assert(npfd == 0);
210     g_free(pollfds);
211     g_free(nodes);
212     nalloc = 0;
213 }
214
215 static void add_pollfd(AioHandler *node)
216 {
217     if (npfd == nalloc) {
218         if (nalloc == 0) {
219             pollfds_cleanup_notifier.notify = pollfds_cleanup;
220             qemu_thread_atexit_add(&pollfds_cleanup_notifier);
221             nalloc = 8;
222         } else {
223             g_assert(nalloc <= INT_MAX);
224             nalloc *= 2;
225         }
226         pollfds = g_renew(GPollFD, pollfds, nalloc);
227         nodes = g_renew(AioHandler *, nodes, nalloc);
228     }
229     nodes[npfd] = node;
230     pollfds[npfd] = (GPollFD) {
231         .fd = node->pfd.fd,
232         .events = node->pfd.events,
233     };
234     npfd++;
235 }
236
237 bool aio_poll(AioContext *ctx, bool blocking)
238 {
239     AioHandler *node;
240     int i, ret;
241     bool progress;
242     int64_t timeout;
243
244     aio_context_acquire(ctx);
245     progress = false;
246
247     /* aio_notify can avoid the expensive event_notifier_set if
248      * everything (file descriptors, bottom halves, timers) will
249      * be re-evaluated before the next blocking poll().  This is
250      * already true when aio_poll is called with blocking == false;
251      * if blocking == true, it is only true after poll() returns,
252      * so disable the optimization now.
253      */
254     if (blocking) {
255         atomic_add(&ctx->notify_me, 2);
256     }
257
258     ctx->walking_handlers++;
259
260     assert(npfd == 0);
261
262     /* fill pollfds */
263     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
264         if (!node->deleted && node->pfd.events
265             && aio_node_check(ctx, node->is_external)) {
266             add_pollfd(node);
267         }
268     }
269
270     timeout = blocking ? aio_compute_timeout(ctx) : 0;
271
272     /* wait until next event */
273     if (timeout) {
274         aio_context_release(ctx);
275     }
276     ret = qemu_poll_ns((GPollFD *)pollfds, npfd, timeout);
277     if (blocking) {
278         atomic_sub(&ctx->notify_me, 2);
279     }
280     if (timeout) {
281         aio_context_acquire(ctx);
282     }
283
284     aio_notify_accept(ctx);
285
286     /* if we have any readable fds, dispatch event */
287     if (ret > 0) {
288         for (i = 0; i < npfd; i++) {
289             nodes[i]->pfd.revents = pollfds[i].revents;
290         }
291     }
292
293     npfd = 0;
294     ctx->walking_handlers--;
295
296     /* Run dispatch even if there were no readable fds to run timers */
297     if (aio_dispatch(ctx)) {
298         progress = true;
299     }
300
301     aio_context_release(ctx);
302
303     return progress;
304 }
305
306 void aio_context_setup(AioContext *ctx, Error **errp)
307 {
308 }
This page took 0.041664 seconds and 4 git commands to generate.