/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/fs.h>

#include "trace.h"

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *  ON   DISABLED
 * ---- ----------
 *   0      0      : ring buffers are off
 *   1      0      : ring buffers are on
 *   X      1      : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0
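/*
 * Illustrative sketch (not in the original source): how the three enable
 * layers described above combine. The helper below is hypothetical; it
 * only restates the checks that ring_buffer_lock_reserve() and
 * ring_buffer_write() perform before recording anything.
 */
#if 0
static int rb_example_recording_allowed(struct ring_buffer *buffer, int cpu)
{
	if (ring_buffer_flags != RB_BUFFERS_ON)		/* layer 1: global */
		return 0;
	if (atomic_read(&buffer->record_disabled))	/* layer 2: buffer */
		return 0;
	/* layer 3: the per cpu buffer */
	if (atomic_read(&buffer->buffers[cpu]->record_disabled))
		return 0;
	return 1;
}
#endif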
/* FIXME!!! */
u64 ring_buffer_time_stamp(int cpu)
{
	u64 time;

	preempt_disable_notrace();
	/* shift to debug/test normalization and TIME_EXTENTS */
	time = sched_clock() << DEBUG_SHIFT;
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT	2
#define RB_ALIGNMENT		(1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA	28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length = rb_event_length(event);
	if (event->type != RINGBUF_TYPE_DATA)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
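/*
 * Illustrative sketch (not in the original source): how a consumer would
 * typically use the two accessors above on an event obtained from, e.g.,
 * ring_buffer_consume() further down. process_record() is a hypothetical
 * consumer.
 */
#if 0
	struct ring_buffer_event *event;
	u64 ts;

	event = ring_buffer_consume(buffer, cpu, &ts);
	if (event) {
		void *body = ring_buffer_event_data(event);
		unsigned len = ring_buffer_event_length(event);

		/* len is the size of the payload the writer reserved */
		process_record(body, len);
	}
#endif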
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[];	/* data of buffer page */
};

struct buffer_page {
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	struct buffer_data_page *page;	/* Actual data page */
};

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - offsetof(struct buffer_data_page, data))
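/*
 * Worked example (added note): with a nanosecond clock, a 27 bit delta
 * covers 2^27 ns, roughly 134 ms. Any event whose time stamp differs from
 * the previous write stamp by more than that cannot use the inline 27 bit
 * delta and forces a RINGBUF_TYPE_TIME_EXTEND event instead (see
 * rb_add_time_stamp() below).
 */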
/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	struct ring_buffer		*buffer;
	spinlock_t			reader_lock; /* serialize readers */
	raw_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			overrun;
	unsigned long			entries;
	u64				write_stamp;
	u64				read_stamp;
	atomic_t			record_disabled;
};

struct ring_buffer {
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	cpumask_var_t			cpumask;
	atomic_t			record_disabled;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(buffer, cond)				\
	({							\
		int _____ret = unlikely(cond);			\
		if (_____ret) {					\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
		_____ret;					\
	})

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
	}

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;
		list_add(&bpage->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		bpage->page = (void *)addr;
		rb_init_page(bpage->page);
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->reader_lock);
	cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
	INIT_LIST_HEAD(&cpu_buffer->pages);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	cpu_buffer->reader_page = bpage;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	bpage->page = (void *)addr;
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	list_del_init(&cpu_buffer->reader_page->list);
	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(bpage, tmp, head, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	kfree(cpu_buffer);
}
/*
 * Causes compile errors if the struct buffer_page gets bigger
 * than the struct page.
 */
extern int ring_buffer_page_too_big(void);

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* Paranoid! Optimizes out when all is well */
	if (sizeof(struct buffer_page) > sizeof(struct page))
		ring_buffer_page_too_big();


	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;

	/* need at least two pages */
	if (buffer->pages == 1)
		buffer->pages++;

	cpumask_copy(buffer->cpumask, cpu_possible_mask);
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc);
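/*
 * Illustrative sketch (not in the original source): a typical
 * allocate/free cycle. Note that @size is per cpu and is rounded up to
 * full buffer pages, with a minimum of two pages.
 */
#if 0
	struct ring_buffer *buffer;

	buffer = ring_buffer_alloc(65536, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	/* ... record and read events ... */

	ring_buffer_free(buffer);
#endif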
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
			return;
		p = cpu_buffer->pages.next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
		return;

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);

}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
			return;
		p = pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		list_add_tail(&bpage->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}
/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 *  RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *bpage, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
			mutex_unlock(&buffer->mutex);
			return -1;
		}

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
		mutex_unlock(&buffer->mutex);
		return -1;
	}

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						   cache_line_size()),
					     GFP_KERNEL, cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			bpage->page = (void *)addr;
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages))) {
		mutex_unlock(&buffer->mutex);
		return -1;
	}

 out:
	buffer->pages = nr_pages;
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	mutex_unlock(&buffer->mutex);
	return -ENOMEM;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);
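/*
 * Illustrative sketch (not in the original source): growing a buffer to
 * one megabyte per cpu. The caller must guarantee the buffer is idle, as
 * noted in the kernel-doc above.
 */
#if 0
	int ret;

	ret = ring_buffer_resize(buffer, 1024 * 1024);
	if (ret < 0)
		/* allocation failed; the old size is untouched */
		return -ENOMEM;
	/* ret is the actual size, rounded up to full pages */
#endif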
static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}

static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write);
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->head_page);
}

/*
 * When the tail hits the head and the buffer is in overwrite mode,
 * the head jumps to the next page and all content on the previous
 * page is discarded. But before doing so, we update the overrun
 * variable of the buffer.
 */
static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	unsigned long head;

	for (head = 0; head < rb_head_size(cpu_buffer);
	     head += rb_event_length(event)) {

		event = __rb_page_index(cpu_buffer->head_page, head);
		if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
			return;
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->overrun++;
		cpu_buffer->entries--;
	}
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = (*bpage)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*bpage = list_entry(p, struct buffer_page, list);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
}

static inline int
rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
	     struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static inline void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
		    struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	while (cpu_buffer->commit_page->page != (void *)addr) {
		if (RB_WARN_ON(cpu_buffer,
			  cpu_buffer->commit_page == cpu_buffer->tail_page))
			return;
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
	}

	/* Now set the commit to the event's index */
	local_set(&cpu_buffer->commit_page->page->commit, index);
}
static inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static inline void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
		unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}

static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusion */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}
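/*
 * Worked examples (added note), assuming a 4 byte event header:
 *
 *   length = 3:  fits in the len field; 3 + 4 (header) = 7, aligned up
 *                to 8 bytes total on the page.
 *   length = 40: exceeds RB_MAX_SMALL_DATA (28), so the size moves to
 *                array[0]: 40 + 4 (array[0]) + 4 (header) = 48 bytes
 *                total, and event->len is 0.
 */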
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page, *commit_page;
	unsigned long tail, write;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

	commit_page = cpu_buffer->commit_page;
	/* we just need to protect against interrupts */
	barrier();
	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (write > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		local_irq_save(flags);
		__raw_spin_lock(&cpu_buffer->lock);

		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
			goto out_unlock;

		/*
		 * If for some reason, we had an interrupt storm that made
		 * it all the way around the buffer, bail, and warn
		 * about it.
		 */
		if (unlikely(next_page == commit_page)) {
			WARN_ON_ONCE(1);
			goto out_unlock;
		}

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE))
				goto out_unlock;

			/* tail_page has not moved yet? */
			if (tail_page == cpu_buffer->tail_page) {
				/* count overflows */
				rb_update_overflow(cpu_buffer);

				rb_inc_page(cpu_buffer, &head_page);
				cpu_buffer->head_page = head_page;
				cpu_buffer->head_page->read = 0;
			}
		}

		/*
		 * If the tail page is still the same as what we think
		 * it is, then it is up to us to update the tail
		 * pointer.
		 */
		if (tail_page == cpu_buffer->tail_page) {
			local_set(&next_page->write, 0);
			local_set(&next_page->page->commit, 0);
			cpu_buffer->tail_page = next_page;

			/* reread the time stamp */
			*ts = ring_buffer_time_stamp(cpu_buffer->cpu);
			cpu_buffer->tail_page->page->time_stamp = *ts;
		}

		/*
		 * The actual tail page has moved forward.
		 */
		if (tail < BUF_PAGE_SIZE) {
			/* Mark the rest of the page with padding */
			event = __rb_page_index(tail_page, tail);
			event->type = RINGBUF_TYPE_PADDING;
		}

		if (tail <= BUF_PAGE_SIZE)
			/* Set the write back to the previous setting */
			local_set(&tail_page->write, tail);

		/*
		 * If this was a commit entry that failed,
		 * increment that too
		 */
		if (tail_page == cpu_buffer->commit_page &&
		    tail == rb_commit_index(cpu_buffer)) {
			rb_set_commit_to_write(cpu_buffer);
		}

		__raw_spin_unlock(&cpu_buffer->lock);
		local_irq_restore(flags);

		/* fail and let the caller try again */
		return ERR_PTR(-EAGAIN);
	}

	/* We reserved something on the buffer */

	if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
		return NULL;

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	/*
	 * If this is a commit and the tail is zero, then update
	 * this page's time stamp.
	 */
	if (!tail && rb_is_commit(cpu_buffer, event))
		cpu_buffer->commit_page->page->time_stamp = *ts;

	return event;

 out_unlock:
	/* reset write */
	if (tail <= BUF_PAGE_SIZE)
		local_set(&tail_page->write, tail);

	__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);
	return NULL;
}
static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big, we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then we need to
		 * update the page itself, and just put in a zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			cpu_buffer->commit_page->page->time_stamp = *ts;
			event->time_delta = 0;
			event->array[0] = 0;
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Darn, this is just wasted space */
		event->time_delta = 0;
		event->array[0] = 0;
		ret = 0;
	}

	*delta = 0;

	return ret;
}
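/*
 * Worked example (added note): a TIME_EXTEND event splits the oversized
 * delta between the header and array[0]. The low TS_SHIFT (27) bits go
 * in time_delta and the rest in array[0], so a reader reconstructs it as
 *
 *	delta = ((u64)event->array[0] << TS_SHIFT) + event->time_delta;
 *
 * which is exactly what rb_update_read_stamp() and
 * rb_update_iter_read_stamp() do further down.
 */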
static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int commit = 0;
	int nr_loops = 0;

 again:
	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		return NULL;

	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	/*
	 * Only the first commit can update the timestamp.
	 * Yes there is a race here. If an interrupt comes in
	 * just after the conditional and it traces too, then it
	 * will also check the deltas. More than one timestamp may
	 * also be made. But only the entry that did the actual
	 * commit will be something other than zero.
	 */
	if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
	    rb_page_write(cpu_buffer->tail_page) ==
	    rb_commit_index(cpu_buffer)) {

		delta = ts - cpu_buffer->write_stamp;

		/* make sure this delta is calculated here */
		barrier();

		/* Did the write stamp get updated already? */
		if (unlikely(ts < cpu_buffer->write_stamp))
			delta = 0;

		if (test_time_stamp(delta)) {

			commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);

			if (commit == -EBUSY)
				return NULL;

			if (commit == -EAGAIN)
				goto again;

			RB_WARN_ON(cpu_buffer, commit < 0);
		}
	} else
		/* Non commits have zero deltas */
		delta = 0;

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (PTR_ERR(event) == -EAGAIN)
		goto again;

	if (!event) {
		if (unlikely(commit))
			/*
			 * Ouch! We needed a timestamp and it was committed.
			 * But we didn't get our event reserved.
			 */
			rb_set_commit_to_write(cpu_buffer);
		return NULL;
	}

	/*
	 * If the timestamp was committed, make the commit our entry
	 * now so that we will update it when needed.
	 */
	if (commit)
		rb_set_commit_event(cpu_buffer, event);
	else if (!rb_is_commit(cpu_buffer, event))
		delta = 0;

	event->time_delta = delta;

	return event;
}

static DEFINE_PER_CPU(int, rb_need_resched);

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer,
			 unsigned long length,
			 unsigned long *flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return NULL;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	/* If we are tracing schedule, we don't want to recurse */
	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	length = rb_calculate_event_length(length);
	if (length > BUF_PAGE_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
	if (!event)
		goto out;

	/*
	 * Need to store resched state on this cpu.
	 * Only the first needs to.
	 */

	if (preempt_count() == 1)
		per_cpu(rb_need_resched, cpu) = resched;

	return event;

 out:
	ftrace_preempt_enable(resched);
	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
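/*
 * Illustrative sketch (not in the original source): the reserve/commit
 * pairing a writer uses. The my_payload struct is hypothetical.
 */
#if 0
	struct ring_buffer_event *event;
	struct my_payload *entry;
	unsigned long irq_flags;

	event = ring_buffer_lock_reserve(buffer, sizeof(*entry), &irq_flags);
	if (!event)
		return;	/* buffer disabled or no room */
	entry = ring_buffer_event_data(event);
	entry->value = 42;	/* fill the reserved slot in place */
	ring_buffer_unlock_commit(buffer, event, irq_flags);
#endif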
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	cpu_buffer->entries++;

	/* Only process further if we own the commit */
	if (!rb_is_commit(cpu_buffer, event))
		return;

	cpu_buffer->write_stamp += event->time_delta;

	rb_set_commit_to_write(cpu_buffer);
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event,
			      unsigned long flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1)
		ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
	else
		preempt_enable_no_resched_notrace();

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);

/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long event_length;
	void *body;
	int ret = -EBUSY;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return -EBUSY;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	event_length = rb_calculate_event_length(length);
	event = rb_reserve_next_event(cpu_buffer,
				      RINGBUF_TYPE_DATA, event_length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	ftrace_preempt_enable(resched);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_write);
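/*
 * Illustrative sketch (not in the original source): the one-shot variant
 * when the data already exists somewhere else.
 */
#if 0
	char msg[] = "hello";

	if (ring_buffer_write(buffer, sizeof(msg), msg))
		/* -EBUSY: recording is disabled at one of the layers */
		return;
#endif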
static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = cpu_buffer->reader_page;
	struct buffer_page *head = cpu_buffer->head_page;
	struct buffer_page *commit = cpu_buffer->commit_page;

	return reader->read == rb_page_commit(reader) &&
		(commit == reader ||
		 (commit == head &&
		  head->read == rb_page_commit(commit)));
}

/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable);

/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable);

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);

/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);

/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->overrun;
}
EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += cpu_buffer->entries;
	}

	return entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries);
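/*
 * Illustrative sketch (not in the original source): taking a stable
 * snapshot of the counters by quiescing writers first, per the kernel-doc
 * note on ring_buffer_record_disable().
 */
#if 0
	unsigned long entries, lost;

	ring_buffer_record_disable(buffer);
	synchronize_sched();		/* wait for in-flight writers */

	entries = ring_buffer_entries(buffer);
	lost = ring_buffer_overruns(buffer);

	ring_buffer_record_enable(buffer);
#endif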
/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);

static void rb_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = cpu_buffer->head_page;
		iter->head = cpu_buffer->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->page->time_stamp;
}

/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->commit_page &&
		iter->head == rb_commit_index(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}

static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;
	int nr_loops = 0;

	local_irq_save(flags);
	__raw_spin_lock(&cpu_buffer->lock);

 again:
	/*
	 * This should normally only loop twice. But because the
	 * start of the reader inserts an empty page, it causes
	 * a case where we will loop three times. There should be no
	 * reason to loop four times (that I know of).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
		reader = NULL;
		goto out;
	}

	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	if (RB_WARN_ON(cpu_buffer,
		       cpu_buffer->reader_page->read > rb_page_size(reader)))
		goto out;

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Splice the empty reader page into the list around the head.
	 * Reset the reader page to size zero.
	 */

	reader = cpu_buffer->head_page;
	cpu_buffer->reader_page->list.next = reader->list.next;
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);

	/* Make the reader page now replace the head */
	reader->list.prev->next = &cpu_buffer->reader_page->list;
	reader->list.next->prev = &cpu_buffer->reader_page->list;

	/*
	 * If the tail is on the reader, then we must set the head
	 * to the inserted page, otherwise we set it one before.
	 */
	cpu_buffer->head_page = cpu_buffer->reader_page;

	if (cpu_buffer->commit_page != reader)
		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	return reader;
}

static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	if (RB_WARN_ON(cpu_buffer, !reader))
		return;

	event = rb_reader_event(cpu_buffer);

	if (event->type == RINGBUF_TYPE_DATA)
		cpu_buffer->entries--;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}
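/*
 * Added note: the swap in rb_get_reader_page() trades the empty spare
 * page for the current head page. The old head page leaves the ring and
 * becomes the reader page, while the previous (empty) reader page is
 * spliced into the ring in its place:
 *
 *   before:  reader (spare)       ring: head -> A -> B -> ...
 *   after:   reader = old head    ring: spare -> A -> B -> ...
 *
 * The reader then consumes the old head page outside the ring while
 * writers keep filling the remaining pages, so readers never block
 * writers.
 */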
static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= rb_page_size(iter->head_page)) {
		if (RB_WARN_ON(buffer,
			       iter->head_page == cpu_buffer->commit_page))
			return;
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the iterator if we are
	 * at the tail of the buffer.
	 */
	if (RB_WARN_ON(cpu_buffer,
		       (iter->head_page == cpu_buffer->commit_page) &&
		       (iter->head + length > rb_commit_index(cpu_buffer))))
		return;

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= rb_page_size(iter->head_page)) &&
	    (iter->head_page != cpu_buffer->commit_page))
		rb_advance_iter(iter);
}

static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	int nr_loops = 0;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

 again:
	/*
	 * We repeat when a timestamp is encountered. It is possible
	 * to get multiple timestamps from an interrupt entering just
	 * as one timestamp is about to be written. The max times
	 * that this can happen is the number of nested interrupts we
	 * can have. Nesting 10 deep of interrupts is clearly
	 * an anomaly.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
		return NULL;

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		RB_WARN_ON(cpu_buffer, 1);
		rb_advance_reader(cpu_buffer);
		return NULL;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_peek);
static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int nr_loops = 0;

	if (ring_buffer_iter_empty(iter))
		return NULL;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

 again:
	/*
	 * We repeat when a timestamp is encountered. It is possible
	 * to get multiple timestamps from an interrupt entering just
	 * as one timestamp is about to be written. The max times
	 * that this can happen is the number of nested interrupts we
	 * can have. Nesting 10 deep of interrupts is clearly
	 * an anomaly.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
		return NULL;

	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	event = rb_iter_head_event(iter);

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		rb_inc_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);

/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_buffer_peek(buffer, cpu, ts);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The cpu buffer to consume from
 * @ts: The timestamp counter of this event.
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning, that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	event = rb_buffer_peek(buffer, cpu, ts);
	if (!event)
		goto out;

	rb_advance_reader(cpu_buffer);

 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);
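/*
 * Illustrative sketch (not in the original source): draining one cpu's
 * buffer with the consuming interface.
 */
#if 0
	struct ring_buffer_event *event;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
		printk(KERN_INFO "event at %llu, %u bytes\n",
		       (unsigned long long)ts,
		       ring_buffer_event_length(event));
#endif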
/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so no concurrent producer is expected while
 * the iteration is in progress.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	__raw_spin_lock(&cpu_buffer->lock);
	rb_iter_reset(iter);
	__raw_spin_unlock(&cpu_buffer->lock);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_start);

/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);

/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	if (!event)
		goto out;

	rb_advance_iter(iter);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
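/*
 * Example (editor's sketch): a non-consuming walk over one CPU's
 * buffer using the iterator API above. "dump_cpu_buffer" and
 * "print_event" are hypothetical helpers; the required part is the
 * pairing of ring_buffer_read_start() with ring_buffer_read_finish().
 *
 *	static void dump_cpu_buffer(struct ring_buffer *buffer, int cpu)
 *	{
 *		struct ring_buffer_iter *iter;
 *		struct ring_buffer_event *event;
 *		u64 ts;
 *
 *		iter = ring_buffer_read_start(buffer, cpu);
 *		if (!iter)
 *			return;
 *
 *		while ((event = ring_buffer_read(iter, &ts)))
 *			print_event(ring_buffer_event_data(event), ts);
 *
 *		ring_buffer_read_finish(iter);
 *	}
 */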
/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	cpu_buffer->overrun = 0;
	cpu_buffer->entries = 0;

	cpu_buffer->write_stamp = 0;
	cpu_buffer->read_stamp = 0;
}

/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	__raw_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	__raw_spin_unlock(&cpu_buffer->lock);

	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);

/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);

/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		if (!rb_per_cpu_empty(cpu_buffer))
			return 0;
	}
	return 1;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);

/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 1;

	cpu_buffer = buffer->buffers[cpu];
	return rb_per_cpu_empty(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);

/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: the CPU of the buffers to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another spare buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;

	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
		return -EINVAL;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->pages != buffer_b->pages)
		return -EINVAL;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
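/*
 * Example (editor's sketch): taking a per-CPU "snapshot" by swapping
 * one CPU buffer of a live ring buffer with that of a same-sized
 * spare. "live" and "spare" are hypothetical buffers (e.g. both
 * created with ring_buffer_alloc()); after a successful swap the
 * spare holds the captured data and can be read at leisure with the
 * consuming or iterating readers above. "read_out" stands in for
 * such a reader, e.g. the drain_cpu_buffer() sketch earlier.
 *
 *	static int snapshot_cpu(struct ring_buffer *live,
 *				struct ring_buffer *spare, int cpu)
 *	{
 *		int ret;
 *
 *		ret = ring_buffer_swap_cpu(live, spare, cpu);
 *		if (ret)
 *			return ret;
 *
 *		return read_out(spare, cpu);
 *	}
 */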
static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
			      struct buffer_data_page *bpage)
{
	struct ring_buffer_event *event;
	unsigned long head;

	__raw_spin_lock(&cpu_buffer->lock);
	for (head = 0; head < local_read(&bpage->commit);
	     head += rb_event_length(event)) {

		event = __rb_data_page_index(bpage, head);
		/* break, not return: the lock must be released below */
		if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
			break;
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->entries--;
	}
	__raw_spin_unlock(&cpu_buffer->lock);
}

/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
	unsigned long addr;
	struct buffer_data_page *bpage;

	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		return NULL;

	bpage = (void *)addr;

	return bpage;
}

/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
	free_page((unsigned long)data);
}
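/*
 * Example (editor's sketch): the full life cycle of a read page,
 * using ring_buffer_read_page() defined below. "handle_page" is a
 * hypothetical consumer of the page contents. Note that the page
 * pointer is passed by address, since a read may swap it with a
 * page from the ring buffer.
 *
 *	static int read_whole_pages(struct ring_buffer *buffer, int cpu)
 *	{
 *		void *rpage;
 *
 *		rpage = ring_buffer_alloc_read_page(buffer);
 *		if (!rpage)
 *			return -ENOMEM;
 *
 *		while (ring_buffer_read_page(buffer, &rpage, cpu, 0))
 *			handle_page(rpage);
 *
 *		ring_buffer_free_read_page(buffer, rpage);
 *		return 0;
 *	}
 */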
/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer);
 *	if (!rpage)
 *		return error;
 *	ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
 *	if (ret)
 *		process_page(rpage);
 *
 * When @full is set, the function will not return 1 unless
 * the writer is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  1 if data has been transferred
 *  0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
			  void **data_page, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	unsigned long flags;
	int ret = 0;

	if (!data_page)
		return 0;

	bpage = *data_page;
	if (!bpage)
		return 0;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	/*
	 * rb_buffer_peek will get the next ring buffer if
	 * the current reader page is empty.
	 */
	event = rb_buffer_peek(buffer, cpu, NULL);
	if (!event)
		goto out;

	/* check for data */
	if (!local_read(&cpu_buffer->reader_page->page->commit))
		goto out;
	/*
	 * If the writer is already off of the read page, then simply
	 * switch the read page with the given page. Otherwise
	 * we need to copy the data from the reader to the writer.
	 */
	if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
		unsigned int read = cpu_buffer->reader_page->read;
		unsigned int commit;

		if (full)
			goto out;
		/*
		 * The writer is still on the reader page, we must copy
		 * into the caller's page. Do not clobber bpage first;
		 * it is the destination of the copy.
		 */
		commit = local_read(&cpu_buffer->reader_page->page->commit);
		memcpy(bpage->data,
		       cpu_buffer->reader_page->page->data + read,
		       commit - read);

		/* let rb_remove_entries see how much was copied */
		local_set(&bpage->commit, commit - read);
		bpage->time_stamp = cpu_buffer->reader_page->page->time_stamp;

		/* consume what was read (advance the index, not the pointer) */
		cpu_buffer->reader_page->read = commit;

	} else {
		/* swap the pages */
		rb_init_page(bpage);
		bpage = cpu_buffer->reader_page->page;
		cpu_buffer->reader_page->page = *data_page;
		cpu_buffer->reader_page->read = 0;
		*data_page = bpage;
	}
	ret = 1;

	/* update the entry counter */
	rb_remove_entries(cpu_buffer, bpage);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return ret;
}

static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
	       size_t cnt, loff_t *ppos)
{
	long *p = filp->private_data;
	char buf[64];
	int r;

	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
		r = sprintf(buf, "permanently disabled\n");
	else
		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
		size_t cnt, loff_t *ppos)
{
	long *p = filp->private_data;
	char buf[64];
	long val;
	int ret;

	if (cnt >= sizeof(buf))
		return -EINVAL;

	if (copy_from_user(buf, ubuf, cnt))
		return -EFAULT;

	buf[cnt] = 0;

	ret = strict_strtoul(buf, 10, &val);
	if (ret < 0)
		return ret;

	if (val)
		set_bit(RB_BUFFERS_ON_BIT, p);
	else
		clear_bit(RB_BUFFERS_ON_BIT, p);

	(*ppos)++;

	return cnt;
}

static const struct file_operations rb_simple_fops = {
	.open = tracing_open_generic,
	.read = rb_simple_read,
	.write = rb_simple_write,
};

static __init int rb_init_debugfs(void)
{
	struct dentry *d_tracer;
	struct dentry *entry;

	d_tracer = tracing_init_dentry();

	entry = debugfs_create_file("tracing_on", 0644, d_tracer,
				    &ring_buffer_flags, &rb_simple_fops);
	if (!entry)
		pr_warning("Could not create debugfs 'tracing_on' entry\n");

	return 0;
}

fs_initcall(rb_init_debugfs);
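/*
 * Usage note (editor's addition): with debugfs mounted, the file
 * created above lives in the tracing directory returned by
 * tracing_init_dentry(), typically:
 *
 *	echo 0 > /sys/kernel/debug/tracing/tracing_on
 *	echo 1 > /sys/kernel/debug/tracing/tracing_on
 *	cat /sys/kernel/debug/tracing/tracing_on
 *
 * Writing 0 clears RB_BUFFERS_ON_BIT and 1 sets it; once
 * tracing_off_permanent() has set RB_BUFFERS_DISABLED_BIT the read
 * side reports "permanently disabled". The exact path depends on
 * where debugfs is mounted on the system.
 */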