]> Git Repo - qemu.git/blob - util/qemu-coroutine-lock.c
coroutine-lock: make CoMutex thread-safe
[qemu.git] / util / qemu-coroutine-lock.c
1 /*
2  * coroutine queues and locks
3  *
4  * Copyright (c) 2011 Kevin Wolf <[email protected]>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  *
24  * The lock-free mutex implementation is based on OSv
25  * (core/lfmutex.cc, include/lockfree/mutex.hh).
26  * Copyright (C) 2013 Cloudius Systems, Ltd.
27  */
28
29 #include "qemu/osdep.h"
30 #include "qemu-common.h"
31 #include "qemu/coroutine.h"
32 #include "qemu/coroutine_int.h"
33 #include "qemu/queue.h"
34 #include "block/aio.h"
35 #include "trace.h"
36
37 void qemu_co_queue_init(CoQueue *queue)
38 {
39     QSIMPLEQ_INIT(&queue->entries);
40 }
41
42 void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
43 {
44     Coroutine *self = qemu_coroutine_self();
45     QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
46     qemu_coroutine_yield();
47     assert(qemu_in_coroutine());
48 }
49
50 /**
51  * qemu_co_queue_run_restart:
52  *
53  * Enter each coroutine that was previously marked for restart by
54  * qemu_co_queue_next() or qemu_co_queue_restart_all().  This function is
55  * invoked by the core coroutine code when the current coroutine yields or
56  * terminates.
57  */
58 void qemu_co_queue_run_restart(Coroutine *co)
59 {
60     Coroutine *next;
61
62     trace_qemu_co_queue_run_restart(co);
63     while ((next = QSIMPLEQ_FIRST(&co->co_queue_wakeup))) {
64         QSIMPLEQ_REMOVE_HEAD(&co->co_queue_wakeup, co_queue_next);
65         qemu_coroutine_enter(next);
66     }
67 }
68
69 static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
70 {
71     Coroutine *next;
72
73     if (QSIMPLEQ_EMPTY(&queue->entries)) {
74         return false;
75     }
76
77     while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
78         QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
79         aio_co_wake(next);
80         if (single) {
81             break;
82         }
83     }
84     return true;
85 }
86
87 bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
88 {
89     assert(qemu_in_coroutine());
90     return qemu_co_queue_do_restart(queue, true);
91 }
92
93 void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
94 {
95     assert(qemu_in_coroutine());
96     qemu_co_queue_do_restart(queue, false);
97 }
98
99 bool qemu_co_enter_next(CoQueue *queue)
100 {
101     Coroutine *next;
102
103     next = QSIMPLEQ_FIRST(&queue->entries);
104     if (!next) {
105         return false;
106     }
107
108     QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
109     qemu_coroutine_enter(next);
110     return true;
111 }
112
113 bool qemu_co_queue_empty(CoQueue *queue)
114 {
115     return QSIMPLEQ_FIRST(&queue->entries) == NULL;
116 }
117
118 /* The wait records are handled with a multiple-producer, single-consumer
119  * lock-free queue.  There cannot be two concurrent pop_waiter() calls
120  * because pop_waiter() can only be called while mutex->handoff is zero.
121  * This can happen in three cases:
122  * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
123  *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
124  *   not take part in the handoff.
125  * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
126  *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
127  *   the cmpxchg (it will see either 0 or the next sequence value) and
128  *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
129  *   woken up someone.
130  * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
131  *   In this case another iteration starts with mutex->handoff == 0;
132  *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
133  *   qemu_co_mutex_unlock will go back to case (1).
134  *
135  * The following functions manage this queue.
136  */
137 typedef struct CoWaitRecord {
138     Coroutine *co;
139     QSLIST_ENTRY(CoWaitRecord) next;
140 } CoWaitRecord;
141
142 static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
143 {
144     w->co = qemu_coroutine_self();
145     QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
146 }
147
148 static void move_waiters(CoMutex *mutex)
149 {
150     QSLIST_HEAD(, CoWaitRecord) reversed;
151     QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
152     while (!QSLIST_EMPTY(&reversed)) {
153         CoWaitRecord *w = QSLIST_FIRST(&reversed);
154         QSLIST_REMOVE_HEAD(&reversed, next);
155         QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
156     }
157 }
158
159 static CoWaitRecord *pop_waiter(CoMutex *mutex)
160 {
161     CoWaitRecord *w;
162
163     if (QSLIST_EMPTY(&mutex->to_pop)) {
164         move_waiters(mutex);
165         if (QSLIST_EMPTY(&mutex->to_pop)) {
166             return NULL;
167         }
168     }
169     w = QSLIST_FIRST(&mutex->to_pop);
170     QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
171     return w;
172 }
173
174 static bool has_waiters(CoMutex *mutex)
175 {
176     return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
177 }
178
179 void qemu_co_mutex_init(CoMutex *mutex)
180 {
181     memset(mutex, 0, sizeof(*mutex));
182 }
183
184 static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
185 {
186     Coroutine *self = qemu_coroutine_self();
187     CoWaitRecord w;
188     unsigned old_handoff;
189
190     trace_qemu_co_mutex_lock_entry(mutex, self);
191     w.co = self;
192     push_waiter(mutex, &w);
193
194     /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
195      * a concurrent unlock() the responsibility of waking somebody up.
196      */
197     old_handoff = atomic_mb_read(&mutex->handoff);
198     if (old_handoff &&
199         has_waiters(mutex) &&
200         atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
201         /* There can be no concurrent pops, because there can be only
202          * one active handoff at a time.
203          */
204         CoWaitRecord *to_wake = pop_waiter(mutex);
205         Coroutine *co = to_wake->co;
206         if (co == self) {
207             /* We got the lock ourselves!  */
208             assert(to_wake == &w);
209             return;
210         }
211
212         aio_co_wake(co);
213     }
214
215     qemu_coroutine_yield();
216     trace_qemu_co_mutex_lock_return(mutex, self);
217 }
218
219 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
220 {
221     Coroutine *self = qemu_coroutine_self();
222
223     if (atomic_fetch_inc(&mutex->locked) == 0) {
224         /* Uncontended.  */
225         trace_qemu_co_mutex_lock_uncontended(mutex, self);
226     } else {
227         qemu_co_mutex_lock_slowpath(mutex);
228     }
229     mutex->holder = self;
230     self->locks_held++;
231 }
232
233 void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
234 {
235     Coroutine *self = qemu_coroutine_self();
236
237     trace_qemu_co_mutex_unlock_entry(mutex, self);
238
239     assert(mutex->locked);
240     assert(mutex->holder == self);
241     assert(qemu_in_coroutine());
242
243     mutex->holder = NULL;
244     self->locks_held--;
245     if (atomic_fetch_dec(&mutex->locked) == 1) {
246         /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
247         return;
248     }
249
250     for (;;) {
251         CoWaitRecord *to_wake = pop_waiter(mutex);
252         unsigned our_handoff;
253
254         if (to_wake) {
255             Coroutine *co = to_wake->co;
256             aio_co_wake(co);
257             break;
258         }
259
260         /* Some concurrent lock() is in progress (we know this because
261          * mutex->locked was >1) but it hasn't yet put itself on the wait
262          * queue.  Pick a sequence number for the handoff protocol (not 0).
263          */
264         if (++mutex->sequence == 0) {
265             mutex->sequence = 1;
266         }
267
268         our_handoff = mutex->sequence;
269         atomic_mb_set(&mutex->handoff, our_handoff);
270         if (!has_waiters(mutex)) {
271             /* The concurrent lock has not added itself yet, so it
272              * will be able to pick our handoff.
273              */
274             break;
275         }
276
277         /* Try to do the handoff protocol ourselves; if somebody else has
278          * already taken it, however, we're done and they're responsible.
279          */
280         if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
281             break;
282         }
283     }
284
285     trace_qemu_co_mutex_unlock_return(mutex, self);
286 }
287
288 void qemu_co_rwlock_init(CoRwlock *lock)
289 {
290     memset(lock, 0, sizeof(*lock));
291     qemu_co_queue_init(&lock->queue);
292 }
293
294 void qemu_co_rwlock_rdlock(CoRwlock *lock)
295 {
296     Coroutine *self = qemu_coroutine_self();
297
298     while (lock->writer) {
299         qemu_co_queue_wait(&lock->queue);
300     }
301     lock->reader++;
302     self->locks_held++;
303 }
304
305 void qemu_co_rwlock_unlock(CoRwlock *lock)
306 {
307     Coroutine *self = qemu_coroutine_self();
308
309     assert(qemu_in_coroutine());
310     if (lock->writer) {
311         lock->writer = false;
312         qemu_co_queue_restart_all(&lock->queue);
313     } else {
314         lock->reader--;
315         assert(lock->reader >= 0);
316         /* Wakeup only one waiting writer */
317         if (!lock->reader) {
318             qemu_co_queue_next(&lock->queue);
319         }
320     }
321     self->locks_held--;
322 }
323
324 void qemu_co_rwlock_wrlock(CoRwlock *lock)
325 {
326     Coroutine *self = qemu_coroutine_self();
327
328     while (lock->writer || lock->reader) {
329         qemu_co_queue_wait(&lock->queue);
330     }
331     lock->writer = true;
332     self->locks_held++;
333 }
This page took 0.042686 seconds and 4 git commands to generate.