/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/ftrace_irq.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/fs.h>

#include <asm/local.h>
#include "trace.h"

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	int ret;

	ret = trace_seq_printf(s, "# compressed entry header\n");
	ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
	ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
	ret = trace_seq_printf(s, "\tarray : 32 bits\n");
	ret = trace_seq_printf(s, "\n");
	ret = trace_seq_printf(s, "\tpadding : type == %d\n",
			       RINGBUF_TYPE_PADDING);
	ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
			       RINGBUF_TYPE_TIME_EXTEND);
	ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
			       RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return ret;
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 *  This has two bits: ON, DISABLED
 *
 *  ON   DISABLED
 * ---- ----------
 *   0      0        : ring buffers are off
 *   1      0        : ring buffers are on
 *   X      1        : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show state of ring buffers enabled
 */
int tracing_is_on(void)
{
	return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

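/*
 * Illustrative sketch (not part of the original file): how a caller
 * might use the global switch above around a section where no new
 * events should be recorded.  The helper dump_buffers_somehow() is
 * purely hypothetical; only tracing_on(), tracing_off() and
 * tracing_is_on() are real interfaces defined above.
 *
 *	tracing_off();
 *	if (!tracing_is_on())
 *		dump_buffers_somehow();	// no new writes can succeed here
 *	tracing_on();
 */
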
#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

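/*
 * Illustrative sketch (not part of the original file): the two ways a
 * data event's size is encoded.  A 12 byte payload fits the type_len
 * field (12 / RB_ALIGNMENT == 3), while a payload larger than
 * RB_MAX_SMALL_DATA does not, so type_len is 0 and the size is carried
 * in array[0]:
 *
 *	type_len == 3:  total size == 3 * RB_ALIGNMENT + RB_EVNT_HDR_SIZE
 *	type_len == 0:  total size == array[0] + RB_EVNT_HDR_SIZE
 *
 * which is exactly what rb_event_data_length() above computes.
 */
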
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
{
	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[];	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

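/*
 * Illustrative sketch (not part of the original file): how the write
 * counter is split by RB_WRITE_MASK.  With a made-up raw value of
 * 0x00300007 (two nested updaters in flight, write index 7):
 *
 *	index    = val & RB_WRITE_MASK;		// 0x7
 *	updaters = val >> 20;			// 0x3
 *
 * rb_tail_page_update() below bumps the updater part with
 * RB_WRITE_INTCNT and keeps only the index part when it resets a page.
 */
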
/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
	return local_read(&((struct buffer_data_page *)page)->commit)
		+ BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8 bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;
	int ret;

	ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			       "offset:0;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)sizeof(field.time_stamp),
			       (unsigned int)is_signed_type(u64));

	ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), commit),
			       (unsigned int)sizeof(field.commit),
			       (unsigned int)is_signed_type(long));

	ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), commit),
			       1,
			       (unsigned int)is_signed_type(long));

	ret = trace_seq_printf(s, "\tfield: char data;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), data),
			       (unsigned int)BUF_PAGE_SIZE,
			       (unsigned int)is_signed_type(char));

	return ret;
}

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	struct ring_buffer		*buffer;
	spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	local_t				commit_overrun;
	local_t				overrun;
	local_t				entries;
	local_t				committing;
	local_t				commits;
	unsigned long			read;
	u64				write_stamp;
	u64				read_stamp;
};

struct ring_buffer {
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

#ifdef CONFIG_HOTPLUG_CPU
	struct notifier_block		cpu_notify;
#endif
	u64				(*clock)(void);
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next	bit 1	  bit 0
 *				-------	  -------
 * Normal page			0	  0
 * Points to head page		0	  1
 * New head page		1	  0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

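/*
 * Illustrative sketch (not part of the original file): how the state
 * flags ride in the low bits of the ->next pointer that leads to the
 * head page.  The pointer itself is recovered with rb_list_head(),
 * the flag with RB_FLAG_MASK:
 *
 *	unsigned long val = (unsigned long)head->list.prev->next;
 *	struct list_head *next = rb_list_head((struct list_head *)val);
 *	unsigned long flag = val & RB_FLAG_MASK;  // RB_PAGE_HEAD, etc.
 *
 * This is the same decoding that rb_is_head_page() below performs.
 */
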
/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 *
 * Returns 1 if moved tail page, 0 if someone else did.
 */
static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	struct buffer_page *old_tail;
	unsigned long old_entries;
	unsigned long old_write;
	int ret = 0;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == cpu_buffer->tail_page) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		old_tail = cmpxchg(&cpu_buffer->tail_page,
				   tail_page, next_page);

		if (old_tail == tail_page)
			ret = 1;
	}

	return ret;
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct buffer_page *bpage, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	WARN_ON(!nr_pages);

	for (i = 0; i < nr_pages; i++) {
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		bpage->page = (void *)addr;
		rb_init_page(bpage->page);
	}

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	bpage->page = (void *)addr;
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	rb_head_page_deactivate(cpu_buffer);

	if (head) {
		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu);
#endif

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	/* need at least two pages */
	if (buffer->pages < 2)
		buffer->pages = 2;

	/*
	 * In case of non-hotplug cpu, if the ring-buffer is allocated
	 * in early initcall, it will not be notified of secondary cpus.
	 * In that case, we need to allocate for all possible cpus.
	 */
#ifdef CONFIG_HOTPLUG_CPU
	get_online_cpus();
	cpumask_copy(buffer->cpumask, cpu_online_mask);
#else
	cpumask_copy(buffer->cpumask, cpu_possible_mask);
#endif
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

#ifdef CONFIG_HOTPLUG_CPU
	buffer->cpu_notify.notifier_call = rb_cpu_notify;
	buffer->cpu_notify.priority = 0;
	register_cpu_notifier(&buffer->cpu_notify);
#endif

	put_online_cpus();
	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);
	put_online_cpus();

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

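/*
 * Illustrative sketch (not part of the original file): the page count
 * chosen above for a requested size.  Assuming 4096 byte pages and a
 * 16 byte buffer_data_page header (u64 time_stamp plus a 64-bit
 * local_t commit; the exact header size is arch dependent),
 * BUF_PAGE_SIZE is 4080, so a request for 65536 bytes per cpu gives:
 *
 *	buffer->pages = DIV_ROUND_UP(65536, 4080);	// == 17
 *
 * and anything smaller than two pages is rounded up to 2.
 */
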
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	get_online_cpus();

#ifdef CONFIG_HOTPLUG_CPU
	unregister_cpu_notifier(&buffer->cpu_notify);
#endif

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	put_online_cpus();

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	spin_lock_irq(&cpu_buffer->reader_lock);
	rb_head_page_deactivate(cpu_buffer);

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
			goto out;
		p = cpu_buffer->pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
		goto out;

	rb_reset_cpu(cpu_buffer);
	rb_check_pages(cpu_buffer);

 out:
	spin_unlock_irq(&cpu_buffer->reader_lock);
}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	spin_lock_irq(&cpu_buffer->reader_lock);
	rb_head_page_deactivate(cpu_buffer);

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
			goto out;
		p = pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		list_add_tail(&bpage->list, cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);
	rb_check_pages(cpu_buffer);

 out:
	spin_unlock_irq(&cpu_buffer->reader_lock);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *bpage, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	atomic_inc(&buffer->record_disabled);

	/* Make sure all writers are done with this buffer. */
	synchronize_sched();

	mutex_lock(&buffer->mutex);
	get_online_cpus();

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
			goto out_fail;

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
		goto out_fail;

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						   cache_line_size()),
					     GFP_KERNEL, cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			bpage->page = (void *)addr;
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages)))
		goto out_fail;

 out:
	buffer->pages = nr_pages;
	put_online_cpus();
	mutex_unlock(&buffer->mutex);

	atomic_dec(&buffer->record_disabled);

	return size;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -ENOMEM;

	/*
	 * Something went totally wrong, and we are too paranoid
	 * to even clean up the mess.
	 */
 out_fail:
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static inline int
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
		   struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	max_count = cpu_buffer->buffer->pages * 100;

	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/* Slow path, do not inline */
static noinline struct ring_buffer_event *
rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
{
	event->type_len = RINGBUF_TYPE_TIME_EXTEND;

	/* Not the first event on the page? */
	if (rb_event_index(event)) {
		event->time_delta = delta & TS_MASK;
		event->array[0] = delta >> TS_SHIFT;
	} else {
		/* nope, just zero it */
		event->time_delta = 0;
		event->array[0] = 0;
	}

	return skip_time_extend(event);
}

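/*
 * Illustrative sketch (not part of the original file): how a delta that
 * does not fit the 27 bit time_delta field is split by the code above.
 * For a made-up delta of 0x12345678:
 *
 *	event->time_delta = 0x12345678 & TS_MASK;	// low 27 bits
 *	event->array[0]   = 0x12345678 >> TS_SHIFT;	// remaining high bits
 *
 * The reader reassembles the full delta as
 * (array[0] << TS_SHIFT) + time_delta.
 */
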
/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static void
rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
		struct ring_buffer_event *event, unsigned length,
		int add_timestamp, u64 delta)
{
	/* Only a commit updates the timestamp */
	if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
		delta = 0;

	/*
	 * If we need to add a timestamp, then we
	 * add it to the start of the reserved space.
	 */
	if (unlikely(add_timestamp)) {
		event = rb_add_time_stamp(event, delta);
		length -= RB_LEN_TIME_EXTEND;
		delta = 0;
	}

	event->time_delta = delta;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
		event->type_len = 0;
		event->array[0] = length;
	} else
		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (cpu_buffer->tail_page != tail_page &&
		    cpu_buffer->tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusion */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ARCH_ALIGNMENT);

	return length;
}

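/*
 * Illustrative sketch (not part of the original file): the sizing above
 * on an arch where RB_FORCE_8BYTE_ALIGNMENT is 0, so RB_ARCH_ALIGNMENT
 * is 4.  A caller asking to reserve 6 bytes of payload gets:
 *
 *	rb_calculate_event_length(6)
 *		== ALIGN(6 + RB_EVNT_HDR_SIZE, 4)
 *		== ALIGN(10, 4) == 12 bytes on the page,
 *
 * and rb_update_event() later stores type_len = DIV_ROUND_UP(8, 4) = 2,
 * i.e. the payload is reported as 8 bytes (rounded up to alignment).
 */
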
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      struct buffer_page *tail_page,
	      unsigned long tail, unsigned long length)
{
	struct ring_buffer_event *event;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == BUF_PAGE_SIZE)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add the lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}

/*
 * This is the slow path, force gcc not to inline it.
 */
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long length, unsigned long tail,
	     struct buffer_page *tail_page, u64 ts)
{
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(cpu_buffer, &next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the header page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE))
				goto out_reset;

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page. We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
			 */
			if (unlikely((cpu_buffer->commit_page !=
				      cpu_buffer->tail_page) &&
				     (cpu_buffer->commit_page ==
				      cpu_buffer->reader_page))) {
				local_inc(&cpu_buffer->commit_overrun);
				goto out_reset;
			}
		}
	}

	ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
	if (ret) {
		/*
		 * Nested commits always have zero deltas, so
		 * just reread the time stamp
		 */
		ts = rb_time_stamp(buffer);
		next_page->page->time_stamp = ts;
	}

 out_again:

	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	/* fail and let the caller try again */
	return ERR_PTR(-EAGAIN);

 out_reset:
	/* reset write */
	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	return NULL;
}

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned long length, u64 ts,
		  u64 delta, int add_timestamp)
{
	struct buffer_page *tail_page;
	struct ring_buffer_event *event;
	unsigned long tail, write;

	/*
	 * If the time delta since the last event is too big to
	 * hold in the time field of the event, then we append a
	 * TIME EXTEND event ahead of the data event.
	 */
	if (unlikely(add_timestamp))
		length += RB_LEN_TIME_EXTEND;

	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (unlikely(write > BUF_PAGE_SIZE))
		return rb_move_tail(cpu_buffer, length, tail,
				    tail_page, ts);

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);
	rb_update_event(cpu_buffer, event, length, add_timestamp, delta);

	local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (!tail)
		tail_page->page->time_stamp = ts;

	return event;
}

static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long index;
	unsigned long addr;

	new_index = rb_event_index(event);
	old_index = new_index + rb_event_ts_length(event);
	addr = (unsigned long)event;
	addr &= PAGE_MASK;

	bpage = cpu_buffer->tail_page;

	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		unsigned long write_mask =
			local_read(&bpage->write) & ~RB_WRITE_MASK;
		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page. That is fine
		 * because we just shorten what is on this page.
2060 */ 2061 old_index += write_mask; 2062 new_index += write_mask; 2063 index = local_cmpxchg(&bpage->write, old_index, new_index); 2064 if (index == old_index) 2065 return 1; 2066 } 2067 2068 /* could not discard */ 2069 return 0; 2070 } 2071 2072 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2073 { 2074 local_inc(&cpu_buffer->committing); 2075 local_inc(&cpu_buffer->commits); 2076 } 2077 2078 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2079 { 2080 unsigned long commits; 2081 2082 if (RB_WARN_ON(cpu_buffer, 2083 !local_read(&cpu_buffer->committing))) 2084 return; 2085 2086 again: 2087 commits = local_read(&cpu_buffer->commits); 2088 /* synchronize with interrupts */ 2089 barrier(); 2090 if (local_read(&cpu_buffer->committing) == 1) 2091 rb_set_commit_to_write(cpu_buffer); 2092 2093 local_dec(&cpu_buffer->committing); 2094 2095 /* synchronize with interrupts */ 2096 barrier(); 2097 2098 /* 2099 * Need to account for interrupts coming in between the 2100 * updating of the commit page and the clearing of the 2101 * committing counter. 2102 */ 2103 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2104 !local_read(&cpu_buffer->committing)) { 2105 local_inc(&cpu_buffer->committing); 2106 goto again; 2107 } 2108 } 2109 2110 static struct ring_buffer_event * 2111 rb_reserve_next_event(struct ring_buffer *buffer, 2112 struct ring_buffer_per_cpu *cpu_buffer, 2113 unsigned long length) 2114 { 2115 struct ring_buffer_event *event; 2116 u64 ts, delta; 2117 int nr_loops = 0; 2118 int add_timestamp; 2119 u64 diff; 2120 2121 rb_start_commit(cpu_buffer); 2122 2123 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 2124 /* 2125 * Due to the ability to swap a cpu buffer from a buffer 2126 * it is possible it was swapped before we committed. 2127 * (committing stops a swap). We check for it here and 2128 * if it happened, we have to fail the write. 2129 */ 2130 barrier(); 2131 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) { 2132 local_dec(&cpu_buffer->committing); 2133 local_dec(&cpu_buffer->commits); 2134 return NULL; 2135 } 2136 #endif 2137 2138 length = rb_calculate_event_length(length); 2139 again: 2140 add_timestamp = 0; 2141 delta = 0; 2142 2143 /* 2144 * We allow for interrupts to reenter here and do a trace. 2145 * If one does, it will cause this original code to loop 2146 * back here. Even with heavy interrupts happening, this 2147 * should only happen a few times in a row. If this happens 2148 * 1000 times in a row, there must be either an interrupt 2149 * storm or we have something buggy. 2150 * Bail! 2151 */ 2152 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2153 goto out_fail; 2154 2155 ts = rb_time_stamp(cpu_buffer->buffer); 2156 diff = ts - cpu_buffer->write_stamp; 2157 2158 /* make sure this diff is calculated here */ 2159 barrier(); 2160 2161 /* Did the write stamp get updated already? */ 2162 if (likely(ts >= cpu_buffer->write_stamp)) { 2163 delta = diff; 2164 if (unlikely(test_time_stamp(delta))) { 2165 WARN_ONCE(delta > (1ULL << 59), 2166 KERN_WARNING "Delta way too big! 
%llu ts=%llu write stamp = %llu\n", 2167 (unsigned long long)delta, 2168 (unsigned long long)ts, 2169 (unsigned long long)cpu_buffer->write_stamp); 2170 add_timestamp = 1; 2171 } 2172 } 2173 2174 event = __rb_reserve_next(cpu_buffer, length, ts, 2175 delta, add_timestamp); 2176 if (unlikely(PTR_ERR(event) == -EAGAIN)) 2177 goto again; 2178 2179 if (!event) 2180 goto out_fail; 2181 2182 return event; 2183 2184 out_fail: 2185 rb_end_commit(cpu_buffer); 2186 return NULL; 2187 } 2188 2189 #ifdef CONFIG_TRACING 2190 2191 #define TRACE_RECURSIVE_DEPTH 16 2192 2193 /* Keep this code out of the fast path cache */ 2194 static noinline void trace_recursive_fail(void) 2195 { 2196 /* Disable all tracing before we do anything else */ 2197 tracing_off_permanent(); 2198 2199 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 2200 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 2201 current->trace_recursion, 2202 hardirq_count() >> HARDIRQ_SHIFT, 2203 softirq_count() >> SOFTIRQ_SHIFT, 2204 in_nmi()); 2205 2206 WARN_ON_ONCE(1); 2207 } 2208 2209 static inline int trace_recursive_lock(void) 2210 { 2211 current->trace_recursion++; 2212 2213 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH)) 2214 return 0; 2215 2216 trace_recursive_fail(); 2217 2218 return -1; 2219 } 2220 2221 static inline void trace_recursive_unlock(void) 2222 { 2223 WARN_ON_ONCE(!current->trace_recursion); 2224 2225 current->trace_recursion--; 2226 } 2227 2228 #else 2229 2230 #define trace_recursive_lock() (0) 2231 #define trace_recursive_unlock() do { } while (0) 2232 2233 #endif 2234 2235 /** 2236 * ring_buffer_lock_reserve - reserve a part of the buffer 2237 * @buffer: the ring buffer to reserve from 2238 * @length: the length of the data to reserve (excluding event header) 2239 * 2240 * Returns a reserved event on the ring buffer to copy directly to. 2241 * The user of this interface will need to get the body to write into 2242 * and can use the ring_buffer_event_data() interface. 2243 * 2244 * The length is the length of the data needed, not the event length 2245 * which also includes the event header. 2246 * 2247 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2248 * If NULL is returned, then nothing has been allocated or locked.
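 *
 * As a rough usage sketch (the payload type "struct my_entry" and its
 * field are made up here for illustration, not part of this API):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer, event);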
2249 */ 2250 struct ring_buffer_event * 2251 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2252 { 2253 struct ring_buffer_per_cpu *cpu_buffer; 2254 struct ring_buffer_event *event; 2255 int cpu; 2256 2257 if (ring_buffer_flags != RB_BUFFERS_ON) 2258 return NULL; 2259 2260 /* If we are tracing schedule, we don't want to recurse */ 2261 preempt_disable_notrace(); 2262 2263 if (atomic_read(&buffer->record_disabled)) 2264 goto out_nocheck; 2265 2266 if (trace_recursive_lock()) 2267 goto out_nocheck; 2268 2269 cpu = raw_smp_processor_id(); 2270 2271 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2272 goto out; 2273 2274 cpu_buffer = buffer->buffers[cpu]; 2275 2276 if (atomic_read(&cpu_buffer->record_disabled)) 2277 goto out; 2278 2279 if (length > BUF_MAX_DATA_SIZE) 2280 goto out; 2281 2282 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2283 if (!event) 2284 goto out; 2285 2286 return event; 2287 2288 out: 2289 trace_recursive_unlock(); 2290 2291 out_nocheck: 2292 preempt_enable_notrace(); 2293 return NULL; 2294 } 2295 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2296 2297 static void 2298 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2299 struct ring_buffer_event *event) 2300 { 2301 u64 delta; 2302 2303 /* 2304 * The event first in the commit queue updates the 2305 * time stamp. 2306 */ 2307 if (rb_event_is_commit(cpu_buffer, event)) { 2308 /* 2309 * A commit event that is first on a page 2310 * updates the write timestamp with the page stamp 2311 */ 2312 if (!rb_event_index(event)) 2313 cpu_buffer->write_stamp = 2314 cpu_buffer->commit_page->page->time_stamp; 2315 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { 2316 delta = event->array[0]; 2317 delta <<= TS_SHIFT; 2318 delta += event->time_delta; 2319 cpu_buffer->write_stamp += delta; 2320 } else 2321 cpu_buffer->write_stamp += event->time_delta; 2322 } 2323 } 2324 2325 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2326 struct ring_buffer_event *event) 2327 { 2328 local_inc(&cpu_buffer->entries); 2329 rb_update_write_stamp(cpu_buffer, event); 2330 rb_end_commit(cpu_buffer); 2331 } 2332 2333 /** 2334 * ring_buffer_unlock_commit - commit a reserved 2335 * @buffer: The buffer to commit to 2336 * @event: The event pointer to commit. 2337 * 2338 * This commits the data to the ring buffer, and releases any locks held. 2339 * 2340 * Must be paired with ring_buffer_lock_reserve. 2341 */ 2342 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2343 struct ring_buffer_event *event) 2344 { 2345 struct ring_buffer_per_cpu *cpu_buffer; 2346 int cpu = raw_smp_processor_id(); 2347 2348 cpu_buffer = buffer->buffers[cpu]; 2349 2350 rb_commit(cpu_buffer, event); 2351 2352 trace_recursive_unlock(); 2353 2354 preempt_enable_notrace(); 2355 2356 return 0; 2357 } 2358 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2359 2360 static inline void rb_event_discard(struct ring_buffer_event *event) 2361 { 2362 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 2363 event = skip_time_extend(event); 2364 2365 /* array[0] holds the actual length for the discarded event */ 2366 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2367 event->type_len = RINGBUF_TYPE_PADDING; 2368 /* time delta must be non zero */ 2369 if (!event->time_delta) 2370 event->time_delta = 1; 2371 } 2372 2373 /* 2374 * Decrement the entries to the page that an event is on. 2375 * The event does not even need to exist, only the pointer 2376 * to the page it is on. 
This may only be called before the commit 2377 * takes place. 2378 */ 2379 static inline void 2380 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 2381 struct ring_buffer_event *event) 2382 { 2383 unsigned long addr = (unsigned long)event; 2384 struct buffer_page *bpage = cpu_buffer->commit_page; 2385 struct buffer_page *start; 2386 2387 addr &= PAGE_MASK; 2388 2389 /* Do the likely case first */ 2390 if (likely(bpage->page == (void *)addr)) { 2391 local_dec(&bpage->entries); 2392 return; 2393 } 2394 2395 /* 2396 * Because the commit page may be on the reader page we 2397 * start with the next page and check the end loop there. 2398 */ 2399 rb_inc_page(cpu_buffer, &bpage); 2400 start = bpage; 2401 do { 2402 if (bpage->page == (void *)addr) { 2403 local_dec(&bpage->entries); 2404 return; 2405 } 2406 rb_inc_page(cpu_buffer, &bpage); 2407 } while (bpage != start); 2408 2409 /* commit not part of this buffer?? */ 2410 RB_WARN_ON(cpu_buffer, 1); 2411 } 2412 2413 /** 2414 * ring_buffer_discard_commit - discard an event that has not been committed 2415 * @buffer: the ring buffer 2416 * @event: non-committed event to discard 2417 * 2418 * Sometimes an event that is in the ring buffer needs to be ignored. 2419 * This function lets the user discard an event in the ring buffer 2420 * and then that event will not be read later. 2421 * 2422 * This function only works if it is called before the item has been 2423 * committed. It will try to free the event from the ring buffer 2424 * if another event has not been added behind it. 2425 * 2426 * If another event has been added behind it, it will set the event 2427 * up as discarded, and perform the commit. 2428 * 2429 * If this function is called, do not call ring_buffer_unlock_commit on 2430 * the event. 2431 */ 2432 void ring_buffer_discard_commit(struct ring_buffer *buffer, 2433 struct ring_buffer_event *event) 2434 { 2435 struct ring_buffer_per_cpu *cpu_buffer; 2436 int cpu; 2437 2438 /* The event is discarded regardless */ 2439 rb_event_discard(event); 2440 2441 cpu = smp_processor_id(); 2442 cpu_buffer = buffer->buffers[cpu]; 2443 2444 /* 2445 * This must only be called if the event has not been 2446 * committed yet. Thus we can assume that preemption 2447 * is still disabled. 2448 */ 2449 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 2450 2451 rb_decrement_entry(cpu_buffer, event); 2452 if (rb_try_to_discard(cpu_buffer, event)) 2453 goto out; 2454 2455 /* 2456 * The commit is still visible to the reader, so we 2457 * must still update the timestamp. 2458 */ 2459 rb_update_write_stamp(cpu_buffer, event); 2460 out: 2461 rb_end_commit(cpu_buffer); 2462 2463 trace_recursive_unlock(); 2464 2465 preempt_enable_notrace(); 2466 2467 } 2468 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 2469 2470 /** 2471 * ring_buffer_write - write data to the buffer without reserving 2472 * @buffer: The ring buffer to write to. 2473 * @length: The length of the data being written (excluding the event header) 2474 * @data: The data to write to the buffer. 2475 * 2476 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 2477 * one function. If you already have the data to write to the buffer, it 2478 * may be easier to simply call this function. 2479 * 2480 * Note, like ring_buffer_lock_reserve, the length is the length of the data 2481 * and not the length of the event which would hold the header.
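 *
 * A minimal sketch of a call site (the "struct my_entry" payload is a
 * hypothetical example, not something defined by this file):
 *
 *	struct my_entry entry = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		return -EBUSY;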
2482 */ 2483 int ring_buffer_write(struct ring_buffer *buffer, 2484 unsigned long length, 2485 void *data) 2486 { 2487 struct ring_buffer_per_cpu *cpu_buffer; 2488 struct ring_buffer_event *event; 2489 void *body; 2490 int ret = -EBUSY; 2491 int cpu; 2492 2493 if (ring_buffer_flags != RB_BUFFERS_ON) 2494 return -EBUSY; 2495 2496 preempt_disable_notrace(); 2497 2498 if (atomic_read(&buffer->record_disabled)) 2499 goto out; 2500 2501 cpu = raw_smp_processor_id(); 2502 2503 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2504 goto out; 2505 2506 cpu_buffer = buffer->buffers[cpu]; 2507 2508 if (atomic_read(&cpu_buffer->record_disabled)) 2509 goto out; 2510 2511 if (length > BUF_MAX_DATA_SIZE) 2512 goto out; 2513 2514 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2515 if (!event) 2516 goto out; 2517 2518 body = rb_event_data(event); 2519 2520 memcpy(body, data, length); 2521 2522 rb_commit(cpu_buffer, event); 2523 2524 ret = 0; 2525 out: 2526 preempt_enable_notrace(); 2527 2528 return ret; 2529 } 2530 EXPORT_SYMBOL_GPL(ring_buffer_write); 2531 2532 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2533 { 2534 struct buffer_page *reader = cpu_buffer->reader_page; 2535 struct buffer_page *head = rb_set_head_page(cpu_buffer); 2536 struct buffer_page *commit = cpu_buffer->commit_page; 2537 2538 /* In case of error, head will be NULL */ 2539 if (unlikely(!head)) 2540 return 1; 2541 2542 return reader->read == rb_page_commit(reader) && 2543 (commit == reader || 2544 (commit == head && 2545 head->read == rb_page_commit(commit))); 2546 } 2547 2548 /** 2549 * ring_buffer_record_disable - stop all writes into the buffer 2550 * @buffer: The ring buffer to stop writes to. 2551 * 2552 * This prevents all writes to the buffer. Any attempt to write 2553 * to the buffer after this will fail and return NULL. 2554 * 2555 * The caller should call synchronize_sched() after this. 2556 */ 2557 void ring_buffer_record_disable(struct ring_buffer *buffer) 2558 { 2559 atomic_inc(&buffer->record_disabled); 2560 } 2561 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 2562 2563 /** 2564 * ring_buffer_record_enable - enable writes to the buffer 2565 * @buffer: The ring buffer to enable writes 2566 * 2567 * Note, multiple disables will need the same number of enables 2568 * to truly enable the writing (much like preempt_disable). 2569 */ 2570 void ring_buffer_record_enable(struct ring_buffer *buffer) 2571 { 2572 atomic_dec(&buffer->record_disabled); 2573 } 2574 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 2575 2576 /** 2577 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 2578 * @buffer: The ring buffer to stop writes to. 2579 * @cpu: The CPU buffer to stop 2580 * 2581 * This prevents all writes to the buffer. Any attempt to write 2582 * to the buffer after this will fail and return NULL. 2583 * 2584 * The caller should call synchronize_sched() after this. 2585 */ 2586 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 2587 { 2588 struct ring_buffer_per_cpu *cpu_buffer; 2589 2590 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2591 return; 2592 2593 cpu_buffer = buffer->buffers[cpu]; 2594 atomic_inc(&cpu_buffer->record_disabled); 2595 } 2596 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 2597 2598 /** 2599 * ring_buffer_record_enable_cpu - enable writes to the buffer 2600 * @buffer: The ring buffer to enable writes 2601 * @cpu: The CPU to enable. 
2602 * 2603 * Note, multiple disables will need the same number of enables 2604 * to truly enable the writing (much like preempt_disable). 2605 */ 2606 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2607 { 2608 struct ring_buffer_per_cpu *cpu_buffer; 2609 2610 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2611 return; 2612 2613 cpu_buffer = buffer->buffers[cpu]; 2614 atomic_dec(&cpu_buffer->record_disabled); 2615 } 2616 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2617 2618 /* 2619 * The total entries in the ring buffer is the running counter 2620 * of entries entered into the ring buffer, minus the sum of 2621 * the entries read from the ring buffer and the number of 2622 * entries that were overwritten. 2623 */ 2624 static inline unsigned long 2625 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 2626 { 2627 return local_read(&cpu_buffer->entries) - 2628 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 2629 } 2630 2631 /** 2632 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2633 * @buffer: The ring buffer 2634 * @cpu: The per CPU buffer to get the entries from. 2635 */ 2636 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2637 { 2638 struct ring_buffer_per_cpu *cpu_buffer; 2639 2640 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2641 return 0; 2642 2643 cpu_buffer = buffer->buffers[cpu]; 2644 2645 return rb_num_of_entries(cpu_buffer); 2646 } 2647 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2648 2649 /** 2650 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 2651 * @buffer: The ring buffer 2652 * @cpu: The per CPU buffer to get the number of overruns from 2653 */ 2654 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 2655 { 2656 struct ring_buffer_per_cpu *cpu_buffer; 2657 unsigned long ret; 2658 2659 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2660 return 0; 2661 2662 cpu_buffer = buffer->buffers[cpu]; 2663 ret = local_read(&cpu_buffer->overrun); 2664 2665 return ret; 2666 } 2667 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2668 2669 /** 2670 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2671 * @buffer: The ring buffer 2672 * @cpu: The per CPU buffer to get the number of overruns from 2673 */ 2674 unsigned long 2675 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2676 { 2677 struct ring_buffer_per_cpu *cpu_buffer; 2678 unsigned long ret; 2679 2680 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2681 return 0; 2682 2683 cpu_buffer = buffer->buffers[cpu]; 2684 ret = local_read(&cpu_buffer->commit_overrun); 2685 2686 return ret; 2687 } 2688 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2689 2690 /** 2691 * ring_buffer_entries - get the number of entries in a buffer 2692 * @buffer: The ring buffer 2693 * 2694 * Returns the total number of entries in the ring buffer 2695 * (all CPU entries) 2696 */ 2697 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2698 { 2699 struct ring_buffer_per_cpu *cpu_buffer; 2700 unsigned long entries = 0; 2701 int cpu; 2702 2703 /* if you care about this being correct, lock the buffer */ 2704 for_each_buffer_cpu(buffer, cpu) { 2705 cpu_buffer = buffer->buffers[cpu]; 2706 entries += rb_num_of_entries(cpu_buffer); 2707 } 2708 2709 return entries; 2710 } 2711 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2712 2713 /** 2714 * ring_buffer_overruns - get the number of overruns in buffer 2715 * @buffer: The ring buffer 2716 * 2717 * Returns the total number of overruns in 
the ring buffer 2718 * (all CPU entries) 2719 */ 2720 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 2721 { 2722 struct ring_buffer_per_cpu *cpu_buffer; 2723 unsigned long overruns = 0; 2724 int cpu; 2725 2726 /* if you care about this being correct, lock the buffer */ 2727 for_each_buffer_cpu(buffer, cpu) { 2728 cpu_buffer = buffer->buffers[cpu]; 2729 overruns += local_read(&cpu_buffer->overrun); 2730 } 2731 2732 return overruns; 2733 } 2734 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 2735 2736 static void rb_iter_reset(struct ring_buffer_iter *iter) 2737 { 2738 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2739 2740 /* Iterator usage is expected to have record disabled */ 2741 if (list_empty(&cpu_buffer->reader_page->list)) { 2742 iter->head_page = rb_set_head_page(cpu_buffer); 2743 if (unlikely(!iter->head_page)) 2744 return; 2745 iter->head = iter->head_page->read; 2746 } else { 2747 iter->head_page = cpu_buffer->reader_page; 2748 iter->head = cpu_buffer->reader_page->read; 2749 } 2750 if (iter->head) 2751 iter->read_stamp = cpu_buffer->read_stamp; 2752 else 2753 iter->read_stamp = iter->head_page->page->time_stamp; 2754 iter->cache_reader_page = cpu_buffer->reader_page; 2755 iter->cache_read = cpu_buffer->read; 2756 } 2757 2758 /** 2759 * ring_buffer_iter_reset - reset an iterator 2760 * @iter: The iterator to reset 2761 * 2762 * Resets the iterator, so that it will start from the beginning 2763 * again. 2764 */ 2765 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 2766 { 2767 struct ring_buffer_per_cpu *cpu_buffer; 2768 unsigned long flags; 2769 2770 if (!iter) 2771 return; 2772 2773 cpu_buffer = iter->cpu_buffer; 2774 2775 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2776 rb_iter_reset(iter); 2777 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2778 } 2779 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 2780 2781 /** 2782 * ring_buffer_iter_empty - check if an iterator has no more to read 2783 * @iter: The iterator to check 2784 */ 2785 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 2786 { 2787 struct ring_buffer_per_cpu *cpu_buffer; 2788 2789 cpu_buffer = iter->cpu_buffer; 2790 2791 return iter->head_page == cpu_buffer->commit_page && 2792 iter->head == rb_commit_index(cpu_buffer); 2793 } 2794 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 2795 2796 static void 2797 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2798 struct ring_buffer_event *event) 2799 { 2800 u64 delta; 2801 2802 switch (event->type_len) { 2803 case RINGBUF_TYPE_PADDING: 2804 return; 2805 2806 case RINGBUF_TYPE_TIME_EXTEND: 2807 delta = event->array[0]; 2808 delta <<= TS_SHIFT; 2809 delta += event->time_delta; 2810 cpu_buffer->read_stamp += delta; 2811 return; 2812 2813 case RINGBUF_TYPE_TIME_STAMP: 2814 /* FIXME: not implemented */ 2815 return; 2816 2817 case RINGBUF_TYPE_DATA: 2818 cpu_buffer->read_stamp += event->time_delta; 2819 return; 2820 2821 default: 2822 BUG(); 2823 } 2824 return; 2825 } 2826 2827 static void 2828 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 2829 struct ring_buffer_event *event) 2830 { 2831 u64 delta; 2832 2833 switch (event->type_len) { 2834 case RINGBUF_TYPE_PADDING: 2835 return; 2836 2837 case RINGBUF_TYPE_TIME_EXTEND: 2838 delta = event->array[0]; 2839 delta <<= TS_SHIFT; 2840 delta += event->time_delta; 2841 iter->read_stamp += delta; 2842 return; 2843 2844 case RINGBUF_TYPE_TIME_STAMP: 2845 /* FIXME: not implemented */ 2846 return; 2847 2848 case RINGBUF_TYPE_DATA: 2849 iter->read_stamp += 
event->time_delta; 2850 return; 2851 2852 default: 2853 BUG(); 2854 } 2855 return; 2856 } 2857 2858 static struct buffer_page * 2859 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2860 { 2861 struct buffer_page *reader = NULL; 2862 unsigned long overwrite; 2863 unsigned long flags; 2864 int nr_loops = 0; 2865 int ret; 2866 2867 local_irq_save(flags); 2868 arch_spin_lock(&cpu_buffer->lock); 2869 2870 again: 2871 /* 2872 * This should normally only loop twice. But because the 2873 * start of the reader inserts an empty page, it causes 2874 * a case where we will loop three times. There should be no 2875 * reason to loop four times (that I know of). 2876 */ 2877 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 2878 reader = NULL; 2879 goto out; 2880 } 2881 2882 reader = cpu_buffer->reader_page; 2883 2884 /* If there's more to read, return this page */ 2885 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 2886 goto out; 2887 2888 /* Never should we have an index greater than the size */ 2889 if (RB_WARN_ON(cpu_buffer, 2890 cpu_buffer->reader_page->read > rb_page_size(reader))) 2891 goto out; 2892 2893 /* check if we caught up to the tail */ 2894 reader = NULL; 2895 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 2896 goto out; 2897 2898 /* 2899 * Reset the reader page to size zero. 2900 */ 2901 local_set(&cpu_buffer->reader_page->write, 0); 2902 local_set(&cpu_buffer->reader_page->entries, 0); 2903 local_set(&cpu_buffer->reader_page->page->commit, 0); 2904 cpu_buffer->reader_page->real_end = 0; 2905 2906 spin: 2907 /* 2908 * Splice the empty reader page into the list around the head. 2909 */ 2910 reader = rb_set_head_page(cpu_buffer); 2911 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 2912 cpu_buffer->reader_page->list.prev = reader->list.prev; 2913 2914 /* 2915 * cpu_buffer->pages just needs to point to the buffer, it 2916 * has no specific buffer page to point to. Lets move it out 2917 * of our way so we don't accidently swap it. 2918 */ 2919 cpu_buffer->pages = reader->list.prev; 2920 2921 /* The reader page will be pointing to the new head */ 2922 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 2923 2924 /* 2925 * We want to make sure we read the overruns after we set up our 2926 * pointers to the next object. The writer side does a 2927 * cmpxchg to cross pages which acts as the mb on the writer 2928 * side. Note, the reader will constantly fail the swap 2929 * while the writer is updating the pointers, so this 2930 * guarantees that the overwrite recorded here is the one we 2931 * want to compare with the last_overrun. 2932 */ 2933 smp_mb(); 2934 overwrite = local_read(&(cpu_buffer->overrun)); 2935 2936 /* 2937 * Here's the tricky part. 2938 * 2939 * We need to move the pointer past the header page. 2940 * But we can only do that if a writer is not currently 2941 * moving it. The page before the header page has the 2942 * flag bit '1' set if it is pointing to the page we want. 2943 * but if the writer is in the process of moving it 2944 * than it will be '2' or already moved '0'. 2945 */ 2946 2947 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 2948 2949 /* 2950 * If we did not convert it, then we must try again. 2951 */ 2952 if (!ret) 2953 goto spin; 2954 2955 /* 2956 * Yeah! We succeeded in replacing the page. 2957 * 2958 * Now make the new head point back to the reader page. 
2959 */ 2960 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 2961 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 2962 2963 /* Finally update the reader page to the new head */ 2964 cpu_buffer->reader_page = reader; 2965 rb_reset_reader_page(cpu_buffer); 2966 2967 if (overwrite != cpu_buffer->last_overrun) { 2968 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 2969 cpu_buffer->last_overrun = overwrite; 2970 } 2971 2972 goto again; 2973 2974 out: 2975 arch_spin_unlock(&cpu_buffer->lock); 2976 local_irq_restore(flags); 2977 2978 return reader; 2979 } 2980 2981 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 2982 { 2983 struct ring_buffer_event *event; 2984 struct buffer_page *reader; 2985 unsigned length; 2986 2987 reader = rb_get_reader_page(cpu_buffer); 2988 2989 /* This function should not be called when buffer is empty */ 2990 if (RB_WARN_ON(cpu_buffer, !reader)) 2991 return; 2992 2993 event = rb_reader_event(cpu_buffer); 2994 2995 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 2996 cpu_buffer->read++; 2997 2998 rb_update_read_stamp(cpu_buffer, event); 2999 3000 length = rb_event_length(event); 3001 cpu_buffer->reader_page->read += length; 3002 } 3003 3004 static void rb_advance_iter(struct ring_buffer_iter *iter) 3005 { 3006 struct ring_buffer_per_cpu *cpu_buffer; 3007 struct ring_buffer_event *event; 3008 unsigned length; 3009 3010 cpu_buffer = iter->cpu_buffer; 3011 3012 /* 3013 * Check if we are at the end of the buffer. 3014 */ 3015 if (iter->head >= rb_page_size(iter->head_page)) { 3016 /* discarded commits can make the page empty */ 3017 if (iter->head_page == cpu_buffer->commit_page) 3018 return; 3019 rb_inc_iter(iter); 3020 return; 3021 } 3022 3023 event = rb_iter_head_event(iter); 3024 3025 length = rb_event_length(event); 3026 3027 /* 3028 * This should not be called to advance the header if we are 3029 * at the tail of the buffer. 3030 */ 3031 if (RB_WARN_ON(cpu_buffer, 3032 (iter->head_page == cpu_buffer->commit_page) && 3033 (iter->head + length > rb_commit_index(cpu_buffer)))) 3034 return; 3035 3036 rb_update_iter_read_stamp(iter, event); 3037 3038 iter->head += length; 3039 3040 /* check for end of page padding */ 3041 if ((iter->head >= rb_page_size(iter->head_page)) && 3042 (iter->head_page != cpu_buffer->commit_page)) 3043 rb_advance_iter(iter); 3044 } 3045 3046 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 3047 { 3048 return cpu_buffer->lost_events; 3049 } 3050 3051 static struct ring_buffer_event * 3052 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 3053 unsigned long *lost_events) 3054 { 3055 struct ring_buffer_event *event; 3056 struct buffer_page *reader; 3057 int nr_loops = 0; 3058 3059 again: 3060 /* 3061 * We repeat when a time extend is encountered. 3062 * Since the time extend is always attached to a data event, 3063 * we should never loop more than once. 3064 * (We never hit the following condition more than twice). 
3065 */ 3066 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3067 return NULL; 3068 3069 reader = rb_get_reader_page(cpu_buffer); 3070 if (!reader) 3071 return NULL; 3072 3073 event = rb_reader_event(cpu_buffer); 3074 3075 switch (event->type_len) { 3076 case RINGBUF_TYPE_PADDING: 3077 if (rb_null_event(event)) 3078 RB_WARN_ON(cpu_buffer, 1); 3079 /* 3080 * Because the writer could be discarding every 3081 * event it creates (which would probably be bad) 3082 * if we were to go back to "again" then we may never 3083 * catch up, and will trigger the warn on, or lock 3084 * the box. Return the padding, and we will release 3085 * the current locks, and try again. 3086 */ 3087 return event; 3088 3089 case RINGBUF_TYPE_TIME_EXTEND: 3090 /* Internal data, OK to advance */ 3091 rb_advance_reader(cpu_buffer); 3092 goto again; 3093 3094 case RINGBUF_TYPE_TIME_STAMP: 3095 /* FIXME: not implemented */ 3096 rb_advance_reader(cpu_buffer); 3097 goto again; 3098 3099 case RINGBUF_TYPE_DATA: 3100 if (ts) { 3101 *ts = cpu_buffer->read_stamp + event->time_delta; 3102 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3103 cpu_buffer->cpu, ts); 3104 } 3105 if (lost_events) 3106 *lost_events = rb_lost_events(cpu_buffer); 3107 return event; 3108 3109 default: 3110 BUG(); 3111 } 3112 3113 return NULL; 3114 } 3115 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3116 3117 static struct ring_buffer_event * 3118 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3119 { 3120 struct ring_buffer *buffer; 3121 struct ring_buffer_per_cpu *cpu_buffer; 3122 struct ring_buffer_event *event; 3123 int nr_loops = 0; 3124 3125 cpu_buffer = iter->cpu_buffer; 3126 buffer = cpu_buffer->buffer; 3127 3128 /* 3129 * Check if someone performed a consuming read to 3130 * the buffer. A consuming read invalidates the iterator 3131 * and we need to reset the iterator in this case. 3132 */ 3133 if (unlikely(iter->cache_read != cpu_buffer->read || 3134 iter->cache_reader_page != cpu_buffer->reader_page)) 3135 rb_iter_reset(iter); 3136 3137 again: 3138 if (ring_buffer_iter_empty(iter)) 3139 return NULL; 3140 3141 /* 3142 * We repeat when a time extend is encountered. 3143 * Since the time extend is always attached to a data event, 3144 * we should never loop more than once. 3145 * (We never hit the following condition more than twice). 3146 */ 3147 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3148 return NULL; 3149 3150 if (rb_per_cpu_empty(cpu_buffer)) 3151 return NULL; 3152 3153 if (iter->head >= local_read(&iter->head_page->page->commit)) { 3154 rb_inc_iter(iter); 3155 goto again; 3156 } 3157 3158 event = rb_iter_head_event(iter); 3159 3160 switch (event->type_len) { 3161 case RINGBUF_TYPE_PADDING: 3162 if (rb_null_event(event)) { 3163 rb_inc_iter(iter); 3164 goto again; 3165 } 3166 rb_advance_iter(iter); 3167 return event; 3168 3169 case RINGBUF_TYPE_TIME_EXTEND: 3170 /* Internal data, OK to advance */ 3171 rb_advance_iter(iter); 3172 goto again; 3173 3174 case RINGBUF_TYPE_TIME_STAMP: 3175 /* FIXME: not implemented */ 3176 rb_advance_iter(iter); 3177 goto again; 3178 3179 case RINGBUF_TYPE_DATA: 3180 if (ts) { 3181 *ts = iter->read_stamp + event->time_delta; 3182 ring_buffer_normalize_time_stamp(buffer, 3183 cpu_buffer->cpu, ts); 3184 } 3185 return event; 3186 3187 default: 3188 BUG(); 3189 } 3190 3191 return NULL; 3192 } 3193 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3194 3195 static inline int rb_ok_to_lock(void) 3196 { 3197 /* 3198 * If an NMI die dumps out the content of the ring buffer 3199 * do not grab locks. 
We also permanently disable the ring 3200 * buffer too. A one-time deal is all you get from reading 3201 * the ring buffer from an NMI. 3202 */ 3203 if (likely(!in_nmi())) 3204 return 1; 3205 3206 tracing_off_permanent(); 3207 return 0; 3208 } 3209 3210 /** 3211 * ring_buffer_peek - peek at the next event to be read 3212 * @buffer: The ring buffer to read 3213 * @cpu: The cpu to peek at 3214 * @ts: The timestamp counter of this event. 3215 * @lost_events: a variable to store if events were lost (may be NULL) 3216 * 3217 * This will return the event that will be read next, but does 3218 * not consume the data. 3219 */ 3220 struct ring_buffer_event * 3221 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts, 3222 unsigned long *lost_events) 3223 { 3224 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3225 struct ring_buffer_event *event; 3226 unsigned long flags; 3227 int dolock; 3228 3229 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3230 return NULL; 3231 3232 dolock = rb_ok_to_lock(); 3233 again: 3234 local_irq_save(flags); 3235 if (dolock) 3236 spin_lock(&cpu_buffer->reader_lock); 3237 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3238 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3239 rb_advance_reader(cpu_buffer); 3240 if (dolock) 3241 spin_unlock(&cpu_buffer->reader_lock); 3242 local_irq_restore(flags); 3243 3244 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3245 goto again; 3246 3247 return event; 3248 } 3249 3250 /** 3251 * ring_buffer_iter_peek - peek at the next event to be read 3252 * @iter: The ring buffer iterator 3253 * @ts: The timestamp counter of this event. 3254 * 3255 * This will return the event that will be read next, but does 3256 * not increment the iterator. 3257 */ 3258 struct ring_buffer_event * 3259 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3260 { 3261 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3262 struct ring_buffer_event *event; 3263 unsigned long flags; 3264 3265 again: 3266 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3267 event = rb_iter_peek(iter, ts); 3268 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3269 3270 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3271 goto again; 3272 3273 return event; 3274 } 3275 3276 /** 3277 * ring_buffer_consume - return an event and consume it 3278 * @buffer: The ring buffer to get the next event from 3279 * @cpu: the cpu to read the buffer from 3280 * @ts: a variable to store the timestamp (may be NULL) 3281 * @lost_events: a variable to store if events were lost (may be NULL) 3282 * 3283 * Returns the next event in the ring buffer, and that event is consumed. 3284 * Meaning that sequential reads will keep returning a different event, 3285 * and eventually empty the ring buffer if the producer is slower.
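 *
 * For example, a consumer could drain one CPU buffer with a loop like
 * the following (handle_event() is a hypothetical callback, not part of
 * this file):
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, NULL)))
 *		handle_event(ring_buffer_event_data(event), ts);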
3286 */ 3287 struct ring_buffer_event * 3288 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts, 3289 unsigned long *lost_events) 3290 { 3291 struct ring_buffer_per_cpu *cpu_buffer; 3292 struct ring_buffer_event *event = NULL; 3293 unsigned long flags; 3294 int dolock; 3295 3296 dolock = rb_ok_to_lock(); 3297 3298 again: 3299 /* might be called in atomic */ 3300 preempt_disable(); 3301 3302 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3303 goto out; 3304 3305 cpu_buffer = buffer->buffers[cpu]; 3306 local_irq_save(flags); 3307 if (dolock) 3308 spin_lock(&cpu_buffer->reader_lock); 3309 3310 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3311 if (event) { 3312 cpu_buffer->lost_events = 0; 3313 rb_advance_reader(cpu_buffer); 3314 } 3315 3316 if (dolock) 3317 spin_unlock(&cpu_buffer->reader_lock); 3318 local_irq_restore(flags); 3319 3320 out: 3321 preempt_enable(); 3322 3323 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3324 goto again; 3325 3326 return event; 3327 } 3328 EXPORT_SYMBOL_GPL(ring_buffer_consume); 3329 3330 /** 3331 * ring_buffer_read_prepare - Prepare for a non-consuming read of the buffer 3332 * @buffer: The ring buffer to read from 3333 * @cpu: The cpu buffer to iterate over 3334 * 3335 * This performs the initial preparations necessary to iterate 3336 * through the buffer. Memory is allocated, buffer recording 3337 * is disabled, and the iterator pointer is returned to the caller. 3338 * 3339 * Disabling buffer recording prevents the reading from being 3340 * corrupted. This is not a consuming read, so a producer is not 3341 * expected. 3342 * 3343 * After a sequence of ring_buffer_read_prepare calls, the user is 3344 * expected to make at least one call to ring_buffer_read_prepare_sync. 3345 * Afterwards, ring_buffer_read_start is invoked to get things going 3346 * for real. 3347 * 3348 * Overall, this must be paired with ring_buffer_read_finish. 3349 */ 3350 struct ring_buffer_iter * 3351 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu) 3352 { 3353 struct ring_buffer_per_cpu *cpu_buffer; 3354 struct ring_buffer_iter *iter; 3355 3356 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3357 return NULL; 3358 3359 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 3360 if (!iter) 3361 return NULL; 3362 3363 cpu_buffer = buffer->buffers[cpu]; 3364 3365 iter->cpu_buffer = cpu_buffer; 3366 3367 atomic_inc(&cpu_buffer->record_disabled); 3368 3369 return iter; 3370 } 3371 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 3372 3373 /** 3374 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 3375 * 3376 * All previously invoked ring_buffer_read_prepare calls to prepare 3377 * iterators will be synchronized. Afterwards, ring_buffer_read_start 3378 * calls on those iterators are allowed. 3379 */ 3380 void 3381 ring_buffer_read_prepare_sync(void) 3382 { 3383 synchronize_sched(); 3384 } 3385 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 3386 3387 /** 3388 * ring_buffer_read_start - start a non-consuming read of the buffer 3389 * @iter: The iterator returned by ring_buffer_read_prepare 3390 * 3391 * This finalizes the startup of an iteration through the buffer. 3392 * The iterator comes from a call to ring_buffer_read_prepare and 3393 * an intervening ring_buffer_read_prepare_sync must have been 3394 * performed. 3395 * 3396 * Must be paired with ring_buffer_read_finish.
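 *
 * Putting the non-consuming read together, a reader would roughly do
 * the following (process() is a hypothetical consumer and error
 * handling is omitted; this is only a sketch):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(event);
 *	ring_buffer_read_finish(iter);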
3397 */ 3398 void 3399 ring_buffer_read_start(struct ring_buffer_iter *iter) 3400 { 3401 struct ring_buffer_per_cpu *cpu_buffer; 3402 unsigned long flags; 3403 3404 if (!iter) 3405 return; 3406 3407 cpu_buffer = iter->cpu_buffer; 3408 3409 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3410 arch_spin_lock(&cpu_buffer->lock); 3411 rb_iter_reset(iter); 3412 arch_spin_unlock(&cpu_buffer->lock); 3413 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3414 } 3415 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 3416 3417 /** 3418 * ring_buffer_read_finish - finish reading the iterator of the buffer 3419 * @iter: The iterator retrieved by ring_buffer_read_prepare 3420 * 3421 * This re-enables recording to the buffer, and frees the 3422 * iterator. 3423 */ 3424 void 3425 ring_buffer_read_finish(struct ring_buffer_iter *iter) 3426 { 3427 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3428 3429 atomic_dec(&cpu_buffer->record_disabled); 3430 kfree(iter); 3431 } 3432 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 3433 3434 /** 3435 * ring_buffer_read - read the next item in the ring buffer by the iterator 3436 * @iter: The ring buffer iterator 3437 * @ts: The time stamp of the event read. 3438 * 3439 * This reads the next event in the ring buffer and increments the iterator. 3440 */ 3441 struct ring_buffer_event * 3442 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 3443 { 3444 struct ring_buffer_event *event; 3445 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3446 unsigned long flags; 3447 3448 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3449 again: 3450 event = rb_iter_peek(iter, ts); 3451 if (!event) 3452 goto out; 3453 3454 if (event->type_len == RINGBUF_TYPE_PADDING) 3455 goto again; 3456 3457 rb_advance_iter(iter); 3458 out: 3459 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3460 3461 return event; 3462 } 3463 EXPORT_SYMBOL_GPL(ring_buffer_read); 3464 3465 /** 3466 * ring_buffer_size - return the size of the ring buffer (in bytes) 3467 * @buffer: The ring buffer.
3468 */ 3469 unsigned long ring_buffer_size(struct ring_buffer *buffer) 3470 { 3471 return BUF_PAGE_SIZE * buffer->pages; 3472 } 3473 EXPORT_SYMBOL_GPL(ring_buffer_size); 3474 3475 static void 3476 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 3477 { 3478 rb_head_page_deactivate(cpu_buffer); 3479 3480 cpu_buffer->head_page 3481 = list_entry(cpu_buffer->pages, struct buffer_page, list); 3482 local_set(&cpu_buffer->head_page->write, 0); 3483 local_set(&cpu_buffer->head_page->entries, 0); 3484 local_set(&cpu_buffer->head_page->page->commit, 0); 3485 3486 cpu_buffer->head_page->read = 0; 3487 3488 cpu_buffer->tail_page = cpu_buffer->head_page; 3489 cpu_buffer->commit_page = cpu_buffer->head_page; 3490 3491 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 3492 local_set(&cpu_buffer->reader_page->write, 0); 3493 local_set(&cpu_buffer->reader_page->entries, 0); 3494 local_set(&cpu_buffer->reader_page->page->commit, 0); 3495 cpu_buffer->reader_page->read = 0; 3496 3497 local_set(&cpu_buffer->commit_overrun, 0); 3498 local_set(&cpu_buffer->overrun, 0); 3499 local_set(&cpu_buffer->entries, 0); 3500 local_set(&cpu_buffer->committing, 0); 3501 local_set(&cpu_buffer->commits, 0); 3502 cpu_buffer->read = 0; 3503 3504 cpu_buffer->write_stamp = 0; 3505 cpu_buffer->read_stamp = 0; 3506 3507 cpu_buffer->lost_events = 0; 3508 cpu_buffer->last_overrun = 0; 3509 3510 rb_head_page_activate(cpu_buffer); 3511 } 3512 3513 /** 3514 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 3515 * @buffer: The ring buffer to reset a per cpu buffer of 3516 * @cpu: The CPU buffer to be reset 3517 */ 3518 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 3519 { 3520 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3521 unsigned long flags; 3522 3523 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3524 return; 3525 3526 atomic_inc(&cpu_buffer->record_disabled); 3527 3528 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3529 3530 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 3531 goto out; 3532 3533 arch_spin_lock(&cpu_buffer->lock); 3534 3535 rb_reset_cpu(cpu_buffer); 3536 3537 arch_spin_unlock(&cpu_buffer->lock); 3538 3539 out: 3540 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3541 3542 atomic_dec(&cpu_buffer->record_disabled); 3543 } 3544 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 3545 3546 /** 3547 * ring_buffer_reset - reset a ring buffer 3548 * @buffer: The ring buffer to reset all cpu buffers 3549 */ 3550 void ring_buffer_reset(struct ring_buffer *buffer) 3551 { 3552 int cpu; 3553 3554 for_each_buffer_cpu(buffer, cpu) 3555 ring_buffer_reset_cpu(buffer, cpu); 3556 } 3557 EXPORT_SYMBOL_GPL(ring_buffer_reset); 3558 3559 /** 3560 * ring_buffer_empty - is the ring buffer empty?
3561 * @buffer: The ring buffer to test 3562 */ 3563 int ring_buffer_empty(struct ring_buffer *buffer) 3564 { 3565 struct ring_buffer_per_cpu *cpu_buffer; 3566 unsigned long flags; 3567 int dolock; 3568 int cpu; 3569 int ret; 3570 3571 dolock = rb_ok_to_lock(); 3572 3573 /* yes this is racy, but if you don't like the race, lock the buffer */ 3574 for_each_buffer_cpu(buffer, cpu) { 3575 cpu_buffer = buffer->buffers[cpu]; 3576 local_irq_save(flags); 3577 if (dolock) 3578 spin_lock(&cpu_buffer->reader_lock); 3579 ret = rb_per_cpu_empty(cpu_buffer); 3580 if (dolock) 3581 spin_unlock(&cpu_buffer->reader_lock); 3582 local_irq_restore(flags); 3583 3584 if (!ret) 3585 return 0; 3586 } 3587 3588 return 1; 3589 } 3590 EXPORT_SYMBOL_GPL(ring_buffer_empty); 3591 3592 /** 3593 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 3594 * @buffer: The ring buffer 3595 * @cpu: The CPU buffer to test 3596 */ 3597 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 3598 { 3599 struct ring_buffer_per_cpu *cpu_buffer; 3600 unsigned long flags; 3601 int dolock; 3602 int ret; 3603 3604 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3605 return 1; 3606 3607 dolock = rb_ok_to_lock(); 3608 3609 cpu_buffer = buffer->buffers[cpu]; 3610 local_irq_save(flags); 3611 if (dolock) 3612 spin_lock(&cpu_buffer->reader_lock); 3613 ret = rb_per_cpu_empty(cpu_buffer); 3614 if (dolock) 3615 spin_unlock(&cpu_buffer->reader_lock); 3616 local_irq_restore(flags); 3617 3618 return ret; 3619 } 3620 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3621 3622 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3623 /** 3624 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3625 * @buffer_a: One buffer to swap with 3626 * @buffer_b: The other buffer to swap with 3627 * 3628 * This function is useful for tracers that want to take a "snapshot" 3629 * of a CPU buffer and has another back up buffer lying around. 3630 * it is expected that the tracer handles the cpu buffer not being 3631 * used at the moment. 3632 */ 3633 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 3634 struct ring_buffer *buffer_b, int cpu) 3635 { 3636 struct ring_buffer_per_cpu *cpu_buffer_a; 3637 struct ring_buffer_per_cpu *cpu_buffer_b; 3638 int ret = -EINVAL; 3639 3640 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 3641 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 3642 goto out; 3643 3644 /* At least make sure the two buffers are somewhat the same */ 3645 if (buffer_a->pages != buffer_b->pages) 3646 goto out; 3647 3648 ret = -EAGAIN; 3649 3650 if (ring_buffer_flags != RB_BUFFERS_ON) 3651 goto out; 3652 3653 if (atomic_read(&buffer_a->record_disabled)) 3654 goto out; 3655 3656 if (atomic_read(&buffer_b->record_disabled)) 3657 goto out; 3658 3659 cpu_buffer_a = buffer_a->buffers[cpu]; 3660 cpu_buffer_b = buffer_b->buffers[cpu]; 3661 3662 if (atomic_read(&cpu_buffer_a->record_disabled)) 3663 goto out; 3664 3665 if (atomic_read(&cpu_buffer_b->record_disabled)) 3666 goto out; 3667 3668 /* 3669 * We can't do a synchronize_sched here because this 3670 * function can be called in atomic context. 3671 * Normally this will be called from the same CPU as cpu. 3672 * If not it's up to the caller to protect this. 
3673 */ 3674 atomic_inc(&cpu_buffer_a->record_disabled); 3675 atomic_inc(&cpu_buffer_b->record_disabled); 3676 3677 ret = -EBUSY; 3678 if (local_read(&cpu_buffer_a->committing)) 3679 goto out_dec; 3680 if (local_read(&cpu_buffer_b->committing)) 3681 goto out_dec; 3682 3683 buffer_a->buffers[cpu] = cpu_buffer_b; 3684 buffer_b->buffers[cpu] = cpu_buffer_a; 3685 3686 cpu_buffer_b->buffer = buffer_a; 3687 cpu_buffer_a->buffer = buffer_b; 3688 3689 ret = 0; 3690 3691 out_dec: 3692 atomic_dec(&cpu_buffer_a->record_disabled); 3693 atomic_dec(&cpu_buffer_b->record_disabled); 3694 out: 3695 return ret; 3696 } 3697 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 3698 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 3699 3700 /** 3701 * ring_buffer_alloc_read_page - allocate a page to read from buffer 3702 * @buffer: the buffer to allocate for. 3703 * 3704 * This function is used in conjunction with ring_buffer_read_page. 3705 * When reading a full page from the ring buffer, these functions 3706 * can be used to speed up the process. The calling function should 3707 * allocate a few pages first with this function. Then when it 3708 * needs to get pages from the ring buffer, it passes the result 3709 * of this function into ring_buffer_read_page, which will swap 3710 * the page that was allocated, with the read page of the buffer. 3711 * 3712 * Returns: 3713 * The page allocated, or NULL on error. 3714 */ 3715 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) 3716 { 3717 struct buffer_data_page *bpage; 3718 unsigned long addr; 3719 3720 addr = __get_free_page(GFP_KERNEL); 3721 if (!addr) 3722 return NULL; 3723 3724 bpage = (void *)addr; 3725 3726 rb_init_page(bpage); 3727 3728 return bpage; 3729 } 3730 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 3731 3732 /** 3733 * ring_buffer_free_read_page - free an allocated read page 3734 * @buffer: the buffer the page was allocate for 3735 * @data: the page to free 3736 * 3737 * Free a page allocated from ring_buffer_alloc_read_page. 3738 */ 3739 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) 3740 { 3741 free_page((unsigned long)data); 3742 } 3743 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 3744 3745 /** 3746 * ring_buffer_read_page - extract a page from the ring buffer 3747 * @buffer: buffer to extract from 3748 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 3749 * @len: amount to extract 3750 * @cpu: the cpu of the buffer to extract 3751 * @full: should the extraction only happen when the page is full. 3752 * 3753 * This function will pull out a page from the ring buffer and consume it. 3754 * @data_page must be the address of the variable that was returned 3755 * from ring_buffer_alloc_read_page. This is because the page might be used 3756 * to swap with a page in the ring buffer. 3757 * 3758 * for example: 3759 * rpage = ring_buffer_alloc_read_page(buffer); 3760 * if (!rpage) 3761 * return error; 3762 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 3763 * if (ret >= 0) 3764 * process_page(rpage, ret); 3765 * 3766 * When @full is set, the function will not return true unless 3767 * the writer is off the reader page. 3768 * 3769 * Note: it is up to the calling functions to handle sleeps and wakeups. 3770 * The ring buffer can be used anywhere in the kernel and can not 3771 * blindly call wake_up. The layer that uses the ring buffer must be 3772 * responsible for that. 3773 * 3774 * Returns: 3775 * >=0 if data has been transferred, returns the offset of consumed data. 
3776 * <0 if no data has been transferred. 3777 */ 3778 int ring_buffer_read_page(struct ring_buffer *buffer, 3779 void **data_page, size_t len, int cpu, int full) 3780 { 3781 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3782 struct ring_buffer_event *event; 3783 struct buffer_data_page *bpage; 3784 struct buffer_page *reader; 3785 unsigned long missed_events; 3786 unsigned long flags; 3787 unsigned int commit; 3788 unsigned int read; 3789 u64 save_timestamp; 3790 int ret = -1; 3791 3792 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3793 goto out; 3794 3795 /* 3796 * If len is not big enough to hold the page header, then 3797 * we can not copy anything. 3798 */ 3799 if (len <= BUF_PAGE_HDR_SIZE) 3800 goto out; 3801 3802 len -= BUF_PAGE_HDR_SIZE; 3803 3804 if (!data_page) 3805 goto out; 3806 3807 bpage = *data_page; 3808 if (!bpage) 3809 goto out; 3810 3811 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3812 3813 reader = rb_get_reader_page(cpu_buffer); 3814 if (!reader) 3815 goto out_unlock; 3816 3817 event = rb_reader_event(cpu_buffer); 3818 3819 read = reader->read; 3820 commit = rb_page_commit(reader); 3821 3822 /* Check if any events were dropped */ 3823 missed_events = cpu_buffer->lost_events; 3824 3825 /* 3826 * If this page has been partially read or 3827 * if len is not big enough to read the rest of the page or 3828 * a writer is still on the page, then 3829 * we must copy the data from the page to the buffer. 3830 * Otherwise, we can simply swap the page with the one passed in. 3831 */ 3832 if (read || (len < (commit - read)) || 3833 cpu_buffer->reader_page == cpu_buffer->commit_page) { 3834 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 3835 unsigned int rpos = read; 3836 unsigned int pos = 0; 3837 unsigned int size; 3838 3839 if (full) 3840 goto out_unlock; 3841 3842 if (len > (commit - read)) 3843 len = (commit - read); 3844 3845 /* Always keep the time extend and data together */ 3846 size = rb_event_ts_length(event); 3847 3848 if (len < size) 3849 goto out_unlock; 3850 3851 /* save the current timestamp, since the user will need it */ 3852 save_timestamp = cpu_buffer->read_stamp; 3853 3854 /* Need to copy one event at a time */ 3855 do { 3856 /* We need the size of one event, because 3857 * rb_advance_reader only advances by one event, 3858 * whereas rb_event_ts_length may include the size of 3859 * one or two events. 3860 * We have already ensured there's enough space if this 3861 * is a time extend. 
*/ 3862 size = rb_event_length(event); 3863 memcpy(bpage->data + pos, rpage->data + rpos, size); 3864 3865 len -= size; 3866 3867 rb_advance_reader(cpu_buffer); 3868 rpos = reader->read; 3869 pos += size; 3870 3871 if (rpos >= commit) 3872 break; 3873 3874 event = rb_reader_event(cpu_buffer); 3875 /* Always keep the time extend and data together */ 3876 size = rb_event_ts_length(event); 3877 } while (len >= size); 3878 3879 /* update bpage */ 3880 local_set(&bpage->commit, pos); 3881 bpage->time_stamp = save_timestamp; 3882 3883 /* we copied everything to the beginning */ 3884 read = 0; 3885 } else { 3886 /* update the entry counter */ 3887 cpu_buffer->read += rb_page_entries(reader); 3888 3889 /* swap the pages */ 3890 rb_init_page(bpage); 3891 bpage = reader->page; 3892 reader->page = *data_page; 3893 local_set(&reader->write, 0); 3894 local_set(&reader->entries, 0); 3895 reader->read = 0; 3896 *data_page = bpage; 3897 3898 /* 3899 * Use the real_end for the data size, 3900 * This gives us a chance to store the lost events 3901 * on the page. 3902 */ 3903 if (reader->real_end) 3904 local_set(&bpage->commit, reader->real_end); 3905 } 3906 ret = read; 3907 3908 cpu_buffer->lost_events = 0; 3909 3910 commit = local_read(&bpage->commit); 3911 /* 3912 * Set a flag in the commit field if we lost events 3913 */ 3914 if (missed_events) { 3915 /* If there is room at the end of the page to save the 3916 * missed events, then record it there. 3917 */ 3918 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 3919 memcpy(&bpage->data[commit], &missed_events, 3920 sizeof(missed_events)); 3921 local_add(RB_MISSED_STORED, &bpage->commit); 3922 commit += sizeof(missed_events); 3923 } 3924 local_add(RB_MISSED_EVENTS, &bpage->commit); 3925 } 3926 3927 /* 3928 * This page may be off to user land. Zero it out here. 
3929 */ 3930 if (commit < BUF_PAGE_SIZE) 3931 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 3932 3933 out_unlock: 3934 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3935 3936 out: 3937 return ret; 3938 } 3939 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 3940 3941 #ifdef CONFIG_TRACING 3942 static ssize_t 3943 rb_simple_read(struct file *filp, char __user *ubuf, 3944 size_t cnt, loff_t *ppos) 3945 { 3946 unsigned long *p = filp->private_data; 3947 char buf[64]; 3948 int r; 3949 3950 if (test_bit(RB_BUFFERS_DISABLED_BIT, p)) 3951 r = sprintf(buf, "permanently disabled\n"); 3952 else 3953 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p)); 3954 3955 return simple_read_from_buffer(ubuf, cnt, ppos, buf, r); 3956 } 3957 3958 static ssize_t 3959 rb_simple_write(struct file *filp, const char __user *ubuf, 3960 size_t cnt, loff_t *ppos) 3961 { 3962 unsigned long *p = filp->private_data; 3963 char buf[64]; 3964 unsigned long val; 3965 int ret; 3966 3967 if (cnt >= sizeof(buf)) 3968 return -EINVAL; 3969 3970 if (copy_from_user(&buf, ubuf, cnt)) 3971 return -EFAULT; 3972 3973 buf[cnt] = 0; 3974 3975 ret = strict_strtoul(buf, 10, &val); 3976 if (ret < 0) 3977 return ret; 3978 3979 if (val) 3980 set_bit(RB_BUFFERS_ON_BIT, p); 3981 else 3982 clear_bit(RB_BUFFERS_ON_BIT, p); 3983 3984 (*ppos)++; 3985 3986 return cnt; 3987 } 3988 3989 static const struct file_operations rb_simple_fops = { 3990 .open = tracing_open_generic, 3991 .read = rb_simple_read, 3992 .write = rb_simple_write, 3993 .llseek = default_llseek, 3994 }; 3995 3996 3997 static __init int rb_init_debugfs(void) 3998 { 3999 struct dentry *d_tracer; 4000 4001 d_tracer = tracing_init_dentry(); 4002 4003 trace_create_file("tracing_on", 0644, d_tracer, 4004 &ring_buffer_flags, &rb_simple_fops); 4005 4006 return 0; 4007 } 4008 4009 fs_initcall(rb_init_debugfs); 4010 #endif 4011 4012 #ifdef CONFIG_HOTPLUG_CPU 4013 static int rb_cpu_notify(struct notifier_block *self, 4014 unsigned long action, void *hcpu) 4015 { 4016 struct ring_buffer *buffer = 4017 container_of(self, struct ring_buffer, cpu_notify); 4018 long cpu = (long)hcpu; 4019 4020 switch (action) { 4021 case CPU_UP_PREPARE: 4022 case CPU_UP_PREPARE_FROZEN: 4023 if (cpumask_test_cpu(cpu, buffer->cpumask)) 4024 return NOTIFY_OK; 4025 4026 buffer->buffers[cpu] = 4027 rb_allocate_cpu_buffer(buffer, cpu); 4028 if (!buffer->buffers[cpu]) { 4029 WARN(1, "failed to allocate ring buffer on CPU %ld\n", 4030 cpu); 4031 return NOTIFY_OK; 4032 } 4033 smp_wmb(); 4034 cpumask_set_cpu(cpu, buffer->cpumask); 4035 break; 4036 case CPU_DOWN_PREPARE: 4037 case CPU_DOWN_PREPARE_FROZEN: 4038 /* 4039 * Do nothing. 4040 * If we were to free the buffer, then the user would 4041 * lose any trace that was in the buffer. 4042 */ 4043 break; 4044 default: 4045 break; 4046 } 4047 return NOTIFY_OK; 4048 } 4049 #endif 4050