// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 *
 * TS_MSB selects those 5 most significant bits (bits 59-63) and
 * ABS_TS_MASK selects the remaining 59 bits.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must keep it up to date manually.
 *
 * Writes a human readable description of the compressed event header
 * (the width of each field and the type values of the special events)
 * into @s.
 *
 * Returns non-zero if the whole description fit into @s, and zero if
 * @s overflowed.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	/* Field widths of the event header */
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	/* Values of type_len that mark the non-data events */
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages.
A separate list of pages is 66 * allocated for each CPU. A writer may only write to a buffer that is 67 * associated with the CPU it is currently executing on. A reader may read 68 * from any per cpu buffer. 69 * 70 * The reader is special. For each per cpu buffer, the reader has its own 71 * reader page. When a reader has read the entire reader page, this reader 72 * page is swapped with another page in the ring buffer. 73 * 74 * Now, as long as the writer is off the reader page, the reader can do what 75 * ever it wants with that page. The writer will never write to that page 76 * again (as long as it is out of the ring buffer). 77 * 78 * Here's some silly ASCII art. 79 * 80 * +------+ 81 * |reader| RING BUFFER 82 * |page | 83 * +------+ +---+ +---+ +---+ 84 * | |-->| |-->| | 85 * +---+ +---+ +---+ 86 * ^ | 87 * | | 88 * +---------------+ 89 * 90 * 91 * +------+ 92 * |reader| RING BUFFER 93 * |page |------------------v 94 * +------+ +---+ +---+ +---+ 95 * | |-->| |-->| | 96 * +---+ +---+ +---+ 97 * ^ | 98 * | | 99 * +---------------+ 100 * 101 * 102 * +------+ 103 * |reader| RING BUFFER 104 * |page |------------------v 105 * +------+ +---+ +---+ +---+ 106 * ^ | |-->| |-->| | 107 * | +---+ +---+ +---+ 108 * | | 109 * | | 110 * +------------------------------+ 111 * 112 * 113 * +------+ 114 * |buffer| RING BUFFER 115 * |page |------------------v 116 * +------+ +---+ +---+ +---+ 117 * ^ | | | |-->| | 118 * | New +---+ +---+ +---+ 119 * | Reader------^ | 120 * | page | 121 * +------------------------------+ 122 * 123 * 124 * After we make this swap, the reader can hand this page off to the splice 125 * code and be done with it. It can even allocate a new page if it needs to 126 * and swap that into the ring buffer. 127 * 128 * We will be using cmpxchg soon to make all this lockless. 
129 * 130 */ 131 132 /* Used for individual buffers (after the counter) */ 133 #define RB_BUFFER_OFF (1 << 20) 134 135 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 136 137 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 138 #define RB_ALIGNMENT 4U 139 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 140 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 141 142 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 143 # define RB_FORCE_8BYTE_ALIGNMENT 0 144 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 145 #else 146 # define RB_FORCE_8BYTE_ALIGNMENT 1 147 # define RB_ARCH_ALIGNMENT 8U 148 #endif 149 150 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 151 152 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 153 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 154 155 enum { 156 RB_LEN_TIME_EXTEND = 8, 157 RB_LEN_TIME_STAMP = 8, 158 }; 159 160 #define skip_time_extend(event) \ 161 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 162 163 #define extended_time(event) \ 164 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 165 166 static inline bool rb_null_event(struct ring_buffer_event *event) 167 { 168 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 169 } 170 171 static void rb_event_set_padding(struct ring_buffer_event *event) 172 { 173 /* padding has a NULL time_delta */ 174 event->type_len = RINGBUF_TYPE_PADDING; 175 event->time_delta = 0; 176 } 177 178 static unsigned 179 rb_event_data_length(struct ring_buffer_event *event) 180 { 181 unsigned length; 182 183 if (event->type_len) 184 length = event->type_len * RB_ALIGNMENT; 185 else 186 length = event->array[0]; 187 return length + RB_EVNT_HDR_SIZE; 188 } 189 190 /* 191 * Return the length of the given event. Will return 192 * the length of the time extend if the event is a 193 * time extend. 
194 */ 195 static inline unsigned 196 rb_event_length(struct ring_buffer_event *event) 197 { 198 switch (event->type_len) { 199 case RINGBUF_TYPE_PADDING: 200 if (rb_null_event(event)) 201 /* undefined */ 202 return -1; 203 return event->array[0] + RB_EVNT_HDR_SIZE; 204 205 case RINGBUF_TYPE_TIME_EXTEND: 206 return RB_LEN_TIME_EXTEND; 207 208 case RINGBUF_TYPE_TIME_STAMP: 209 return RB_LEN_TIME_STAMP; 210 211 case RINGBUF_TYPE_DATA: 212 return rb_event_data_length(event); 213 default: 214 WARN_ON_ONCE(1); 215 } 216 /* not hit */ 217 return 0; 218 } 219 220 /* 221 * Return total length of time extend and data, 222 * or just the event length for all other events. 223 */ 224 static inline unsigned 225 rb_event_ts_length(struct ring_buffer_event *event) 226 { 227 unsigned len = 0; 228 229 if (extended_time(event)) { 230 /* time extends include the data event after it */ 231 len = RB_LEN_TIME_EXTEND; 232 event = skip_time_extend(event); 233 } 234 return len + rb_event_length(event); 235 } 236 237 /** 238 * ring_buffer_event_length - return the length of the event 239 * @event: the event to get the length of 240 * 241 * Returns the size of the data load of a data event. 242 * If the event is something other than a data event, it 243 * returns the size of the event itself. With the exception 244 * of a TIME EXTEND, where it still returns the size of the 245 * data load of the data event after it. 
246 */ 247 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 248 { 249 unsigned length; 250 251 if (extended_time(event)) 252 event = skip_time_extend(event); 253 254 length = rb_event_length(event); 255 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 256 return length; 257 length -= RB_EVNT_HDR_SIZE; 258 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 259 length -= sizeof(event->array[0]); 260 return length; 261 } 262 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 263 264 /* inline for ring buffer fast paths */ 265 static __always_inline void * 266 rb_event_data(struct ring_buffer_event *event) 267 { 268 if (extended_time(event)) 269 event = skip_time_extend(event); 270 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 271 /* If length is in len field, then array[0] has the data */ 272 if (event->type_len) 273 return (void *)&event->array[0]; 274 /* Otherwise length is in array[0] and array[1] has the data */ 275 return (void *)&event->array[1]; 276 } 277 278 /** 279 * ring_buffer_event_data - return the data of the event 280 * @event: the event to get the data from 281 */ 282 void *ring_buffer_event_data(struct ring_buffer_event *event) 283 { 284 return rb_event_data(event); 285 } 286 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 287 288 #define for_each_buffer_cpu(buffer, cpu) \ 289 for_each_cpu(cpu, buffer->cpumask) 290 291 #define for_each_online_buffer_cpu(buffer, cpu) \ 292 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 293 294 #define TS_SHIFT 27 295 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 296 #define TS_DELTA_TEST (~TS_MASK) 297 298 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 299 { 300 u64 ts; 301 302 ts = event->array[0]; 303 ts <<= TS_SHIFT; 304 ts += event->time_delta; 305 306 return ts; 307 } 308 309 /* Flag when events were overwritten */ 310 #define RB_MISSED_EVENTS (1 << 31) 311 /* Missed count stored at end */ 312 #define RB_MISSED_STORED (1 << 30) 313 314 struct 
buffer_data_page { 315 u64 time_stamp; /* page time stamp */ 316 local_t commit; /* write committed index */ 317 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 318 }; 319 320 /* 321 * Note, the buffer_page list must be first. The buffer pages 322 * are allocated in cache lines, which means that each buffer 323 * page will be at the beginning of a cache line, and thus 324 * the least significant bits will be zero. We use this to 325 * add flags in the list struct pointers, to make the ring buffer 326 * lockless. 327 */ 328 struct buffer_page { 329 struct list_head list; /* list of buffer pages */ 330 local_t write; /* index for next write */ 331 unsigned read; /* index for next read */ 332 local_t entries; /* entries on this page */ 333 unsigned long real_end; /* real end of data */ 334 struct buffer_data_page *page; /* Actual data page */ 335 }; 336 337 /* 338 * The buffer page counters, write and entries, must be reset 339 * atomically when crossing page boundaries. To synchronize this 340 * update, two counters are inserted into the number. One is 341 * the actual counter for the write position or count on the page. 342 * 343 * The other is a counter of updaters. Before an update happens 344 * the update partition of the counter is incremented. This will 345 * allow the updater to update the counter atomically. 346 * 347 * The counter is 20 bits, and the state data is 12. 348 */ 349 #define RB_WRITE_MASK 0xfffff 350 #define RB_WRITE_INTCNT (1 << 20) 351 352 static void rb_init_page(struct buffer_data_page *bpage) 353 { 354 local_set(&bpage->commit, 0); 355 } 356 357 static void free_buffer_page(struct buffer_page *bpage) 358 { 359 free_page((unsigned long)bpage->page); 360 kfree(bpage); 361 } 362 363 /* 364 * We need to fit the time_stamp delta into 27 bits. 
 */
/* Returns true when @delta has bits set above the low TS_SHIFT bits */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

/* Bytes of a page left for events once the page header is taken out */
#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

/*
 * Writes a description of the layout of struct buffer_data_page
 * (name, offset, size and signedness of each field) into @s.
 *
 * Returns non-zero if the whole description fit into @s, and zero if
 * @s overflowed.
 */
int ring_buffer_print_page_header(struct trace_seq *s)
{
	/* Only used as an operand of sizeof/typeof/offsetof; never read */
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	/*
	 * "overwrite" is not a member of the structure: it is reported
	 * at the same offset as commit, with a size of 1.
	 */
	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

/*
 * Wake up state shared by the buffer wide and the per CPU wait queues:
 * "waiters" holds tasks waiting for any data, "full_waiters" holds tasks
 * waiting for the buffer to fill to a given percentage.
 */
struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	long				wait_index;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE		= 0,
	RB_ADD_STAMP_EXTEND		= BIT(1),
	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
	RB_ADD_STAMP_FORCE		= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

/* Use the split (local_t based) time stamp when long is only 32 bits */
#if BITS_PER_LONG == 32
#define RB_TIME_32
#endif

/* To test on 64 bit machines */
//#define RB_TIME_32

#ifdef RB_TIME_32

/*
 * 32 bit representation of a time stamp: the value is split over
 * top, bottom and msb, and cnt counts the updates.
 */
struct rb_time_struct {
	local_t		cnt;
	local_t		top;
	local_t		bottom;
	local_t		msb;
};
#else
#include <asm/local64.h>
/* 64 bit representation of a time stamp: a single local64_t */
struct rb_time_struct {
	local64_t	time;
};
#endif
typedef struct rb_time_struct rb_time_t;

/* Number of slots in ring_buffer_per_cpu::event_stamp[] */
#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer	*buffer;	/* the buffer this CPU buffer belongs to */
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;	/* smallest "full" percentage a waiter asked for */
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];	/* time stamp per nesting level */
	u64				read_stamp;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;	/* waiters on this CPU buffer */
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;	/* CPUs that have a per CPU buffer */

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;	/* indexed by CPU number */

	struct hlist_node		node;
	u64				(*clock)(void);	/* source of the time stamps */

	struct rb_irq_work		irq_work;	/* waiters on any CPU buffer */
	bool				time_stamp_abs;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	int				missed_events;
};

#ifdef RB_TIME_32

/*
 * On 32 bit machines, local64_t is very expensive. As the ring
 * buffer doesn't need all the features of a true 64 bit atomic,
 * on 32 bit, it uses these functions (64 still uses local64_t).
 *
 * For the ring buffer, 64 bit required operations for the time is
 * the following:
 *
 *  - Reads may fail if it interrupted a modification of the time stamp.
 *      It will succeed if it did not interrupt another write even if
 *      the read itself is interrupted by a write.
 *      It returns whether it was successful or not.
 *
 *  - Writes always succeed and will overwrite other writes and writes
 *      that were done by events interrupting the current write.
 *
 *  - A write followed by a read of the same time stamp will always succeed,
 *      but may not contain the same value.
 *
 *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
 *      Other than that, it acts like a normal cmpxchg.
 *
 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
 *  (bottom being the least significant 30 bits of the 60 bit time stamp).
 *
 * The two most significant bits of each half holds a 2 bit counter (0-3).
 * Each update will increment this counter by one.
 * When reading the top and bottom, if the two counter bits match then the
 *  top and bottom together make a valid 60 bit number.
598 */ 599 #define RB_TIME_SHIFT 30 600 #define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1) 601 #define RB_TIME_MSB_SHIFT 60 602 603 static inline int rb_time_cnt(unsigned long val) 604 { 605 return (val >> RB_TIME_SHIFT) & 3; 606 } 607 608 static inline u64 rb_time_val(unsigned long top, unsigned long bottom) 609 { 610 u64 val; 611 612 val = top & RB_TIME_VAL_MASK; 613 val <<= RB_TIME_SHIFT; 614 val |= bottom & RB_TIME_VAL_MASK; 615 616 return val; 617 } 618 619 static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt) 620 { 621 unsigned long top, bottom, msb; 622 unsigned long c; 623 624 /* 625 * If the read is interrupted by a write, then the cnt will 626 * be different. Loop until both top and bottom have been read 627 * without interruption. 628 */ 629 do { 630 c = local_read(&t->cnt); 631 top = local_read(&t->top); 632 bottom = local_read(&t->bottom); 633 msb = local_read(&t->msb); 634 } while (c != local_read(&t->cnt)); 635 636 *cnt = rb_time_cnt(top); 637 638 /* If top and bottom counts don't match, this interrupted a write */ 639 if (*cnt != rb_time_cnt(bottom)) 640 return false; 641 642 /* The shift to msb will lose its cnt bits */ 643 *ret = rb_time_val(top, bottom) | ((u64)msb << RB_TIME_MSB_SHIFT); 644 return true; 645 } 646 647 static bool rb_time_read(rb_time_t *t, u64 *ret) 648 { 649 unsigned long cnt; 650 651 return __rb_time_read(t, ret, &cnt); 652 } 653 654 static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt) 655 { 656 return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT); 657 } 658 659 static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom, 660 unsigned long *msb) 661 { 662 *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK); 663 *bottom = (unsigned long)(val & RB_TIME_VAL_MASK); 664 *msb = (unsigned long)(val >> RB_TIME_MSB_SHIFT); 665 } 666 667 static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt) 668 { 669 val = 
rb_time_val_cnt(val, cnt); 670 local_set(t, val); 671 } 672 673 static void rb_time_set(rb_time_t *t, u64 val) 674 { 675 unsigned long cnt, top, bottom, msb; 676 677 rb_time_split(val, &top, &bottom, &msb); 678 679 /* Writes always succeed with a valid number even if it gets interrupted. */ 680 do { 681 cnt = local_inc_return(&t->cnt); 682 rb_time_val_set(&t->top, top, cnt); 683 rb_time_val_set(&t->bottom, bottom, cnt); 684 rb_time_val_set(&t->msb, val >> RB_TIME_MSB_SHIFT, cnt); 685 } while (cnt != local_read(&t->cnt)); 686 } 687 688 static inline bool 689 rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set) 690 { 691 unsigned long ret; 692 693 ret = local_cmpxchg(l, expect, set); 694 return ret == expect; 695 } 696 697 static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) 698 { 699 unsigned long cnt, top, bottom, msb; 700 unsigned long cnt2, top2, bottom2, msb2; 701 u64 val; 702 703 /* The cmpxchg always fails if it interrupted an update */ 704 if (!__rb_time_read(t, &val, &cnt2)) 705 return false; 706 707 if (val != expect) 708 return false; 709 710 cnt = local_read(&t->cnt); 711 if ((cnt & 3) != cnt2) 712 return false; 713 714 cnt2 = cnt + 1; 715 716 rb_time_split(val, &top, &bottom, &msb); 717 top = rb_time_val_cnt(top, cnt); 718 bottom = rb_time_val_cnt(bottom, cnt); 719 720 rb_time_split(set, &top2, &bottom2, &msb2); 721 top2 = rb_time_val_cnt(top2, cnt2); 722 bottom2 = rb_time_val_cnt(bottom2, cnt2); 723 724 if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2)) 725 return false; 726 if (!rb_time_read_cmpxchg(&t->msb, msb, msb2)) 727 return false; 728 if (!rb_time_read_cmpxchg(&t->top, top, top2)) 729 return false; 730 if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2)) 731 return false; 732 return true; 733 } 734 735 #else /* 64 bits */ 736 737 /* local64_t always succeeds */ 738 739 static inline bool rb_time_read(rb_time_t *t, u64 *ret) 740 { 741 *ret = local64_read(&t->time); 742 return true; 743 } 744 static void 
rb_time_set(rb_time_t *t, u64 val) 745 { 746 local64_set(&t->time, val); 747 } 748 749 static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) 750 { 751 u64 val; 752 val = local64_cmpxchg(&t->time, expect, set); 753 return val == expect; 754 } 755 #endif 756 757 /* 758 * Enable this to make sure that the event passed to 759 * ring_buffer_event_time_stamp() is not committed and also 760 * is on the buffer that it passed in. 761 */ 762 //#define RB_VERIFY_EVENT 763 #ifdef RB_VERIFY_EVENT 764 static struct list_head *rb_list_head(struct list_head *list); 765 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 766 void *event) 767 { 768 struct buffer_page *page = cpu_buffer->commit_page; 769 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 770 struct list_head *next; 771 long commit, write; 772 unsigned long addr = (unsigned long)event; 773 bool done = false; 774 int stop = 0; 775 776 /* Make sure the event exists and is not committed yet */ 777 do { 778 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 779 done = true; 780 commit = local_read(&page->page->commit); 781 write = local_read(&page->write); 782 if (addr >= (unsigned long)&page->page->data[commit] && 783 addr < (unsigned long)&page->page->data[write]) 784 return; 785 786 next = rb_list_head(page->list.next); 787 page = list_entry(next, struct buffer_page, list); 788 } while (!done); 789 WARN_ON_ONCE(1); 790 } 791 #else 792 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 793 void *event) 794 { 795 } 796 #endif 797 798 /* 799 * The absolute time stamp drops the 5 MSBs and some clocks may 800 * require them. The rb_fix_abs_ts() will take a previous full 801 * time stamp, and add the 5 MSB of that time stamp on to the 802 * saved absolute time stamp. Then they are compared in case of 803 * the unlikely event that the latest time stamp incremented 804 * the 5 MSB. 
805 */ 806 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 807 { 808 if (save_ts & TS_MSB) { 809 abs |= save_ts & TS_MSB; 810 /* Check for overflow */ 811 if (unlikely(abs < save_ts)) 812 abs += 1ULL << 59; 813 } 814 return abs; 815 } 816 817 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 818 819 /** 820 * ring_buffer_event_time_stamp - return the event's current time stamp 821 * @buffer: The buffer that the event is on 822 * @event: the event to get the time stamp of 823 * 824 * Note, this must be called after @event is reserved, and before it is 825 * committed to the ring buffer. And must be called from the same 826 * context where the event was reserved (normal, softirq, irq, etc). 827 * 828 * Returns the time stamp associated with the current event. 829 * If the event has an extended time stamp, then that is used as 830 * the time stamp to return. 831 * In the highly unlikely case that the event was nested more than 832 * the max nesting, then the write_stamp of the buffer is returned, 833 * otherwise current time is returned, but that really neither of 834 * the last two cases should ever happen. 
835 */ 836 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 837 struct ring_buffer_event *event) 838 { 839 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 840 unsigned int nest; 841 u64 ts; 842 843 /* If the event includes an absolute time, then just use that */ 844 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 845 ts = rb_event_time_stamp(event); 846 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 847 } 848 849 nest = local_read(&cpu_buffer->committing); 850 verify_event(cpu_buffer, event); 851 if (WARN_ON_ONCE(!nest)) 852 goto fail; 853 854 /* Read the current saved nesting level time stamp */ 855 if (likely(--nest < MAX_NEST)) 856 return cpu_buffer->event_stamp[nest]; 857 858 /* Shouldn't happen, warn if it does */ 859 WARN_ONCE(1, "nest (%d) greater than max", nest); 860 861 fail: 862 /* Can only fail on 32 bit */ 863 if (!rb_time_read(&cpu_buffer->write_stamp, &ts)) 864 /* Screw it, just read the current time */ 865 ts = rb_time_stamp(cpu_buffer->buffer); 866 867 return ts; 868 } 869 870 /** 871 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer 872 * @buffer: The ring_buffer to get the number of pages from 873 * @cpu: The cpu of the ring_buffer to get the number of pages from 874 * 875 * Returns the number of pages used by a per_cpu buffer of the ring buffer. 876 */ 877 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu) 878 { 879 return buffer->buffers[cpu]->nr_pages; 880 } 881 882 /** 883 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 884 * @buffer: The ring_buffer to get the number of pages from 885 * @cpu: The cpu of the ring_buffer to get the number of pages from 886 * 887 * Returns the number of pages that have content in the ring buffer. 
888 */ 889 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 890 { 891 size_t read; 892 size_t lost; 893 size_t cnt; 894 895 read = local_read(&buffer->buffers[cpu]->pages_read); 896 lost = local_read(&buffer->buffers[cpu]->pages_lost); 897 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 898 899 if (WARN_ON_ONCE(cnt < lost)) 900 return 0; 901 902 cnt -= lost; 903 904 /* The reader can read an empty page, but not more than that */ 905 if (cnt < read) { 906 WARN_ON_ONCE(read > cnt + 1); 907 return 0; 908 } 909 910 return cnt - read; 911 } 912 913 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 914 { 915 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 916 size_t nr_pages; 917 size_t dirty; 918 919 nr_pages = cpu_buffer->nr_pages; 920 if (!nr_pages || !full) 921 return true; 922 923 dirty = ring_buffer_nr_dirty_pages(buffer, cpu); 924 925 return (dirty * 100) > (full * nr_pages); 926 } 927 928 /* 929 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 930 * 931 * Schedules a delayed work to wake up any task that is blocked on the 932 * ring buffer waiters queue. 933 */ 934 static void rb_wake_up_waiters(struct irq_work *work) 935 { 936 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 937 938 wake_up_all(&rbwork->waiters); 939 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 940 rbwork->wakeup_full = false; 941 rbwork->full_waiters_pending = false; 942 wake_up_all(&rbwork->full_waiters); 943 } 944 } 945 946 /** 947 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 948 * @buffer: The ring buffer to wake waiters on 949 * 950 * In the case of a file that represents a ring buffer is closing, 951 * it is prudent to wake up any waiters that are on this. 
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		/* Then wake the tasks waiting on the buffer as a whole */
		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* ring_buffer_wait() leaves its loop once it sees wait_index change */
	rbwork->wait_index++;
	/* make sure the waiters see the new index */
	smp_wmb();

	rb_wake_up_waiters(&rbwork->work);
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns 0 when the wait ended because of data or a call to
 * ring_buffer_wake_waiters(), -ENODEV if @cpu has no buffer in
 * @buffer, and -EINTR if a signal is pending.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	long wait_index;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	/* Snapshot the index; a change means ring_buffer_wake_waiters() ran */
	wait_index = READ_ONCE(work->wait_index);

	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		/* Waiting on all CPUs: any data at all ends the wait */
		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;
			bool done;

			/* Without a watermark, any data ends the wait */
			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			/* Not done while the reader page is still the commit page */
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			done = !pagebusy && full_hit(buffer, cpu, full);

			/* Record the smallest watermark any waiter asked for */
			if (!cpu_buffer->shortest_full ||
			    cpu_buffer->shortest_full > full)
				cpu_buffer->shortest_full = full;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			if (done)
				break;
		}

		schedule();

		/* Make sure to see the new wait index */
		smp_rmb();
		if (wait_index != work->wait_index)
			break;
	}

	/* Leave the queue that prepare_to_wait() put us on */
	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
1114 */ 1115 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 1116 struct file *filp, poll_table *poll_table, int full) 1117 { 1118 struct ring_buffer_per_cpu *cpu_buffer; 1119 struct rb_irq_work *work; 1120 1121 if (cpu == RING_BUFFER_ALL_CPUS) { 1122 work = &buffer->irq_work; 1123 full = 0; 1124 } else { 1125 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1126 return -EINVAL; 1127 1128 cpu_buffer = buffer->buffers[cpu]; 1129 work = &cpu_buffer->irq_work; 1130 } 1131 1132 if (full) { 1133 poll_wait(filp, &work->full_waiters, poll_table); 1134 work->full_waiters_pending = true; 1135 } else { 1136 poll_wait(filp, &work->waiters, poll_table); 1137 work->waiters_pending = true; 1138 } 1139 1140 /* 1141 * There's a tight race between setting the waiters_pending and 1142 * checking if the ring buffer is empty. Once the waiters_pending bit 1143 * is set, the next event will wake the task up, but we can get stuck 1144 * if there's only a single event in. 1145 * 1146 * FIXME: Ideally, we need a memory barrier on the writer side as well, 1147 * but adding a memory barrier to all events will cause too much of a 1148 * performance hit in the fast path. We only need a memory barrier when 1149 * the buffer goes from empty to having content. But as this race is 1150 * extremely small, and it's not a problem if another event comes in, we 1151 * will fix it later. 1152 */ 1153 smp_mb(); 1154 1155 if (full) 1156 return full_hit(buffer, cpu, full) ? 
EPOLLIN | EPOLLRDNORM : 0; 1157 1158 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1159 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 1160 return EPOLLIN | EPOLLRDNORM; 1161 return 0; 1162 } 1163 1164 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 1165 #define RB_WARN_ON(b, cond) \ 1166 ({ \ 1167 int _____ret = unlikely(cond); \ 1168 if (_____ret) { \ 1169 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 1170 struct ring_buffer_per_cpu *__b = \ 1171 (void *)b; \ 1172 atomic_inc(&__b->buffer->record_disabled); \ 1173 } else \ 1174 atomic_inc(&b->record_disabled); \ 1175 WARN_ON(1); \ 1176 } \ 1177 _____ret; \ 1178 }) 1179 1180 /* Up this if you want to test the TIME_EXTENTS and normalization */ 1181 #define DEBUG_SHIFT 0 1182 1183 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1184 { 1185 u64 ts; 1186 1187 /* Skip retpolines :-( */ 1188 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1189 ts = trace_clock_local(); 1190 else 1191 ts = buffer->clock(); 1192 1193 /* shift to debug/test normalization and TIME_EXTENTS */ 1194 return ts << DEBUG_SHIFT; 1195 } 1196 1197 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1198 { 1199 u64 time; 1200 1201 preempt_disable_notrace(); 1202 time = rb_time_stamp(buffer); 1203 preempt_enable_notrace(); 1204 1205 return time; 1206 } 1207 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1208 1209 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1210 int cpu, u64 *ts) 1211 { 1212 /* Just stupid testing the normalize function and deltas */ 1213 *ts >>= DEBUG_SHIFT; 1214 } 1215 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1216 1217 /* 1218 * Making the ring buffer lockless makes things tricky. 1219 * Although writes only happen on the CPU that they are on, 1220 * and they only need to worry about interrupts. Reads can 1221 * happen on any CPU. 
1222 * 1223 * The reader page is always off the ring buffer, but when the 1224 * reader finishes with a page, it needs to swap its page with 1225 * a new one from the buffer. The reader needs to take from 1226 * the head (writes go to the tail). But if a writer is in overwrite 1227 * mode and wraps, it must push the head page forward. 1228 * 1229 * Here lies the problem. 1230 * 1231 * The reader must be careful to replace only the head page, and 1232 * not another one. As described at the top of the file in the 1233 * ASCII art, the reader sets its old page to point to the next 1234 * page after head. It then sets the page after head to point to 1235 * the old reader page. But if the writer moves the head page 1236 * during this operation, the reader could end up with the tail. 1237 * 1238 * We use cmpxchg to help prevent this race. We also do something 1239 * special with the page before head. We set the LSB to 1. 1240 * 1241 * When the writer must push the page forward, it will clear the 1242 * bit that points to the head page, move the head, and then set 1243 * the bit that points to the new head page. 1244 * 1245 * We also don't want an interrupt coming in and moving the head 1246 * page on another writer. Thus we use the second LSB to catch 1247 * that too. 
Thus: 1248 * 1249 * head->list->prev->next bit 1 bit 0 1250 * ------- ------- 1251 * Normal page 0 0 1252 * Points to head page 0 1 1253 * New head page 1 0 1254 * 1255 * Note we can not trust the prev pointer of the head page, because: 1256 * 1257 * +----+ +-----+ +-----+ 1258 * | |------>| T |---X--->| N | 1259 * | |<------| | | | 1260 * +----+ +-----+ +-----+ 1261 * ^ ^ | 1262 * | +-----+ | | 1263 * +----------| R |----------+ | 1264 * | |<-----------+ 1265 * +-----+ 1266 * 1267 * Key: ---X--> HEAD flag set in pointer 1268 * T Tail page 1269 * R Reader page 1270 * N Next page 1271 * 1272 * (see __rb_reserve_next() to see where this happens) 1273 * 1274 * What the above shows is that the reader just swapped out 1275 * the reader page with a page in the buffer, but before it 1276 * could make the new header point back to the new page added 1277 * it was preempted by a writer. The writer moved forward onto 1278 * the new page added by the reader and is about to move forward 1279 * again. 1280 * 1281 * You can see, it is legitimate for the previous pointer of 1282 * the head (or any page) not to point back to itself. But only 1283 * temporarily. 1284 */ 1285 1286 #define RB_PAGE_NORMAL 0UL 1287 #define RB_PAGE_HEAD 1UL 1288 #define RB_PAGE_UPDATE 2UL 1289 1290 1291 #define RB_FLAG_MASK 3UL 1292 1293 /* PAGE_MOVED is not part of the mask */ 1294 #define RB_PAGE_MOVED 4UL 1295 1296 /* 1297 * rb_list_head - remove any bit 1298 */ 1299 static struct list_head *rb_list_head(struct list_head *list) 1300 { 1301 unsigned long val = (unsigned long)list; 1302 1303 return (struct list_head *)(val & ~RB_FLAG_MASK); 1304 } 1305 1306 /* 1307 * rb_is_head_page - test if the given page is the head page 1308 * 1309 * Because the reader may move the head_page pointer, we can 1310 * not trust what the head page is (it may be pointing to 1311 * the reader page). But if the next page is a header page, 1312 * its flags will be non zero. 
1313 */ 1314 static inline int 1315 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1316 { 1317 unsigned long val; 1318 1319 val = (unsigned long)list->next; 1320 1321 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1322 return RB_PAGE_MOVED; 1323 1324 return val & RB_FLAG_MASK; 1325 } 1326 1327 /* 1328 * rb_is_reader_page 1329 * 1330 * The unique thing about the reader page, is that, if the 1331 * writer is ever on it, the previous pointer never points 1332 * back to the reader page. 1333 */ 1334 static bool rb_is_reader_page(struct buffer_page *page) 1335 { 1336 struct list_head *list = page->list.prev; 1337 1338 return rb_list_head(list->next) != &page->list; 1339 } 1340 1341 /* 1342 * rb_set_list_to_head - set a list_head to be pointing to head. 1343 */ 1344 static void rb_set_list_to_head(struct list_head *list) 1345 { 1346 unsigned long *ptr; 1347 1348 ptr = (unsigned long *)&list->next; 1349 *ptr |= RB_PAGE_HEAD; 1350 *ptr &= ~RB_PAGE_UPDATE; 1351 } 1352 1353 /* 1354 * rb_head_page_activate - sets up head page 1355 */ 1356 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1357 { 1358 struct buffer_page *head; 1359 1360 head = cpu_buffer->head_page; 1361 if (!head) 1362 return; 1363 1364 /* 1365 * Set the previous list pointer to have the HEAD flag. 1366 */ 1367 rb_set_list_to_head(head->list.prev); 1368 } 1369 1370 static void rb_list_head_clear(struct list_head *list) 1371 { 1372 unsigned long *ptr = (unsigned long *)&list->next; 1373 1374 *ptr &= ~RB_FLAG_MASK; 1375 } 1376 1377 /* 1378 * rb_head_page_deactivate - clears head page ptr (for free list) 1379 */ 1380 static void 1381 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1382 { 1383 struct list_head *hd; 1384 1385 /* Go through the whole list and clear any pointers found. 
*/ 1386 rb_list_head_clear(cpu_buffer->pages); 1387 1388 list_for_each(hd, cpu_buffer->pages) 1389 rb_list_head_clear(hd); 1390 } 1391 1392 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1393 struct buffer_page *head, 1394 struct buffer_page *prev, 1395 int old_flag, int new_flag) 1396 { 1397 struct list_head *list; 1398 unsigned long val = (unsigned long)&head->list; 1399 unsigned long ret; 1400 1401 list = &prev->list; 1402 1403 val &= ~RB_FLAG_MASK; 1404 1405 ret = cmpxchg((unsigned long *)&list->next, 1406 val | old_flag, val | new_flag); 1407 1408 /* check if the reader took the page */ 1409 if ((ret & ~RB_FLAG_MASK) != val) 1410 return RB_PAGE_MOVED; 1411 1412 return ret & RB_FLAG_MASK; 1413 } 1414 1415 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1416 struct buffer_page *head, 1417 struct buffer_page *prev, 1418 int old_flag) 1419 { 1420 return rb_head_page_set(cpu_buffer, head, prev, 1421 old_flag, RB_PAGE_UPDATE); 1422 } 1423 1424 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1425 struct buffer_page *head, 1426 struct buffer_page *prev, 1427 int old_flag) 1428 { 1429 return rb_head_page_set(cpu_buffer, head, prev, 1430 old_flag, RB_PAGE_HEAD); 1431 } 1432 1433 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1434 struct buffer_page *head, 1435 struct buffer_page *prev, 1436 int old_flag) 1437 { 1438 return rb_head_page_set(cpu_buffer, head, prev, 1439 old_flag, RB_PAGE_NORMAL); 1440 } 1441 1442 static inline void rb_inc_page(struct buffer_page **bpage) 1443 { 1444 struct list_head *p = rb_list_head((*bpage)->list.next); 1445 1446 *bpage = list_entry(p, struct buffer_page, list); 1447 } 1448 1449 static struct buffer_page * 1450 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1451 { 1452 struct buffer_page *head; 1453 struct buffer_page *page; 1454 struct list_head *list; 1455 int i; 1456 1457 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 
1458 return NULL; 1459 1460 /* sanity check */ 1461 list = cpu_buffer->pages; 1462 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1463 return NULL; 1464 1465 page = head = cpu_buffer->head_page; 1466 /* 1467 * It is possible that the writer moves the header behind 1468 * where we started, and we miss in one loop. 1469 * A second loop should grab the header, but we'll do 1470 * three loops just because I'm paranoid. 1471 */ 1472 for (i = 0; i < 3; i++) { 1473 do { 1474 if (rb_is_head_page(page, page->list.prev)) { 1475 cpu_buffer->head_page = page; 1476 return page; 1477 } 1478 rb_inc_page(&page); 1479 } while (page != head); 1480 } 1481 1482 RB_WARN_ON(cpu_buffer, 1); 1483 1484 return NULL; 1485 } 1486 1487 static bool rb_head_page_replace(struct buffer_page *old, 1488 struct buffer_page *new) 1489 { 1490 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1491 unsigned long val; 1492 unsigned long ret; 1493 1494 val = *ptr & ~RB_FLAG_MASK; 1495 val |= RB_PAGE_HEAD; 1496 1497 ret = cmpxchg(ptr, val, (unsigned long)&new->list); 1498 1499 return ret == val; 1500 } 1501 1502 /* 1503 * rb_tail_page_update - move the tail page forward 1504 */ 1505 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1506 struct buffer_page *tail_page, 1507 struct buffer_page *next_page) 1508 { 1509 unsigned long old_entries; 1510 unsigned long old_write; 1511 1512 /* 1513 * The tail page now needs to be moved forward. 1514 * 1515 * We need to reset the tail page, but without messing 1516 * with possible erasing of data brought in by interrupts 1517 * that have moved the tail page and are currently on it. 1518 * 1519 * We add a counter to the write field to denote this. 
1520 */ 1521 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1522 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1523 1524 local_inc(&cpu_buffer->pages_touched); 1525 /* 1526 * Just make sure we have seen our old_write and synchronize 1527 * with any interrupts that come in. 1528 */ 1529 barrier(); 1530 1531 /* 1532 * If the tail page is still the same as what we think 1533 * it is, then it is up to us to update the tail 1534 * pointer. 1535 */ 1536 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1537 /* Zero the write counter */ 1538 unsigned long val = old_write & ~RB_WRITE_MASK; 1539 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1540 1541 /* 1542 * This will only succeed if an interrupt did 1543 * not come in and change it. In which case, we 1544 * do not want to modify it. 1545 * 1546 * We add (void) to let the compiler know that we do not care 1547 * about the return value of these functions. We use the 1548 * cmpxchg to only update if an interrupt did not already 1549 * do it for us. If the cmpxchg fails, we don't care. 1550 */ 1551 (void)local_cmpxchg(&next_page->write, old_write, val); 1552 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1553 1554 /* 1555 * No need to worry about races with clearing out the commit. 1556 * it only can increment when a commit takes place. But that 1557 * only happens in the outer most nested commit. 
1558 */ 1559 local_set(&next_page->page->commit, 0); 1560 1561 /* Again, either we update tail_page or an interrupt does */ 1562 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page); 1563 } 1564 } 1565 1566 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1567 struct buffer_page *bpage) 1568 { 1569 unsigned long val = (unsigned long)bpage; 1570 1571 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1572 } 1573 1574 /** 1575 * rb_check_pages - integrity check of buffer pages 1576 * @cpu_buffer: CPU buffer with pages to test 1577 * 1578 * As a safety measure we check to make sure the data pages have not 1579 * been corrupted. 1580 */ 1581 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1582 { 1583 struct list_head *head = rb_list_head(cpu_buffer->pages); 1584 struct list_head *tmp; 1585 1586 if (RB_WARN_ON(cpu_buffer, 1587 rb_list_head(rb_list_head(head->next)->prev) != head)) 1588 return; 1589 1590 if (RB_WARN_ON(cpu_buffer, 1591 rb_list_head(rb_list_head(head->prev)->next) != head)) 1592 return; 1593 1594 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) { 1595 if (RB_WARN_ON(cpu_buffer, 1596 rb_list_head(rb_list_head(tmp->next)->prev) != tmp)) 1597 return; 1598 1599 if (RB_WARN_ON(cpu_buffer, 1600 rb_list_head(rb_list_head(tmp->prev)->next) != tmp)) 1601 return; 1602 } 1603 } 1604 1605 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1606 long nr_pages, struct list_head *pages) 1607 { 1608 struct buffer_page *bpage, *tmp; 1609 bool user_thread = current->mm != NULL; 1610 gfp_t mflags; 1611 long i; 1612 1613 /* 1614 * Check if the available memory is there first. 1615 * Note, si_mem_available() only gives us a rough estimate of available 1616 * memory. It may not be accurate. But we don't care, we just want 1617 * to prevent doing any allocation when it is obvious that it is 1618 * not going to succeed. 
1619 */ 1620 i = si_mem_available(); 1621 if (i < nr_pages) 1622 return -ENOMEM; 1623 1624 /* 1625 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 1626 * gracefully without invoking oom-killer and the system is not 1627 * destabilized. 1628 */ 1629 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 1630 1631 /* 1632 * If a user thread allocates too much, and si_mem_available() 1633 * reports there's enough memory, even though there is not. 1634 * Make sure the OOM killer kills this thread. This can happen 1635 * even with RETRY_MAYFAIL because another task may be doing 1636 * an allocation after this task has taken all memory. 1637 * This is the task the OOM killer needs to take out during this 1638 * loop, even if it was triggered by an allocation somewhere else. 1639 */ 1640 if (user_thread) 1641 set_current_oom_origin(); 1642 for (i = 0; i < nr_pages; i++) { 1643 struct page *page; 1644 1645 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1646 mflags, cpu_to_node(cpu_buffer->cpu)); 1647 if (!bpage) 1648 goto free_pages; 1649 1650 rb_check_bpage(cpu_buffer, bpage); 1651 1652 list_add(&bpage->list, pages); 1653 1654 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0); 1655 if (!page) 1656 goto free_pages; 1657 bpage->page = page_address(page); 1658 rb_init_page(bpage->page); 1659 1660 if (user_thread && fatal_signal_pending(current)) 1661 goto free_pages; 1662 } 1663 if (user_thread) 1664 clear_current_oom_origin(); 1665 1666 return 0; 1667 1668 free_pages: 1669 list_for_each_entry_safe(bpage, tmp, pages, list) { 1670 list_del_init(&bpage->list); 1671 free_buffer_page(bpage); 1672 } 1673 if (user_thread) 1674 clear_current_oom_origin(); 1675 1676 return -ENOMEM; 1677 } 1678 1679 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1680 unsigned long nr_pages) 1681 { 1682 LIST_HEAD(pages); 1683 1684 WARN_ON(!nr_pages); 1685 1686 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 1687 return -ENOMEM; 1688 1689 
/* 1690 * The ring buffer page list is a circular list that does not 1691 * start and end with a list head. All page list items point to 1692 * other pages. 1693 */ 1694 cpu_buffer->pages = pages.next; 1695 list_del(&pages); 1696 1697 cpu_buffer->nr_pages = nr_pages; 1698 1699 rb_check_pages(cpu_buffer); 1700 1701 return 0; 1702 } 1703 1704 static struct ring_buffer_per_cpu * 1705 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 1706 { 1707 struct ring_buffer_per_cpu *cpu_buffer; 1708 struct buffer_page *bpage; 1709 struct page *page; 1710 int ret; 1711 1712 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1713 GFP_KERNEL, cpu_to_node(cpu)); 1714 if (!cpu_buffer) 1715 return NULL; 1716 1717 cpu_buffer->cpu = cpu; 1718 cpu_buffer->buffer = buffer; 1719 raw_spin_lock_init(&cpu_buffer->reader_lock); 1720 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1721 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1722 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 1723 init_completion(&cpu_buffer->update_done); 1724 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 1725 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 1726 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 1727 1728 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1729 GFP_KERNEL, cpu_to_node(cpu)); 1730 if (!bpage) 1731 goto fail_free_buffer; 1732 1733 rb_check_bpage(cpu_buffer, bpage); 1734 1735 cpu_buffer->reader_page = bpage; 1736 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0); 1737 if (!page) 1738 goto fail_free_reader; 1739 bpage->page = page_address(page); 1740 rb_init_page(bpage->page); 1741 1742 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1743 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1744 1745 ret = rb_allocate_pages(cpu_buffer, nr_pages); 1746 if (ret < 0) 1747 goto fail_free_reader; 1748 1749 cpu_buffer->head_page 1750 = 
list_entry(cpu_buffer->pages, struct buffer_page, list); 1751 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1752 1753 rb_head_page_activate(cpu_buffer); 1754 1755 return cpu_buffer; 1756 1757 fail_free_reader: 1758 free_buffer_page(cpu_buffer->reader_page); 1759 1760 fail_free_buffer: 1761 kfree(cpu_buffer); 1762 return NULL; 1763 } 1764 1765 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1766 { 1767 struct list_head *head = cpu_buffer->pages; 1768 struct buffer_page *bpage, *tmp; 1769 1770 irq_work_sync(&cpu_buffer->irq_work.work); 1771 1772 free_buffer_page(cpu_buffer->reader_page); 1773 1774 if (head) { 1775 rb_head_page_deactivate(cpu_buffer); 1776 1777 list_for_each_entry_safe(bpage, tmp, head, list) { 1778 list_del_init(&bpage->list); 1779 free_buffer_page(bpage); 1780 } 1781 bpage = list_entry(head, struct buffer_page, list); 1782 free_buffer_page(bpage); 1783 } 1784 1785 kfree(cpu_buffer); 1786 } 1787 1788 /** 1789 * __ring_buffer_alloc - allocate a new ring_buffer 1790 * @size: the size in bytes per cpu that is needed. 1791 * @flags: attributes to set for the ring buffer. 1792 * @key: ring buffer reader_lock_key. 1793 * 1794 * Currently the only flag that is available is the RB_FL_OVERWRITE 1795 * flag. This flag means that the buffer will overwrite old data 1796 * when the buffer wraps. If this flag is not set, the buffer will 1797 * drop data when the tail hits the head. 
1798 */ 1799 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1800 struct lock_class_key *key) 1801 { 1802 struct trace_buffer *buffer; 1803 long nr_pages; 1804 int bsize; 1805 int cpu; 1806 int ret; 1807 1808 /* keep it in its own cache line */ 1809 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1810 GFP_KERNEL); 1811 if (!buffer) 1812 return NULL; 1813 1814 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1815 goto fail_free_buffer; 1816 1817 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1818 buffer->flags = flags; 1819 buffer->clock = trace_clock_local; 1820 buffer->reader_lock_key = key; 1821 1822 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 1823 init_waitqueue_head(&buffer->irq_work.waiters); 1824 1825 /* need at least two pages */ 1826 if (nr_pages < 2) 1827 nr_pages = 2; 1828 1829 buffer->cpus = nr_cpu_ids; 1830 1831 bsize = sizeof(void *) * nr_cpu_ids; 1832 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1833 GFP_KERNEL); 1834 if (!buffer->buffers) 1835 goto fail_free_cpumask; 1836 1837 cpu = raw_smp_processor_id(); 1838 cpumask_set_cpu(cpu, buffer->cpumask); 1839 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 1840 if (!buffer->buffers[cpu]) 1841 goto fail_free_buffers; 1842 1843 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1844 if (ret < 0) 1845 goto fail_free_buffers; 1846 1847 mutex_init(&buffer->mutex); 1848 1849 return buffer; 1850 1851 fail_free_buffers: 1852 for_each_buffer_cpu(buffer, cpu) { 1853 if (buffer->buffers[cpu]) 1854 rb_free_cpu_buffer(buffer->buffers[cpu]); 1855 } 1856 kfree(buffer->buffers); 1857 1858 fail_free_cpumask: 1859 free_cpumask_var(buffer->cpumask); 1860 1861 fail_free_buffer: 1862 kfree(buffer); 1863 return NULL; 1864 } 1865 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1866 1867 /** 1868 * ring_buffer_free - free a ring buffer. 1869 * @buffer: the buffer to free. 
1870 */ 1871 void 1872 ring_buffer_free(struct trace_buffer *buffer) 1873 { 1874 int cpu; 1875 1876 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1877 1878 irq_work_sync(&buffer->irq_work.work); 1879 1880 for_each_buffer_cpu(buffer, cpu) 1881 rb_free_cpu_buffer(buffer->buffers[cpu]); 1882 1883 kfree(buffer->buffers); 1884 free_cpumask_var(buffer->cpumask); 1885 1886 kfree(buffer); 1887 } 1888 EXPORT_SYMBOL_GPL(ring_buffer_free); 1889 1890 void ring_buffer_set_clock(struct trace_buffer *buffer, 1891 u64 (*clock)(void)) 1892 { 1893 buffer->clock = clock; 1894 } 1895 1896 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 1897 { 1898 buffer->time_stamp_abs = abs; 1899 } 1900 1901 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 1902 { 1903 return buffer->time_stamp_abs; 1904 } 1905 1906 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1907 1908 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1909 { 1910 return local_read(&bpage->entries) & RB_WRITE_MASK; 1911 } 1912 1913 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1914 { 1915 return local_read(&bpage->write) & RB_WRITE_MASK; 1916 } 1917 1918 static bool 1919 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 1920 { 1921 struct list_head *tail_page, *to_remove, *next_page; 1922 struct buffer_page *to_remove_page, *tmp_iter_page; 1923 struct buffer_page *last_page, *first_page; 1924 unsigned long nr_removed; 1925 unsigned long head_bit; 1926 int page_entries; 1927 1928 head_bit = 0; 1929 1930 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1931 atomic_inc(&cpu_buffer->record_disabled); 1932 /* 1933 * We don't race with the readers since we have acquired the reader 1934 * lock. We also don't race with writers after disabling recording. 1935 * This makes it easy to figure out the first and the last page to be 1936 * removed from the list. 
We unlink all the pages in between including 1937 * the first and last pages. This is done in a busy loop so that we 1938 * lose the least number of traces. 1939 * The pages are freed after we restart recording and unlock readers. 1940 */ 1941 tail_page = &cpu_buffer->tail_page->list; 1942 1943 /* 1944 * tail page might be on reader page, we remove the next page 1945 * from the ring buffer 1946 */ 1947 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 1948 tail_page = rb_list_head(tail_page->next); 1949 to_remove = tail_page; 1950 1951 /* start of pages to remove */ 1952 first_page = list_entry(rb_list_head(to_remove->next), 1953 struct buffer_page, list); 1954 1955 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 1956 to_remove = rb_list_head(to_remove)->next; 1957 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 1958 } 1959 1960 next_page = rb_list_head(to_remove)->next; 1961 1962 /* 1963 * Now we remove all pages between tail_page and next_page. 1964 * Make sure that we have head_bit value preserved for the 1965 * next page 1966 */ 1967 tail_page->next = (struct list_head *)((unsigned long)next_page | 1968 head_bit); 1969 next_page = rb_list_head(next_page); 1970 next_page->prev = tail_page; 1971 1972 /* make sure pages points to a valid page in the ring buffer */ 1973 cpu_buffer->pages = next_page; 1974 1975 /* update head page */ 1976 if (head_bit) 1977 cpu_buffer->head_page = list_entry(next_page, 1978 struct buffer_page, list); 1979 1980 /* 1981 * change read pointer to make sure any read iterators reset 1982 * themselves 1983 */ 1984 cpu_buffer->read = 0; 1985 1986 /* pages are removed, resume tracing and then free the pages */ 1987 atomic_dec(&cpu_buffer->record_disabled); 1988 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1989 1990 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1991 1992 /* last buffer page to remove */ 1993 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1994 list); 1995 tmp_iter_page = 
first_page; 1996 1997 do { 1998 cond_resched(); 1999 2000 to_remove_page = tmp_iter_page; 2001 rb_inc_page(&tmp_iter_page); 2002 2003 /* update the counters */ 2004 page_entries = rb_page_entries(to_remove_page); 2005 if (page_entries) { 2006 /* 2007 * If something was added to this page, it was full 2008 * since it is not the tail page. So we deduct the 2009 * bytes consumed in ring buffer from here. 2010 * Increment overrun to account for the lost events. 2011 */ 2012 local_add(page_entries, &cpu_buffer->overrun); 2013 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2014 local_inc(&cpu_buffer->pages_lost); 2015 } 2016 2017 /* 2018 * We have already removed references to this list item, just 2019 * free up the buffer_page and its page 2020 */ 2021 free_buffer_page(to_remove_page); 2022 nr_removed--; 2023 2024 } while (to_remove_page != last_page); 2025 2026 RB_WARN_ON(cpu_buffer, nr_removed); 2027 2028 return nr_removed == 0; 2029 } 2030 2031 static bool 2032 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2033 { 2034 struct list_head *pages = &cpu_buffer->new_pages; 2035 unsigned long flags; 2036 bool success; 2037 int retries; 2038 2039 /* Can be called at early boot up, where interrupts must not been enabled */ 2040 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2041 /* 2042 * We are holding the reader lock, so the reader page won't be swapped 2043 * in the ring buffer. Now we are racing with the writer trying to 2044 * move head page and the tail page. 2045 * We are going to adapt the reader page update process where: 2046 * 1. We first splice the start and end of list of new pages between 2047 * the head page and its previous page. 2048 * 2. We cmpxchg the prev_page->next to point from head page to the 2049 * start of new pages list. 2050 * 3. Finally, we update the head->prev to the end of new list. 2051 * 2052 * We will try this process 10 times, to make sure that we don't keep 2053 * spinning. 
2054 */ 2055 retries = 10; 2056 success = false; 2057 while (retries--) { 2058 struct list_head *head_page, *prev_page, *r; 2059 struct list_head *last_page, *first_page; 2060 struct list_head *head_page_with_bit; 2061 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2062 2063 if (!hpage) 2064 break; 2065 head_page = &hpage->list; 2066 prev_page = head_page->prev; 2067 2068 first_page = pages->next; 2069 last_page = pages->prev; 2070 2071 head_page_with_bit = (struct list_head *) 2072 ((unsigned long)head_page | RB_PAGE_HEAD); 2073 2074 last_page->next = head_page_with_bit; 2075 first_page->prev = prev_page; 2076 2077 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); 2078 2079 if (r == head_page_with_bit) { 2080 /* 2081 * yay, we replaced the page pointer to our new list, 2082 * now, we just have to update to head page's prev 2083 * pointer to point to end of list 2084 */ 2085 head_page->prev = last_page; 2086 success = true; 2087 break; 2088 } 2089 } 2090 2091 if (success) 2092 INIT_LIST_HEAD(pages); 2093 /* 2094 * If we weren't successful in adding in new pages, warn and stop 2095 * tracing 2096 */ 2097 RB_WARN_ON(cpu_buffer, !success); 2098 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2099 2100 /* free pages if they weren't inserted */ 2101 if (!success) { 2102 struct buffer_page *bpage, *tmp; 2103 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2104 list) { 2105 list_del_init(&bpage->list); 2106 free_buffer_page(bpage); 2107 } 2108 } 2109 return success; 2110 } 2111 2112 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2113 { 2114 bool success; 2115 2116 if (cpu_buffer->nr_pages_to_update > 0) 2117 success = rb_insert_pages(cpu_buffer); 2118 else 2119 success = rb_remove_pages(cpu_buffer, 2120 -cpu_buffer->nr_pages_to_update); 2121 2122 if (success) 2123 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2124 } 2125 2126 static void update_pages_handler(struct work_struct *work) 2127 { 
2128 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2129 struct ring_buffer_per_cpu, update_pages_work); 2130 rb_update_pages(cpu_buffer); 2131 complete(&cpu_buffer->update_done); 2132 } 2133 2134 /** 2135 * ring_buffer_resize - resize the ring buffer 2136 * @buffer: the buffer to resize. 2137 * @size: the new size. 2138 * @cpu_id: the cpu buffer to resize 2139 * 2140 * Minimum size is 2 * BUF_PAGE_SIZE. 2141 * 2142 * Returns 0 on success and < 0 on failure. 2143 */ 2144 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2145 int cpu_id) 2146 { 2147 struct ring_buffer_per_cpu *cpu_buffer; 2148 unsigned long nr_pages; 2149 int cpu, err; 2150 2151 /* 2152 * Always succeed at resizing a non-existent buffer: 2153 */ 2154 if (!buffer) 2155 return 0; 2156 2157 /* Make sure the requested buffer exists */ 2158 if (cpu_id != RING_BUFFER_ALL_CPUS && 2159 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2160 return 0; 2161 2162 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 2163 2164 /* we need a minimum of two pages */ 2165 if (nr_pages < 2) 2166 nr_pages = 2; 2167 2168 /* prevent another thread from changing buffer sizes */ 2169 mutex_lock(&buffer->mutex); 2170 2171 2172 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2173 /* 2174 * Don't succeed if resizing is disabled, as a reader might be 2175 * manipulating the ring buffer and is expecting a sane state while 2176 * this is true. 
2177 */ 2178 for_each_buffer_cpu(buffer, cpu) { 2179 cpu_buffer = buffer->buffers[cpu]; 2180 if (atomic_read(&cpu_buffer->resize_disabled)) { 2181 err = -EBUSY; 2182 goto out_err_unlock; 2183 } 2184 } 2185 2186 /* calculate the pages to update */ 2187 for_each_buffer_cpu(buffer, cpu) { 2188 cpu_buffer = buffer->buffers[cpu]; 2189 2190 cpu_buffer->nr_pages_to_update = nr_pages - 2191 cpu_buffer->nr_pages; 2192 /* 2193 * nothing more to do for removing pages or no update 2194 */ 2195 if (cpu_buffer->nr_pages_to_update <= 0) 2196 continue; 2197 /* 2198 * to add pages, make sure all new pages can be 2199 * allocated without receiving ENOMEM 2200 */ 2201 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2202 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2203 &cpu_buffer->new_pages)) { 2204 /* not enough memory for new pages */ 2205 err = -ENOMEM; 2206 goto out_err; 2207 } 2208 } 2209 2210 cpus_read_lock(); 2211 /* 2212 * Fire off all the required work handlers 2213 * We can't schedule on offline CPUs, but it's not necessary 2214 * since we can change their buffer sizes without any race. 2215 */ 2216 for_each_buffer_cpu(buffer, cpu) { 2217 cpu_buffer = buffer->buffers[cpu]; 2218 if (!cpu_buffer->nr_pages_to_update) 2219 continue; 2220 2221 /* Can't run something on an offline CPU. */ 2222 if (!cpu_online(cpu)) { 2223 rb_update_pages(cpu_buffer); 2224 cpu_buffer->nr_pages_to_update = 0; 2225 } else { 2226 /* Run directly if possible. 
*/ 2227 migrate_disable(); 2228 if (cpu != smp_processor_id()) { 2229 migrate_enable(); 2230 schedule_work_on(cpu, 2231 &cpu_buffer->update_pages_work); 2232 } else { 2233 update_pages_handler(&cpu_buffer->update_pages_work); 2234 migrate_enable(); 2235 } 2236 } 2237 } 2238 2239 /* wait for all the updates to complete */ 2240 for_each_buffer_cpu(buffer, cpu) { 2241 cpu_buffer = buffer->buffers[cpu]; 2242 if (!cpu_buffer->nr_pages_to_update) 2243 continue; 2244 2245 if (cpu_online(cpu)) 2246 wait_for_completion(&cpu_buffer->update_done); 2247 cpu_buffer->nr_pages_to_update = 0; 2248 } 2249 2250 cpus_read_unlock(); 2251 } else { 2252 cpu_buffer = buffer->buffers[cpu_id]; 2253 2254 if (nr_pages == cpu_buffer->nr_pages) 2255 goto out; 2256 2257 /* 2258 * Don't succeed if resizing is disabled, as a reader might be 2259 * manipulating the ring buffer and is expecting a sane state while 2260 * this is true. 2261 */ 2262 if (atomic_read(&cpu_buffer->resize_disabled)) { 2263 err = -EBUSY; 2264 goto out_err_unlock; 2265 } 2266 2267 cpu_buffer->nr_pages_to_update = nr_pages - 2268 cpu_buffer->nr_pages; 2269 2270 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2271 if (cpu_buffer->nr_pages_to_update > 0 && 2272 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2273 &cpu_buffer->new_pages)) { 2274 err = -ENOMEM; 2275 goto out_err; 2276 } 2277 2278 cpus_read_lock(); 2279 2280 /* Can't run something on an offline CPU. */ 2281 if (!cpu_online(cpu_id)) 2282 rb_update_pages(cpu_buffer); 2283 else { 2284 /* Run directly if possible. 
*/ 2285 migrate_disable(); 2286 if (cpu_id == smp_processor_id()) { 2287 rb_update_pages(cpu_buffer); 2288 migrate_enable(); 2289 } else { 2290 migrate_enable(); 2291 schedule_work_on(cpu_id, 2292 &cpu_buffer->update_pages_work); 2293 wait_for_completion(&cpu_buffer->update_done); 2294 } 2295 } 2296 2297 cpu_buffer->nr_pages_to_update = 0; 2298 cpus_read_unlock(); 2299 } 2300 2301 out: 2302 /* 2303 * The ring buffer resize can happen with the ring buffer 2304 * enabled, so that the update disturbs the tracing as little 2305 * as possible. But if the buffer is disabled, we do not need 2306 * to worry about that, and we can take the time to verify 2307 * that the buffer is not corrupt. 2308 */ 2309 if (atomic_read(&buffer->record_disabled)) { 2310 atomic_inc(&buffer->record_disabled); 2311 /* 2312 * Even though the buffer was disabled, we must make sure 2313 * that it is truly disabled before calling rb_check_pages. 2314 * There could have been a race between checking 2315 * record_disable and incrementing it. 
2316 */ 2317 synchronize_rcu(); 2318 for_each_buffer_cpu(buffer, cpu) { 2319 cpu_buffer = buffer->buffers[cpu]; 2320 rb_check_pages(cpu_buffer); 2321 } 2322 atomic_dec(&buffer->record_disabled); 2323 } 2324 2325 mutex_unlock(&buffer->mutex); 2326 return 0; 2327 2328 out_err: 2329 for_each_buffer_cpu(buffer, cpu) { 2330 struct buffer_page *bpage, *tmp; 2331 2332 cpu_buffer = buffer->buffers[cpu]; 2333 cpu_buffer->nr_pages_to_update = 0; 2334 2335 if (list_empty(&cpu_buffer->new_pages)) 2336 continue; 2337 2338 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2339 list) { 2340 list_del_init(&bpage->list); 2341 free_buffer_page(bpage); 2342 } 2343 } 2344 out_err_unlock: 2345 mutex_unlock(&buffer->mutex); 2346 return err; 2347 } 2348 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2349 2350 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2351 { 2352 mutex_lock(&buffer->mutex); 2353 if (val) 2354 buffer->flags |= RB_FL_OVERWRITE; 2355 else 2356 buffer->flags &= ~RB_FL_OVERWRITE; 2357 mutex_unlock(&buffer->mutex); 2358 } 2359 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2360 2361 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2362 { 2363 return bpage->page->data + index; 2364 } 2365 2366 static __always_inline struct ring_buffer_event * 2367 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2368 { 2369 return __rb_page_index(cpu_buffer->reader_page, 2370 cpu_buffer->reader_page->read); 2371 } 2372 2373 static __always_inline unsigned rb_page_commit(struct buffer_page *bpage) 2374 { 2375 return local_read(&bpage->page->commit); 2376 } 2377 2378 static struct ring_buffer_event * 2379 rb_iter_head_event(struct ring_buffer_iter *iter) 2380 { 2381 struct ring_buffer_event *event; 2382 struct buffer_page *iter_head_page = iter->head_page; 2383 unsigned long commit; 2384 unsigned length; 2385 2386 if (iter->head != iter->next_event) 2387 return iter->event; 2388 2389 /* 2390 * When the writer goes 
across pages, it issues a cmpxchg which 2391 * is a mb(), which will synchronize with the rmb here. 2392 * (see rb_tail_page_update() and __rb_reserve_next()) 2393 */ 2394 commit = rb_page_commit(iter_head_page); 2395 smp_rmb(); 2396 event = __rb_page_index(iter_head_page, iter->head); 2397 length = rb_event_length(event); 2398 2399 /* 2400 * READ_ONCE() doesn't work on functions and we don't want the 2401 * compiler doing any crazy optimizations with length. 2402 */ 2403 barrier(); 2404 2405 if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE) 2406 /* Writer corrupted the read? */ 2407 goto reset; 2408 2409 memcpy(iter->event, event, length); 2410 /* 2411 * If the page stamp is still the same after this rmb() then the 2412 * event was safely copied without the writer entering the page. 2413 */ 2414 smp_rmb(); 2415 2416 /* Make sure the page didn't change since we read this */ 2417 if (iter->page_stamp != iter_head_page->page->time_stamp || 2418 commit > rb_page_commit(iter_head_page)) 2419 goto reset; 2420 2421 iter->next_event = iter->head + length; 2422 return iter->event; 2423 reset: 2424 /* Reset to the beginning */ 2425 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2426 iter->head = 0; 2427 iter->next_event = 0; 2428 iter->missed_events = 1; 2429 return NULL; 2430 } 2431 2432 /* Size is determined by what has been committed */ 2433 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2434 { 2435 return rb_page_commit(bpage); 2436 } 2437 2438 static __always_inline unsigned 2439 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2440 { 2441 return rb_page_commit(cpu_buffer->commit_page); 2442 } 2443 2444 static __always_inline unsigned 2445 rb_event_index(struct ring_buffer_event *event) 2446 { 2447 unsigned long addr = (unsigned long)event; 2448 2449 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 2450 } 2451 2452 static void rb_inc_iter(struct ring_buffer_iter *iter) 2453 { 2454 struct 
ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2455 2456 /* 2457 * The iterator could be on the reader page (it starts there). 2458 * But the head could have moved, since the reader was 2459 * found. Check for this case and assign the iterator 2460 * to the head page instead of next. 2461 */ 2462 if (iter->head_page == cpu_buffer->reader_page) 2463 iter->head_page = rb_set_head_page(cpu_buffer); 2464 else 2465 rb_inc_page(&iter->head_page); 2466 2467 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2468 iter->head = 0; 2469 iter->next_event = 0; 2470 } 2471 2472 /* 2473 * rb_handle_head_page - writer hit the head page 2474 * 2475 * Returns: +1 to retry page 2476 * 0 to continue 2477 * -1 on error 2478 */ 2479 static int 2480 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2481 struct buffer_page *tail_page, 2482 struct buffer_page *next_page) 2483 { 2484 struct buffer_page *new_head; 2485 int entries; 2486 int type; 2487 int ret; 2488 2489 entries = rb_page_entries(next_page); 2490 2491 /* 2492 * The hard part is here. We need to move the head 2493 * forward, and protect against both readers on 2494 * other CPUs and writers coming in via interrupts. 2495 */ 2496 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2497 RB_PAGE_HEAD); 2498 2499 /* 2500 * type can be one of four: 2501 * NORMAL - an interrupt already moved it for us 2502 * HEAD - we are the first to get here. 2503 * UPDATE - we are the interrupt interrupting 2504 * a current move. 2505 * MOVED - a reader on another CPU moved the next 2506 * pointer to its reader page. Give up 2507 * and try again. 2508 */ 2509 2510 switch (type) { 2511 case RB_PAGE_HEAD: 2512 /* 2513 * We changed the head to UPDATE, thus 2514 * it is our responsibility to update 2515 * the counters. 
2516 */ 2517 local_add(entries, &cpu_buffer->overrun); 2518 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2519 local_inc(&cpu_buffer->pages_lost); 2520 2521 /* 2522 * The entries will be zeroed out when we move the 2523 * tail page. 2524 */ 2525 2526 /* still more to do */ 2527 break; 2528 2529 case RB_PAGE_UPDATE: 2530 /* 2531 * This is an interrupt that interrupt the 2532 * previous update. Still more to do. 2533 */ 2534 break; 2535 case RB_PAGE_NORMAL: 2536 /* 2537 * An interrupt came in before the update 2538 * and processed this for us. 2539 * Nothing left to do. 2540 */ 2541 return 1; 2542 case RB_PAGE_MOVED: 2543 /* 2544 * The reader is on another CPU and just did 2545 * a swap with our next_page. 2546 * Try again. 2547 */ 2548 return 1; 2549 default: 2550 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2551 return -1; 2552 } 2553 2554 /* 2555 * Now that we are here, the old head pointer is 2556 * set to UPDATE. This will keep the reader from 2557 * swapping the head page with the reader page. 2558 * The reader (on another CPU) will spin till 2559 * we are finished. 2560 * 2561 * We just need to protect against interrupts 2562 * doing the job. We will set the next pointer 2563 * to HEAD. After that, we set the old pointer 2564 * to NORMAL, but only if it was HEAD before. 2565 * otherwise we are an interrupt, and only 2566 * want the outer most commit to reset it. 2567 */ 2568 new_head = next_page; 2569 rb_inc_page(&new_head); 2570 2571 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2572 RB_PAGE_NORMAL); 2573 2574 /* 2575 * Valid returns are: 2576 * HEAD - an interrupt came in and already set it. 2577 * NORMAL - One of two things: 2578 * 1) We really set it. 2579 * 2) A bunch of interrupts came in and moved 2580 * the page forward again. 
2581 */ 2582 switch (ret) { 2583 case RB_PAGE_HEAD: 2584 case RB_PAGE_NORMAL: 2585 /* OK */ 2586 break; 2587 default: 2588 RB_WARN_ON(cpu_buffer, 1); 2589 return -1; 2590 } 2591 2592 /* 2593 * It is possible that an interrupt came in, 2594 * set the head up, then more interrupts came in 2595 * and moved it again. When we get back here, 2596 * the page would have been set to NORMAL but we 2597 * just set it back to HEAD. 2598 * 2599 * How do you detect this? Well, if that happened 2600 * the tail page would have moved. 2601 */ 2602 if (ret == RB_PAGE_NORMAL) { 2603 struct buffer_page *buffer_tail_page; 2604 2605 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2606 /* 2607 * If the tail had moved passed next, then we need 2608 * to reset the pointer. 2609 */ 2610 if (buffer_tail_page != tail_page && 2611 buffer_tail_page != next_page) 2612 rb_head_page_set_normal(cpu_buffer, new_head, 2613 next_page, 2614 RB_PAGE_HEAD); 2615 } 2616 2617 /* 2618 * If this was the outer most commit (the one that 2619 * changed the original pointer from HEAD to UPDATE), 2620 * then it is up to us to reset it to NORMAL. 2621 */ 2622 if (type == RB_PAGE_HEAD) { 2623 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2624 tail_page, 2625 RB_PAGE_UPDATE); 2626 if (RB_WARN_ON(cpu_buffer, 2627 ret != RB_PAGE_UPDATE)) 2628 return -1; 2629 } 2630 2631 return 0; 2632 } 2633 2634 static inline void 2635 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2636 unsigned long tail, struct rb_event_info *info) 2637 { 2638 struct buffer_page *tail_page = info->tail_page; 2639 struct ring_buffer_event *event; 2640 unsigned long length = info->length; 2641 2642 /* 2643 * Only the event that crossed the page boundary 2644 * must fill the old tail_page with padding. 2645 */ 2646 if (tail >= BUF_PAGE_SIZE) { 2647 /* 2648 * If the page was filled, then we still need 2649 * to update the real_end. Reset it to zero 2650 * and the reader will ignore it. 
2651 */ 2652 if (tail == BUF_PAGE_SIZE) 2653 tail_page->real_end = 0; 2654 2655 local_sub(length, &tail_page->write); 2656 return; 2657 } 2658 2659 event = __rb_page_index(tail_page, tail); 2660 2661 /* account for padding bytes */ 2662 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); 2663 2664 /* 2665 * Save the original length to the meta data. 2666 * This will be used by the reader to add lost event 2667 * counter. 2668 */ 2669 tail_page->real_end = tail; 2670 2671 /* 2672 * If this event is bigger than the minimum size, then 2673 * we need to be careful that we don't subtract the 2674 * write counter enough to allow another writer to slip 2675 * in on this page. 2676 * We put in a discarded commit instead, to make sure 2677 * that this space is not used again. 2678 * 2679 * If we are less than the minimum size, we don't need to 2680 * worry about it. 2681 */ 2682 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 2683 /* No room for any events */ 2684 2685 /* Mark the rest of the page with padding */ 2686 rb_event_set_padding(event); 2687 2688 /* Make sure the padding is visible before the write update */ 2689 smp_wmb(); 2690 2691 /* Set the write back to the previous setting */ 2692 local_sub(length, &tail_page->write); 2693 return; 2694 } 2695 2696 /* Put in a discarded event */ 2697 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 2698 event->type_len = RINGBUF_TYPE_PADDING; 2699 /* time delta must be non zero */ 2700 event->time_delta = 1; 2701 2702 /* Make sure the padding is visible before the tail_page->write update */ 2703 smp_wmb(); 2704 2705 /* Set write to end of buffer */ 2706 length = (tail + length) - BUF_PAGE_SIZE; 2707 local_sub(length, &tail_page->write); 2708 } 2709 2710 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2711 2712 /* 2713 * This is the slow path, force gcc not to inline it. 
2714 */ 2715 static noinline struct ring_buffer_event * 2716 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2717 unsigned long tail, struct rb_event_info *info) 2718 { 2719 struct buffer_page *tail_page = info->tail_page; 2720 struct buffer_page *commit_page = cpu_buffer->commit_page; 2721 struct trace_buffer *buffer = cpu_buffer->buffer; 2722 struct buffer_page *next_page; 2723 int ret; 2724 2725 next_page = tail_page; 2726 2727 rb_inc_page(&next_page); 2728 2729 /* 2730 * If for some reason, we had an interrupt storm that made 2731 * it all the way around the buffer, bail, and warn 2732 * about it. 2733 */ 2734 if (unlikely(next_page == commit_page)) { 2735 local_inc(&cpu_buffer->commit_overrun); 2736 goto out_reset; 2737 } 2738 2739 /* 2740 * This is where the fun begins! 2741 * 2742 * We are fighting against races between a reader that 2743 * could be on another CPU trying to swap its reader 2744 * page with the buffer head. 2745 * 2746 * We are also fighting against interrupts coming in and 2747 * moving the head or tail on us as well. 2748 * 2749 * If the next page is the head page then we have filled 2750 * the buffer, unless the commit page is still on the 2751 * reader page. 2752 */ 2753 if (rb_is_head_page(next_page, &tail_page->list)) { 2754 2755 /* 2756 * If the commit is not on the reader page, then 2757 * move the header page. 2758 */ 2759 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2760 /* 2761 * If we are not in overwrite mode, 2762 * this is easy, just stop here. 2763 */ 2764 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2765 local_inc(&cpu_buffer->dropped_events); 2766 goto out_reset; 2767 } 2768 2769 ret = rb_handle_head_page(cpu_buffer, 2770 tail_page, 2771 next_page); 2772 if (ret < 0) 2773 goto out_reset; 2774 if (ret) 2775 goto out_again; 2776 } else { 2777 /* 2778 * We need to be careful here too. The 2779 * commit page could still be on the reader 2780 * page. 
We could have a small buffer, and 2781 * have filled up the buffer with events 2782 * from interrupts and such, and wrapped. 2783 * 2784 * Note, if the tail page is also on the 2785 * reader_page, we let it move out. 2786 */ 2787 if (unlikely((cpu_buffer->commit_page != 2788 cpu_buffer->tail_page) && 2789 (cpu_buffer->commit_page == 2790 cpu_buffer->reader_page))) { 2791 local_inc(&cpu_buffer->commit_overrun); 2792 goto out_reset; 2793 } 2794 } 2795 } 2796 2797 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2798 2799 out_again: 2800 2801 rb_reset_tail(cpu_buffer, tail, info); 2802 2803 /* Commit what we have for now. */ 2804 rb_end_commit(cpu_buffer); 2805 /* rb_end_commit() decs committing */ 2806 local_inc(&cpu_buffer->committing); 2807 2808 /* fail and let the caller try again */ 2809 return ERR_PTR(-EAGAIN); 2810 2811 out_reset: 2812 /* reset write */ 2813 rb_reset_tail(cpu_buffer, tail, info); 2814 2815 return NULL; 2816 } 2817 2818 /* Slow path */ 2819 static struct ring_buffer_event * 2820 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) 2821 { 2822 if (abs) 2823 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2824 else 2825 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2826 2827 /* Not the first event on the page, or not delta? */ 2828 if (abs || rb_event_index(event)) { 2829 event->time_delta = delta & TS_MASK; 2830 event->array[0] = delta >> TS_SHIFT; 2831 } else { 2832 /* nope, just zero it */ 2833 event->time_delta = 0; 2834 event->array[0] = 0; 2835 } 2836 2837 return skip_time_extend(event); 2838 } 2839 2840 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2841 static inline bool sched_clock_stable(void) 2842 { 2843 return true; 2844 } 2845 #endif 2846 2847 static void 2848 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2849 struct rb_event_info *info) 2850 { 2851 u64 write_stamp; 2852 2853 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2854 (unsigned long long)info->delta, 2855 (unsigned long long)info->ts, 2856 (unsigned long long)info->before, 2857 (unsigned long long)info->after, 2858 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0), 2859 sched_clock_stable() ? "" : 2860 "If you just came from a suspend/resume,\n" 2861 "please switch to the trace global clock:\n" 2862 " echo global > /sys/kernel/tracing/trace_clock\n" 2863 "or add trace_clock=global to the kernel command line\n"); 2864 } 2865 2866 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2867 struct ring_buffer_event **event, 2868 struct rb_event_info *info, 2869 u64 *delta, 2870 unsigned int *length) 2871 { 2872 bool abs = info->add_timestamp & 2873 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2874 2875 if (unlikely(info->delta > (1ULL << 59))) { 2876 /* 2877 * Some timers can use more than 59 bits, and when a timestamp 2878 * is added to the buffer, it will lose those bits. 2879 */ 2880 if (abs && (info->ts & TS_MSB)) { 2881 info->delta &= ABS_TS_MASK; 2882 2883 /* did the clock go backwards */ 2884 } else if (info->before == info->after && info->before > info->ts) { 2885 /* not interrupted */ 2886 static int once; 2887 2888 /* 2889 * This is possible with a recalibrating of the TSC. 2890 * Do not produce a call stack, but just report it. 
2891 */ 2892 if (!once) { 2893 once++; 2894 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2895 info->before, info->ts); 2896 } 2897 } else 2898 rb_check_timestamp(cpu_buffer, info); 2899 if (!abs) 2900 info->delta = 0; 2901 } 2902 *event = rb_add_time_stamp(*event, info->delta, abs); 2903 *length -= RB_LEN_TIME_EXTEND; 2904 *delta = 0; 2905 } 2906 2907 /** 2908 * rb_update_event - update event type and data 2909 * @cpu_buffer: The per cpu buffer of the @event 2910 * @event: the event to update 2911 * @info: The info to update the @event with (contains length and delta) 2912 * 2913 * Update the type and data fields of the @event. The length 2914 * is the actual size that is written to the ring buffer, 2915 * and with this, we can determine what to place into the 2916 * data field. 2917 */ 2918 static void 2919 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2920 struct ring_buffer_event *event, 2921 struct rb_event_info *info) 2922 { 2923 unsigned length = info->length; 2924 u64 delta = info->delta; 2925 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2926 2927 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2928 cpu_buffer->event_stamp[nest] = info->ts; 2929 2930 /* 2931 * If we need to add a timestamp, then we 2932 * add it to the start of the reserved space. 
2933 */ 2934 if (unlikely(info->add_timestamp)) 2935 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2936 2937 event->time_delta = delta; 2938 length -= RB_EVNT_HDR_SIZE; 2939 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2940 event->type_len = 0; 2941 event->array[0] = length; 2942 } else 2943 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2944 } 2945 2946 static unsigned rb_calculate_event_length(unsigned length) 2947 { 2948 struct ring_buffer_event event; /* Used only for sizeof array */ 2949 2950 /* zero length can cause confusions */ 2951 if (!length) 2952 length++; 2953 2954 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2955 length += sizeof(event.array[0]); 2956 2957 length += RB_EVNT_HDR_SIZE; 2958 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2959 2960 /* 2961 * In case the time delta is larger than the 27 bits for it 2962 * in the header, we need to add a timestamp. If another 2963 * event comes in when trying to discard this one to increase 2964 * the length, then the timestamp will be added in the allocated 2965 * space of this event. If length is bigger than the size needed 2966 * for the TIME_EXTEND, then padding has to be used. The events 2967 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2968 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2969 * As length is a multiple of 4, we only need to worry if it 2970 * is 12 (RB_LEN_TIME_EXTEND + 4). 
2971 */ 2972 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2973 length += RB_ALIGNMENT; 2974 2975 return length; 2976 } 2977 2978 static u64 rb_time_delta(struct ring_buffer_event *event) 2979 { 2980 switch (event->type_len) { 2981 case RINGBUF_TYPE_PADDING: 2982 return 0; 2983 2984 case RINGBUF_TYPE_TIME_EXTEND: 2985 return rb_event_time_stamp(event); 2986 2987 case RINGBUF_TYPE_TIME_STAMP: 2988 return 0; 2989 2990 case RINGBUF_TYPE_DATA: 2991 return event->time_delta; 2992 default: 2993 return 0; 2994 } 2995 } 2996 2997 static inline bool 2998 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2999 struct ring_buffer_event *event) 3000 { 3001 unsigned long new_index, old_index; 3002 struct buffer_page *bpage; 3003 unsigned long index; 3004 unsigned long addr; 3005 u64 write_stamp; 3006 u64 delta; 3007 3008 new_index = rb_event_index(event); 3009 old_index = new_index + rb_event_ts_length(event); 3010 addr = (unsigned long)event; 3011 addr &= PAGE_MASK; 3012 3013 bpage = READ_ONCE(cpu_buffer->tail_page); 3014 3015 delta = rb_time_delta(event); 3016 3017 if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp)) 3018 return false; 3019 3020 /* Make sure the write stamp is read before testing the location */ 3021 barrier(); 3022 3023 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3024 unsigned long write_mask = 3025 local_read(&bpage->write) & ~RB_WRITE_MASK; 3026 unsigned long event_length = rb_event_length(event); 3027 3028 /* Something came in, can't discard */ 3029 if (!rb_time_cmpxchg(&cpu_buffer->write_stamp, 3030 write_stamp, write_stamp - delta)) 3031 return false; 3032 3033 /* 3034 * It's possible that the event time delta is zero 3035 * (has the same time stamp as the previous event) 3036 * in which case write_stamp and before_stamp could 3037 * be the same. In such a case, force before_stamp 3038 * to be different than write_stamp. It doesn't 3039 * matter what it is, as long as its different. 
3040 */ 3041 if (!delta) 3042 rb_time_set(&cpu_buffer->before_stamp, 0); 3043 3044 /* 3045 * If an event were to come in now, it would see that the 3046 * write_stamp and the before_stamp are different, and assume 3047 * that this event just added itself before updating 3048 * the write stamp. The interrupting event will fix the 3049 * write stamp for us, and use the before stamp as its delta. 3050 */ 3051 3052 /* 3053 * This is on the tail page. It is possible that 3054 * a write could come in and move the tail page 3055 * and write to the next page. That is fine 3056 * because we just shorten what is on this page. 3057 */ 3058 old_index += write_mask; 3059 new_index += write_mask; 3060 index = local_cmpxchg(&bpage->write, old_index, new_index); 3061 if (index == old_index) { 3062 /* update counters */ 3063 local_sub(event_length, &cpu_buffer->entries_bytes); 3064 return true; 3065 } 3066 } 3067 3068 /* could not discard */ 3069 return false; 3070 } 3071 3072 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3073 { 3074 local_inc(&cpu_buffer->committing); 3075 local_inc(&cpu_buffer->commits); 3076 } 3077 3078 static __always_inline void 3079 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3080 { 3081 unsigned long max_count; 3082 3083 /* 3084 * We only race with interrupts and NMIs on this CPU. 3085 * If we own the commit event, then we can commit 3086 * all others that interrupted us, since the interruptions 3087 * are in stack format (they finish before they come 3088 * back to us). This allows us to do a simple loop to 3089 * assign the commit to the tail. 
3090 */ 3091 again: 3092 max_count = cpu_buffer->nr_pages * 100; 3093 3094 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3095 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3096 return; 3097 if (RB_WARN_ON(cpu_buffer, 3098 rb_is_reader_page(cpu_buffer->tail_page))) 3099 return; 3100 /* 3101 * No need for a memory barrier here, as the update 3102 * of the tail_page did it for this page. 3103 */ 3104 local_set(&cpu_buffer->commit_page->page->commit, 3105 rb_page_write(cpu_buffer->commit_page)); 3106 rb_inc_page(&cpu_buffer->commit_page); 3107 /* add barrier to keep gcc from optimizing too much */ 3108 barrier(); 3109 } 3110 while (rb_commit_index(cpu_buffer) != 3111 rb_page_write(cpu_buffer->commit_page)) { 3112 3113 /* Make sure the readers see the content of what is committed. */ 3114 smp_wmb(); 3115 local_set(&cpu_buffer->commit_page->page->commit, 3116 rb_page_write(cpu_buffer->commit_page)); 3117 RB_WARN_ON(cpu_buffer, 3118 local_read(&cpu_buffer->commit_page->page->commit) & 3119 ~RB_WRITE_MASK); 3120 barrier(); 3121 } 3122 3123 /* again, keep gcc from optimizing */ 3124 barrier(); 3125 3126 /* 3127 * If an interrupt came in just after the first while loop 3128 * and pushed the tail page forward, we will be left with 3129 * a dangling commit that will never go forward. 
3130 */ 3131 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3132 goto again; 3133 } 3134 3135 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3136 { 3137 unsigned long commits; 3138 3139 if (RB_WARN_ON(cpu_buffer, 3140 !local_read(&cpu_buffer->committing))) 3141 return; 3142 3143 again: 3144 commits = local_read(&cpu_buffer->commits); 3145 /* synchronize with interrupts */ 3146 barrier(); 3147 if (local_read(&cpu_buffer->committing) == 1) 3148 rb_set_commit_to_write(cpu_buffer); 3149 3150 local_dec(&cpu_buffer->committing); 3151 3152 /* synchronize with interrupts */ 3153 barrier(); 3154 3155 /* 3156 * Need to account for interrupts coming in between the 3157 * updating of the commit page and the clearing of the 3158 * committing counter. 3159 */ 3160 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3161 !local_read(&cpu_buffer->committing)) { 3162 local_inc(&cpu_buffer->committing); 3163 goto again; 3164 } 3165 } 3166 3167 static inline void rb_event_discard(struct ring_buffer_event *event) 3168 { 3169 if (extended_time(event)) 3170 event = skip_time_extend(event); 3171 3172 /* array[0] holds the actual length for the discarded event */ 3173 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3174 event->type_len = RINGBUF_TYPE_PADDING; 3175 /* time delta must be non zero */ 3176 if (!event->time_delta) 3177 event->time_delta = 1; 3178 } 3179 3180 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3181 { 3182 local_inc(&cpu_buffer->entries); 3183 rb_end_commit(cpu_buffer); 3184 } 3185 3186 static __always_inline void 3187 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3188 { 3189 if (buffer->irq_work.waiters_pending) { 3190 buffer->irq_work.waiters_pending = false; 3191 /* irq_work_queue() supplies it's own memory barriers */ 3192 irq_work_queue(&buffer->irq_work.work); 3193 } 3194 3195 if (cpu_buffer->irq_work.waiters_pending) { 3196 
cpu_buffer->irq_work.waiters_pending = false; 3197 /* irq_work_queue() supplies it's own memory barriers */ 3198 irq_work_queue(&cpu_buffer->irq_work.work); 3199 } 3200 3201 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3202 return; 3203 3204 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3205 return; 3206 3207 if (!cpu_buffer->irq_work.full_waiters_pending) 3208 return; 3209 3210 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3211 3212 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3213 return; 3214 3215 cpu_buffer->irq_work.wakeup_full = true; 3216 cpu_buffer->irq_work.full_waiters_pending = false; 3217 /* irq_work_queue() supplies it's own memory barriers */ 3218 irq_work_queue(&cpu_buffer->irq_work.work); 3219 } 3220 3221 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3222 # define do_ring_buffer_record_recursion() \ 3223 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3224 #else 3225 # define do_ring_buffer_record_recursion() do { } while (0) 3226 #endif 3227 3228 /* 3229 * The lock and unlock are done within a preempt disable section. 3230 * The current_context per_cpu variable can only be modified 3231 * by the current task between lock and unlock. But it can 3232 * be modified more than once via an interrupt. To pass this 3233 * information from the lock to the unlock without having to 3234 * access the 'in_interrupt()' functions again (which do show 3235 * a bit of overhead in something as critical as function tracing, 3236 * we use a bitmask trick. 3237 * 3238 * bit 1 = NMI context 3239 * bit 2 = IRQ context 3240 * bit 3 = SoftIRQ context 3241 * bit 4 = normal context. 3242 * 3243 * This works because this is the order of contexts that can 3244 * preempt other contexts. A SoftIRQ never preempts an IRQ 3245 * context. 3246 * 3247 * When the context is determined, the corresponding bit is 3248 * checked and set (if it was set, then a recursion of that context 3249 * happened). 
3250 * 3251 * On unlock, we need to clear this bit. To do so, just subtract 3252 * 1 from the current_context and AND it to itself. 3253 * 3254 * (binary) 3255 * 101 - 1 = 100 3256 * 101 & 100 = 100 (clearing bit zero) 3257 * 3258 * 1010 - 1 = 1001 3259 * 1010 & 1001 = 1000 (clearing bit 1) 3260 * 3261 * The least significant bit can be cleared this way, and it 3262 * just so happens that it is the same bit corresponding to 3263 * the current context. 3264 * 3265 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3266 * is set when a recursion is detected at the current context, and if 3267 * the TRANSITION bit is already set, it will fail the recursion. 3268 * This is needed because there's a lag between the changing of 3269 * interrupt context and updating the preempt count. In this case, 3270 * a false positive will be found. To handle this, one extra recursion 3271 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3272 * bit is already set, then it is considered a recursion and the function 3273 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3274 * 3275 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3276 * to be cleared. Even if it wasn't the context that set it. That is, 3277 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3278 * is called before preempt_count() is updated, since the check will 3279 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3280 * NMI then comes in, it will set the NMI bit, but when the NMI code 3281 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3282 * and leave the NMI bit set. But this is fine, because the interrupt 3283 * code that set the TRANSITION bit will then clear the NMI bit when it 3284 * calls trace_recursive_unlock(). If another NMI comes in, it will 3285 * set the TRANSITION bit and continue. 
3286 * 3287 * Note: The TRANSITION bit only handles a single transition between context. 3288 */ 3289 3290 static __always_inline bool 3291 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3292 { 3293 unsigned int val = cpu_buffer->current_context; 3294 int bit = interrupt_context_level(); 3295 3296 bit = RB_CTX_NORMAL - bit; 3297 3298 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3299 /* 3300 * It is possible that this was called by transitioning 3301 * between interrupt context, and preempt_count() has not 3302 * been updated yet. In this case, use the TRANSITION bit. 3303 */ 3304 bit = RB_CTX_TRANSITION; 3305 if (val & (1 << (bit + cpu_buffer->nest))) { 3306 do_ring_buffer_record_recursion(); 3307 return true; 3308 } 3309 } 3310 3311 val |= (1 << (bit + cpu_buffer->nest)); 3312 cpu_buffer->current_context = val; 3313 3314 return false; 3315 } 3316 3317 static __always_inline void 3318 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3319 { 3320 cpu_buffer->current_context &= 3321 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3322 } 3323 3324 /* The recursive locking above uses 5 bits */ 3325 #define NESTED_BITS 5 3326 3327 /** 3328 * ring_buffer_nest_start - Allow to trace while nested 3329 * @buffer: The ring buffer to modify 3330 * 3331 * The ring buffer has a safety mechanism to prevent recursion. 3332 * But there may be a case where a trace needs to be done while 3333 * tracing something else. In this case, calling this function 3334 * will allow this function to nest within a currently active 3335 * ring_buffer_lock_reserve(). 3336 * 3337 * Call this function before calling another ring_buffer_lock_reserve() and 3338 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 
3339 */ 3340 void ring_buffer_nest_start(struct trace_buffer *buffer) 3341 { 3342 struct ring_buffer_per_cpu *cpu_buffer; 3343 int cpu; 3344 3345 /* Enabled by ring_buffer_nest_end() */ 3346 preempt_disable_notrace(); 3347 cpu = raw_smp_processor_id(); 3348 cpu_buffer = buffer->buffers[cpu]; 3349 /* This is the shift value for the above recursive locking */ 3350 cpu_buffer->nest += NESTED_BITS; 3351 } 3352 3353 /** 3354 * ring_buffer_nest_end - Allow to trace while nested 3355 * @buffer: The ring buffer to modify 3356 * 3357 * Must be called after ring_buffer_nest_start() and after the 3358 * ring_buffer_unlock_commit(). 3359 */ 3360 void ring_buffer_nest_end(struct trace_buffer *buffer) 3361 { 3362 struct ring_buffer_per_cpu *cpu_buffer; 3363 int cpu; 3364 3365 /* disabled by ring_buffer_nest_start() */ 3366 cpu = raw_smp_processor_id(); 3367 cpu_buffer = buffer->buffers[cpu]; 3368 /* This is the shift value for the above recursive locking */ 3369 cpu_buffer->nest -= NESTED_BITS; 3370 preempt_enable_notrace(); 3371 } 3372 3373 /** 3374 * ring_buffer_unlock_commit - commit a reserved 3375 * @buffer: The buffer to commit to 3376 * @event: The event pointer to commit. 3377 * 3378 * This commits the data to the ring buffer, and releases any locks held. 3379 * 3380 * Must be paired with ring_buffer_lock_reserve. 3381 */ 3382 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3383 { 3384 struct ring_buffer_per_cpu *cpu_buffer; 3385 int cpu = raw_smp_processor_id(); 3386 3387 cpu_buffer = buffer->buffers[cpu]; 3388 3389 rb_commit(cpu_buffer); 3390 3391 rb_wakeups(buffer, cpu_buffer); 3392 3393 trace_recursive_unlock(cpu_buffer); 3394 3395 preempt_enable_notrace(); 3396 3397 return 0; 3398 } 3399 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3400 3401 /* Special value to validate all deltas on a page. 
*/ 3402 #define CHECK_FULL_PAGE 1L 3403 3404 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3405 static void dump_buffer_page(struct buffer_data_page *bpage, 3406 struct rb_event_info *info, 3407 unsigned long tail) 3408 { 3409 struct ring_buffer_event *event; 3410 u64 ts, delta; 3411 int e; 3412 3413 ts = bpage->time_stamp; 3414 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3415 3416 for (e = 0; e < tail; e += rb_event_length(event)) { 3417 3418 event = (struct ring_buffer_event *)(bpage->data + e); 3419 3420 switch (event->type_len) { 3421 3422 case RINGBUF_TYPE_TIME_EXTEND: 3423 delta = rb_event_time_stamp(event); 3424 ts += delta; 3425 pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta); 3426 break; 3427 3428 case RINGBUF_TYPE_TIME_STAMP: 3429 delta = rb_event_time_stamp(event); 3430 ts = rb_fix_abs_ts(delta, ts); 3431 pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta); 3432 break; 3433 3434 case RINGBUF_TYPE_PADDING: 3435 ts += event->time_delta; 3436 pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta); 3437 break; 3438 3439 case RINGBUF_TYPE_DATA: 3440 ts += event->time_delta; 3441 pr_warn(" [%lld] delta:%d\n", ts, event->time_delta); 3442 break; 3443 3444 default: 3445 break; 3446 } 3447 } 3448 } 3449 3450 static DEFINE_PER_CPU(atomic_t, checking); 3451 static atomic_t ts_dump; 3452 3453 /* 3454 * Check if the current event time stamp matches the deltas on 3455 * the buffer page. 
*/
static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	u64 ts, delta;
	bool full = false;
	int e;

	bpage = info->tail_page->page;

	if (tail == CHECK_FULL_PAGE) {
		/* Validate everything up to the commit index of the page */
		full = true;
		tail = local_read(&bpage->commit);
	} else if (info->add_timestamp &
		   (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
		/* Ignore events with absolute time stamps */
		return;
	}

	/*
	 * Do not check the first event (skip possible extends too).
	 * Also do not check if previous events have not been committed.
	 */
	if (tail <= 8 || tail > local_read(&bpage->commit))
		return;

	/*
	 * If this interrupted another check on this CPU, do not run a
	 * second one on top of it.
	 */
	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
		goto out;

	/* Rebuild the time stamp by summing the deltas found on the page */
	ts = bpage->time_stamp;

	for (e = 0; e < tail; e += rb_event_length(event)) {

		event = (struct ring_buffer_event *)(bpage->data + e);

		switch (event->type_len) {

		case RINGBUF_TYPE_TIME_EXTEND:
			delta = rb_event_time_stamp(event);
			ts += delta;
			break;

		case RINGBUF_TYPE_TIME_STAMP:
			delta = rb_event_time_stamp(event);
			ts = rb_fix_abs_ts(delta, ts);
			break;

		case RINGBUF_TYPE_PADDING:
			/* Padding with a delta of 1 is not added to the sum */
			if (event->time_delta == 1)
				break;
			fallthrough;
		case RINGBUF_TYPE_DATA:
			ts += event->time_delta;
			break;

		default:
			RB_WARN_ON(cpu_buffer, 1);
		}
	}
	if ((full && ts > info->ts) ||
	    (!full && ts + info->delta != info->ts)) {
		/* If another report is happening, ignore this one */
		if (atomic_inc_return(&ts_dump) != 1) {
			atomic_dec(&ts_dump);
			goto out;
		}
		atomic_inc(&cpu_buffer->record_disabled);
		/* There's some cases in boot up that this can happen */
		WARN_ON_ONCE(system_state != SYSTEM_BOOTING);
		pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n",
			cpu_buffer->cpu,
			ts + info->delta, info->ts, info->delta,
			info->before, info->after,
			full ? " (full)" : "");
		dump_buffer_page(bpage, info, tail);
		atomic_dec(&ts_dump);
		/* Do not re-enable checking: "checking" is left incremented */
		return;
	}
out:
	atomic_dec(this_cpu_ptr(&checking));
}
#else
/* Time delta validation is compiled out: nothing to check. */
static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
			 struct rb_event_info *info,
			 unsigned long tail)
{
}
#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */

/*
 * Reserve info->length bytes on the current tail page and work out the
 * time stamp / delta to store with the event.
 *
 * Returns the reserved event. When the reservation runs past the end
 * of the page, the result of rb_move_tail() is returned instead.
 *
 * The markers A through E name the points between which an interrupt
 * may come in; the comments below refer to them.
 */
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  struct rb_event_info *info)
{
	struct ring_buffer_event *event;
	struct buffer_page *tail_page;
	unsigned long tail, write, w;
	bool a_ok;
	bool b_ok;

	/* Don't let the compiler play games with cpu_buffer->tail_page */
	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);

 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
	barrier();
	b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
	a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
	barrier();
	info->ts = rb_time_stamp(cpu_buffer->buffer);

	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
		info->delta = info->ts;
	} else {
		/*
		 * If interrupting an event time update, we may need an
		 * absolute timestamp.
		 * Don't bother if this is the start of a new page (w == 0).
		 */
		if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) {
			info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
			info->length += RB_LEN_TIME_EXTEND;
		} else {
			info->delta = info->ts - info->after;
			if (unlikely(test_time_stamp(info->delta))) {
				info->add_timestamp |= RB_ADD_STAMP_EXTEND;
				info->length += RB_LEN_TIME_EXTEND;
			}
		}
	}

 /*B*/	rb_time_set(&cpu_buffer->before_stamp, info->ts);

 /*C*/	write = local_add_return(info->length, &tail_page->write);

	/* set write to only the index of the write */
	write &= RB_WRITE_MASK;

	/* Index at which this reservation starts */
	tail = write - info->length;

	/* See if we shot past the end of this buffer page */
	if (unlikely(write > BUF_PAGE_SIZE)) {
		/* before and after may now be different, fix it up */
		b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
		a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
		if (a_ok && b_ok && info->before != info->after)
			(void)rb_time_cmpxchg(&cpu_buffer->before_stamp,
					      info->before, info->after);
		if (a_ok && b_ok)
			check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
		return rb_move_tail(cpu_buffer, tail, info);
	}

	if (likely(tail == w)) {
		u64 save_before;
		bool s_ok;

		/* Nothing interrupted us between A and C */
 /*D*/		rb_time_set(&cpu_buffer->write_stamp, info->ts);
		barrier();
 /*E*/		s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
		RB_WARN_ON(cpu_buffer, !s_ok);
		if (likely(!(info->add_timestamp &
			     (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
			/* This did not interrupt any time update */
			info->delta = info->ts - info->after;
		else
			/* Just use full timestamp for interrupting event */
			info->delta = info->ts;
		barrier();
		check_buffer(cpu_buffer, info, tail);
		if (unlikely(info->ts != save_before)) {
			/* SLOW PATH - Interrupted between C and E */

			a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
			RB_WARN_ON(cpu_buffer, !a_ok);

			/* Write stamp must only go forward */
			if (save_before > info->after) {
				/*
				 * We do not care about the result, only that
				 * it gets updated atomically.
				 */
				(void)rb_time_cmpxchg(&cpu_buffer->write_stamp,
						      info->after, save_before);
			}
		}
	} else {
		u64 ts;
		/* SLOW PATH - Interrupted between A and C */
		a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
		/* Was interrupted before here, write_stamp must be valid */
		RB_WARN_ON(cpu_buffer, !a_ok);
		ts = rb_time_stamp(cpu_buffer->buffer);
		barrier();
 /*E*/		if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
		    info->after < ts &&
		    rb_time_cmpxchg(&cpu_buffer->write_stamp,
				    info->after, ts)) {
			/* Nothing came after this event between C and E */
			info->delta = ts - info->after;
		} else {
			/*
			 * Interrupted between C and E:
			 * Lost the previous event's time stamp. Just set the
			 * delta to zero, and this will be the same time as
			 * the event this event interrupted. And the events that
			 * came after this will still be correct (as they would
			 * have built their delta on the previous event).
			 */
			info->delta = 0;
		}
		info->ts = ts;
		info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
	}

	/*
	 * If this is the first commit on the page, then it has the same
	 * timestamp as the page itself.
	 */
	if (unlikely(!tail && !(info->add_timestamp &
				(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
		info->delta = 0;

	/* We reserved something on the buffer */

	event = __rb_page_index(tail_page, tail);
	rb_update_event(cpu_buffer, event, info);

	local_inc(&tail_page->entries);

	/*
	 * If this is the first commit on the page, then update
	 * its timestamp.
	 */
	if (unlikely(!tail))
		tail_page->page->time_stamp = info->ts;

	/* account for these added bytes */
	local_add(info->length, &cpu_buffer->entries_bytes);

	return event;
}

/*
 * Start a commit and reserve room for an event with @length bytes of
 * data, retrying for as long as __rb_reserve_next() reports -EAGAIN.
 *
 * Returns the reserved event, or NULL on failure. On the NULL paths
 * the commit that was started here has been undone again.
 */
static __always_inline struct ring_buffer_event *
rb_reserve_next_event(struct trace_buffer *buffer,
		      struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned long length)
{
	struct ring_buffer_event *event;
	struct rb_event_info info;
	int nr_loops = 0;
	int add_ts_default;

	rb_start_commit(cpu_buffer);
	/* The commit page can not change after this */

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
	/*
	 * Due to the ability to swap a cpu buffer from a buffer
	 * it is possible it was swapped before we committed.
	 * (committing stops a swap). We check for it here and
	 * if it happened, we have to fail the write.
	 */
	barrier();
	if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
		local_dec(&cpu_buffer->committing);
		local_dec(&cpu_buffer->commits);
		return NULL;
	}
#endif

	info.length = rb_calculate_event_length(length);

	if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
		/* Absolute time stamps need the extra room on every event */
		add_ts_default = RB_ADD_STAMP_ABSOLUTE;
		info.length += RB_LEN_TIME_EXTEND;
	} else {
		add_ts_default = RB_ADD_STAMP_NONE;
	}

 again:
	info.add_timestamp = add_ts_default;
	info.delta = 0;

	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		goto out_fail;

	event = __rb_reserve_next(cpu_buffer, &info);

	if (unlikely(PTR_ERR(event) == -EAGAIN)) {
		/* Take back the time extend length added by the failed attempt */
		if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
			info.length -= RB_LEN_TIME_EXTEND;
		goto again;
	}

	if (likely(event))
		return event;
 out_fail:
	rb_end_commit(cpu_buffer);
	return NULL;
}

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
3787 */ 3788 struct ring_buffer_event * 3789 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3790 { 3791 struct ring_buffer_per_cpu *cpu_buffer; 3792 struct ring_buffer_event *event; 3793 int cpu; 3794 3795 /* If we are tracing schedule, we don't want to recurse */ 3796 preempt_disable_notrace(); 3797 3798 if (unlikely(atomic_read(&buffer->record_disabled))) 3799 goto out; 3800 3801 cpu = raw_smp_processor_id(); 3802 3803 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3804 goto out; 3805 3806 cpu_buffer = buffer->buffers[cpu]; 3807 3808 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3809 goto out; 3810 3811 if (unlikely(length > BUF_MAX_DATA_SIZE)) 3812 goto out; 3813 3814 if (unlikely(trace_recursive_lock(cpu_buffer))) 3815 goto out; 3816 3817 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3818 if (!event) 3819 goto out_unlock; 3820 3821 return event; 3822 3823 out_unlock: 3824 trace_recursive_unlock(cpu_buffer); 3825 out: 3826 preempt_enable_notrace(); 3827 return NULL; 3828 } 3829 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3830 3831 /* 3832 * Decrement the entries to the page that an event is on. 3833 * The event does not even need to exist, only the pointer 3834 * to the page it is on. This may only be called before the commit 3835 * takes place. 3836 */ 3837 static inline void 3838 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3839 struct ring_buffer_event *event) 3840 { 3841 unsigned long addr = (unsigned long)event; 3842 struct buffer_page *bpage = cpu_buffer->commit_page; 3843 struct buffer_page *start; 3844 3845 addr &= PAGE_MASK; 3846 3847 /* Do the likely case first */ 3848 if (likely(bpage->page == (void *)addr)) { 3849 local_dec(&bpage->entries); 3850 return; 3851 } 3852 3853 /* 3854 * Because the commit page may be on the reader page we 3855 * start with the next page and check the end loop there. 
3856 */ 3857 rb_inc_page(&bpage); 3858 start = bpage; 3859 do { 3860 if (bpage->page == (void *)addr) { 3861 local_dec(&bpage->entries); 3862 return; 3863 } 3864 rb_inc_page(&bpage); 3865 } while (bpage != start); 3866 3867 /* commit not part of this buffer?? */ 3868 RB_WARN_ON(cpu_buffer, 1); 3869 } 3870 3871 /** 3872 * ring_buffer_discard_commit - discard an event that has not been committed 3873 * @buffer: the ring buffer 3874 * @event: non committed event to discard 3875 * 3876 * Sometimes an event that is in the ring buffer needs to be ignored. 3877 * This function lets the user discard an event in the ring buffer 3878 * and then that event will not be read later. 3879 * 3880 * This function only works if it is called before the item has been 3881 * committed. It will try to free the event from the ring buffer 3882 * if another event has not been added behind it. 3883 * 3884 * If another event has been added behind it, it will set the event 3885 * up as discarded, and perform the commit. 3886 * 3887 * If this function is called, do not call ring_buffer_unlock_commit on 3888 * the event. 3889 */ 3890 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3891 struct ring_buffer_event *event) 3892 { 3893 struct ring_buffer_per_cpu *cpu_buffer; 3894 int cpu; 3895 3896 /* The event is discarded regardless */ 3897 rb_event_discard(event); 3898 3899 cpu = smp_processor_id(); 3900 cpu_buffer = buffer->buffers[cpu]; 3901 3902 /* 3903 * This must only be called if the event has not been 3904 * committed yet. Thus we can assume that preemption 3905 * is still disabled. 
3906 */ 3907 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3908 3909 rb_decrement_entry(cpu_buffer, event); 3910 if (rb_try_to_discard(cpu_buffer, event)) 3911 goto out; 3912 3913 out: 3914 rb_end_commit(cpu_buffer); 3915 3916 trace_recursive_unlock(cpu_buffer); 3917 3918 preempt_enable_notrace(); 3919 3920 } 3921 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3922 3923 /** 3924 * ring_buffer_write - write data to the buffer without reserving 3925 * @buffer: The ring buffer to write to. 3926 * @length: The length of the data being written (excluding the event header) 3927 * @data: The data to write to the buffer. 3928 * 3929 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3930 * one function. If you already have the data to write to the buffer, it 3931 * may be easier to simply call this function. 3932 * 3933 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3934 * and not the length of the event which would hold the header. 
3935 */ 3936 int ring_buffer_write(struct trace_buffer *buffer, 3937 unsigned long length, 3938 void *data) 3939 { 3940 struct ring_buffer_per_cpu *cpu_buffer; 3941 struct ring_buffer_event *event; 3942 void *body; 3943 int ret = -EBUSY; 3944 int cpu; 3945 3946 preempt_disable_notrace(); 3947 3948 if (atomic_read(&buffer->record_disabled)) 3949 goto out; 3950 3951 cpu = raw_smp_processor_id(); 3952 3953 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3954 goto out; 3955 3956 cpu_buffer = buffer->buffers[cpu]; 3957 3958 if (atomic_read(&cpu_buffer->record_disabled)) 3959 goto out; 3960 3961 if (length > BUF_MAX_DATA_SIZE) 3962 goto out; 3963 3964 if (unlikely(trace_recursive_lock(cpu_buffer))) 3965 goto out; 3966 3967 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3968 if (!event) 3969 goto out_unlock; 3970 3971 body = rb_event_data(event); 3972 3973 memcpy(body, data, length); 3974 3975 rb_commit(cpu_buffer); 3976 3977 rb_wakeups(buffer, cpu_buffer); 3978 3979 ret = 0; 3980 3981 out_unlock: 3982 trace_recursive_unlock(cpu_buffer); 3983 3984 out: 3985 preempt_enable_notrace(); 3986 3987 return ret; 3988 } 3989 EXPORT_SYMBOL_GPL(ring_buffer_write); 3990 3991 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3992 { 3993 struct buffer_page *reader = cpu_buffer->reader_page; 3994 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3995 struct buffer_page *commit = cpu_buffer->commit_page; 3996 3997 /* In case of error, head will be NULL */ 3998 if (unlikely(!head)) 3999 return true; 4000 4001 /* Reader should exhaust content in reader page */ 4002 if (reader->read != rb_page_commit(reader)) 4003 return false; 4004 4005 /* 4006 * If writers are committing on the reader page, knowing all 4007 * committed content has been read, the ring buffer is empty. 
4008 */ 4009 if (commit == reader) 4010 return true; 4011 4012 /* 4013 * If writers are committing on a page other than reader page 4014 * and head page, there should always be content to read. 4015 */ 4016 if (commit != head) 4017 return false; 4018 4019 /* 4020 * Writers are committing on the head page, we just need 4021 * to care about there're committed data, and the reader will 4022 * swap reader page with head page when it is to read data. 4023 */ 4024 return rb_page_commit(commit) == 0; 4025 } 4026 4027 /** 4028 * ring_buffer_record_disable - stop all writes into the buffer 4029 * @buffer: The ring buffer to stop writes to. 4030 * 4031 * This prevents all writes to the buffer. Any attempt to write 4032 * to the buffer after this will fail and return NULL. 4033 * 4034 * The caller should call synchronize_rcu() after this. 4035 */ 4036 void ring_buffer_record_disable(struct trace_buffer *buffer) 4037 { 4038 atomic_inc(&buffer->record_disabled); 4039 } 4040 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4041 4042 /** 4043 * ring_buffer_record_enable - enable writes to the buffer 4044 * @buffer: The ring buffer to enable writes 4045 * 4046 * Note, multiple disables will need the same number of enables 4047 * to truly enable the writing (much like preempt_disable). 4048 */ 4049 void ring_buffer_record_enable(struct trace_buffer *buffer) 4050 { 4051 atomic_dec(&buffer->record_disabled); 4052 } 4053 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4054 4055 /** 4056 * ring_buffer_record_off - stop all writes into the buffer 4057 * @buffer: The ring buffer to stop writes to. 4058 * 4059 * This prevents all writes to the buffer. Any attempt to write 4060 * to the buffer after this will fail and return NULL. 4061 * 4062 * This is different than ring_buffer_record_disable() as 4063 * it works like an on/off switch, where as the disable() version 4064 * must be paired with a enable(). 
4065 */ 4066 void ring_buffer_record_off(struct trace_buffer *buffer) 4067 { 4068 unsigned int rd; 4069 unsigned int new_rd; 4070 4071 rd = atomic_read(&buffer->record_disabled); 4072 do { 4073 new_rd = rd | RB_BUFFER_OFF; 4074 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4075 } 4076 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4077 4078 /** 4079 * ring_buffer_record_on - restart writes into the buffer 4080 * @buffer: The ring buffer to start writes to. 4081 * 4082 * This enables all writes to the buffer that was disabled by 4083 * ring_buffer_record_off(). 4084 * 4085 * This is different than ring_buffer_record_enable() as 4086 * it works like an on/off switch, where as the enable() version 4087 * must be paired with a disable(). 4088 */ 4089 void ring_buffer_record_on(struct trace_buffer *buffer) 4090 { 4091 unsigned int rd; 4092 unsigned int new_rd; 4093 4094 rd = atomic_read(&buffer->record_disabled); 4095 do { 4096 new_rd = rd & ~RB_BUFFER_OFF; 4097 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4098 } 4099 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4100 4101 /** 4102 * ring_buffer_record_is_on - return true if the ring buffer can write 4103 * @buffer: The ring buffer to see if write is enabled 4104 * 4105 * Returns true if the ring buffer is in a state that it accepts writes. 4106 */ 4107 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4108 { 4109 return !atomic_read(&buffer->record_disabled); 4110 } 4111 4112 /** 4113 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4114 * @buffer: The ring buffer to see if write is set enabled 4115 * 4116 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4117 * Note that this does NOT mean it is in a writable state. 4118 * 4119 * It may return true when the ring buffer has been disabled by 4120 * ring_buffer_record_disable(), as that is a temporary disabling of 4121 * the ring buffer. 
4122 */ 4123 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4124 { 4125 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4126 } 4127 4128 /** 4129 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4130 * @buffer: The ring buffer to stop writes to. 4131 * @cpu: The CPU buffer to stop 4132 * 4133 * This prevents all writes to the buffer. Any attempt to write 4134 * to the buffer after this will fail and return NULL. 4135 * 4136 * The caller should call synchronize_rcu() after this. 4137 */ 4138 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4139 { 4140 struct ring_buffer_per_cpu *cpu_buffer; 4141 4142 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4143 return; 4144 4145 cpu_buffer = buffer->buffers[cpu]; 4146 atomic_inc(&cpu_buffer->record_disabled); 4147 } 4148 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4149 4150 /** 4151 * ring_buffer_record_enable_cpu - enable writes to the buffer 4152 * @buffer: The ring buffer to enable writes 4153 * @cpu: The CPU to enable. 4154 * 4155 * Note, multiple disables will need the same number of enables 4156 * to truly enable the writing (much like preempt_disable). 4157 */ 4158 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4159 { 4160 struct ring_buffer_per_cpu *cpu_buffer; 4161 4162 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4163 return; 4164 4165 cpu_buffer = buffer->buffers[cpu]; 4166 atomic_dec(&cpu_buffer->record_disabled); 4167 } 4168 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4169 4170 /* 4171 * The total entries in the ring buffer is the running counter 4172 * of entries entered into the ring buffer, minus the sum of 4173 * the entries read from the ring buffer and the number of 4174 * entries that were overwritten. 
4175 */ 4176 static inline unsigned long 4177 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4178 { 4179 return local_read(&cpu_buffer->entries) - 4180 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4181 } 4182 4183 /** 4184 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4185 * @buffer: The ring buffer 4186 * @cpu: The per CPU buffer to read from. 4187 */ 4188 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4189 { 4190 unsigned long flags; 4191 struct ring_buffer_per_cpu *cpu_buffer; 4192 struct buffer_page *bpage; 4193 u64 ret = 0; 4194 4195 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4196 return 0; 4197 4198 cpu_buffer = buffer->buffers[cpu]; 4199 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4200 /* 4201 * if the tail is on reader_page, oldest time stamp is on the reader 4202 * page 4203 */ 4204 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4205 bpage = cpu_buffer->reader_page; 4206 else 4207 bpage = rb_set_head_page(cpu_buffer); 4208 if (bpage) 4209 ret = bpage->page->time_stamp; 4210 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4211 4212 return ret; 4213 } 4214 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4215 4216 /** 4217 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 4218 * @buffer: The ring buffer 4219 * @cpu: The per CPU buffer to read from. 
4220 */ 4221 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4222 { 4223 struct ring_buffer_per_cpu *cpu_buffer; 4224 unsigned long ret; 4225 4226 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4227 return 0; 4228 4229 cpu_buffer = buffer->buffers[cpu]; 4230 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4231 4232 return ret; 4233 } 4234 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4235 4236 /** 4237 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4238 * @buffer: The ring buffer 4239 * @cpu: The per CPU buffer to get the entries from. 4240 */ 4241 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4242 { 4243 struct ring_buffer_per_cpu *cpu_buffer; 4244 4245 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4246 return 0; 4247 4248 cpu_buffer = buffer->buffers[cpu]; 4249 4250 return rb_num_of_entries(cpu_buffer); 4251 } 4252 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4253 4254 /** 4255 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4256 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 4257 * @buffer: The ring buffer 4258 * @cpu: The per CPU buffer to get the number of overruns from 4259 */ 4260 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4261 { 4262 struct ring_buffer_per_cpu *cpu_buffer; 4263 unsigned long ret; 4264 4265 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4266 return 0; 4267 4268 cpu_buffer = buffer->buffers[cpu]; 4269 ret = local_read(&cpu_buffer->overrun); 4270 4271 return ret; 4272 } 4273 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4274 4275 /** 4276 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4277 * commits failing due to the buffer wrapping around while there are uncommitted 4278 * events, such as during an interrupt storm. 
4279 * @buffer: The ring buffer 4280 * @cpu: The per CPU buffer to get the number of overruns from 4281 */ 4282 unsigned long 4283 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4284 { 4285 struct ring_buffer_per_cpu *cpu_buffer; 4286 unsigned long ret; 4287 4288 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4289 return 0; 4290 4291 cpu_buffer = buffer->buffers[cpu]; 4292 ret = local_read(&cpu_buffer->commit_overrun); 4293 4294 return ret; 4295 } 4296 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4297 4298 /** 4299 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4300 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4301 * @buffer: The ring buffer 4302 * @cpu: The per CPU buffer to get the number of overruns from 4303 */ 4304 unsigned long 4305 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4306 { 4307 struct ring_buffer_per_cpu *cpu_buffer; 4308 unsigned long ret; 4309 4310 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4311 return 0; 4312 4313 cpu_buffer = buffer->buffers[cpu]; 4314 ret = local_read(&cpu_buffer->dropped_events); 4315 4316 return ret; 4317 } 4318 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4319 4320 /** 4321 * ring_buffer_read_events_cpu - get the number of events successfully read 4322 * @buffer: The ring buffer 4323 * @cpu: The per CPU buffer to get the number of events read 4324 */ 4325 unsigned long 4326 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4327 { 4328 struct ring_buffer_per_cpu *cpu_buffer; 4329 4330 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4331 return 0; 4332 4333 cpu_buffer = buffer->buffers[cpu]; 4334 return cpu_buffer->read; 4335 } 4336 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4337 4338 /** 4339 * ring_buffer_entries - get the number of entries in a buffer 4340 * @buffer: The ring buffer 4341 * 4342 * Returns the total number of entries in the ring buffer 4343 * (all CPU entries) 4344 */ 4345 unsigned 
long ring_buffer_entries(struct trace_buffer *buffer) 4346 { 4347 struct ring_buffer_per_cpu *cpu_buffer; 4348 unsigned long entries = 0; 4349 int cpu; 4350 4351 /* if you care about this being correct, lock the buffer */ 4352 for_each_buffer_cpu(buffer, cpu) { 4353 cpu_buffer = buffer->buffers[cpu]; 4354 entries += rb_num_of_entries(cpu_buffer); 4355 } 4356 4357 return entries; 4358 } 4359 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4360 4361 /** 4362 * ring_buffer_overruns - get the number of overruns in buffer 4363 * @buffer: The ring buffer 4364 * 4365 * Returns the total number of overruns in the ring buffer 4366 * (all CPU entries) 4367 */ 4368 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4369 { 4370 struct ring_buffer_per_cpu *cpu_buffer; 4371 unsigned long overruns = 0; 4372 int cpu; 4373 4374 /* if you care about this being correct, lock the buffer */ 4375 for_each_buffer_cpu(buffer, cpu) { 4376 cpu_buffer = buffer->buffers[cpu]; 4377 overruns += local_read(&cpu_buffer->overrun); 4378 } 4379 4380 return overruns; 4381 } 4382 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4383 4384 static void rb_iter_reset(struct ring_buffer_iter *iter) 4385 { 4386 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4387 4388 /* Iterator usage is expected to have record disabled */ 4389 iter->head_page = cpu_buffer->reader_page; 4390 iter->head = cpu_buffer->reader_page->read; 4391 iter->next_event = iter->head; 4392 4393 iter->cache_reader_page = iter->head_page; 4394 iter->cache_read = cpu_buffer->read; 4395 4396 if (iter->head) { 4397 iter->read_stamp = cpu_buffer->read_stamp; 4398 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4399 } else { 4400 iter->read_stamp = iter->head_page->page->time_stamp; 4401 iter->page_stamp = iter->read_stamp; 4402 } 4403 } 4404 4405 /** 4406 * ring_buffer_iter_reset - reset an iterator 4407 * @iter: The iterator to reset 4408 * 4409 * Resets the iterator, so that it will start from the beginning 4410 * 
again. 4411 */ 4412 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4413 { 4414 struct ring_buffer_per_cpu *cpu_buffer; 4415 unsigned long flags; 4416 4417 if (!iter) 4418 return; 4419 4420 cpu_buffer = iter->cpu_buffer; 4421 4422 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4423 rb_iter_reset(iter); 4424 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4425 } 4426 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4427 4428 /** 4429 * ring_buffer_iter_empty - check if an iterator has no more to read 4430 * @iter: The iterator to check 4431 */ 4432 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4433 { 4434 struct ring_buffer_per_cpu *cpu_buffer; 4435 struct buffer_page *reader; 4436 struct buffer_page *head_page; 4437 struct buffer_page *commit_page; 4438 struct buffer_page *curr_commit_page; 4439 unsigned commit; 4440 u64 curr_commit_ts; 4441 u64 commit_ts; 4442 4443 cpu_buffer = iter->cpu_buffer; 4444 reader = cpu_buffer->reader_page; 4445 head_page = cpu_buffer->head_page; 4446 commit_page = cpu_buffer->commit_page; 4447 commit_ts = commit_page->page->time_stamp; 4448 4449 /* 4450 * When the writer goes across pages, it issues a cmpxchg which 4451 * is a mb(), which will synchronize with the rmb here. 
4452 * (see rb_tail_page_update()) 4453 */ 4454 smp_rmb(); 4455 commit = rb_page_commit(commit_page); 4456 /* We want to make sure that the commit page doesn't change */ 4457 smp_rmb(); 4458 4459 /* Make sure commit page didn't change */ 4460 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4461 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4462 4463 /* If the commit page changed, then there's more data */ 4464 if (curr_commit_page != commit_page || 4465 curr_commit_ts != commit_ts) 4466 return 0; 4467 4468 /* Still racy, as it may return a false positive, but that's OK */ 4469 return ((iter->head_page == commit_page && iter->head >= commit) || 4470 (iter->head_page == reader && commit_page == head_page && 4471 head_page->read == commit && 4472 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4473 } 4474 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4475 4476 static void 4477 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4478 struct ring_buffer_event *event) 4479 { 4480 u64 delta; 4481 4482 switch (event->type_len) { 4483 case RINGBUF_TYPE_PADDING: 4484 return; 4485 4486 case RINGBUF_TYPE_TIME_EXTEND: 4487 delta = rb_event_time_stamp(event); 4488 cpu_buffer->read_stamp += delta; 4489 return; 4490 4491 case RINGBUF_TYPE_TIME_STAMP: 4492 delta = rb_event_time_stamp(event); 4493 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4494 cpu_buffer->read_stamp = delta; 4495 return; 4496 4497 case RINGBUF_TYPE_DATA: 4498 cpu_buffer->read_stamp += event->time_delta; 4499 return; 4500 4501 default: 4502 RB_WARN_ON(cpu_buffer, 1); 4503 } 4504 } 4505 4506 static void 4507 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4508 struct ring_buffer_event *event) 4509 { 4510 u64 delta; 4511 4512 switch (event->type_len) { 4513 case RINGBUF_TYPE_PADDING: 4514 return; 4515 4516 case RINGBUF_TYPE_TIME_EXTEND: 4517 delta = rb_event_time_stamp(event); 4518 iter->read_stamp += delta; 4519 return; 4520 4521 case 
RINGBUF_TYPE_TIME_STAMP: 4522 delta = rb_event_time_stamp(event); 4523 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4524 iter->read_stamp = delta; 4525 return; 4526 4527 case RINGBUF_TYPE_DATA: 4528 iter->read_stamp += event->time_delta; 4529 return; 4530 4531 default: 4532 RB_WARN_ON(iter->cpu_buffer, 1); 4533 } 4534 } 4535 4536 static struct buffer_page * 4537 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4538 { 4539 struct buffer_page *reader = NULL; 4540 unsigned long overwrite; 4541 unsigned long flags; 4542 int nr_loops = 0; 4543 bool ret; 4544 4545 local_irq_save(flags); 4546 arch_spin_lock(&cpu_buffer->lock); 4547 4548 again: 4549 /* 4550 * This should normally only loop twice. But because the 4551 * start of the reader inserts an empty page, it causes 4552 * a case where we will loop three times. There should be no 4553 * reason to loop four times (that I know of). 4554 */ 4555 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4556 reader = NULL; 4557 goto out; 4558 } 4559 4560 reader = cpu_buffer->reader_page; 4561 4562 /* If there's more to read, return this page */ 4563 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4564 goto out; 4565 4566 /* Never should we have an index greater than the size */ 4567 if (RB_WARN_ON(cpu_buffer, 4568 cpu_buffer->reader_page->read > rb_page_size(reader))) 4569 goto out; 4570 4571 /* check if we caught up to the tail */ 4572 reader = NULL; 4573 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4574 goto out; 4575 4576 /* Don't bother swapping if the ring buffer is empty */ 4577 if (rb_num_of_entries(cpu_buffer) == 0) 4578 goto out; 4579 4580 /* 4581 * Reset the reader page to size zero. 4582 */ 4583 local_set(&cpu_buffer->reader_page->write, 0); 4584 local_set(&cpu_buffer->reader_page->entries, 0); 4585 local_set(&cpu_buffer->reader_page->page->commit, 0); 4586 cpu_buffer->reader_page->real_end = 0; 4587 4588 spin: 4589 /* 4590 * Splice the empty reader page into the list around the head. 
4591 */ 4592 reader = rb_set_head_page(cpu_buffer); 4593 if (!reader) 4594 goto out; 4595 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4596 cpu_buffer->reader_page->list.prev = reader->list.prev; 4597 4598 /* 4599 * cpu_buffer->pages just needs to point to the buffer, it 4600 * has no specific buffer page to point to. Lets move it out 4601 * of our way so we don't accidentally swap it. 4602 */ 4603 cpu_buffer->pages = reader->list.prev; 4604 4605 /* The reader page will be pointing to the new head */ 4606 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4607 4608 /* 4609 * We want to make sure we read the overruns after we set up our 4610 * pointers to the next object. The writer side does a 4611 * cmpxchg to cross pages which acts as the mb on the writer 4612 * side. Note, the reader will constantly fail the swap 4613 * while the writer is updating the pointers, so this 4614 * guarantees that the overwrite recorded here is the one we 4615 * want to compare with the last_overrun. 4616 */ 4617 smp_mb(); 4618 overwrite = local_read(&(cpu_buffer->overrun)); 4619 4620 /* 4621 * Here's the tricky part. 4622 * 4623 * We need to move the pointer past the header page. 4624 * But we can only do that if a writer is not currently 4625 * moving it. The page before the header page has the 4626 * flag bit '1' set if it is pointing to the page we want. 4627 * but if the writer is in the process of moving it 4628 * than it will be '2' or already moved '0'. 4629 */ 4630 4631 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4632 4633 /* 4634 * If we did not convert it, then we must try again. 4635 */ 4636 if (!ret) 4637 goto spin; 4638 4639 /* 4640 * Yay! We succeeded in replacing the page. 4641 * 4642 * Now make the new head point back to the reader page. 
4643 */ 4644 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4645 rb_inc_page(&cpu_buffer->head_page); 4646 4647 local_inc(&cpu_buffer->pages_read); 4648 4649 /* Finally update the reader page to the new head */ 4650 cpu_buffer->reader_page = reader; 4651 cpu_buffer->reader_page->read = 0; 4652 4653 if (overwrite != cpu_buffer->last_overrun) { 4654 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4655 cpu_buffer->last_overrun = overwrite; 4656 } 4657 4658 goto again; 4659 4660 out: 4661 /* Update the read_stamp on the first event */ 4662 if (reader && reader->read == 0) 4663 cpu_buffer->read_stamp = reader->page->time_stamp; 4664 4665 arch_spin_unlock(&cpu_buffer->lock); 4666 local_irq_restore(flags); 4667 4668 /* 4669 * The writer has preempt disable, wait for it. But not forever 4670 * Although, 1 second is pretty much "forever" 4671 */ 4672 #define USECS_WAIT 1000000 4673 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4674 /* If the write is past the end of page, a writer is still updating it */ 4675 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE)) 4676 break; 4677 4678 udelay(1); 4679 4680 /* Get the latest version of the reader write value */ 4681 smp_rmb(); 4682 } 4683 4684 /* The writer is not moving forward? Something is wrong */ 4685 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4686 reader = NULL; 4687 4688 /* 4689 * Make sure we see any padding after the write update 4690 * (see rb_reset_tail()). 4691 * 4692 * In addition, a writer may be writing on the reader page 4693 * if the page has not been fully filled, so the read barrier 4694 * is also needed to make sure we see the content of what is 4695 * committed by the writer (see rb_set_commit_to_write()). 
4696 */ 4697 smp_rmb(); 4698 4699 4700 return reader; 4701 } 4702 4703 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4704 { 4705 struct ring_buffer_event *event; 4706 struct buffer_page *reader; 4707 unsigned length; 4708 4709 reader = rb_get_reader_page(cpu_buffer); 4710 4711 /* This function should not be called when buffer is empty */ 4712 if (RB_WARN_ON(cpu_buffer, !reader)) 4713 return; 4714 4715 event = rb_reader_event(cpu_buffer); 4716 4717 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4718 cpu_buffer->read++; 4719 4720 rb_update_read_stamp(cpu_buffer, event); 4721 4722 length = rb_event_length(event); 4723 cpu_buffer->reader_page->read += length; 4724 } 4725 4726 static void rb_advance_iter(struct ring_buffer_iter *iter) 4727 { 4728 struct ring_buffer_per_cpu *cpu_buffer; 4729 4730 cpu_buffer = iter->cpu_buffer; 4731 4732 /* If head == next_event then we need to jump to the next event */ 4733 if (iter->head == iter->next_event) { 4734 /* If the event gets overwritten again, there's nothing to do */ 4735 if (rb_iter_head_event(iter) == NULL) 4736 return; 4737 } 4738 4739 iter->head = iter->next_event; 4740 4741 /* 4742 * Check if we are at the end of the buffer. 4743 */ 4744 if (iter->next_event >= rb_page_size(iter->head_page)) { 4745 /* discarded commits can make the page empty */ 4746 if (iter->head_page == cpu_buffer->commit_page) 4747 return; 4748 rb_inc_iter(iter); 4749 return; 4750 } 4751 4752 rb_update_iter_read_stamp(iter, iter->event); 4753 } 4754 4755 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4756 { 4757 return cpu_buffer->lost_events; 4758 } 4759 4760 static struct ring_buffer_event * 4761 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4762 unsigned long *lost_events) 4763 { 4764 struct ring_buffer_event *event; 4765 struct buffer_page *reader; 4766 int nr_loops = 0; 4767 4768 if (ts) 4769 *ts = 0; 4770 again: 4771 /* 4772 * We repeat when a time extend is encountered. 
4773 * Since the time extend is always attached to a data event, 4774 * we should never loop more than once. 4775 * (We never hit the following condition more than twice). 4776 */ 4777 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4778 return NULL; 4779 4780 reader = rb_get_reader_page(cpu_buffer); 4781 if (!reader) 4782 return NULL; 4783 4784 event = rb_reader_event(cpu_buffer); 4785 4786 switch (event->type_len) { 4787 case RINGBUF_TYPE_PADDING: 4788 if (rb_null_event(event)) 4789 RB_WARN_ON(cpu_buffer, 1); 4790 /* 4791 * Because the writer could be discarding every 4792 * event it creates (which would probably be bad) 4793 * if we were to go back to "again" then we may never 4794 * catch up, and will trigger the warn on, or lock 4795 * the box. Return the padding, and we will release 4796 * the current locks, and try again. 4797 */ 4798 return event; 4799 4800 case RINGBUF_TYPE_TIME_EXTEND: 4801 /* Internal data, OK to advance */ 4802 rb_advance_reader(cpu_buffer); 4803 goto again; 4804 4805 case RINGBUF_TYPE_TIME_STAMP: 4806 if (ts) { 4807 *ts = rb_event_time_stamp(event); 4808 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4809 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4810 cpu_buffer->cpu, ts); 4811 } 4812 /* Internal data, OK to advance */ 4813 rb_advance_reader(cpu_buffer); 4814 goto again; 4815 4816 case RINGBUF_TYPE_DATA: 4817 if (ts && !(*ts)) { 4818 *ts = cpu_buffer->read_stamp + event->time_delta; 4819 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4820 cpu_buffer->cpu, ts); 4821 } 4822 if (lost_events) 4823 *lost_events = rb_lost_events(cpu_buffer); 4824 return event; 4825 4826 default: 4827 RB_WARN_ON(cpu_buffer, 1); 4828 } 4829 4830 return NULL; 4831 } 4832 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4833 4834 static struct ring_buffer_event * 4835 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4836 { 4837 struct trace_buffer *buffer; 4838 struct ring_buffer_per_cpu *cpu_buffer; 4839 struct ring_buffer_event *event; 4840 int 
nr_loops = 0; 4841 4842 if (ts) 4843 *ts = 0; 4844 4845 cpu_buffer = iter->cpu_buffer; 4846 buffer = cpu_buffer->buffer; 4847 4848 /* 4849 * Check if someone performed a consuming read to 4850 * the buffer. A consuming read invalidates the iterator 4851 * and we need to reset the iterator in this case. 4852 */ 4853 if (unlikely(iter->cache_read != cpu_buffer->read || 4854 iter->cache_reader_page != cpu_buffer->reader_page)) 4855 rb_iter_reset(iter); 4856 4857 again: 4858 if (ring_buffer_iter_empty(iter)) 4859 return NULL; 4860 4861 /* 4862 * As the writer can mess with what the iterator is trying 4863 * to read, just give up if we fail to get an event after 4864 * three tries. The iterator is not as reliable when reading 4865 * the ring buffer with an active write as the consumer is. 4866 * Do not warn if the three failures is reached. 4867 */ 4868 if (++nr_loops > 3) 4869 return NULL; 4870 4871 if (rb_per_cpu_empty(cpu_buffer)) 4872 return NULL; 4873 4874 if (iter->head >= rb_page_size(iter->head_page)) { 4875 rb_inc_iter(iter); 4876 goto again; 4877 } 4878 4879 event = rb_iter_head_event(iter); 4880 if (!event) 4881 goto again; 4882 4883 switch (event->type_len) { 4884 case RINGBUF_TYPE_PADDING: 4885 if (rb_null_event(event)) { 4886 rb_inc_iter(iter); 4887 goto again; 4888 } 4889 rb_advance_iter(iter); 4890 return event; 4891 4892 case RINGBUF_TYPE_TIME_EXTEND: 4893 /* Internal data, OK to advance */ 4894 rb_advance_iter(iter); 4895 goto again; 4896 4897 case RINGBUF_TYPE_TIME_STAMP: 4898 if (ts) { 4899 *ts = rb_event_time_stamp(event); 4900 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4901 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4902 cpu_buffer->cpu, ts); 4903 } 4904 /* Internal data, OK to advance */ 4905 rb_advance_iter(iter); 4906 goto again; 4907 4908 case RINGBUF_TYPE_DATA: 4909 if (ts && !(*ts)) { 4910 *ts = iter->read_stamp + event->time_delta; 4911 ring_buffer_normalize_time_stamp(buffer, 4912 cpu_buffer->cpu, ts); 4913 } 
4914 return event; 4915 4916 default: 4917 RB_WARN_ON(cpu_buffer, 1); 4918 } 4919 4920 return NULL; 4921 } 4922 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4923 4924 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4925 { 4926 if (likely(!in_nmi())) { 4927 raw_spin_lock(&cpu_buffer->reader_lock); 4928 return true; 4929 } 4930 4931 /* 4932 * If an NMI die dumps out the content of the ring buffer 4933 * trylock must be used to prevent a deadlock if the NMI 4934 * preempted a task that holds the ring buffer locks. If 4935 * we get the lock then all is fine, if not, then continue 4936 * to do the read, but this can corrupt the ring buffer, 4937 * so it must be permanently disabled from future writes. 4938 * Reading from NMI is a oneshot deal. 4939 */ 4940 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4941 return true; 4942 4943 /* Continue without locking, but disable the ring buffer */ 4944 atomic_inc(&cpu_buffer->record_disabled); 4945 return false; 4946 } 4947 4948 static inline void 4949 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4950 { 4951 if (likely(locked)) 4952 raw_spin_unlock(&cpu_buffer->reader_lock); 4953 } 4954 4955 /** 4956 * ring_buffer_peek - peek at the next event to be read 4957 * @buffer: The ring buffer to read 4958 * @cpu: The cpu to peak at 4959 * @ts: The timestamp counter of this event. 4960 * @lost_events: a variable to store if events were lost (may be NULL) 4961 * 4962 * This will return the event that will be read next, but does 4963 * not consume the data. 
4964 */ 4965 struct ring_buffer_event * 4966 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4967 unsigned long *lost_events) 4968 { 4969 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4970 struct ring_buffer_event *event; 4971 unsigned long flags; 4972 bool dolock; 4973 4974 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4975 return NULL; 4976 4977 again: 4978 local_irq_save(flags); 4979 dolock = rb_reader_lock(cpu_buffer); 4980 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4981 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4982 rb_advance_reader(cpu_buffer); 4983 rb_reader_unlock(cpu_buffer, dolock); 4984 local_irq_restore(flags); 4985 4986 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4987 goto again; 4988 4989 return event; 4990 } 4991 4992 /** ring_buffer_iter_dropped - report if there are dropped events 4993 * @iter: The ring buffer iterator 4994 * 4995 * Returns true if there was dropped events since the last peek. 4996 */ 4997 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4998 { 4999 bool ret = iter->missed_events != 0; 5000 5001 iter->missed_events = 0; 5002 return ret; 5003 } 5004 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5005 5006 /** 5007 * ring_buffer_iter_peek - peek at the next event to be read 5008 * @iter: The ring buffer iterator 5009 * @ts: The timestamp counter of this event. 5010 * 5011 * This will return the event that will be read next, but does 5012 * not increment the iterator. 
5013 */ 5014 struct ring_buffer_event * 5015 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5016 { 5017 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5018 struct ring_buffer_event *event; 5019 unsigned long flags; 5020 5021 again: 5022 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5023 event = rb_iter_peek(iter, ts); 5024 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5025 5026 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5027 goto again; 5028 5029 return event; 5030 } 5031 5032 /** 5033 * ring_buffer_consume - return an event and consume it 5034 * @buffer: The ring buffer to get the next event from 5035 * @cpu: the cpu to read the buffer from 5036 * @ts: a variable to store the timestamp (may be NULL) 5037 * @lost_events: a variable to store if events were lost (may be NULL) 5038 * 5039 * Returns the next event in the ring buffer, and that event is consumed. 5040 * Meaning, that sequential reads will keep returning a different event, 5041 * and eventually empty the ring buffer if the producer is slower. 
5042 */ 5043 struct ring_buffer_event * 5044 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5045 unsigned long *lost_events) 5046 { 5047 struct ring_buffer_per_cpu *cpu_buffer; 5048 struct ring_buffer_event *event = NULL; 5049 unsigned long flags; 5050 bool dolock; 5051 5052 again: 5053 /* might be called in atomic */ 5054 preempt_disable(); 5055 5056 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5057 goto out; 5058 5059 cpu_buffer = buffer->buffers[cpu]; 5060 local_irq_save(flags); 5061 dolock = rb_reader_lock(cpu_buffer); 5062 5063 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5064 if (event) { 5065 cpu_buffer->lost_events = 0; 5066 rb_advance_reader(cpu_buffer); 5067 } 5068 5069 rb_reader_unlock(cpu_buffer, dolock); 5070 local_irq_restore(flags); 5071 5072 out: 5073 preempt_enable(); 5074 5075 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5076 goto again; 5077 5078 return event; 5079 } 5080 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5081 5082 /** 5083 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5084 * @buffer: The ring buffer to read from 5085 * @cpu: The cpu buffer to iterate over 5086 * @flags: gfp flags to use for memory allocation 5087 * 5088 * This performs the initial preparations necessary to iterate 5089 * through the buffer. Memory is allocated, buffer recording 5090 * is disabled, and the iterator pointer is returned to the caller. 5091 * 5092 * Disabling buffer recording prevents the reading from being 5093 * corrupted. This is not a consuming read, so a producer is not 5094 * expected. 5095 * 5096 * After a sequence of ring_buffer_read_prepare calls, the user is 5097 * expected to make at least one call to ring_buffer_read_prepare_sync. 5098 * Afterwards, ring_buffer_read_start is invoked to get things going 5099 * for real. 5100 * 5101 * This overall must be paired with ring_buffer_read_finish. 
5102 */ 5103 struct ring_buffer_iter * 5104 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5105 { 5106 struct ring_buffer_per_cpu *cpu_buffer; 5107 struct ring_buffer_iter *iter; 5108 5109 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5110 return NULL; 5111 5112 iter = kzalloc(sizeof(*iter), flags); 5113 if (!iter) 5114 return NULL; 5115 5116 iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags); 5117 if (!iter->event) { 5118 kfree(iter); 5119 return NULL; 5120 } 5121 5122 cpu_buffer = buffer->buffers[cpu]; 5123 5124 iter->cpu_buffer = cpu_buffer; 5125 5126 atomic_inc(&cpu_buffer->resize_disabled); 5127 5128 return iter; 5129 } 5130 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5131 5132 /** 5133 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5134 * 5135 * All previously invoked ring_buffer_read_prepare calls to prepare 5136 * iterators will be synchronized. Afterwards, read_buffer_read_start 5137 * calls on those iterators are allowed. 5138 */ 5139 void 5140 ring_buffer_read_prepare_sync(void) 5141 { 5142 synchronize_rcu(); 5143 } 5144 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5145 5146 /** 5147 * ring_buffer_read_start - start a non consuming read of the buffer 5148 * @iter: The iterator returned by ring_buffer_read_prepare 5149 * 5150 * This finalizes the startup of an iteration through the buffer. 5151 * The iterator comes from a call to ring_buffer_read_prepare and 5152 * an intervening ring_buffer_read_prepare_sync must have been 5153 * performed. 5154 * 5155 * Must be paired with ring_buffer_read_finish. 
5156 */ 5157 void 5158 ring_buffer_read_start(struct ring_buffer_iter *iter) 5159 { 5160 struct ring_buffer_per_cpu *cpu_buffer; 5161 unsigned long flags; 5162 5163 if (!iter) 5164 return; 5165 5166 cpu_buffer = iter->cpu_buffer; 5167 5168 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5169 arch_spin_lock(&cpu_buffer->lock); 5170 rb_iter_reset(iter); 5171 arch_spin_unlock(&cpu_buffer->lock); 5172 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5173 } 5174 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5175 5176 /** 5177 * ring_buffer_read_finish - finish reading the iterator of the buffer 5178 * @iter: The iterator retrieved by ring_buffer_start 5179 * 5180 * This re-enables the recording to the buffer, and frees the 5181 * iterator. 5182 */ 5183 void 5184 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5185 { 5186 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5187 unsigned long flags; 5188 5189 /* 5190 * Ring buffer is disabled from recording, here's a good place 5191 * to check the integrity of the ring buffer. 5192 * Must prevent readers from trying to read, as the check 5193 * clears the HEAD page and readers require it. 5194 */ 5195 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5196 rb_check_pages(cpu_buffer); 5197 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5198 5199 atomic_dec(&cpu_buffer->resize_disabled); 5200 kfree(iter->event); 5201 kfree(iter); 5202 } 5203 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5204 5205 /** 5206 * ring_buffer_iter_advance - advance the iterator to the next location 5207 * @iter: The ring buffer iterator 5208 * 5209 * Move the location of the iterator such that the next read will 5210 * be the next location of the iterator. 
5211 */ 5212 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5213 { 5214 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5215 unsigned long flags; 5216 5217 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5218 5219 rb_advance_iter(iter); 5220 5221 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5222 } 5223 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5224 5225 /** 5226 * ring_buffer_size - return the size of the ring buffer (in bytes) 5227 * @buffer: The ring buffer. 5228 * @cpu: The CPU to get ring buffer size from. 5229 */ 5230 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5231 { 5232 /* 5233 * Earlier, this method returned 5234 * BUF_PAGE_SIZE * buffer->nr_pages 5235 * Since the nr_pages field is now removed, we have converted this to 5236 * return the per cpu buffer value. 5237 */ 5238 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5239 return 0; 5240 5241 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 5242 } 5243 EXPORT_SYMBOL_GPL(ring_buffer_size); 5244 5245 static void 5246 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5247 { 5248 rb_head_page_deactivate(cpu_buffer); 5249 5250 cpu_buffer->head_page 5251 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5252 local_set(&cpu_buffer->head_page->write, 0); 5253 local_set(&cpu_buffer->head_page->entries, 0); 5254 local_set(&cpu_buffer->head_page->page->commit, 0); 5255 5256 cpu_buffer->head_page->read = 0; 5257 5258 cpu_buffer->tail_page = cpu_buffer->head_page; 5259 cpu_buffer->commit_page = cpu_buffer->head_page; 5260 5261 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5262 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5263 local_set(&cpu_buffer->reader_page->write, 0); 5264 local_set(&cpu_buffer->reader_page->entries, 0); 5265 local_set(&cpu_buffer->reader_page->page->commit, 0); 5266 cpu_buffer->reader_page->read = 0; 5267 5268 local_set(&cpu_buffer->entries_bytes, 0); 5269 local_set(&cpu_buffer->overrun, 0); 5270 
local_set(&cpu_buffer->commit_overrun, 0); 5271 local_set(&cpu_buffer->dropped_events, 0); 5272 local_set(&cpu_buffer->entries, 0); 5273 local_set(&cpu_buffer->committing, 0); 5274 local_set(&cpu_buffer->commits, 0); 5275 local_set(&cpu_buffer->pages_touched, 0); 5276 local_set(&cpu_buffer->pages_lost, 0); 5277 local_set(&cpu_buffer->pages_read, 0); 5278 cpu_buffer->last_pages_touch = 0; 5279 cpu_buffer->shortest_full = 0; 5280 cpu_buffer->read = 0; 5281 cpu_buffer->read_bytes = 0; 5282 5283 rb_time_set(&cpu_buffer->write_stamp, 0); 5284 rb_time_set(&cpu_buffer->before_stamp, 0); 5285 5286 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5287 5288 cpu_buffer->lost_events = 0; 5289 cpu_buffer->last_overrun = 0; 5290 5291 rb_head_page_activate(cpu_buffer); 5292 } 5293 5294 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5295 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5296 { 5297 unsigned long flags; 5298 5299 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5300 5301 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5302 goto out; 5303 5304 arch_spin_lock(&cpu_buffer->lock); 5305 5306 rb_reset_cpu(cpu_buffer); 5307 5308 arch_spin_unlock(&cpu_buffer->lock); 5309 5310 out: 5311 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5312 } 5313 5314 /** 5315 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5316 * @buffer: The ring buffer to reset a per cpu buffer of 5317 * @cpu: The CPU buffer to be reset 5318 */ 5319 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5320 { 5321 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5322 5323 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5324 return; 5325 5326 /* prevent another thread from changing buffer sizes */ 5327 mutex_lock(&buffer->mutex); 5328 5329 atomic_inc(&cpu_buffer->resize_disabled); 5330 atomic_inc(&cpu_buffer->record_disabled); 5331 5332 /* Make sure all commits have 
finished */ 5333 synchronize_rcu(); 5334 5335 reset_disabled_cpu_buffer(cpu_buffer); 5336 5337 atomic_dec(&cpu_buffer->record_disabled); 5338 atomic_dec(&cpu_buffer->resize_disabled); 5339 5340 mutex_unlock(&buffer->mutex); 5341 } 5342 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5343 5344 /* Flag to ensure proper resetting of atomic variables */ 5345 #define RESET_BIT (1 << 30) 5346 5347 /** 5348 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5349 * @buffer: The ring buffer to reset a per cpu buffer of 5350 * @cpu: The CPU buffer to be reset 5351 */ 5352 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5353 { 5354 struct ring_buffer_per_cpu *cpu_buffer; 5355 int cpu; 5356 5357 /* prevent another thread from changing buffer sizes */ 5358 mutex_lock(&buffer->mutex); 5359 5360 for_each_online_buffer_cpu(buffer, cpu) { 5361 cpu_buffer = buffer->buffers[cpu]; 5362 5363 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5364 atomic_inc(&cpu_buffer->record_disabled); 5365 } 5366 5367 /* Make sure all commits have finished */ 5368 synchronize_rcu(); 5369 5370 for_each_buffer_cpu(buffer, cpu) { 5371 cpu_buffer = buffer->buffers[cpu]; 5372 5373 /* 5374 * If a CPU came online during the synchronize_rcu(), then 5375 * ignore it. 
5376 */ 5377 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5378 continue; 5379 5380 reset_disabled_cpu_buffer(cpu_buffer); 5381 5382 atomic_dec(&cpu_buffer->record_disabled); 5383 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5384 } 5385 5386 mutex_unlock(&buffer->mutex); 5387 } 5388 5389 /** 5390 * ring_buffer_reset - reset a ring buffer 5391 * @buffer: The ring buffer to reset all cpu buffers 5392 */ 5393 void ring_buffer_reset(struct trace_buffer *buffer) 5394 { 5395 struct ring_buffer_per_cpu *cpu_buffer; 5396 int cpu; 5397 5398 /* prevent another thread from changing buffer sizes */ 5399 mutex_lock(&buffer->mutex); 5400 5401 for_each_buffer_cpu(buffer, cpu) { 5402 cpu_buffer = buffer->buffers[cpu]; 5403 5404 atomic_inc(&cpu_buffer->resize_disabled); 5405 atomic_inc(&cpu_buffer->record_disabled); 5406 } 5407 5408 /* Make sure all commits have finished */ 5409 synchronize_rcu(); 5410 5411 for_each_buffer_cpu(buffer, cpu) { 5412 cpu_buffer = buffer->buffers[cpu]; 5413 5414 reset_disabled_cpu_buffer(cpu_buffer); 5415 5416 atomic_dec(&cpu_buffer->record_disabled); 5417 atomic_dec(&cpu_buffer->resize_disabled); 5418 } 5419 5420 mutex_unlock(&buffer->mutex); 5421 } 5422 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5423 5424 /** 5425 * ring_buffer_empty - is the ring buffer empty? 
5426 * @buffer: The ring buffer to test 5427 */ 5428 bool ring_buffer_empty(struct trace_buffer *buffer) 5429 { 5430 struct ring_buffer_per_cpu *cpu_buffer; 5431 unsigned long flags; 5432 bool dolock; 5433 bool ret; 5434 int cpu; 5435 5436 /* yes this is racy, but if you don't like the race, lock the buffer */ 5437 for_each_buffer_cpu(buffer, cpu) { 5438 cpu_buffer = buffer->buffers[cpu]; 5439 local_irq_save(flags); 5440 dolock = rb_reader_lock(cpu_buffer); 5441 ret = rb_per_cpu_empty(cpu_buffer); 5442 rb_reader_unlock(cpu_buffer, dolock); 5443 local_irq_restore(flags); 5444 5445 if (!ret) 5446 return false; 5447 } 5448 5449 return true; 5450 } 5451 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5452 5453 /** 5454 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 5455 * @buffer: The ring buffer 5456 * @cpu: The CPU buffer to test 5457 */ 5458 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5459 { 5460 struct ring_buffer_per_cpu *cpu_buffer; 5461 unsigned long flags; 5462 bool dolock; 5463 bool ret; 5464 5465 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5466 return true; 5467 5468 cpu_buffer = buffer->buffers[cpu]; 5469 local_irq_save(flags); 5470 dolock = rb_reader_lock(cpu_buffer); 5471 ret = rb_per_cpu_empty(cpu_buffer); 5472 rb_reader_unlock(cpu_buffer, dolock); 5473 local_irq_restore(flags); 5474 5475 return ret; 5476 } 5477 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5478 5479 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5480 /** 5481 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5482 * @buffer_a: One buffer to swap with 5483 * @buffer_b: The other buffer to swap with 5484 * @cpu: the CPU of the buffers to swap 5485 * 5486 * This function is useful for tracers that want to take a "snapshot" 5487 * of a CPU buffer and has another back up buffer lying around. 5488 * it is expected that the tracer handles the cpu buffer not being 5489 * used at the moment. 
5490 */ 5491 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5492 struct trace_buffer *buffer_b, int cpu) 5493 { 5494 struct ring_buffer_per_cpu *cpu_buffer_a; 5495 struct ring_buffer_per_cpu *cpu_buffer_b; 5496 int ret = -EINVAL; 5497 5498 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5499 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5500 goto out; 5501 5502 cpu_buffer_a = buffer_a->buffers[cpu]; 5503 cpu_buffer_b = buffer_b->buffers[cpu]; 5504 5505 /* At least make sure the two buffers are somewhat the same */ 5506 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5507 goto out; 5508 5509 ret = -EAGAIN; 5510 5511 if (atomic_read(&buffer_a->record_disabled)) 5512 goto out; 5513 5514 if (atomic_read(&buffer_b->record_disabled)) 5515 goto out; 5516 5517 if (atomic_read(&cpu_buffer_a->record_disabled)) 5518 goto out; 5519 5520 if (atomic_read(&cpu_buffer_b->record_disabled)) 5521 goto out; 5522 5523 /* 5524 * We can't do a synchronize_rcu here because this 5525 * function can be called in atomic context. 5526 * Normally this will be called from the same CPU as cpu. 5527 * If not it's up to the caller to protect this. 5528 */ 5529 atomic_inc(&cpu_buffer_a->record_disabled); 5530 atomic_inc(&cpu_buffer_b->record_disabled); 5531 5532 ret = -EBUSY; 5533 if (local_read(&cpu_buffer_a->committing)) 5534 goto out_dec; 5535 if (local_read(&cpu_buffer_b->committing)) 5536 goto out_dec; 5537 5538 buffer_a->buffers[cpu] = cpu_buffer_b; 5539 buffer_b->buffers[cpu] = cpu_buffer_a; 5540 5541 cpu_buffer_b->buffer = buffer_a; 5542 cpu_buffer_a->buffer = buffer_b; 5543 5544 ret = 0; 5545 5546 out_dec: 5547 atomic_dec(&cpu_buffer_a->record_disabled); 5548 atomic_dec(&cpu_buffer_b->record_disabled); 5549 out: 5550 return ret; 5551 } 5552 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5553 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5554 5555 /** 5556 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5557 * @buffer: the buffer to allocate for. 
5558 * @cpu: the cpu buffer to allocate. 5559 * 5560 * This function is used in conjunction with ring_buffer_read_page. 5561 * When reading a full page from the ring buffer, these functions 5562 * can be used to speed up the process. The calling function should 5563 * allocate a few pages first with this function. Then when it 5564 * needs to get pages from the ring buffer, it passes the result 5565 * of this function into ring_buffer_read_page, which will swap 5566 * the page that was allocated, with the read page of the buffer. 5567 * 5568 * Returns: 5569 * The page allocated, or ERR_PTR 5570 */ 5571 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5572 { 5573 struct ring_buffer_per_cpu *cpu_buffer; 5574 struct buffer_data_page *bpage = NULL; 5575 unsigned long flags; 5576 struct page *page; 5577 5578 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5579 return ERR_PTR(-ENODEV); 5580 5581 cpu_buffer = buffer->buffers[cpu]; 5582 local_irq_save(flags); 5583 arch_spin_lock(&cpu_buffer->lock); 5584 5585 if (cpu_buffer->free_page) { 5586 bpage = cpu_buffer->free_page; 5587 cpu_buffer->free_page = NULL; 5588 } 5589 5590 arch_spin_unlock(&cpu_buffer->lock); 5591 local_irq_restore(flags); 5592 5593 if (bpage) 5594 goto out; 5595 5596 page = alloc_pages_node(cpu_to_node(cpu), 5597 GFP_KERNEL | __GFP_NORETRY, 0); 5598 if (!page) 5599 return ERR_PTR(-ENOMEM); 5600 5601 bpage = page_address(page); 5602 5603 out: 5604 rb_init_page(bpage); 5605 5606 return bpage; 5607 } 5608 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5609 5610 /** 5611 * ring_buffer_free_read_page - free an allocated read page 5612 * @buffer: the buffer the page was allocate for 5613 * @cpu: the cpu buffer the page came from 5614 * @data: the page to free 5615 * 5616 * Free a page allocated from ring_buffer_alloc_read_page. 
5617 */ 5618 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5619 { 5620 struct ring_buffer_per_cpu *cpu_buffer; 5621 struct buffer_data_page *bpage = data; 5622 struct page *page = virt_to_page(bpage); 5623 unsigned long flags; 5624 5625 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5626 return; 5627 5628 cpu_buffer = buffer->buffers[cpu]; 5629 5630 /* If the page is still in use someplace else, we can't reuse it */ 5631 if (page_ref_count(page) > 1) 5632 goto out; 5633 5634 local_irq_save(flags); 5635 arch_spin_lock(&cpu_buffer->lock); 5636 5637 if (!cpu_buffer->free_page) { 5638 cpu_buffer->free_page = bpage; 5639 bpage = NULL; 5640 } 5641 5642 arch_spin_unlock(&cpu_buffer->lock); 5643 local_irq_restore(flags); 5644 5645 out: 5646 free_page((unsigned long)bpage); 5647 } 5648 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5649 5650 /** 5651 * ring_buffer_read_page - extract a page from the ring buffer 5652 * @buffer: buffer to extract from 5653 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5654 * @len: amount to extract 5655 * @cpu: the cpu of the buffer to extract 5656 * @full: should the extraction only happen when the page is full. 5657 * 5658 * This function will pull out a page from the ring buffer and consume it. 5659 * @data_page must be the address of the variable that was returned 5660 * from ring_buffer_alloc_read_page. This is because the page might be used 5661 * to swap with a page in the ring buffer. 5662 * 5663 * for example: 5664 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5665 * if (IS_ERR(rpage)) 5666 * return PTR_ERR(rpage); 5667 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5668 * if (ret >= 0) 5669 * process_page(rpage, ret); 5670 * 5671 * When @full is set, the function will not return true unless 5672 * the writer is off the reader page. 5673 * 5674 * Note: it is up to the calling functions to handle sleeps and wakeups. 
5675 * The ring buffer can be used anywhere in the kernel and can not 5676 * blindly call wake_up. The layer that uses the ring buffer must be 5677 * responsible for that. 5678 * 5679 * Returns: 5680 * >=0 if data has been transferred, returns the offset of consumed data. 5681 * <0 if no data has been transferred. 5682 */ 5683 int ring_buffer_read_page(struct trace_buffer *buffer, 5684 void **data_page, size_t len, int cpu, int full) 5685 { 5686 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5687 struct ring_buffer_event *event; 5688 struct buffer_data_page *bpage; 5689 struct buffer_page *reader; 5690 unsigned long missed_events; 5691 unsigned long flags; 5692 unsigned int commit; 5693 unsigned int read; 5694 u64 save_timestamp; 5695 int ret = -1; 5696 5697 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5698 goto out; 5699 5700 /* 5701 * If len is not big enough to hold the page header, then 5702 * we can not copy anything. 5703 */ 5704 if (len <= BUF_PAGE_HDR_SIZE) 5705 goto out; 5706 5707 len -= BUF_PAGE_HDR_SIZE; 5708 5709 if (!data_page) 5710 goto out; 5711 5712 bpage = *data_page; 5713 if (!bpage) 5714 goto out; 5715 5716 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5717 5718 reader = rb_get_reader_page(cpu_buffer); 5719 if (!reader) 5720 goto out_unlock; 5721 5722 event = rb_reader_event(cpu_buffer); 5723 5724 read = reader->read; 5725 commit = rb_page_commit(reader); 5726 5727 /* Check if any events were dropped */ 5728 missed_events = cpu_buffer->lost_events; 5729 5730 /* 5731 * If this page has been partially read or 5732 * if len is not big enough to read the rest of the page or 5733 * a writer is still on the page, then 5734 * we must copy the data from the page to the buffer. 5735 * Otherwise, we can simply swap the page with the one passed in. 
5736 */ 5737 if (read || (len < (commit - read)) || 5738 cpu_buffer->reader_page == cpu_buffer->commit_page) { 5739 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5740 unsigned int rpos = read; 5741 unsigned int pos = 0; 5742 unsigned int size; 5743 5744 /* 5745 * If a full page is expected, this can still be returned 5746 * if there's been a previous partial read and the 5747 * rest of the page can be read and the commit page is off 5748 * the reader page. 5749 */ 5750 if (full && 5751 (!read || (len < (commit - read)) || 5752 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5753 goto out_unlock; 5754 5755 if (len > (commit - read)) 5756 len = (commit - read); 5757 5758 /* Always keep the time extend and data together */ 5759 size = rb_event_ts_length(event); 5760 5761 if (len < size) 5762 goto out_unlock; 5763 5764 /* save the current timestamp, since the user will need it */ 5765 save_timestamp = cpu_buffer->read_stamp; 5766 5767 /* Need to copy one event at a time */ 5768 do { 5769 /* We need the size of one event, because 5770 * rb_advance_reader only advances by one event, 5771 * whereas rb_event_ts_length may include the size of 5772 * one or two events. 5773 * We have already ensured there's enough space if this 5774 * is a time extend. 
*/ 5775 size = rb_event_length(event); 5776 memcpy(bpage->data + pos, rpage->data + rpos, size); 5777 5778 len -= size; 5779 5780 rb_advance_reader(cpu_buffer); 5781 rpos = reader->read; 5782 pos += size; 5783 5784 if (rpos >= commit) 5785 break; 5786 5787 event = rb_reader_event(cpu_buffer); 5788 /* Always keep the time extend and data together */ 5789 size = rb_event_ts_length(event); 5790 } while (len >= size); 5791 5792 /* update bpage */ 5793 local_set(&bpage->commit, pos); 5794 bpage->time_stamp = save_timestamp; 5795 5796 /* we copied everything to the beginning */ 5797 read = 0; 5798 } else { 5799 /* update the entry counter */ 5800 cpu_buffer->read += rb_page_entries(reader); 5801 cpu_buffer->read_bytes += BUF_PAGE_SIZE; 5802 5803 /* swap the pages */ 5804 rb_init_page(bpage); 5805 bpage = reader->page; 5806 reader->page = *data_page; 5807 local_set(&reader->write, 0); 5808 local_set(&reader->entries, 0); 5809 reader->read = 0; 5810 *data_page = bpage; 5811 5812 /* 5813 * Use the real_end for the data size, 5814 * This gives us a chance to store the lost events 5815 * on the page. 5816 */ 5817 if (reader->real_end) 5818 local_set(&bpage->commit, reader->real_end); 5819 } 5820 ret = read; 5821 5822 cpu_buffer->lost_events = 0; 5823 5824 commit = local_read(&bpage->commit); 5825 /* 5826 * Set a flag in the commit field if we lost events 5827 */ 5828 if (missed_events) { 5829 /* If there is room at the end of the page to save the 5830 * missed events, then record it there. 5831 */ 5832 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 5833 memcpy(&bpage->data[commit], &missed_events, 5834 sizeof(missed_events)); 5835 local_add(RB_MISSED_STORED, &bpage->commit); 5836 commit += sizeof(missed_events); 5837 } 5838 local_add(RB_MISSED_EVENTS, &bpage->commit); 5839 } 5840 5841 /* 5842 * This page may be off to user land. Zero it out here. 
5843 */ 5844 if (commit < BUF_PAGE_SIZE) 5845 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 5846 5847 out_unlock: 5848 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5849 5850 out: 5851 return ret; 5852 } 5853 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 5854 5855 /* 5856 * We only allocate new buffers, never free them if the CPU goes down. 5857 * If we were to free the buffer, then the user would lose any trace that was in 5858 * the buffer. 5859 */ 5860 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 5861 { 5862 struct trace_buffer *buffer; 5863 long nr_pages_same; 5864 int cpu_i; 5865 unsigned long nr_pages; 5866 5867 buffer = container_of(node, struct trace_buffer, node); 5868 if (cpumask_test_cpu(cpu, buffer->cpumask)) 5869 return 0; 5870 5871 nr_pages = 0; 5872 nr_pages_same = 1; 5873 /* check if all cpu sizes are same */ 5874 for_each_buffer_cpu(buffer, cpu_i) { 5875 /* fill in the size from first enabled cpu */ 5876 if (nr_pages == 0) 5877 nr_pages = buffer->buffers[cpu_i]->nr_pages; 5878 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 5879 nr_pages_same = 0; 5880 break; 5881 } 5882 } 5883 /* allocate minimum pages, user can later expand it */ 5884 if (!nr_pages_same) 5885 nr_pages = 2; 5886 buffer->buffers[cpu] = 5887 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 5888 if (!buffer->buffers[cpu]) { 5889 WARN(1, "failed to allocate ring buffer on CPU %u\n", 5890 cpu); 5891 return -ENOMEM; 5892 } 5893 smp_wmb(); 5894 cpumask_set_cpu(cpu, buffer->cpumask); 5895 return 0; 5896 } 5897 5898 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 5899 /* 5900 * This is a basic integrity check of the ring buffer. 5901 * Late in the boot cycle this test will run when configured in. 5902 * It will kick off a thread per CPU that will go into a loop 5903 * writing to the per cpu ring buffer various sizes of data. 5904 * Some of the data will be large items, some small. 
5905 * 5906 * Another thread is created that goes into a spin, sending out 5907 * IPIs to the other CPUs to also write into the ring buffer. 5908 * this is to test the nesting ability of the buffer. 5909 * 5910 * Basic stats are recorded and reported. If something in the 5911 * ring buffer should happen that's not expected, a big warning 5912 * is displayed and all ring buffers are disabled. 5913 */ 5914 static struct task_struct *rb_threads[NR_CPUS] __initdata; 5915 5916 struct rb_test_data { 5917 struct trace_buffer *buffer; 5918 unsigned long events; 5919 unsigned long bytes_written; 5920 unsigned long bytes_alloc; 5921 unsigned long bytes_dropped; 5922 unsigned long events_nested; 5923 unsigned long bytes_written_nested; 5924 unsigned long bytes_alloc_nested; 5925 unsigned long bytes_dropped_nested; 5926 int min_size_nested; 5927 int max_size_nested; 5928 int max_size; 5929 int min_size; 5930 int cpu; 5931 int cnt; 5932 }; 5933 5934 static struct rb_test_data rb_data[NR_CPUS] __initdata; 5935 5936 /* 1 meg per cpu */ 5937 #define RB_TEST_BUFFER_SIZE 1048576 5938 5939 static char rb_string[] __initdata = 5940 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 5941 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 5942 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 5943 5944 static bool rb_test_started __initdata; 5945 5946 struct rb_item { 5947 int size; 5948 char str[]; 5949 }; 5950 5951 static __init int rb_write_something(struct rb_test_data *data, bool nested) 5952 { 5953 struct ring_buffer_event *event; 5954 struct rb_item *item; 5955 bool started; 5956 int event_len; 5957 int size; 5958 int len; 5959 int cnt; 5960 5961 /* Have nested writes different that what is written */ 5962 cnt = data->cnt + (nested ? 
27 : 0); 5963 5964 /* Multiply cnt by ~e, to make some unique increment */ 5965 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 5966 5967 len = size + sizeof(struct rb_item); 5968 5969 started = rb_test_started; 5970 /* read rb_test_started before checking buffer enabled */ 5971 smp_rmb(); 5972 5973 event = ring_buffer_lock_reserve(data->buffer, len); 5974 if (!event) { 5975 /* Ignore dropped events before test starts. */ 5976 if (started) { 5977 if (nested) 5978 data->bytes_dropped += len; 5979 else 5980 data->bytes_dropped_nested += len; 5981 } 5982 return len; 5983 } 5984 5985 event_len = ring_buffer_event_length(event); 5986 5987 if (RB_WARN_ON(data->buffer, event_len < len)) 5988 goto out; 5989 5990 item = ring_buffer_event_data(event); 5991 item->size = size; 5992 memcpy(item->str, rb_string, size); 5993 5994 if (nested) { 5995 data->bytes_alloc_nested += event_len; 5996 data->bytes_written_nested += len; 5997 data->events_nested++; 5998 if (!data->min_size_nested || len < data->min_size_nested) 5999 data->min_size_nested = len; 6000 if (len > data->max_size_nested) 6001 data->max_size_nested = len; 6002 } else { 6003 data->bytes_alloc += event_len; 6004 data->bytes_written += len; 6005 data->events++; 6006 if (!data->min_size || len < data->min_size) 6007 data->max_size = len; 6008 if (len > data->max_size) 6009 data->max_size = len; 6010 } 6011 6012 out: 6013 ring_buffer_unlock_commit(data->buffer); 6014 6015 return 0; 6016 } 6017 6018 static __init int rb_test(void *arg) 6019 { 6020 struct rb_test_data *data = arg; 6021 6022 while (!kthread_should_stop()) { 6023 rb_write_something(data, false); 6024 data->cnt++; 6025 6026 set_current_state(TASK_INTERRUPTIBLE); 6027 /* Now sleep between a min of 100-300us and a max of 1ms */ 6028 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 6029 } 6030 6031 return 0; 6032 } 6033 6034 static __init void rb_ipi(void *ignore) 6035 { 6036 struct rb_test_data *data; 6037 int cpu = smp_processor_id(); 6038 6039 data = 
&rb_data[cpu]; 6040 rb_write_something(data, true); 6041 } 6042 6043 static __init int rb_hammer_test(void *arg) 6044 { 6045 while (!kthread_should_stop()) { 6046 6047 /* Send an IPI to all cpus to write data! */ 6048 smp_call_function(rb_ipi, NULL, 1); 6049 /* No sleep, but for non preempt, let others run */ 6050 schedule(); 6051 } 6052 6053 return 0; 6054 } 6055 6056 static __init int test_ringbuffer(void) 6057 { 6058 struct task_struct *rb_hammer; 6059 struct trace_buffer *buffer; 6060 int cpu; 6061 int ret = 0; 6062 6063 if (security_locked_down(LOCKDOWN_TRACEFS)) { 6064 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 6065 return 0; 6066 } 6067 6068 pr_info("Running ring buffer tests...\n"); 6069 6070 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 6071 if (WARN_ON(!buffer)) 6072 return 0; 6073 6074 /* Disable buffer so that threads can't write to it yet */ 6075 ring_buffer_record_off(buffer); 6076 6077 for_each_online_cpu(cpu) { 6078 rb_data[cpu].buffer = buffer; 6079 rb_data[cpu].cpu = cpu; 6080 rb_data[cpu].cnt = cpu; 6081 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 6082 cpu, "rbtester/%u"); 6083 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 6084 pr_cont("FAILED\n"); 6085 ret = PTR_ERR(rb_threads[cpu]); 6086 goto out_free; 6087 } 6088 } 6089 6090 /* Now create the rb hammer! */ 6091 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 6092 if (WARN_ON(IS_ERR(rb_hammer))) { 6093 pr_cont("FAILED\n"); 6094 ret = PTR_ERR(rb_hammer); 6095 goto out_free; 6096 } 6097 6098 ring_buffer_record_on(buffer); 6099 /* 6100 * Show buffer is enabled before setting rb_test_started. 6101 * Yes there's a small race window where events could be 6102 * dropped and the thread wont catch it. But when a ring 6103 * buffer gets enabled, there will always be some kind of 6104 * delay before other CPUs see it. Thus, we don't care about 6105 * those dropped events. 
We care about events dropped after 6106 * the threads see that the buffer is active. 6107 */ 6108 smp_wmb(); 6109 rb_test_started = true; 6110 6111 set_current_state(TASK_INTERRUPTIBLE); 6112 /* Just run for 10 seconds */; 6113 schedule_timeout(10 * HZ); 6114 6115 kthread_stop(rb_hammer); 6116 6117 out_free: 6118 for_each_online_cpu(cpu) { 6119 if (!rb_threads[cpu]) 6120 break; 6121 kthread_stop(rb_threads[cpu]); 6122 } 6123 if (ret) { 6124 ring_buffer_free(buffer); 6125 return ret; 6126 } 6127 6128 /* Report! */ 6129 pr_info("finished\n"); 6130 for_each_online_cpu(cpu) { 6131 struct ring_buffer_event *event; 6132 struct rb_test_data *data = &rb_data[cpu]; 6133 struct rb_item *item; 6134 unsigned long total_events; 6135 unsigned long total_dropped; 6136 unsigned long total_written; 6137 unsigned long total_alloc; 6138 unsigned long total_read = 0; 6139 unsigned long total_size = 0; 6140 unsigned long total_len = 0; 6141 unsigned long total_lost = 0; 6142 unsigned long lost; 6143 int big_event_size; 6144 int small_event_size; 6145 6146 ret = -1; 6147 6148 total_events = data->events + data->events_nested; 6149 total_written = data->bytes_written + data->bytes_written_nested; 6150 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 6151 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 6152 6153 big_event_size = data->max_size + data->max_size_nested; 6154 small_event_size = data->min_size + data->min_size_nested; 6155 6156 pr_info("CPU %d:\n", cpu); 6157 pr_info(" events: %ld\n", total_events); 6158 pr_info(" dropped bytes: %ld\n", total_dropped); 6159 pr_info(" alloced bytes: %ld\n", total_alloc); 6160 pr_info(" written bytes: %ld\n", total_written); 6161 pr_info(" biggest event: %d\n", big_event_size); 6162 pr_info(" smallest event: %d\n", small_event_size); 6163 6164 if (RB_WARN_ON(buffer, total_dropped)) 6165 break; 6166 6167 ret = 0; 6168 6169 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 6170 total_lost += lost; 6171 
item = ring_buffer_event_data(event); 6172 total_len += ring_buffer_event_length(event); 6173 total_size += item->size + sizeof(struct rb_item); 6174 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 6175 pr_info("FAILED!\n"); 6176 pr_info("buffer had: %.*s\n", item->size, item->str); 6177 pr_info("expected: %.*s\n", item->size, rb_string); 6178 RB_WARN_ON(buffer, 1); 6179 ret = -1; 6180 break; 6181 } 6182 total_read++; 6183 } 6184 if (ret) 6185 break; 6186 6187 ret = -1; 6188 6189 pr_info(" read events: %ld\n", total_read); 6190 pr_info(" lost events: %ld\n", total_lost); 6191 pr_info(" total events: %ld\n", total_lost + total_read); 6192 pr_info(" recorded len bytes: %ld\n", total_len); 6193 pr_info(" recorded size bytes: %ld\n", total_size); 6194 if (total_lost) { 6195 pr_info(" With dropped events, record len and size may not match\n" 6196 " alloced and written from above\n"); 6197 } else { 6198 if (RB_WARN_ON(buffer, total_len != total_alloc || 6199 total_size != total_written)) 6200 break; 6201 } 6202 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 6203 break; 6204 6205 ret = 0; 6206 } 6207 if (!ret) 6208 pr_info("Ring buffer PASSED!\n"); 6209 6210 ring_buffer_free(buffer); 6211 return 0; 6212 } 6213 6214 late_initcall(test_ringbuffer); 6215 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 6216