xref: /openbmc/qemu/util/qemu-coroutine-lock.c (revision dc5bd18f)
1 /*
2  * coroutine queues and locks
3  *
4  * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  *
24  * The lock-free mutex implementation is based on OSv
25  * (core/lfmutex.cc, include/lockfree/mutex.hh).
26  * Copyright (C) 2013 Cloudius Systems, Ltd.
27  */
28 
29 #include "qemu/osdep.h"
30 #include "qemu-common.h"
31 #include "qemu/coroutine.h"
32 #include "qemu/coroutine_int.h"
33 #include "qemu/processor.h"
34 #include "qemu/queue.h"
35 #include "block/aio.h"
36 #include "trace.h"
37 
38 void qemu_co_queue_init(CoQueue *queue)
39 {
40     QSIMPLEQ_INIT(&queue->entries);
41 }
42 
43 void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
44 {
45     Coroutine *self = qemu_coroutine_self();
46     QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
47 
48     if (lock) {
49         qemu_lockable_unlock(lock);
50     }
51 
52     /* There is no race condition here.  Other threads will call
53      * aio_co_schedule on our AioContext, which can reenter this
54      * coroutine but only after this yield and after the main loop
55      * has gone through the next iteration.
56      */
57     qemu_coroutine_yield();
58     assert(qemu_in_coroutine());
59 
60     /* TODO: OSv implements wait morphing here, where the wakeup
61      * primitive automatically places the woken coroutine on the
62      * mutex's queue.  This avoids the thundering herd effect.
63      * This could be implemented for CoMutexes, but not really for
64      * other cases of QemuLockable.
65      */
66     if (lock) {
67         qemu_lockable_lock(lock);
68     }
69 }
70 
71 /**
72  * qemu_co_queue_run_restart:
73  *
74  * Enter each coroutine that was previously marked for restart by
75  * qemu_co_queue_next() or qemu_co_queue_restart_all().  This function is
76  * invoked by the core coroutine code when the current coroutine yields or
77  * terminates.
78  */
79 void qemu_co_queue_run_restart(Coroutine *co)
80 {
81     Coroutine *next;
82     QSIMPLEQ_HEAD(, Coroutine) tmp_queue_wakeup =
83         QSIMPLEQ_HEAD_INITIALIZER(tmp_queue_wakeup);
84 
85     trace_qemu_co_queue_run_restart(co);
86 
87     /* Because "co" has yielded, any coroutine that we wakeup can resume it.
88      * If this happens and "co" terminates, co->co_queue_wakeup becomes
89      * invalid memory.  Therefore, use a temporary queue and do not touch
90      * the "co" coroutine as soon as you enter another one.
91      *
92      * In its turn resumed "co" can populate "co_queue_wakeup" queue with
93      * new coroutines to be woken up.  The caller, who has resumed "co",
94      * will be responsible for traversing the same queue, which may cause
95      * a different wakeup order but not any missing wakeups.
96      */
97     QSIMPLEQ_CONCAT(&tmp_queue_wakeup, &co->co_queue_wakeup);
98 
99     while ((next = QSIMPLEQ_FIRST(&tmp_queue_wakeup))) {
100         QSIMPLEQ_REMOVE_HEAD(&tmp_queue_wakeup, co_queue_next);
101         qemu_coroutine_enter(next);
102     }
103 }
104 
105 static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
106 {
107     Coroutine *next;
108 
109     if (QSIMPLEQ_EMPTY(&queue->entries)) {
110         return false;
111     }
112 
113     while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
114         QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
115         aio_co_wake(next);
116         if (single) {
117             break;
118         }
119     }
120     return true;
121 }
122 
123 bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
124 {
125     assert(qemu_in_coroutine());
126     return qemu_co_queue_do_restart(queue, true);
127 }
128 
129 void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
130 {
131     assert(qemu_in_coroutine());
132     qemu_co_queue_do_restart(queue, false);
133 }
134 
135 bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
136 {
137     Coroutine *next;
138 
139     next = QSIMPLEQ_FIRST(&queue->entries);
140     if (!next) {
141         return false;
142     }
143 
144     QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
145     if (lock) {
146         qemu_lockable_unlock(lock);
147     }
148     aio_co_wake(next);
149     if (lock) {
150         qemu_lockable_lock(lock);
151     }
152     return true;
153 }
154 
155 bool qemu_co_queue_empty(CoQueue *queue)
156 {
157     return QSIMPLEQ_FIRST(&queue->entries) == NULL;
158 }
159 
160 /* The wait records are handled with a multiple-producer, single-consumer
161  * lock-free queue.  There cannot be two concurrent pop_waiter() calls
162  * because pop_waiter() can only be called while mutex->handoff is zero.
163  * This can happen in three cases:
164  * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
165  *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
166  *   not take part in the handoff.
167  * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
168  *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
169  *   the cmpxchg (it will see either 0 or the next sequence value) and
170  *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
171  *   woken up someone.
172  * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
173  *   In this case another iteration starts with mutex->handoff == 0;
174  *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
175  *   qemu_co_mutex_unlock will go back to case (1).
176  *
177  * The following functions manage this queue.
178  */
179 typedef struct CoWaitRecord {
180     Coroutine *co;
181     QSLIST_ENTRY(CoWaitRecord) next;
182 } CoWaitRecord;
183 
184 static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
185 {
186     w->co = qemu_coroutine_self();
187     QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
188 }
189 
190 static void move_waiters(CoMutex *mutex)
191 {
192     QSLIST_HEAD(, CoWaitRecord) reversed;
193     QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
194     while (!QSLIST_EMPTY(&reversed)) {
195         CoWaitRecord *w = QSLIST_FIRST(&reversed);
196         QSLIST_REMOVE_HEAD(&reversed, next);
197         QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
198     }
199 }
200 
201 static CoWaitRecord *pop_waiter(CoMutex *mutex)
202 {
203     CoWaitRecord *w;
204 
205     if (QSLIST_EMPTY(&mutex->to_pop)) {
206         move_waiters(mutex);
207         if (QSLIST_EMPTY(&mutex->to_pop)) {
208             return NULL;
209         }
210     }
211     w = QSLIST_FIRST(&mutex->to_pop);
212     QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
213     return w;
214 }
215 
216 static bool has_waiters(CoMutex *mutex)
217 {
218     return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
219 }
220 
221 void qemu_co_mutex_init(CoMutex *mutex)
222 {
223     memset(mutex, 0, sizeof(*mutex));
224 }
225 
226 static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
227 {
228     /* Read co before co->ctx; pairs with smp_wmb() in
229      * qemu_coroutine_enter().
230      */
231     smp_read_barrier_depends();
232     mutex->ctx = co->ctx;
233     aio_co_wake(co);
234 }
235 
236 static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
237                                                      CoMutex *mutex)
238 {
239     Coroutine *self = qemu_coroutine_self();
240     CoWaitRecord w;
241     unsigned old_handoff;
242 
243     trace_qemu_co_mutex_lock_entry(mutex, self);
244     w.co = self;
245     push_waiter(mutex, &w);
246 
247     /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
248      * a concurrent unlock() the responsibility of waking somebody up.
249      */
250     old_handoff = atomic_mb_read(&mutex->handoff);
251     if (old_handoff &&
252         has_waiters(mutex) &&
253         atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
254         /* There can be no concurrent pops, because there can be only
255          * one active handoff at a time.
256          */
257         CoWaitRecord *to_wake = pop_waiter(mutex);
258         Coroutine *co = to_wake->co;
259         if (co == self) {
260             /* We got the lock ourselves!  */
261             assert(to_wake == &w);
262             mutex->ctx = ctx;
263             return;
264         }
265 
266         qemu_co_mutex_wake(mutex, co);
267     }
268 
269     qemu_coroutine_yield();
270     trace_qemu_co_mutex_lock_return(mutex, self);
271 }
272 
273 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
274 {
275     AioContext *ctx = qemu_get_current_aio_context();
276     Coroutine *self = qemu_coroutine_self();
277     int waiters, i;
278 
279     /* Running a very small critical section on pthread_mutex_t and CoMutex
280      * shows that pthread_mutex_t is much faster because it doesn't actually
281      * go to sleep.  What happens is that the critical section is shorter
282      * than the latency of entering the kernel and thus FUTEX_WAIT always
283      * fails.  With CoMutex there is no such latency but you still want to
284      * avoid wait and wakeup.  So introduce it artificially.
285      */
286     i = 0;
287 retry_fast_path:
288     waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
289     if (waiters != 0) {
290         while (waiters == 1 && ++i < 1000) {
291             if (atomic_read(&mutex->ctx) == ctx) {
292                 break;
293             }
294             if (atomic_read(&mutex->locked) == 0) {
295                 goto retry_fast_path;
296             }
297             cpu_relax();
298         }
299         waiters = atomic_fetch_inc(&mutex->locked);
300     }
301 
302     if (waiters == 0) {
303         /* Uncontended.  */
304         trace_qemu_co_mutex_lock_uncontended(mutex, self);
305         mutex->ctx = ctx;
306     } else {
307         qemu_co_mutex_lock_slowpath(ctx, mutex);
308     }
309     mutex->holder = self;
310     self->locks_held++;
311 }
312 
313 void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
314 {
315     Coroutine *self = qemu_coroutine_self();
316 
317     trace_qemu_co_mutex_unlock_entry(mutex, self);
318 
319     assert(mutex->locked);
320     assert(mutex->holder == self);
321     assert(qemu_in_coroutine());
322 
323     mutex->ctx = NULL;
324     mutex->holder = NULL;
325     self->locks_held--;
326     if (atomic_fetch_dec(&mutex->locked) == 1) {
327         /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
328         return;
329     }
330 
331     for (;;) {
332         CoWaitRecord *to_wake = pop_waiter(mutex);
333         unsigned our_handoff;
334 
335         if (to_wake) {
336             qemu_co_mutex_wake(mutex, to_wake->co);
337             break;
338         }
339 
340         /* Some concurrent lock() is in progress (we know this because
341          * mutex->locked was >1) but it hasn't yet put itself on the wait
342          * queue.  Pick a sequence number for the handoff protocol (not 0).
343          */
344         if (++mutex->sequence == 0) {
345             mutex->sequence = 1;
346         }
347 
348         our_handoff = mutex->sequence;
349         atomic_mb_set(&mutex->handoff, our_handoff);
350         if (!has_waiters(mutex)) {
351             /* The concurrent lock has not added itself yet, so it
352              * will be able to pick our handoff.
353              */
354             break;
355         }
356 
357         /* Try to do the handoff protocol ourselves; if somebody else has
358          * already taken it, however, we're done and they're responsible.
359          */
360         if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
361             break;
362         }
363     }
364 
365     trace_qemu_co_mutex_unlock_return(mutex, self);
366 }
367 
368 void qemu_co_rwlock_init(CoRwlock *lock)
369 {
370     memset(lock, 0, sizeof(*lock));
371     qemu_co_queue_init(&lock->queue);
372     qemu_co_mutex_init(&lock->mutex);
373 }
374 
375 void qemu_co_rwlock_rdlock(CoRwlock *lock)
376 {
377     Coroutine *self = qemu_coroutine_self();
378 
379     qemu_co_mutex_lock(&lock->mutex);
380     /* For fairness, wait if a writer is in line.  */
381     while (lock->pending_writer) {
382         qemu_co_queue_wait(&lock->queue, &lock->mutex);
383     }
384     lock->reader++;
385     qemu_co_mutex_unlock(&lock->mutex);
386 
387     /* The rest of the read-side critical section is run without the mutex.  */
388     self->locks_held++;
389 }
390 
391 void qemu_co_rwlock_unlock(CoRwlock *lock)
392 {
393     Coroutine *self = qemu_coroutine_self();
394 
395     assert(qemu_in_coroutine());
396     if (!lock->reader) {
397         /* The critical section started in qemu_co_rwlock_wrlock.  */
398         qemu_co_queue_restart_all(&lock->queue);
399     } else {
400         self->locks_held--;
401 
402         qemu_co_mutex_lock(&lock->mutex);
403         lock->reader--;
404         assert(lock->reader >= 0);
405         /* Wakeup only one waiting writer */
406         if (!lock->reader) {
407             qemu_co_queue_next(&lock->queue);
408         }
409     }
410     qemu_co_mutex_unlock(&lock->mutex);
411 }
412 
413 void qemu_co_rwlock_downgrade(CoRwlock *lock)
414 {
415     Coroutine *self = qemu_coroutine_self();
416 
417     /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
418      * qemu_co_rwlock_upgrade.
419      */
420     assert(lock->reader == 0);
421     lock->reader++;
422     qemu_co_mutex_unlock(&lock->mutex);
423 
424     /* The rest of the read-side critical section is run without the mutex.  */
425     self->locks_held++;
426 }
427 
428 void qemu_co_rwlock_wrlock(CoRwlock *lock)
429 {
430     qemu_co_mutex_lock(&lock->mutex);
431     lock->pending_writer++;
432     while (lock->reader) {
433         qemu_co_queue_wait(&lock->queue, &lock->mutex);
434     }
435     lock->pending_writer--;
436 
437     /* The rest of the write-side critical section is run with
438      * the mutex taken, so that lock->reader remains zero.
439      * There is no need to update self->locks_held.
440      */
441 }
442 
443 void qemu_co_rwlock_upgrade(CoRwlock *lock)
444 {
445     Coroutine *self = qemu_coroutine_self();
446 
447     qemu_co_mutex_lock(&lock->mutex);
448     assert(lock->reader > 0);
449     lock->reader--;
450     lock->pending_writer++;
451     while (lock->reader) {
452         qemu_co_queue_wait(&lock->queue, &lock->mutex);
453     }
454     lock->pending_writer--;
455 
456     /* The rest of the write-side critical section is run with
457      * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
458      * not account for the lock twice in self->locks_held.
459      */
460     self->locks_held--;
461 }
462