/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another CPU consuming them.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h> /* for kvmalloc_array/kvfree */
#include <asm/errno.h>
#endif

struct ptr_ring {
        int producer ____cacheline_aligned_in_smp;
        spinlock_t producer_lock;
        int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
        int consumer_tail; /* next entry to invalidate */
        spinlock_t consumer_lock;
        /* Shared consumer/producer data */
        /* Read-only by both the producer and the consumer */
        int size ____cacheline_aligned_in_smp; /* max entries in queue */
        int batch; /* number of entries to consume in a batch */
        void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
        return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_full(r);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_full(r);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}
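/* Illustrative sketch (not part of the API): what the compiler-barrier note
 * above means in practice. wait_for_space() is a hypothetical helper; a real
 * producer would normally drop or back off rather than spin, this only shows
 * why cpu_relax() is needed so the full test is actually re-evaluated on each
 * iteration.
 *
 *	static void wait_for_space(struct ptr_ring *r)
 *	{
 *		spin_lock(&r->producer_lock);
 *		while (__ptr_ring_full(r))
 *			cpu_relax();	// compiler barrier: forces a re-read
 *		spin_unlock(&r->producer_lock);
 *	}
 */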
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        if (unlikely(!r->size) || r->queue[r->producer])
                return -ENOSPC;

        /* Make sure the pointer we are storing points to valid data. */
        /* Pairs with the READ_ONCE in __ptr_ring_peek, via
         * __ptr_ring_consume.
         */
        smp_wmb();

        WRITE_ONCE(r->queue[r->producer++], ptr);
        if (unlikely(r->producer >= r->size))
                r->producer = 0;
        return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_irq(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irq(&r->producer_lock);

        return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->producer_lock, flags);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_irqrestore(&r->producer_lock, flags);

        return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
        int ret;

        spin_lock_bh(&r->producer_lock);
        ret = __ptr_ring_produce(r, ptr);
        spin_unlock_bh(&r->producer_lock);

        return ret;
}

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
        if (likely(r->size))
                return READ_ONCE(r->queue[r->consumer_head]);
        return NULL;
}
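/* Illustrative sketch (not part of the API): a producer running in process
 * context while the consumer runs in BH context, hence the _bh variant per
 * the lock-nesting note above. struct my_buf and my_enqueue() are
 * hypothetical names; on -ENOSPC the pointer was not queued, so the caller
 * still owns it and must dispose of it itself.
 *
 *	static int my_enqueue(struct ptr_ring *r, struct my_buf *buf)
 *	{
 *		int err;
 *
 *		err = ptr_ring_produce_bh(r, buf);
 *		if (err)		// -ENOSPC: ring full
 *			kfree(buf);	// stand-in for the real release path
 *		return err;
 *	}
 */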
/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
        if (likely(r->size))
                return !r->queue[READ_ONCE(r->consumer_head)];
        return true;
}

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
        bool ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
        bool ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
        unsigned long flags;
        bool ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_empty(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
        bool ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_empty(r);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
        /* Fundamentally, what we want to do is update consumer
         * index and zero out the entry so producer can reuse it.
         * Doing it naively at each consume would be as simple as:
         *       consumer = r->consumer;
         *       r->queue[consumer++] = NULL;
         *       if (unlikely(consumer >= r->size))
         *               consumer = 0;
         *       r->consumer = consumer;
         * but that is suboptimal when the ring is full as producer is writing
         * out new entries in the same cache line.  Defer these updates until a
         * batch of entries has been consumed.
         */
        /* Note: we must keep consumer_head valid at all times for
         * __ptr_ring_empty to work correctly.
         */
        int consumer_head = r->consumer_head;
        int head = consumer_head++;

        /* Once we have processed enough entries invalidate them in
         * the ring all at once so producer can reuse their space in the ring.
         * We also do this when we reach end of the ring - not mandatory
         * but helps keep the implementation simple.
         */
        if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
                     consumer_head >= r->size)) {
                /* Zero out entries in the reverse order: this way we touch the
                 * cache line that producer might currently be reading the last;
                 * producer won't make progress and touch other cache lines
                 * besides the first one until we write out all entries.
                 */
                while (likely(head >= r->consumer_tail))
                        r->queue[head--] = NULL;
                r->consumer_tail = consumer_head;
        }
        if (unlikely(consumer_head >= r->size)) {
                consumer_head = 0;
                r->consumer_tail = 0;
        }
        /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
        WRITE_ONCE(r->consumer_head, consumer_head);
}
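/* Worked example (illustrative, numbers chosen arbitrarily): with size = 16
 * and batch = 8, the first seven consumes only advance consumer_head; the
 * consumed slots keep their stale pointers. On the eighth consume,
 * consumer_head - consumer_tail reaches 8, so slots 7..0 are NULLed out in
 * reverse order in one pass and consumer_tail jumps to 8. The producer's
 * cache line is therefore dirtied once per batch of eight entries instead of
 * once per entry.
 */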
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        /* The READ_ONCE in __ptr_ring_peek guarantees that anyone
         * accessing data through the pointer is up to date. Pairs
         * with smp_wmb in __ptr_ring_produce.
         */
        ptr = __ptr_ring_peek(r);
        if (ptr)
                __ptr_ring_discard_one(r);

        return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
                                             void **array, int n)
{
        void *ptr;
        int i;

        for (i = 0; i < n; i++) {
                ptr = __ptr_ring_consume(r);
                if (!ptr)
                        break;
                array[i] = ptr;
        }

        return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
        void *ptr;

        spin_lock(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_irq(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irq(&r->consumer_lock);

        return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
        unsigned long flags;
        void *ptr;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ptr = __ptr_ring_consume(r);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
        void *ptr;

        spin_lock_bh(&r->consumer_lock);
        ptr = __ptr_ring_consume(r);
        spin_unlock_bh(&r->consumer_lock);

        return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
                                           void **array, int n)
{
        int ret;

        spin_lock(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
                                               void **array, int n)
{
        int ret;

        spin_lock_irq(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irq(&r->consumer_lock);

        return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
                                               void **array, int n)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&r->consumer_lock, flags);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_irqrestore(&r->consumer_lock, flags);

        return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
                                              void **array, int n)
{
        int ret;

        spin_lock_bh(&r->consumer_lock);
        ret = __ptr_ring_consume_batched(r, array, n);
        spin_unlock_bh(&r->consumer_lock);

        return ret;
}
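/* Illustrative sketch (not part of the API): a process-context consumer that
 * drains the ring in batches. The batch size of 16 and my_handle_buf() are
 * hypothetical; a real consumer would size the on-stack array to match its
 * workload, and batching amortises the consumer_lock cost across entries.
 *
 *	static void my_drain(struct ptr_ring *r)
 *	{
 *		void *batch[16];
 *		int n, i;
 *
 *		do {
 *			n = ptr_ring_consume_batched(r, batch,
 *						     ARRAY_SIZE(batch));
 *			for (i = 0; i < n; i++)
 *				my_handle_buf(batch[i]);
 *		} while (n);
 *	}
 */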
/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irq(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        \
        spin_lock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_bh(&(r)->consumer_lock); \
        __PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
        typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
        unsigned long __PTR_RING_PEEK_CALL_f; \
        \
        spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
        spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
        __PTR_RING_PEEK_CALL_v; \
})

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
        if (size > KMALLOC_MAX_SIZE / sizeof(void *))
                return NULL;
        return kvmalloc_array(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
        r->size = size;
        r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
        /* We need to set batch at least to 1 to make logic
         * in __ptr_ring_discard_one work correctly.
         * Batching too much (because ring is small) would cause a lot of
         * burstiness. Needs tuning, for now disable batching.
         */
        if (r->batch > r->size / 2 || !r->batch)
                r->batch = 1;
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
        r->queue = __ptr_ring_init_queue_alloc(size, gfp);
        if (!r->queue)
                return -ENOMEM;

        __ptr_ring_set_size(r, size);
        r->producer = r->consumer_head = r->consumer_tail = 0;
        spin_lock_init(&r->producer_lock);
        spin_lock_init(&r->consumer_lock);

        return 0;
}
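/* Illustrative sketch (not part of the API): typical lifecycle. The size of
 * 256 and my_buf_free() are hypothetical; in practice the ring is usually
 * embedded in a larger per-device or per-queue structure.
 *
 *	struct ptr_ring ring;
 *
 *	if (ptr_ring_init(&ring, 256, GFP_KERNEL))
 *		return -ENOMEM;
 *	...
 *	// produce/consume as shown above
 *	...
 *	ptr_ring_cleanup(&ring, my_buf_free);	// frees any leftover entries
 */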
/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
                                      void (*destroy)(void *))
{
        unsigned long flags;
        int head;

        spin_lock_irqsave(&r->consumer_lock, flags);
        spin_lock(&r->producer_lock);

        if (!r->size)
                goto done;

        /*
         * Clean out buffered entries (for simplicity). This way the following
         * code can test entries for NULL and, if not NULL, assume they are
         * valid.
         */
        head = r->consumer_head - 1;
        while (likely(head >= r->consumer_tail))
                r->queue[head--] = NULL;
        r->consumer_tail = r->consumer_head;

        /*
         * Go over entries in the batch, start moving head back and copy
         * entries. Stop when we run into previously unconsumed entries.
         */
        while (n) {
                head = r->consumer_head - 1;
                if (head < 0)
                        head = r->size - 1;
                if (r->queue[head]) {
                        /* This batch entry will have to be destroyed. */
                        goto done;
                }
                r->queue[head] = batch[--n];
                r->consumer_tail = head;
                /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
                WRITE_ONCE(r->consumer_head, head);
        }

done:
        /* Destroy all entries left in the batch. */
        while (n)
                destroy(batch[--n]);
        spin_unlock(&r->producer_lock);
        spin_unlock_irqrestore(&r->consumer_lock, flags);
}

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))
{
        int producer = 0;
        void **old;
        void *ptr;

        while ((ptr = __ptr_ring_consume(r)))
                if (producer < size)
                        queue[producer++] = ptr;
                else if (destroy)
                        destroy(ptr);

        if (producer >= size)
                producer = 0;
        __ptr_ring_set_size(r, size);
        r->producer = producer;
        r->consumer_head = 0;
        r->consumer_tail = 0;
        old = r->queue;
        r->queue = queue;

        return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
                                  void (*destroy)(void *))
{
        unsigned long flags;
        void **queue = __ptr_ring_init_queue_alloc(size, gfp);
        void **old;

        if (!queue)
                return -ENOMEM;

        spin_lock_irqsave(&(r)->consumer_lock, flags);
        spin_lock(&(r)->producer_lock);

        old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

        spin_unlock(&(r)->producer_lock);
        spin_unlock_irqrestore(&(r)->consumer_lock, flags);

        kvfree(old);

        return 0;
}
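/* Illustrative sketch (not part of the API): shrinking or growing a ring at
 * runtime from process context. new_size and my_buf_free() are hypothetical.
 * The consumer lock is taken with interrupts disabled internally, so the
 * caller only needs a context where the allocation (here GFP_KERNEL) may
 * sleep; entries that do not fit in the new queue are passed to the destroy
 * callback.
 *
 *	err = ptr_ring_resize(&ring, new_size, GFP_KERNEL, my_buf_free);
 *	if (err)	// -ENOMEM: the old ring is left untouched
 *		return err;
 */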
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
                                           unsigned int nrings,
                                           int size,
                                           gfp_t gfp, void (*destroy)(void *))
{
        unsigned long flags;
        void ***queues;
        int i;

        queues = kmalloc_array(nrings, sizeof(*queues), gfp);
        if (!queues)
                goto noqueues;

        for (i = 0; i < nrings; ++i) {
                queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
                if (!queues[i])
                        goto nomem;
        }

        for (i = 0; i < nrings; ++i) {
                spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
                spin_lock(&(rings[i])->producer_lock);
                queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
                                                  size, gfp, destroy);
                spin_unlock(&(rings[i])->producer_lock);
                spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
        }

        for (i = 0; i < nrings; ++i)
                kvfree(queues[i]);

        kfree(queues);

        return 0;

nomem:
        while (--i >= 0)
                kvfree(queues[i]);

        kfree(queues);

noqueues:
        return -ENOMEM;
}

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
        void *ptr;

        if (destroy)
                while ((ptr = ptr_ring_consume(r)))
                        destroy(ptr);
        kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */