/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/fs.h>

#include <asm/local.h>
#include "trace.h"

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	int ret;

	ret = trace_seq_printf(s, "# compressed entry header\n");
	ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
	ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
	ret = trace_seq_printf(s, "\tarray : 32 bits\n");
	ret = trace_seq_printf(s, "\n");
	ret = trace_seq_printf(s, "\tpadding : type == %d\n",
			       RINGBUF_TYPE_PADDING);
	ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
			       RINGBUF_TYPE_TIME_EXTEND);
	ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
			       RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return ret;
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

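/*
 * As a rough illustration of the consuming side described above (a
 * sketch only, not part of this file): a reader that drains one CPU
 * can loop over ring_buffer_consume(), which performs the reader page
 * swap internally:
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *	unsigned long lost;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process(ring_buffer_event_data(event),
 *			ring_buffer_event_length(event), ts);
 *
 * where process() is a hypothetical consumer callback.
 */
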
/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *   ON    DISABLED
 *  ----  ----------
 *    0       0      : ring buffers are off
 *    1       0      : ring buffers are on
 *    X       1      : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show state of ring buffers enabled
 */
int tracing_is_on(void)
{
	return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);

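/*
 * A small worked example of the two bit scheme above (illustration
 * only, not part of the API):
 *
 *	tracing_off_permanent();  flags == RB_BUFFERS_DISABLED
 *	tracing_on();             flags == RB_BUFFERS_ON | RB_BUFFERS_DISABLED
 *	tracing_is_on();          returns 0, since the value is no longer
 *	                          exactly RB_BUFFERS_ON
 *
 * The write path compares the whole word against RB_BUFFERS_ON, so the
 * DISABLED bit really is permanent: setting the ON bit again is never
 * enough to re-enable recording.
 */
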
#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

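/*
 * Worked example of the length encoding handled above (illustration
 * only): a data event with a 12 byte payload stores type_len = 3
 * (3 * RB_ALIGNMENT == 12), so rb_event_data_length() returns
 * 12 + RB_EVNT_HDR_SIZE.  A payload larger than RB_MAX_SMALL_DATA
 * (112 bytes with RINGBUF_TYPE_DATA_TYPE_LEN_MAX == 28) cannot fit in
 * the 5 bit type_len, so type_len is 0 and the length lives in
 * array[0] instead.  A time extend is always RB_LEN_TIME_EXTEND (8)
 * bytes, and rb_event_ts_length() adds that to the length of the data
 * event that follows it.
 */
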
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
{
	if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
		event = skip_time_extend(event);
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[];	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

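/*
 * For illustration (a sketch, not part of the write path): with the
 * split above, local_add_return(RB_WRITE_INTCNT, &bpage->write) bumps
 * the updater count in the upper 12 bits while leaving the lower 20
 * bit write index readable:
 *
 *	unsigned long w = local_read(&bpage->write);
 *	unsigned long index    = w & RB_WRITE_MASK;	lower 20 bits
 *	unsigned long updaters = w >> 20;		upper 12 bits
 *
 * rb_tail_page_update() below relies on exactly this split.
 */
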
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
	return local_read(&((struct buffer_data_page *)page)->commit)
		+ BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

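/*
 * A worked example of the 27 bit delta limit tested above (numbers are
 * illustrative): with a nanosecond resolution clock, 2^27 ns is about
 * 134 ms.  Any two events on the same CPU that are further apart than
 * that fail test_time_stamp() and force a TIME_EXTEND event to be
 * placed in front of the data event (see rb_add_time_stamp() below).
 */
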
int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;
	int ret;

	ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			       "offset:0;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)sizeof(field.time_stamp),
			       (unsigned int)is_signed_type(u64));

	ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), commit),
			       (unsigned int)sizeof(field.commit),
			       (unsigned int)is_signed_type(long));

	ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), commit),
			       1,
			       (unsigned int)is_signed_type(long));

	ret = trace_seq_printf(s, "\tfield: char data;\t"
			       "offset:%u;\tsize:%u;\tsigned:%u;\n",
			       (unsigned int)offsetof(typeof(field), data),
			       (unsigned int)BUF_PAGE_SIZE,
			       (unsigned int)is_signed_type(char));

	return ret;
}

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	struct ring_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	local_t				entries_bytes;
	local_t				commit_overrun;
	local_t				overrun;
	local_t				entries;
	local_t				committing;
	local_t				commits;
	unsigned long			read;
	unsigned long			read_bytes;
	u64				write_stamp;
	u64				read_stamp;
};

struct ring_buffer {
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

#ifdef CONFIG_HOTPLUG_CPU
	struct notifier_block		cpu_notify;
#endif
	u64				(*clock)(void);
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page.
 * But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next	bit 1	  bit 0
 *				-------	  -------
 * Normal page			   0	     0
 * Points to head page		   0	     1
 * New head page		   1	     0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

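/*
 * For illustration of the pointer tagging described above (a sketch
 * only): given any ->next pointer in the page list, the real pointer
 * and the flag bits can be separated like this:
 *
 *	unsigned long val   = (unsigned long)list->next;
 *	struct list_head *p = (struct list_head *)(val & ~RB_FLAG_MASK);
 *	unsigned long flags = val & RB_FLAG_MASK;
 *
 * flags is then RB_PAGE_NORMAL, RB_PAGE_HEAD or RB_PAGE_UPDATE, which
 * is exactly what rb_list_head() and rb_is_head_page() above extract.
 */
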
/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 *
 * Returns 1 if moved tail page, 0 if someone else did.
 */
static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	struct buffer_page *old_tail;
	unsigned long old_entries;
	unsigned long old_write;
	int ret = 0;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == cpu_buffer->tail_page) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		old_tail = cmpxchg(&cpu_buffer->tail_page,
				   tail_page, next_page);

		if (old_tail == tail_page)
			ret = 1;
	}

	return ret;
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct buffer_page *bpage, *tmp;
	LIST_HEAD(pages);
	unsigned i;

	WARN_ON(!nr_pages);

	for (i = 0; i < nr_pages; i++) {
		struct page *page;
		/*
		 * __GFP_NORETRY flag makes sure that the allocation fails
		 * gracefully without invoking oom-killer and the system is
		 * not destabilized.
		 */
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     GFP_KERNEL | __GFP_NORETRY,
				     cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, &pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
					GFP_KERNEL | __GFP_NORETRY, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);
	}

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	rb_check_pages(cpu_buffer);

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	rb_head_page_deactivate(cpu_buffer);

	if (head) {
		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu);
#endif

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	/* need at least two pages */
	if (buffer->pages < 2)
		buffer->pages = 2;

	/*
	 * In case of non-hotplug cpu, if the ring-buffer is allocated
	 * in early initcall, it will not be notified of secondary cpus.
	 * In that case, we need to allocate for all possible cpus.
	 */
#ifdef CONFIG_HOTPLUG_CPU
	get_online_cpus();
	cpumask_copy(buffer->cpumask, cpu_online_mask);
#else
	cpumask_copy(buffer->cpumask, cpu_possible_mask);
#endif
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

#ifdef CONFIG_HOTPLUG_CPU
	buffer->cpu_notify.notifier_call = rb_cpu_notify;
	buffer->cpu_notify.priority = 0;
	register_cpu_notifier(&buffer->cpu_notify);
#endif

	put_online_cpus();
	mutex_init(&buffer->mutex);

	return buffer;

fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);
	put_online_cpus();

fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

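/*
 * Typical usage from a client's point of view (a sketch, not taken
 * from any particular caller): the ring_buffer_alloc() wrapper in
 * linux/ring_buffer.h supplies the lock_class_key for us.
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);
 *
 * The size is in bytes per CPU and is rounded up to full pages, with a
 * minimum of two pages, as done above.
 */
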
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	get_online_cpus();

#ifdef CONFIG_HOTPLUG_CPU
	unregister_cpu_notifier(&buffer->cpu_notify);
#endif

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	put_online_cpus();

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	rb_head_page_deactivate(cpu_buffer);

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
			goto out;
		p = cpu_buffer->pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
		goto out;

	rb_reset_cpu(cpu_buffer);
	rb_check_pages(cpu_buffer);

out:
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);
}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	rb_head_page_deactivate(cpu_buffer);

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
			goto out;
		p = pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		list_add_tail(&bpage->list, cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);
	rb_check_pages(cpu_buffer);

out:
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *bpage, *tmp;
	unsigned long buffer_size;
	LIST_HEAD(pages);
	int i, cpu;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	atomic_inc(&buffer->record_disabled);

	/* Make sure all writers are done with this buffer. */
	synchronize_sched();

	mutex_lock(&buffer->mutex);
	get_online_cpus();

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
			goto out_fail;

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM;
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
		goto out_fail;

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			struct page *page;
			/*
			 * __GFP_NORETRY flag makes sure that the allocation
			 * fails gracefully without invoking oom-killer and
			 * the system is not destabilized.
			 */
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						   cache_line_size()),
					     GFP_KERNEL | __GFP_NORETRY,
					     cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			page = alloc_pages_node(cpu_to_node(cpu),
						GFP_KERNEL | __GFP_NORETRY, 0);
			if (!page)
				goto free_pages;
			bpage->page = page_address(page);
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages)))
		goto out_fail;

out:
	buffer->pages = nr_pages;
	put_online_cpus();
	mutex_unlock(&buffer->mutex);

	atomic_dec(&buffer->record_disabled);

	return size;

free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -ENOMEM;

	/*
	 * Something went totally wrong, and we are too paranoid
	 * to even clean up the mess.
	 */
out_fail:
	put_online_cpus();
	mutex_unlock(&buffer->mutex);
	atomic_dec(&buffer->record_disabled);
	return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
{
	mutex_lock(&buffer->mutex);
	if (val)
		buffer->flags |= RB_FL_OVERWRITE;
	else
		buffer->flags &= ~RB_FL_OVERWRITE;
	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);

static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static inline int
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
		   struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	max_count = cpu_buffer->buffer->pages * 100;

	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/* Slow path, do not inline */
static noinline struct ring_buffer_event *
rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
{
	event->type_len = RINGBUF_TYPE_TIME_EXTEND;

	/* Not the first event on the page? */
	if (rb_event_index(event)) {
		event->time_delta = delta & TS_MASK;
		event->array[0] = delta >> TS_SHIFT;
	} else {
		/* nope, just zero it */
		event->time_delta = 0;
		event->array[0] = 0;
	}

	return skip_time_extend(event);
}

/**
 * ring_buffer_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static void
rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
		struct ring_buffer_event *event, unsigned length,
		int add_timestamp, u64 delta)
{
	/* Only a commit updates the timestamp */
	if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
		delta = 0;

	/*
	 * If we need to add a timestamp, then we
	 * add it to the start of the reserved space.
	 */
	if (unlikely(add_timestamp)) {
		event = rb_add_time_stamp(event, delta);
		length -= RB_LEN_TIME_EXTEND;
		delta = 0;
	}

	event->time_delta = delta;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
		event->type_len = 0;
		event->array[0] = length;
	} else
		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
}

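/*
 * For illustration of the layout produced above (sketch only): when
 * add_timestamp is set, the reserved space holds two events back to
 * back.  The first is the time extend, carrying the low TS_SHIFT bits
 * of the delta in time_delta and the upper bits in array[0]; the data
 * event that follows it then carries a delta of 0:
 *
 *	| time extend (8 bytes) | data event header | payload ... |
 *
 * rb_event_ts_length() above knows to account for both halves when
 * asked for the size of such a pair.
 */
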
/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);
		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (cpu_buffer->tail_page != tail_page &&
		    cpu_buffer->tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ARCH_ALIGNMENT);

	return length;
}

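/*
 * Two worked examples for the calculation above (assuming the common
 * case of RB_FORCE_8BYTE_ALIGNMENT == 0 and RB_ALIGNMENT == 4):
 *
 *	length 3:    3 + RB_EVNT_HDR_SIZE(4) = 7,  ALIGN(7, 4)   ->   8
 *	length 120:  120 > RB_MAX_SMALL_DATA(112), so add 4 for
 *		     array[0]: 124 + 4 = 128,      ALIGN(128, 4) -> 128
 *
 * The result is always at least RB_EVNT_MIN_SIZE and a multiple of the
 * architecture alignment.
 */
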
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      struct buffer_page *tail_page,
	      unsigned long tail, unsigned long length)
{
	struct ring_buffer_event *event;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == BUF_PAGE_SIZE)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);
	kmemcheck_annotate_bitfield(event, bitfield);

	/* account for padding bytes */
	local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}

/*
 * This is the slow path, force gcc not to inline it.
 */
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long length, unsigned long tail,
	     struct buffer_page *tail_page, u64 ts)
{
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(cpu_buffer, &next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the header page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE))
				goto out_reset;

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page. We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
1984 */ 1985 if (unlikely((cpu_buffer->commit_page != 1986 cpu_buffer->tail_page) && 1987 (cpu_buffer->commit_page == 1988 cpu_buffer->reader_page))) { 1989 local_inc(&cpu_buffer->commit_overrun); 1990 goto out_reset; 1991 } 1992 } 1993 } 1994 1995 ret = rb_tail_page_update(cpu_buffer, tail_page, next_page); 1996 if (ret) { 1997 /* 1998 * Nested commits always have zero deltas, so 1999 * just reread the time stamp 2000 */ 2001 ts = rb_time_stamp(buffer); 2002 next_page->page->time_stamp = ts; 2003 } 2004 2005 out_again: 2006 2007 rb_reset_tail(cpu_buffer, tail_page, tail, length); 2008 2009 /* fail and let the caller try again */ 2010 return ERR_PTR(-EAGAIN); 2011 2012 out_reset: 2013 /* reset write */ 2014 rb_reset_tail(cpu_buffer, tail_page, tail, length); 2015 2016 return NULL; 2017 } 2018 2019 static struct ring_buffer_event * 2020 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 2021 unsigned long length, u64 ts, 2022 u64 delta, int add_timestamp) 2023 { 2024 struct buffer_page *tail_page; 2025 struct ring_buffer_event *event; 2026 unsigned long tail, write; 2027 2028 /* 2029 * If the time delta since the last event is too big to 2030 * hold in the time field of the event, then we append a 2031 * TIME EXTEND event ahead of the data event. 2032 */ 2033 if (unlikely(add_timestamp)) 2034 length += RB_LEN_TIME_EXTEND; 2035 2036 tail_page = cpu_buffer->tail_page; 2037 write = local_add_return(length, &tail_page->write); 2038 2039 /* set write to only the index of the write */ 2040 write &= RB_WRITE_MASK; 2041 tail = write - length; 2042 2043 /* See if we shot pass the end of this buffer page */ 2044 if (unlikely(write > BUF_PAGE_SIZE)) 2045 return rb_move_tail(cpu_buffer, length, tail, 2046 tail_page, ts); 2047 2048 /* We reserved something on the buffer */ 2049 2050 event = __rb_page_index(tail_page, tail); 2051 kmemcheck_annotate_bitfield(event, bitfield); 2052 rb_update_event(cpu_buffer, event, length, add_timestamp, delta); 2053 2054 local_inc(&tail_page->entries); 2055 2056 /* 2057 * If this is the first commit on the page, then update 2058 * its timestamp. 2059 */ 2060 if (!tail) 2061 tail_page->page->time_stamp = ts; 2062 2063 /* account for these added bytes */ 2064 local_add(length, &cpu_buffer->entries_bytes); 2065 2066 return event; 2067 } 2068 2069 static inline int 2070 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2071 struct ring_buffer_event *event) 2072 { 2073 unsigned long new_index, old_index; 2074 struct buffer_page *bpage; 2075 unsigned long index; 2076 unsigned long addr; 2077 2078 new_index = rb_event_index(event); 2079 old_index = new_index + rb_event_ts_length(event); 2080 addr = (unsigned long)event; 2081 addr &= PAGE_MASK; 2082 2083 bpage = cpu_buffer->tail_page; 2084 2085 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2086 unsigned long write_mask = 2087 local_read(&bpage->write) & ~RB_WRITE_MASK; 2088 unsigned long event_length = rb_event_length(event); 2089 /* 2090 * This is on the tail page. It is possible that 2091 * a write could come in and move the tail page 2092 * and write to the next page. That is fine 2093 * because we just shorten what is on this page. 
2094 */ 2095 old_index += write_mask; 2096 new_index += write_mask; 2097 index = local_cmpxchg(&bpage->write, old_index, new_index); 2098 if (index == old_index) { 2099 /* update counters */ 2100 local_sub(event_length, &cpu_buffer->entries_bytes); 2101 return 1; 2102 } 2103 } 2104 2105 /* could not discard */ 2106 return 0; 2107 } 2108 2109 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2110 { 2111 local_inc(&cpu_buffer->committing); 2112 local_inc(&cpu_buffer->commits); 2113 } 2114 2115 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2116 { 2117 unsigned long commits; 2118 2119 if (RB_WARN_ON(cpu_buffer, 2120 !local_read(&cpu_buffer->committing))) 2121 return; 2122 2123 again: 2124 commits = local_read(&cpu_buffer->commits); 2125 /* synchronize with interrupts */ 2126 barrier(); 2127 if (local_read(&cpu_buffer->committing) == 1) 2128 rb_set_commit_to_write(cpu_buffer); 2129 2130 local_dec(&cpu_buffer->committing); 2131 2132 /* synchronize with interrupts */ 2133 barrier(); 2134 2135 /* 2136 * Need to account for interrupts coming in between the 2137 * updating of the commit page and the clearing of the 2138 * committing counter. 2139 */ 2140 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2141 !local_read(&cpu_buffer->committing)) { 2142 local_inc(&cpu_buffer->committing); 2143 goto again; 2144 } 2145 } 2146 2147 static struct ring_buffer_event * 2148 rb_reserve_next_event(struct ring_buffer *buffer, 2149 struct ring_buffer_per_cpu *cpu_buffer, 2150 unsigned long length) 2151 { 2152 struct ring_buffer_event *event; 2153 u64 ts, delta; 2154 int nr_loops = 0; 2155 int add_timestamp; 2156 u64 diff; 2157 2158 rb_start_commit(cpu_buffer); 2159 2160 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 2161 /* 2162 * Due to the ability to swap a cpu buffer from a buffer 2163 * it is possible it was swapped before we committed. 2164 * (committing stops a swap). We check for it here and 2165 * if it happened, we have to fail the write. 2166 */ 2167 barrier(); 2168 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) { 2169 local_dec(&cpu_buffer->committing); 2170 local_dec(&cpu_buffer->commits); 2171 return NULL; 2172 } 2173 #endif 2174 2175 length = rb_calculate_event_length(length); 2176 again: 2177 add_timestamp = 0; 2178 delta = 0; 2179 2180 /* 2181 * We allow for interrupts to reenter here and do a trace. 2182 * If one does, it will cause this original code to loop 2183 * back here. Even with heavy interrupts happening, this 2184 * should only happen a few times in a row. If this happens 2185 * 1000 times in a row, there must be either an interrupt 2186 * storm or we have something buggy. 2187 * Bail! 2188 */ 2189 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2190 goto out_fail; 2191 2192 ts = rb_time_stamp(cpu_buffer->buffer); 2193 diff = ts - cpu_buffer->write_stamp; 2194 2195 /* make sure this diff is calculated here */ 2196 barrier(); 2197 2198 /* Did the write stamp get updated already? */ 2199 if (likely(ts >= cpu_buffer->write_stamp)) { 2200 delta = diff; 2201 if (unlikely(test_time_stamp(delta))) { 2202 int local_clock_stable = 1; 2203 #ifdef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2204 local_clock_stable = sched_clock_stable; 2205 #endif 2206 WARN_ONCE(delta > (1ULL << 59), 2207 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s", 2208 (unsigned long long)delta, 2209 (unsigned long long)ts, 2210 (unsigned long long)cpu_buffer->write_stamp, 2211 local_clock_stable ? 
"" : 2212 "If you just came from a suspend/resume,\n" 2213 "please switch to the trace global clock:\n" 2214 " echo global > /sys/kernel/debug/tracing/trace_clock\n"); 2215 add_timestamp = 1; 2216 } 2217 } 2218 2219 event = __rb_reserve_next(cpu_buffer, length, ts, 2220 delta, add_timestamp); 2221 if (unlikely(PTR_ERR(event) == -EAGAIN)) 2222 goto again; 2223 2224 if (!event) 2225 goto out_fail; 2226 2227 return event; 2228 2229 out_fail: 2230 rb_end_commit(cpu_buffer); 2231 return NULL; 2232 } 2233 2234 #ifdef CONFIG_TRACING 2235 2236 #define TRACE_RECURSIVE_DEPTH 16 2237 2238 /* Keep this code out of the fast path cache */ 2239 static noinline void trace_recursive_fail(void) 2240 { 2241 /* Disable all tracing before we do anything else */ 2242 tracing_off_permanent(); 2243 2244 printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:" 2245 "HC[%lu]:SC[%lu]:NMI[%lu]\n", 2246 trace_recursion_buffer(), 2247 hardirq_count() >> HARDIRQ_SHIFT, 2248 softirq_count() >> SOFTIRQ_SHIFT, 2249 in_nmi()); 2250 2251 WARN_ON_ONCE(1); 2252 } 2253 2254 static inline int trace_recursive_lock(void) 2255 { 2256 trace_recursion_inc(); 2257 2258 if (likely(trace_recursion_buffer() < TRACE_RECURSIVE_DEPTH)) 2259 return 0; 2260 2261 trace_recursive_fail(); 2262 2263 return -1; 2264 } 2265 2266 static inline void trace_recursive_unlock(void) 2267 { 2268 WARN_ON_ONCE(!trace_recursion_buffer()); 2269 2270 trace_recursion_dec(); 2271 } 2272 2273 #else 2274 2275 #define trace_recursive_lock() (0) 2276 #define trace_recursive_unlock() do { } while (0) 2277 2278 #endif 2279 2280 /** 2281 * ring_buffer_lock_reserve - reserve a part of the buffer 2282 * @buffer: the ring buffer to reserve from 2283 * @length: the length of the data to reserve (excluding event header) 2284 * 2285 * Returns a reseverd event on the ring buffer to copy directly to. 2286 * The user of this interface will need to get the body to write into 2287 * and can use the ring_buffer_event_data() interface. 2288 * 2289 * The length is the length of the data needed, not the event length 2290 * which also includes the event header. 2291 * 2292 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2293 * If NULL is returned, then nothing has been allocated or locked. 
2294 */ 2295 struct ring_buffer_event * 2296 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2297 { 2298 struct ring_buffer_per_cpu *cpu_buffer; 2299 struct ring_buffer_event *event; 2300 int cpu; 2301 2302 if (ring_buffer_flags != RB_BUFFERS_ON) 2303 return NULL; 2304 2305 /* If we are tracing schedule, we don't want to recurse */ 2306 preempt_disable_notrace(); 2307 2308 if (atomic_read(&buffer->record_disabled)) 2309 goto out_nocheck; 2310 2311 if (trace_recursive_lock()) 2312 goto out_nocheck; 2313 2314 cpu = raw_smp_processor_id(); 2315 2316 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2317 goto out; 2318 2319 cpu_buffer = buffer->buffers[cpu]; 2320 2321 if (atomic_read(&cpu_buffer->record_disabled)) 2322 goto out; 2323 2324 if (length > BUF_MAX_DATA_SIZE) 2325 goto out; 2326 2327 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2328 if (!event) 2329 goto out; 2330 2331 return event; 2332 2333 out: 2334 trace_recursive_unlock(); 2335 2336 out_nocheck: 2337 preempt_enable_notrace(); 2338 return NULL; 2339 } 2340 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2341 2342 static void 2343 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2344 struct ring_buffer_event *event) 2345 { 2346 u64 delta; 2347 2348 /* 2349 * The event first in the commit queue updates the 2350 * time stamp. 2351 */ 2352 if (rb_event_is_commit(cpu_buffer, event)) { 2353 /* 2354 * A commit event that is first on a page 2355 * updates the write timestamp with the page stamp 2356 */ 2357 if (!rb_event_index(event)) 2358 cpu_buffer->write_stamp = 2359 cpu_buffer->commit_page->page->time_stamp; 2360 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { 2361 delta = event->array[0]; 2362 delta <<= TS_SHIFT; 2363 delta += event->time_delta; 2364 cpu_buffer->write_stamp += delta; 2365 } else 2366 cpu_buffer->write_stamp += event->time_delta; 2367 } 2368 } 2369 2370 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2371 struct ring_buffer_event *event) 2372 { 2373 local_inc(&cpu_buffer->entries); 2374 rb_update_write_stamp(cpu_buffer, event); 2375 rb_end_commit(cpu_buffer); 2376 } 2377 2378 /** 2379 * ring_buffer_unlock_commit - commit a reserved 2380 * @buffer: The buffer to commit to 2381 * @event: The event pointer to commit. 2382 * 2383 * This commits the data to the ring buffer, and releases any locks held. 2384 * 2385 * Must be paired with ring_buffer_lock_reserve. 2386 */ 2387 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2388 struct ring_buffer_event *event) 2389 { 2390 struct ring_buffer_per_cpu *cpu_buffer; 2391 int cpu = raw_smp_processor_id(); 2392 2393 cpu_buffer = buffer->buffers[cpu]; 2394 2395 rb_commit(cpu_buffer, event); 2396 2397 trace_recursive_unlock(); 2398 2399 preempt_enable_notrace(); 2400 2401 return 0; 2402 } 2403 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2404 2405 static inline void rb_event_discard(struct ring_buffer_event *event) 2406 { 2407 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 2408 event = skip_time_extend(event); 2409 2410 /* array[0] holds the actual length for the discarded event */ 2411 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2412 event->type_len = RINGBUF_TYPE_PADDING; 2413 /* time delta must be non zero */ 2414 if (!event->time_delta) 2415 event->time_delta = 1; 2416 } 2417 2418 /* 2419 * Decrement the entries to the page that an event is on. 2420 * The event does not even need to exist, only the pointer 2421 * to the page it is on. 
This may only be called before the commit
2422  * takes place.
2423  */
2424 static inline void
2425 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
2426 		   struct ring_buffer_event *event)
2427 {
2428 	unsigned long addr = (unsigned long)event;
2429 	struct buffer_page *bpage = cpu_buffer->commit_page;
2430 	struct buffer_page *start;
2431 
2432 	addr &= PAGE_MASK;
2433 
2434 	/* Do the likely case first */
2435 	if (likely(bpage->page == (void *)addr)) {
2436 		local_dec(&bpage->entries);
2437 		return;
2438 	}
2439 
2440 	/*
2441 	 * Because the commit page may be on the reader page we
2442 	 * start with the next page and check the end of the loop there.
2443 	 */
2444 	rb_inc_page(cpu_buffer, &bpage);
2445 	start = bpage;
2446 	do {
2447 		if (bpage->page == (void *)addr) {
2448 			local_dec(&bpage->entries);
2449 			return;
2450 		}
2451 		rb_inc_page(cpu_buffer, &bpage);
2452 	} while (bpage != start);
2453 
2454 	/* commit not part of this buffer?? */
2455 	RB_WARN_ON(cpu_buffer, 1);
2456 }
2457 
2458 /**
2459  * ring_buffer_discard_commit - discard an event that has not been committed
2460  * @buffer: the ring buffer
2461  * @event: non committed event to discard
2462  *
2463  * Sometimes an event that is in the ring buffer needs to be ignored.
2464  * This function lets the user discard an event in the ring buffer
2465  * and then that event will not be read later.
2466  *
2467  * This function only works if it is called before the item has been
2468  * committed. It will try to free the event from the ring buffer
2469  * if another event has not been added behind it.
2470  *
2471  * If another event has been added behind it, it will set the event
2472  * up as discarded, and perform the commit.
2473  *
2474  * If this function is called, do not call ring_buffer_unlock_commit on
2475  * the event.
2476  */
2477 void ring_buffer_discard_commit(struct ring_buffer *buffer,
2478 				struct ring_buffer_event *event)
2479 {
2480 	struct ring_buffer_per_cpu *cpu_buffer;
2481 	int cpu;
2482 
2483 	/* The event is discarded regardless */
2484 	rb_event_discard(event);
2485 
2486 	cpu = smp_processor_id();
2487 	cpu_buffer = buffer->buffers[cpu];
2488 
2489 	/*
2490 	 * This must only be called if the event has not been
2491 	 * committed yet. Thus we can assume that preemption
2492 	 * is still disabled.
2493 	 */
2494 	RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
2495 
2496 	rb_decrement_entry(cpu_buffer, event);
2497 	if (rb_try_to_discard(cpu_buffer, event))
2498 		goto out;
2499 
2500 	/*
2501 	 * The commit is still visible to the reader, so we
2502 	 * must still update the timestamp.
2503 	 */
2504 	rb_update_write_stamp(cpu_buffer, event);
2505  out:
2506 	rb_end_commit(cpu_buffer);
2507 
2508 	trace_recursive_unlock();
2509 
2510 	preempt_enable_notrace();
2511 
2512 }
2513 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
2514 
2515 /**
2516  * ring_buffer_write - write data to the buffer without reserving
2517  * @buffer: The ring buffer to write to.
2518  * @length: The length of the data being written (excluding the event header)
2519  * @data: The data to write to the buffer.
2520  *
2521  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
2522  * one function. If you already have the data to write to the buffer, it
2523  * may be easier to simply call this function.
2524  *
2525  * Note, like ring_buffer_lock_reserve, the length is the length of the data
2526  * and not the length of the event which would hold the header.
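 *
 * A minimal sketch (struct my_event is illustrative only):
 *
 *	struct my_event entry = { .val = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		return -EBUSY;
 *
 * Returns 0 on success, or -EBUSY if recording is disabled or the
 * event could not be reserved.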
2527 */ 2528 int ring_buffer_write(struct ring_buffer *buffer, 2529 unsigned long length, 2530 void *data) 2531 { 2532 struct ring_buffer_per_cpu *cpu_buffer; 2533 struct ring_buffer_event *event; 2534 void *body; 2535 int ret = -EBUSY; 2536 int cpu; 2537 2538 if (ring_buffer_flags != RB_BUFFERS_ON) 2539 return -EBUSY; 2540 2541 preempt_disable_notrace(); 2542 2543 if (atomic_read(&buffer->record_disabled)) 2544 goto out; 2545 2546 cpu = raw_smp_processor_id(); 2547 2548 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2549 goto out; 2550 2551 cpu_buffer = buffer->buffers[cpu]; 2552 2553 if (atomic_read(&cpu_buffer->record_disabled)) 2554 goto out; 2555 2556 if (length > BUF_MAX_DATA_SIZE) 2557 goto out; 2558 2559 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2560 if (!event) 2561 goto out; 2562 2563 body = rb_event_data(event); 2564 2565 memcpy(body, data, length); 2566 2567 rb_commit(cpu_buffer, event); 2568 2569 ret = 0; 2570 out: 2571 preempt_enable_notrace(); 2572 2573 return ret; 2574 } 2575 EXPORT_SYMBOL_GPL(ring_buffer_write); 2576 2577 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 2578 { 2579 struct buffer_page *reader = cpu_buffer->reader_page; 2580 struct buffer_page *head = rb_set_head_page(cpu_buffer); 2581 struct buffer_page *commit = cpu_buffer->commit_page; 2582 2583 /* In case of error, head will be NULL */ 2584 if (unlikely(!head)) 2585 return 1; 2586 2587 return reader->read == rb_page_commit(reader) && 2588 (commit == reader || 2589 (commit == head && 2590 head->read == rb_page_commit(commit))); 2591 } 2592 2593 /** 2594 * ring_buffer_record_disable - stop all writes into the buffer 2595 * @buffer: The ring buffer to stop writes to. 2596 * 2597 * This prevents all writes to the buffer. Any attempt to write 2598 * to the buffer after this will fail and return NULL. 2599 * 2600 * The caller should call synchronize_sched() after this. 2601 */ 2602 void ring_buffer_record_disable(struct ring_buffer *buffer) 2603 { 2604 atomic_inc(&buffer->record_disabled); 2605 } 2606 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 2607 2608 /** 2609 * ring_buffer_record_enable - enable writes to the buffer 2610 * @buffer: The ring buffer to enable writes 2611 * 2612 * Note, multiple disables will need the same number of enables 2613 * to truly enable the writing (much like preempt_disable). 2614 */ 2615 void ring_buffer_record_enable(struct ring_buffer *buffer) 2616 { 2617 atomic_dec(&buffer->record_disabled); 2618 } 2619 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 2620 2621 /** 2622 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 2623 * @buffer: The ring buffer to stop writes to. 2624 * @cpu: The CPU buffer to stop 2625 * 2626 * This prevents all writes to the buffer. Any attempt to write 2627 * to the buffer after this will fail and return NULL. 2628 * 2629 * The caller should call synchronize_sched() after this. 2630 */ 2631 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 2632 { 2633 struct ring_buffer_per_cpu *cpu_buffer; 2634 2635 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2636 return; 2637 2638 cpu_buffer = buffer->buffers[cpu]; 2639 atomic_inc(&cpu_buffer->record_disabled); 2640 } 2641 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 2642 2643 /** 2644 * ring_buffer_record_enable_cpu - enable writes to the buffer 2645 * @buffer: The ring buffer to enable writes 2646 * @cpu: The CPU to enable. 
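 *
 * A rough sketch of pairing this with ring_buffer_record_disable_cpu,
 * following the advice above to synchronize after disabling:
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_sched();
 *	(read or reset the cpu buffer here)
 *	ring_buffer_record_enable_cpu(buffer, cpu);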
2647 * 2648 * Note, multiple disables will need the same number of enables 2649 * to truly enable the writing (much like preempt_disable). 2650 */ 2651 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 2652 { 2653 struct ring_buffer_per_cpu *cpu_buffer; 2654 2655 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2656 return; 2657 2658 cpu_buffer = buffer->buffers[cpu]; 2659 atomic_dec(&cpu_buffer->record_disabled); 2660 } 2661 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 2662 2663 /* 2664 * The total entries in the ring buffer is the running counter 2665 * of entries entered into the ring buffer, minus the sum of 2666 * the entries read from the ring buffer and the number of 2667 * entries that were overwritten. 2668 */ 2669 static inline unsigned long 2670 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 2671 { 2672 return local_read(&cpu_buffer->entries) - 2673 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 2674 } 2675 2676 /** 2677 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 2678 * @buffer: The ring buffer 2679 * @cpu: The per CPU buffer to read from. 2680 */ 2681 unsigned long ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu) 2682 { 2683 unsigned long flags; 2684 struct ring_buffer_per_cpu *cpu_buffer; 2685 struct buffer_page *bpage; 2686 unsigned long ret; 2687 2688 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2689 return 0; 2690 2691 cpu_buffer = buffer->buffers[cpu]; 2692 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2693 /* 2694 * if the tail is on reader_page, oldest time stamp is on the reader 2695 * page 2696 */ 2697 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2698 bpage = cpu_buffer->reader_page; 2699 else 2700 bpage = rb_set_head_page(cpu_buffer); 2701 ret = bpage->page->time_stamp; 2702 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2703 2704 return ret; 2705 } 2706 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 2707 2708 /** 2709 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 2710 * @buffer: The ring buffer 2711 * @cpu: The per CPU buffer to read from. 2712 */ 2713 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu) 2714 { 2715 struct ring_buffer_per_cpu *cpu_buffer; 2716 unsigned long ret; 2717 2718 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2719 return 0; 2720 2721 cpu_buffer = buffer->buffers[cpu]; 2722 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 2723 2724 return ret; 2725 } 2726 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 2727 2728 /** 2729 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 2730 * @buffer: The ring buffer 2731 * @cpu: The per CPU buffer to get the entries from. 
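 *
 * For example, a rough per cpu statistics dump could be built on top
 * of this and ring_buffer_overrun_cpu (pr_info is just a placeholder
 * for whatever reporting the caller actually does):
 *
 *	int cpu;
 *
 *	for_each_online_cpu(cpu)
 *		pr_info("cpu%d: %lu entries, %lu overruns\n", cpu,
 *			ring_buffer_entries_cpu(buffer, cpu),
 *			ring_buffer_overrun_cpu(buffer, cpu));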
2732 */ 2733 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 2734 { 2735 struct ring_buffer_per_cpu *cpu_buffer; 2736 2737 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2738 return 0; 2739 2740 cpu_buffer = buffer->buffers[cpu]; 2741 2742 return rb_num_of_entries(cpu_buffer); 2743 } 2744 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 2745 2746 /** 2747 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer 2748 * @buffer: The ring buffer 2749 * @cpu: The per CPU buffer to get the number of overruns from 2750 */ 2751 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 2752 { 2753 struct ring_buffer_per_cpu *cpu_buffer; 2754 unsigned long ret; 2755 2756 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2757 return 0; 2758 2759 cpu_buffer = buffer->buffers[cpu]; 2760 ret = local_read(&cpu_buffer->overrun); 2761 2762 return ret; 2763 } 2764 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 2765 2766 /** 2767 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits 2768 * @buffer: The ring buffer 2769 * @cpu: The per CPU buffer to get the number of overruns from 2770 */ 2771 unsigned long 2772 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 2773 { 2774 struct ring_buffer_per_cpu *cpu_buffer; 2775 unsigned long ret; 2776 2777 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2778 return 0; 2779 2780 cpu_buffer = buffer->buffers[cpu]; 2781 ret = local_read(&cpu_buffer->commit_overrun); 2782 2783 return ret; 2784 } 2785 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 2786 2787 /** 2788 * ring_buffer_entries - get the number of entries in a buffer 2789 * @buffer: The ring buffer 2790 * 2791 * Returns the total number of entries in the ring buffer 2792 * (all CPU entries) 2793 */ 2794 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 2795 { 2796 struct ring_buffer_per_cpu *cpu_buffer; 2797 unsigned long entries = 0; 2798 int cpu; 2799 2800 /* if you care about this being correct, lock the buffer */ 2801 for_each_buffer_cpu(buffer, cpu) { 2802 cpu_buffer = buffer->buffers[cpu]; 2803 entries += rb_num_of_entries(cpu_buffer); 2804 } 2805 2806 return entries; 2807 } 2808 EXPORT_SYMBOL_GPL(ring_buffer_entries); 2809 2810 /** 2811 * ring_buffer_overruns - get the number of overruns in buffer 2812 * @buffer: The ring buffer 2813 * 2814 * Returns the total number of overruns in the ring buffer 2815 * (all CPU entries) 2816 */ 2817 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 2818 { 2819 struct ring_buffer_per_cpu *cpu_buffer; 2820 unsigned long overruns = 0; 2821 int cpu; 2822 2823 /* if you care about this being correct, lock the buffer */ 2824 for_each_buffer_cpu(buffer, cpu) { 2825 cpu_buffer = buffer->buffers[cpu]; 2826 overruns += local_read(&cpu_buffer->overrun); 2827 } 2828 2829 return overruns; 2830 } 2831 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 2832 2833 static void rb_iter_reset(struct ring_buffer_iter *iter) 2834 { 2835 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2836 2837 /* Iterator usage is expected to have record disabled */ 2838 if (list_empty(&cpu_buffer->reader_page->list)) { 2839 iter->head_page = rb_set_head_page(cpu_buffer); 2840 if (unlikely(!iter->head_page)) 2841 return; 2842 iter->head = iter->head_page->read; 2843 } else { 2844 iter->head_page = cpu_buffer->reader_page; 2845 iter->head = cpu_buffer->reader_page->read; 2846 } 2847 if (iter->head) 2848 iter->read_stamp = cpu_buffer->read_stamp; 2849 else 2850 iter->read_stamp = 
iter->head_page->page->time_stamp; 2851 iter->cache_reader_page = cpu_buffer->reader_page; 2852 iter->cache_read = cpu_buffer->read; 2853 } 2854 2855 /** 2856 * ring_buffer_iter_reset - reset an iterator 2857 * @iter: The iterator to reset 2858 * 2859 * Resets the iterator, so that it will start from the beginning 2860 * again. 2861 */ 2862 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 2863 { 2864 struct ring_buffer_per_cpu *cpu_buffer; 2865 unsigned long flags; 2866 2867 if (!iter) 2868 return; 2869 2870 cpu_buffer = iter->cpu_buffer; 2871 2872 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2873 rb_iter_reset(iter); 2874 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2875 } 2876 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 2877 2878 /** 2879 * ring_buffer_iter_empty - check if an iterator has no more to read 2880 * @iter: The iterator to check 2881 */ 2882 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 2883 { 2884 struct ring_buffer_per_cpu *cpu_buffer; 2885 2886 cpu_buffer = iter->cpu_buffer; 2887 2888 return iter->head_page == cpu_buffer->commit_page && 2889 iter->head == rb_commit_index(cpu_buffer); 2890 } 2891 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 2892 2893 static void 2894 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2895 struct ring_buffer_event *event) 2896 { 2897 u64 delta; 2898 2899 switch (event->type_len) { 2900 case RINGBUF_TYPE_PADDING: 2901 return; 2902 2903 case RINGBUF_TYPE_TIME_EXTEND: 2904 delta = event->array[0]; 2905 delta <<= TS_SHIFT; 2906 delta += event->time_delta; 2907 cpu_buffer->read_stamp += delta; 2908 return; 2909 2910 case RINGBUF_TYPE_TIME_STAMP: 2911 /* FIXME: not implemented */ 2912 return; 2913 2914 case RINGBUF_TYPE_DATA: 2915 cpu_buffer->read_stamp += event->time_delta; 2916 return; 2917 2918 default: 2919 BUG(); 2920 } 2921 return; 2922 } 2923 2924 static void 2925 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 2926 struct ring_buffer_event *event) 2927 { 2928 u64 delta; 2929 2930 switch (event->type_len) { 2931 case RINGBUF_TYPE_PADDING: 2932 return; 2933 2934 case RINGBUF_TYPE_TIME_EXTEND: 2935 delta = event->array[0]; 2936 delta <<= TS_SHIFT; 2937 delta += event->time_delta; 2938 iter->read_stamp += delta; 2939 return; 2940 2941 case RINGBUF_TYPE_TIME_STAMP: 2942 /* FIXME: not implemented */ 2943 return; 2944 2945 case RINGBUF_TYPE_DATA: 2946 iter->read_stamp += event->time_delta; 2947 return; 2948 2949 default: 2950 BUG(); 2951 } 2952 return; 2953 } 2954 2955 static struct buffer_page * 2956 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 2957 { 2958 struct buffer_page *reader = NULL; 2959 unsigned long overwrite; 2960 unsigned long flags; 2961 int nr_loops = 0; 2962 int ret; 2963 2964 local_irq_save(flags); 2965 arch_spin_lock(&cpu_buffer->lock); 2966 2967 again: 2968 /* 2969 * This should normally only loop twice. But because the 2970 * start of the reader inserts an empty page, it causes 2971 * a case where we will loop three times. There should be no 2972 * reason to loop four times (that I know of). 
2973 */ 2974 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 2975 reader = NULL; 2976 goto out; 2977 } 2978 2979 reader = cpu_buffer->reader_page; 2980 2981 /* If there's more to read, return this page */ 2982 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 2983 goto out; 2984 2985 /* Never should we have an index greater than the size */ 2986 if (RB_WARN_ON(cpu_buffer, 2987 cpu_buffer->reader_page->read > rb_page_size(reader))) 2988 goto out; 2989 2990 /* check if we caught up to the tail */ 2991 reader = NULL; 2992 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 2993 goto out; 2994 2995 /* 2996 * Reset the reader page to size zero. 2997 */ 2998 local_set(&cpu_buffer->reader_page->write, 0); 2999 local_set(&cpu_buffer->reader_page->entries, 0); 3000 local_set(&cpu_buffer->reader_page->page->commit, 0); 3001 cpu_buffer->reader_page->real_end = 0; 3002 3003 spin: 3004 /* 3005 * Splice the empty reader page into the list around the head. 3006 */ 3007 reader = rb_set_head_page(cpu_buffer); 3008 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 3009 cpu_buffer->reader_page->list.prev = reader->list.prev; 3010 3011 /* 3012 * cpu_buffer->pages just needs to point to the buffer, it 3013 * has no specific buffer page to point to. Lets move it out 3014 * of our way so we don't accidentally swap it. 3015 */ 3016 cpu_buffer->pages = reader->list.prev; 3017 3018 /* The reader page will be pointing to the new head */ 3019 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 3020 3021 /* 3022 * We want to make sure we read the overruns after we set up our 3023 * pointers to the next object. The writer side does a 3024 * cmpxchg to cross pages which acts as the mb on the writer 3025 * side. Note, the reader will constantly fail the swap 3026 * while the writer is updating the pointers, so this 3027 * guarantees that the overwrite recorded here is the one we 3028 * want to compare with the last_overrun. 3029 */ 3030 smp_mb(); 3031 overwrite = local_read(&(cpu_buffer->overrun)); 3032 3033 /* 3034 * Here's the tricky part. 3035 * 3036 * We need to move the pointer past the header page. 3037 * But we can only do that if a writer is not currently 3038 * moving it. The page before the header page has the 3039 * flag bit '1' set if it is pointing to the page we want. 3040 * but if the writer is in the process of moving it 3041 * than it will be '2' or already moved '0'. 3042 */ 3043 3044 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 3045 3046 /* 3047 * If we did not convert it, then we must try again. 3048 */ 3049 if (!ret) 3050 goto spin; 3051 3052 /* 3053 * Yeah! We succeeded in replacing the page. 3054 * 3055 * Now make the new head point back to the reader page. 
3056 */ 3057 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 3058 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 3059 3060 /* Finally update the reader page to the new head */ 3061 cpu_buffer->reader_page = reader; 3062 rb_reset_reader_page(cpu_buffer); 3063 3064 if (overwrite != cpu_buffer->last_overrun) { 3065 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 3066 cpu_buffer->last_overrun = overwrite; 3067 } 3068 3069 goto again; 3070 3071 out: 3072 arch_spin_unlock(&cpu_buffer->lock); 3073 local_irq_restore(flags); 3074 3075 return reader; 3076 } 3077 3078 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 3079 { 3080 struct ring_buffer_event *event; 3081 struct buffer_page *reader; 3082 unsigned length; 3083 3084 reader = rb_get_reader_page(cpu_buffer); 3085 3086 /* This function should not be called when buffer is empty */ 3087 if (RB_WARN_ON(cpu_buffer, !reader)) 3088 return; 3089 3090 event = rb_reader_event(cpu_buffer); 3091 3092 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 3093 cpu_buffer->read++; 3094 3095 rb_update_read_stamp(cpu_buffer, event); 3096 3097 length = rb_event_length(event); 3098 cpu_buffer->reader_page->read += length; 3099 } 3100 3101 static void rb_advance_iter(struct ring_buffer_iter *iter) 3102 { 3103 struct ring_buffer_per_cpu *cpu_buffer; 3104 struct ring_buffer_event *event; 3105 unsigned length; 3106 3107 cpu_buffer = iter->cpu_buffer; 3108 3109 /* 3110 * Check if we are at the end of the buffer. 3111 */ 3112 if (iter->head >= rb_page_size(iter->head_page)) { 3113 /* discarded commits can make the page empty */ 3114 if (iter->head_page == cpu_buffer->commit_page) 3115 return; 3116 rb_inc_iter(iter); 3117 return; 3118 } 3119 3120 event = rb_iter_head_event(iter); 3121 3122 length = rb_event_length(event); 3123 3124 /* 3125 * This should not be called to advance the header if we are 3126 * at the tail of the buffer. 3127 */ 3128 if (RB_WARN_ON(cpu_buffer, 3129 (iter->head_page == cpu_buffer->commit_page) && 3130 (iter->head + length > rb_commit_index(cpu_buffer)))) 3131 return; 3132 3133 rb_update_iter_read_stamp(iter, event); 3134 3135 iter->head += length; 3136 3137 /* check for end of page padding */ 3138 if ((iter->head >= rb_page_size(iter->head_page)) && 3139 (iter->head_page != cpu_buffer->commit_page)) 3140 rb_advance_iter(iter); 3141 } 3142 3143 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 3144 { 3145 return cpu_buffer->lost_events; 3146 } 3147 3148 static struct ring_buffer_event * 3149 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 3150 unsigned long *lost_events) 3151 { 3152 struct ring_buffer_event *event; 3153 struct buffer_page *reader; 3154 int nr_loops = 0; 3155 3156 again: 3157 /* 3158 * We repeat when a time extend is encountered. 3159 * Since the time extend is always attached to a data event, 3160 * we should never loop more than once. 3161 * (We never hit the following condition more than twice). 
3162 */ 3163 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3164 return NULL; 3165 3166 reader = rb_get_reader_page(cpu_buffer); 3167 if (!reader) 3168 return NULL; 3169 3170 event = rb_reader_event(cpu_buffer); 3171 3172 switch (event->type_len) { 3173 case RINGBUF_TYPE_PADDING: 3174 if (rb_null_event(event)) 3175 RB_WARN_ON(cpu_buffer, 1); 3176 /* 3177 * Because the writer could be discarding every 3178 * event it creates (which would probably be bad) 3179 * if we were to go back to "again" then we may never 3180 * catch up, and will trigger the warn on, or lock 3181 * the box. Return the padding, and we will release 3182 * the current locks, and try again. 3183 */ 3184 return event; 3185 3186 case RINGBUF_TYPE_TIME_EXTEND: 3187 /* Internal data, OK to advance */ 3188 rb_advance_reader(cpu_buffer); 3189 goto again; 3190 3191 case RINGBUF_TYPE_TIME_STAMP: 3192 /* FIXME: not implemented */ 3193 rb_advance_reader(cpu_buffer); 3194 goto again; 3195 3196 case RINGBUF_TYPE_DATA: 3197 if (ts) { 3198 *ts = cpu_buffer->read_stamp + event->time_delta; 3199 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3200 cpu_buffer->cpu, ts); 3201 } 3202 if (lost_events) 3203 *lost_events = rb_lost_events(cpu_buffer); 3204 return event; 3205 3206 default: 3207 BUG(); 3208 } 3209 3210 return NULL; 3211 } 3212 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3213 3214 static struct ring_buffer_event * 3215 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3216 { 3217 struct ring_buffer *buffer; 3218 struct ring_buffer_per_cpu *cpu_buffer; 3219 struct ring_buffer_event *event; 3220 int nr_loops = 0; 3221 3222 cpu_buffer = iter->cpu_buffer; 3223 buffer = cpu_buffer->buffer; 3224 3225 /* 3226 * Check if someone performed a consuming read to 3227 * the buffer. A consuming read invalidates the iterator 3228 * and we need to reset the iterator in this case. 3229 */ 3230 if (unlikely(iter->cache_read != cpu_buffer->read || 3231 iter->cache_reader_page != cpu_buffer->reader_page)) 3232 rb_iter_reset(iter); 3233 3234 again: 3235 if (ring_buffer_iter_empty(iter)) 3236 return NULL; 3237 3238 /* 3239 * We repeat when a time extend is encountered. 3240 * Since the time extend is always attached to a data event, 3241 * we should never loop more than once. 3242 * (We never hit the following condition more than twice). 3243 */ 3244 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3245 return NULL; 3246 3247 if (rb_per_cpu_empty(cpu_buffer)) 3248 return NULL; 3249 3250 if (iter->head >= local_read(&iter->head_page->page->commit)) { 3251 rb_inc_iter(iter); 3252 goto again; 3253 } 3254 3255 event = rb_iter_head_event(iter); 3256 3257 switch (event->type_len) { 3258 case RINGBUF_TYPE_PADDING: 3259 if (rb_null_event(event)) { 3260 rb_inc_iter(iter); 3261 goto again; 3262 } 3263 rb_advance_iter(iter); 3264 return event; 3265 3266 case RINGBUF_TYPE_TIME_EXTEND: 3267 /* Internal data, OK to advance */ 3268 rb_advance_iter(iter); 3269 goto again; 3270 3271 case RINGBUF_TYPE_TIME_STAMP: 3272 /* FIXME: not implemented */ 3273 rb_advance_iter(iter); 3274 goto again; 3275 3276 case RINGBUF_TYPE_DATA: 3277 if (ts) { 3278 *ts = iter->read_stamp + event->time_delta; 3279 ring_buffer_normalize_time_stamp(buffer, 3280 cpu_buffer->cpu, ts); 3281 } 3282 return event; 3283 3284 default: 3285 BUG(); 3286 } 3287 3288 return NULL; 3289 } 3290 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3291 3292 static inline int rb_ok_to_lock(void) 3293 { 3294 /* 3295 * If an NMI die dumps out the content of the ring buffer 3296 * do not grab locks. 
We also permanently disable the ring 3297 * buffer too. A one time deal is all you get from reading 3298 * the ring buffer from an NMI. 3299 */ 3300 if (likely(!in_nmi())) 3301 return 1; 3302 3303 tracing_off_permanent(); 3304 return 0; 3305 } 3306 3307 /** 3308 * ring_buffer_peek - peek at the next event to be read 3309 * @buffer: The ring buffer to read 3310 * @cpu: The cpu to peak at 3311 * @ts: The timestamp counter of this event. 3312 * @lost_events: a variable to store if events were lost (may be NULL) 3313 * 3314 * This will return the event that will be read next, but does 3315 * not consume the data. 3316 */ 3317 struct ring_buffer_event * 3318 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts, 3319 unsigned long *lost_events) 3320 { 3321 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3322 struct ring_buffer_event *event; 3323 unsigned long flags; 3324 int dolock; 3325 3326 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3327 return NULL; 3328 3329 dolock = rb_ok_to_lock(); 3330 again: 3331 local_irq_save(flags); 3332 if (dolock) 3333 raw_spin_lock(&cpu_buffer->reader_lock); 3334 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3335 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3336 rb_advance_reader(cpu_buffer); 3337 if (dolock) 3338 raw_spin_unlock(&cpu_buffer->reader_lock); 3339 local_irq_restore(flags); 3340 3341 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3342 goto again; 3343 3344 return event; 3345 } 3346 3347 /** 3348 * ring_buffer_iter_peek - peek at the next event to be read 3349 * @iter: The ring buffer iterator 3350 * @ts: The timestamp counter of this event. 3351 * 3352 * This will return the event that will be read next, but does 3353 * not increment the iterator. 3354 */ 3355 struct ring_buffer_event * 3356 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3357 { 3358 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3359 struct ring_buffer_event *event; 3360 unsigned long flags; 3361 3362 again: 3363 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3364 event = rb_iter_peek(iter, ts); 3365 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3366 3367 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3368 goto again; 3369 3370 return event; 3371 } 3372 3373 /** 3374 * ring_buffer_consume - return an event and consume it 3375 * @buffer: The ring buffer to get the next event from 3376 * @cpu: the cpu to read the buffer from 3377 * @ts: a variable to store the timestamp (may be NULL) 3378 * @lost_events: a variable to store if events were lost (may be NULL) 3379 * 3380 * Returns the next event in the ring buffer, and that event is consumed. 3381 * Meaning, that sequential reads will keep returning a different event, 3382 * and eventually empty the ring buffer if the producer is slower. 
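 *
 * A rough consumer loop might look like this (handle_event() is an
 * illustrative callback, not part of this API):
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, NULL)))
 *		handle_event(ring_buffer_event_data(event), ts);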
3383 */ 3384 struct ring_buffer_event * 3385 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts, 3386 unsigned long *lost_events) 3387 { 3388 struct ring_buffer_per_cpu *cpu_buffer; 3389 struct ring_buffer_event *event = NULL; 3390 unsigned long flags; 3391 int dolock; 3392 3393 dolock = rb_ok_to_lock(); 3394 3395 again: 3396 /* might be called in atomic */ 3397 preempt_disable(); 3398 3399 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3400 goto out; 3401 3402 cpu_buffer = buffer->buffers[cpu]; 3403 local_irq_save(flags); 3404 if (dolock) 3405 raw_spin_lock(&cpu_buffer->reader_lock); 3406 3407 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3408 if (event) { 3409 cpu_buffer->lost_events = 0; 3410 rb_advance_reader(cpu_buffer); 3411 } 3412 3413 if (dolock) 3414 raw_spin_unlock(&cpu_buffer->reader_lock); 3415 local_irq_restore(flags); 3416 3417 out: 3418 preempt_enable(); 3419 3420 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3421 goto again; 3422 3423 return event; 3424 } 3425 EXPORT_SYMBOL_GPL(ring_buffer_consume); 3426 3427 /** 3428 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 3429 * @buffer: The ring buffer to read from 3430 * @cpu: The cpu buffer to iterate over 3431 * 3432 * This performs the initial preparations necessary to iterate 3433 * through the buffer. Memory is allocated, buffer recording 3434 * is disabled, and the iterator pointer is returned to the caller. 3435 * 3436 * Disabling buffer recordng prevents the reading from being 3437 * corrupted. This is not a consuming read, so a producer is not 3438 * expected. 3439 * 3440 * After a sequence of ring_buffer_read_prepare calls, the user is 3441 * expected to make at least one call to ring_buffer_prepare_sync. 3442 * Afterwards, ring_buffer_read_start is invoked to get things going 3443 * for real. 3444 * 3445 * This overall must be paired with ring_buffer_finish. 3446 */ 3447 struct ring_buffer_iter * 3448 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu) 3449 { 3450 struct ring_buffer_per_cpu *cpu_buffer; 3451 struct ring_buffer_iter *iter; 3452 3453 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3454 return NULL; 3455 3456 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 3457 if (!iter) 3458 return NULL; 3459 3460 cpu_buffer = buffer->buffers[cpu]; 3461 3462 iter->cpu_buffer = cpu_buffer; 3463 3464 atomic_inc(&cpu_buffer->record_disabled); 3465 3466 return iter; 3467 } 3468 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 3469 3470 /** 3471 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 3472 * 3473 * All previously invoked ring_buffer_read_prepare calls to prepare 3474 * iterators will be synchronized. Afterwards, read_buffer_read_start 3475 * calls on those iterators are allowed. 3476 */ 3477 void 3478 ring_buffer_read_prepare_sync(void) 3479 { 3480 synchronize_sched(); 3481 } 3482 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 3483 3484 /** 3485 * ring_buffer_read_start - start a non consuming read of the buffer 3486 * @iter: The iterator returned by ring_buffer_read_prepare 3487 * 3488 * This finalizes the startup of an iteration through the buffer. 3489 * The iterator comes from a call to ring_buffer_read_prepare and 3490 * an intervening ring_buffer_read_prepare_sync must have been 3491 * performed. 3492 * 3493 * Must be paired with ring_buffer_finish. 
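 *
 * Putting the whole sequence together, a non consuming read of one
 * cpu buffer might look roughly like this (handle_event() is an
 * illustrative callback and error handling is trimmed):
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		handle_event(ring_buffer_event_data(event), ts);
 *	ring_buffer_read_finish(iter);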
3494  */
3495 void
3496 ring_buffer_read_start(struct ring_buffer_iter *iter)
3497 {
3498 	struct ring_buffer_per_cpu *cpu_buffer;
3499 	unsigned long flags;
3500 
3501 	if (!iter)
3502 		return;
3503 
3504 	cpu_buffer = iter->cpu_buffer;
3505 
3506 	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3507 	arch_spin_lock(&cpu_buffer->lock);
3508 	rb_iter_reset(iter);
3509 	arch_spin_unlock(&cpu_buffer->lock);
3510 	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3511 }
3512 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
3513 
3514 /**
3515  * ring_buffer_read_finish - finish reading the iterator of the buffer
3516  * @iter: The iterator retrieved by ring_buffer_read_prepare
3517  *
3518  * This re-enables recording to the buffer, and frees the
3519  * iterator.
3520  */
3521 void
3522 ring_buffer_read_finish(struct ring_buffer_iter *iter)
3523 {
3524 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3525 
3526 	atomic_dec(&cpu_buffer->record_disabled);
3527 	kfree(iter);
3528 }
3529 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
3530 
3531 /**
3532  * ring_buffer_read - read the next item in the ring buffer by the iterator
3533  * @iter: The ring buffer iterator
3534  * @ts: The time stamp of the event read.
3535  *
3536  * This reads the next event in the ring buffer and increments the iterator.
3537  */
3538 struct ring_buffer_event *
3539 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
3540 {
3541 	struct ring_buffer_event *event;
3542 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3543 	unsigned long flags;
3544 
3545 	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3546  again:
3547 	event = rb_iter_peek(iter, ts);
3548 	if (!event)
3549 		goto out;
3550 
3551 	if (event->type_len == RINGBUF_TYPE_PADDING)
3552 		goto again;
3553 
3554 	rb_advance_iter(iter);
3555  out:
3556 	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3557 
3558 	return event;
3559 }
3560 EXPORT_SYMBOL_GPL(ring_buffer_read);
3561 
3562 /**
3563  * ring_buffer_size - return the size of the ring buffer (in bytes)
3564  * @buffer: The ring buffer.
3565 */ 3566 unsigned long ring_buffer_size(struct ring_buffer *buffer) 3567 { 3568 return BUF_PAGE_SIZE * buffer->pages; 3569 } 3570 EXPORT_SYMBOL_GPL(ring_buffer_size); 3571 3572 static void 3573 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 3574 { 3575 rb_head_page_deactivate(cpu_buffer); 3576 3577 cpu_buffer->head_page 3578 = list_entry(cpu_buffer->pages, struct buffer_page, list); 3579 local_set(&cpu_buffer->head_page->write, 0); 3580 local_set(&cpu_buffer->head_page->entries, 0); 3581 local_set(&cpu_buffer->head_page->page->commit, 0); 3582 3583 cpu_buffer->head_page->read = 0; 3584 3585 cpu_buffer->tail_page = cpu_buffer->head_page; 3586 cpu_buffer->commit_page = cpu_buffer->head_page; 3587 3588 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 3589 local_set(&cpu_buffer->reader_page->write, 0); 3590 local_set(&cpu_buffer->reader_page->entries, 0); 3591 local_set(&cpu_buffer->reader_page->page->commit, 0); 3592 cpu_buffer->reader_page->read = 0; 3593 3594 local_set(&cpu_buffer->commit_overrun, 0); 3595 local_set(&cpu_buffer->entries_bytes, 0); 3596 local_set(&cpu_buffer->overrun, 0); 3597 local_set(&cpu_buffer->entries, 0); 3598 local_set(&cpu_buffer->committing, 0); 3599 local_set(&cpu_buffer->commits, 0); 3600 cpu_buffer->read = 0; 3601 cpu_buffer->read_bytes = 0; 3602 3603 cpu_buffer->write_stamp = 0; 3604 cpu_buffer->read_stamp = 0; 3605 3606 cpu_buffer->lost_events = 0; 3607 cpu_buffer->last_overrun = 0; 3608 3609 rb_head_page_activate(cpu_buffer); 3610 } 3611 3612 /** 3613 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 3614 * @buffer: The ring buffer to reset a per cpu buffer of 3615 * @cpu: The CPU buffer to be reset 3616 */ 3617 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 3618 { 3619 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3620 unsigned long flags; 3621 3622 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3623 return; 3624 3625 atomic_inc(&cpu_buffer->record_disabled); 3626 3627 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3628 3629 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 3630 goto out; 3631 3632 arch_spin_lock(&cpu_buffer->lock); 3633 3634 rb_reset_cpu(cpu_buffer); 3635 3636 arch_spin_unlock(&cpu_buffer->lock); 3637 3638 out: 3639 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3640 3641 atomic_dec(&cpu_buffer->record_disabled); 3642 } 3643 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 3644 3645 /** 3646 * ring_buffer_reset - reset a ring buffer 3647 * @buffer: The ring buffer to reset all cpu buffers 3648 */ 3649 void ring_buffer_reset(struct ring_buffer *buffer) 3650 { 3651 int cpu; 3652 3653 for_each_buffer_cpu(buffer, cpu) 3654 ring_buffer_reset_cpu(buffer, cpu); 3655 } 3656 EXPORT_SYMBOL_GPL(ring_buffer_reset); 3657 3658 /** 3659 * rind_buffer_empty - is the ring buffer empty? 
3660 * @buffer: The ring buffer to test 3661 */ 3662 int ring_buffer_empty(struct ring_buffer *buffer) 3663 { 3664 struct ring_buffer_per_cpu *cpu_buffer; 3665 unsigned long flags; 3666 int dolock; 3667 int cpu; 3668 int ret; 3669 3670 dolock = rb_ok_to_lock(); 3671 3672 /* yes this is racy, but if you don't like the race, lock the buffer */ 3673 for_each_buffer_cpu(buffer, cpu) { 3674 cpu_buffer = buffer->buffers[cpu]; 3675 local_irq_save(flags); 3676 if (dolock) 3677 raw_spin_lock(&cpu_buffer->reader_lock); 3678 ret = rb_per_cpu_empty(cpu_buffer); 3679 if (dolock) 3680 raw_spin_unlock(&cpu_buffer->reader_lock); 3681 local_irq_restore(flags); 3682 3683 if (!ret) 3684 return 0; 3685 } 3686 3687 return 1; 3688 } 3689 EXPORT_SYMBOL_GPL(ring_buffer_empty); 3690 3691 /** 3692 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 3693 * @buffer: The ring buffer 3694 * @cpu: The CPU buffer to test 3695 */ 3696 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 3697 { 3698 struct ring_buffer_per_cpu *cpu_buffer; 3699 unsigned long flags; 3700 int dolock; 3701 int ret; 3702 3703 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3704 return 1; 3705 3706 dolock = rb_ok_to_lock(); 3707 3708 cpu_buffer = buffer->buffers[cpu]; 3709 local_irq_save(flags); 3710 if (dolock) 3711 raw_spin_lock(&cpu_buffer->reader_lock); 3712 ret = rb_per_cpu_empty(cpu_buffer); 3713 if (dolock) 3714 raw_spin_unlock(&cpu_buffer->reader_lock); 3715 local_irq_restore(flags); 3716 3717 return ret; 3718 } 3719 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 3720 3721 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3722 /** 3723 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 3724 * @buffer_a: One buffer to swap with 3725 * @buffer_b: The other buffer to swap with 3726 * 3727 * This function is useful for tracers that want to take a "snapshot" 3728 * of a CPU buffer and has another back up buffer lying around. 3729 * it is expected that the tracer handles the cpu buffer not being 3730 * used at the moment. 3731 */ 3732 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 3733 struct ring_buffer *buffer_b, int cpu) 3734 { 3735 struct ring_buffer_per_cpu *cpu_buffer_a; 3736 struct ring_buffer_per_cpu *cpu_buffer_b; 3737 int ret = -EINVAL; 3738 3739 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 3740 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 3741 goto out; 3742 3743 /* At least make sure the two buffers are somewhat the same */ 3744 if (buffer_a->pages != buffer_b->pages) 3745 goto out; 3746 3747 ret = -EAGAIN; 3748 3749 if (ring_buffer_flags != RB_BUFFERS_ON) 3750 goto out; 3751 3752 if (atomic_read(&buffer_a->record_disabled)) 3753 goto out; 3754 3755 if (atomic_read(&buffer_b->record_disabled)) 3756 goto out; 3757 3758 cpu_buffer_a = buffer_a->buffers[cpu]; 3759 cpu_buffer_b = buffer_b->buffers[cpu]; 3760 3761 if (atomic_read(&cpu_buffer_a->record_disabled)) 3762 goto out; 3763 3764 if (atomic_read(&cpu_buffer_b->record_disabled)) 3765 goto out; 3766 3767 /* 3768 * We can't do a synchronize_sched here because this 3769 * function can be called in atomic context. 3770 * Normally this will be called from the same CPU as cpu. 3771 * If not it's up to the caller to protect this. 
3772  */
3773 	atomic_inc(&cpu_buffer_a->record_disabled);
3774 	atomic_inc(&cpu_buffer_b->record_disabled);
3775 
3776 	ret = -EBUSY;
3777 	if (local_read(&cpu_buffer_a->committing))
3778 		goto out_dec;
3779 	if (local_read(&cpu_buffer_b->committing))
3780 		goto out_dec;
3781 
3782 	buffer_a->buffers[cpu] = cpu_buffer_b;
3783 	buffer_b->buffers[cpu] = cpu_buffer_a;
3784 
3785 	cpu_buffer_b->buffer = buffer_a;
3786 	cpu_buffer_a->buffer = buffer_b;
3787 
3788 	ret = 0;
3789 
3790  out_dec:
3791 	atomic_dec(&cpu_buffer_a->record_disabled);
3792 	atomic_dec(&cpu_buffer_b->record_disabled);
3793  out:
3794 	return ret;
3795 }
3796 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
3797 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
3798 
3799 /**
3800  * ring_buffer_alloc_read_page - allocate a page to read from buffer
3801  * @buffer: the buffer to allocate for.
3802  *
3803  * This function is used in conjunction with ring_buffer_read_page.
3804  * When reading a full page from the ring buffer, these functions
3805  * can be used to speed up the process. The calling function should
3806  * allocate a few pages first with this function. Then when it
3807  * needs to get pages from the ring buffer, it passes the result
3808  * of this function into ring_buffer_read_page, which will swap
3809  * the page that was allocated, with the read page of the buffer.
3810  *
3811  * Returns:
3812  *  The page allocated, or NULL on error.
3813  */
3814 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
3815 {
3816 	struct buffer_data_page *bpage;
3817 	struct page *page;
3818 
3819 	page = alloc_pages_node(cpu_to_node(cpu),
3820 				GFP_KERNEL | __GFP_NORETRY, 0);
3821 	if (!page)
3822 		return NULL;
3823 
3824 	bpage = page_address(page);
3825 
3826 	rb_init_page(bpage);
3827 
3828 	return bpage;
3829 }
3830 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
3831 
3832 /**
3833  * ring_buffer_free_read_page - free an allocated read page
3834  * @buffer: the buffer the page was allocated for
3835  * @data: the page to free
3836  *
3837  * Free a page allocated from ring_buffer_alloc_read_page.
3838  */
3839 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
3840 {
3841 	free_page((unsigned long)data);
3842 }
3843 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
3844 
3845 /**
3846  * ring_buffer_read_page - extract a page from the ring buffer
3847  * @buffer: buffer to extract from
3848  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
3849  * @len: amount to extract
3850  * @cpu: the cpu of the buffer to extract
3851  * @full: should the extraction only happen when the page is full.
3852  *
3853  * This function will pull out a page from the ring buffer and consume it.
3854  * @data_page must be the address of the variable that was returned
3855  * from ring_buffer_alloc_read_page. This is because the page might be used
3856  * to swap with a page in the ring buffer.
3857  *
3858  * for example:
3859  *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
3860  *	if (!rpage)
3861  *		return error;
3862  *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
3863  *	if (ret >= 0)
3864  *		process_page(rpage, ret);
3865  *
3866  * When @full is set, the function will not succeed unless
3867  * the writer is off the reader page.
3868  *
3869  * Note: it is up to the calling functions to handle sleeps and wakeups.
3870  * The ring buffer can be used anywhere in the kernel and can not
3871  * blindly call wake_up. The layer that uses the ring buffer must be
3872  * responsible for that.
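 *
 * If events were lost while this page was being filled, the commit
 * field of the returned page has the RB_MISSED_EVENTS flag set and,
 * when there was room at the end of the page, the count of missed
 * events is stored after the data and RB_MISSED_STORED is set as well.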
3873 * 3874 * Returns: 3875 * >=0 if data has been transferred, returns the offset of consumed data. 3876 * <0 if no data has been transferred. 3877 */ 3878 int ring_buffer_read_page(struct ring_buffer *buffer, 3879 void **data_page, size_t len, int cpu, int full) 3880 { 3881 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3882 struct ring_buffer_event *event; 3883 struct buffer_data_page *bpage; 3884 struct buffer_page *reader; 3885 unsigned long missed_events; 3886 unsigned long flags; 3887 unsigned int commit; 3888 unsigned int read; 3889 u64 save_timestamp; 3890 int ret = -1; 3891 3892 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3893 goto out; 3894 3895 /* 3896 * If len is not big enough to hold the page header, then 3897 * we can not copy anything. 3898 */ 3899 if (len <= BUF_PAGE_HDR_SIZE) 3900 goto out; 3901 3902 len -= BUF_PAGE_HDR_SIZE; 3903 3904 if (!data_page) 3905 goto out; 3906 3907 bpage = *data_page; 3908 if (!bpage) 3909 goto out; 3910 3911 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3912 3913 reader = rb_get_reader_page(cpu_buffer); 3914 if (!reader) 3915 goto out_unlock; 3916 3917 event = rb_reader_event(cpu_buffer); 3918 3919 read = reader->read; 3920 commit = rb_page_commit(reader); 3921 3922 /* Check if any events were dropped */ 3923 missed_events = cpu_buffer->lost_events; 3924 3925 /* 3926 * If this page has been partially read or 3927 * if len is not big enough to read the rest of the page or 3928 * a writer is still on the page, then 3929 * we must copy the data from the page to the buffer. 3930 * Otherwise, we can simply swap the page with the one passed in. 3931 */ 3932 if (read || (len < (commit - read)) || 3933 cpu_buffer->reader_page == cpu_buffer->commit_page) { 3934 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 3935 unsigned int rpos = read; 3936 unsigned int pos = 0; 3937 unsigned int size; 3938 3939 if (full) 3940 goto out_unlock; 3941 3942 if (len > (commit - read)) 3943 len = (commit - read); 3944 3945 /* Always keep the time extend and data together */ 3946 size = rb_event_ts_length(event); 3947 3948 if (len < size) 3949 goto out_unlock; 3950 3951 /* save the current timestamp, since the user will need it */ 3952 save_timestamp = cpu_buffer->read_stamp; 3953 3954 /* Need to copy one event at a time */ 3955 do { 3956 /* We need the size of one event, because 3957 * rb_advance_reader only advances by one event, 3958 * whereas rb_event_ts_length may include the size of 3959 * one or two events. 3960 * We have already ensured there's enough space if this 3961 * is a time extend. 
			size = rb_event_length(event);
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			if (rpos >= commit)
				break;

			event = rb_reader_event(cpu_buffer);
			/* Always keep the time extend and data together */
			size = rb_event_ts_length(event);
		} while (len >= size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += BUF_PAGE_SIZE;

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;

		/*
		 * Use the real_end for the data size.
		 * This gives us a chance to store the lost events
		 * on the page.
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < BUF_PAGE_SIZE)
		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
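
/*
 * Illustrative sketch only, not part of the ring buffer API: it shows the
 * calling pattern described in the kerneldoc for the three read-page
 * helpers above. The function name and the consume_block() callback are
 * hypothetical; a real consumer (such as the splice path in trace.c) looks
 * different. The offset returned by ring_buffer_read_page() is handed to
 * the callback unchanged, as in the process_page() example above.
 */
static void __maybe_unused
rb_example_drain_cpu(struct ring_buffer *buffer, int cpu,
		     void (*consume_block)(void *data, int offset))
{
	void *rpage;
	int ret;

	/* Allocate a spare page that ring_buffer_read_page can swap in */
	rpage = ring_buffer_alloc_read_page(buffer, cpu);
	if (!rpage)
		return;

	/* Pull pages until this cpu buffer has no more data to give */
	while ((ret = ring_buffer_read_page(buffer, &rpage,
					    PAGE_SIZE, cpu, 0)) >= 0)
		consume_block(rpage, ret);

	/* The page we hold now may not be the one originally allocated */
	ring_buffer_free_read_page(buffer, rpage);
}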

#ifdef CONFIG_TRACING
static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
	       size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	int r;

	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
		r = sprintf(buf, "permanently disabled\n");
	else
		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}

static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
		size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	unsigned long val;
	int ret;

	ret = kstrtoul_from_user(ubuf, cnt, 10, &val);
	if (ret)
		return ret;

	if (val)
		set_bit(RB_BUFFERS_ON_BIT, p);
	else
		clear_bit(RB_BUFFERS_ON_BIT, p);

	(*ppos)++;

	return cnt;
}

static const struct file_operations rb_simple_fops = {
	.open		= tracing_open_generic,
	.read		= rb_simple_read,
	.write		= rb_simple_write,
	.llseek		= default_llseek,
};

static __init int rb_init_debugfs(void)
{
	struct dentry *d_tracer;

	d_tracer = tracing_init_dentry();

	trace_create_file("tracing_on", 0644, d_tracer,
			  &ring_buffer_flags, &rb_simple_fops);

	return 0;
}

fs_initcall(rb_init_debugfs);
#endif

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu)
{
	struct ring_buffer *buffer =
		container_of(self, struct ring_buffer, cpu_notify);
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		if (cpumask_test_cpu(cpu, buffer->cpumask))
			return NOTIFY_OK;

		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu]) {
			WARN(1, "failed to allocate ring buffer on CPU %ld\n",
			     cpu);
			return NOTIFY_OK;
		}
		smp_wmb();
		cpumask_set_cpu(cpu, buffer->cpumask);
		break;
	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		/*
		 * Do nothing.
		 *  If we were to free the buffer, then the user would
		 *  lose any trace that was in the buffer.
		 */
		break;
	default:
		break;
	}
	return NOTIFY_OK;
}
#endif