*
*/
+#include "qemu/osdep.h"
#include "trace.h"
#include "qemu-common.h"
#include "qemu/thread.h"
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
+#include "block/aio.h"
enum {
POOL_BATCH_SIZE = 64,
}
}
-Coroutine *qemu_coroutine_create(CoroutineEntry *entry)
+Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
Coroutine *co = NULL;
}
co->entry = entry;
- QTAILQ_INIT(&co->co_queue_wakeup);
+ co->entry_arg = opaque;
+ QSIMPLEQ_INIT(&co->co_queue_wakeup);
return co;
}
qemu_coroutine_delete(co);
}
-void qemu_coroutine_enter(Coroutine *co, void *opaque)
+void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
Coroutine *self = qemu_coroutine_self();
CoroutineAction ret;
- trace_qemu_coroutine_enter(self, co, opaque);
+ /* Cannot rely on the read barrier for co in aio_co_wake(), as there are
+ * callers outside of aio_co_wake() */
+ const char *scheduled = atomic_mb_read(&co->scheduled);
+
+ trace_qemu_aio_coroutine_enter(ctx, self, co, co->entry_arg);
+
+ /* if the Coroutine has already been scheduled, entering it again will
+ * cause us to enter it twice, potentially even after the coroutine has
+ * been deleted */
+ if (scheduled) {
+ fprintf(stderr,
+ "%s: Co-routine was already scheduled in '%s'\n",
+ __func__, scheduled);
+ abort();
+ }
if (co->caller) {
fprintf(stderr, "Co-routine re-entered recursively\n");
}
co->caller = self;
- co->entry_arg = opaque;
+ co->ctx = ctx;
+
+ /* Store co->ctx before anything that stores co. Matches
+ * barrier in aio_co_wake and qemu_co_mutex_wake.
+ */
+ smp_wmb();
+
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
qemu_co_queue_run_restart(co);
+ /* Beware, if ret == COROUTINE_YIELD and qemu_co_queue_run_restart()
+ * has started any other coroutine, "co" might have been reentered
+ * and even freed by now! So be careful and do not touch it.
+ */
+
switch (ret) {
case COROUTINE_YIELD:
return;
case COROUTINE_TERMINATE:
+ assert(!co->locks_held);
trace_qemu_coroutine_terminate(co);
coroutine_delete(co);
return;
}
}
+void qemu_coroutine_enter(Coroutine *co)
+{
+ qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
+}
+
+void qemu_coroutine_enter_if_inactive(Coroutine *co)
+{
+ if (!qemu_coroutine_entered(co)) {
+ qemu_coroutine_enter(co);
+ }
+}
+
void coroutine_fn qemu_coroutine_yield(void)
{
Coroutine *self = qemu_coroutine_self();
self->caller = NULL;
qemu_coroutine_switch(self, to, COROUTINE_YIELD);
}
+
+bool qemu_coroutine_entered(Coroutine *co)
+{
+ return co->caller;
+}