// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}
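
/*
 * Worked example (illustration only, not referenced by the code):
 *
 *	event->type_len = 3;			RB_ALIGNMENT == 4
 *	rb_event_data_length(event) == 3 * 4 + RB_EVNT_HDR_SIZE == 16
 *
 *	event->type_len = 0;  event->array[0] = 200;
 *	rb_event_data_length(event) == 200 + RB_EVNT_HDR_SIZE == 204
 *
 * RB_EVNT_HDR_SIZE is the 4 byte header holding type_len and time_delta,
 * as printed by ring_buffer_print_entry_header() above.
 */
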
/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}
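
/*
 * Worked example (illustration only): with TS_SHIFT == 27,
 * TS_DELTA_TEST masks everything above the low 27 bits, so
 *
 *	test_time_stamp(0x07ffffff) == false	(delta fits in time_delta)
 *	test_time_stamp(0x08000000) == true	(needs a time extend event)
 *
 * i.e. any delta of 2^27 or more (roughly 134 ms with a nanosecond
 * clock) cannot be stored in the 27 bit time_delta field and forces
 * the writer to add an extended time stamp.
 */
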
#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	long				wait_index;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

#if BITS_PER_LONG == 32
#define RB_TIME_32
#endif

/* To test on 64 bit machines */
//#define RB_TIME_32

#ifdef RB_TIME_32

struct rb_time_struct {
	local_t		cnt;
	local_t		top;
	local_t		bottom;
	local_t		msb;
};
#else
#include <asm/local64.h>
struct rb_time_struct {
	local64_t	time;
};
#endif
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	int				missed_events;
};

#ifdef RB_TIME_32

/*
 * On 32 bit machines, local64_t is very expensive. As the ring
 * buffer doesn't need all the features of a true 64 bit atomic,
 * on 32 bit, it uses these functions (64 still uses local64_t).
 *
 * For the ring buffer, 64 bit required operations for the time is
 * the following:
 *
 *  - Reads may fail if it interrupted a modification of the time stamp.
 *      It will succeed if it did not interrupt another write even if
 *      the read itself is interrupted by a write.
 *      It returns whether it was successful or not.
 *
 *  - Writes always succeed and will overwrite other writes and writes
 *      that were done by events interrupting the current write.
 *
 *  - A write followed by a read of the same time stamp will always succeed,
 *      but may not contain the same value.
 *
 *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
 *      Other than that, it acts like a normal cmpxchg.
 *
 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
 *  (bottom being the least significant 30 bits of the 60 bit time stamp).
 *
 * The two most significant bits of each half holds a 2 bit counter (0-3).
 * Each update will increment this counter by one.
 * When reading the top and bottom, if the two counter bits match then the
 *  top and bottom together make a valid 60 bit number.
 */
#define RB_TIME_SHIFT	30
#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
#define RB_TIME_MSB_SHIFT	60

static inline int rb_time_cnt(unsigned long val)
{
	return (val >> RB_TIME_SHIFT) & 3;
}

static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
{
	u64 val;

	val = top & RB_TIME_VAL_MASK;
	val <<= RB_TIME_SHIFT;
	val |= bottom & RB_TIME_VAL_MASK;

	return val;
}

static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
{
	unsigned long top, bottom, msb;
	unsigned long c;

	/*
	 * If the read is interrupted by a write, then the cnt will
	 * be different. Loop until both top and bottom have been read
	 * without interruption.
	 */
	do {
		c = local_read(&t->cnt);
		top = local_read(&t->top);
		bottom = local_read(&t->bottom);
		msb = local_read(&t->msb);
	} while (c != local_read(&t->cnt));

	*cnt = rb_time_cnt(top);

	/* If top and bottom counts don't match, this interrupted a write */
	if (*cnt != rb_time_cnt(bottom))
		return false;

	/* The shift to msb will lose its cnt bits */
	*ret = rb_time_val(top, bottom) | ((u64)msb << RB_TIME_MSB_SHIFT);
	return true;
}

static bool rb_time_read(rb_time_t *t, u64 *ret)
{
	unsigned long cnt;

	return __rb_time_read(t, ret, &cnt);
}

static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
{
	return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
}

static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom,
				 unsigned long *msb)
{
	*top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
	*bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
	*msb = (unsigned long)(val >> RB_TIME_MSB_SHIFT);
}

static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
{
	val = rb_time_val_cnt(val, cnt);
	local_set(t, val);
}

static void rb_time_set(rb_time_t *t, u64 val)
{
	unsigned long cnt, top, bottom, msb;

	rb_time_split(val, &top, &bottom, &msb);

	/* Writes always succeed with a valid number even if it gets interrupted. */
	do {
		cnt = local_inc_return(&t->cnt);
		rb_time_val_set(&t->top, top, cnt);
		rb_time_val_set(&t->bottom, bottom, cnt);
		rb_time_val_set(&t->msb, val >> RB_TIME_MSB_SHIFT, cnt);
	} while (cnt != local_read(&t->cnt));
}

static inline bool
rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
{
	return local_try_cmpxchg(l, &expect, set);
}

static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
{
	unsigned long cnt, top, bottom, msb;
	unsigned long cnt2, top2, bottom2, msb2;
	u64 val;

	/* The cmpxchg always fails if it interrupted an update */
	if (!__rb_time_read(t, &val, &cnt2))
		return false;

	if (val != expect)
		return false;

	cnt = local_read(&t->cnt);
	if ((cnt & 3) != cnt2)
		return false;

	cnt2 = cnt + 1;

	rb_time_split(val, &top, &bottom, &msb);
	top = rb_time_val_cnt(top, cnt);
	bottom = rb_time_val_cnt(bottom, cnt);

	rb_time_split(set, &top2, &bottom2, &msb2);
	top2 = rb_time_val_cnt(top2, cnt2);
	bottom2 = rb_time_val_cnt(bottom2, cnt2);

	if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
		return false;
	if (!rb_time_read_cmpxchg(&t->msb, msb, msb2))
		return false;
	if (!rb_time_read_cmpxchg(&t->top, top, top2))
		return false;
	if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
		return false;
	return true;
}

#else /* 64 bits */

/* local64_t always succeeds */

static inline bool rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
	return true;
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
{
	return local64_try_cmpxchg(&t->time, &expect, set);
}
#endif
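
/*
 * Worked example of the 32 bit scheme above (illustration only): storing
 * the value ((3ULL << 60) | (5ULL << 30) | 7) while the update counter is
 * at 2 ends up with the three words
 *
 *	t->msb    == 3 | (2 << RB_TIME_SHIFT)
 *	t->top    == 5 | (2 << RB_TIME_SHIFT)
 *	t->bottom == 7 | (2 << RB_TIME_SHIFT)
 *
 * A reader only accepts the pair when rb_time_cnt(top) == rb_time_cnt(bottom)
 * (both 2 here), reassembles (5 << 30) | 7 and ORs in msb << 60; the cnt
 * bits stored in msb are shifted out past bit 63 and discarded, which is
 * what the "lose its cnt bits" comment in __rb_time_read() refers to.
 */
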
/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);
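
/*
 * Worked example (illustration only): suppose the last full time stamp
 * was save_ts == (1ULL << 59) + 100 and the buffer recorded an absolute
 * time stamp of abs == 150 (only the low 59 bits are stored). Then
 * rb_fix_abs_ts() restores the dropped MSB:
 *
 *	abs |= save_ts & TS_MSB;	-> abs == (1ULL << 59) + 150
 *
 * If instead abs == 10 (the low 59 bits wrapped since save_ts), the
 * result would be smaller than save_ts, so another 1ULL << 59 is added
 * to account for the carry into the dropped bits.
 */
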
/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned;
 * otherwise the current time is returned. Neither of those last two
 * cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	/* Can only fail on 32 bit */
	if (!rb_time_read(&cpu_buffer->write_stamp, &ts))
		/* Screw it, just read the current time */
		ts = rb_time_stamp(cpu_buffer->buffer);

	return ts;
}

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	dirty = ring_buffer_nr_dirty_pages(buffer, cpu);

	return (dirty * 100) > (full * nr_pages);
}
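
/*
 * Worked example (illustration only): with nr_pages == 128 and a waiter
 * asking for full == 25 (percent), full_hit() returns true once
 *
 *	dirty * 100 > 25 * 128,  i.e. dirty >= 33 pages,
 *
 * where dirty is (pages_touched - pages_lost) - pages_read as computed
 * by ring_buffer_nr_dirty_pages().
 */
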
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	rbwork->wait_index++;
	/* make sure the waiters see the new index */
	smp_wmb();

	rb_wake_up_waiters(&rbwork->work);
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	long wait_index;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	wait_index = READ_ONCE(work->wait_index);

	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;
			bool done;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			done = !pagebusy && full_hit(buffer, cpu, full);

			if (!cpu_buffer->shortest_full ||
			    cpu_buffer->shortest_full > full)
				cpu_buffer->shortest_full = full;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			if (done)
				break;
		}

		schedule();

		/* Make sure to see the new wait index */
		smp_rmb();
		if (wait_index != work->wait_index)
			break;
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}
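
/*
 * Usage sketch (illustration only): a consumer that only wants to be
 * woken once at least a quarter of a CPU's pages contain data can call
 *
 *	ret = ring_buffer_wait(buffer, cpu, 25);
 *
 * which (roughly) sleeps until full_hit(buffer, cpu, 25) reports enough
 * dirty pages, the wait index changes (ring_buffer_wake_waiters()), or a
 * signal arrives (-EINTR). With @cpu == RING_BUFFER_ALL_CPUS the @full
 * argument is ignored and any new data in any per cpu buffer wakes the task.
 */
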
/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &work->full_waiters, poll_table);
		work->full_waiters_pending = true;
	} else {
		poll_wait(filp, &work->waiters, poll_table);
		work->waiters_pending = true;
	}

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if (full)
		return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next	bit 1	  bit 0
 *				-------	 -------
 * Normal page			  0	    0
 * Points to head page		  0	    1
 * New head page		  1	    0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 *  What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 *  You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}
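
/*
 * Worked example (illustration only): buffer pages are cache line
 * aligned, so the two least significant bits of a buffer_page pointer
 * are always zero and can carry the flags above. If list->next holds
 * the value 0xffff888012345001, then
 *
 *	rb_list_head(list->next) returns (struct list_head *)0xffff888012345000
 *	(unsigned long)list->next & RB_FLAG_MASK == RB_PAGE_HEAD
 *
 * i.e. the next page is currently the head page. rb_is_head_page()
 * additionally reports RB_PAGE_MOVED when the masked pointer no longer
 * matches the page that was expected, meaning the reader swapped it out.
 */
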
/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}
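
/*
 * Worked example (illustration only) of the RB_WRITE_INTCNT handling in
 * rb_tail_page_update(): if next_page->write held 0x0003f0 (an index of
 * 0x3f0, no pending updaters), local_add_return(RB_WRITE_INTCNT, ...)
 * makes it 0x1003f0 and returns that value as old_write. Clearing the
 * low 20 bits (old_write & ~RB_WRITE_MASK == 0x100000) gives the value
 * the cmpxchg tries to install: the index is reset to 0 while the updater
 * count in the upper 12 bits is preserved, so an interrupt that touched
 * the field in the meantime makes the cmpxchg fail and its data is kept.
 */
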
/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
	}
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
		long nr_pages, struct list_head *pages)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * If a user thread allocates too much, and si_mem_available()
	 * reports there's enough memory, even though there is not.
	 * Make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    mflags, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	irq_work_sync(&cpu_buffer->irq_work.work);

	free_buffer_page(cpu_buffer->reader_page);

	if (head) {
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					 struct lock_class_key *key)
{
	struct trace_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
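
/*
 * Sizing example (illustration only, assuming 4 KiB pages and a 16 byte
 * buffer_data_page header on 64 bit): BUF_PAGE_SIZE is then 4080 bytes,
 * so asking for size == 64 KiB per cpu gives
 *
 *	nr_pages = DIV_ROUND_UP(65536, 4080) = 17
 *
 * and a request smaller than two pages is rounded up to the minimum of 2.
 */
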
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct trace_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	irq_work_sync(&buffer->irq_work.work);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct trace_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;
1963 * Make sure that we have head_bit value preserved for the 1964 * next page 1965 */ 1966 tail_page->next = (struct list_head *)((unsigned long)next_page | 1967 head_bit); 1968 next_page = rb_list_head(next_page); 1969 next_page->prev = tail_page; 1970 1971 /* make sure pages points to a valid page in the ring buffer */ 1972 cpu_buffer->pages = next_page; 1973 1974 /* update head page */ 1975 if (head_bit) 1976 cpu_buffer->head_page = list_entry(next_page, 1977 struct buffer_page, list); 1978 1979 /* pages are removed, resume tracing and then free the pages */ 1980 atomic_dec(&cpu_buffer->record_disabled); 1981 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1982 1983 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1984 1985 /* last buffer page to remove */ 1986 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1987 list); 1988 tmp_iter_page = first_page; 1989 1990 do { 1991 cond_resched(); 1992 1993 to_remove_page = tmp_iter_page; 1994 rb_inc_page(&tmp_iter_page); 1995 1996 /* update the counters */ 1997 page_entries = rb_page_entries(to_remove_page); 1998 if (page_entries) { 1999 /* 2000 * If something was added to this page, it was full 2001 * since it is not the tail page. So we deduct the 2002 * bytes consumed in ring buffer from here. 2003 * Increment overrun to account for the lost events. 2004 */ 2005 local_add(page_entries, &cpu_buffer->overrun); 2006 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2007 local_inc(&cpu_buffer->pages_lost); 2008 } 2009 2010 /* 2011 * We have already removed references to this list item, just 2012 * free up the buffer_page and its page 2013 */ 2014 free_buffer_page(to_remove_page); 2015 nr_removed--; 2016 2017 } while (to_remove_page != last_page); 2018 2019 RB_WARN_ON(cpu_buffer, nr_removed); 2020 2021 return nr_removed == 0; 2022 } 2023 2024 static bool 2025 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2026 { 2027 struct list_head *pages = &cpu_buffer->new_pages; 2028 unsigned long flags; 2029 bool success; 2030 int retries; 2031 2032 /* Can be called at early boot up, where interrupts must not been enabled */ 2033 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2034 /* 2035 * We are holding the reader lock, so the reader page won't be swapped 2036 * in the ring buffer. Now we are racing with the writer trying to 2037 * move head page and the tail page. 2038 * We are going to adapt the reader page update process where: 2039 * 1. We first splice the start and end of list of new pages between 2040 * the head page and its previous page. 2041 * 2. We cmpxchg the prev_page->next to point from head page to the 2042 * start of new pages list. 2043 * 3. Finally, we update the head->prev to the end of new list. 2044 * 2045 * We will try this process 10 times, to make sure that we don't keep 2046 * spinning. 
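 *
 * (Illustrative recap of step 2: the splice is only published if nobody
 *  changed prev_page->next since the head page was sampled, i.e. roughly
 *
 *	old = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
 *	success = (old == head_page_with_bit);
 *
 *  otherwise the loop below retries with a freshly looked-up head page.)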
2047 */ 2048 retries = 10; 2049 success = false; 2050 while (retries--) { 2051 struct list_head *head_page, *prev_page, *r; 2052 struct list_head *last_page, *first_page; 2053 struct list_head *head_page_with_bit; 2054 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2055 2056 if (!hpage) 2057 break; 2058 head_page = &hpage->list; 2059 prev_page = head_page->prev; 2060 2061 first_page = pages->next; 2062 last_page = pages->prev; 2063 2064 head_page_with_bit = (struct list_head *) 2065 ((unsigned long)head_page | RB_PAGE_HEAD); 2066 2067 last_page->next = head_page_with_bit; 2068 first_page->prev = prev_page; 2069 2070 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); 2071 2072 if (r == head_page_with_bit) { 2073 /* 2074 * yay, we replaced the page pointer to our new list, 2075 * now, we just have to update to head page's prev 2076 * pointer to point to end of list 2077 */ 2078 head_page->prev = last_page; 2079 success = true; 2080 break; 2081 } 2082 } 2083 2084 if (success) 2085 INIT_LIST_HEAD(pages); 2086 /* 2087 * If we weren't successful in adding in new pages, warn and stop 2088 * tracing 2089 */ 2090 RB_WARN_ON(cpu_buffer, !success); 2091 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2092 2093 /* free pages if they weren't inserted */ 2094 if (!success) { 2095 struct buffer_page *bpage, *tmp; 2096 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2097 list) { 2098 list_del_init(&bpage->list); 2099 free_buffer_page(bpage); 2100 } 2101 } 2102 return success; 2103 } 2104 2105 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2106 { 2107 bool success; 2108 2109 if (cpu_buffer->nr_pages_to_update > 0) 2110 success = rb_insert_pages(cpu_buffer); 2111 else 2112 success = rb_remove_pages(cpu_buffer, 2113 -cpu_buffer->nr_pages_to_update); 2114 2115 if (success) 2116 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2117 } 2118 2119 static void update_pages_handler(struct work_struct *work) 2120 { 2121 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2122 struct ring_buffer_per_cpu, update_pages_work); 2123 rb_update_pages(cpu_buffer); 2124 complete(&cpu_buffer->update_done); 2125 } 2126 2127 /** 2128 * ring_buffer_resize - resize the ring buffer 2129 * @buffer: the buffer to resize. 2130 * @size: the new size. 2131 * @cpu_id: the cpu buffer to resize 2132 * 2133 * Minimum size is 2 * BUF_PAGE_SIZE. 2134 * 2135 * Returns 0 on success and < 0 on failure. 2136 */ 2137 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2138 int cpu_id) 2139 { 2140 struct ring_buffer_per_cpu *cpu_buffer; 2141 unsigned long nr_pages; 2142 int cpu, err; 2143 2144 /* 2145 * Always succeed at resizing a non-existent buffer: 2146 */ 2147 if (!buffer) 2148 return 0; 2149 2150 /* Make sure the requested buffer exists */ 2151 if (cpu_id != RING_BUFFER_ALL_CPUS && 2152 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2153 return 0; 2154 2155 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 2156 2157 /* we need a minimum of two pages */ 2158 if (nr_pages < 2) 2159 nr_pages = 2; 2160 2161 /* prevent another thread from changing buffer sizes */ 2162 mutex_lock(&buffer->mutex); 2163 atomic_inc(&buffer->resizing); 2164 2165 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2166 /* 2167 * Don't succeed if resizing is disabled, as a reader might be 2168 * manipulating the ring buffer and is expecting a sane state while 2169 * this is true. 
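 * Callers may therefore see -EBUSY. A caller-side sketch (the new size
 * chosen here is arbitrary):
 *
 *	err = ring_buffer_resize(buffer, 8 * BUF_PAGE_SIZE,
 *				 RING_BUFFER_ALL_CPUS);
 *
 * and treat -EBUSY as "try again once the reader is done".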
2170 */ 2171 for_each_buffer_cpu(buffer, cpu) { 2172 cpu_buffer = buffer->buffers[cpu]; 2173 if (atomic_read(&cpu_buffer->resize_disabled)) { 2174 err = -EBUSY; 2175 goto out_err_unlock; 2176 } 2177 } 2178 2179 /* calculate the pages to update */ 2180 for_each_buffer_cpu(buffer, cpu) { 2181 cpu_buffer = buffer->buffers[cpu]; 2182 2183 cpu_buffer->nr_pages_to_update = nr_pages - 2184 cpu_buffer->nr_pages; 2185 /* 2186 * nothing more to do for removing pages or no update 2187 */ 2188 if (cpu_buffer->nr_pages_to_update <= 0) 2189 continue; 2190 /* 2191 * to add pages, make sure all new pages can be 2192 * allocated without receiving ENOMEM 2193 */ 2194 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2195 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2196 &cpu_buffer->new_pages)) { 2197 /* not enough memory for new pages */ 2198 err = -ENOMEM; 2199 goto out_err; 2200 } 2201 2202 cond_resched(); 2203 } 2204 2205 cpus_read_lock(); 2206 /* 2207 * Fire off all the required work handlers 2208 * We can't schedule on offline CPUs, but it's not necessary 2209 * since we can change their buffer sizes without any race. 2210 */ 2211 for_each_buffer_cpu(buffer, cpu) { 2212 cpu_buffer = buffer->buffers[cpu]; 2213 if (!cpu_buffer->nr_pages_to_update) 2214 continue; 2215 2216 /* Can't run something on an offline CPU. */ 2217 if (!cpu_online(cpu)) { 2218 rb_update_pages(cpu_buffer); 2219 cpu_buffer->nr_pages_to_update = 0; 2220 } else { 2221 /* Run directly if possible. */ 2222 migrate_disable(); 2223 if (cpu != smp_processor_id()) { 2224 migrate_enable(); 2225 schedule_work_on(cpu, 2226 &cpu_buffer->update_pages_work); 2227 } else { 2228 update_pages_handler(&cpu_buffer->update_pages_work); 2229 migrate_enable(); 2230 } 2231 } 2232 } 2233 2234 /* wait for all the updates to complete */ 2235 for_each_buffer_cpu(buffer, cpu) { 2236 cpu_buffer = buffer->buffers[cpu]; 2237 if (!cpu_buffer->nr_pages_to_update) 2238 continue; 2239 2240 if (cpu_online(cpu)) 2241 wait_for_completion(&cpu_buffer->update_done); 2242 cpu_buffer->nr_pages_to_update = 0; 2243 } 2244 2245 cpus_read_unlock(); 2246 } else { 2247 cpu_buffer = buffer->buffers[cpu_id]; 2248 2249 if (nr_pages == cpu_buffer->nr_pages) 2250 goto out; 2251 2252 /* 2253 * Don't succeed if resizing is disabled, as a reader might be 2254 * manipulating the ring buffer and is expecting a sane state while 2255 * this is true. 2256 */ 2257 if (atomic_read(&cpu_buffer->resize_disabled)) { 2258 err = -EBUSY; 2259 goto out_err_unlock; 2260 } 2261 2262 cpu_buffer->nr_pages_to_update = nr_pages - 2263 cpu_buffer->nr_pages; 2264 2265 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2266 if (cpu_buffer->nr_pages_to_update > 0 && 2267 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2268 &cpu_buffer->new_pages)) { 2269 err = -ENOMEM; 2270 goto out_err; 2271 } 2272 2273 cpus_read_lock(); 2274 2275 /* Can't run something on an offline CPU. */ 2276 if (!cpu_online(cpu_id)) 2277 rb_update_pages(cpu_buffer); 2278 else { 2279 /* Run directly if possible. 
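 *	   migrate_disable() below only pins the task long enough to
 *	   compare cpu_id against smp_processor_id(); if they differ,
 *	   migration is re-enabled first and the update is pushed to
 *	   cpu_id via schedule_work_on() and waited for.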
*/ 2280 migrate_disable(); 2281 if (cpu_id == smp_processor_id()) { 2282 rb_update_pages(cpu_buffer); 2283 migrate_enable(); 2284 } else { 2285 migrate_enable(); 2286 schedule_work_on(cpu_id, 2287 &cpu_buffer->update_pages_work); 2288 wait_for_completion(&cpu_buffer->update_done); 2289 } 2290 } 2291 2292 cpu_buffer->nr_pages_to_update = 0; 2293 cpus_read_unlock(); 2294 } 2295 2296 out: 2297 /* 2298 * The ring buffer resize can happen with the ring buffer 2299 * enabled, so that the update disturbs the tracing as little 2300 * as possible. But if the buffer is disabled, we do not need 2301 * to worry about that, and we can take the time to verify 2302 * that the buffer is not corrupt. 2303 */ 2304 if (atomic_read(&buffer->record_disabled)) { 2305 atomic_inc(&buffer->record_disabled); 2306 /* 2307 * Even though the buffer was disabled, we must make sure 2308 * that it is truly disabled before calling rb_check_pages. 2309 * There could have been a race between checking 2310 * record_disable and incrementing it. 2311 */ 2312 synchronize_rcu(); 2313 for_each_buffer_cpu(buffer, cpu) { 2314 cpu_buffer = buffer->buffers[cpu]; 2315 rb_check_pages(cpu_buffer); 2316 } 2317 atomic_dec(&buffer->record_disabled); 2318 } 2319 2320 atomic_dec(&buffer->resizing); 2321 mutex_unlock(&buffer->mutex); 2322 return 0; 2323 2324 out_err: 2325 for_each_buffer_cpu(buffer, cpu) { 2326 struct buffer_page *bpage, *tmp; 2327 2328 cpu_buffer = buffer->buffers[cpu]; 2329 cpu_buffer->nr_pages_to_update = 0; 2330 2331 if (list_empty(&cpu_buffer->new_pages)) 2332 continue; 2333 2334 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2335 list) { 2336 list_del_init(&bpage->list); 2337 free_buffer_page(bpage); 2338 } 2339 } 2340 out_err_unlock: 2341 atomic_dec(&buffer->resizing); 2342 mutex_unlock(&buffer->mutex); 2343 return err; 2344 } 2345 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2346 2347 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2348 { 2349 mutex_lock(&buffer->mutex); 2350 if (val) 2351 buffer->flags |= RB_FL_OVERWRITE; 2352 else 2353 buffer->flags &= ~RB_FL_OVERWRITE; 2354 mutex_unlock(&buffer->mutex); 2355 } 2356 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2357 2358 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2359 { 2360 return bpage->page->data + index; 2361 } 2362 2363 static __always_inline struct ring_buffer_event * 2364 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2365 { 2366 return __rb_page_index(cpu_buffer->reader_page, 2367 cpu_buffer->reader_page->read); 2368 } 2369 2370 static __always_inline unsigned rb_page_commit(struct buffer_page *bpage) 2371 { 2372 return local_read(&bpage->page->commit); 2373 } 2374 2375 static struct ring_buffer_event * 2376 rb_iter_head_event(struct ring_buffer_iter *iter) 2377 { 2378 struct ring_buffer_event *event; 2379 struct buffer_page *iter_head_page = iter->head_page; 2380 unsigned long commit; 2381 unsigned length; 2382 2383 if (iter->head != iter->next_event) 2384 return iter->event; 2385 2386 /* 2387 * When the writer goes across pages, it issues a cmpxchg which 2388 * is a mb(), which will synchronize with the rmb here. 
2389 * (see rb_tail_page_update() and __rb_reserve_next()) 2390 */ 2391 commit = rb_page_commit(iter_head_page); 2392 smp_rmb(); 2393 2394 /* An event needs to be at least 8 bytes in size */ 2395 if (iter->head > commit - 8) 2396 goto reset; 2397 2398 event = __rb_page_index(iter_head_page, iter->head); 2399 length = rb_event_length(event); 2400 2401 /* 2402 * READ_ONCE() doesn't work on functions and we don't want the 2403 * compiler doing any crazy optimizations with length. 2404 */ 2405 barrier(); 2406 2407 if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE) 2408 /* Writer corrupted the read? */ 2409 goto reset; 2410 2411 memcpy(iter->event, event, length); 2412 /* 2413 * If the page stamp is still the same after this rmb() then the 2414 * event was safely copied without the writer entering the page. 2415 */ 2416 smp_rmb(); 2417 2418 /* Make sure the page didn't change since we read this */ 2419 if (iter->page_stamp != iter_head_page->page->time_stamp || 2420 commit > rb_page_commit(iter_head_page)) 2421 goto reset; 2422 2423 iter->next_event = iter->head + length; 2424 return iter->event; 2425 reset: 2426 /* Reset to the beginning */ 2427 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2428 iter->head = 0; 2429 iter->next_event = 0; 2430 iter->missed_events = 1; 2431 return NULL; 2432 } 2433 2434 /* Size is determined by what has been committed */ 2435 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2436 { 2437 return rb_page_commit(bpage); 2438 } 2439 2440 static __always_inline unsigned 2441 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2442 { 2443 return rb_page_commit(cpu_buffer->commit_page); 2444 } 2445 2446 static __always_inline unsigned 2447 rb_event_index(struct ring_buffer_event *event) 2448 { 2449 unsigned long addr = (unsigned long)event; 2450 2451 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 2452 } 2453 2454 static void rb_inc_iter(struct ring_buffer_iter *iter) 2455 { 2456 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2457 2458 /* 2459 * The iterator could be on the reader page (it starts there). 2460 * But the head could have moved, since the reader was 2461 * found. Check for this case and assign the iterator 2462 * to the head page instead of next. 2463 */ 2464 if (iter->head_page == cpu_buffer->reader_page) 2465 iter->head_page = rb_set_head_page(cpu_buffer); 2466 else 2467 rb_inc_page(&iter->head_page); 2468 2469 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2470 iter->head = 0; 2471 iter->next_event = 0; 2472 } 2473 2474 /* 2475 * rb_handle_head_page - writer hit the head page 2476 * 2477 * Returns: +1 to retry page 2478 * 0 to continue 2479 * -1 on error 2480 */ 2481 static int 2482 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2483 struct buffer_page *tail_page, 2484 struct buffer_page *next_page) 2485 { 2486 struct buffer_page *new_head; 2487 int entries; 2488 int type; 2489 int ret; 2490 2491 entries = rb_page_entries(next_page); 2492 2493 /* 2494 * The hard part is here. We need to move the head 2495 * forward, and protect against both readers on 2496 * other CPUs and writers coming in via interrupts. 2497 */ 2498 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2499 RB_PAGE_HEAD); 2500 2501 /* 2502 * type can be one of four: 2503 * NORMAL - an interrupt already moved it for us 2504 * HEAD - we are the first to get here. 2505 * UPDATE - we are the interrupt interrupting 2506 * a current move. 
2507 * MOVED - a reader on another CPU moved the next 2508 * pointer to its reader page. Give up 2509 * and try again. 2510 */ 2511 2512 switch (type) { 2513 case RB_PAGE_HEAD: 2514 /* 2515 * We changed the head to UPDATE, thus 2516 * it is our responsibility to update 2517 * the counters. 2518 */ 2519 local_add(entries, &cpu_buffer->overrun); 2520 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2521 local_inc(&cpu_buffer->pages_lost); 2522 2523 /* 2524 * The entries will be zeroed out when we move the 2525 * tail page. 2526 */ 2527 2528 /* still more to do */ 2529 break; 2530 2531 case RB_PAGE_UPDATE: 2532 /* 2533 * This is an interrupt that interrupt the 2534 * previous update. Still more to do. 2535 */ 2536 break; 2537 case RB_PAGE_NORMAL: 2538 /* 2539 * An interrupt came in before the update 2540 * and processed this for us. 2541 * Nothing left to do. 2542 */ 2543 return 1; 2544 case RB_PAGE_MOVED: 2545 /* 2546 * The reader is on another CPU and just did 2547 * a swap with our next_page. 2548 * Try again. 2549 */ 2550 return 1; 2551 default: 2552 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2553 return -1; 2554 } 2555 2556 /* 2557 * Now that we are here, the old head pointer is 2558 * set to UPDATE. This will keep the reader from 2559 * swapping the head page with the reader page. 2560 * The reader (on another CPU) will spin till 2561 * we are finished. 2562 * 2563 * We just need to protect against interrupts 2564 * doing the job. We will set the next pointer 2565 * to HEAD. After that, we set the old pointer 2566 * to NORMAL, but only if it was HEAD before. 2567 * otherwise we are an interrupt, and only 2568 * want the outer most commit to reset it. 2569 */ 2570 new_head = next_page; 2571 rb_inc_page(&new_head); 2572 2573 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2574 RB_PAGE_NORMAL); 2575 2576 /* 2577 * Valid returns are: 2578 * HEAD - an interrupt came in and already set it. 2579 * NORMAL - One of two things: 2580 * 1) We really set it. 2581 * 2) A bunch of interrupts came in and moved 2582 * the page forward again. 2583 */ 2584 switch (ret) { 2585 case RB_PAGE_HEAD: 2586 case RB_PAGE_NORMAL: 2587 /* OK */ 2588 break; 2589 default: 2590 RB_WARN_ON(cpu_buffer, 1); 2591 return -1; 2592 } 2593 2594 /* 2595 * It is possible that an interrupt came in, 2596 * set the head up, then more interrupts came in 2597 * and moved it again. When we get back here, 2598 * the page would have been set to NORMAL but we 2599 * just set it back to HEAD. 2600 * 2601 * How do you detect this? Well, if that happened 2602 * the tail page would have moved. 2603 */ 2604 if (ret == RB_PAGE_NORMAL) { 2605 struct buffer_page *buffer_tail_page; 2606 2607 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2608 /* 2609 * If the tail had moved passed next, then we need 2610 * to reset the pointer. 2611 */ 2612 if (buffer_tail_page != tail_page && 2613 buffer_tail_page != next_page) 2614 rb_head_page_set_normal(cpu_buffer, new_head, 2615 next_page, 2616 RB_PAGE_HEAD); 2617 } 2618 2619 /* 2620 * If this was the outer most commit (the one that 2621 * changed the original pointer from HEAD to UPDATE), 2622 * then it is up to us to reset it to NORMAL. 
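 *
 * (Recap of the flag protocol used above: the outermost writer moved the
 *  old head HEAD -> UPDATE, nested interrupts that land here see UPDATE
 *  and leave it alone, and only that outermost writer performs the final
 *  UPDATE -> NORMAL transition below.)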
2623 */ 2624 if (type == RB_PAGE_HEAD) { 2625 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2626 tail_page, 2627 RB_PAGE_UPDATE); 2628 if (RB_WARN_ON(cpu_buffer, 2629 ret != RB_PAGE_UPDATE)) 2630 return -1; 2631 } 2632 2633 return 0; 2634 } 2635 2636 static inline void 2637 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2638 unsigned long tail, struct rb_event_info *info) 2639 { 2640 struct buffer_page *tail_page = info->tail_page; 2641 struct ring_buffer_event *event; 2642 unsigned long length = info->length; 2643 2644 /* 2645 * Only the event that crossed the page boundary 2646 * must fill the old tail_page with padding. 2647 */ 2648 if (tail >= BUF_PAGE_SIZE) { 2649 /* 2650 * If the page was filled, then we still need 2651 * to update the real_end. Reset it to zero 2652 * and the reader will ignore it. 2653 */ 2654 if (tail == BUF_PAGE_SIZE) 2655 tail_page->real_end = 0; 2656 2657 local_sub(length, &tail_page->write); 2658 return; 2659 } 2660 2661 event = __rb_page_index(tail_page, tail); 2662 2663 /* account for padding bytes */ 2664 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); 2665 2666 /* 2667 * Save the original length to the meta data. 2668 * This will be used by the reader to add lost event 2669 * counter. 2670 */ 2671 tail_page->real_end = tail; 2672 2673 /* 2674 * If this event is bigger than the minimum size, then 2675 * we need to be careful that we don't subtract the 2676 * write counter enough to allow another writer to slip 2677 * in on this page. 2678 * We put in a discarded commit instead, to make sure 2679 * that this space is not used again. 2680 * 2681 * If we are less than the minimum size, we don't need to 2682 * worry about it. 2683 */ 2684 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 2685 /* No room for any events */ 2686 2687 /* Mark the rest of the page with padding */ 2688 rb_event_set_padding(event); 2689 2690 /* Make sure the padding is visible before the write update */ 2691 smp_wmb(); 2692 2693 /* Set the write back to the previous setting */ 2694 local_sub(length, &tail_page->write); 2695 return; 2696 } 2697 2698 /* Put in a discarded event */ 2699 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 2700 event->type_len = RINGBUF_TYPE_PADDING; 2701 /* time delta must be non zero */ 2702 event->time_delta = 1; 2703 2704 /* Make sure the padding is visible before the tail_page->write update */ 2705 smp_wmb(); 2706 2707 /* Set write to end of buffer */ 2708 length = (tail + length) - BUF_PAGE_SIZE; 2709 local_sub(length, &tail_page->write); 2710 } 2711 2712 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2713 2714 /* 2715 * This is the slow path, force gcc not to inline it. 2716 */ 2717 static noinline struct ring_buffer_event * 2718 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2719 unsigned long tail, struct rb_event_info *info) 2720 { 2721 struct buffer_page *tail_page = info->tail_page; 2722 struct buffer_page *commit_page = cpu_buffer->commit_page; 2723 struct trace_buffer *buffer = cpu_buffer->buffer; 2724 struct buffer_page *next_page; 2725 int ret; 2726 2727 next_page = tail_page; 2728 2729 rb_inc_page(&next_page); 2730 2731 /* 2732 * If for some reason, we had an interrupt storm that made 2733 * it all the way around the buffer, bail, and warn 2734 * about it. 2735 */ 2736 if (unlikely(next_page == commit_page)) { 2737 local_inc(&cpu_buffer->commit_overrun); 2738 goto out_reset; 2739 } 2740 2741 /* 2742 * This is where the fun begins! 
2743 * 2744 * We are fighting against races between a reader that 2745 * could be on another CPU trying to swap its reader 2746 * page with the buffer head. 2747 * 2748 * We are also fighting against interrupts coming in and 2749 * moving the head or tail on us as well. 2750 * 2751 * If the next page is the head page then we have filled 2752 * the buffer, unless the commit page is still on the 2753 * reader page. 2754 */ 2755 if (rb_is_head_page(next_page, &tail_page->list)) { 2756 2757 /* 2758 * If the commit is not on the reader page, then 2759 * move the header page. 2760 */ 2761 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2762 /* 2763 * If we are not in overwrite mode, 2764 * this is easy, just stop here. 2765 */ 2766 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2767 local_inc(&cpu_buffer->dropped_events); 2768 goto out_reset; 2769 } 2770 2771 ret = rb_handle_head_page(cpu_buffer, 2772 tail_page, 2773 next_page); 2774 if (ret < 0) 2775 goto out_reset; 2776 if (ret) 2777 goto out_again; 2778 } else { 2779 /* 2780 * We need to be careful here too. The 2781 * commit page could still be on the reader 2782 * page. We could have a small buffer, and 2783 * have filled up the buffer with events 2784 * from interrupts and such, and wrapped. 2785 * 2786 * Note, if the tail page is also on the 2787 * reader_page, we let it move out. 2788 */ 2789 if (unlikely((cpu_buffer->commit_page != 2790 cpu_buffer->tail_page) && 2791 (cpu_buffer->commit_page == 2792 cpu_buffer->reader_page))) { 2793 local_inc(&cpu_buffer->commit_overrun); 2794 goto out_reset; 2795 } 2796 } 2797 } 2798 2799 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2800 2801 out_again: 2802 2803 rb_reset_tail(cpu_buffer, tail, info); 2804 2805 /* Commit what we have for now. */ 2806 rb_end_commit(cpu_buffer); 2807 /* rb_end_commit() decs committing */ 2808 local_inc(&cpu_buffer->committing); 2809 2810 /* fail and let the caller try again */ 2811 return ERR_PTR(-EAGAIN); 2812 2813 out_reset: 2814 /* reset write */ 2815 rb_reset_tail(cpu_buffer, tail, info); 2816 2817 return NULL; 2818 } 2819 2820 /* Slow path */ 2821 static struct ring_buffer_event * 2822 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) 2823 { 2824 if (abs) 2825 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2826 else 2827 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2828 2829 /* Not the first event on the page, or not delta? */ 2830 if (abs || rb_event_index(event)) { 2831 event->time_delta = delta & TS_MASK; 2832 event->array[0] = delta >> TS_SHIFT; 2833 } else { 2834 /* nope, just zero it */ 2835 event->time_delta = 0; 2836 event->array[0] = 0; 2837 } 2838 2839 return skip_time_extend(event); 2840 } 2841 2842 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2843 static inline bool sched_clock_stable(void) 2844 { 2845 return true; 2846 } 2847 #endif 2848 2849 static void 2850 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2851 struct rb_event_info *info) 2852 { 2853 u64 write_stamp; 2854 2855 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2856 (unsigned long long)info->delta, 2857 (unsigned long long)info->ts, 2858 (unsigned long long)info->before, 2859 (unsigned long long)info->after, 2860 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0), 2861 sched_clock_stable() ? 
"" : 2862 "If you just came from a suspend/resume,\n" 2863 "please switch to the trace global clock:\n" 2864 " echo global > /sys/kernel/tracing/trace_clock\n" 2865 "or add trace_clock=global to the kernel command line\n"); 2866 } 2867 2868 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2869 struct ring_buffer_event **event, 2870 struct rb_event_info *info, 2871 u64 *delta, 2872 unsigned int *length) 2873 { 2874 bool abs = info->add_timestamp & 2875 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2876 2877 if (unlikely(info->delta > (1ULL << 59))) { 2878 /* 2879 * Some timers can use more than 59 bits, and when a timestamp 2880 * is added to the buffer, it will lose those bits. 2881 */ 2882 if (abs && (info->ts & TS_MSB)) { 2883 info->delta &= ABS_TS_MASK; 2884 2885 /* did the clock go backwards */ 2886 } else if (info->before == info->after && info->before > info->ts) { 2887 /* not interrupted */ 2888 static int once; 2889 2890 /* 2891 * This is possible with a recalibrating of the TSC. 2892 * Do not produce a call stack, but just report it. 2893 */ 2894 if (!once) { 2895 once++; 2896 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2897 info->before, info->ts); 2898 } 2899 } else 2900 rb_check_timestamp(cpu_buffer, info); 2901 if (!abs) 2902 info->delta = 0; 2903 } 2904 *event = rb_add_time_stamp(*event, info->delta, abs); 2905 *length -= RB_LEN_TIME_EXTEND; 2906 *delta = 0; 2907 } 2908 2909 /** 2910 * rb_update_event - update event type and data 2911 * @cpu_buffer: The per cpu buffer of the @event 2912 * @event: the event to update 2913 * @info: The info to update the @event with (contains length and delta) 2914 * 2915 * Update the type and data fields of the @event. The length 2916 * is the actual size that is written to the ring buffer, 2917 * and with this, we can determine what to place into the 2918 * data field. 2919 */ 2920 static void 2921 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2922 struct ring_buffer_event *event, 2923 struct rb_event_info *info) 2924 { 2925 unsigned length = info->length; 2926 u64 delta = info->delta; 2927 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2928 2929 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2930 cpu_buffer->event_stamp[nest] = info->ts; 2931 2932 /* 2933 * If we need to add a timestamp, then we 2934 * add it to the start of the reserved space. 2935 */ 2936 if (unlikely(info->add_timestamp)) 2937 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2938 2939 event->time_delta = delta; 2940 length -= RB_EVNT_HDR_SIZE; 2941 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2942 event->type_len = 0; 2943 event->array[0] = length; 2944 } else 2945 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2946 } 2947 2948 static unsigned rb_calculate_event_length(unsigned length) 2949 { 2950 struct ring_buffer_event event; /* Used only for sizeof array */ 2951 2952 /* zero length can cause confusions */ 2953 if (!length) 2954 length++; 2955 2956 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2957 length += sizeof(event.array[0]); 2958 2959 length += RB_EVNT_HDR_SIZE; 2960 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2961 2962 /* 2963 * In case the time delta is larger than the 27 bits for it 2964 * in the header, we need to add a timestamp. If another 2965 * event comes in when trying to discard this one to increase 2966 * the length, then the timestamp will be added in the allocated 2967 * space of this event. 
If length is bigger than the size needed 2968 * for the TIME_EXTEND, then padding has to be used. The events 2969 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2970 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2971 * As length is a multiple of 4, we only need to worry if it 2972 * is 12 (RB_LEN_TIME_EXTEND + 4). 2973 */ 2974 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2975 length += RB_ALIGNMENT; 2976 2977 return length; 2978 } 2979 2980 static u64 rb_time_delta(struct ring_buffer_event *event) 2981 { 2982 switch (event->type_len) { 2983 case RINGBUF_TYPE_PADDING: 2984 return 0; 2985 2986 case RINGBUF_TYPE_TIME_EXTEND: 2987 return rb_event_time_stamp(event); 2988 2989 case RINGBUF_TYPE_TIME_STAMP: 2990 return 0; 2991 2992 case RINGBUF_TYPE_DATA: 2993 return event->time_delta; 2994 default: 2995 return 0; 2996 } 2997 } 2998 2999 static inline bool 3000 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3001 struct ring_buffer_event *event) 3002 { 3003 unsigned long new_index, old_index; 3004 struct buffer_page *bpage; 3005 unsigned long addr; 3006 u64 write_stamp; 3007 u64 delta; 3008 3009 new_index = rb_event_index(event); 3010 old_index = new_index + rb_event_ts_length(event); 3011 addr = (unsigned long)event; 3012 addr &= PAGE_MASK; 3013 3014 bpage = READ_ONCE(cpu_buffer->tail_page); 3015 3016 delta = rb_time_delta(event); 3017 3018 if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp)) 3019 return false; 3020 3021 /* Make sure the write stamp is read before testing the location */ 3022 barrier(); 3023 3024 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3025 unsigned long write_mask = 3026 local_read(&bpage->write) & ~RB_WRITE_MASK; 3027 unsigned long event_length = rb_event_length(event); 3028 3029 /* Something came in, can't discard */ 3030 if (!rb_time_cmpxchg(&cpu_buffer->write_stamp, 3031 write_stamp, write_stamp - delta)) 3032 return false; 3033 3034 /* 3035 * It's possible that the event time delta is zero 3036 * (has the same time stamp as the previous event) 3037 * in which case write_stamp and before_stamp could 3038 * be the same. In such a case, force before_stamp 3039 * to be different than write_stamp. It doesn't 3040 * matter what it is, as long as its different. 3041 */ 3042 if (!delta) 3043 rb_time_set(&cpu_buffer->before_stamp, 0); 3044 3045 /* 3046 * If an event were to come in now, it would see that the 3047 * write_stamp and the before_stamp are different, and assume 3048 * that this event just added itself before updating 3049 * the write stamp. The interrupting event will fix the 3050 * write stamp for us, and use the before stamp as its delta. 3051 */ 3052 3053 /* 3054 * This is on the tail page. It is possible that 3055 * a write could come in and move the tail page 3056 * and write to the next page. That is fine 3057 * because we just shorten what is on this page. 
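 *
 * (The bits above RB_WRITE_MASK in ->write are bumped when the tail page
 *  moves onto this page (see rb_tail_page_update()), so folding
 *  write_mask into old_index and new_index below is what makes the
 *  cmpxchg fail if the page was recycled while we were looking.)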
3058 */ 3059 old_index += write_mask; 3060 new_index += write_mask; 3061 3062 /* caution: old_index gets updated on cmpxchg failure */ 3063 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3064 /* update counters */ 3065 local_sub(event_length, &cpu_buffer->entries_bytes); 3066 return true; 3067 } 3068 } 3069 3070 /* could not discard */ 3071 return false; 3072 } 3073 3074 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3075 { 3076 local_inc(&cpu_buffer->committing); 3077 local_inc(&cpu_buffer->commits); 3078 } 3079 3080 static __always_inline void 3081 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3082 { 3083 unsigned long max_count; 3084 3085 /* 3086 * We only race with interrupts and NMIs on this CPU. 3087 * If we own the commit event, then we can commit 3088 * all others that interrupted us, since the interruptions 3089 * are in stack format (they finish before they come 3090 * back to us). This allows us to do a simple loop to 3091 * assign the commit to the tail. 3092 */ 3093 again: 3094 max_count = cpu_buffer->nr_pages * 100; 3095 3096 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3097 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3098 return; 3099 if (RB_WARN_ON(cpu_buffer, 3100 rb_is_reader_page(cpu_buffer->tail_page))) 3101 return; 3102 /* 3103 * No need for a memory barrier here, as the update 3104 * of the tail_page did it for this page. 3105 */ 3106 local_set(&cpu_buffer->commit_page->page->commit, 3107 rb_page_write(cpu_buffer->commit_page)); 3108 rb_inc_page(&cpu_buffer->commit_page); 3109 /* add barrier to keep gcc from optimizing too much */ 3110 barrier(); 3111 } 3112 while (rb_commit_index(cpu_buffer) != 3113 rb_page_write(cpu_buffer->commit_page)) { 3114 3115 /* Make sure the readers see the content of what is committed. */ 3116 smp_wmb(); 3117 local_set(&cpu_buffer->commit_page->page->commit, 3118 rb_page_write(cpu_buffer->commit_page)); 3119 RB_WARN_ON(cpu_buffer, 3120 local_read(&cpu_buffer->commit_page->page->commit) & 3121 ~RB_WRITE_MASK); 3122 barrier(); 3123 } 3124 3125 /* again, keep gcc from optimizing */ 3126 barrier(); 3127 3128 /* 3129 * If an interrupt came in just after the first while loop 3130 * and pushed the tail page forward, we will be left with 3131 * a dangling commit that will never go forward. 3132 */ 3133 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3134 goto again; 3135 } 3136 3137 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3138 { 3139 unsigned long commits; 3140 3141 if (RB_WARN_ON(cpu_buffer, 3142 !local_read(&cpu_buffer->committing))) 3143 return; 3144 3145 again: 3146 commits = local_read(&cpu_buffer->commits); 3147 /* synchronize with interrupts */ 3148 barrier(); 3149 if (local_read(&cpu_buffer->committing) == 1) 3150 rb_set_commit_to_write(cpu_buffer); 3151 3152 local_dec(&cpu_buffer->committing); 3153 3154 /* synchronize with interrupts */ 3155 barrier(); 3156 3157 /* 3158 * Need to account for interrupts coming in between the 3159 * updating of the commit page and the clearing of the 3160 * committing counter. 
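 * If that happened, commits will have advanced while committing has
 * already dropped to zero, so the check below retakes the commit and
 * loops to move the commit page forward once more.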
3161 */ 3162 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3163 !local_read(&cpu_buffer->committing)) { 3164 local_inc(&cpu_buffer->committing); 3165 goto again; 3166 } 3167 } 3168 3169 static inline void rb_event_discard(struct ring_buffer_event *event) 3170 { 3171 if (extended_time(event)) 3172 event = skip_time_extend(event); 3173 3174 /* array[0] holds the actual length for the discarded event */ 3175 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3176 event->type_len = RINGBUF_TYPE_PADDING; 3177 /* time delta must be non zero */ 3178 if (!event->time_delta) 3179 event->time_delta = 1; 3180 } 3181 3182 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3183 { 3184 local_inc(&cpu_buffer->entries); 3185 rb_end_commit(cpu_buffer); 3186 } 3187 3188 static __always_inline void 3189 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3190 { 3191 if (buffer->irq_work.waiters_pending) { 3192 buffer->irq_work.waiters_pending = false; 3193 /* irq_work_queue() supplies it's own memory barriers */ 3194 irq_work_queue(&buffer->irq_work.work); 3195 } 3196 3197 if (cpu_buffer->irq_work.waiters_pending) { 3198 cpu_buffer->irq_work.waiters_pending = false; 3199 /* irq_work_queue() supplies it's own memory barriers */ 3200 irq_work_queue(&cpu_buffer->irq_work.work); 3201 } 3202 3203 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3204 return; 3205 3206 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3207 return; 3208 3209 if (!cpu_buffer->irq_work.full_waiters_pending) 3210 return; 3211 3212 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3213 3214 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3215 return; 3216 3217 cpu_buffer->irq_work.wakeup_full = true; 3218 cpu_buffer->irq_work.full_waiters_pending = false; 3219 /* irq_work_queue() supplies it's own memory barriers */ 3220 irq_work_queue(&cpu_buffer->irq_work.work); 3221 } 3222 3223 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3224 # define do_ring_buffer_record_recursion() \ 3225 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3226 #else 3227 # define do_ring_buffer_record_recursion() do { } while (0) 3228 #endif 3229 3230 /* 3231 * The lock and unlock are done within a preempt disable section. 3232 * The current_context per_cpu variable can only be modified 3233 * by the current task between lock and unlock. But it can 3234 * be modified more than once via an interrupt. To pass this 3235 * information from the lock to the unlock without having to 3236 * access the 'in_interrupt()' functions again (which do show 3237 * a bit of overhead in something as critical as function tracing, 3238 * we use a bitmask trick. 3239 * 3240 * bit 1 = NMI context 3241 * bit 2 = IRQ context 3242 * bit 3 = SoftIRQ context 3243 * bit 4 = normal context. 3244 * 3245 * This works because this is the order of contexts that can 3246 * preempt other contexts. A SoftIRQ never preempts an IRQ 3247 * context. 3248 * 3249 * When the context is determined, the corresponding bit is 3250 * checked and set (if it was set, then a recursion of that context 3251 * happened). 3252 * 3253 * On unlock, we need to clear this bit. To do so, just subtract 3254 * 1 from the current_context and AND it to itself. 
3255 * 3256 * (binary) 3257 * 101 - 1 = 100 3258 * 101 & 100 = 100 (clearing bit zero) 3259 * 3260 * 1010 - 1 = 1001 3261 * 1010 & 1001 = 1000 (clearing bit 1) 3262 * 3263 * The least significant bit can be cleared this way, and it 3264 * just so happens that it is the same bit corresponding to 3265 * the current context. 3266 * 3267 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3268 * is set when a recursion is detected at the current context, and if 3269 * the TRANSITION bit is already set, it will fail the recursion. 3270 * This is needed because there's a lag between the changing of 3271 * interrupt context and updating the preempt count. In this case, 3272 * a false positive will be found. To handle this, one extra recursion 3273 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3274 * bit is already set, then it is considered a recursion and the function 3275 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3276 * 3277 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3278 * to be cleared. Even if it wasn't the context that set it. That is, 3279 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3280 * is called before preempt_count() is updated, since the check will 3281 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3282 * NMI then comes in, it will set the NMI bit, but when the NMI code 3283 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3284 * and leave the NMI bit set. But this is fine, because the interrupt 3285 * code that set the TRANSITION bit will then clear the NMI bit when it 3286 * calls trace_recursive_unlock(). If another NMI comes in, it will 3287 * set the TRANSITION bit and continue. 3288 * 3289 * Note: The TRANSITION bit only handles a single transition between context. 3290 */ 3291 3292 static __always_inline bool 3293 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3294 { 3295 unsigned int val = cpu_buffer->current_context; 3296 int bit = interrupt_context_level(); 3297 3298 bit = RB_CTX_NORMAL - bit; 3299 3300 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3301 /* 3302 * It is possible that this was called by transitioning 3303 * between interrupt context, and preempt_count() has not 3304 * been updated yet. In this case, use the TRANSITION bit. 3305 */ 3306 bit = RB_CTX_TRANSITION; 3307 if (val & (1 << (bit + cpu_buffer->nest))) { 3308 do_ring_buffer_record_recursion(); 3309 return true; 3310 } 3311 } 3312 3313 val |= (1 << (bit + cpu_buffer->nest)); 3314 cpu_buffer->current_context = val; 3315 3316 return false; 3317 } 3318 3319 static __always_inline void 3320 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3321 { 3322 cpu_buffer->current_context &= 3323 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3324 } 3325 3326 /* The recursive locking above uses 5 bits */ 3327 #define NESTED_BITS 5 3328 3329 /** 3330 * ring_buffer_nest_start - Allow to trace while nested 3331 * @buffer: The ring buffer to modify 3332 * 3333 * The ring buffer has a safety mechanism to prevent recursion. 3334 * But there may be a case where a trace needs to be done while 3335 * tracing something else. In this case, calling this function 3336 * will allow this function to nest within a currently active 3337 * ring_buffer_lock_reserve(). 
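 *
 * A usage sketch (the "event" and "entry" variables are illustrative):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (event) {
 *		entry = ring_buffer_event_data(event);
 *		[ fill in entry ]
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);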
3338 * 3339 * Call this function before calling another ring_buffer_lock_reserve() and 3340 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3341 */ 3342 void ring_buffer_nest_start(struct trace_buffer *buffer) 3343 { 3344 struct ring_buffer_per_cpu *cpu_buffer; 3345 int cpu; 3346 3347 /* Enabled by ring_buffer_nest_end() */ 3348 preempt_disable_notrace(); 3349 cpu = raw_smp_processor_id(); 3350 cpu_buffer = buffer->buffers[cpu]; 3351 /* This is the shift value for the above recursive locking */ 3352 cpu_buffer->nest += NESTED_BITS; 3353 } 3354 3355 /** 3356 * ring_buffer_nest_end - Allow to trace while nested 3357 * @buffer: The ring buffer to modify 3358 * 3359 * Must be called after ring_buffer_nest_start() and after the 3360 * ring_buffer_unlock_commit(). 3361 */ 3362 void ring_buffer_nest_end(struct trace_buffer *buffer) 3363 { 3364 struct ring_buffer_per_cpu *cpu_buffer; 3365 int cpu; 3366 3367 /* disabled by ring_buffer_nest_start() */ 3368 cpu = raw_smp_processor_id(); 3369 cpu_buffer = buffer->buffers[cpu]; 3370 /* This is the shift value for the above recursive locking */ 3371 cpu_buffer->nest -= NESTED_BITS; 3372 preempt_enable_notrace(); 3373 } 3374 3375 /** 3376 * ring_buffer_unlock_commit - commit a reserved 3377 * @buffer: The buffer to commit to 3378 * 3379 * This commits the data to the ring buffer, and releases any locks held. 3380 * 3381 * Must be paired with ring_buffer_lock_reserve. 3382 */ 3383 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3384 { 3385 struct ring_buffer_per_cpu *cpu_buffer; 3386 int cpu = raw_smp_processor_id(); 3387 3388 cpu_buffer = buffer->buffers[cpu]; 3389 3390 rb_commit(cpu_buffer); 3391 3392 rb_wakeups(buffer, cpu_buffer); 3393 3394 trace_recursive_unlock(cpu_buffer); 3395 3396 preempt_enable_notrace(); 3397 3398 return 0; 3399 } 3400 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3401 3402 /* Special value to validate all deltas on a page. */ 3403 #define CHECK_FULL_PAGE 1L 3404 3405 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3406 static void dump_buffer_page(struct buffer_data_page *bpage, 3407 struct rb_event_info *info, 3408 unsigned long tail) 3409 { 3410 struct ring_buffer_event *event; 3411 u64 ts, delta; 3412 int e; 3413 3414 ts = bpage->time_stamp; 3415 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3416 3417 for (e = 0; e < tail; e += rb_event_length(event)) { 3418 3419 event = (struct ring_buffer_event *)(bpage->data + e); 3420 3421 switch (event->type_len) { 3422 3423 case RINGBUF_TYPE_TIME_EXTEND: 3424 delta = rb_event_time_stamp(event); 3425 ts += delta; 3426 pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta); 3427 break; 3428 3429 case RINGBUF_TYPE_TIME_STAMP: 3430 delta = rb_event_time_stamp(event); 3431 ts = rb_fix_abs_ts(delta, ts); 3432 pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta); 3433 break; 3434 3435 case RINGBUF_TYPE_PADDING: 3436 ts += event->time_delta; 3437 pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta); 3438 break; 3439 3440 case RINGBUF_TYPE_DATA: 3441 ts += event->time_delta; 3442 pr_warn(" [%lld] delta:%d\n", ts, event->time_delta); 3443 break; 3444 3445 default: 3446 break; 3447 } 3448 } 3449 } 3450 3451 static DEFINE_PER_CPU(atomic_t, checking); 3452 static atomic_t ts_dump; 3453 3454 /* 3455 * Check if the current event time stamp matches the deltas on 3456 * the buffer page. 
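 * This path is only compiled in with CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
 * and is a debugging aid: it re-walks the events already committed on the
 * tail page for every new reservation, which is expensive.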
3457 */ 3458 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3459 struct rb_event_info *info, 3460 unsigned long tail) 3461 { 3462 struct ring_buffer_event *event; 3463 struct buffer_data_page *bpage; 3464 u64 ts, delta; 3465 bool full = false; 3466 int e; 3467 3468 bpage = info->tail_page->page; 3469 3470 if (tail == CHECK_FULL_PAGE) { 3471 full = true; 3472 tail = local_read(&bpage->commit); 3473 } else if (info->add_timestamp & 3474 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3475 /* Ignore events with absolute time stamps */ 3476 return; 3477 } 3478 3479 /* 3480 * Do not check the first event (skip possible extends too). 3481 * Also do not check if previous events have not been committed. 3482 */ 3483 if (tail <= 8 || tail > local_read(&bpage->commit)) 3484 return; 3485 3486 /* 3487 * If this interrupted another event, 3488 */ 3489 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3490 goto out; 3491 3492 ts = bpage->time_stamp; 3493 3494 for (e = 0; e < tail; e += rb_event_length(event)) { 3495 3496 event = (struct ring_buffer_event *)(bpage->data + e); 3497 3498 switch (event->type_len) { 3499 3500 case RINGBUF_TYPE_TIME_EXTEND: 3501 delta = rb_event_time_stamp(event); 3502 ts += delta; 3503 break; 3504 3505 case RINGBUF_TYPE_TIME_STAMP: 3506 delta = rb_event_time_stamp(event); 3507 ts = rb_fix_abs_ts(delta, ts); 3508 break; 3509 3510 case RINGBUF_TYPE_PADDING: 3511 if (event->time_delta == 1) 3512 break; 3513 fallthrough; 3514 case RINGBUF_TYPE_DATA: 3515 ts += event->time_delta; 3516 break; 3517 3518 default: 3519 RB_WARN_ON(cpu_buffer, 1); 3520 } 3521 } 3522 if ((full && ts > info->ts) || 3523 (!full && ts + info->delta != info->ts)) { 3524 /* If another report is happening, ignore this one */ 3525 if (atomic_inc_return(&ts_dump) != 1) { 3526 atomic_dec(&ts_dump); 3527 goto out; 3528 } 3529 atomic_inc(&cpu_buffer->record_disabled); 3530 /* There's some cases in boot up that this can happen */ 3531 WARN_ON_ONCE(system_state != SYSTEM_BOOTING); 3532 pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n", 3533 cpu_buffer->cpu, 3534 ts + info->delta, info->ts, info->delta, 3535 info->before, info->after, 3536 full ? 
" (full)" : ""); 3537 dump_buffer_page(bpage, info, tail); 3538 atomic_dec(&ts_dump); 3539 /* Do not re-enable checking */ 3540 return; 3541 } 3542 out: 3543 atomic_dec(this_cpu_ptr(&checking)); 3544 } 3545 #else 3546 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3547 struct rb_event_info *info, 3548 unsigned long tail) 3549 { 3550 } 3551 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3552 3553 static struct ring_buffer_event * 3554 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3555 struct rb_event_info *info) 3556 { 3557 struct ring_buffer_event *event; 3558 struct buffer_page *tail_page; 3559 unsigned long tail, write, w; 3560 bool a_ok; 3561 bool b_ok; 3562 3563 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3564 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3565 3566 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3567 barrier(); 3568 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3569 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3570 barrier(); 3571 info->ts = rb_time_stamp(cpu_buffer->buffer); 3572 3573 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3574 info->delta = info->ts; 3575 } else { 3576 /* 3577 * If interrupting an event time update, we may need an 3578 * absolute timestamp. 3579 * Don't bother if this is the start of a new page (w == 0). 3580 */ 3581 if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) { 3582 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3583 info->length += RB_LEN_TIME_EXTEND; 3584 } else { 3585 info->delta = info->ts - info->after; 3586 if (unlikely(test_time_stamp(info->delta))) { 3587 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3588 info->length += RB_LEN_TIME_EXTEND; 3589 } 3590 } 3591 } 3592 3593 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3594 3595 /*C*/ write = local_add_return(info->length, &tail_page->write); 3596 3597 /* set write to only the index of the write */ 3598 write &= RB_WRITE_MASK; 3599 3600 tail = write - info->length; 3601 3602 /* See if we shot pass the end of this buffer page */ 3603 if (unlikely(write > BUF_PAGE_SIZE)) { 3604 /* before and after may now different, fix it up*/ 3605 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3606 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3607 if (a_ok && b_ok && info->before != info->after) 3608 (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, 3609 info->before, info->after); 3610 if (a_ok && b_ok) 3611 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3612 return rb_move_tail(cpu_buffer, tail, info); 3613 } 3614 3615 if (likely(tail == w)) { 3616 u64 save_before; 3617 bool s_ok; 3618 3619 /* Nothing interrupted us between A and C */ 3620 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3621 barrier(); 3622 /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before); 3623 RB_WARN_ON(cpu_buffer, !s_ok); 3624 if (likely(!(info->add_timestamp & 3625 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3626 /* This did not interrupt any time update */ 3627 info->delta = info->ts - info->after; 3628 else 3629 /* Just use full timestamp for interrupting event */ 3630 info->delta = info->ts; 3631 barrier(); 3632 check_buffer(cpu_buffer, info, tail); 3633 if (unlikely(info->ts != save_before)) { 3634 /* SLOW PATH - Interrupted between C and E */ 3635 3636 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3637 RB_WARN_ON(cpu_buffer, !a_ok); 3638 3639 /* Write stamp must only go 
forward */ 3640 if (save_before > info->after) { 3641 /* 3642 * We do not care about the result, only that 3643 * it gets updated atomically. 3644 */ 3645 (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, 3646 info->after, save_before); 3647 } 3648 } 3649 } else { 3650 u64 ts; 3651 /* SLOW PATH - Interrupted between A and C */ 3652 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3653 /* Was interrupted before here, write_stamp must be valid */ 3654 RB_WARN_ON(cpu_buffer, !a_ok); 3655 ts = rb_time_stamp(cpu_buffer->buffer); 3656 barrier(); 3657 /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3658 info->after < ts && 3659 rb_time_cmpxchg(&cpu_buffer->write_stamp, 3660 info->after, ts)) { 3661 /* Nothing came after this event between C and E */ 3662 info->delta = ts - info->after; 3663 } else { 3664 /* 3665 * Interrupted between C and E: 3666 * Lost the previous events time stamp. Just set the 3667 * delta to zero, and this will be the same time as 3668 * the event this event interrupted. And the events that 3669 * came after this will still be correct (as they would 3670 * have built their delta on the previous event. 3671 */ 3672 info->delta = 0; 3673 } 3674 info->ts = ts; 3675 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3676 } 3677 3678 /* 3679 * If this is the first commit on the page, then it has the same 3680 * timestamp as the page itself. 3681 */ 3682 if (unlikely(!tail && !(info->add_timestamp & 3683 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3684 info->delta = 0; 3685 3686 /* We reserved something on the buffer */ 3687 3688 event = __rb_page_index(tail_page, tail); 3689 rb_update_event(cpu_buffer, event, info); 3690 3691 local_inc(&tail_page->entries); 3692 3693 /* 3694 * If this is the first commit on the page, then update 3695 * its timestamp. 3696 */ 3697 if (unlikely(!tail)) 3698 tail_page->page->time_stamp = info->ts; 3699 3700 /* account for these added bytes */ 3701 local_add(info->length, &cpu_buffer->entries_bytes); 3702 3703 return event; 3704 } 3705 3706 static __always_inline struct ring_buffer_event * 3707 rb_reserve_next_event(struct trace_buffer *buffer, 3708 struct ring_buffer_per_cpu *cpu_buffer, 3709 unsigned long length) 3710 { 3711 struct ring_buffer_event *event; 3712 struct rb_event_info info; 3713 int nr_loops = 0; 3714 int add_ts_default; 3715 3716 rb_start_commit(cpu_buffer); 3717 /* The commit page can not change after this */ 3718 3719 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3720 /* 3721 * Due to the ability to swap a cpu buffer from a buffer 3722 * it is possible it was swapped before we committed. 3723 * (committing stops a swap). We check for it here and 3724 * if it happened, we have to fail the write. 3725 */ 3726 barrier(); 3727 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3728 local_dec(&cpu_buffer->committing); 3729 local_dec(&cpu_buffer->commits); 3730 return NULL; 3731 } 3732 #endif 3733 3734 info.length = rb_calculate_event_length(length); 3735 3736 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3737 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3738 info.length += RB_LEN_TIME_EXTEND; 3739 } else { 3740 add_ts_default = RB_ADD_STAMP_NONE; 3741 } 3742 3743 again: 3744 info.add_timestamp = add_ts_default; 3745 info.delta = 0; 3746 3747 /* 3748 * We allow for interrupts to reenter here and do a trace. 3749 * If one does, it will cause this original code to loop 3750 * back here. Even with heavy interrupts happening, this 3751 * should only happen a few times in a row. 
If this happens 3752 * 1000 times in a row, there must be either an interrupt 3753 * storm or we have something buggy. 3754 * Bail! 3755 */ 3756 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3757 goto out_fail; 3758 3759 event = __rb_reserve_next(cpu_buffer, &info); 3760 3761 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3762 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3763 info.length -= RB_LEN_TIME_EXTEND; 3764 goto again; 3765 } 3766 3767 if (likely(event)) 3768 return event; 3769 out_fail: 3770 rb_end_commit(cpu_buffer); 3771 return NULL; 3772 } 3773 3774 /** 3775 * ring_buffer_lock_reserve - reserve a part of the buffer 3776 * @buffer: the ring buffer to reserve from 3777 * @length: the length of the data to reserve (excluding event header) 3778 * 3779 * Returns a reserved event on the ring buffer to copy directly to. 3780 * The user of this interface will need to get the body to write into 3781 * and can use the ring_buffer_event_data() interface. 3782 * 3783 * The length is the length of the data needed, not the event length 3784 * which also includes the event header. 3785 * 3786 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3787 * If NULL is returned, then nothing has been allocated or locked. 3788 */ 3789 struct ring_buffer_event * 3790 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3791 { 3792 struct ring_buffer_per_cpu *cpu_buffer; 3793 struct ring_buffer_event *event; 3794 int cpu; 3795 3796 /* If we are tracing schedule, we don't want to recurse */ 3797 preempt_disable_notrace(); 3798 3799 if (unlikely(atomic_read(&buffer->record_disabled))) 3800 goto out; 3801 3802 cpu = raw_smp_processor_id(); 3803 3804 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3805 goto out; 3806 3807 cpu_buffer = buffer->buffers[cpu]; 3808 3809 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3810 goto out; 3811 3812 if (unlikely(length > BUF_MAX_DATA_SIZE)) 3813 goto out; 3814 3815 if (unlikely(trace_recursive_lock(cpu_buffer))) 3816 goto out; 3817 3818 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3819 if (!event) 3820 goto out_unlock; 3821 3822 return event; 3823 3824 out_unlock: 3825 trace_recursive_unlock(cpu_buffer); 3826 out: 3827 preempt_enable_notrace(); 3828 return NULL; 3829 } 3830 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3831 3832 /* 3833 * Decrement the entries to the page that an event is on. 3834 * The event does not even need to exist, only the pointer 3835 * to the page it is on. This may only be called before the commit 3836 * takes place. 3837 */ 3838 static inline void 3839 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3840 struct ring_buffer_event *event) 3841 { 3842 unsigned long addr = (unsigned long)event; 3843 struct buffer_page *bpage = cpu_buffer->commit_page; 3844 struct buffer_page *start; 3845 3846 addr &= PAGE_MASK; 3847 3848 /* Do the likely case first */ 3849 if (likely(bpage->page == (void *)addr)) { 3850 local_dec(&bpage->entries); 3851 return; 3852 } 3853 3854 /* 3855 * Because the commit page may be on the reader page we 3856 * start with the next page and check the end loop there. 3857 */ 3858 rb_inc_page(&bpage); 3859 start = bpage; 3860 do { 3861 if (bpage->page == (void *)addr) { 3862 local_dec(&bpage->entries); 3863 return; 3864 } 3865 rb_inc_page(&bpage); 3866 } while (bpage != start); 3867 3868 /* commit not part of this buffer?? 
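 * Getting here means the event's page was not found anywhere on this
 * CPU's buffer, which points at a bogus event pointer, so only warn and
 * leave the counters untouched.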
*/ 3869 RB_WARN_ON(cpu_buffer, 1); 3870 } 3871 3872 /** 3873 * ring_buffer_discard_commit - discard an event that has not been committed 3874 * @buffer: the ring buffer 3875 * @event: non committed event to discard 3876 * 3877 * Sometimes an event that is in the ring buffer needs to be ignored. 3878 * This function lets the user discard an event in the ring buffer 3879 * and then that event will not be read later. 3880 * 3881 * This function only works if it is called before the item has been 3882 * committed. It will try to free the event from the ring buffer 3883 * if another event has not been added behind it. 3884 * 3885 * If another event has been added behind it, it will set the event 3886 * up as discarded, and perform the commit. 3887 * 3888 * If this function is called, do not call ring_buffer_unlock_commit on 3889 * the event. 3890 */ 3891 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3892 struct ring_buffer_event *event) 3893 { 3894 struct ring_buffer_per_cpu *cpu_buffer; 3895 int cpu; 3896 3897 /* The event is discarded regardless */ 3898 rb_event_discard(event); 3899 3900 cpu = smp_processor_id(); 3901 cpu_buffer = buffer->buffers[cpu]; 3902 3903 /* 3904 * This must only be called if the event has not been 3905 * committed yet. Thus we can assume that preemption 3906 * is still disabled. 3907 */ 3908 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3909 3910 rb_decrement_entry(cpu_buffer, event); 3911 if (rb_try_to_discard(cpu_buffer, event)) 3912 goto out; 3913 3914 out: 3915 rb_end_commit(cpu_buffer); 3916 3917 trace_recursive_unlock(cpu_buffer); 3918 3919 preempt_enable_notrace(); 3920 3921 } 3922 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3923 3924 /** 3925 * ring_buffer_write - write data to the buffer without reserving 3926 * @buffer: The ring buffer to write to. 3927 * @length: The length of the data being written (excluding the event header) 3928 * @data: The data to write to the buffer. 3929 * 3930 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3931 * one function. If you already have the data to write to the buffer, it 3932 * may be easier to simply call this function. 3933 * 3934 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3935 * and not the length of the event which would hold the header. 
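 *
 * For example, a caller that already has the data assembled could do
 * something like this (an illustrative sketch only; "my_payload" is a
 * hypothetical structure, not part of this API):
 *
 *	struct my_payload data = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(data), &data))
 *		pr_debug("ring buffer write failed or recording is off\n");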
3936 */ 3937 int ring_buffer_write(struct trace_buffer *buffer, 3938 unsigned long length, 3939 void *data) 3940 { 3941 struct ring_buffer_per_cpu *cpu_buffer; 3942 struct ring_buffer_event *event; 3943 void *body; 3944 int ret = -EBUSY; 3945 int cpu; 3946 3947 preempt_disable_notrace(); 3948 3949 if (atomic_read(&buffer->record_disabled)) 3950 goto out; 3951 3952 cpu = raw_smp_processor_id(); 3953 3954 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3955 goto out; 3956 3957 cpu_buffer = buffer->buffers[cpu]; 3958 3959 if (atomic_read(&cpu_buffer->record_disabled)) 3960 goto out; 3961 3962 if (length > BUF_MAX_DATA_SIZE) 3963 goto out; 3964 3965 if (unlikely(trace_recursive_lock(cpu_buffer))) 3966 goto out; 3967 3968 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3969 if (!event) 3970 goto out_unlock; 3971 3972 body = rb_event_data(event); 3973 3974 memcpy(body, data, length); 3975 3976 rb_commit(cpu_buffer); 3977 3978 rb_wakeups(buffer, cpu_buffer); 3979 3980 ret = 0; 3981 3982 out_unlock: 3983 trace_recursive_unlock(cpu_buffer); 3984 3985 out: 3986 preempt_enable_notrace(); 3987 3988 return ret; 3989 } 3990 EXPORT_SYMBOL_GPL(ring_buffer_write); 3991 3992 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3993 { 3994 struct buffer_page *reader = cpu_buffer->reader_page; 3995 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3996 struct buffer_page *commit = cpu_buffer->commit_page; 3997 3998 /* In case of error, head will be NULL */ 3999 if (unlikely(!head)) 4000 return true; 4001 4002 /* Reader should exhaust content in reader page */ 4003 if (reader->read != rb_page_commit(reader)) 4004 return false; 4005 4006 /* 4007 * If writers are committing on the reader page, knowing all 4008 * committed content has been read, the ring buffer is empty. 4009 */ 4010 if (commit == reader) 4011 return true; 4012 4013 /* 4014 * If writers are committing on a page other than reader page 4015 * and head page, there should always be content to read. 4016 */ 4017 if (commit != head) 4018 return false; 4019 4020 /* 4021 * Writers are committing on the head page, we just need 4022 * to care about there're committed data, and the reader will 4023 * swap reader page with head page when it is to read data. 4024 */ 4025 return rb_page_commit(commit) == 0; 4026 } 4027 4028 /** 4029 * ring_buffer_record_disable - stop all writes into the buffer 4030 * @buffer: The ring buffer to stop writes to. 4031 * 4032 * This prevents all writes to the buffer. Any attempt to write 4033 * to the buffer after this will fail and return NULL. 4034 * 4035 * The caller should call synchronize_rcu() after this. 4036 */ 4037 void ring_buffer_record_disable(struct trace_buffer *buffer) 4038 { 4039 atomic_inc(&buffer->record_disabled); 4040 } 4041 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4042 4043 /** 4044 * ring_buffer_record_enable - enable writes to the buffer 4045 * @buffer: The ring buffer to enable writes 4046 * 4047 * Note, multiple disables will need the same number of enables 4048 * to truly enable the writing (much like preempt_disable). 4049 */ 4050 void ring_buffer_record_enable(struct trace_buffer *buffer) 4051 { 4052 atomic_dec(&buffer->record_disabled); 4053 } 4054 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4055 4056 /** 4057 * ring_buffer_record_off - stop all writes into the buffer 4058 * @buffer: The ring buffer to stop writes to. 4059 * 4060 * This prevents all writes to the buffer. Any attempt to write 4061 * to the buffer after this will fail and return NULL. 
 *
 * This is different from ring_buffer_record_disable() as
 * it works like an on/off switch, whereas the disable() version
 * must be paired with an enable().
 */
void ring_buffer_record_off(struct trace_buffer *buffer)
{
	unsigned int rd;
	unsigned int new_rd;

	rd = atomic_read(&buffer->record_disabled);
	do {
		new_rd = rd | RB_BUFFER_OFF;
	} while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
}
EXPORT_SYMBOL_GPL(ring_buffer_record_off);

/**
 * ring_buffer_record_on - restart writes into the buffer
 * @buffer: The ring buffer to start writes to.
 *
 * This enables all writes to the buffer that was disabled by
 * ring_buffer_record_off().
 *
 * This is different from ring_buffer_record_enable() as
 * it works like an on/off switch, whereas the enable() version
 * must be paired with a disable().
 */
void ring_buffer_record_on(struct trace_buffer *buffer)
{
	unsigned int rd;
	unsigned int new_rd;

	rd = atomic_read(&buffer->record_disabled);
	do {
		new_rd = rd & ~RB_BUFFER_OFF;
	} while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
}
EXPORT_SYMBOL_GPL(ring_buffer_record_on);

/**
 * ring_buffer_record_is_on - return true if the ring buffer can write
 * @buffer: The ring buffer to see if write is enabled
 *
 * Returns true if the ring buffer is in a state that it accepts writes.
 */
bool ring_buffer_record_is_on(struct trace_buffer *buffer)
{
	return !atomic_read(&buffer->record_disabled);
}

/**
 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
 * @buffer: The ring buffer to see if write is set enabled
 *
 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
 * Note that this does NOT mean it is in a writable state.
 *
 * It may return true when the ring buffer has been disabled by
 * ring_buffer_record_disable(), as that is a temporary disabling of
 * the ring buffer.
 */
bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
{
	return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
}

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_rcu() after this.
 */
void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);

/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
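 *
 * A typical pairing looks roughly like this (illustrative sketch only):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_rcu();
 *	... examine or drain the per CPU buffer ...
 *	ring_buffer_record_enable_cpu(buffer, cpu);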
4158 */ 4159 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4160 { 4161 struct ring_buffer_per_cpu *cpu_buffer; 4162 4163 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4164 return; 4165 4166 cpu_buffer = buffer->buffers[cpu]; 4167 atomic_dec(&cpu_buffer->record_disabled); 4168 } 4169 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4170 4171 /* 4172 * The total entries in the ring buffer is the running counter 4173 * of entries entered into the ring buffer, minus the sum of 4174 * the entries read from the ring buffer and the number of 4175 * entries that were overwritten. 4176 */ 4177 static inline unsigned long 4178 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4179 { 4180 return local_read(&cpu_buffer->entries) - 4181 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4182 } 4183 4184 /** 4185 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4186 * @buffer: The ring buffer 4187 * @cpu: The per CPU buffer to read from. 4188 */ 4189 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4190 { 4191 unsigned long flags; 4192 struct ring_buffer_per_cpu *cpu_buffer; 4193 struct buffer_page *bpage; 4194 u64 ret = 0; 4195 4196 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4197 return 0; 4198 4199 cpu_buffer = buffer->buffers[cpu]; 4200 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4201 /* 4202 * if the tail is on reader_page, oldest time stamp is on the reader 4203 * page 4204 */ 4205 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4206 bpage = cpu_buffer->reader_page; 4207 else 4208 bpage = rb_set_head_page(cpu_buffer); 4209 if (bpage) 4210 ret = bpage->page->time_stamp; 4211 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4212 4213 return ret; 4214 } 4215 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4216 4217 /** 4218 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 4219 * @buffer: The ring buffer 4220 * @cpu: The per CPU buffer to read from. 4221 */ 4222 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4223 { 4224 struct ring_buffer_per_cpu *cpu_buffer; 4225 unsigned long ret; 4226 4227 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4228 return 0; 4229 4230 cpu_buffer = buffer->buffers[cpu]; 4231 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4232 4233 return ret; 4234 } 4235 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4236 4237 /** 4238 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4239 * @buffer: The ring buffer 4240 * @cpu: The per CPU buffer to get the entries from. 4241 */ 4242 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4243 { 4244 struct ring_buffer_per_cpu *cpu_buffer; 4245 4246 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4247 return 0; 4248 4249 cpu_buffer = buffer->buffers[cpu]; 4250 4251 return rb_num_of_entries(cpu_buffer); 4252 } 4253 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4254 4255 /** 4256 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4257 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4258 * @buffer: The ring buffer 4259 * @cpu: The per CPU buffer to get the number of overruns from 4260 */ 4261 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4262 { 4263 struct ring_buffer_per_cpu *cpu_buffer; 4264 unsigned long ret; 4265 4266 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4267 return 0; 4268 4269 cpu_buffer = buffer->buffers[cpu]; 4270 ret = local_read(&cpu_buffer->overrun); 4271 4272 return ret; 4273 } 4274 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4275 4276 /** 4277 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4278 * commits failing due to the buffer wrapping around while there are uncommitted 4279 * events, such as during an interrupt storm. 4280 * @buffer: The ring buffer 4281 * @cpu: The per CPU buffer to get the number of overruns from 4282 */ 4283 unsigned long 4284 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4285 { 4286 struct ring_buffer_per_cpu *cpu_buffer; 4287 unsigned long ret; 4288 4289 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4290 return 0; 4291 4292 cpu_buffer = buffer->buffers[cpu]; 4293 ret = local_read(&cpu_buffer->commit_overrun); 4294 4295 return ret; 4296 } 4297 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4298 4299 /** 4300 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4301 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4302 * @buffer: The ring buffer 4303 * @cpu: The per CPU buffer to get the number of overruns from 4304 */ 4305 unsigned long 4306 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4307 { 4308 struct ring_buffer_per_cpu *cpu_buffer; 4309 unsigned long ret; 4310 4311 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4312 return 0; 4313 4314 cpu_buffer = buffer->buffers[cpu]; 4315 ret = local_read(&cpu_buffer->dropped_events); 4316 4317 return ret; 4318 } 4319 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4320 4321 /** 4322 * ring_buffer_read_events_cpu - get the number of events successfully read 4323 * @buffer: The ring buffer 4324 * @cpu: The per CPU buffer to get the number of events read 4325 */ 4326 unsigned long 4327 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4328 { 4329 struct ring_buffer_per_cpu *cpu_buffer; 4330 4331 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4332 return 0; 4333 4334 cpu_buffer = buffer->buffers[cpu]; 4335 return cpu_buffer->read; 4336 } 4337 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4338 4339 /** 4340 * ring_buffer_entries - get the number of entries in a buffer 4341 * @buffer: The ring buffer 4342 * 4343 * Returns the total number of entries in the ring buffer 4344 * (all CPU entries) 4345 */ 4346 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4347 { 4348 struct ring_buffer_per_cpu *cpu_buffer; 4349 unsigned long entries = 0; 4350 int cpu; 4351 4352 /* if you care about this being correct, lock the buffer */ 4353 for_each_buffer_cpu(buffer, cpu) { 4354 cpu_buffer = buffer->buffers[cpu]; 4355 entries += rb_num_of_entries(cpu_buffer); 4356 } 4357 4358 return entries; 4359 } 4360 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4361 4362 /** 4363 * ring_buffer_overruns - get the number of overruns in buffer 4364 * @buffer: The ring buffer 4365 * 4366 * Returns the total number of overruns in the ring buffer 4367 * (all CPU entries) 4368 */ 4369 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4370 { 4371 struct ring_buffer_per_cpu *cpu_buffer; 4372 unsigned long overruns = 0; 4373 int cpu; 4374 4375 /* 
if you care about this being correct, lock the buffer */ 4376 for_each_buffer_cpu(buffer, cpu) { 4377 cpu_buffer = buffer->buffers[cpu]; 4378 overruns += local_read(&cpu_buffer->overrun); 4379 } 4380 4381 return overruns; 4382 } 4383 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4384 4385 static void rb_iter_reset(struct ring_buffer_iter *iter) 4386 { 4387 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4388 4389 /* Iterator usage is expected to have record disabled */ 4390 iter->head_page = cpu_buffer->reader_page; 4391 iter->head = cpu_buffer->reader_page->read; 4392 iter->next_event = iter->head; 4393 4394 iter->cache_reader_page = iter->head_page; 4395 iter->cache_read = cpu_buffer->read; 4396 iter->cache_pages_removed = cpu_buffer->pages_removed; 4397 4398 if (iter->head) { 4399 iter->read_stamp = cpu_buffer->read_stamp; 4400 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4401 } else { 4402 iter->read_stamp = iter->head_page->page->time_stamp; 4403 iter->page_stamp = iter->read_stamp; 4404 } 4405 } 4406 4407 /** 4408 * ring_buffer_iter_reset - reset an iterator 4409 * @iter: The iterator to reset 4410 * 4411 * Resets the iterator, so that it will start from the beginning 4412 * again. 4413 */ 4414 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4415 { 4416 struct ring_buffer_per_cpu *cpu_buffer; 4417 unsigned long flags; 4418 4419 if (!iter) 4420 return; 4421 4422 cpu_buffer = iter->cpu_buffer; 4423 4424 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4425 rb_iter_reset(iter); 4426 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4427 } 4428 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4429 4430 /** 4431 * ring_buffer_iter_empty - check if an iterator has no more to read 4432 * @iter: The iterator to check 4433 */ 4434 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4435 { 4436 struct ring_buffer_per_cpu *cpu_buffer; 4437 struct buffer_page *reader; 4438 struct buffer_page *head_page; 4439 struct buffer_page *commit_page; 4440 struct buffer_page *curr_commit_page; 4441 unsigned commit; 4442 u64 curr_commit_ts; 4443 u64 commit_ts; 4444 4445 cpu_buffer = iter->cpu_buffer; 4446 reader = cpu_buffer->reader_page; 4447 head_page = cpu_buffer->head_page; 4448 commit_page = cpu_buffer->commit_page; 4449 commit_ts = commit_page->page->time_stamp; 4450 4451 /* 4452 * When the writer goes across pages, it issues a cmpxchg which 4453 * is a mb(), which will synchronize with the rmb here. 
4454 * (see rb_tail_page_update()) 4455 */ 4456 smp_rmb(); 4457 commit = rb_page_commit(commit_page); 4458 /* We want to make sure that the commit page doesn't change */ 4459 smp_rmb(); 4460 4461 /* Make sure commit page didn't change */ 4462 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4463 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4464 4465 /* If the commit page changed, then there's more data */ 4466 if (curr_commit_page != commit_page || 4467 curr_commit_ts != commit_ts) 4468 return 0; 4469 4470 /* Still racy, as it may return a false positive, but that's OK */ 4471 return ((iter->head_page == commit_page && iter->head >= commit) || 4472 (iter->head_page == reader && commit_page == head_page && 4473 head_page->read == commit && 4474 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4475 } 4476 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4477 4478 static void 4479 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4480 struct ring_buffer_event *event) 4481 { 4482 u64 delta; 4483 4484 switch (event->type_len) { 4485 case RINGBUF_TYPE_PADDING: 4486 return; 4487 4488 case RINGBUF_TYPE_TIME_EXTEND: 4489 delta = rb_event_time_stamp(event); 4490 cpu_buffer->read_stamp += delta; 4491 return; 4492 4493 case RINGBUF_TYPE_TIME_STAMP: 4494 delta = rb_event_time_stamp(event); 4495 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4496 cpu_buffer->read_stamp = delta; 4497 return; 4498 4499 case RINGBUF_TYPE_DATA: 4500 cpu_buffer->read_stamp += event->time_delta; 4501 return; 4502 4503 default: 4504 RB_WARN_ON(cpu_buffer, 1); 4505 } 4506 } 4507 4508 static void 4509 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4510 struct ring_buffer_event *event) 4511 { 4512 u64 delta; 4513 4514 switch (event->type_len) { 4515 case RINGBUF_TYPE_PADDING: 4516 return; 4517 4518 case RINGBUF_TYPE_TIME_EXTEND: 4519 delta = rb_event_time_stamp(event); 4520 iter->read_stamp += delta; 4521 return; 4522 4523 case RINGBUF_TYPE_TIME_STAMP: 4524 delta = rb_event_time_stamp(event); 4525 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4526 iter->read_stamp = delta; 4527 return; 4528 4529 case RINGBUF_TYPE_DATA: 4530 iter->read_stamp += event->time_delta; 4531 return; 4532 4533 default: 4534 RB_WARN_ON(iter->cpu_buffer, 1); 4535 } 4536 } 4537 4538 static struct buffer_page * 4539 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4540 { 4541 struct buffer_page *reader = NULL; 4542 unsigned long overwrite; 4543 unsigned long flags; 4544 int nr_loops = 0; 4545 bool ret; 4546 4547 local_irq_save(flags); 4548 arch_spin_lock(&cpu_buffer->lock); 4549 4550 again: 4551 /* 4552 * This should normally only loop twice. But because the 4553 * start of the reader inserts an empty page, it causes 4554 * a case where we will loop three times. There should be no 4555 * reason to loop four times (that I know of). 
4556 */ 4557 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4558 reader = NULL; 4559 goto out; 4560 } 4561 4562 reader = cpu_buffer->reader_page; 4563 4564 /* If there's more to read, return this page */ 4565 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4566 goto out; 4567 4568 /* Never should we have an index greater than the size */ 4569 if (RB_WARN_ON(cpu_buffer, 4570 cpu_buffer->reader_page->read > rb_page_size(reader))) 4571 goto out; 4572 4573 /* check if we caught up to the tail */ 4574 reader = NULL; 4575 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4576 goto out; 4577 4578 /* Don't bother swapping if the ring buffer is empty */ 4579 if (rb_num_of_entries(cpu_buffer) == 0) 4580 goto out; 4581 4582 /* 4583 * Reset the reader page to size zero. 4584 */ 4585 local_set(&cpu_buffer->reader_page->write, 0); 4586 local_set(&cpu_buffer->reader_page->entries, 0); 4587 local_set(&cpu_buffer->reader_page->page->commit, 0); 4588 cpu_buffer->reader_page->real_end = 0; 4589 4590 spin: 4591 /* 4592 * Splice the empty reader page into the list around the head. 4593 */ 4594 reader = rb_set_head_page(cpu_buffer); 4595 if (!reader) 4596 goto out; 4597 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4598 cpu_buffer->reader_page->list.prev = reader->list.prev; 4599 4600 /* 4601 * cpu_buffer->pages just needs to point to the buffer, it 4602 * has no specific buffer page to point to. Lets move it out 4603 * of our way so we don't accidentally swap it. 4604 */ 4605 cpu_buffer->pages = reader->list.prev; 4606 4607 /* The reader page will be pointing to the new head */ 4608 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4609 4610 /* 4611 * We want to make sure we read the overruns after we set up our 4612 * pointers to the next object. The writer side does a 4613 * cmpxchg to cross pages which acts as the mb on the writer 4614 * side. Note, the reader will constantly fail the swap 4615 * while the writer is updating the pointers, so this 4616 * guarantees that the overwrite recorded here is the one we 4617 * want to compare with the last_overrun. 4618 */ 4619 smp_mb(); 4620 overwrite = local_read(&(cpu_buffer->overrun)); 4621 4622 /* 4623 * Here's the tricky part. 4624 * 4625 * We need to move the pointer past the header page. 4626 * But we can only do that if a writer is not currently 4627 * moving it. The page before the header page has the 4628 * flag bit '1' set if it is pointing to the page we want. 4629 * but if the writer is in the process of moving it 4630 * than it will be '2' or already moved '0'. 4631 */ 4632 4633 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4634 4635 /* 4636 * If we did not convert it, then we must try again. 4637 */ 4638 if (!ret) 4639 goto spin; 4640 4641 /* 4642 * Yay! We succeeded in replacing the page. 4643 * 4644 * Now make the new head point back to the reader page. 
4645 */ 4646 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4647 rb_inc_page(&cpu_buffer->head_page); 4648 4649 local_inc(&cpu_buffer->pages_read); 4650 4651 /* Finally update the reader page to the new head */ 4652 cpu_buffer->reader_page = reader; 4653 cpu_buffer->reader_page->read = 0; 4654 4655 if (overwrite != cpu_buffer->last_overrun) { 4656 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4657 cpu_buffer->last_overrun = overwrite; 4658 } 4659 4660 goto again; 4661 4662 out: 4663 /* Update the read_stamp on the first event */ 4664 if (reader && reader->read == 0) 4665 cpu_buffer->read_stamp = reader->page->time_stamp; 4666 4667 arch_spin_unlock(&cpu_buffer->lock); 4668 local_irq_restore(flags); 4669 4670 /* 4671 * The writer has preempt disable, wait for it. But not forever 4672 * Although, 1 second is pretty much "forever" 4673 */ 4674 #define USECS_WAIT 1000000 4675 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4676 /* If the write is past the end of page, a writer is still updating it */ 4677 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE)) 4678 break; 4679 4680 udelay(1); 4681 4682 /* Get the latest version of the reader write value */ 4683 smp_rmb(); 4684 } 4685 4686 /* The writer is not moving forward? Something is wrong */ 4687 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4688 reader = NULL; 4689 4690 /* 4691 * Make sure we see any padding after the write update 4692 * (see rb_reset_tail()). 4693 * 4694 * In addition, a writer may be writing on the reader page 4695 * if the page has not been fully filled, so the read barrier 4696 * is also needed to make sure we see the content of what is 4697 * committed by the writer (see rb_set_commit_to_write()). 4698 */ 4699 smp_rmb(); 4700 4701 4702 return reader; 4703 } 4704 4705 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4706 { 4707 struct ring_buffer_event *event; 4708 struct buffer_page *reader; 4709 unsigned length; 4710 4711 reader = rb_get_reader_page(cpu_buffer); 4712 4713 /* This function should not be called when buffer is empty */ 4714 if (RB_WARN_ON(cpu_buffer, !reader)) 4715 return; 4716 4717 event = rb_reader_event(cpu_buffer); 4718 4719 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4720 cpu_buffer->read++; 4721 4722 rb_update_read_stamp(cpu_buffer, event); 4723 4724 length = rb_event_length(event); 4725 cpu_buffer->reader_page->read += length; 4726 } 4727 4728 static void rb_advance_iter(struct ring_buffer_iter *iter) 4729 { 4730 struct ring_buffer_per_cpu *cpu_buffer; 4731 4732 cpu_buffer = iter->cpu_buffer; 4733 4734 /* If head == next_event then we need to jump to the next event */ 4735 if (iter->head == iter->next_event) { 4736 /* If the event gets overwritten again, there's nothing to do */ 4737 if (rb_iter_head_event(iter) == NULL) 4738 return; 4739 } 4740 4741 iter->head = iter->next_event; 4742 4743 /* 4744 * Check if we are at the end of the buffer. 
4745 */ 4746 if (iter->next_event >= rb_page_size(iter->head_page)) { 4747 /* discarded commits can make the page empty */ 4748 if (iter->head_page == cpu_buffer->commit_page) 4749 return; 4750 rb_inc_iter(iter); 4751 return; 4752 } 4753 4754 rb_update_iter_read_stamp(iter, iter->event); 4755 } 4756 4757 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4758 { 4759 return cpu_buffer->lost_events; 4760 } 4761 4762 static struct ring_buffer_event * 4763 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4764 unsigned long *lost_events) 4765 { 4766 struct ring_buffer_event *event; 4767 struct buffer_page *reader; 4768 int nr_loops = 0; 4769 4770 if (ts) 4771 *ts = 0; 4772 again: 4773 /* 4774 * We repeat when a time extend is encountered. 4775 * Since the time extend is always attached to a data event, 4776 * we should never loop more than once. 4777 * (We never hit the following condition more than twice). 4778 */ 4779 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4780 return NULL; 4781 4782 reader = rb_get_reader_page(cpu_buffer); 4783 if (!reader) 4784 return NULL; 4785 4786 event = rb_reader_event(cpu_buffer); 4787 4788 switch (event->type_len) { 4789 case RINGBUF_TYPE_PADDING: 4790 if (rb_null_event(event)) 4791 RB_WARN_ON(cpu_buffer, 1); 4792 /* 4793 * Because the writer could be discarding every 4794 * event it creates (which would probably be bad) 4795 * if we were to go back to "again" then we may never 4796 * catch up, and will trigger the warn on, or lock 4797 * the box. Return the padding, and we will release 4798 * the current locks, and try again. 4799 */ 4800 return event; 4801 4802 case RINGBUF_TYPE_TIME_EXTEND: 4803 /* Internal data, OK to advance */ 4804 rb_advance_reader(cpu_buffer); 4805 goto again; 4806 4807 case RINGBUF_TYPE_TIME_STAMP: 4808 if (ts) { 4809 *ts = rb_event_time_stamp(event); 4810 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4811 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4812 cpu_buffer->cpu, ts); 4813 } 4814 /* Internal data, OK to advance */ 4815 rb_advance_reader(cpu_buffer); 4816 goto again; 4817 4818 case RINGBUF_TYPE_DATA: 4819 if (ts && !(*ts)) { 4820 *ts = cpu_buffer->read_stamp + event->time_delta; 4821 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4822 cpu_buffer->cpu, ts); 4823 } 4824 if (lost_events) 4825 *lost_events = rb_lost_events(cpu_buffer); 4826 return event; 4827 4828 default: 4829 RB_WARN_ON(cpu_buffer, 1); 4830 } 4831 4832 return NULL; 4833 } 4834 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4835 4836 static struct ring_buffer_event * 4837 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4838 { 4839 struct trace_buffer *buffer; 4840 struct ring_buffer_per_cpu *cpu_buffer; 4841 struct ring_buffer_event *event; 4842 int nr_loops = 0; 4843 4844 if (ts) 4845 *ts = 0; 4846 4847 cpu_buffer = iter->cpu_buffer; 4848 buffer = cpu_buffer->buffer; 4849 4850 /* 4851 * Check if someone performed a consuming read to the buffer 4852 * or removed some pages from the buffer. In these cases, 4853 * iterator was invalidated and we need to reset it. 4854 */ 4855 if (unlikely(iter->cache_read != cpu_buffer->read || 4856 iter->cache_reader_page != cpu_buffer->reader_page || 4857 iter->cache_pages_removed != cpu_buffer->pages_removed)) 4858 rb_iter_reset(iter); 4859 4860 again: 4861 if (ring_buffer_iter_empty(iter)) 4862 return NULL; 4863 4864 /* 4865 * As the writer can mess with what the iterator is trying 4866 * to read, just give up if we fail to get an event after 4867 * three tries. 
The iterator is not as reliable when reading 4868 * the ring buffer with an active write as the consumer is. 4869 * Do not warn if the three failures is reached. 4870 */ 4871 if (++nr_loops > 3) 4872 return NULL; 4873 4874 if (rb_per_cpu_empty(cpu_buffer)) 4875 return NULL; 4876 4877 if (iter->head >= rb_page_size(iter->head_page)) { 4878 rb_inc_iter(iter); 4879 goto again; 4880 } 4881 4882 event = rb_iter_head_event(iter); 4883 if (!event) 4884 goto again; 4885 4886 switch (event->type_len) { 4887 case RINGBUF_TYPE_PADDING: 4888 if (rb_null_event(event)) { 4889 rb_inc_iter(iter); 4890 goto again; 4891 } 4892 rb_advance_iter(iter); 4893 return event; 4894 4895 case RINGBUF_TYPE_TIME_EXTEND: 4896 /* Internal data, OK to advance */ 4897 rb_advance_iter(iter); 4898 goto again; 4899 4900 case RINGBUF_TYPE_TIME_STAMP: 4901 if (ts) { 4902 *ts = rb_event_time_stamp(event); 4903 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4904 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4905 cpu_buffer->cpu, ts); 4906 } 4907 /* Internal data, OK to advance */ 4908 rb_advance_iter(iter); 4909 goto again; 4910 4911 case RINGBUF_TYPE_DATA: 4912 if (ts && !(*ts)) { 4913 *ts = iter->read_stamp + event->time_delta; 4914 ring_buffer_normalize_time_stamp(buffer, 4915 cpu_buffer->cpu, ts); 4916 } 4917 return event; 4918 4919 default: 4920 RB_WARN_ON(cpu_buffer, 1); 4921 } 4922 4923 return NULL; 4924 } 4925 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4926 4927 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4928 { 4929 if (likely(!in_nmi())) { 4930 raw_spin_lock(&cpu_buffer->reader_lock); 4931 return true; 4932 } 4933 4934 /* 4935 * If an NMI die dumps out the content of the ring buffer 4936 * trylock must be used to prevent a deadlock if the NMI 4937 * preempted a task that holds the ring buffer locks. If 4938 * we get the lock then all is fine, if not, then continue 4939 * to do the read, but this can corrupt the ring buffer, 4940 * so it must be permanently disabled from future writes. 4941 * Reading from NMI is a oneshot deal. 4942 */ 4943 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4944 return true; 4945 4946 /* Continue without locking, but disable the ring buffer */ 4947 atomic_inc(&cpu_buffer->record_disabled); 4948 return false; 4949 } 4950 4951 static inline void 4952 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4953 { 4954 if (likely(locked)) 4955 raw_spin_unlock(&cpu_buffer->reader_lock); 4956 } 4957 4958 /** 4959 * ring_buffer_peek - peek at the next event to be read 4960 * @buffer: The ring buffer to read 4961 * @cpu: The cpu to peak at 4962 * @ts: The timestamp counter of this event. 4963 * @lost_events: a variable to store if events were lost (may be NULL) 4964 * 4965 * This will return the event that will be read next, but does 4966 * not consume the data. 
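 *
 * For example, a reader may peek to decide whether to consume the next
 * event (illustrative sketch only; want_event() is a hypothetical
 * predicate):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event && want_event(event))
 *		event = ring_buffer_consume(buffer, cpu, &ts, &lost);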
 */
struct ring_buffer_event *
ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
		 unsigned long *lost_events)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;
	bool dolock;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

 again:
	local_irq_save(flags);
	dolock = rb_reader_lock(cpu_buffer);
	event = rb_buffer_peek(cpu_buffer, ts, lost_events);
	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		rb_advance_reader(cpu_buffer);
	rb_reader_unlock(cpu_buffer, dolock);
	local_irq_restore(flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}

/**
 * ring_buffer_iter_dropped - report if there are dropped events
 * @iter: The ring buffer iterator
 *
 * Returns true if there were dropped events since the last peek.
 */
bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
{
	bool ret = iter->missed_events != 0;

	iter->missed_events = 0;
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);

/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

 again:
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}

/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: the cpu to read the buffer from
 * @ts: a variable to store the timestamp (may be NULL)
 * @lost_events: a variable to store if events were lost (may be NULL)
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
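 *
 * For example, a simple drain loop could look like this (illustrative
 * sketch only; process_event() is a hypothetical callback):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process_event(ring_buffer_event_data(event), ts, lost);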
5045 */ 5046 struct ring_buffer_event * 5047 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5048 unsigned long *lost_events) 5049 { 5050 struct ring_buffer_per_cpu *cpu_buffer; 5051 struct ring_buffer_event *event = NULL; 5052 unsigned long flags; 5053 bool dolock; 5054 5055 again: 5056 /* might be called in atomic */ 5057 preempt_disable(); 5058 5059 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5060 goto out; 5061 5062 cpu_buffer = buffer->buffers[cpu]; 5063 local_irq_save(flags); 5064 dolock = rb_reader_lock(cpu_buffer); 5065 5066 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5067 if (event) { 5068 cpu_buffer->lost_events = 0; 5069 rb_advance_reader(cpu_buffer); 5070 } 5071 5072 rb_reader_unlock(cpu_buffer, dolock); 5073 local_irq_restore(flags); 5074 5075 out: 5076 preempt_enable(); 5077 5078 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5079 goto again; 5080 5081 return event; 5082 } 5083 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5084 5085 /** 5086 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5087 * @buffer: The ring buffer to read from 5088 * @cpu: The cpu buffer to iterate over 5089 * @flags: gfp flags to use for memory allocation 5090 * 5091 * This performs the initial preparations necessary to iterate 5092 * through the buffer. Memory is allocated, buffer recording 5093 * is disabled, and the iterator pointer is returned to the caller. 5094 * 5095 * Disabling buffer recording prevents the reading from being 5096 * corrupted. This is not a consuming read, so a producer is not 5097 * expected. 5098 * 5099 * After a sequence of ring_buffer_read_prepare calls, the user is 5100 * expected to make at least one call to ring_buffer_read_prepare_sync. 5101 * Afterwards, ring_buffer_read_start is invoked to get things going 5102 * for real. 5103 * 5104 * This overall must be paired with ring_buffer_read_finish. 5105 */ 5106 struct ring_buffer_iter * 5107 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5108 { 5109 struct ring_buffer_per_cpu *cpu_buffer; 5110 struct ring_buffer_iter *iter; 5111 5112 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5113 return NULL; 5114 5115 iter = kzalloc(sizeof(*iter), flags); 5116 if (!iter) 5117 return NULL; 5118 5119 iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags); 5120 if (!iter->event) { 5121 kfree(iter); 5122 return NULL; 5123 } 5124 5125 cpu_buffer = buffer->buffers[cpu]; 5126 5127 iter->cpu_buffer = cpu_buffer; 5128 5129 atomic_inc(&cpu_buffer->resize_disabled); 5130 5131 return iter; 5132 } 5133 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5134 5135 /** 5136 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5137 * 5138 * All previously invoked ring_buffer_read_prepare calls to prepare 5139 * iterators will be synchronized. Afterwards, read_buffer_read_start 5140 * calls on those iterators are allowed. 5141 */ 5142 void 5143 ring_buffer_read_prepare_sync(void) 5144 { 5145 synchronize_rcu(); 5146 } 5147 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5148 5149 /** 5150 * ring_buffer_read_start - start a non consuming read of the buffer 5151 * @iter: The iterator returned by ring_buffer_read_prepare 5152 * 5153 * This finalizes the startup of an iteration through the buffer. 5154 * The iterator comes from a call to ring_buffer_read_prepare and 5155 * an intervening ring_buffer_read_prepare_sync must have been 5156 * performed. 5157 * 5158 * Must be paired with ring_buffer_read_finish. 
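 *
 * Putting it together, a non consuming read is expected to look
 * roughly like this (illustrative sketch only):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts)))
 *		ring_buffer_iter_advance(iter);
 *	ring_buffer_read_finish(iter);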
5159 */ 5160 void 5161 ring_buffer_read_start(struct ring_buffer_iter *iter) 5162 { 5163 struct ring_buffer_per_cpu *cpu_buffer; 5164 unsigned long flags; 5165 5166 if (!iter) 5167 return; 5168 5169 cpu_buffer = iter->cpu_buffer; 5170 5171 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5172 arch_spin_lock(&cpu_buffer->lock); 5173 rb_iter_reset(iter); 5174 arch_spin_unlock(&cpu_buffer->lock); 5175 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5176 } 5177 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5178 5179 /** 5180 * ring_buffer_read_finish - finish reading the iterator of the buffer 5181 * @iter: The iterator retrieved by ring_buffer_start 5182 * 5183 * This re-enables the recording to the buffer, and frees the 5184 * iterator. 5185 */ 5186 void 5187 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5188 { 5189 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5190 unsigned long flags; 5191 5192 /* 5193 * Ring buffer is disabled from recording, here's a good place 5194 * to check the integrity of the ring buffer. 5195 * Must prevent readers from trying to read, as the check 5196 * clears the HEAD page and readers require it. 5197 */ 5198 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5199 rb_check_pages(cpu_buffer); 5200 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5201 5202 atomic_dec(&cpu_buffer->resize_disabled); 5203 kfree(iter->event); 5204 kfree(iter); 5205 } 5206 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5207 5208 /** 5209 * ring_buffer_iter_advance - advance the iterator to the next location 5210 * @iter: The ring buffer iterator 5211 * 5212 * Move the location of the iterator such that the next read will 5213 * be the next location of the iterator. 5214 */ 5215 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5216 { 5217 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5218 unsigned long flags; 5219 5220 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5221 5222 rb_advance_iter(iter); 5223 5224 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5225 } 5226 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5227 5228 /** 5229 * ring_buffer_size - return the size of the ring buffer (in bytes) 5230 * @buffer: The ring buffer. 5231 * @cpu: The CPU to get ring buffer size from. 5232 */ 5233 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5234 { 5235 /* 5236 * Earlier, this method returned 5237 * BUF_PAGE_SIZE * buffer->nr_pages 5238 * Since the nr_pages field is now removed, we have converted this to 5239 * return the per cpu buffer value. 
5240 */ 5241 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5242 return 0; 5243 5244 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 5245 } 5246 EXPORT_SYMBOL_GPL(ring_buffer_size); 5247 5248 static void rb_clear_buffer_page(struct buffer_page *page) 5249 { 5250 local_set(&page->write, 0); 5251 local_set(&page->entries, 0); 5252 rb_init_page(page->page); 5253 page->read = 0; 5254 } 5255 5256 static void 5257 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5258 { 5259 struct buffer_page *page; 5260 5261 rb_head_page_deactivate(cpu_buffer); 5262 5263 cpu_buffer->head_page 5264 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5265 rb_clear_buffer_page(cpu_buffer->head_page); 5266 list_for_each_entry(page, cpu_buffer->pages, list) { 5267 rb_clear_buffer_page(page); 5268 } 5269 5270 cpu_buffer->tail_page = cpu_buffer->head_page; 5271 cpu_buffer->commit_page = cpu_buffer->head_page; 5272 5273 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5274 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5275 rb_clear_buffer_page(cpu_buffer->reader_page); 5276 5277 local_set(&cpu_buffer->entries_bytes, 0); 5278 local_set(&cpu_buffer->overrun, 0); 5279 local_set(&cpu_buffer->commit_overrun, 0); 5280 local_set(&cpu_buffer->dropped_events, 0); 5281 local_set(&cpu_buffer->entries, 0); 5282 local_set(&cpu_buffer->committing, 0); 5283 local_set(&cpu_buffer->commits, 0); 5284 local_set(&cpu_buffer->pages_touched, 0); 5285 local_set(&cpu_buffer->pages_lost, 0); 5286 local_set(&cpu_buffer->pages_read, 0); 5287 cpu_buffer->last_pages_touch = 0; 5288 cpu_buffer->shortest_full = 0; 5289 cpu_buffer->read = 0; 5290 cpu_buffer->read_bytes = 0; 5291 5292 rb_time_set(&cpu_buffer->write_stamp, 0); 5293 rb_time_set(&cpu_buffer->before_stamp, 0); 5294 5295 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5296 5297 cpu_buffer->lost_events = 0; 5298 cpu_buffer->last_overrun = 0; 5299 5300 rb_head_page_activate(cpu_buffer); 5301 cpu_buffer->pages_removed = 0; 5302 } 5303 5304 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5305 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5306 { 5307 unsigned long flags; 5308 5309 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5310 5311 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5312 goto out; 5313 5314 arch_spin_lock(&cpu_buffer->lock); 5315 5316 rb_reset_cpu(cpu_buffer); 5317 5318 arch_spin_unlock(&cpu_buffer->lock); 5319 5320 out: 5321 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5322 } 5323 5324 /** 5325 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5326 * @buffer: The ring buffer to reset a per cpu buffer of 5327 * @cpu: The CPU buffer to be reset 5328 */ 5329 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5330 { 5331 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5332 5333 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5334 return; 5335 5336 /* prevent another thread from changing buffer sizes */ 5337 mutex_lock(&buffer->mutex); 5338 5339 atomic_inc(&cpu_buffer->resize_disabled); 5340 atomic_inc(&cpu_buffer->record_disabled); 5341 5342 /* Make sure all commits have finished */ 5343 synchronize_rcu(); 5344 5345 reset_disabled_cpu_buffer(cpu_buffer); 5346 5347 atomic_dec(&cpu_buffer->record_disabled); 5348 atomic_dec(&cpu_buffer->resize_disabled); 5349 5350 mutex_unlock(&buffer->mutex); 5351 } 5352 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5353 5354 /* Flag to ensure proper resetting of atomic variables */ 5355 
#define RESET_BIT (1 << 30) 5356 5357 /** 5358 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5359 * @buffer: The ring buffer to reset a per cpu buffer of 5360 */ 5361 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5362 { 5363 struct ring_buffer_per_cpu *cpu_buffer; 5364 int cpu; 5365 5366 /* prevent another thread from changing buffer sizes */ 5367 mutex_lock(&buffer->mutex); 5368 5369 for_each_online_buffer_cpu(buffer, cpu) { 5370 cpu_buffer = buffer->buffers[cpu]; 5371 5372 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5373 atomic_inc(&cpu_buffer->record_disabled); 5374 } 5375 5376 /* Make sure all commits have finished */ 5377 synchronize_rcu(); 5378 5379 for_each_buffer_cpu(buffer, cpu) { 5380 cpu_buffer = buffer->buffers[cpu]; 5381 5382 /* 5383 * If a CPU came online during the synchronize_rcu(), then 5384 * ignore it. 5385 */ 5386 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5387 continue; 5388 5389 reset_disabled_cpu_buffer(cpu_buffer); 5390 5391 atomic_dec(&cpu_buffer->record_disabled); 5392 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5393 } 5394 5395 mutex_unlock(&buffer->mutex); 5396 } 5397 5398 /** 5399 * ring_buffer_reset - reset a ring buffer 5400 * @buffer: The ring buffer to reset all cpu buffers 5401 */ 5402 void ring_buffer_reset(struct trace_buffer *buffer) 5403 { 5404 struct ring_buffer_per_cpu *cpu_buffer; 5405 int cpu; 5406 5407 /* prevent another thread from changing buffer sizes */ 5408 mutex_lock(&buffer->mutex); 5409 5410 for_each_buffer_cpu(buffer, cpu) { 5411 cpu_buffer = buffer->buffers[cpu]; 5412 5413 atomic_inc(&cpu_buffer->resize_disabled); 5414 atomic_inc(&cpu_buffer->record_disabled); 5415 } 5416 5417 /* Make sure all commits have finished */ 5418 synchronize_rcu(); 5419 5420 for_each_buffer_cpu(buffer, cpu) { 5421 cpu_buffer = buffer->buffers[cpu]; 5422 5423 reset_disabled_cpu_buffer(cpu_buffer); 5424 5425 atomic_dec(&cpu_buffer->record_disabled); 5426 atomic_dec(&cpu_buffer->resize_disabled); 5427 } 5428 5429 mutex_unlock(&buffer->mutex); 5430 } 5431 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5432 5433 /** 5434 * ring_buffer_empty - is the ring buffer empty? 5435 * @buffer: The ring buffer to test 5436 */ 5437 bool ring_buffer_empty(struct trace_buffer *buffer) 5438 { 5439 struct ring_buffer_per_cpu *cpu_buffer; 5440 unsigned long flags; 5441 bool dolock; 5442 bool ret; 5443 int cpu; 5444 5445 /* yes this is racy, but if you don't like the race, lock the buffer */ 5446 for_each_buffer_cpu(buffer, cpu) { 5447 cpu_buffer = buffer->buffers[cpu]; 5448 local_irq_save(flags); 5449 dolock = rb_reader_lock(cpu_buffer); 5450 ret = rb_per_cpu_empty(cpu_buffer); 5451 rb_reader_unlock(cpu_buffer, dolock); 5452 local_irq_restore(flags); 5453 5454 if (!ret) 5455 return false; 5456 } 5457 5458 return true; 5459 } 5460 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5461 5462 /** 5463 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 
5464 * @buffer: The ring buffer 5465 * @cpu: The CPU buffer to test 5466 */ 5467 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5468 { 5469 struct ring_buffer_per_cpu *cpu_buffer; 5470 unsigned long flags; 5471 bool dolock; 5472 bool ret; 5473 5474 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5475 return true; 5476 5477 cpu_buffer = buffer->buffers[cpu]; 5478 local_irq_save(flags); 5479 dolock = rb_reader_lock(cpu_buffer); 5480 ret = rb_per_cpu_empty(cpu_buffer); 5481 rb_reader_unlock(cpu_buffer, dolock); 5482 local_irq_restore(flags); 5483 5484 return ret; 5485 } 5486 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5487 5488 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5489 /** 5490 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5491 * @buffer_a: One buffer to swap with 5492 * @buffer_b: The other buffer to swap with 5493 * @cpu: the CPU of the buffers to swap 5494 * 5495 * This function is useful for tracers that want to take a "snapshot" 5496 * of a CPU buffer and has another back up buffer lying around. 5497 * it is expected that the tracer handles the cpu buffer not being 5498 * used at the moment. 5499 */ 5500 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5501 struct trace_buffer *buffer_b, int cpu) 5502 { 5503 struct ring_buffer_per_cpu *cpu_buffer_a; 5504 struct ring_buffer_per_cpu *cpu_buffer_b; 5505 int ret = -EINVAL; 5506 5507 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5508 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5509 goto out; 5510 5511 cpu_buffer_a = buffer_a->buffers[cpu]; 5512 cpu_buffer_b = buffer_b->buffers[cpu]; 5513 5514 /* At least make sure the two buffers are somewhat the same */ 5515 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5516 goto out; 5517 5518 ret = -EAGAIN; 5519 5520 if (atomic_read(&buffer_a->record_disabled)) 5521 goto out; 5522 5523 if (atomic_read(&buffer_b->record_disabled)) 5524 goto out; 5525 5526 if (atomic_read(&cpu_buffer_a->record_disabled)) 5527 goto out; 5528 5529 if (atomic_read(&cpu_buffer_b->record_disabled)) 5530 goto out; 5531 5532 /* 5533 * We can't do a synchronize_rcu here because this 5534 * function can be called in atomic context. 5535 * Normally this will be called from the same CPU as cpu. 5536 * If not it's up to the caller to protect this. 5537 */ 5538 atomic_inc(&cpu_buffer_a->record_disabled); 5539 atomic_inc(&cpu_buffer_b->record_disabled); 5540 5541 ret = -EBUSY; 5542 if (local_read(&cpu_buffer_a->committing)) 5543 goto out_dec; 5544 if (local_read(&cpu_buffer_b->committing)) 5545 goto out_dec; 5546 5547 /* 5548 * When resize is in progress, we cannot swap it because 5549 * it will mess the state of the cpu buffer. 5550 */ 5551 if (atomic_read(&buffer_a->resizing)) 5552 goto out_dec; 5553 if (atomic_read(&buffer_b->resizing)) 5554 goto out_dec; 5555 5556 buffer_a->buffers[cpu] = cpu_buffer_b; 5557 buffer_b->buffers[cpu] = cpu_buffer_a; 5558 5559 cpu_buffer_b->buffer = buffer_a; 5560 cpu_buffer_a->buffer = buffer_b; 5561 5562 ret = 0; 5563 5564 out_dec: 5565 atomic_dec(&cpu_buffer_a->record_disabled); 5566 atomic_dec(&cpu_buffer_b->record_disabled); 5567 out: 5568 return ret; 5569 } 5570 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5571 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5572 5573 /** 5574 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5575 * @buffer: the buffer to allocate for. 5576 * @cpu: the cpu buffer to allocate. 5577 * 5578 * This function is used in conjunction with ring_buffer_read_page. 
5579 * When reading a full page from the ring buffer, these functions 5580 * can be used to speed up the process. The calling function should 5581 * allocate a few pages first with this function. Then when it 5582 * needs to get pages from the ring buffer, it passes the result 5583 * of this function into ring_buffer_read_page, which will swap 5584 * the page that was allocated, with the read page of the buffer. 5585 * 5586 * Returns: 5587 * The page allocated, or ERR_PTR 5588 */ 5589 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5590 { 5591 struct ring_buffer_per_cpu *cpu_buffer; 5592 struct buffer_data_page *bpage = NULL; 5593 unsigned long flags; 5594 struct page *page; 5595 5596 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5597 return ERR_PTR(-ENODEV); 5598 5599 cpu_buffer = buffer->buffers[cpu]; 5600 local_irq_save(flags); 5601 arch_spin_lock(&cpu_buffer->lock); 5602 5603 if (cpu_buffer->free_page) { 5604 bpage = cpu_buffer->free_page; 5605 cpu_buffer->free_page = NULL; 5606 } 5607 5608 arch_spin_unlock(&cpu_buffer->lock); 5609 local_irq_restore(flags); 5610 5611 if (bpage) 5612 goto out; 5613 5614 page = alloc_pages_node(cpu_to_node(cpu), 5615 GFP_KERNEL | __GFP_NORETRY, 0); 5616 if (!page) 5617 return ERR_PTR(-ENOMEM); 5618 5619 bpage = page_address(page); 5620 5621 out: 5622 rb_init_page(bpage); 5623 5624 return bpage; 5625 } 5626 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5627 5628 /** 5629 * ring_buffer_free_read_page - free an allocated read page 5630 * @buffer: the buffer the page was allocate for 5631 * @cpu: the cpu buffer the page came from 5632 * @data: the page to free 5633 * 5634 * Free a page allocated from ring_buffer_alloc_read_page. 5635 */ 5636 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5637 { 5638 struct ring_buffer_per_cpu *cpu_buffer; 5639 struct buffer_data_page *bpage = data; 5640 struct page *page = virt_to_page(bpage); 5641 unsigned long flags; 5642 5643 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5644 return; 5645 5646 cpu_buffer = buffer->buffers[cpu]; 5647 5648 /* If the page is still in use someplace else, we can't reuse it */ 5649 if (page_ref_count(page) > 1) 5650 goto out; 5651 5652 local_irq_save(flags); 5653 arch_spin_lock(&cpu_buffer->lock); 5654 5655 if (!cpu_buffer->free_page) { 5656 cpu_buffer->free_page = bpage; 5657 bpage = NULL; 5658 } 5659 5660 arch_spin_unlock(&cpu_buffer->lock); 5661 local_irq_restore(flags); 5662 5663 out: 5664 free_page((unsigned long)bpage); 5665 } 5666 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5667 5668 /** 5669 * ring_buffer_read_page - extract a page from the ring buffer 5670 * @buffer: buffer to extract from 5671 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5672 * @len: amount to extract 5673 * @cpu: the cpu of the buffer to extract 5674 * @full: should the extraction only happen when the page is full. 5675 * 5676 * This function will pull out a page from the ring buffer and consume it. 5677 * @data_page must be the address of the variable that was returned 5678 * from ring_buffer_alloc_read_page. This is because the page might be used 5679 * to swap with a page in the ring buffer. 
5680 * 5681 * for example: 5682 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5683 * if (IS_ERR(rpage)) 5684 * return PTR_ERR(rpage); 5685 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5686 * if (ret >= 0) 5687 * process_page(rpage, ret); 5688 * 5689 * When @full is set, the function will not return true unless 5690 * the writer is off the reader page. 5691 * 5692 * Note: it is up to the calling functions to handle sleeps and wakeups. 5693 * The ring buffer can be used anywhere in the kernel and can not 5694 * blindly call wake_up. The layer that uses the ring buffer must be 5695 * responsible for that. 5696 * 5697 * Returns: 5698 * >=0 if data has been transferred, returns the offset of consumed data. 5699 * <0 if no data has been transferred. 5700 */ 5701 int ring_buffer_read_page(struct trace_buffer *buffer, 5702 void **data_page, size_t len, int cpu, int full) 5703 { 5704 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5705 struct ring_buffer_event *event; 5706 struct buffer_data_page *bpage; 5707 struct buffer_page *reader; 5708 unsigned long missed_events; 5709 unsigned long flags; 5710 unsigned int commit; 5711 unsigned int read; 5712 u64 save_timestamp; 5713 int ret = -1; 5714 5715 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5716 goto out; 5717 5718 /* 5719 * If len is not big enough to hold the page header, then 5720 * we can not copy anything. 5721 */ 5722 if (len <= BUF_PAGE_HDR_SIZE) 5723 goto out; 5724 5725 len -= BUF_PAGE_HDR_SIZE; 5726 5727 if (!data_page) 5728 goto out; 5729 5730 bpage = *data_page; 5731 if (!bpage) 5732 goto out; 5733 5734 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5735 5736 reader = rb_get_reader_page(cpu_buffer); 5737 if (!reader) 5738 goto out_unlock; 5739 5740 event = rb_reader_event(cpu_buffer); 5741 5742 read = reader->read; 5743 commit = rb_page_commit(reader); 5744 5745 /* Check if any events were dropped */ 5746 missed_events = cpu_buffer->lost_events; 5747 5748 /* 5749 * If this page has been partially read or 5750 * if len is not big enough to read the rest of the page or 5751 * a writer is still on the page, then 5752 * we must copy the data from the page to the buffer. 5753 * Otherwise, we can simply swap the page with the one passed in. 5754 */ 5755 if (read || (len < (commit - read)) || 5756 cpu_buffer->reader_page == cpu_buffer->commit_page) { 5757 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5758 unsigned int rpos = read; 5759 unsigned int pos = 0; 5760 unsigned int size; 5761 5762 /* 5763 * If a full page is expected, this can still be returned 5764 * if there's been a previous partial read and the 5765 * rest of the page can be read and the commit page is off 5766 * the reader page. 5767 */ 5768 if (full && 5769 (!read || (len < (commit - read)) || 5770 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5771 goto out_unlock; 5772 5773 if (len > (commit - read)) 5774 len = (commit - read); 5775 5776 /* Always keep the time extend and data together */ 5777 size = rb_event_ts_length(event); 5778 5779 if (len < size) 5780 goto out_unlock; 5781 5782 /* save the current timestamp, since the user will need it */ 5783 save_timestamp = cpu_buffer->read_stamp; 5784 5785 /* Need to copy one event at a time */ 5786 do { 5787 /* We need the size of one event, because 5788 * rb_advance_reader only advances by one event, 5789 * whereas rb_event_ts_length may include the size of 5790 * one or two events. 
			 * We have already ensured there's enough space if this
			 * is a time extend.
			 */
			size = rb_event_length(event);
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			if (rpos >= commit)
				break;

			event = rb_reader_event(cpu_buffer);
			/* Always keep the time extend and data together */
			size = rb_event_ts_length(event);
		} while (len >= size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += BUF_PAGE_SIZE;

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;

		/*
		 * Use the real_end for the data size.
		 * This gives us a chance to store the lost events
		 * on the page.
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/*
		 * If there is room at the end of the page to save the
		 * missed events, then record them there.
		 */
		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < BUF_PAGE_SIZE)
		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
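
/*
 * Sketch (for illustration only, not compiled here) of how a consumer of the
 * page filled by ring_buffer_read_page() could recover the lost-events
 * information stored above: RB_MISSED_EVENTS and RB_MISSED_STORED are flag
 * bits OR'd into the commit field, and when RB_MISSED_STORED is set the count
 * itself was appended right after the event data.
 *
 *	struct buffer_data_page *dpage = rpage;
 *	unsigned long commit = local_read(&dpage->commit);
 *	unsigned long data_len = commit & ~(RB_MISSED_EVENTS | RB_MISSED_STORED);
 *	unsigned long missed = 0;
 *
 *	if (commit & RB_MISSED_EVENTS) {
 *		// events were lost while this page was being written
 *		if (commit & RB_MISSED_STORED)
 *			memcpy(&missed, &dpage->data[data_len], sizeof(missed));
 *	}
 *
 * In practice this decoding is done by the tracing layer and the user space
 * tools that parse the exported page, not by this file.
 */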
/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace
 * that was in the buffer.
 */
int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
{
	struct trace_buffer *buffer;
	long nr_pages_same;
	int cpu_i;
	unsigned long nr_pages;

	buffer = container_of(node, struct trace_buffer, node);
	if (cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	nr_pages = 0;
	nr_pages_same = 1;
	/* check if all cpu sizes are same */
	for_each_buffer_cpu(buffer, cpu_i) {
		/* fill in the size from first enabled cpu */
		if (nr_pages == 0)
			nr_pages = buffer->buffers[cpu_i]->nr_pages;
		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
			nr_pages_same = 0;
			break;
		}
	}
	/* allocate minimum pages, user can later expand it */
	if (!nr_pages_same)
		nr_pages = 2;
	buffer->buffers[cpu] =
		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu]) {
		WARN(1, "failed to allocate ring buffer on CPU %u\n",
		     cpu);
		return -ENOMEM;
	}
	smp_wmb();
	cpumask_set_cpu(cpu, buffer->cpumask);
	return 0;
}
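
/*
 * For reference, trace_rb_cpu_prepare() is not called directly from this
 * file; the tracing core registers it with the CPU hotplug state machine and
 * each buffer adds itself as an instance. Roughly (a sketch; see
 * kernel/trace/trace.c and __ring_buffer_alloc() for the real call sites,
 * and note the state name string here is illustrative):
 *
 *	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *				      "trace/RB:prepare",
 *				      trace_rb_cpu_prepare, NULL);
 *
 *	// and per ring buffer, from __ring_buffer_alloc():
 *	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
 */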
#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
/*
 * This is a basic integrity check of the ring buffer.
 * Late in the boot cycle this test will run when configured in.
 * It will kick off a thread per CPU that will go into a loop
 * writing to the per cpu ring buffer various sizes of data.
 * Some of the data will be large items, some small.
 *
 * Another thread is created that goes into a spin, sending out
 * IPIs to the other CPUs to also write into the ring buffer.
 * This is to test the nesting ability of the buffer.
 *
 * Basic stats are recorded and reported. If something in the
 * ring buffer should happen that's not expected, a big warning
 * is displayed and all ring buffers are disabled.
 */
static struct task_struct *rb_threads[NR_CPUS] __initdata;

struct rb_test_data {
	struct trace_buffer *buffer;
	unsigned long events;
	unsigned long bytes_written;
	unsigned long bytes_alloc;
	unsigned long bytes_dropped;
	unsigned long events_nested;
	unsigned long bytes_written_nested;
	unsigned long bytes_alloc_nested;
	unsigned long bytes_dropped_nested;
	int min_size_nested;
	int max_size_nested;
	int max_size;
	int min_size;
	int cpu;
	int cnt;
};

static struct rb_test_data rb_data[NR_CPUS] __initdata;

/* 1 meg per cpu */
#define RB_TEST_BUFFER_SIZE	1048576

static char rb_string[] __initdata =
	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";

static bool rb_test_started __initdata;

struct rb_item {
	int size;
	char str[];
};

static __init int rb_write_something(struct rb_test_data *data, bool nested)
{
	struct ring_buffer_event *event;
	struct rb_item *item;
	bool started;
	int event_len;
	int size;
	int len;
	int cnt;

	/* Have nested writes different than what is written */
	cnt = data->cnt + (nested ? 27 : 0);

	/* Multiply cnt by ~e, to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts. */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events. We care about events dropped after
	 * the threads see that the buffer is active.
	 */
	smp_wmb();
	rb_test_started = true;

	set_current_state(TASK_INTERRUPTIBLE);
	/* Just run for 10 seconds */
	schedule_timeout(10 * HZ);

	kthread_stop(rb_hammer);

 out_free:
	for_each_online_cpu(cpu) {
		if (!rb_threads[cpu])
			break;
		kthread_stop(rb_threads[cpu]);
	}
	if (ret) {
		ring_buffer_free(buffer);
		return ret;
	}

	/* Report! */
	pr_info("finished\n");
	for_each_online_cpu(cpu) {
		struct ring_buffer_event *event;
		struct rb_test_data *data = &rb_data[cpu];
		struct rb_item *item;
		unsigned long total_events;
		unsigned long total_dropped;
		unsigned long total_written;
		unsigned long total_alloc;
		unsigned long total_read = 0;
		unsigned long total_size = 0;
		unsigned long total_len = 0;
		unsigned long total_lost = 0;
		unsigned long lost;
		int big_event_size;
		int small_event_size;

		ret = -1;

		total_events = data->events + data->events_nested;
		total_written = data->bytes_written + data->bytes_written_nested;
		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;

		big_event_size = data->max_size + data->max_size_nested;
		small_event_size = data->min_size + data->min_size_nested;

		pr_info("CPU %d:\n", cpu);
		pr_info(" events: %ld\n", total_events);
		pr_info(" dropped bytes: %ld\n", total_dropped);
		pr_info(" alloced bytes: %ld\n", total_alloc);
		pr_info(" written bytes: %ld\n", total_written);
		pr_info(" biggest event: %d\n", big_event_size);
		pr_info(" smallest event: %d\n", small_event_size);

		if (RB_WARN_ON(buffer, total_dropped))
			break;

		ret = 0;

		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
			total_lost += lost;
			item = ring_buffer_event_data(event);
			total_len += ring_buffer_event_length(event);
			total_size += item->size + sizeof(struct rb_item);
			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
				pr_info("FAILED!\n");
				pr_info("buffer had: %.*s\n", item->size, item->str);
				pr_info("expected: %.*s\n", item->size, rb_string);
				RB_WARN_ON(buffer, 1);
				ret = -1;
				break;
			}
			total_read++;
		}
		if (ret)
			break;

		ret = -1;

		pr_info(" read events: %ld\n", total_read);
		pr_info(" lost events: %ld\n", total_lost);
		pr_info(" total events: %ld\n", total_lost + total_read);
		pr_info(" recorded len bytes: %ld\n", total_len);
		pr_info(" recorded size bytes: %ld\n", total_size);
		if (total_lost) {
			pr_info(" With dropped events, record len and size may not match\n"
				" alloced and written from above\n");
		} else {
			if (RB_WARN_ON(buffer, total_len != total_alloc ||
				       total_size != total_written))
				break;
		}
		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
			break;

		ret = 0;
	}
	if (!ret)
		pr_info("Ring buffer PASSED!\n");

	ring_buffer_free(buffer);
	return 0;
}

late_initcall(test_ringbuffer);
#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */