/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock,
                                          CoQueueWaitFlags flags)
{
    Coroutine *self = qemu_coroutine_self();
    if (flags & CO_QUEUE_WAIT_FRONT) {
        QSIMPLEQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
    } else {
        QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
    }

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    /* No unlock/lock needed in coroutine context.  */
    return qemu_co_enter_next_impl(queue, NULL);
}

void qemu_co_enter_all_impl(CoQueue *queue, QemuLockable *lock)
{
    while (qemu_co_enter_next_impl(queue, lock)) {
        /* just loop */
    }
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    /* No unlock/lock needed in coroutine context.  */
    qemu_co_enter_all_impl(queue, NULL);
}
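/*
 * Illustrative sketch (not part of the upstream file): one common way to
 * pair CoQueue with a CoMutex, waiting in a coroutine until a condition
 * holds.  ExampleFifo, example_fifo_get() and example_fifo_put() are
 * hypothetical names used only to show the wait/wake pattern; the fields
 * would be set up with qemu_co_mutex_init() and qemu_co_queue_init().
 */
typedef struct ExampleFifo {
    CoMutex lock;
    CoQueue waiters;            /* coroutines waiting for available > 0 */
    int available;
} ExampleFifo;

static void coroutine_fn example_fifo_get(ExampleFifo *f)
{
    qemu_co_mutex_lock(&f->lock);
    /* Re-check the condition after every wakeup. */
    while (f->available == 0) {
        /* Drops f->lock while waiting and reacquires it before returning. */
        qemu_co_queue_wait(&f->waiters, &f->lock);
    }
    f->available--;
    qemu_co_mutex_unlock(&f->lock);
}

static void coroutine_fn example_fifo_put(ExampleFifo *f)
{
    qemu_co_mutex_lock(&f->lock);
    f->available++;
    /* Wake the first waiter, if any; it re-checks the condition above. */
    qemu_co_queue_next(&f->waiters);
    qemu_co_mutex_unlock(&f->lock);
}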
bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 *   In this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *   qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}

static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}

static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

static bool has_waiters(CoMutex *mutex)
{
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}
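/*
 * Illustrative sketch (not part of the upstream file): the wait-record
 * queue above is a multiple-producer, single-consumer queue built from a
 * lock-free LIFO (from_push) that the single consumer drains and reverses
 * into a FIFO (to_pop).  The same idea is sketched below with QEMU's
 * qatomic helpers instead of the QSLIST macros; ExampleNode, ExampleStack,
 * example_push() and example_take_all_fifo() are hypothetical names.
 */
typedef struct ExampleNode {
    struct ExampleNode *next;
} ExampleNode;

typedef struct ExampleStack {
    ExampleNode *head;          /* only accessed with qatomic_* */
} ExampleStack;

/* Producer side, callable from any thread: lock-free push onto the LIFO. */
static void example_push(ExampleStack *s, ExampleNode *n)
{
    ExampleNode *old;

    do {
        old = qatomic_read(&s->head);
        n->next = old;
    } while (qatomic_cmpxchg(&s->head, old, n) != old);
}

/* Consumer side, single consumer only: grab the whole LIFO at once and
 * reverse it so the nodes come back in the order they were pushed (FIFO),
 * which is what move_waiters() achieves with QSLIST_MOVE_ATOMIC.
 */
static ExampleNode *example_take_all_fifo(ExampleStack *s)
{
    ExampleNode *lifo = qatomic_xchg(&s->head, NULL);
    ExampleNode *fifo = NULL;

    while (lifo) {
        ExampleNode *n = lifo;
        lifo = n->next;
        n->next = fifo;
        fifo = n;
    }
    return fifo;
}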
void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}

static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = qatomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves!  */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (qatomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (qatomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = qatomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended.  */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (qatomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        qatomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
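/*
 * Illustrative sketch (not part of the upstream file): typical CoMutex
 * usage.  ExampleCounter and example_counter_add() are hypothetical; the
 * only requirement is that lock/unlock run in coroutine context, where a
 * contended lock yields instead of blocking the thread.
 */
typedef struct ExampleCounter {
    CoMutex lock;               /* set up with qemu_co_mutex_init() */
    uint64_t value;
} ExampleCounter;

static void coroutine_fn example_counter_add(ExampleCounter *c, uint64_t n)
{
    qemu_co_mutex_lock(&c->lock);   /* may yield, never blocks the thread */
    c->value += n;
    qemu_co_mutex_unlock(&c->lock);
}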
struct CoRwTicket {
    bool read;
    Coroutine *co;
    QSIMPLEQ_ENTRY(CoRwTicket) next;
};

void qemu_co_rwlock_init(CoRwlock *lock)
{
    qemu_co_mutex_init(&lock->mutex);
    lock->owners = 0;
    QSIMPLEQ_INIT(&lock->tickets);
}

/* Releases the internal CoMutex.  */
static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
{
    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
    Coroutine *co = NULL;

    /*
     * Setting lock->owners here prevents rdlock and wrlock from
     * sneaking in between unlock and wake.
     */

    if (tkt) {
        if (tkt->read) {
            if (lock->owners >= 0) {
                lock->owners++;
                co = tkt->co;
            }
        } else {
            if (lock->owners == 0) {
                lock->owners = -1;
                co = tkt->co;
            }
        }
    }

    if (co) {
        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
        qemu_co_mutex_unlock(&lock->mutex);
        aio_co_wake(co);
    } else {
        qemu_co_mutex_unlock(&lock->mutex);
    }
}

void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line.  */
    if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
        lock->owners++;
        qemu_co_mutex_unlock(&lock->mutex);
    } else {
        CoRwTicket my_ticket = { true, self };

        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
        qemu_co_mutex_unlock(&lock->mutex);
        qemu_coroutine_yield();
        assert(lock->owners >= 1);

        /* Possibly wake another reader, which will wake the next in line.  */
        qemu_co_mutex_lock(&lock->mutex);
        qemu_co_rwlock_maybe_wake_one(lock);
    }

    self->locks_held++;
}

void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    self->locks_held--;

    qemu_co_mutex_lock(&lock->mutex);
    if (lock->owners > 0) {
        lock->owners--;
    } else {
        assert(lock->owners == -1);
        lock->owners = 0;
    }

    qemu_co_rwlock_maybe_wake_one(lock);
}

void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->owners == -1);
    lock->owners = 1;

    /* Possibly wake another reader, which will wake the next in line.  */
    qemu_co_rwlock_maybe_wake_one(lock);
}

void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    if (lock->owners == 0) {
        lock->owners = -1;
        qemu_co_mutex_unlock(&lock->mutex);
    } else {
        CoRwTicket my_ticket = { false, qemu_coroutine_self() };

        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
        qemu_co_mutex_unlock(&lock->mutex);
        qemu_coroutine_yield();
        assert(lock->owners == -1);
    }

    self->locks_held++;
}

void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->owners > 0);
    /* For fairness, wait if a writer is in line.  */
    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
        lock->owners = -1;
        qemu_co_mutex_unlock(&lock->mutex);
    } else {
        CoRwTicket my_ticket = { false, qemu_coroutine_self() };

        lock->owners--;
        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
        qemu_co_rwlock_maybe_wake_one(lock);
        qemu_coroutine_yield();
        assert(lock->owners == -1);
    }
}
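/*
 * Illustrative sketch (not part of the upstream file): typical CoRwlock
 * usage, including the read-to-write upgrade path.  ExampleTable,
 * example_read() and example_modify() are hypothetical names.
 */
typedef struct ExampleTable {
    CoRwlock lock;              /* set up with qemu_co_rwlock_init() */
    /* ... shared state ... */
} ExampleTable;

static void coroutine_fn example_read(ExampleTable *t)
{
    qemu_co_rwlock_rdlock(&t->lock);
    /* ... read shared state; other readers may run concurrently ... */
    qemu_co_rwlock_unlock(&t->lock);
}

static void coroutine_fn example_modify(ExampleTable *t)
{
    qemu_co_rwlock_rdlock(&t->lock);
    /* ... inspect shared state and decide whether to modify it ... */

    /* Trade the read lock for the write lock.  This may yield and let
     * other writers run first, so any decision made under the read lock
     * should be re-checked here.
     */
    qemu_co_rwlock_upgrade(&t->lock);
    /* ... modify shared state with exclusive access ... */
    qemu_co_rwlock_unlock(&t->lock);
}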