*/
#include "qemu-common.h"
-#include "qemu-coroutine.h"
-#include "qemu-coroutine-int.h"
-#include "qemu-queue.h"
+#include "block/coroutine.h"
+#include "block/coroutine_int.h"
+#include "qemu/queue.h"
+#include "block/aio.h"
#include "trace.h"
-static QTAILQ_HEAD(, Coroutine) unlock_bh_queue =
- QTAILQ_HEAD_INITIALIZER(unlock_bh_queue);
-static QEMUBH* unlock_bh;
+/* Coroutines are awoken from a BH to allow the current coroutine to complete
+ * its flow of execution. The BH may run after the CoQueue has been destroyed,
+ * so keep BH data in a separate heap-allocated struct.
+ *
+ * One CoQueueNextData is allocated per wake-up request in
+ * qemu_co_queue_do_restart() and released by qemu_co_queue_next_bh() after
+ * every coroutine on its list has been entered.
+ */
+typedef struct {
+ QEMUBH *bh; /* BH created for this request; deleted by its own callback */
+ QTAILQ_HEAD(, Coroutine) entries; /* coroutines to enter, in wake-up order */
+} CoQueueNextData;
static void qemu_co_queue_next_bh(void *opaque)
{ /* BH callback: enter every coroutine queued on the CoQueueNextData passed
   * as @opaque, then release the BH and the struct itself. */
+ CoQueueNextData *data = opaque;
Coroutine *next;
trace_qemu_co_queue_next_bh();
- while ((next = QTAILQ_FIRST(&unlock_bh_queue))) {
- QTAILQ_REMOVE(&unlock_bh_queue, next, co_queue_next);
+ while ((next = QTAILQ_FIRST(&data->entries))) { /* drain from the front */
+ QTAILQ_REMOVE(&data->entries, next, co_queue_next); /* unlink before entering */
qemu_coroutine_enter(next, NULL);
}
+
+ qemu_bh_delete(data->bh); /* the BH is created per request, so drop it here */
+ g_slice_free(CoQueueNextData, data); /* pairs with g_slice_new() in qemu_co_queue_do_restart() */
}
void qemu_co_queue_init(CoQueue *queue)
{ /* Make @queue an empty wait queue and record the AioContext whose BHs
   * will later be used to wake its waiters. */
QTAILQ_INIT(&queue->entries);
- if (!unlock_bh) {
- unlock_bh = qemu_bh_new(qemu_co_queue_next_bh, NULL);
- }
+ /* This will be exposed to callers once there are multiple AioContexts */
+ queue->ctx = qemu_get_aio_context(); /* read back by qemu_co_queue_do_restart() for aio_bh_new() */
}
void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
assert(qemu_in_coroutine());
}
-bool qemu_co_queue_next(CoQueue *queue)
+void coroutine_fn qemu_co_queue_wait_insert_head(CoQueue *queue)
+{ /* Park the calling coroutine at the front of @queue and yield. */
+ Coroutine *self = qemu_coroutine_self();
+ QTAILQ_INSERT_HEAD(&queue->entries, self, co_queue_next); /* head: first to be picked by qemu_co_queue_do_restart() */
+ qemu_coroutine_yield(); /* execution continues below once the coroutine is re-entered */
+ assert(qemu_in_coroutine());
+}
+
+static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{ /* Move the first waiter (@single true) or every waiter (@single false) of
   * @queue onto a new CoQueueNextData and schedule a BH that will enter them.
   * Returns false when no coroutine was waiting, true otherwise. */
Coroutine *next;
+ CoQueueNextData *data;
+
+ if (QTAILQ_EMPTY(&queue->entries)) {
+ return false; /* nothing to wake, so no BH is allocated */
+ }
- next = QTAILQ_FIRST(&queue->entries);
- if (next) {
+ data = g_slice_new(CoQueueNextData); /* released by qemu_co_queue_next_bh() */
+ data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data); /* BH lives in the queue's AioContext */
+ QTAILQ_INIT(&data->entries);
+ qemu_bh_schedule(data->bh); /* scheduled first; the list is filled in below */
+
+ while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
- QTAILQ_INSERT_TAIL(&unlock_bh_queue, next, co_queue_next);
+ QTAILQ_INSERT_TAIL(&data->entries, next, co_queue_next); /* tail insert keeps the waiters' order */
trace_qemu_co_queue_next(next);
- qemu_bh_schedule(unlock_bh);
+ if (single) {
+ break; /* only the first waiter was requested */
+ }
}
+ return true;
+}
- return (next != NULL);
+bool qemu_co_queue_next(CoQueue *queue)
+{ /* Schedule the wake-up of the first waiter on @queue; returns false if the queue was empty. */
+ return qemu_co_queue_do_restart(queue, true); /* single == true: stop after one coroutine */
+}
+
+void qemu_co_queue_restart_all(CoQueue *queue)
+{ /* Schedule the wake-up of every coroutine currently waiting on @queue. */
+ qemu_co_queue_do_restart(queue, false); /* single == false; the "was anyone waiting" result is ignored */
}
bool qemu_co_queue_empty(CoQueue *queue)
trace_qemu_co_mutex_unlock_return(mutex, self);
}
+
+void qemu_co_rwlock_init(CoRwlock *lock)
+{ /* Put @lock into its initial state: all fields zeroed, wait queue initialised. */
+ memset(lock, 0, sizeof(*lock)); /* zeroes the whole struct, including writer and reader */
+ qemu_co_queue_init(&lock->queue);
+}
+
+void qemu_co_rwlock_rdlock(CoRwlock *lock)
+{ /* Take @lock for reading: wait while the writer flag is set, then count this reader. */
+ while (lock->writer) { /* loop: the flag is re-checked after every wake-up */
+ qemu_co_queue_wait(&lock->queue);
+ }
+ lock->reader++;
+}
+
+void qemu_co_rwlock_unlock(CoRwlock *lock)
+{ /* Release @lock; the writer flag decides whether this is a write or a read unlock. */
+ assert(qemu_in_coroutine());
+ if (lock->writer) { /* write unlock */
+ lock->writer = false;
+ qemu_co_queue_restart_all(&lock->queue); /* wake every waiter on the queue */
+ } else { /* read unlock */
+ lock->reader--;
+ assert(lock->reader >= 0); /* catches an unlock without a matching rdlock */
+ /* Wake up only one waiting writer */
+ if (!lock->reader) { /* only when the last reader has left */
+ qemu_co_queue_next(&lock->queue);
+ }
+ }
+}
+
+void qemu_co_rwlock_wrlock(CoRwlock *lock)
+{ /* Take @lock for writing: wait until there is neither a writer nor any reader. */
+ while (lock->writer || lock->reader) { /* loop: both conditions are re-checked after every wake-up */
+ qemu_co_queue_wait(&lock->queue);
+ }
+ lock->writer = true;
+}