1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * Queued spinlock 4 * 5 * (C) Copyright 2013-2015 Hewlett-Packard Development Company, L.P. 6 * (C) Copyright 2013-2014,2018 Red Hat, Inc. 7 * (C) Copyright 2015 Intel Corp. 8 * (C) Copyright 2015 Hewlett-Packard Enterprise Development LP 9 * 10 * Authors: Waiman Long <longman@redhat.com> 11 * Peter Zijlstra <peterz@infradead.org> 12 */ 13 14 #ifndef _GEN_PV_LOCK_SLOWPATH 15 16 #include <linux/smp.h> 17 #include <linux/bug.h> 18 #include <linux/cpumask.h> 19 #include <linux/percpu.h> 20 #include <linux/hardirq.h> 21 #include <linux/mutex.h> 22 #include <linux/prefetch.h> 23 #include <asm/byteorder.h> 24 #include <asm/qspinlock.h> 25 26 /* 27 * Include queued spinlock statistics code 28 */ 29 #include "qspinlock_stat.h" 30 31 /* 32 * The basic principle of a queue-based spinlock can best be understood 33 * by studying a classic queue-based spinlock implementation called the 34 * MCS lock. A copy of the original MCS lock paper ("Algorithms for Scalable 35 * Synchronization on Shared-Memory Multiprocessors by Mellor-Crummey and 36 * Scott") is available at 37 * 38 * https://bugzilla.kernel.org/show_bug.cgi?id=206115 39 * 40 * This queued spinlock implementation is based on the MCS lock, however to 41 * make it fit the 4 bytes we assume spinlock_t to be, and preserve its 42 * existing API, we must modify it somehow. 43 * 44 * In particular; where the traditional MCS lock consists of a tail pointer 45 * (8 bytes) and needs the next pointer (another 8 bytes) of its own node to 46 * unlock the next pending (next->locked), we compress both these: {tail, 47 * next->locked} into a single u32 value. 48 * 49 * Since a spinlock disables recursion of its own context and there is a limit 50 * to the contexts that can nest; namely: task, softirq, hardirq, nmi. As there 51 * are at most 4 nesting levels, it can be encoded by a 2-bit number. Now 52 * we can encode the tail by combining the 2-bit nesting level with the cpu 53 * number. With one byte for the lock value and 3 bytes for the tail, only a 54 * 32-bit word is now needed. Even though we only need 1 bit for the lock, 55 * we extend it to a full byte to achieve better performance for architectures 56 * that support atomic byte write. 57 * 58 * We also change the first spinner to spin on the lock bit instead of its 59 * node; whereby avoiding the need to carry a node from lock to unlock, and 60 * preserving existing lock API. This also makes the unlock code simpler and 61 * faster. 62 * 63 * N.B. The current implementation only supports architectures that allow 64 * atomic operations on smaller 8-bit and 16-bit data types. 65 * 66 */ 67 68 #include "mcs_spinlock.h" 69 #define MAX_NODES 4 70 71 /* 72 * On 64-bit architectures, the mcs_spinlock structure will be 16 bytes in 73 * size and four of them will fit nicely in one 64-byte cacheline. For 74 * pvqspinlock, however, we need more space for extra data. To accommodate 75 * that, we insert two more long words to pad it up to 32 bytes. IOW, only 76 * two of them can fit in a cacheline in this case. That is OK as it is rare 77 * to have more than 2 levels of slowpath nesting in actual use. We don't 78 * want to penalize pvqspinlocks to optimize for a rare case in native 79 * qspinlocks. 80 */ 81 struct qnode { 82 struct mcs_spinlock mcs; 83 #ifdef CONFIG_PARAVIRT_SPINLOCKS 84 long reserved[2]; 85 #endif 86 }; 87 88 /* 89 * The pending bit spinning loop count. 90 * This heuristic is used to limit the number of lockword accesses 91 * made by atomic_cond_read_relaxed when waiting for the lock to 92 * transition out of the "== _Q_PENDING_VAL" state. We don't spin 93 * indefinitely because there's no guarantee that we'll make forward 94 * progress. 95 */ 96 #ifndef _Q_PENDING_LOOPS 97 #define _Q_PENDING_LOOPS 1 98 #endif 99 100 /* 101 * Per-CPU queue node structures; we can never have more than 4 nested 102 * contexts: task, softirq, hardirq, nmi. 103 * 104 * Exactly fits one 64-byte cacheline on a 64-bit architecture. 105 * 106 * PV doubles the storage and uses the second cacheline for PV state. 107 */ 108 static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]); 109 110 /* 111 * We must be able to distinguish between no-tail and the tail at 0:0, 112 * therefore increment the cpu number by one. 113 */ 114 115 static inline __pure u32 encode_tail(int cpu, int idx) 116 { 117 u32 tail; 118 119 tail = (cpu + 1) << _Q_TAIL_CPU_OFFSET; 120 tail |= idx << _Q_TAIL_IDX_OFFSET; /* assume < 4 */ 121 122 return tail; 123 } 124 125 static inline __pure struct mcs_spinlock *decode_tail(u32 tail) 126 { 127 int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1; 128 int idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET; 129 130 return per_cpu_ptr(&qnodes[idx].mcs, cpu); 131 } 132 133 static inline __pure 134 struct mcs_spinlock *grab_mcs_node(struct mcs_spinlock *base, int idx) 135 { 136 return &((struct qnode *)base + idx)->mcs; 137 } 138 139 #define _Q_LOCKED_PENDING_MASK (_Q_LOCKED_MASK | _Q_PENDING_MASK) 140 141 #if _Q_PENDING_BITS == 8 142 /** 143 * clear_pending - clear the pending bit. 144 * @lock: Pointer to queued spinlock structure 145 * 146 * *,1,* -> *,0,* 147 */ 148 static __always_inline void clear_pending(struct qspinlock *lock) 149 { 150 WRITE_ONCE(lock->pending, 0); 151 } 152 153 /** 154 * clear_pending_set_locked - take ownership and clear the pending bit. 155 * @lock: Pointer to queued spinlock structure 156 * 157 * *,1,0 -> *,0,1 158 * 159 * Lock stealing is not allowed if this function is used. 160 */ 161 static __always_inline void clear_pending_set_locked(struct qspinlock *lock) 162 { 163 WRITE_ONCE(lock->locked_pending, _Q_LOCKED_VAL); 164 } 165 166 /* 167 * xchg_tail - Put in the new queue tail code word & retrieve previous one 168 * @lock : Pointer to queued spinlock structure 169 * @tail : The new queue tail code word 170 * Return: The previous queue tail code word 171 * 172 * xchg(lock, tail), which heads an address dependency 173 * 174 * p,*,* -> n,*,* ; prev = xchg(lock, node) 175 */ 176 static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail) 177 { 178 /* 179 * We can use relaxed semantics since the caller ensures that the 180 * MCS node is properly initialized before updating the tail. 181 */ 182 return (u32)xchg_relaxed(&lock->tail, 183 tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET; 184 } 185 186 #else /* _Q_PENDING_BITS == 8 */ 187 188 /** 189 * clear_pending - clear the pending bit. 190 * @lock: Pointer to queued spinlock structure 191 * 192 * *,1,* -> *,0,* 193 */ 194 static __always_inline void clear_pending(struct qspinlock *lock) 195 { 196 atomic_andnot(_Q_PENDING_VAL, &lock->val); 197 } 198 199 /** 200 * clear_pending_set_locked - take ownership and clear the pending bit. 201 * @lock: Pointer to queued spinlock structure 202 * 203 * *,1,0 -> *,0,1 204 */ 205 static __always_inline void clear_pending_set_locked(struct qspinlock *lock) 206 { 207 atomic_add(-_Q_PENDING_VAL + _Q_LOCKED_VAL, &lock->val); 208 } 209 210 /** 211 * xchg_tail - Put in the new queue tail code word & retrieve previous one 212 * @lock : Pointer to queued spinlock structure 213 * @tail : The new queue tail code word 214 * Return: The previous queue tail code word 215 * 216 * xchg(lock, tail) 217 * 218 * p,*,* -> n,*,* ; prev = xchg(lock, node) 219 */ 220 static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail) 221 { 222 u32 old, new, val = atomic_read(&lock->val); 223 224 for (;;) { 225 new = (val & _Q_LOCKED_PENDING_MASK) | tail; 226 /* 227 * We can use relaxed semantics since the caller ensures that 228 * the MCS node is properly initialized before updating the 229 * tail. 230 */ 231 old = atomic_cmpxchg_relaxed(&lock->val, val, new); 232 if (old == val) 233 break; 234 235 val = old; 236 } 237 return old; 238 } 239 #endif /* _Q_PENDING_BITS == 8 */ 240 241 /** 242 * queued_fetch_set_pending_acquire - fetch the whole lock value and set pending 243 * @lock : Pointer to queued spinlock structure 244 * Return: The previous lock value 245 * 246 * *,*,* -> *,1,* 247 */ 248 #ifndef queued_fetch_set_pending_acquire 249 static __always_inline u32 queued_fetch_set_pending_acquire(struct qspinlock *lock) 250 { 251 return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); 252 } 253 #endif 254 255 /** 256 * set_locked - Set the lock bit and own the lock 257 * @lock: Pointer to queued spinlock structure 258 * 259 * *,*,0 -> *,0,1 260 */ 261 static __always_inline void set_locked(struct qspinlock *lock) 262 { 263 WRITE_ONCE(lock->locked, _Q_LOCKED_VAL); 264 } 265 266 267 /* 268 * Generate the native code for queued_spin_unlock_slowpath(); provide NOPs for 269 * all the PV callbacks. 270 */ 271 272 static __always_inline void __pv_init_node(struct mcs_spinlock *node) { } 273 static __always_inline void __pv_wait_node(struct mcs_spinlock *node, 274 struct mcs_spinlock *prev) { } 275 static __always_inline void __pv_kick_node(struct qspinlock *lock, 276 struct mcs_spinlock *node) { } 277 static __always_inline u32 __pv_wait_head_or_lock(struct qspinlock *lock, 278 struct mcs_spinlock *node) 279 { return 0; } 280 281 #define pv_enabled() false 282 283 #define pv_init_node __pv_init_node 284 #define pv_wait_node __pv_wait_node 285 #define pv_kick_node __pv_kick_node 286 #define pv_wait_head_or_lock __pv_wait_head_or_lock 287 288 #ifdef CONFIG_PARAVIRT_SPINLOCKS 289 #define queued_spin_lock_slowpath native_queued_spin_lock_slowpath 290 #endif 291 292 #endif /* _GEN_PV_LOCK_SLOWPATH */ 293 294 /** 295 * queued_spin_lock_slowpath - acquire the queued spinlock 296 * @lock: Pointer to queued spinlock structure 297 * @val: Current value of the queued spinlock 32-bit word 298 * 299 * (queue tail, pending bit, lock value) 300 * 301 * fast : slow : unlock 302 * : : 303 * uncontended (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0) 304 * : | ^--------.------. / : 305 * : v \ \ | : 306 * pending : (0,1,1) +--> (0,1,0) \ | : 307 * : | ^--' | | : 308 * : v | | : 309 * uncontended : (n,x,y) +--> (n,0,0) --' | : 310 * queue : | ^--' | : 311 * : v | : 312 * contended : (*,x,y) +--> (*,0,0) ---> (*,0,1) -' : 313 * queue : ^--' : 314 */ 315 void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) 316 { 317 struct mcs_spinlock *prev, *next, *node; 318 u32 old, tail; 319 int idx; 320 321 BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)); 322 323 if (pv_enabled()) 324 goto pv_queue; 325 326 if (virt_spin_lock(lock)) 327 return; 328 329 /* 330 * Wait for in-progress pending->locked hand-overs with a bounded 331 * number of spins so that we guarantee forward progress. 332 * 333 * 0,1,0 -> 0,0,1 334 */ 335 if (val == _Q_PENDING_VAL) { 336 int cnt = _Q_PENDING_LOOPS; 337 val = atomic_cond_read_relaxed(&lock->val, 338 (VAL != _Q_PENDING_VAL) || !cnt--); 339 } 340 341 /* 342 * If we observe any contention; queue. 343 */ 344 if (val & ~_Q_LOCKED_MASK) 345 goto queue; 346 347 /* 348 * trylock || pending 349 * 350 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock 351 */ 352 val = queued_fetch_set_pending_acquire(lock); 353 354 /* 355 * If we observe contention, there is a concurrent locker. 356 * 357 * Undo and queue; our setting of PENDING might have made the 358 * n,0,0 -> 0,0,0 transition fail and it will now be waiting 359 * on @next to become !NULL. 360 */ 361 if (unlikely(val & ~_Q_LOCKED_MASK)) { 362 363 /* Undo PENDING if we set it. */ 364 if (!(val & _Q_PENDING_MASK)) 365 clear_pending(lock); 366 367 goto queue; 368 } 369 370 /* 371 * We're pending, wait for the owner to go away. 372 * 373 * 0,1,1 -> 0,1,0 374 * 375 * this wait loop must be a load-acquire such that we match the 376 * store-release that clears the locked bit and create lock 377 * sequentiality; this is because not all 378 * clear_pending_set_locked() implementations imply full 379 * barriers. 380 */ 381 if (val & _Q_LOCKED_MASK) 382 atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK)); 383 384 /* 385 * take ownership and clear the pending bit. 386 * 387 * 0,1,0 -> 0,0,1 388 */ 389 clear_pending_set_locked(lock); 390 lockevent_inc(lock_pending); 391 return; 392 393 /* 394 * End of pending bit optimistic spinning and beginning of MCS 395 * queuing. 396 */ 397 queue: 398 lockevent_inc(lock_slowpath); 399 pv_queue: 400 node = this_cpu_ptr(&qnodes[0].mcs); 401 idx = node->count++; 402 tail = encode_tail(smp_processor_id(), idx); 403 404 /* 405 * 4 nodes are allocated based on the assumption that there will 406 * not be nested NMIs taking spinlocks. That may not be true in 407 * some architectures even though the chance of needing more than 408 * 4 nodes will still be extremely unlikely. When that happens, 409 * we fall back to spinning on the lock directly without using 410 * any MCS node. This is not the most elegant solution, but is 411 * simple enough. 412 */ 413 if (unlikely(idx >= MAX_NODES)) { 414 lockevent_inc(lock_no_node); 415 while (!queued_spin_trylock(lock)) 416 cpu_relax(); 417 goto release; 418 } 419 420 node = grab_mcs_node(node, idx); 421 422 /* 423 * Keep counts of non-zero index values: 424 */ 425 lockevent_cond_inc(lock_use_node2 + idx - 1, idx); 426 427 /* 428 * Ensure that we increment the head node->count before initialising 429 * the actual node. If the compiler is kind enough to reorder these 430 * stores, then an IRQ could overwrite our assignments. 431 */ 432 barrier(); 433 434 node->locked = 0; 435 node->next = NULL; 436 pv_init_node(node); 437 438 /* 439 * We touched a (possibly) cold cacheline in the per-cpu queue node; 440 * attempt the trylock once more in the hope someone let go while we 441 * weren't watching. 442 */ 443 if (queued_spin_trylock(lock)) 444 goto release; 445 446 /* 447 * Ensure that the initialisation of @node is complete before we 448 * publish the updated tail via xchg_tail() and potentially link 449 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below. 450 */ 451 smp_wmb(); 452 453 /* 454 * Publish the updated tail. 455 * We have already touched the queueing cacheline; don't bother with 456 * pending stuff. 457 * 458 * p,*,* -> n,*,* 459 */ 460 old = xchg_tail(lock, tail); 461 next = NULL; 462 463 /* 464 * if there was a previous node; link it and wait until reaching the 465 * head of the waitqueue. 466 */ 467 if (old & _Q_TAIL_MASK) { 468 prev = decode_tail(old); 469 470 /* Link @node into the waitqueue. */ 471 WRITE_ONCE(prev->next, node); 472 473 pv_wait_node(node, prev); 474 arch_mcs_spin_lock_contended(&node->locked); 475 476 /* 477 * While waiting for the MCS lock, the next pointer may have 478 * been set by another lock waiter. We optimistically load 479 * the next pointer & prefetch the cacheline for writing 480 * to reduce latency in the upcoming MCS unlock operation. 481 */ 482 next = READ_ONCE(node->next); 483 if (next) 484 prefetchw(next); 485 } 486 487 /* 488 * we're at the head of the waitqueue, wait for the owner & pending to 489 * go away. 490 * 491 * *,x,y -> *,0,0 492 * 493 * this wait loop must use a load-acquire such that we match the 494 * store-release that clears the locked bit and create lock 495 * sequentiality; this is because the set_locked() function below 496 * does not imply a full barrier. 497 * 498 * The PV pv_wait_head_or_lock function, if active, will acquire 499 * the lock and return a non-zero value. So we have to skip the 500 * atomic_cond_read_acquire() call. As the next PV queue head hasn't 501 * been designated yet, there is no way for the locked value to become 502 * _Q_SLOW_VAL. So both the set_locked() and the 503 * atomic_cmpxchg_relaxed() calls will be safe. 504 * 505 * If PV isn't active, 0 will be returned instead. 506 * 507 */ 508 if ((val = pv_wait_head_or_lock(lock, node))) 509 goto locked; 510 511 val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK)); 512 513 locked: 514 /* 515 * claim the lock: 516 * 517 * n,0,0 -> 0,0,1 : lock, uncontended 518 * *,*,0 -> *,*,1 : lock, contended 519 * 520 * If the queue head is the only one in the queue (lock value == tail) 521 * and nobody is pending, clear the tail code and grab the lock. 522 * Otherwise, we only need to grab the lock. 523 */ 524 525 /* 526 * In the PV case we might already have _Q_LOCKED_VAL set, because 527 * of lock stealing; therefore we must also allow: 528 * 529 * n,0,1 -> 0,0,1 530 * 531 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the 532 * above wait condition, therefore any concurrent setting of 533 * PENDING will make the uncontended transition fail. 534 */ 535 if ((val & _Q_TAIL_MASK) == tail) { 536 if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL)) 537 goto release; /* No contention */ 538 } 539 540 /* 541 * Either somebody is queued behind us or _Q_PENDING_VAL got set 542 * which will then detect the remaining tail and queue behind us 543 * ensuring we'll see a @next. 544 */ 545 set_locked(lock); 546 547 /* 548 * contended path; wait for next if not observed yet, release. 549 */ 550 if (!next) 551 next = smp_cond_load_relaxed(&node->next, (VAL)); 552 553 arch_mcs_spin_unlock_contended(&next->locked); 554 pv_kick_node(lock, next); 555 556 release: 557 /* 558 * release the node 559 */ 560 __this_cpu_dec(qnodes[0].mcs.count); 561 } 562 EXPORT_SYMBOL(queued_spin_lock_slowpath); 563 564 /* 565 * Generate the paravirt code for queued_spin_unlock_slowpath(). 566 */ 567 #if !defined(_GEN_PV_LOCK_SLOWPATH) && defined(CONFIG_PARAVIRT_SPINLOCKS) 568 #define _GEN_PV_LOCK_SLOWPATH 569 570 #undef pv_enabled 571 #define pv_enabled() true 572 573 #undef pv_init_node 574 #undef pv_wait_node 575 #undef pv_kick_node 576 #undef pv_wait_head_or_lock 577 578 #undef queued_spin_lock_slowpath 579 #define queued_spin_lock_slowpath __pv_queued_spin_lock_slowpath 580 581 #include "qspinlock_paravirt.h" 582 #include "qspinlock.c" 583 584 #endif 585