1 /* 2 * Generic ring buffer 3 * 4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 5 */ 6 #include <linux/ring_buffer.h> 7 #include <linux/trace_clock.h> 8 #include <linux/ftrace_irq.h> 9 #include <linux/spinlock.h> 10 #include <linux/debugfs.h> 11 #include <linux/uaccess.h> 12 #include <linux/hardirq.h> 13 #include <linux/kmemcheck.h> 14 #include <linux/module.h> 15 #include <linux/percpu.h> 16 #include <linux/mutex.h> 17 #include <linux/slab.h> 18 #include <linux/init.h> 19 #include <linux/hash.h> 20 #include <linux/list.h> 21 #include <linux/cpu.h> 22 #include <linux/fs.h> 23 24 #include <asm/local.h> 25 #include "trace.h" 26 27 /* 28 * The ring buffer header is special. We must manually up keep it. 29 */ 30 int ring_buffer_print_entry_header(struct trace_seq *s) 31 { 32 int ret; 33 34 ret = trace_seq_printf(s, "# compressed entry header\n"); 35 ret = trace_seq_printf(s, "\ttype_len : 5 bits\n"); 36 ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n"); 37 ret = trace_seq_printf(s, "\tarray : 32 bits\n"); 38 ret = trace_seq_printf(s, "\n"); 39 ret = trace_seq_printf(s, "\tpadding : type == %d\n", 40 RINGBUF_TYPE_PADDING); 41 ret = trace_seq_printf(s, "\ttime_extend : type == %d\n", 42 RINGBUF_TYPE_TIME_EXTEND); 43 ret = trace_seq_printf(s, "\tdata max type_len == %d\n", 44 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 45 46 return ret; 47 } 48 49 /* 50 * The ring buffer is made up of a list of pages. A separate list of pages is 51 * allocated for each CPU. A writer may only write to a buffer that is 52 * associated with the CPU it is currently executing on. A reader may read 53 * from any per cpu buffer. 54 * 55 * The reader is special. For each per cpu buffer, the reader has its own 56 * reader page. When a reader has read the entire reader page, this reader 57 * page is swapped with another page in the ring buffer. 58 * 59 * Now, as long as the writer is off the reader page, the reader can do what 60 * ever it wants with that page. The writer will never write to that page 61 * again (as long as it is out of the ring buffer). 62 * 63 * Here's some silly ASCII art. 64 * 65 * +------+ 66 * |reader| RING BUFFER 67 * |page | 68 * +------+ +---+ +---+ +---+ 69 * | |-->| |-->| | 70 * +---+ +---+ +---+ 71 * ^ | 72 * | | 73 * +---------------+ 74 * 75 * 76 * +------+ 77 * |reader| RING BUFFER 78 * |page |------------------v 79 * +------+ +---+ +---+ +---+ 80 * | |-->| |-->| | 81 * +---+ +---+ +---+ 82 * ^ | 83 * | | 84 * +---------------+ 85 * 86 * 87 * +------+ 88 * |reader| RING BUFFER 89 * |page |------------------v 90 * +------+ +---+ +---+ +---+ 91 * ^ | |-->| |-->| | 92 * | +---+ +---+ +---+ 93 * | | 94 * | | 95 * +------------------------------+ 96 * 97 * 98 * +------+ 99 * |buffer| RING BUFFER 100 * |page |------------------v 101 * +------+ +---+ +---+ +---+ 102 * ^ | | | |-->| | 103 * | New +---+ +---+ +---+ 104 * | Reader------^ | 105 * | page | 106 * +------------------------------+ 107 * 108 * 109 * After we make this swap, the reader can hand this page off to the splice 110 * code and be done with it. It can even allocate a new page if it needs to 111 * and swap that into the ring buffer. 112 * 113 * We will be using cmpxchg soon to make all this lockless. 114 * 115 */ 116 117 /* 118 * A fast way to enable or disable all ring buffers is to 119 * call tracing_on or tracing_off. Turning off the ring buffers 120 * prevents all ring buffers from being recorded to. 121 * Turning this switch on, makes it OK to write to the 122 * ring buffer, if the ring buffer is enabled itself. 
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *   ON      DISABLED
 *  ----    ----------
 *   0         0       : ring buffers are off
 *   1         0       : ring buffers are on
 *   X         1       : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show state of ring buffers enabled
 */
int tracing_is_on(void)
{
	return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
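
/*
 * For reference while reading the encoding helpers below: the
 * "compressed entry header" printed by ring_buffer_print_entry_header()
 * corresponds to a layout along these lines (a simplified sketch; the
 * authoritative definition lives in include/linux/ring_buffer.h):
 *
 *	struct ring_buffer_event {
 *		u32	type_len:5, time_delta:27;
 *		u32	array[];
 *	};
 *
 * A type_len of 0 means the payload length is stored in array[0] and
 * the data starts at array[1]; a non-zero type_len up to
 * RINGBUF_TYPE_DATA_TYPE_LEN_MAX encodes the payload length directly
 * as type_len * RB_ALIGNMENT bytes, with the data starting at array[0].
 */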

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
310 */ 311 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 312 { 313 unsigned length; 314 315 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 316 event = skip_time_extend(event); 317 318 length = rb_event_length(event); 319 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 320 return length; 321 length -= RB_EVNT_HDR_SIZE; 322 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 323 length -= sizeof(event->array[0]); 324 return length; 325 } 326 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 327 328 /* inline for ring buffer fast paths */ 329 static void * 330 rb_event_data(struct ring_buffer_event *event) 331 { 332 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 333 event = skip_time_extend(event); 334 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 335 /* If length is in len field, then array[0] has the data */ 336 if (event->type_len) 337 return (void *)&event->array[0]; 338 /* Otherwise length is in array[0] and array[1] has the data */ 339 return (void *)&event->array[1]; 340 } 341 342 /** 343 * ring_buffer_event_data - return the data of the event 344 * @event: the event to get the data from 345 */ 346 void *ring_buffer_event_data(struct ring_buffer_event *event) 347 { 348 return rb_event_data(event); 349 } 350 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 351 352 #define for_each_buffer_cpu(buffer, cpu) \ 353 for_each_cpu(cpu, buffer->cpumask) 354 355 #define TS_SHIFT 27 356 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 357 #define TS_DELTA_TEST (~TS_MASK) 358 359 /* Flag when events were overwritten */ 360 #define RB_MISSED_EVENTS (1 << 31) 361 /* Missed count stored at end */ 362 #define RB_MISSED_STORED (1 << 30) 363 364 struct buffer_data_page { 365 u64 time_stamp; /* page time stamp */ 366 local_t commit; /* write committed index */ 367 unsigned char data[]; /* data of buffer page */ 368 }; 369 370 /* 371 * Note, the buffer_page list must be first. The buffer pages 372 * are allocated in cache lines, which means that each buffer 373 * page will be at the beginning of a cache line, and thus 374 * the least significant bits will be zero. We use this to 375 * add flags in the list struct pointers, to make the ring buffer 376 * lockless. 377 */ 378 struct buffer_page { 379 struct list_head list; /* list of buffer pages */ 380 local_t write; /* index for next write */ 381 unsigned read; /* index for next read */ 382 local_t entries; /* entries on this page */ 383 unsigned long real_end; /* real end of data */ 384 struct buffer_data_page *page; /* Actual data page */ 385 }; 386 387 /* 388 * The buffer page counters, write and entries, must be reset 389 * atomically when crossing page boundaries. To synchronize this 390 * update, two counters are inserted into the number. One is 391 * the actual counter for the write position or count on the page. 392 * 393 * The other is a counter of updaters. Before an update happens 394 * the update partition of the counter is incremented. This will 395 * allow the updater to update the counter atomically. 396 * 397 * The counter is 20 bits, and the state data is 12. 398 */ 399 #define RB_WRITE_MASK 0xfffff 400 #define RB_WRITE_INTCNT (1 << 20) 401 402 static void rb_init_page(struct buffer_data_page *bpage) 403 { 404 local_set(&bpage->commit, 0); 405 } 406 407 /** 408 * ring_buffer_page_len - the size of data on the page. 409 * @page: The page to read 410 * 411 * Returns the amount of data on the page, including buffer page header. 
412 */ 413 size_t ring_buffer_page_len(void *page) 414 { 415 return local_read(&((struct buffer_data_page *)page)->commit) 416 + BUF_PAGE_HDR_SIZE; 417 } 418 419 /* 420 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 421 * this issue out. 422 */ 423 static void free_buffer_page(struct buffer_page *bpage) 424 { 425 free_page((unsigned long)bpage->page); 426 kfree(bpage); 427 } 428 429 /* 430 * We need to fit the time_stamp delta into 27 bits. 431 */ 432 static inline int test_time_stamp(u64 delta) 433 { 434 if (delta & TS_DELTA_TEST) 435 return 1; 436 return 0; 437 } 438 439 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 440 441 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 442 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 443 444 int ring_buffer_print_page_header(struct trace_seq *s) 445 { 446 struct buffer_data_page field; 447 int ret; 448 449 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t" 450 "offset:0;\tsize:%u;\tsigned:%u;\n", 451 (unsigned int)sizeof(field.time_stamp), 452 (unsigned int)is_signed_type(u64)); 453 454 ret = trace_seq_printf(s, "\tfield: local_t commit;\t" 455 "offset:%u;\tsize:%u;\tsigned:%u;\n", 456 (unsigned int)offsetof(typeof(field), commit), 457 (unsigned int)sizeof(field.commit), 458 (unsigned int)is_signed_type(long)); 459 460 ret = trace_seq_printf(s, "\tfield: int overwrite;\t" 461 "offset:%u;\tsize:%u;\tsigned:%u;\n", 462 (unsigned int)offsetof(typeof(field), commit), 463 1, 464 (unsigned int)is_signed_type(long)); 465 466 ret = trace_seq_printf(s, "\tfield: char data;\t" 467 "offset:%u;\tsize:%u;\tsigned:%u;\n", 468 (unsigned int)offsetof(typeof(field), data), 469 (unsigned int)BUF_PAGE_SIZE, 470 (unsigned int)is_signed_type(char)); 471 472 return ret; 473 } 474 475 /* 476 * head_page == tail_page && head == tail then buffer is empty. 
477 */ 478 struct ring_buffer_per_cpu { 479 int cpu; 480 atomic_t record_disabled; 481 struct ring_buffer *buffer; 482 spinlock_t reader_lock; /* serialize readers */ 483 arch_spinlock_t lock; 484 struct lock_class_key lock_key; 485 struct list_head *pages; 486 struct buffer_page *head_page; /* read from head */ 487 struct buffer_page *tail_page; /* write to tail */ 488 struct buffer_page *commit_page; /* committed pages */ 489 struct buffer_page *reader_page; 490 unsigned long lost_events; 491 unsigned long last_overrun; 492 local_t commit_overrun; 493 local_t overrun; 494 local_t entries; 495 local_t committing; 496 local_t commits; 497 unsigned long read; 498 u64 write_stamp; 499 u64 read_stamp; 500 }; 501 502 struct ring_buffer { 503 unsigned pages; 504 unsigned flags; 505 int cpus; 506 atomic_t record_disabled; 507 cpumask_var_t cpumask; 508 509 struct lock_class_key *reader_lock_key; 510 511 struct mutex mutex; 512 513 struct ring_buffer_per_cpu **buffers; 514 515 #ifdef CONFIG_HOTPLUG_CPU 516 struct notifier_block cpu_notify; 517 #endif 518 u64 (*clock)(void); 519 }; 520 521 struct ring_buffer_iter { 522 struct ring_buffer_per_cpu *cpu_buffer; 523 unsigned long head; 524 struct buffer_page *head_page; 525 struct buffer_page *cache_reader_page; 526 unsigned long cache_read; 527 u64 read_stamp; 528 }; 529 530 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 531 #define RB_WARN_ON(b, cond) \ 532 ({ \ 533 int _____ret = unlikely(cond); \ 534 if (_____ret) { \ 535 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 536 struct ring_buffer_per_cpu *__b = \ 537 (void *)b; \ 538 atomic_inc(&__b->buffer->record_disabled); \ 539 } else \ 540 atomic_inc(&b->record_disabled); \ 541 WARN_ON(1); \ 542 } \ 543 _____ret; \ 544 }) 545 546 /* Up this if you want to test the TIME_EXTENTS and normalization */ 547 #define DEBUG_SHIFT 0 548 549 static inline u64 rb_time_stamp(struct ring_buffer *buffer) 550 { 551 /* shift to debug/test normalization and TIME_EXTENTS */ 552 return buffer->clock() << DEBUG_SHIFT; 553 } 554 555 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) 556 { 557 u64 time; 558 559 preempt_disable_notrace(); 560 time = rb_time_stamp(buffer); 561 preempt_enable_no_resched_notrace(); 562 563 return time; 564 } 565 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 566 567 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, 568 int cpu, u64 *ts) 569 { 570 /* Just stupid testing the normalize function and deltas */ 571 *ts >>= DEBUG_SHIFT; 572 } 573 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 574 575 /* 576 * Making the ring buffer lockless makes things tricky. 577 * Although writes only happen on the CPU that they are on, 578 * and they only need to worry about interrupts. Reads can 579 * happen on any CPU. 580 * 581 * The reader page is always off the ring buffer, but when the 582 * reader finishes with a page, it needs to swap its page with 583 * a new one from the buffer. The reader needs to take from 584 * the head (writes go to the tail). But if a writer is in overwrite 585 * mode and wraps, it must push the head page forward. 586 * 587 * Here lies the problem. 588 * 589 * The reader must be careful to replace only the head page, and 590 * not another one. As described at the top of the file in the 591 * ASCII art, the reader sets its old page to point to the next 592 * page after head. It then sets the page after head to point to 593 * the old reader page. 
 * But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next	bit 1	  bit 0
 *				-------	 -------
 * Normal page			  0	    0
 * Points to head page		  0	    1
 * New head page		  1	    0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}
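
/*
 * A short illustration of the pointer-flag scheme described above (a
 * sketch only; the real transitions are performed with cmpxchg by the
 * rb_head_page_set*() helpers below). Buffer pages are allocated
 * cache-line aligned, so the two low bits of a page list pointer are
 * free to carry the HEAD/UPDATE flags:
 *
 *	struct list_head *next = head->list.prev->next;
 *	unsigned long flags = (unsigned long)next & RB_FLAG_MASK;
 *	struct list_head *real = rb_list_head(next);
 *
 * Here flags is RB_PAGE_HEAD while "real" points to the head page, and
 * any code walking the page list must strip the flags with
 * rb_list_head() before dereferencing a pointer taken from it.
 */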

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
832 */ 833 for (i = 0; i < 3; i++) { 834 do { 835 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) { 836 cpu_buffer->head_page = page; 837 return page; 838 } 839 rb_inc_page(cpu_buffer, &page); 840 } while (page != head); 841 } 842 843 RB_WARN_ON(cpu_buffer, 1); 844 845 return NULL; 846 } 847 848 static int rb_head_page_replace(struct buffer_page *old, 849 struct buffer_page *new) 850 { 851 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 852 unsigned long val; 853 unsigned long ret; 854 855 val = *ptr & ~RB_FLAG_MASK; 856 val |= RB_PAGE_HEAD; 857 858 ret = cmpxchg(ptr, val, (unsigned long)&new->list); 859 860 return ret == val; 861 } 862 863 /* 864 * rb_tail_page_update - move the tail page forward 865 * 866 * Returns 1 if moved tail page, 0 if someone else did. 867 */ 868 static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 869 struct buffer_page *tail_page, 870 struct buffer_page *next_page) 871 { 872 struct buffer_page *old_tail; 873 unsigned long old_entries; 874 unsigned long old_write; 875 int ret = 0; 876 877 /* 878 * The tail page now needs to be moved forward. 879 * 880 * We need to reset the tail page, but without messing 881 * with possible erasing of data brought in by interrupts 882 * that have moved the tail page and are currently on it. 883 * 884 * We add a counter to the write field to denote this. 885 */ 886 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 887 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 888 889 /* 890 * Just make sure we have seen our old_write and synchronize 891 * with any interrupts that come in. 892 */ 893 barrier(); 894 895 /* 896 * If the tail page is still the same as what we think 897 * it is, then it is up to us to update the tail 898 * pointer. 899 */ 900 if (tail_page == cpu_buffer->tail_page) { 901 /* Zero the write counter */ 902 unsigned long val = old_write & ~RB_WRITE_MASK; 903 unsigned long eval = old_entries & ~RB_WRITE_MASK; 904 905 /* 906 * This will only succeed if an interrupt did 907 * not come in and change it. In which case, we 908 * do not want to modify it. 909 * 910 * We add (void) to let the compiler know that we do not care 911 * about the return value of these functions. We use the 912 * cmpxchg to only update if an interrupt did not already 913 * do it for us. If the cmpxchg fails, we don't care. 914 */ 915 (void)local_cmpxchg(&next_page->write, old_write, val); 916 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 917 918 /* 919 * No need to worry about races with clearing out the commit. 920 * it only can increment when a commit takes place. But that 921 * only happens in the outer most nested commit. 
922 */ 923 local_set(&next_page->page->commit, 0); 924 925 old_tail = cmpxchg(&cpu_buffer->tail_page, 926 tail_page, next_page); 927 928 if (old_tail == tail_page) 929 ret = 1; 930 } 931 932 return ret; 933 } 934 935 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 936 struct buffer_page *bpage) 937 { 938 unsigned long val = (unsigned long)bpage; 939 940 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK)) 941 return 1; 942 943 return 0; 944 } 945 946 /** 947 * rb_check_list - make sure a pointer to a list has the last bits zero 948 */ 949 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer, 950 struct list_head *list) 951 { 952 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev)) 953 return 1; 954 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next)) 955 return 1; 956 return 0; 957 } 958 959 /** 960 * check_pages - integrity check of buffer pages 961 * @cpu_buffer: CPU buffer with pages to test 962 * 963 * As a safety measure we check to make sure the data pages have not 964 * been corrupted. 965 */ 966 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 967 { 968 struct list_head *head = cpu_buffer->pages; 969 struct buffer_page *bpage, *tmp; 970 971 rb_head_page_deactivate(cpu_buffer); 972 973 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 974 return -1; 975 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 976 return -1; 977 978 if (rb_check_list(cpu_buffer, head)) 979 return -1; 980 981 list_for_each_entry_safe(bpage, tmp, head, list) { 982 if (RB_WARN_ON(cpu_buffer, 983 bpage->list.next->prev != &bpage->list)) 984 return -1; 985 if (RB_WARN_ON(cpu_buffer, 986 bpage->list.prev->next != &bpage->list)) 987 return -1; 988 if (rb_check_list(cpu_buffer, &bpage->list)) 989 return -1; 990 } 991 992 rb_head_page_activate(cpu_buffer); 993 994 return 0; 995 } 996 997 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 998 unsigned nr_pages) 999 { 1000 struct buffer_page *bpage, *tmp; 1001 unsigned long addr; 1002 LIST_HEAD(pages); 1003 unsigned i; 1004 1005 WARN_ON(!nr_pages); 1006 1007 for (i = 0; i < nr_pages; i++) { 1008 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1009 GFP_KERNEL, cpu_to_node(cpu_buffer->cpu)); 1010 if (!bpage) 1011 goto free_pages; 1012 1013 rb_check_bpage(cpu_buffer, bpage); 1014 1015 list_add(&bpage->list, &pages); 1016 1017 addr = __get_free_page(GFP_KERNEL); 1018 if (!addr) 1019 goto free_pages; 1020 bpage->page = (void *)addr; 1021 rb_init_page(bpage->page); 1022 } 1023 1024 /* 1025 * The ring buffer page list is a circular list that does not 1026 * start and end with a list head. All page list items point to 1027 * other pages. 
1028 */ 1029 cpu_buffer->pages = pages.next; 1030 list_del(&pages); 1031 1032 rb_check_pages(cpu_buffer); 1033 1034 return 0; 1035 1036 free_pages: 1037 list_for_each_entry_safe(bpage, tmp, &pages, list) { 1038 list_del_init(&bpage->list); 1039 free_buffer_page(bpage); 1040 } 1041 return -ENOMEM; 1042 } 1043 1044 static struct ring_buffer_per_cpu * 1045 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu) 1046 { 1047 struct ring_buffer_per_cpu *cpu_buffer; 1048 struct buffer_page *bpage; 1049 unsigned long addr; 1050 int ret; 1051 1052 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1053 GFP_KERNEL, cpu_to_node(cpu)); 1054 if (!cpu_buffer) 1055 return NULL; 1056 1057 cpu_buffer->cpu = cpu; 1058 cpu_buffer->buffer = buffer; 1059 spin_lock_init(&cpu_buffer->reader_lock); 1060 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1061 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1062 1063 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1064 GFP_KERNEL, cpu_to_node(cpu)); 1065 if (!bpage) 1066 goto fail_free_buffer; 1067 1068 rb_check_bpage(cpu_buffer, bpage); 1069 1070 cpu_buffer->reader_page = bpage; 1071 addr = __get_free_page(GFP_KERNEL); 1072 if (!addr) 1073 goto fail_free_reader; 1074 bpage->page = (void *)addr; 1075 rb_init_page(bpage->page); 1076 1077 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1078 1079 ret = rb_allocate_pages(cpu_buffer, buffer->pages); 1080 if (ret < 0) 1081 goto fail_free_reader; 1082 1083 cpu_buffer->head_page 1084 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1085 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1086 1087 rb_head_page_activate(cpu_buffer); 1088 1089 return cpu_buffer; 1090 1091 fail_free_reader: 1092 free_buffer_page(cpu_buffer->reader_page); 1093 1094 fail_free_buffer: 1095 kfree(cpu_buffer); 1096 return NULL; 1097 } 1098 1099 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1100 { 1101 struct list_head *head = cpu_buffer->pages; 1102 struct buffer_page *bpage, *tmp; 1103 1104 free_buffer_page(cpu_buffer->reader_page); 1105 1106 rb_head_page_deactivate(cpu_buffer); 1107 1108 if (head) { 1109 list_for_each_entry_safe(bpage, tmp, head, list) { 1110 list_del_init(&bpage->list); 1111 free_buffer_page(bpage); 1112 } 1113 bpage = list_entry(head, struct buffer_page, list); 1114 free_buffer_page(bpage); 1115 } 1116 1117 kfree(cpu_buffer); 1118 } 1119 1120 #ifdef CONFIG_HOTPLUG_CPU 1121 static int rb_cpu_notify(struct notifier_block *self, 1122 unsigned long action, void *hcpu); 1123 #endif 1124 1125 /** 1126 * ring_buffer_alloc - allocate a new ring_buffer 1127 * @size: the size in bytes per cpu that is needed. 1128 * @flags: attributes to set for the ring buffer. 1129 * 1130 * Currently the only flag that is available is the RB_FL_OVERWRITE 1131 * flag. This flag means that the buffer will overwrite old data 1132 * when the buffer wraps. If this flag is not set, the buffer will 1133 * drop data when the tail hits the head. 
1134 */ 1135 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1136 struct lock_class_key *key) 1137 { 1138 struct ring_buffer *buffer; 1139 int bsize; 1140 int cpu; 1141 1142 /* keep it in its own cache line */ 1143 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1144 GFP_KERNEL); 1145 if (!buffer) 1146 return NULL; 1147 1148 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1149 goto fail_free_buffer; 1150 1151 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1152 buffer->flags = flags; 1153 buffer->clock = trace_clock_local; 1154 buffer->reader_lock_key = key; 1155 1156 /* need at least two pages */ 1157 if (buffer->pages < 2) 1158 buffer->pages = 2; 1159 1160 /* 1161 * In case of non-hotplug cpu, if the ring-buffer is allocated 1162 * in early initcall, it will not be notified of secondary cpus. 1163 * In that off case, we need to allocate for all possible cpus. 1164 */ 1165 #ifdef CONFIG_HOTPLUG_CPU 1166 get_online_cpus(); 1167 cpumask_copy(buffer->cpumask, cpu_online_mask); 1168 #else 1169 cpumask_copy(buffer->cpumask, cpu_possible_mask); 1170 #endif 1171 buffer->cpus = nr_cpu_ids; 1172 1173 bsize = sizeof(void *) * nr_cpu_ids; 1174 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1175 GFP_KERNEL); 1176 if (!buffer->buffers) 1177 goto fail_free_cpumask; 1178 1179 for_each_buffer_cpu(buffer, cpu) { 1180 buffer->buffers[cpu] = 1181 rb_allocate_cpu_buffer(buffer, cpu); 1182 if (!buffer->buffers[cpu]) 1183 goto fail_free_buffers; 1184 } 1185 1186 #ifdef CONFIG_HOTPLUG_CPU 1187 buffer->cpu_notify.notifier_call = rb_cpu_notify; 1188 buffer->cpu_notify.priority = 0; 1189 register_cpu_notifier(&buffer->cpu_notify); 1190 #endif 1191 1192 put_online_cpus(); 1193 mutex_init(&buffer->mutex); 1194 1195 return buffer; 1196 1197 fail_free_buffers: 1198 for_each_buffer_cpu(buffer, cpu) { 1199 if (buffer->buffers[cpu]) 1200 rb_free_cpu_buffer(buffer->buffers[cpu]); 1201 } 1202 kfree(buffer->buffers); 1203 1204 fail_free_cpumask: 1205 free_cpumask_var(buffer->cpumask); 1206 put_online_cpus(); 1207 1208 fail_free_buffer: 1209 kfree(buffer); 1210 return NULL; 1211 } 1212 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1213 1214 /** 1215 * ring_buffer_free - free a ring buffer. 1216 * @buffer: the buffer to free. 
1217 */ 1218 void 1219 ring_buffer_free(struct ring_buffer *buffer) 1220 { 1221 int cpu; 1222 1223 get_online_cpus(); 1224 1225 #ifdef CONFIG_HOTPLUG_CPU 1226 unregister_cpu_notifier(&buffer->cpu_notify); 1227 #endif 1228 1229 for_each_buffer_cpu(buffer, cpu) 1230 rb_free_cpu_buffer(buffer->buffers[cpu]); 1231 1232 put_online_cpus(); 1233 1234 kfree(buffer->buffers); 1235 free_cpumask_var(buffer->cpumask); 1236 1237 kfree(buffer); 1238 } 1239 EXPORT_SYMBOL_GPL(ring_buffer_free); 1240 1241 void ring_buffer_set_clock(struct ring_buffer *buffer, 1242 u64 (*clock)(void)) 1243 { 1244 buffer->clock = clock; 1245 } 1246 1247 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1248 1249 static void 1250 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages) 1251 { 1252 struct buffer_page *bpage; 1253 struct list_head *p; 1254 unsigned i; 1255 1256 spin_lock_irq(&cpu_buffer->reader_lock); 1257 rb_head_page_deactivate(cpu_buffer); 1258 1259 for (i = 0; i < nr_pages; i++) { 1260 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) 1261 goto out; 1262 p = cpu_buffer->pages->next; 1263 bpage = list_entry(p, struct buffer_page, list); 1264 list_del_init(&bpage->list); 1265 free_buffer_page(bpage); 1266 } 1267 if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages))) 1268 goto out; 1269 1270 rb_reset_cpu(cpu_buffer); 1271 rb_check_pages(cpu_buffer); 1272 1273 out: 1274 spin_unlock_irq(&cpu_buffer->reader_lock); 1275 } 1276 1277 static void 1278 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer, 1279 struct list_head *pages, unsigned nr_pages) 1280 { 1281 struct buffer_page *bpage; 1282 struct list_head *p; 1283 unsigned i; 1284 1285 spin_lock_irq(&cpu_buffer->reader_lock); 1286 rb_head_page_deactivate(cpu_buffer); 1287 1288 for (i = 0; i < nr_pages; i++) { 1289 if (RB_WARN_ON(cpu_buffer, list_empty(pages))) 1290 goto out; 1291 p = pages->next; 1292 bpage = list_entry(p, struct buffer_page, list); 1293 list_del_init(&bpage->list); 1294 list_add_tail(&bpage->list, cpu_buffer->pages); 1295 } 1296 rb_reset_cpu(cpu_buffer); 1297 rb_check_pages(cpu_buffer); 1298 1299 out: 1300 spin_unlock_irq(&cpu_buffer->reader_lock); 1301 } 1302 1303 /** 1304 * ring_buffer_resize - resize the ring buffer 1305 * @buffer: the buffer to resize. 1306 * @size: the new size. 1307 * 1308 * Minimum size is 2 * BUF_PAGE_SIZE. 1309 * 1310 * Returns -1 on failure. 1311 */ 1312 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size) 1313 { 1314 struct ring_buffer_per_cpu *cpu_buffer; 1315 unsigned nr_pages, rm_pages, new_pages; 1316 struct buffer_page *bpage, *tmp; 1317 unsigned long buffer_size; 1318 unsigned long addr; 1319 LIST_HEAD(pages); 1320 int i, cpu; 1321 1322 /* 1323 * Always succeed at resizing a non-existent buffer: 1324 */ 1325 if (!buffer) 1326 return size; 1327 1328 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1329 size *= BUF_PAGE_SIZE; 1330 buffer_size = buffer->pages * BUF_PAGE_SIZE; 1331 1332 /* we need a minimum of two pages */ 1333 if (size < BUF_PAGE_SIZE * 2) 1334 size = BUF_PAGE_SIZE * 2; 1335 1336 if (size == buffer_size) 1337 return size; 1338 1339 atomic_inc(&buffer->record_disabled); 1340 1341 /* Make sure all writers are done with this buffer. 
	 */
	synchronize_sched();

	mutex_lock(&buffer->mutex);
	get_online_cpus();

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
			goto out_fail;

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
		goto out_fail;

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						   cache_line_size()),
					     GFP_KERNEL, cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			bpage->page = (void *)addr;
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages)))
		goto out_fail;

 out:
	buffer->pages = nr_pages;
	put_online_cpus();
	mutex_unlock(&buffer->mutex);

	atomic_dec(&buffer->record_disabled);

	return size;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -ENOMEM;

	/*
	 * Something went totally wrong, and we are too paranoid
	 * to even clean up the mess.
	 */
 out_fail:
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
{
	mutex_lock(&buffer->mutex);
	if (val)
		buffer->flags |= RB_FL_OVERWRITE;
	else
		buffer->flags &= ~RB_FL_OVERWRITE;
	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);

static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static inline int
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
		   struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	max_count = cpu_buffer->buffer->pages * 100;

	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/* Slow path, do not inline */
static noinline struct ring_buffer_event *
rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
{
	event->type_len = RINGBUF_TYPE_TIME_EXTEND;

	/* Not the first event on the page? */
	if (rb_event_index(event)) {
		event->time_delta = delta & TS_MASK;
		event->array[0] = delta >> TS_SHIFT;
	} else {
		/* nope, just zero it */
		event->time_delta = 0;
		event->array[0] = 0;
	}

	return skip_time_extend(event);
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static void
rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
		struct ring_buffer_event *event, unsigned length,
		int add_timestamp, u64 delta)
{
	/* Only a commit updates the timestamp */
	if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
		delta = 0;

	/*
	 * If we need to add a timestamp, then we
	 * add it to the start of the reserved space.
	 */
	if (unlikely(add_timestamp)) {
		event = rb_add_time_stamp(event, delta);
		length -= RB_LEN_TIME_EXTEND;
		delta = 0;
	}

	event->time_delta = delta;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
		event->type_len = 0;
		event->array[0] = length;
	} else
		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
}
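
/*
 * A worked example of the encoding above (a sketch, assuming the common
 * case of 4-byte alignment, i.e. RB_FORCE_8BYTE_ALIGNMENT == 0):
 *
 * A request to reserve 6 bytes of payload goes through
 * rb_calculate_event_length() below: 6 + RB_EVNT_HDR_SIZE (4) = 10,
 * rounded up to 12 bytes reserved on the page. rb_update_event() then
 * stores type_len = DIV_ROUND_UP(12 - 4, 4) = 2, and the reader
 * recovers the size as type_len * RB_ALIGNMENT + RB_EVNT_HDR_SIZE = 12.
 *
 * A 200 byte payload is larger than RB_MAX_SMALL_DATA (112 with 4-byte
 * alignment), so an extra 4 bytes are reserved, the length is stored in
 * array[0] with type_len = 0, and the data starts at array[1].
 */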

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *	     1) We really set it.
	 *	     2) A bunch of interrupts came in and moved
	 *		the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (cpu_buffer->tail_page != tail_page &&
		    cpu_buffer->tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ARCH_ALIGNMENT);

	return length;
}

static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      struct buffer_page *tail_page,
	      unsigned long tail, unsigned long length)
{
	struct ring_buffer_event *event;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == BUF_PAGE_SIZE)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}

/*
 * This is the slow path, force gcc not to inline it.
 */
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long length, unsigned long tail,
	     struct buffer_page *tail_page, u64 ts)
{
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(cpu_buffer, &next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the header page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE))
				goto out_reset;

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page. We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
			 */
			if (unlikely((cpu_buffer->commit_page !=
				      cpu_buffer->tail_page) &&
				     (cpu_buffer->commit_page ==
				      cpu_buffer->reader_page))) {
				local_inc(&cpu_buffer->commit_overrun);
				goto out_reset;
			}
		}
	}

	ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
	if (ret) {
		/*
		 * Nested commits always have zero deltas, so
		 * just reread the time stamp
		 */
		ts = rb_time_stamp(buffer);
		next_page->page->time_stamp = ts;
	}

 out_again:

	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	/* fail and let the caller try again */
	return ERR_PTR(-EAGAIN);

 out_reset:
	/* reset write */
	rb_reset_tail(cpu_buffer, tail_page, tail, length);

	return NULL;
}

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned long length, u64 ts,
		  u64 delta, int add_timestamp)
{
	struct buffer_page *tail_page;
	struct ring_buffer_event *event;
	unsigned long tail, write;

	/*
	 * If the time delta since the last event is too big to
	 * hold in the time field of the event, then we append a
	 * TIME EXTEND event ahead of the data event.
	 */
	if (unlikely(add_timestamp))
		length += RB_LEN_TIME_EXTEND;

	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (unlikely(write > BUF_PAGE_SIZE))
		return rb_move_tail(cpu_buffer, length, tail,
				    tail_page, ts);

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);
	rb_update_event(cpu_buffer, event, length, add_timestamp, delta);

	local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (!tail)
		tail_page->page->time_stamp = ts;

	return event;
}

static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long index;
	unsigned long addr;

	new_index = rb_event_index(event);
	old_index = new_index + rb_event_ts_length(event);
	addr = (unsigned long)event;
	addr &= PAGE_MASK;

	bpage = cpu_buffer->tail_page;

	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		unsigned long write_mask =
			local_read(&bpage->write) & ~RB_WRITE_MASK;
		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page. That is fine
		 * because we just shorten what is on this page.
2071 */ 2072 old_index += write_mask; 2073 new_index += write_mask; 2074 index = local_cmpxchg(&bpage->write, old_index, new_index); 2075 if (index == old_index) 2076 return 1; 2077 } 2078 2079 /* could not discard */ 2080 return 0; 2081 } 2082 2083 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2084 { 2085 local_inc(&cpu_buffer->committing); 2086 local_inc(&cpu_buffer->commits); 2087 } 2088 2089 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2090 { 2091 unsigned long commits; 2092 2093 if (RB_WARN_ON(cpu_buffer, 2094 !local_read(&cpu_buffer->committing))) 2095 return; 2096 2097 again: 2098 commits = local_read(&cpu_buffer->commits); 2099 /* synchronize with interrupts */ 2100 barrier(); 2101 if (local_read(&cpu_buffer->committing) == 1) 2102 rb_set_commit_to_write(cpu_buffer); 2103 2104 local_dec(&cpu_buffer->committing); 2105 2106 /* synchronize with interrupts */ 2107 barrier(); 2108 2109 /* 2110 * Need to account for interrupts coming in between the 2111 * updating of the commit page and the clearing of the 2112 * committing counter. 2113 */ 2114 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2115 !local_read(&cpu_buffer->committing)) { 2116 local_inc(&cpu_buffer->committing); 2117 goto again; 2118 } 2119 } 2120 2121 static struct ring_buffer_event * 2122 rb_reserve_next_event(struct ring_buffer *buffer, 2123 struct ring_buffer_per_cpu *cpu_buffer, 2124 unsigned long length) 2125 { 2126 struct ring_buffer_event *event; 2127 u64 ts, delta; 2128 int nr_loops = 0; 2129 int add_timestamp; 2130 u64 diff; 2131 2132 rb_start_commit(cpu_buffer); 2133 2134 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 2135 /* 2136 * Due to the ability to swap a cpu buffer from a buffer 2137 * it is possible it was swapped before we committed. 2138 * (committing stops a swap). We check for it here and 2139 * if it happened, we have to fail the write. 2140 */ 2141 barrier(); 2142 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) { 2143 local_dec(&cpu_buffer->committing); 2144 local_dec(&cpu_buffer->commits); 2145 return NULL; 2146 } 2147 #endif 2148 2149 length = rb_calculate_event_length(length); 2150 again: 2151 add_timestamp = 0; 2152 delta = 0; 2153 2154 /* 2155 * We allow for interrupts to reenter here and do a trace. 2156 * If one does, it will cause this original code to loop 2157 * back here. Even with heavy interrupts happening, this 2158 * should only happen a few times in a row. If this happens 2159 * 1000 times in a row, there must be either an interrupt 2160 * storm or we have something buggy. 2161 * Bail! 2162 */ 2163 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2164 goto out_fail; 2165 2166 ts = rb_time_stamp(cpu_buffer->buffer); 2167 diff = ts - cpu_buffer->write_stamp; 2168 2169 /* make sure this diff is calculated here */ 2170 barrier(); 2171 2172 /* Did the write stamp get updated already? */ 2173 if (likely(ts >= cpu_buffer->write_stamp)) { 2174 delta = diff; 2175 if (unlikely(test_time_stamp(delta))) { 2176 WARN_ONCE(delta > (1ULL << 59), 2177 KERN_WARNING "Delta way too big! 
%llu ts=%llu write stamp = %llu\n",
2178 (unsigned long long)delta,
2179 (unsigned long long)ts,
2180 (unsigned long long)cpu_buffer->write_stamp);
2181 add_timestamp = 1;
2182 }
2183 }
2184
2185 event = __rb_reserve_next(cpu_buffer, length, ts,
2186 delta, add_timestamp);
2187 if (unlikely(PTR_ERR(event) == -EAGAIN))
2188 goto again;
2189
2190 if (!event)
2191 goto out_fail;
2192
2193 return event;
2194
2195 out_fail:
2196 rb_end_commit(cpu_buffer);
2197 return NULL;
2198 }
2199
2200 #ifdef CONFIG_TRACING
2201
2202 #define TRACE_RECURSIVE_DEPTH 16
2203
2204 /* Keep this code out of the fast path cache */
2205 static noinline void trace_recursive_fail(void)
2206 {
2207 /* Disable all tracing before we do anything else */
2208 tracing_off_permanent();
2209
2210 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
2211 "HC[%lu]:SC[%lu]:NMI[%lu]\n",
2212 current->trace_recursion,
2213 hardirq_count() >> HARDIRQ_SHIFT,
2214 softirq_count() >> SOFTIRQ_SHIFT,
2215 in_nmi());
2216
2217 WARN_ON_ONCE(1);
2218 }
2219
2220 static inline int trace_recursive_lock(void)
2221 {
2222 current->trace_recursion++;
2223
2224 if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH))
2225 return 0;
2226
2227 trace_recursive_fail();
2228
2229 return -1;
2230 }
2231
2232 static inline void trace_recursive_unlock(void)
2233 {
2234 WARN_ON_ONCE(!current->trace_recursion);
2235
2236 current->trace_recursion--;
2237 }
2238
2239 #else
2240
2241 #define trace_recursive_lock() (0)
2242 #define trace_recursive_unlock() do { } while (0)
2243
2244 #endif
2245
2246 /**
2247 * ring_buffer_lock_reserve - reserve a part of the buffer
2248 * @buffer: the ring buffer to reserve from
2249 * @length: the length of the data to reserve (excluding event header)
2250 *
2251 * Returns a reserved event on the ring buffer to copy directly to.
2252 * The user of this interface will need to get the body to write into
2253 * and can use the ring_buffer_event_data() interface.
2254 *
2255 * The length is the length of the data needed, not the event length
2256 * which also includes the event header.
2257 *
2258 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
2259 * If NULL is returned, then nothing has been allocated or locked.
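 *
 * A minimal usage sketch ("struct my_entry" and its value field are
 * hypothetical caller-side names, not part of this API):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer, event);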
2260 */ 2261 struct ring_buffer_event * 2262 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2263 { 2264 struct ring_buffer_per_cpu *cpu_buffer; 2265 struct ring_buffer_event *event; 2266 int cpu; 2267 2268 if (ring_buffer_flags != RB_BUFFERS_ON) 2269 return NULL; 2270 2271 /* If we are tracing schedule, we don't want to recurse */ 2272 preempt_disable_notrace(); 2273 2274 if (atomic_read(&buffer->record_disabled)) 2275 goto out_nocheck; 2276 2277 if (trace_recursive_lock()) 2278 goto out_nocheck; 2279 2280 cpu = raw_smp_processor_id(); 2281 2282 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2283 goto out; 2284 2285 cpu_buffer = buffer->buffers[cpu]; 2286 2287 if (atomic_read(&cpu_buffer->record_disabled)) 2288 goto out; 2289 2290 if (length > BUF_MAX_DATA_SIZE) 2291 goto out; 2292 2293 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2294 if (!event) 2295 goto out; 2296 2297 return event; 2298 2299 out: 2300 trace_recursive_unlock(); 2301 2302 out_nocheck: 2303 preempt_enable_notrace(); 2304 return NULL; 2305 } 2306 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2307 2308 static void 2309 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2310 struct ring_buffer_event *event) 2311 { 2312 u64 delta; 2313 2314 /* 2315 * The event first in the commit queue updates the 2316 * time stamp. 2317 */ 2318 if (rb_event_is_commit(cpu_buffer, event)) { 2319 /* 2320 * A commit event that is first on a page 2321 * updates the write timestamp with the page stamp 2322 */ 2323 if (!rb_event_index(event)) 2324 cpu_buffer->write_stamp = 2325 cpu_buffer->commit_page->page->time_stamp; 2326 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { 2327 delta = event->array[0]; 2328 delta <<= TS_SHIFT; 2329 delta += event->time_delta; 2330 cpu_buffer->write_stamp += delta; 2331 } else 2332 cpu_buffer->write_stamp += event->time_delta; 2333 } 2334 } 2335 2336 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2337 struct ring_buffer_event *event) 2338 { 2339 local_inc(&cpu_buffer->entries); 2340 rb_update_write_stamp(cpu_buffer, event); 2341 rb_end_commit(cpu_buffer); 2342 } 2343 2344 /** 2345 * ring_buffer_unlock_commit - commit a reserved 2346 * @buffer: The buffer to commit to 2347 * @event: The event pointer to commit. 2348 * 2349 * This commits the data to the ring buffer, and releases any locks held. 2350 * 2351 * Must be paired with ring_buffer_lock_reserve. 2352 */ 2353 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2354 struct ring_buffer_event *event) 2355 { 2356 struct ring_buffer_per_cpu *cpu_buffer; 2357 int cpu = raw_smp_processor_id(); 2358 2359 cpu_buffer = buffer->buffers[cpu]; 2360 2361 rb_commit(cpu_buffer, event); 2362 2363 trace_recursive_unlock(); 2364 2365 preempt_enable_notrace(); 2366 2367 return 0; 2368 } 2369 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2370 2371 static inline void rb_event_discard(struct ring_buffer_event *event) 2372 { 2373 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 2374 event = skip_time_extend(event); 2375 2376 /* array[0] holds the actual length for the discarded event */ 2377 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2378 event->type_len = RINGBUF_TYPE_PADDING; 2379 /* time delta must be non zero */ 2380 if (!event->time_delta) 2381 event->time_delta = 1; 2382 } 2383 2384 /* 2385 * Decrement the entries to the page that an event is on. 2386 * The event does not even need to exist, only the pointer 2387 * to the page it is on. 
This may only be called before the commit
2388 * takes place.
2389 */
2390 static inline void
2391 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
2392 struct ring_buffer_event *event)
2393 {
2394 unsigned long addr = (unsigned long)event;
2395 struct buffer_page *bpage = cpu_buffer->commit_page;
2396 struct buffer_page *start;
2397
2398 addr &= PAGE_MASK;
2399
2400 /* Do the likely case first */
2401 if (likely(bpage->page == (void *)addr)) {
2402 local_dec(&bpage->entries);
2403 return;
2404 }
2405
2406 /*
2407 * Because the commit page may be on the reader page we
2408 * start with the next page and use it to terminate the loop.
2409 */
2410 rb_inc_page(cpu_buffer, &bpage);
2411 start = bpage;
2412 do {
2413 if (bpage->page == (void *)addr) {
2414 local_dec(&bpage->entries);
2415 return;
2416 }
2417 rb_inc_page(cpu_buffer, &bpage);
2418 } while (bpage != start);
2419
2420 /* commit not part of this buffer?? */
2421 RB_WARN_ON(cpu_buffer, 1);
2422 }
2423
2424 /**
2425 * ring_buffer_discard_commit - discard an event that has not been committed
2426 * @buffer: the ring buffer
2427 * @event: non-committed event to discard
2428 *
2429 * Sometimes an event that is in the ring buffer needs to be ignored.
2430 * This function lets the user discard an event in the ring buffer
2431 * and then that event will not be read later.
2432 *
2433 * This function only works if it is called before the item has been
2434 * committed. It will try to free the event from the ring buffer
2435 * if another event has not been added behind it.
2436 *
2437 * If another event has been added behind it, it will set the event
2438 * up as discarded, and perform the commit.
2439 *
2440 * If this function is called, do not call ring_buffer_unlock_commit on
2441 * the event.
2442 */
2443 void ring_buffer_discard_commit(struct ring_buffer *buffer,
2444 struct ring_buffer_event *event)
2445 {
2446 struct ring_buffer_per_cpu *cpu_buffer;
2447 int cpu;
2448
2449 /* The event is discarded regardless */
2450 rb_event_discard(event);
2451
2452 cpu = smp_processor_id();
2453 cpu_buffer = buffer->buffers[cpu];
2454
2455 /*
2456 * This must only be called if the event has not been
2457 * committed yet. Thus we can assume that preemption
2458 * is still disabled.
2459 */
2460 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
2461
2462 rb_decrement_entry(cpu_buffer, event);
2463 if (rb_try_to_discard(cpu_buffer, event))
2464 goto out;
2465
2466 /*
2467 * The commit is still visible to the reader, so we
2468 * must still update the timestamp.
2469 */
2470 rb_update_write_stamp(cpu_buffer, event);
2471 out:
2472 rb_end_commit(cpu_buffer);
2473
2474 trace_recursive_unlock();
2475
2476 preempt_enable_notrace();
2477
2478 }
2479 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
2480
2481 /**
2482 * ring_buffer_write - write data to the buffer without reserving
2483 * @buffer: The ring buffer to write to.
2484 * @length: The length of the data being written (excluding the event header)
2485 * @data: The data to write to the buffer.
2486 *
2487 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
2488 * one function. If you already have the data to write to the buffer, it
2489 * may be easier to simply call this function.
2490 *
2491 * Note, like ring_buffer_lock_reserve, the length is the length of the data
2492 * and not the length of the event which would hold the header.
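 *
 * For example (a sketch; "struct my_entry" is a hypothetical caller
 * structure, not something defined by this API):
 *
 *	struct my_entry entry = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		return error;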
2493 */ 2494 int ring_buffer_write(struct ring_buffer *buffer, 2495 unsigned long length, 2496 void *data) 2497 { 2498 struct ring_buffer_per_cpu *cpu_buffer; 2499 struct ring_buffer_event *event; 2500 void *body; 2501 int ret = -EBUSY; 2502 int cpu; 2503 2504 if (ring_buffer_flags != RB_BUFFERS_ON) 2505 return -EBUSY; 2506 2507 preempt_disable_notrace(); 2508 2509 if (atomic_read(&buffer->record_disabled)) 2510 goto out; 2511 2512 cpu = raw_smp_processor_id(); 2513 2514 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2515 goto out; 2516 2517 cpu_buffer = buffer->buffers[cpu]; 2518 2519 if (atomic_read(&cpu_buffer->record_disabled)) 2520 goto out; 2521 2522 if (length > BUF_MAX_DATA_SIZE) 2523 goto out; 2524 2525 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2526 if (!event) 2527 goto out; 2528 2529 body = rb_event_data(event); 2530 2531 memcpy(body, data, length); 2532 2533 rb_commit(cpu_buffer, event); 2534 2535 ret = 0; 2536 out: 2537 preempt_enable_notrace(); 2538 2539 return ret; 2540 } 2541 EXPORT_SYMBOL_GPL(ring_buffer_write); 2542 2543 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2544 { 2545 struct buffer_page *reader = cpu_buffer->reader_page; 2546 struct buffer_page *head = rb_set_head_page(cpu_buffer); 2547 struct buffer_page *commit = cpu_buffer->commit_page; 2548 2549 /* In case of error, head will be NULL */ 2550 if (unlikely(!head)) 2551 return 1; 2552 2553 return reader->read == rb_page_commit(reader) && 2554 (commit == reader || 2555 (commit == head && 2556 head->read == rb_page_commit(commit))); 2557 } 2558 2559 /** 2560 * ring_buffer_record_disable - stop all writes into the buffer 2561 * @buffer: The ring buffer to stop writes to. 2562 * 2563 * This prevents all writes to the buffer. Any attempt to write 2564 * to the buffer after this will fail and return NULL. 2565 * 2566 * The caller should call synchronize_sched() after this. 2567 */ 2568 void ring_buffer_record_disable(struct ring_buffer *buffer) 2569 { 2570 atomic_inc(&buffer->record_disabled); 2571 } 2572 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 2573 2574 /** 2575 * ring_buffer_record_enable - enable writes to the buffer 2576 * @buffer: The ring buffer to enable writes 2577 * 2578 * Note, multiple disables will need the same number of enables 2579 * to truly enable the writing (much like preempt_disable). 2580 */ 2581 void ring_buffer_record_enable(struct ring_buffer *buffer) 2582 { 2583 atomic_dec(&buffer->record_disabled); 2584 } 2585 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 2586 2587 /** 2588 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 2589 * @buffer: The ring buffer to stop writes to. 2590 * @cpu: The CPU buffer to stop 2591 * 2592 * This prevents all writes to the buffer. Any attempt to write 2593 * to the buffer after this will fail and return NULL. 2594 * 2595 * The caller should call synchronize_sched() after this. 2596 */ 2597 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 2598 { 2599 struct ring_buffer_per_cpu *cpu_buffer; 2600 2601 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2602 return; 2603 2604 cpu_buffer = buffer->buffers[cpu]; 2605 atomic_inc(&cpu_buffer->record_disabled); 2606 } 2607 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 2608 2609 /** 2610 * ring_buffer_record_enable_cpu - enable writes to the buffer 2611 * @buffer: The ring buffer to enable writes 2612 * @cpu: The CPU to enable. 
2613 * 2614 * Note, multiple disables will need the same number of enables 2615 * to truly enable the writing (much like preempt_disable). 2616 */ 2617 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2618 { 2619 struct ring_buffer_per_cpu *cpu_buffer; 2620 2621 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2622 return; 2623 2624 cpu_buffer = buffer->buffers[cpu]; 2625 atomic_dec(&cpu_buffer->record_disabled); 2626 } 2627 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2628 2629 /* 2630 * The total entries in the ring buffer is the running counter 2631 * of entries entered into the ring buffer, minus the sum of 2632 * the entries read from the ring buffer and the number of 2633 * entries that were overwritten. 2634 */ 2635 static inline unsigned long 2636 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 2637 { 2638 return local_read(&cpu_buffer->entries) - 2639 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 2640 } 2641 2642 /** 2643 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2644 * @buffer: The ring buffer 2645 * @cpu: The per CPU buffer to get the entries from. 2646 */ 2647 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2648 { 2649 struct ring_buffer_per_cpu *cpu_buffer; 2650 2651 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2652 return 0; 2653 2654 cpu_buffer = buffer->buffers[cpu]; 2655 2656 return rb_num_of_entries(cpu_buffer); 2657 } 2658 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2659 2660 /** 2661 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 2662 * @buffer: The ring buffer 2663 * @cpu: The per CPU buffer to get the number of overruns from 2664 */ 2665 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 2666 { 2667 struct ring_buffer_per_cpu *cpu_buffer; 2668 unsigned long ret; 2669 2670 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2671 return 0; 2672 2673 cpu_buffer = buffer->buffers[cpu]; 2674 ret = local_read(&cpu_buffer->overrun); 2675 2676 return ret; 2677 } 2678 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2679 2680 /** 2681 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2682 * @buffer: The ring buffer 2683 * @cpu: The per CPU buffer to get the number of overruns from 2684 */ 2685 unsigned long 2686 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2687 { 2688 struct ring_buffer_per_cpu *cpu_buffer; 2689 unsigned long ret; 2690 2691 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2692 return 0; 2693 2694 cpu_buffer = buffer->buffers[cpu]; 2695 ret = local_read(&cpu_buffer->commit_overrun); 2696 2697 return ret; 2698 } 2699 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2700 2701 /** 2702 * ring_buffer_entries - get the number of entries in a buffer 2703 * @buffer: The ring buffer 2704 * 2705 * Returns the total number of entries in the ring buffer 2706 * (all CPU entries) 2707 */ 2708 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2709 { 2710 struct ring_buffer_per_cpu *cpu_buffer; 2711 unsigned long entries = 0; 2712 int cpu; 2713 2714 /* if you care about this being correct, lock the buffer */ 2715 for_each_buffer_cpu(buffer, cpu) { 2716 cpu_buffer = buffer->buffers[cpu]; 2717 entries += rb_num_of_entries(cpu_buffer); 2718 } 2719 2720 return entries; 2721 } 2722 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2723 2724 /** 2725 * ring_buffer_overruns - get the number of overruns in buffer 2726 * @buffer: The ring buffer 2727 * 2728 * Returns the total number of overruns in 
the ring buffer 2729 * (all CPU entries) 2730 */ 2731 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 2732 { 2733 struct ring_buffer_per_cpu *cpu_buffer; 2734 unsigned long overruns = 0; 2735 int cpu; 2736 2737 /* if you care about this being correct, lock the buffer */ 2738 for_each_buffer_cpu(buffer, cpu) { 2739 cpu_buffer = buffer->buffers[cpu]; 2740 overruns += local_read(&cpu_buffer->overrun); 2741 } 2742 2743 return overruns; 2744 } 2745 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 2746 2747 static void rb_iter_reset(struct ring_buffer_iter *iter) 2748 { 2749 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2750 2751 /* Iterator usage is expected to have record disabled */ 2752 if (list_empty(&cpu_buffer->reader_page->list)) { 2753 iter->head_page = rb_set_head_page(cpu_buffer); 2754 if (unlikely(!iter->head_page)) 2755 return; 2756 iter->head = iter->head_page->read; 2757 } else { 2758 iter->head_page = cpu_buffer->reader_page; 2759 iter->head = cpu_buffer->reader_page->read; 2760 } 2761 if (iter->head) 2762 iter->read_stamp = cpu_buffer->read_stamp; 2763 else 2764 iter->read_stamp = iter->head_page->page->time_stamp; 2765 iter->cache_reader_page = cpu_buffer->reader_page; 2766 iter->cache_read = cpu_buffer->read; 2767 } 2768 2769 /** 2770 * ring_buffer_iter_reset - reset an iterator 2771 * @iter: The iterator to reset 2772 * 2773 * Resets the iterator, so that it will start from the beginning 2774 * again. 2775 */ 2776 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 2777 { 2778 struct ring_buffer_per_cpu *cpu_buffer; 2779 unsigned long flags; 2780 2781 if (!iter) 2782 return; 2783 2784 cpu_buffer = iter->cpu_buffer; 2785 2786 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2787 rb_iter_reset(iter); 2788 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2789 } 2790 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 2791 2792 /** 2793 * ring_buffer_iter_empty - check if an iterator has no more to read 2794 * @iter: The iterator to check 2795 */ 2796 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 2797 { 2798 struct ring_buffer_per_cpu *cpu_buffer; 2799 2800 cpu_buffer = iter->cpu_buffer; 2801 2802 return iter->head_page == cpu_buffer->commit_page && 2803 iter->head == rb_commit_index(cpu_buffer); 2804 } 2805 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 2806 2807 static void 2808 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2809 struct ring_buffer_event *event) 2810 { 2811 u64 delta; 2812 2813 switch (event->type_len) { 2814 case RINGBUF_TYPE_PADDING: 2815 return; 2816 2817 case RINGBUF_TYPE_TIME_EXTEND: 2818 delta = event->array[0]; 2819 delta <<= TS_SHIFT; 2820 delta += event->time_delta; 2821 cpu_buffer->read_stamp += delta; 2822 return; 2823 2824 case RINGBUF_TYPE_TIME_STAMP: 2825 /* FIXME: not implemented */ 2826 return; 2827 2828 case RINGBUF_TYPE_DATA: 2829 cpu_buffer->read_stamp += event->time_delta; 2830 return; 2831 2832 default: 2833 BUG(); 2834 } 2835 return; 2836 } 2837 2838 static void 2839 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 2840 struct ring_buffer_event *event) 2841 { 2842 u64 delta; 2843 2844 switch (event->type_len) { 2845 case RINGBUF_TYPE_PADDING: 2846 return; 2847 2848 case RINGBUF_TYPE_TIME_EXTEND: 2849 delta = event->array[0]; 2850 delta <<= TS_SHIFT; 2851 delta += event->time_delta; 2852 iter->read_stamp += delta; 2853 return; 2854 2855 case RINGBUF_TYPE_TIME_STAMP: 2856 /* FIXME: not implemented */ 2857 return; 2858 2859 case RINGBUF_TYPE_DATA: 2860 iter->read_stamp += 
event->time_delta; 2861 return; 2862 2863 default: 2864 BUG(); 2865 } 2866 return; 2867 } 2868 2869 static struct buffer_page * 2870 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2871 { 2872 struct buffer_page *reader = NULL; 2873 unsigned long overwrite; 2874 unsigned long flags; 2875 int nr_loops = 0; 2876 int ret; 2877 2878 local_irq_save(flags); 2879 arch_spin_lock(&cpu_buffer->lock); 2880 2881 again: 2882 /* 2883 * This should normally only loop twice. But because the 2884 * start of the reader inserts an empty page, it causes 2885 * a case where we will loop three times. There should be no 2886 * reason to loop four times (that I know of). 2887 */ 2888 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 2889 reader = NULL; 2890 goto out; 2891 } 2892 2893 reader = cpu_buffer->reader_page; 2894 2895 /* If there's more to read, return this page */ 2896 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 2897 goto out; 2898 2899 /* Never should we have an index greater than the size */ 2900 if (RB_WARN_ON(cpu_buffer, 2901 cpu_buffer->reader_page->read > rb_page_size(reader))) 2902 goto out; 2903 2904 /* check if we caught up to the tail */ 2905 reader = NULL; 2906 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 2907 goto out; 2908 2909 /* 2910 * Reset the reader page to size zero. 2911 */ 2912 local_set(&cpu_buffer->reader_page->write, 0); 2913 local_set(&cpu_buffer->reader_page->entries, 0); 2914 local_set(&cpu_buffer->reader_page->page->commit, 0); 2915 cpu_buffer->reader_page->real_end = 0; 2916 2917 spin: 2918 /* 2919 * Splice the empty reader page into the list around the head. 2920 */ 2921 reader = rb_set_head_page(cpu_buffer); 2922 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 2923 cpu_buffer->reader_page->list.prev = reader->list.prev; 2924 2925 /* 2926 * cpu_buffer->pages just needs to point to the buffer, it 2927 * has no specific buffer page to point to. Lets move it out 2928 * of our way so we don't accidently swap it. 2929 */ 2930 cpu_buffer->pages = reader->list.prev; 2931 2932 /* The reader page will be pointing to the new head */ 2933 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 2934 2935 /* 2936 * We want to make sure we read the overruns after we set up our 2937 * pointers to the next object. The writer side does a 2938 * cmpxchg to cross pages which acts as the mb on the writer 2939 * side. Note, the reader will constantly fail the swap 2940 * while the writer is updating the pointers, so this 2941 * guarantees that the overwrite recorded here is the one we 2942 * want to compare with the last_overrun. 2943 */ 2944 smp_mb(); 2945 overwrite = local_read(&(cpu_buffer->overrun)); 2946 2947 /* 2948 * Here's the tricky part. 2949 * 2950 * We need to move the pointer past the header page. 2951 * But we can only do that if a writer is not currently 2952 * moving it. The page before the header page has the 2953 * flag bit '1' set if it is pointing to the page we want. 2954 * but if the writer is in the process of moving it 2955 * than it will be '2' or already moved '0'. 2956 */ 2957 2958 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 2959 2960 /* 2961 * If we did not convert it, then we must try again. 2962 */ 2963 if (!ret) 2964 goto spin; 2965 2966 /* 2967 * Yeah! We succeeded in replacing the page. 2968 * 2969 * Now make the new head point back to the reader page. 
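 *
 * In other words: the old head page has been unlinked from the
 * writer's ring and becomes our new reader page, while
 * cpu_buffer->head_page is advanced to the page that followed it.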
2970 */ 2971 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 2972 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 2973 2974 /* Finally update the reader page to the new head */ 2975 cpu_buffer->reader_page = reader; 2976 rb_reset_reader_page(cpu_buffer); 2977 2978 if (overwrite != cpu_buffer->last_overrun) { 2979 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 2980 cpu_buffer->last_overrun = overwrite; 2981 } 2982 2983 goto again; 2984 2985 out: 2986 arch_spin_unlock(&cpu_buffer->lock); 2987 local_irq_restore(flags); 2988 2989 return reader; 2990 } 2991 2992 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 2993 { 2994 struct ring_buffer_event *event; 2995 struct buffer_page *reader; 2996 unsigned length; 2997 2998 reader = rb_get_reader_page(cpu_buffer); 2999 3000 /* This function should not be called when buffer is empty */ 3001 if (RB_WARN_ON(cpu_buffer, !reader)) 3002 return; 3003 3004 event = rb_reader_event(cpu_buffer); 3005 3006 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 3007 cpu_buffer->read++; 3008 3009 rb_update_read_stamp(cpu_buffer, event); 3010 3011 length = rb_event_length(event); 3012 cpu_buffer->reader_page->read += length; 3013 } 3014 3015 static void rb_advance_iter(struct ring_buffer_iter *iter) 3016 { 3017 struct ring_buffer_per_cpu *cpu_buffer; 3018 struct ring_buffer_event *event; 3019 unsigned length; 3020 3021 cpu_buffer = iter->cpu_buffer; 3022 3023 /* 3024 * Check if we are at the end of the buffer. 3025 */ 3026 if (iter->head >= rb_page_size(iter->head_page)) { 3027 /* discarded commits can make the page empty */ 3028 if (iter->head_page == cpu_buffer->commit_page) 3029 return; 3030 rb_inc_iter(iter); 3031 return; 3032 } 3033 3034 event = rb_iter_head_event(iter); 3035 3036 length = rb_event_length(event); 3037 3038 /* 3039 * This should not be called to advance the header if we are 3040 * at the tail of the buffer. 3041 */ 3042 if (RB_WARN_ON(cpu_buffer, 3043 (iter->head_page == cpu_buffer->commit_page) && 3044 (iter->head + length > rb_commit_index(cpu_buffer)))) 3045 return; 3046 3047 rb_update_iter_read_stamp(iter, event); 3048 3049 iter->head += length; 3050 3051 /* check for end of page padding */ 3052 if ((iter->head >= rb_page_size(iter->head_page)) && 3053 (iter->head_page != cpu_buffer->commit_page)) 3054 rb_advance_iter(iter); 3055 } 3056 3057 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 3058 { 3059 return cpu_buffer->lost_events; 3060 } 3061 3062 static struct ring_buffer_event * 3063 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 3064 unsigned long *lost_events) 3065 { 3066 struct ring_buffer_event *event; 3067 struct buffer_page *reader; 3068 int nr_loops = 0; 3069 3070 again: 3071 /* 3072 * We repeat when a time extend is encountered. 3073 * Since the time extend is always attached to a data event, 3074 * we should never loop more than once. 3075 * (We never hit the following condition more than twice). 
3076 */ 3077 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3078 return NULL; 3079 3080 reader = rb_get_reader_page(cpu_buffer); 3081 if (!reader) 3082 return NULL; 3083 3084 event = rb_reader_event(cpu_buffer); 3085 3086 switch (event->type_len) { 3087 case RINGBUF_TYPE_PADDING: 3088 if (rb_null_event(event)) 3089 RB_WARN_ON(cpu_buffer, 1); 3090 /* 3091 * Because the writer could be discarding every 3092 * event it creates (which would probably be bad) 3093 * if we were to go back to "again" then we may never 3094 * catch up, and will trigger the warn on, or lock 3095 * the box. Return the padding, and we will release 3096 * the current locks, and try again. 3097 */ 3098 return event; 3099 3100 case RINGBUF_TYPE_TIME_EXTEND: 3101 /* Internal data, OK to advance */ 3102 rb_advance_reader(cpu_buffer); 3103 goto again; 3104 3105 case RINGBUF_TYPE_TIME_STAMP: 3106 /* FIXME: not implemented */ 3107 rb_advance_reader(cpu_buffer); 3108 goto again; 3109 3110 case RINGBUF_TYPE_DATA: 3111 if (ts) { 3112 *ts = cpu_buffer->read_stamp + event->time_delta; 3113 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3114 cpu_buffer->cpu, ts); 3115 } 3116 if (lost_events) 3117 *lost_events = rb_lost_events(cpu_buffer); 3118 return event; 3119 3120 default: 3121 BUG(); 3122 } 3123 3124 return NULL; 3125 } 3126 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3127 3128 static struct ring_buffer_event * 3129 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3130 { 3131 struct ring_buffer *buffer; 3132 struct ring_buffer_per_cpu *cpu_buffer; 3133 struct ring_buffer_event *event; 3134 int nr_loops = 0; 3135 3136 cpu_buffer = iter->cpu_buffer; 3137 buffer = cpu_buffer->buffer; 3138 3139 /* 3140 * Check if someone performed a consuming read to 3141 * the buffer. A consuming read invalidates the iterator 3142 * and we need to reset the iterator in this case. 3143 */ 3144 if (unlikely(iter->cache_read != cpu_buffer->read || 3145 iter->cache_reader_page != cpu_buffer->reader_page)) 3146 rb_iter_reset(iter); 3147 3148 again: 3149 if (ring_buffer_iter_empty(iter)) 3150 return NULL; 3151 3152 /* 3153 * We repeat when a time extend is encountered. 3154 * Since the time extend is always attached to a data event, 3155 * we should never loop more than once. 3156 * (We never hit the following condition more than twice). 3157 */ 3158 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3159 return NULL; 3160 3161 if (rb_per_cpu_empty(cpu_buffer)) 3162 return NULL; 3163 3164 if (iter->head >= local_read(&iter->head_page->page->commit)) { 3165 rb_inc_iter(iter); 3166 goto again; 3167 } 3168 3169 event = rb_iter_head_event(iter); 3170 3171 switch (event->type_len) { 3172 case RINGBUF_TYPE_PADDING: 3173 if (rb_null_event(event)) { 3174 rb_inc_iter(iter); 3175 goto again; 3176 } 3177 rb_advance_iter(iter); 3178 return event; 3179 3180 case RINGBUF_TYPE_TIME_EXTEND: 3181 /* Internal data, OK to advance */ 3182 rb_advance_iter(iter); 3183 goto again; 3184 3185 case RINGBUF_TYPE_TIME_STAMP: 3186 /* FIXME: not implemented */ 3187 rb_advance_iter(iter); 3188 goto again; 3189 3190 case RINGBUF_TYPE_DATA: 3191 if (ts) { 3192 *ts = iter->read_stamp + event->time_delta; 3193 ring_buffer_normalize_time_stamp(buffer, 3194 cpu_buffer->cpu, ts); 3195 } 3196 return event; 3197 3198 default: 3199 BUG(); 3200 } 3201 3202 return NULL; 3203 } 3204 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3205 3206 static inline int rb_ok_to_lock(void) 3207 { 3208 /* 3209 * If an NMI die dumps out the content of the ring buffer 3210 * do not grab locks. 
We also permanently disable the ring 3211 * buffer too. A one time deal is all you get from reading 3212 * the ring buffer from an NMI. 3213 */ 3214 if (likely(!in_nmi())) 3215 return 1; 3216 3217 tracing_off_permanent(); 3218 return 0; 3219 } 3220 3221 /** 3222 * ring_buffer_peek - peek at the next event to be read 3223 * @buffer: The ring buffer to read 3224 * @cpu: The cpu to peak at 3225 * @ts: The timestamp counter of this event. 3226 * @lost_events: a variable to store if events were lost (may be NULL) 3227 * 3228 * This will return the event that will be read next, but does 3229 * not consume the data. 3230 */ 3231 struct ring_buffer_event * 3232 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts, 3233 unsigned long *lost_events) 3234 { 3235 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3236 struct ring_buffer_event *event; 3237 unsigned long flags; 3238 int dolock; 3239 3240 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3241 return NULL; 3242 3243 dolock = rb_ok_to_lock(); 3244 again: 3245 local_irq_save(flags); 3246 if (dolock) 3247 spin_lock(&cpu_buffer->reader_lock); 3248 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3249 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3250 rb_advance_reader(cpu_buffer); 3251 if (dolock) 3252 spin_unlock(&cpu_buffer->reader_lock); 3253 local_irq_restore(flags); 3254 3255 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3256 goto again; 3257 3258 return event; 3259 } 3260 3261 /** 3262 * ring_buffer_iter_peek - peek at the next event to be read 3263 * @iter: The ring buffer iterator 3264 * @ts: The timestamp counter of this event. 3265 * 3266 * This will return the event that will be read next, but does 3267 * not increment the iterator. 3268 */ 3269 struct ring_buffer_event * 3270 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3271 { 3272 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3273 struct ring_buffer_event *event; 3274 unsigned long flags; 3275 3276 again: 3277 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3278 event = rb_iter_peek(iter, ts); 3279 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3280 3281 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3282 goto again; 3283 3284 return event; 3285 } 3286 3287 /** 3288 * ring_buffer_consume - return an event and consume it 3289 * @buffer: The ring buffer to get the next event from 3290 * @cpu: the cpu to read the buffer from 3291 * @ts: a variable to store the timestamp (may be NULL) 3292 * @lost_events: a variable to store if events were lost (may be NULL) 3293 * 3294 * Returns the next event in the ring buffer, and that event is consumed. 3295 * Meaning, that sequential reads will keep returning a different event, 3296 * and eventually empty the ring buffer if the producer is slower. 
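 *
 * A typical consuming loop looks roughly like this (sketch only;
 * process() is a stand-in for whatever the caller does with an event):
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process(ring_buffer_event_data(event), ts, lost);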
3297 */
3298 struct ring_buffer_event *
3299 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
3300 unsigned long *lost_events)
3301 {
3302 struct ring_buffer_per_cpu *cpu_buffer;
3303 struct ring_buffer_event *event = NULL;
3304 unsigned long flags;
3305 int dolock;
3306
3307 dolock = rb_ok_to_lock();
3308
3309 again:
3310 /* might be called in atomic */
3311 preempt_disable();
3312
3313 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3314 goto out;
3315
3316 cpu_buffer = buffer->buffers[cpu];
3317 local_irq_save(flags);
3318 if (dolock)
3319 spin_lock(&cpu_buffer->reader_lock);
3320
3321 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3322 if (event) {
3323 cpu_buffer->lost_events = 0;
3324 rb_advance_reader(cpu_buffer);
3325 }
3326
3327 if (dolock)
3328 spin_unlock(&cpu_buffer->reader_lock);
3329 local_irq_restore(flags);
3330
3331 out:
3332 preempt_enable();
3333
3334 if (event && event->type_len == RINGBUF_TYPE_PADDING)
3335 goto again;
3336
3337 return event;
3338 }
3339 EXPORT_SYMBOL_GPL(ring_buffer_consume);
3340
3341 /**
3342 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
3343 * @buffer: The ring buffer to read from
3344 * @cpu: The cpu buffer to iterate over
3345 *
3346 * This performs the initial preparations necessary to iterate
3347 * through the buffer. Memory is allocated, buffer recording
3348 * is disabled, and the iterator pointer is returned to the caller.
3349 *
3350 * Disabling buffer recording prevents the reading from being
3351 * corrupted. This is not a consuming read, so a producer is not
3352 * expected.
3353 *
3354 * After a sequence of ring_buffer_read_prepare calls, the user is
3355 * expected to make at least one call to ring_buffer_read_prepare_sync.
3356 * Afterwards, ring_buffer_read_start is invoked to get things going
3357 * for real.
3358 *
3359 * This overall must be paired with ring_buffer_read_finish.
3360 */
3361 struct ring_buffer_iter *
3362 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
3363 {
3364 struct ring_buffer_per_cpu *cpu_buffer;
3365 struct ring_buffer_iter *iter;
3366
3367 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3368 return NULL;
3369
3370 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3371 if (!iter)
3372 return NULL;
3373
3374 cpu_buffer = buffer->buffers[cpu];
3375
3376 iter->cpu_buffer = cpu_buffer;
3377
3378 atomic_inc(&cpu_buffer->record_disabled);
3379
3380 return iter;
3381 }
3382 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
3383
3384 /**
3385 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
3386 *
3387 * All previously invoked ring_buffer_read_prepare calls to prepare
3388 * iterators will be synchronized. Afterwards, ring_buffer_read_start
3389 * calls on those iterators are allowed.
3390 */
3391 void
3392 ring_buffer_read_prepare_sync(void)
3393 {
3394 synchronize_sched();
3395 }
3396 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
3397
3398 /**
3399 * ring_buffer_read_start - start a non consuming read of the buffer
3400 * @iter: The iterator returned by ring_buffer_read_prepare
3401 *
3402 * This finalizes the startup of an iteration through the buffer.
3403 * The iterator comes from a call to ring_buffer_read_prepare and
3404 * an intervening ring_buffer_read_prepare_sync must have been
3405 * performed.
3406 *
3407 * Must be paired with ring_buffer_read_finish.
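 *
 * The whole non-consuming read sequence is therefore roughly
 * (a sketch; process() is a stand-in for the caller's handling):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(event);
 *	ring_buffer_read_finish(iter);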
3408 */ 3409 void 3410 ring_buffer_read_start(struct ring_buffer_iter *iter) 3411 { 3412 struct ring_buffer_per_cpu *cpu_buffer; 3413 unsigned long flags; 3414 3415 if (!iter) 3416 return; 3417 3418 cpu_buffer = iter->cpu_buffer; 3419 3420 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3421 arch_spin_lock(&cpu_buffer->lock); 3422 rb_iter_reset(iter); 3423 arch_spin_unlock(&cpu_buffer->lock); 3424 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3425 } 3426 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 3427 3428 /** 3429 * ring_buffer_finish - finish reading the iterator of the buffer 3430 * @iter: The iterator retrieved by ring_buffer_start 3431 * 3432 * This re-enables the recording to the buffer, and frees the 3433 * iterator. 3434 */ 3435 void 3436 ring_buffer_read_finish(struct ring_buffer_iter *iter) 3437 { 3438 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3439 3440 atomic_dec(&cpu_buffer->record_disabled); 3441 kfree(iter); 3442 } 3443 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 3444 3445 /** 3446 * ring_buffer_read - read the next item in the ring buffer by the iterator 3447 * @iter: The ring buffer iterator 3448 * @ts: The time stamp of the event read. 3449 * 3450 * This reads the next event in the ring buffer and increments the iterator. 3451 */ 3452 struct ring_buffer_event * 3453 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 3454 { 3455 struct ring_buffer_event *event; 3456 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3457 unsigned long flags; 3458 3459 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3460 again: 3461 event = rb_iter_peek(iter, ts); 3462 if (!event) 3463 goto out; 3464 3465 if (event->type_len == RINGBUF_TYPE_PADDING) 3466 goto again; 3467 3468 rb_advance_iter(iter); 3469 out: 3470 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3471 3472 return event; 3473 } 3474 EXPORT_SYMBOL_GPL(ring_buffer_read); 3475 3476 /** 3477 * ring_buffer_size - return the size of the ring buffer (in bytes) 3478 * @buffer: The ring buffer. 
3479 */
3480 unsigned long ring_buffer_size(struct ring_buffer *buffer)
3481 {
3482 return BUF_PAGE_SIZE * buffer->pages;
3483 }
3484 EXPORT_SYMBOL_GPL(ring_buffer_size);
3485
3486 static void
3487 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
3488 {
3489 rb_head_page_deactivate(cpu_buffer);
3490
3491 cpu_buffer->head_page
3492 = list_entry(cpu_buffer->pages, struct buffer_page, list);
3493 local_set(&cpu_buffer->head_page->write, 0);
3494 local_set(&cpu_buffer->head_page->entries, 0);
3495 local_set(&cpu_buffer->head_page->page->commit, 0);
3496
3497 cpu_buffer->head_page->read = 0;
3498
3499 cpu_buffer->tail_page = cpu_buffer->head_page;
3500 cpu_buffer->commit_page = cpu_buffer->head_page;
3501
3502 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
3503 local_set(&cpu_buffer->reader_page->write, 0);
3504 local_set(&cpu_buffer->reader_page->entries, 0);
3505 local_set(&cpu_buffer->reader_page->page->commit, 0);
3506 cpu_buffer->reader_page->read = 0;
3507
3508 local_set(&cpu_buffer->commit_overrun, 0);
3509 local_set(&cpu_buffer->overrun, 0);
3510 local_set(&cpu_buffer->entries, 0);
3511 local_set(&cpu_buffer->committing, 0);
3512 local_set(&cpu_buffer->commits, 0);
3513 cpu_buffer->read = 0;
3514
3515 cpu_buffer->write_stamp = 0;
3516 cpu_buffer->read_stamp = 0;
3517
3518 cpu_buffer->lost_events = 0;
3519 cpu_buffer->last_overrun = 0;
3520
3521 rb_head_page_activate(cpu_buffer);
3522 }
3523
3524 /**
3525 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
3526 * @buffer: The ring buffer to reset a per cpu buffer of
3527 * @cpu: The CPU buffer to be reset
3528 */
3529 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
3530 {
3531 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3532 unsigned long flags;
3533
3534 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3535 return;
3536
3537 atomic_inc(&cpu_buffer->record_disabled);
3538
3539 spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3540
3541 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
3542 goto out;
3543
3544 arch_spin_lock(&cpu_buffer->lock);
3545
3546 rb_reset_cpu(cpu_buffer);
3547
3548 arch_spin_unlock(&cpu_buffer->lock);
3549
3550 out:
3551 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3552
3553 atomic_dec(&cpu_buffer->record_disabled);
3554 }
3555 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
3556
3557 /**
3558 * ring_buffer_reset - reset a ring buffer
3559 * @buffer: The ring buffer to reset all cpu buffers
3560 */
3561 void ring_buffer_reset(struct ring_buffer *buffer)
3562 {
3563 int cpu;
3564
3565 for_each_buffer_cpu(buffer, cpu)
3566 ring_buffer_reset_cpu(buffer, cpu);
3567 }
3568 EXPORT_SYMBOL_GPL(ring_buffer_reset);
3569
3570 /**
3571 * ring_buffer_empty - is the ring buffer empty?
3572 * @buffer: The ring buffer to test 3573 */ 3574 int ring_buffer_empty(struct ring_buffer *buffer) 3575 { 3576 struct ring_buffer_per_cpu *cpu_buffer; 3577 unsigned long flags; 3578 int dolock; 3579 int cpu; 3580 int ret; 3581 3582 dolock = rb_ok_to_lock(); 3583 3584 /* yes this is racy, but if you don't like the race, lock the buffer */ 3585 for_each_buffer_cpu(buffer, cpu) { 3586 cpu_buffer = buffer->buffers[cpu]; 3587 local_irq_save(flags); 3588 if (dolock) 3589 spin_lock(&cpu_buffer->reader_lock); 3590 ret = rb_per_cpu_empty(cpu_buffer); 3591 if (dolock) 3592 spin_unlock(&cpu_buffer->reader_lock); 3593 local_irq_restore(flags); 3594 3595 if (!ret) 3596 return 0; 3597 } 3598 3599 return 1; 3600 } 3601 EXPORT_SYMBOL_GPL(ring_buffer_empty); 3602 3603 /** 3604 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 3605 * @buffer: The ring buffer 3606 * @cpu: The CPU buffer to test 3607 */ 3608 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 3609 { 3610 struct ring_buffer_per_cpu *cpu_buffer; 3611 unsigned long flags; 3612 int dolock; 3613 int ret; 3614 3615 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3616 return 1; 3617 3618 dolock = rb_ok_to_lock(); 3619 3620 cpu_buffer = buffer->buffers[cpu]; 3621 local_irq_save(flags); 3622 if (dolock) 3623 spin_lock(&cpu_buffer->reader_lock); 3624 ret = rb_per_cpu_empty(cpu_buffer); 3625 if (dolock) 3626 spin_unlock(&cpu_buffer->reader_lock); 3627 local_irq_restore(flags); 3628 3629 return ret; 3630 } 3631 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3632 3633 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3634 /** 3635 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3636 * @buffer_a: One buffer to swap with 3637 * @buffer_b: The other buffer to swap with 3638 * 3639 * This function is useful for tracers that want to take a "snapshot" 3640 * of a CPU buffer and has another back up buffer lying around. 3641 * it is expected that the tracer handles the cpu buffer not being 3642 * used at the moment. 3643 */ 3644 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 3645 struct ring_buffer *buffer_b, int cpu) 3646 { 3647 struct ring_buffer_per_cpu *cpu_buffer_a; 3648 struct ring_buffer_per_cpu *cpu_buffer_b; 3649 int ret = -EINVAL; 3650 3651 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 3652 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 3653 goto out; 3654 3655 /* At least make sure the two buffers are somewhat the same */ 3656 if (buffer_a->pages != buffer_b->pages) 3657 goto out; 3658 3659 ret = -EAGAIN; 3660 3661 if (ring_buffer_flags != RB_BUFFERS_ON) 3662 goto out; 3663 3664 if (atomic_read(&buffer_a->record_disabled)) 3665 goto out; 3666 3667 if (atomic_read(&buffer_b->record_disabled)) 3668 goto out; 3669 3670 cpu_buffer_a = buffer_a->buffers[cpu]; 3671 cpu_buffer_b = buffer_b->buffers[cpu]; 3672 3673 if (atomic_read(&cpu_buffer_a->record_disabled)) 3674 goto out; 3675 3676 if (atomic_read(&cpu_buffer_b->record_disabled)) 3677 goto out; 3678 3679 /* 3680 * We can't do a synchronize_sched here because this 3681 * function can be called in atomic context. 3682 * Normally this will be called from the same CPU as cpu. 3683 * If not it's up to the caller to protect this. 
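 *
 * Instead we disable recording on both per-CPU buffers and then
 * check that neither side is in the middle of a commit; if one is,
 * we back out with -EBUSY rather than swap the buffers out from
 * under an in-flight write.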
3684 */ 3685 atomic_inc(&cpu_buffer_a->record_disabled); 3686 atomic_inc(&cpu_buffer_b->record_disabled); 3687 3688 ret = -EBUSY; 3689 if (local_read(&cpu_buffer_a->committing)) 3690 goto out_dec; 3691 if (local_read(&cpu_buffer_b->committing)) 3692 goto out_dec; 3693 3694 buffer_a->buffers[cpu] = cpu_buffer_b; 3695 buffer_b->buffers[cpu] = cpu_buffer_a; 3696 3697 cpu_buffer_b->buffer = buffer_a; 3698 cpu_buffer_a->buffer = buffer_b; 3699 3700 ret = 0; 3701 3702 out_dec: 3703 atomic_dec(&cpu_buffer_a->record_disabled); 3704 atomic_dec(&cpu_buffer_b->record_disabled); 3705 out: 3706 return ret; 3707 } 3708 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 3709 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 3710 3711 /** 3712 * ring_buffer_alloc_read_page - allocate a page to read from buffer 3713 * @buffer: the buffer to allocate for. 3714 * 3715 * This function is used in conjunction with ring_buffer_read_page. 3716 * When reading a full page from the ring buffer, these functions 3717 * can be used to speed up the process. The calling function should 3718 * allocate a few pages first with this function. Then when it 3719 * needs to get pages from the ring buffer, it passes the result 3720 * of this function into ring_buffer_read_page, which will swap 3721 * the page that was allocated, with the read page of the buffer. 3722 * 3723 * Returns: 3724 * The page allocated, or NULL on error. 3725 */ 3726 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer) 3727 { 3728 struct buffer_data_page *bpage; 3729 unsigned long addr; 3730 3731 addr = __get_free_page(GFP_KERNEL); 3732 if (!addr) 3733 return NULL; 3734 3735 bpage = (void *)addr; 3736 3737 rb_init_page(bpage); 3738 3739 return bpage; 3740 } 3741 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 3742 3743 /** 3744 * ring_buffer_free_read_page - free an allocated read page 3745 * @buffer: the buffer the page was allocate for 3746 * @data: the page to free 3747 * 3748 * Free a page allocated from ring_buffer_alloc_read_page. 3749 */ 3750 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data) 3751 { 3752 free_page((unsigned long)data); 3753 } 3754 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 3755 3756 /** 3757 * ring_buffer_read_page - extract a page from the ring buffer 3758 * @buffer: buffer to extract from 3759 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 3760 * @len: amount to extract 3761 * @cpu: the cpu of the buffer to extract 3762 * @full: should the extraction only happen when the page is full. 3763 * 3764 * This function will pull out a page from the ring buffer and consume it. 3765 * @data_page must be the address of the variable that was returned 3766 * from ring_buffer_alloc_read_page. This is because the page might be used 3767 * to swap with a page in the ring buffer. 3768 * 3769 * for example: 3770 * rpage = ring_buffer_alloc_read_page(buffer); 3771 * if (!rpage) 3772 * return error; 3773 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 3774 * if (ret >= 0) 3775 * process_page(rpage, ret); 3776 * 3777 * When @full is set, the function will not return true unless 3778 * the writer is off the reader page. 3779 * 3780 * Note: it is up to the calling functions to handle sleeps and wakeups. 3781 * The ring buffer can be used anywhere in the kernel and can not 3782 * blindly call wake_up. The layer that uses the ring buffer must be 3783 * responsible for that. 3784 * 3785 * Returns: 3786 * >=0 if data has been transferred, returns the offset of consumed data. 
3787 * <0 if no data has been transferred. 3788 */ 3789 int ring_buffer_read_page(struct ring_buffer *buffer, 3790 void **data_page, size_t len, int cpu, int full) 3791 { 3792 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3793 struct ring_buffer_event *event; 3794 struct buffer_data_page *bpage; 3795 struct buffer_page *reader; 3796 unsigned long missed_events; 3797 unsigned long flags; 3798 unsigned int commit; 3799 unsigned int read; 3800 u64 save_timestamp; 3801 int ret = -1; 3802 3803 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3804 goto out; 3805 3806 /* 3807 * If len is not big enough to hold the page header, then 3808 * we can not copy anything. 3809 */ 3810 if (len <= BUF_PAGE_HDR_SIZE) 3811 goto out; 3812 3813 len -= BUF_PAGE_HDR_SIZE; 3814 3815 if (!data_page) 3816 goto out; 3817 3818 bpage = *data_page; 3819 if (!bpage) 3820 goto out; 3821 3822 spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3823 3824 reader = rb_get_reader_page(cpu_buffer); 3825 if (!reader) 3826 goto out_unlock; 3827 3828 event = rb_reader_event(cpu_buffer); 3829 3830 read = reader->read; 3831 commit = rb_page_commit(reader); 3832 3833 /* Check if any events were dropped */ 3834 missed_events = cpu_buffer->lost_events; 3835 3836 /* 3837 * If this page has been partially read or 3838 * if len is not big enough to read the rest of the page or 3839 * a writer is still on the page, then 3840 * we must copy the data from the page to the buffer. 3841 * Otherwise, we can simply swap the page with the one passed in. 3842 */ 3843 if (read || (len < (commit - read)) || 3844 cpu_buffer->reader_page == cpu_buffer->commit_page) { 3845 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 3846 unsigned int rpos = read; 3847 unsigned int pos = 0; 3848 unsigned int size; 3849 3850 if (full) 3851 goto out_unlock; 3852 3853 if (len > (commit - read)) 3854 len = (commit - read); 3855 3856 /* Always keep the time extend and data together */ 3857 size = rb_event_ts_length(event); 3858 3859 if (len < size) 3860 goto out_unlock; 3861 3862 /* save the current timestamp, since the user will need it */ 3863 save_timestamp = cpu_buffer->read_stamp; 3864 3865 /* Need to copy one event at a time */ 3866 do { 3867 /* We need the size of one event, because 3868 * rb_advance_reader only advances by one event, 3869 * whereas rb_event_ts_length may include the size of 3870 * one or two events. 3871 * We have already ensured there's enough space if this 3872 * is a time extend. 
*/ 3873 size = rb_event_length(event); 3874 memcpy(bpage->data + pos, rpage->data + rpos, size); 3875 3876 len -= size; 3877 3878 rb_advance_reader(cpu_buffer); 3879 rpos = reader->read; 3880 pos += size; 3881 3882 if (rpos >= commit) 3883 break; 3884 3885 event = rb_reader_event(cpu_buffer); 3886 /* Always keep the time extend and data together */ 3887 size = rb_event_ts_length(event); 3888 } while (len >= size); 3889 3890 /* update bpage */ 3891 local_set(&bpage->commit, pos); 3892 bpage->time_stamp = save_timestamp; 3893 3894 /* we copied everything to the beginning */ 3895 read = 0; 3896 } else { 3897 /* update the entry counter */ 3898 cpu_buffer->read += rb_page_entries(reader); 3899 3900 /* swap the pages */ 3901 rb_init_page(bpage); 3902 bpage = reader->page; 3903 reader->page = *data_page; 3904 local_set(&reader->write, 0); 3905 local_set(&reader->entries, 0); 3906 reader->read = 0; 3907 *data_page = bpage; 3908 3909 /* 3910 * Use the real_end for the data size, 3911 * This gives us a chance to store the lost events 3912 * on the page. 3913 */ 3914 if (reader->real_end) 3915 local_set(&bpage->commit, reader->real_end); 3916 } 3917 ret = read; 3918 3919 cpu_buffer->lost_events = 0; 3920 3921 commit = local_read(&bpage->commit); 3922 /* 3923 * Set a flag in the commit field if we lost events 3924 */ 3925 if (missed_events) { 3926 /* If there is room at the end of the page to save the 3927 * missed events, then record it there. 3928 */ 3929 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 3930 memcpy(&bpage->data[commit], &missed_events, 3931 sizeof(missed_events)); 3932 local_add(RB_MISSED_STORED, &bpage->commit); 3933 commit += sizeof(missed_events); 3934 } 3935 local_add(RB_MISSED_EVENTS, &bpage->commit); 3936 } 3937 3938 /* 3939 * This page may be off to user land. Zero it out here. 
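 * Otherwise the unused tail of the page could leak stale data from
 * an earlier use of this read page to user space.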
3940 */ 3941 if (commit < BUF_PAGE_SIZE) 3942 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 3943 3944 out_unlock: 3945 spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3946 3947 out: 3948 return ret; 3949 } 3950 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 3951 3952 #ifdef CONFIG_TRACING 3953 static ssize_t 3954 rb_simple_read(struct file *filp, char __user *ubuf, 3955 size_t cnt, loff_t *ppos) 3956 { 3957 unsigned long *p = filp->private_data; 3958 char buf[64]; 3959 int r; 3960 3961 if (test_bit(RB_BUFFERS_DISABLED_BIT, p)) 3962 r = sprintf(buf, "permanently disabled\n"); 3963 else 3964 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p)); 3965 3966 return simple_read_from_buffer(ubuf, cnt, ppos, buf, r); 3967 } 3968 3969 static ssize_t 3970 rb_simple_write(struct file *filp, const char __user *ubuf, 3971 size_t cnt, loff_t *ppos) 3972 { 3973 unsigned long *p = filp->private_data; 3974 char buf[64]; 3975 unsigned long val; 3976 int ret; 3977 3978 if (cnt >= sizeof(buf)) 3979 return -EINVAL; 3980 3981 if (copy_from_user(&buf, ubuf, cnt)) 3982 return -EFAULT; 3983 3984 buf[cnt] = 0; 3985 3986 ret = strict_strtoul(buf, 10, &val); 3987 if (ret < 0) 3988 return ret; 3989 3990 if (val) 3991 set_bit(RB_BUFFERS_ON_BIT, p); 3992 else 3993 clear_bit(RB_BUFFERS_ON_BIT, p); 3994 3995 (*ppos)++; 3996 3997 return cnt; 3998 } 3999 4000 static const struct file_operations rb_simple_fops = { 4001 .open = tracing_open_generic, 4002 .read = rb_simple_read, 4003 .write = rb_simple_write, 4004 .llseek = default_llseek, 4005 }; 4006 4007 4008 static __init int rb_init_debugfs(void) 4009 { 4010 struct dentry *d_tracer; 4011 4012 d_tracer = tracing_init_dentry(); 4013 4014 trace_create_file("tracing_on", 0644, d_tracer, 4015 &ring_buffer_flags, &rb_simple_fops); 4016 4017 return 0; 4018 } 4019 4020 fs_initcall(rb_init_debugfs); 4021 #endif 4022 4023 #ifdef CONFIG_HOTPLUG_CPU 4024 static int rb_cpu_notify(struct notifier_block *self, 4025 unsigned long action, void *hcpu) 4026 { 4027 struct ring_buffer *buffer = 4028 container_of(self, struct ring_buffer, cpu_notify); 4029 long cpu = (long)hcpu; 4030 4031 switch (action) { 4032 case CPU_UP_PREPARE: 4033 case CPU_UP_PREPARE_FROZEN: 4034 if (cpumask_test_cpu(cpu, buffer->cpumask)) 4035 return NOTIFY_OK; 4036 4037 buffer->buffers[cpu] = 4038 rb_allocate_cpu_buffer(buffer, cpu); 4039 if (!buffer->buffers[cpu]) { 4040 WARN(1, "failed to allocate ring buffer on CPU %ld\n", 4041 cpu); 4042 return NOTIFY_OK; 4043 } 4044 smp_wmb(); 4045 cpumask_set_cpu(cpu, buffer->cpumask); 4046 break; 4047 case CPU_DOWN_PREPARE: 4048 case CPU_DOWN_PREPARE_FROZEN: 4049 /* 4050 * Do nothing. 4051 * If we were to free the buffer, then the user would 4052 * lose any trace that was in the buffer. 4053 */ 4054 break; 4055 default: 4056 break; 4057 } 4058 return NOTIFY_OK; 4059 } 4060 #endif 4061
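/*
 * Usage note: the "tracing_on" file created by rb_init_debugfs()
 * above can be flipped from user space, for example (assuming
 * debugfs is mounted at /sys/kernel/debug):
 *
 *	echo 0 > /sys/kernel/debug/tracing/tracing_on
 *	echo 1 > /sys/kernel/debug/tracing/tracing_on
 *
 * Writing zero clears RB_BUFFERS_ON_BIT and writing a non-zero value
 * sets it, mirroring tracing_off() and tracing_on().
 */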