/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <[email protected]>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}

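/* Usage sketch (illustrative only): a CoQueue paired with a QemuLockable
 * behaves like a condition variable.  "ItemList" and "list_empty" below
 * are hypothetical helpers, not part of this file:
 *
 *     static void coroutine_fn consume_one(CoMutex *mutex, CoQueue *queue,
 *                                          ItemList *list)
 *     {
 *         qemu_co_mutex_lock(mutex);
 *         while (list_empty(list)) {
 *             qemu_co_queue_wait(queue, mutex);   (drops, then re-takes mutex)
 *         }
 *         ... consume one item while holding the mutex ...
 *         qemu_co_mutex_unlock(mutex);
 *     }
 *
 * A producer would add an item under the same mutex and then call
 * qemu_co_queue_next() or qemu_co_queue_restart_all().
 */
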
/**
 * qemu_co_queue_run_restart:
 *
 * Enter each coroutine that was previously marked for restart by
 * qemu_co_queue_next() or qemu_co_queue_restart_all().  This function is
 * invoked by the core coroutine code when the current coroutine yields or
 * terminates.
 */
void qemu_co_queue_run_restart(Coroutine *co)
{
    Coroutine *next;
    QSIMPLEQ_HEAD(, Coroutine) tmp_queue_wakeup =
        QSIMPLEQ_HEAD_INITIALIZER(tmp_queue_wakeup);

    trace_qemu_co_queue_run_restart(co);

    /* Because "co" has yielded, any coroutine that we wake up can resume it.
     * If this happens and "co" terminates, co->co_queue_wakeup becomes
     * invalid memory.  Therefore, use a temporary queue and do not touch
     * the "co" coroutine as soon as you enter another one.
     *
     * In turn, the resumed "co" can populate the "co_queue_wakeup" queue
     * with new coroutines to be woken up.  The caller, who has resumed "co",
     * will be responsible for traversing the same queue, which may cause
     * a different wakeup order but not any missing wakeups.
     */
    QSIMPLEQ_CONCAT(&tmp_queue_wakeup, &co->co_queue_wakeup);

    while ((next = QSIMPLEQ_FIRST(&tmp_queue_wakeup))) {
        QSIMPLEQ_REMOVE_HEAD(&tmp_queue_wakeup, co_queue_next);
        qemu_coroutine_enter(next);
    }
}

static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
    Coroutine *next;

    if (QSIMPLEQ_EMPTY(&queue->entries)) {
        return false;
    }

    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
        aio_co_wake(next);
        if (single) {
            break;
        }
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    return qemu_co_queue_do_restart(queue, true);
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    qemu_co_queue_do_restart(queue, false);
}

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}

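/* Usage sketch (illustrative only): unlike qemu_co_queue_next(), this
 * function may be called from outside coroutine context, e.g. from a
 * bottom half that hands one queued waiter back to its home AioContext.
 * "Server" and its fields are hypothetical:
 *
 *     static void request_available_bh(void *opaque)
 *     {
 *         Server *server = opaque;
 *
 *         qemu_co_enter_next(&server->free_clients, &server->lock);
 *     }
 */
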
bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.  In
 *   this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *   qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
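/* Example interleaving for case (2), a lock() stealing the hand-off
 * (illustrative sketch only; the sequence number 7 is arbitrary):
 *
 *   qemu_co_mutex_unlock()             qemu_co_mutex_lock() slow path
 *   ----------------------             ------------------------------
 *   pop_waiter() -> NULL               push_waiter() (not yet visible)
 *   handoff = 7 (atomic_mb_set)
 *                                      reads handoff == 7, has_waiters()
 *                                      cmpxchg(handoff, 7, 0) succeeds
 *   has_waiters() now true
 *   cmpxchg(handoff, 7, 0) fails       pop_waiter() -> itself: lock taken
 *   exit; the lock() side now owns
 *   the wakeup responsibility
 */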
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}

static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}

static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

static bool has_waiters(CoMutex *mutex)
{
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}

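/* Note on ordering (illustrative): push_waiter() prepends, so after
 * coroutines A, B and C push in that order, from_push reads C -> B -> A.
 * move_waiters() pops that list head-first and prepends each record to
 * to_pop, restoring A -> B -> C; pop_waiter() therefore hands the mutex
 * to waiters in FIFO order.
 */
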
void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}

static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = atomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves! */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (atomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (atomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = atomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended. */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (atomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy! */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        atomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}

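/* Usage sketch (illustrative only): a CoMutex suspends only the calling
 * coroutine, never the underlying thread, so it is safe to hold across
 * operations that themselves yield.  "Cache" and "cache_fill" are
 * hypothetical:
 *
 *     static void coroutine_fn cache_get(Cache *cache)
 *     {
 *         qemu_co_mutex_lock(&cache->lock);    (may yield)
 *         cache_fill(cache);                   (may yield while locked)
 *         qemu_co_mutex_unlock(&cache->lock);
 *     }
 *
 * Both lock and unlock are coroutine_fn and must run in coroutine
 * context, as the assertions above enforce.
 */
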
void qemu_co_rwlock_init(CoRwlock *lock)
{
    memset(lock, 0, sizeof(*lock));
    qemu_co_queue_init(&lock->queue);
    qemu_co_mutex_init(&lock->mutex);
}

void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line. */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex. */
    self->locks_held++;
}

void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /* The critical section started in qemu_co_rwlock_wrlock. */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* Wake up only one waiting writer */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}

void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
     * qemu_co_rwlock_upgrade.
     */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex. */
    self->locks_held++;
}

void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, so that lock->reader remains zero.
     * There is no need to update self->locks_held.
     */
}

void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
     * not account for the lock twice in self->locks_held.
     */
    self->locks_held--;
}
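
/* Usage sketch (illustrative only): a reader that discovers it needs
 * write access can upgrade in place instead of unlocking and calling
 * qemu_co_rwlock_wrlock().  Note that another writer may still run while
 * the upgrade waits for the remaining readers to drain, so the state
 * should be re-checked after upgrading.  "Table" and its helpers are
 * hypothetical:
 *
 *     static void coroutine_fn table_refresh(Table *t)
 *     {
 *         qemu_co_rwlock_rdlock(&t->lock);
 *         if (table_is_stale(t)) {
 *             qemu_co_rwlock_upgrade(&t->lock);
 *             if (table_is_stale(t)) {          (re-check: see above)
 *                 table_rebuild(t);             (exclusive access)
 *             }
 *             qemu_co_rwlock_downgrade(&t->lock);
 *         }
 *         table_read(t);
 *         qemu_co_rwlock_unlock(&t->lock);
 *     }
 */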