// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

static void update_pages_handler(struct work_struct *work);

/*
 * ring_buffer_print_entry_header - describe the compressed event header
 * @s: the trace_seq to write the description into
 *
 * The ring buffer header is special. We must manually upkeep it: the
 * bit widths below are hard-coded strings, so they have to be kept in
 * sync by hand with the event header. Only the type values are taken
 * from the RINGBUF_TYPE_* constants.
 *
 * Returns non-zero if everything fit into @s, zero if @s overflowed.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	/* Fixed-width fields of the event header */
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	/* Special values of type_len */
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page.
 * When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
120 * 121 */ 122 123 /* Used for individual buffers (after the counter) */ 124 #define RB_BUFFER_OFF (1 << 20) 125 126 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 127 128 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 129 #define RB_ALIGNMENT 4U 130 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 131 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 132 #define RB_ALIGN_DATA __aligned(RB_ALIGNMENT) 133 134 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 135 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 136 137 enum { 138 RB_LEN_TIME_EXTEND = 8, 139 RB_LEN_TIME_STAMP = 8, 140 }; 141 142 #define skip_time_extend(event) \ 143 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 144 145 #define extended_time(event) \ 146 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 147 148 static inline int rb_null_event(struct ring_buffer_event *event) 149 { 150 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 151 } 152 153 static void rb_event_set_padding(struct ring_buffer_event *event) 154 { 155 /* padding has a NULL time_delta */ 156 event->type_len = RINGBUF_TYPE_PADDING; 157 event->time_delta = 0; 158 } 159 160 static unsigned 161 rb_event_data_length(struct ring_buffer_event *event) 162 { 163 unsigned length; 164 165 if (event->type_len) 166 length = event->type_len * RB_ALIGNMENT; 167 else 168 length = event->array[0]; 169 return length + RB_EVNT_HDR_SIZE; 170 } 171 172 /* 173 * Return the length of the given event. Will return 174 * the length of the time extend if the event is a 175 * time extend. 
176 */ 177 static inline unsigned 178 rb_event_length(struct ring_buffer_event *event) 179 { 180 switch (event->type_len) { 181 case RINGBUF_TYPE_PADDING: 182 if (rb_null_event(event)) 183 /* undefined */ 184 return -1; 185 return event->array[0] + RB_EVNT_HDR_SIZE; 186 187 case RINGBUF_TYPE_TIME_EXTEND: 188 return RB_LEN_TIME_EXTEND; 189 190 case RINGBUF_TYPE_TIME_STAMP: 191 return RB_LEN_TIME_STAMP; 192 193 case RINGBUF_TYPE_DATA: 194 return rb_event_data_length(event); 195 default: 196 WARN_ON_ONCE(1); 197 } 198 /* not hit */ 199 return 0; 200 } 201 202 /* 203 * Return total length of time extend and data, 204 * or just the event length for all other events. 205 */ 206 static inline unsigned 207 rb_event_ts_length(struct ring_buffer_event *event) 208 { 209 unsigned len = 0; 210 211 if (extended_time(event)) { 212 /* time extends include the data event after it */ 213 len = RB_LEN_TIME_EXTEND; 214 event = skip_time_extend(event); 215 } 216 return len + rb_event_length(event); 217 } 218 219 /** 220 * ring_buffer_event_length - return the length of the event 221 * @event: the event to get the length of 222 * 223 * Returns the size of the data load of a data event. 224 * If the event is something other than a data event, it 225 * returns the size of the event itself. With the exception 226 * of a TIME EXTEND, where it still returns the size of the 227 * data load of the data event after it. 
228 */ 229 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 230 { 231 unsigned length; 232 233 if (extended_time(event)) 234 event = skip_time_extend(event); 235 236 length = rb_event_length(event); 237 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 238 return length; 239 length -= RB_EVNT_HDR_SIZE; 240 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 241 length -= sizeof(event->array[0]); 242 return length; 243 } 244 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 245 246 /* inline for ring buffer fast paths */ 247 static __always_inline void * 248 rb_event_data(struct ring_buffer_event *event) 249 { 250 if (extended_time(event)) 251 event = skip_time_extend(event); 252 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 253 /* If length is in len field, then array[0] has the data */ 254 if (event->type_len) 255 return (void *)&event->array[0]; 256 /* Otherwise length is in array[0] and array[1] has the data */ 257 return (void *)&event->array[1]; 258 } 259 260 /** 261 * ring_buffer_event_data - return the data of the event 262 * @event: the event to get the data from 263 */ 264 void *ring_buffer_event_data(struct ring_buffer_event *event) 265 { 266 return rb_event_data(event); 267 } 268 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 269 270 #define for_each_buffer_cpu(buffer, cpu) \ 271 for_each_cpu(cpu, buffer->cpumask) 272 273 #define for_each_online_buffer_cpu(buffer, cpu) \ 274 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 275 276 #define TS_SHIFT 27 277 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 278 #define TS_DELTA_TEST (~TS_MASK) 279 280 /** 281 * ring_buffer_event_time_stamp - return the event's extended timestamp 282 * @event: the event to get the timestamp of 283 * 284 * Returns the extended timestamp associated with a data event. 285 * An extended time_stamp is a 64-bit timestamp represented 286 * internally in a special way that makes the best use of space 287 * contained within a ring buffer event. 
This function decodes 288 * it and maps it to a straight u64 value. 289 */ 290 u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event) 291 { 292 u64 ts; 293 294 ts = event->array[0]; 295 ts <<= TS_SHIFT; 296 ts += event->time_delta; 297 298 return ts; 299 } 300 301 /* Flag when events were overwritten */ 302 #define RB_MISSED_EVENTS (1 << 31) 303 /* Missed count stored at end */ 304 #define RB_MISSED_STORED (1 << 30) 305 306 struct buffer_data_page { 307 u64 time_stamp; /* page time stamp */ 308 local_t commit; /* write committed index */ 309 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 310 }; 311 312 /* 313 * Note, the buffer_page list must be first. The buffer pages 314 * are allocated in cache lines, which means that each buffer 315 * page will be at the beginning of a cache line, and thus 316 * the least significant bits will be zero. We use this to 317 * add flags in the list struct pointers, to make the ring buffer 318 * lockless. 319 */ 320 struct buffer_page { 321 struct list_head list; /* list of buffer pages */ 322 local_t write; /* index for next write */ 323 unsigned read; /* index for next read */ 324 local_t entries; /* entries on this page */ 325 unsigned long real_end; /* real end of data */ 326 struct buffer_data_page *page; /* Actual data page */ 327 }; 328 329 /* 330 * The buffer page counters, write and entries, must be reset 331 * atomically when crossing page boundaries. To synchronize this 332 * update, two counters are inserted into the number. One is 333 * the actual counter for the write position or count on the page. 334 * 335 * The other is a counter of updaters. Before an update happens 336 * the update partition of the counter is incremented. This will 337 * allow the updater to update the counter atomically. 338 * 339 * The counter is 20 bits, and the state data is 12. 
340 */ 341 #define RB_WRITE_MASK 0xfffff 342 #define RB_WRITE_INTCNT (1 << 20) 343 344 static void rb_init_page(struct buffer_data_page *bpage) 345 { 346 local_set(&bpage->commit, 0); 347 } 348 349 /* 350 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 351 * this issue out. 352 */ 353 static void free_buffer_page(struct buffer_page *bpage) 354 { 355 free_page((unsigned long)bpage->page); 356 kfree(bpage); 357 } 358 359 /* 360 * We need to fit the time_stamp delta into 27 bits. 361 */ 362 static inline int test_time_stamp(u64 delta) 363 { 364 if (delta & TS_DELTA_TEST) 365 return 1; 366 return 0; 367 } 368 369 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 370 371 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 372 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 373 374 int ring_buffer_print_page_header(struct trace_seq *s) 375 { 376 struct buffer_data_page field; 377 378 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 379 "offset:0;\tsize:%u;\tsigned:%u;\n", 380 (unsigned int)sizeof(field.time_stamp), 381 (unsigned int)is_signed_type(u64)); 382 383 trace_seq_printf(s, "\tfield: local_t commit;\t" 384 "offset:%u;\tsize:%u;\tsigned:%u;\n", 385 (unsigned int)offsetof(typeof(field), commit), 386 (unsigned int)sizeof(field.commit), 387 (unsigned int)is_signed_type(long)); 388 389 trace_seq_printf(s, "\tfield: int overwrite;\t" 390 "offset:%u;\tsize:%u;\tsigned:%u;\n", 391 (unsigned int)offsetof(typeof(field), commit), 392 1, 393 (unsigned int)is_signed_type(long)); 394 395 trace_seq_printf(s, "\tfield: char data;\t" 396 "offset:%u;\tsize:%u;\tsigned:%u;\n", 397 (unsigned int)offsetof(typeof(field), data), 398 (unsigned int)BUF_PAGE_SIZE, 399 (unsigned int)is_signed_type(char)); 400 401 return !trace_seq_has_overflowed(s); 402 } 403 404 struct rb_irq_work { 405 struct irq_work work; 406 wait_queue_head_t waiters; 407 wait_queue_head_t full_waiters; 408 bool waiters_pending; 409 bool full_waiters_pending; 410 bool 
wakeup_full; 411 }; 412 413 /* 414 * Structure to hold event state and handle nested events. 415 */ 416 struct rb_event_info { 417 u64 ts; 418 u64 delta; 419 u64 before; 420 u64 after; 421 unsigned long length; 422 struct buffer_page *tail_page; 423 int add_timestamp; 424 }; 425 426 /* 427 * Used for the add_timestamp 428 * NONE 429 * EXTEND - wants a time extend 430 * ABSOLUTE - the buffer requests all events to have absolute time stamps 431 * FORCE - force a full time stamp. 432 */ 433 enum { 434 RB_ADD_STAMP_NONE = 0, 435 RB_ADD_STAMP_EXTEND = BIT(1), 436 RB_ADD_STAMP_ABSOLUTE = BIT(2), 437 RB_ADD_STAMP_FORCE = BIT(3) 438 }; 439 /* 440 * Used for which event context the event is in. 441 * TRANSITION = 0 442 * NMI = 1 443 * IRQ = 2 444 * SOFTIRQ = 3 445 * NORMAL = 4 446 * 447 * See trace_recursive_lock() comment below for more details. 448 */ 449 enum { 450 RB_CTX_TRANSITION, 451 RB_CTX_NMI, 452 RB_CTX_IRQ, 453 RB_CTX_SOFTIRQ, 454 RB_CTX_NORMAL, 455 RB_CTX_MAX 456 }; 457 458 #if BITS_PER_LONG == 32 459 #define RB_TIME_32 460 #endif 461 462 /* To test on 64 bit machines */ 463 //#define RB_TIME_32 464 465 #ifdef RB_TIME_32 466 467 struct rb_time_struct { 468 local_t cnt; 469 local_t top; 470 local_t bottom; 471 }; 472 #else 473 #include <asm/local64.h> 474 struct rb_time_struct { 475 local64_t time; 476 }; 477 #endif 478 typedef struct rb_time_struct rb_time_t; 479 480 /* 481 * head_page == tail_page && head == tail then buffer is empty. 
 *
 * State of the ring buffer of a single CPU. The instances are reached
 * through the buffers[] array of struct trace_buffer, indexed by cpu.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer	*buffer;	/* buffer this one belongs to */
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	/* pages_touched - pages_read is what ring_buffer_nr_dirty_pages() reports */
	local_t				pages_touched;
	local_t				pages_read;
	long				last_pages_touch;
	/* "full" watermark recorded by ring_buffer_wait() */
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				read_stamp;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	/* wait queues of the tasks waiting on this CPU's buffer */
	struct rb_irq_work		irq_work;
};

/*
 * The ring buffer as a whole: the per CPU buffers plus the state
 * shared between them.
 */
struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	cpumask_var_t			cpumask;	/* CPUs that have a buffer */

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;	/* indexed by cpu */

	struct hlist_node		node;
	u64				(*clock)(void);	/* time stamp source */

	/* wait queues of the tasks waiting on any CPU (RING_BUFFER_ALL_CPUS) */
	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;
};

/*
 * Read position within one per CPU buffer.
 * NOTE(review): the users of this structure are outside this part of
 * the file; the meaning of the individual fields should be confirmed
 * against them.
 */
struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	int				missed_events;
};

#ifdef RB_TIME_32

/*
 * On 32 bit machines, local64_t is very expensive. As the ring
 * buffer doesn't need all the features of a true 64 bit atomic,
 * on 32 bit, it uses these functions (64 still uses local64_t).
 *
 * For the ring buffer, 64 bit required operations for the time is
 * the following:
 *
 *  - Only need 59 bits (uses 60 to make it even).
 *  - Reads may fail if it interrupted a modification of the time stamp.
 *      It will succeed if it did not interrupt another write even if
 *      the read itself is interrupted by a write.
 *      It returns whether it was successful or not.
 *
 *  - Writes always succeed and will overwrite other writes and writes
 *      that were done by events interrupting the current write.
 *
 *  - A write followed by a read of the same time stamp will always succeed,
 *      but may not contain the same value.
 *
 *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
 *      Other than that, it acts like a normal cmpxchg.
 *
 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
 *  (bottom being the least significant 30 bits of the 60 bit time stamp).
 *
 * The two most significant bits of each half holds a 2 bit counter (0-3).
 * Each update will increment this counter by one.
 * When reading the top and bottom, if the two counter bits match then the
 *  top and bottom together make a valid 60 bit number.
591 */ 592 #define RB_TIME_SHIFT 30 593 #define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1) 594 595 static inline int rb_time_cnt(unsigned long val) 596 { 597 return (val >> RB_TIME_SHIFT) & 3; 598 } 599 600 static inline u64 rb_time_val(unsigned long top, unsigned long bottom) 601 { 602 u64 val; 603 604 val = top & RB_TIME_VAL_MASK; 605 val <<= RB_TIME_SHIFT; 606 val |= bottom & RB_TIME_VAL_MASK; 607 608 return val; 609 } 610 611 static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt) 612 { 613 unsigned long top, bottom; 614 unsigned long c; 615 616 /* 617 * If the read is interrupted by a write, then the cnt will 618 * be different. Loop until both top and bottom have been read 619 * without interruption. 620 */ 621 do { 622 c = local_read(&t->cnt); 623 top = local_read(&t->top); 624 bottom = local_read(&t->bottom); 625 } while (c != local_read(&t->cnt)); 626 627 *cnt = rb_time_cnt(top); 628 629 /* If top and bottom counts don't match, this interrupted a write */ 630 if (*cnt != rb_time_cnt(bottom)) 631 return false; 632 633 *ret = rb_time_val(top, bottom); 634 return true; 635 } 636 637 static bool rb_time_read(rb_time_t *t, u64 *ret) 638 { 639 unsigned long cnt; 640 641 return __rb_time_read(t, ret, &cnt); 642 } 643 644 static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt) 645 { 646 return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT); 647 } 648 649 static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom) 650 { 651 *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK); 652 *bottom = (unsigned long)(val & RB_TIME_VAL_MASK); 653 } 654 655 static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt) 656 { 657 val = rb_time_val_cnt(val, cnt); 658 local_set(t, val); 659 } 660 661 static void rb_time_set(rb_time_t *t, u64 val) 662 { 663 unsigned long cnt, top, bottom; 664 665 rb_time_split(val, &top, &bottom); 666 667 /* Writes always succeed 
with a valid number even if it gets interrupted. */ 668 do { 669 cnt = local_inc_return(&t->cnt); 670 rb_time_val_set(&t->top, top, cnt); 671 rb_time_val_set(&t->bottom, bottom, cnt); 672 } while (cnt != local_read(&t->cnt)); 673 } 674 675 static inline bool 676 rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set) 677 { 678 unsigned long ret; 679 680 ret = local_cmpxchg(l, expect, set); 681 return ret == expect; 682 } 683 684 static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) 685 { 686 unsigned long cnt, top, bottom; 687 unsigned long cnt2, top2, bottom2; 688 u64 val; 689 690 /* The cmpxchg always fails if it interrupted an update */ 691 if (!__rb_time_read(t, &val, &cnt2)) 692 return false; 693 694 if (val != expect) 695 return false; 696 697 cnt = local_read(&t->cnt); 698 if ((cnt & 3) != cnt2) 699 return false; 700 701 cnt2 = cnt + 1; 702 703 rb_time_split(val, &top, &bottom); 704 top = rb_time_val_cnt(top, cnt); 705 bottom = rb_time_val_cnt(bottom, cnt); 706 707 rb_time_split(set, &top2, &bottom2); 708 top2 = rb_time_val_cnt(top2, cnt2); 709 bottom2 = rb_time_val_cnt(bottom2, cnt2); 710 711 if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2)) 712 return false; 713 if (!rb_time_read_cmpxchg(&t->top, top, top2)) 714 return false; 715 if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2)) 716 return false; 717 return true; 718 } 719 720 #else /* 64 bits */ 721 722 /* local64_t always succeeds */ 723 724 static inline bool rb_time_read(rb_time_t *t, u64 *ret) 725 { 726 *ret = local64_read(&t->time); 727 return true; 728 } 729 static void rb_time_set(rb_time_t *t, u64 val) 730 { 731 local64_set(&t->time, val); 732 } 733 734 static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) 735 { 736 u64 val; 737 val = local64_cmpxchg(&t->time, expect, set); 738 return val == expect; 739 } 740 #endif 741 742 /** 743 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer 744 * @buffer: The ring_buffer to get the number of 
pages from 745 * @cpu: The cpu of the ring_buffer to get the number of pages from 746 * 747 * Returns the number of pages used by a per_cpu buffer of the ring buffer. 748 */ 749 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu) 750 { 751 return buffer->buffers[cpu]->nr_pages; 752 } 753 754 /** 755 * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer 756 * @buffer: The ring_buffer to get the number of pages from 757 * @cpu: The cpu of the ring_buffer to get the number of pages from 758 * 759 * Returns the number of pages that have content in the ring buffer. 760 */ 761 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 762 { 763 size_t read; 764 size_t cnt; 765 766 read = local_read(&buffer->buffers[cpu]->pages_read); 767 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 768 /* The reader can read an empty page, but not more than that */ 769 if (cnt < read) { 770 WARN_ON_ONCE(read > cnt + 1); 771 return 0; 772 } 773 774 return cnt - read; 775 } 776 777 /* 778 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 779 * 780 * Schedules a delayed work to wake up any task that is blocked on the 781 * ring buffer waiters queue. 782 */ 783 static void rb_wake_up_waiters(struct irq_work *work) 784 { 785 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 786 787 wake_up_all(&rbwork->waiters); 788 if (rbwork->wakeup_full) { 789 rbwork->wakeup_full = false; 790 wake_up_all(&rbwork->full_waiters); 791 } 792 } 793 794 /** 795 * ring_buffer_wait - wait for input to the ring buffer 796 * @buffer: buffer to wait on 797 * @cpu: the cpu buffer to wait on 798 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 799 * 800 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 801 * as data is added to any of the @buffer's cpu buffers. Otherwise 802 * it will wait for data to be added to a specific cpu buffer. 
803 */ 804 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) 805 { 806 struct ring_buffer_per_cpu *cpu_buffer; 807 DEFINE_WAIT(wait); 808 struct rb_irq_work *work; 809 int ret = 0; 810 811 /* 812 * Depending on what the caller is waiting for, either any 813 * data in any cpu buffer, or a specific buffer, put the 814 * caller on the appropriate wait queue. 815 */ 816 if (cpu == RING_BUFFER_ALL_CPUS) { 817 work = &buffer->irq_work; 818 /* Full only makes sense on per cpu reads */ 819 full = 0; 820 } else { 821 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 822 return -ENODEV; 823 cpu_buffer = buffer->buffers[cpu]; 824 work = &cpu_buffer->irq_work; 825 } 826 827 828 while (true) { 829 if (full) 830 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE); 831 else 832 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); 833 834 /* 835 * The events can happen in critical sections where 836 * checking a work queue can cause deadlocks. 837 * After adding a task to the queue, this flag is set 838 * only to notify events to try to wake up the queue 839 * using irq_work. 840 * 841 * We don't clear it even if the buffer is no longer 842 * empty. The flag only causes the next event to run 843 * irq_work to do the work queue wake up. The worse 844 * that can happen if we race with !trace_empty() is that 845 * an event will cause an irq_work to try to wake up 846 * an empty queue. 847 * 848 * There's no reason to protect this flag either, as 849 * the work queue and irq_work logic will do the necessary 850 * synchronization for the wake ups. The only thing 851 * that is necessary is that the wake up happens after 852 * a task has been queued. It's OK for spurious wake ups. 
853 */ 854 if (full) 855 work->full_waiters_pending = true; 856 else 857 work->waiters_pending = true; 858 859 if (signal_pending(current)) { 860 ret = -EINTR; 861 break; 862 } 863 864 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) 865 break; 866 867 if (cpu != RING_BUFFER_ALL_CPUS && 868 !ring_buffer_empty_cpu(buffer, cpu)) { 869 unsigned long flags; 870 bool pagebusy; 871 size_t nr_pages; 872 size_t dirty; 873 874 if (!full) 875 break; 876 877 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 878 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 879 nr_pages = cpu_buffer->nr_pages; 880 dirty = ring_buffer_nr_dirty_pages(buffer, cpu); 881 if (!cpu_buffer->shortest_full || 882 cpu_buffer->shortest_full < full) 883 cpu_buffer->shortest_full = full; 884 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 885 if (!pagebusy && 886 (!nr_pages || (dirty * 100) > full * nr_pages)) 887 break; 888 } 889 890 schedule(); 891 } 892 893 if (full) 894 finish_wait(&work->full_waiters, &wait); 895 else 896 finish_wait(&work->waiters, &wait); 897 898 return ret; 899 } 900 901 /** 902 * ring_buffer_poll_wait - poll on buffer input 903 * @buffer: buffer to wait on 904 * @cpu: the cpu buffer to wait on 905 * @filp: the file descriptor 906 * @poll_table: The poll descriptor 907 * 908 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 909 * as data is added to any of the @buffer's cpu buffers. Otherwise 910 * it will wait for data to be added to a specific cpu buffer. 911 * 912 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 913 * zero otherwise. 
914 */ 915 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 916 struct file *filp, poll_table *poll_table) 917 { 918 struct ring_buffer_per_cpu *cpu_buffer; 919 struct rb_irq_work *work; 920 921 if (cpu == RING_BUFFER_ALL_CPUS) 922 work = &buffer->irq_work; 923 else { 924 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 925 return -EINVAL; 926 927 cpu_buffer = buffer->buffers[cpu]; 928 work = &cpu_buffer->irq_work; 929 } 930 931 poll_wait(filp, &work->waiters, poll_table); 932 work->waiters_pending = true; 933 /* 934 * There's a tight race between setting the waiters_pending and 935 * checking if the ring buffer is empty. Once the waiters_pending bit 936 * is set, the next event will wake the task up, but we can get stuck 937 * if there's only a single event in. 938 * 939 * FIXME: Ideally, we need a memory barrier on the writer side as well, 940 * but adding a memory barrier to all events will cause too much of a 941 * performance hit in the fast path. We only need a memory barrier when 942 * the buffer goes from empty to having content. But as this race is 943 * extremely small, and it's not a problem if another event comes in, we 944 * will fix it later. 
945 */ 946 smp_mb(); 947 948 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 949 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 950 return EPOLLIN | EPOLLRDNORM; 951 return 0; 952 } 953 954 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 955 #define RB_WARN_ON(b, cond) \ 956 ({ \ 957 int _____ret = unlikely(cond); \ 958 if (_____ret) { \ 959 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 960 struct ring_buffer_per_cpu *__b = \ 961 (void *)b; \ 962 atomic_inc(&__b->buffer->record_disabled); \ 963 } else \ 964 atomic_inc(&b->record_disabled); \ 965 WARN_ON(1); \ 966 } \ 967 _____ret; \ 968 }) 969 970 /* Up this if you want to test the TIME_EXTENTS and normalization */ 971 #define DEBUG_SHIFT 0 972 973 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 974 { 975 u64 ts; 976 977 /* Skip retpolines :-( */ 978 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 979 ts = trace_clock_local(); 980 else 981 ts = buffer->clock(); 982 983 /* shift to debug/test normalization and TIME_EXTENTS */ 984 return ts << DEBUG_SHIFT; 985 } 986 987 u64 ring_buffer_time_stamp(struct trace_buffer *buffer, int cpu) 988 { 989 u64 time; 990 991 preempt_disable_notrace(); 992 time = rb_time_stamp(buffer); 993 preempt_enable_notrace(); 994 995 return time; 996 } 997 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 998 999 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1000 int cpu, u64 *ts) 1001 { 1002 /* Just stupid testing the normalize function and deltas */ 1003 *ts >>= DEBUG_SHIFT; 1004 } 1005 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1006 1007 /* 1008 * Making the ring buffer lockless makes things tricky. 1009 * Although writes only happen on the CPU that they are on, 1010 * and they only need to worry about interrupts. Reads can 1011 * happen on any CPU. 
1012 * 1013 * The reader page is always off the ring buffer, but when the 1014 * reader finishes with a page, it needs to swap its page with 1015 * a new one from the buffer. The reader needs to take from 1016 * the head (writes go to the tail). But if a writer is in overwrite 1017 * mode and wraps, it must push the head page forward. 1018 * 1019 * Here lies the problem. 1020 * 1021 * The reader must be careful to replace only the head page, and 1022 * not another one. As described at the top of the file in the 1023 * ASCII art, the reader sets its old page to point to the next 1024 * page after head. It then sets the page after head to point to 1025 * the old reader page. But if the writer moves the head page 1026 * during this operation, the reader could end up with the tail. 1027 * 1028 * We use cmpxchg to help prevent this race. We also do something 1029 * special with the page before head. We set the LSB to 1. 1030 * 1031 * When the writer must push the page forward, it will clear the 1032 * bit that points to the head page, move the head, and then set 1033 * the bit that points to the new head page. 1034 * 1035 * We also don't want an interrupt coming in and moving the head 1036 * page on another writer. Thus we use the second LSB to catch 1037 * that too. 
Thus: 1038 * 1039 * head->list->prev->next bit 1 bit 0 1040 * ------- ------- 1041 * Normal page 0 0 1042 * Points to head page 0 1 1043 * New head page 1 0 1044 * 1045 * Note we can not trust the prev pointer of the head page, because: 1046 * 1047 * +----+ +-----+ +-----+ 1048 * | |------>| T |---X--->| N | 1049 * | |<------| | | | 1050 * +----+ +-----+ +-----+ 1051 * ^ ^ | 1052 * | +-----+ | | 1053 * +----------| R |----------+ | 1054 * | |<-----------+ 1055 * +-----+ 1056 * 1057 * Key: ---X--> HEAD flag set in pointer 1058 * T Tail page 1059 * R Reader page 1060 * N Next page 1061 * 1062 * (see __rb_reserve_next() to see where this happens) 1063 * 1064 * What the above shows is that the reader just swapped out 1065 * the reader page with a page in the buffer, but before it 1066 * could make the new header point back to the new page added 1067 * it was preempted by a writer. The writer moved forward onto 1068 * the new page added by the reader and is about to move forward 1069 * again. 1070 * 1071 * You can see, it is legitimate for the previous pointer of 1072 * the head (or any page) not to point back to itself. But only 1073 * temporarily. 1074 */ 1075 1076 #define RB_PAGE_NORMAL 0UL 1077 #define RB_PAGE_HEAD 1UL 1078 #define RB_PAGE_UPDATE 2UL 1079 1080 1081 #define RB_FLAG_MASK 3UL 1082 1083 /* PAGE_MOVED is not part of the mask */ 1084 #define RB_PAGE_MOVED 4UL 1085 1086 /* 1087 * rb_list_head - remove any bit 1088 */ 1089 static struct list_head *rb_list_head(struct list_head *list) 1090 { 1091 unsigned long val = (unsigned long)list; 1092 1093 return (struct list_head *)(val & ~RB_FLAG_MASK); 1094 } 1095 1096 /* 1097 * rb_is_head_page - test if the given page is the head page 1098 * 1099 * Because the reader may move the head_page pointer, we can 1100 * not trust what the head page is (it may be pointing to 1101 * the reader page). But if the next page is a header page, 1102 * its flags will be non zero. 
*/
/*
 * Returns RB_PAGE_MOVED when @list->next (flag bits stripped) does not
 * point at @page's list node; otherwise returns the flag bits found in
 * @list->next (zero when no flag is set).
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	/* Compare with the flag bits stripped from prev->next. */
	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 *
 * Sets the RB_PAGE_HEAD bit and clears the RB_PAGE_UPDATE bit in
 * @list->next. These are plain (non-atomic) read-modify-write accesses.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 *
 * Does nothing when cpu_buffer->head_page is NULL.
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

/*
 * rb_list_head_clear - clear every flag bit stored in @list->next
 */
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 *
 * Dereferences cpu_buffer->pages without checking it for NULL.
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

/*
 * rb_head_page_set - change the flag bits on the pointer to @head
 *
 * Uses cmpxchg on @prev->list.next to replace "@head | @old_flag" with
 * "@head | @new_flag".
 *
 * Returns RB_PAGE_MOVED when the value found no longer points at @head,
 * otherwise the flag bits that were found in the pointer. A return equal
 * to @old_flag therefore means the exchange took place.
 */
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

/* rb_head_page_set() with RB_PAGE_UPDATE as the new flag. */
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

/* rb_head_page_set() with RB_PAGE_HEAD as the new flag. */
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

/* rb_head_page_set() with RB_PAGE_NORMAL as the new flag. */
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

/*
 * rb_inc_page - advance *@bpage to the next page in the list
 *
 * The flag bits are stripped from the next pointer before it is
 * converted back to its containing buffer_page.
 */
static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

/*
 * rb_set_head_page - locate the head page and cache it in @cpu_buffer
 *
 * Starting from cpu_buffer->head_page, walks the page list until
 * rb_is_head_page() returns non-zero for a page, stores that page in
 * cpu_buffer->head_page and returns it.
 *
 * Returns NULL (after an RB_WARN_ON) when head_page is NULL, when the
 * sanity check on cpu_buffer->pages fails, or when no page is found
 * within three passes around the list.
 */
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

/*
 * rb_head_page_replace - make the pointer to @old point to @new instead
 *
 * Expects @old->list.prev->next to be "@old | RB_PAGE_HEAD" and uses
 * cmpxchg to replace it with the unflagged address of @new's list node.
 *
 * Returns non-zero when the exchange happened, zero otherwise.
 */
static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

/*
 * rb_check_bpage - make sure a buffer_page address has no flag bits set
 *
 * Returns 1 (after an RB_WARN_ON) when either of the RB_FLAG_MASK bits
 * is set in the address of @bpage, 0 otherwise.
 */
static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 * @cpu_buffer: CPU buffer passed to RB_WARN_ON when a check fails
 * @list: list node whose prev and next pointers are checked
 *
 * Returns 1 when @list->prev or @list->next carries any RB_FLAG_MASK
 * bit, 0 otherwise.
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
1389 */ 1390 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1391 { 1392 struct list_head *head = cpu_buffer->pages; 1393 struct buffer_page *bpage, *tmp; 1394 1395 /* Reset the head page if it exists */ 1396 if (cpu_buffer->head_page) 1397 rb_set_head_page(cpu_buffer); 1398 1399 rb_head_page_deactivate(cpu_buffer); 1400 1401 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 1402 return -1; 1403 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 1404 return -1; 1405 1406 if (rb_check_list(cpu_buffer, head)) 1407 return -1; 1408 1409 list_for_each_entry_safe(bpage, tmp, head, list) { 1410 if (RB_WARN_ON(cpu_buffer, 1411 bpage->list.next->prev != &bpage->list)) 1412 return -1; 1413 if (RB_WARN_ON(cpu_buffer, 1414 bpage->list.prev->next != &bpage->list)) 1415 return -1; 1416 if (rb_check_list(cpu_buffer, &bpage->list)) 1417 return -1; 1418 } 1419 1420 rb_head_page_activate(cpu_buffer); 1421 1422 return 0; 1423 } 1424 1425 static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu) 1426 { 1427 struct buffer_page *bpage, *tmp; 1428 bool user_thread = current->mm != NULL; 1429 gfp_t mflags; 1430 long i; 1431 1432 /* 1433 * Check if the available memory is there first. 1434 * Note, si_mem_available() only gives us a rough estimate of available 1435 * memory. It may not be accurate. But we don't care, we just want 1436 * to prevent doing any allocation when it is obvious that it is 1437 * not going to succeed. 1438 */ 1439 i = si_mem_available(); 1440 if (i < nr_pages) 1441 return -ENOMEM; 1442 1443 /* 1444 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 1445 * gracefully without invoking oom-killer and the system is not 1446 * destabilized. 1447 */ 1448 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 1449 1450 /* 1451 * If a user thread allocates too much, and si_mem_available() 1452 * reports there's enough memory, even though there is not. 1453 * Make sure the OOM killer kills this thread. 
This can happen 1454 * even with RETRY_MAYFAIL because another task may be doing 1455 * an allocation after this task has taken all memory. 1456 * This is the task the OOM killer needs to take out during this 1457 * loop, even if it was triggered by an allocation somewhere else. 1458 */ 1459 if (user_thread) 1460 set_current_oom_origin(); 1461 for (i = 0; i < nr_pages; i++) { 1462 struct page *page; 1463 1464 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1465 mflags, cpu_to_node(cpu)); 1466 if (!bpage) 1467 goto free_pages; 1468 1469 list_add(&bpage->list, pages); 1470 1471 page = alloc_pages_node(cpu_to_node(cpu), mflags, 0); 1472 if (!page) 1473 goto free_pages; 1474 bpage->page = page_address(page); 1475 rb_init_page(bpage->page); 1476 1477 if (user_thread && fatal_signal_pending(current)) 1478 goto free_pages; 1479 } 1480 if (user_thread) 1481 clear_current_oom_origin(); 1482 1483 return 0; 1484 1485 free_pages: 1486 list_for_each_entry_safe(bpage, tmp, pages, list) { 1487 list_del_init(&bpage->list); 1488 free_buffer_page(bpage); 1489 } 1490 if (user_thread) 1491 clear_current_oom_origin(); 1492 1493 return -ENOMEM; 1494 } 1495 1496 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1497 unsigned long nr_pages) 1498 { 1499 LIST_HEAD(pages); 1500 1501 WARN_ON(!nr_pages); 1502 1503 if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu)) 1504 return -ENOMEM; 1505 1506 /* 1507 * The ring buffer page list is a circular list that does not 1508 * start and end with a list head. All page list items point to 1509 * other pages. 
1510 */ 1511 cpu_buffer->pages = pages.next; 1512 list_del(&pages); 1513 1514 cpu_buffer->nr_pages = nr_pages; 1515 1516 rb_check_pages(cpu_buffer); 1517 1518 return 0; 1519 } 1520 1521 static struct ring_buffer_per_cpu * 1522 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 1523 { 1524 struct ring_buffer_per_cpu *cpu_buffer; 1525 struct buffer_page *bpage; 1526 struct page *page; 1527 int ret; 1528 1529 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1530 GFP_KERNEL, cpu_to_node(cpu)); 1531 if (!cpu_buffer) 1532 return NULL; 1533 1534 cpu_buffer->cpu = cpu; 1535 cpu_buffer->buffer = buffer; 1536 raw_spin_lock_init(&cpu_buffer->reader_lock); 1537 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1538 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1539 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 1540 init_completion(&cpu_buffer->update_done); 1541 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 1542 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 1543 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 1544 1545 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1546 GFP_KERNEL, cpu_to_node(cpu)); 1547 if (!bpage) 1548 goto fail_free_buffer; 1549 1550 rb_check_bpage(cpu_buffer, bpage); 1551 1552 cpu_buffer->reader_page = bpage; 1553 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0); 1554 if (!page) 1555 goto fail_free_reader; 1556 bpage->page = page_address(page); 1557 rb_init_page(bpage->page); 1558 1559 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1560 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1561 1562 ret = rb_allocate_pages(cpu_buffer, nr_pages); 1563 if (ret < 0) 1564 goto fail_free_reader; 1565 1566 cpu_buffer->head_page 1567 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1568 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1569 1570 rb_head_page_activate(cpu_buffer); 1571 
1572 return cpu_buffer; 1573 1574 fail_free_reader: 1575 free_buffer_page(cpu_buffer->reader_page); 1576 1577 fail_free_buffer: 1578 kfree(cpu_buffer); 1579 return NULL; 1580 } 1581 1582 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1583 { 1584 struct list_head *head = cpu_buffer->pages; 1585 struct buffer_page *bpage, *tmp; 1586 1587 free_buffer_page(cpu_buffer->reader_page); 1588 1589 rb_head_page_deactivate(cpu_buffer); 1590 1591 if (head) { 1592 list_for_each_entry_safe(bpage, tmp, head, list) { 1593 list_del_init(&bpage->list); 1594 free_buffer_page(bpage); 1595 } 1596 bpage = list_entry(head, struct buffer_page, list); 1597 free_buffer_page(bpage); 1598 } 1599 1600 kfree(cpu_buffer); 1601 } 1602 1603 /** 1604 * __ring_buffer_alloc - allocate a new ring_buffer 1605 * @size: the size in bytes per cpu that is needed. 1606 * @flags: attributes to set for the ring buffer. 1607 * @key: ring buffer reader_lock_key. 1608 * 1609 * Currently the only flag that is available is the RB_FL_OVERWRITE 1610 * flag. This flag means that the buffer will overwrite old data 1611 * when the buffer wraps. If this flag is not set, the buffer will 1612 * drop data when the tail hits the head. 
1613 */ 1614 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1615 struct lock_class_key *key) 1616 { 1617 struct trace_buffer *buffer; 1618 long nr_pages; 1619 int bsize; 1620 int cpu; 1621 int ret; 1622 1623 /* keep it in its own cache line */ 1624 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1625 GFP_KERNEL); 1626 if (!buffer) 1627 return NULL; 1628 1629 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1630 goto fail_free_buffer; 1631 1632 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1633 buffer->flags = flags; 1634 buffer->clock = trace_clock_local; 1635 buffer->reader_lock_key = key; 1636 1637 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 1638 init_waitqueue_head(&buffer->irq_work.waiters); 1639 1640 /* need at least two pages */ 1641 if (nr_pages < 2) 1642 nr_pages = 2; 1643 1644 buffer->cpus = nr_cpu_ids; 1645 1646 bsize = sizeof(void *) * nr_cpu_ids; 1647 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1648 GFP_KERNEL); 1649 if (!buffer->buffers) 1650 goto fail_free_cpumask; 1651 1652 cpu = raw_smp_processor_id(); 1653 cpumask_set_cpu(cpu, buffer->cpumask); 1654 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 1655 if (!buffer->buffers[cpu]) 1656 goto fail_free_buffers; 1657 1658 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1659 if (ret < 0) 1660 goto fail_free_buffers; 1661 1662 mutex_init(&buffer->mutex); 1663 1664 return buffer; 1665 1666 fail_free_buffers: 1667 for_each_buffer_cpu(buffer, cpu) { 1668 if (buffer->buffers[cpu]) 1669 rb_free_cpu_buffer(buffer->buffers[cpu]); 1670 } 1671 kfree(buffer->buffers); 1672 1673 fail_free_cpumask: 1674 free_cpumask_var(buffer->cpumask); 1675 1676 fail_free_buffer: 1677 kfree(buffer); 1678 return NULL; 1679 } 1680 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1681 1682 /** 1683 * ring_buffer_free - free a ring buffer. 1684 * @buffer: the buffer to free. 
1685 */ 1686 void 1687 ring_buffer_free(struct trace_buffer *buffer) 1688 { 1689 int cpu; 1690 1691 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1692 1693 for_each_buffer_cpu(buffer, cpu) 1694 rb_free_cpu_buffer(buffer->buffers[cpu]); 1695 1696 kfree(buffer->buffers); 1697 free_cpumask_var(buffer->cpumask); 1698 1699 kfree(buffer); 1700 } 1701 EXPORT_SYMBOL_GPL(ring_buffer_free); 1702 1703 void ring_buffer_set_clock(struct trace_buffer *buffer, 1704 u64 (*clock)(void)) 1705 { 1706 buffer->clock = clock; 1707 } 1708 1709 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 1710 { 1711 buffer->time_stamp_abs = abs; 1712 } 1713 1714 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 1715 { 1716 return buffer->time_stamp_abs; 1717 } 1718 1719 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1720 1721 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1722 { 1723 return local_read(&bpage->entries) & RB_WRITE_MASK; 1724 } 1725 1726 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1727 { 1728 return local_read(&bpage->write) & RB_WRITE_MASK; 1729 } 1730 1731 static int 1732 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 1733 { 1734 struct list_head *tail_page, *to_remove, *next_page; 1735 struct buffer_page *to_remove_page, *tmp_iter_page; 1736 struct buffer_page *last_page, *first_page; 1737 unsigned long nr_removed; 1738 unsigned long head_bit; 1739 int page_entries; 1740 1741 head_bit = 0; 1742 1743 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1744 atomic_inc(&cpu_buffer->record_disabled); 1745 /* 1746 * We don't race with the readers since we have acquired the reader 1747 * lock. We also don't race with writers after disabling recording. 1748 * This makes it easy to figure out the first and the last page to be 1749 * removed from the list. We unlink all the pages in between including 1750 * the first and last pages. 
This is done in a busy loop so that we 1751 * lose the least number of traces. 1752 * The pages are freed after we restart recording and unlock readers. 1753 */ 1754 tail_page = &cpu_buffer->tail_page->list; 1755 1756 /* 1757 * tail page might be on reader page, we remove the next page 1758 * from the ring buffer 1759 */ 1760 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 1761 tail_page = rb_list_head(tail_page->next); 1762 to_remove = tail_page; 1763 1764 /* start of pages to remove */ 1765 first_page = list_entry(rb_list_head(to_remove->next), 1766 struct buffer_page, list); 1767 1768 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 1769 to_remove = rb_list_head(to_remove)->next; 1770 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 1771 } 1772 1773 next_page = rb_list_head(to_remove)->next; 1774 1775 /* 1776 * Now we remove all pages between tail_page and next_page. 1777 * Make sure that we have head_bit value preserved for the 1778 * next page 1779 */ 1780 tail_page->next = (struct list_head *)((unsigned long)next_page | 1781 head_bit); 1782 next_page = rb_list_head(next_page); 1783 next_page->prev = tail_page; 1784 1785 /* make sure pages points to a valid page in the ring buffer */ 1786 cpu_buffer->pages = next_page; 1787 1788 /* update head page */ 1789 if (head_bit) 1790 cpu_buffer->head_page = list_entry(next_page, 1791 struct buffer_page, list); 1792 1793 /* 1794 * change read pointer to make sure any read iterators reset 1795 * themselves 1796 */ 1797 cpu_buffer->read = 0; 1798 1799 /* pages are removed, resume tracing and then free the pages */ 1800 atomic_dec(&cpu_buffer->record_disabled); 1801 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1802 1803 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1804 1805 /* last buffer page to remove */ 1806 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1807 list); 1808 tmp_iter_page = first_page; 1809 1810 do { 1811 cond_resched(); 1812 1813 to_remove_page = 
tmp_iter_page; 1814 rb_inc_page(cpu_buffer, &tmp_iter_page); 1815 1816 /* update the counters */ 1817 page_entries = rb_page_entries(to_remove_page); 1818 if (page_entries) { 1819 /* 1820 * If something was added to this page, it was full 1821 * since it is not the tail page. So we deduct the 1822 * bytes consumed in ring buffer from here. 1823 * Increment overrun to account for the lost events. 1824 */ 1825 local_add(page_entries, &cpu_buffer->overrun); 1826 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 1827 } 1828 1829 /* 1830 * We have already removed references to this list item, just 1831 * free up the buffer_page and its page 1832 */ 1833 free_buffer_page(to_remove_page); 1834 nr_removed--; 1835 1836 } while (to_remove_page != last_page); 1837 1838 RB_WARN_ON(cpu_buffer, nr_removed); 1839 1840 return nr_removed == 0; 1841 } 1842 1843 static int 1844 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 1845 { 1846 struct list_head *pages = &cpu_buffer->new_pages; 1847 int retries, success; 1848 1849 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1850 /* 1851 * We are holding the reader lock, so the reader page won't be swapped 1852 * in the ring buffer. Now we are racing with the writer trying to 1853 * move head page and the tail page. 1854 * We are going to adapt the reader page update process where: 1855 * 1. We first splice the start and end of list of new pages between 1856 * the head page and its previous page. 1857 * 2. We cmpxchg the prev_page->next to point from head page to the 1858 * start of new pages list. 1859 * 3. Finally, we update the head->prev to the end of new list. 1860 * 1861 * We will try this process 10 times, to make sure that we don't keep 1862 * spinning. 
1863 */ 1864 retries = 10; 1865 success = 0; 1866 while (retries--) { 1867 struct list_head *head_page, *prev_page, *r; 1868 struct list_head *last_page, *first_page; 1869 struct list_head *head_page_with_bit; 1870 1871 head_page = &rb_set_head_page(cpu_buffer)->list; 1872 if (!head_page) 1873 break; 1874 prev_page = head_page->prev; 1875 1876 first_page = pages->next; 1877 last_page = pages->prev; 1878 1879 head_page_with_bit = (struct list_head *) 1880 ((unsigned long)head_page | RB_PAGE_HEAD); 1881 1882 last_page->next = head_page_with_bit; 1883 first_page->prev = prev_page; 1884 1885 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); 1886 1887 if (r == head_page_with_bit) { 1888 /* 1889 * yay, we replaced the page pointer to our new list, 1890 * now, we just have to update to head page's prev 1891 * pointer to point to end of list 1892 */ 1893 head_page->prev = last_page; 1894 success = 1; 1895 break; 1896 } 1897 } 1898 1899 if (success) 1900 INIT_LIST_HEAD(pages); 1901 /* 1902 * If we weren't successful in adding in new pages, warn and stop 1903 * tracing 1904 */ 1905 RB_WARN_ON(cpu_buffer, !success); 1906 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1907 1908 /* free pages if they weren't inserted */ 1909 if (!success) { 1910 struct buffer_page *bpage, *tmp; 1911 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1912 list) { 1913 list_del_init(&bpage->list); 1914 free_buffer_page(bpage); 1915 } 1916 } 1917 return success; 1918 } 1919 1920 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 1921 { 1922 int success; 1923 1924 if (cpu_buffer->nr_pages_to_update > 0) 1925 success = rb_insert_pages(cpu_buffer); 1926 else 1927 success = rb_remove_pages(cpu_buffer, 1928 -cpu_buffer->nr_pages_to_update); 1929 1930 if (success) 1931 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 1932 } 1933 1934 static void update_pages_handler(struct work_struct *work) 1935 { 1936 struct ring_buffer_per_cpu *cpu_buffer = 
container_of(work, 1937 struct ring_buffer_per_cpu, update_pages_work); 1938 rb_update_pages(cpu_buffer); 1939 complete(&cpu_buffer->update_done); 1940 } 1941 1942 /** 1943 * ring_buffer_resize - resize the ring buffer 1944 * @buffer: the buffer to resize. 1945 * @size: the new size. 1946 * @cpu_id: the cpu buffer to resize 1947 * 1948 * Minimum size is 2 * BUF_PAGE_SIZE. 1949 * 1950 * Returns 0 on success and < 0 on failure. 1951 */ 1952 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 1953 int cpu_id) 1954 { 1955 struct ring_buffer_per_cpu *cpu_buffer; 1956 unsigned long nr_pages; 1957 int cpu, err; 1958 1959 /* 1960 * Always succeed at resizing a non-existent buffer: 1961 */ 1962 if (!buffer) 1963 return 0; 1964 1965 /* Make sure the requested buffer exists */ 1966 if (cpu_id != RING_BUFFER_ALL_CPUS && 1967 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 1968 return 0; 1969 1970 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1971 1972 /* we need a minimum of two pages */ 1973 if (nr_pages < 2) 1974 nr_pages = 2; 1975 1976 size = nr_pages * BUF_PAGE_SIZE; 1977 1978 /* prevent another thread from changing buffer sizes */ 1979 mutex_lock(&buffer->mutex); 1980 1981 1982 if (cpu_id == RING_BUFFER_ALL_CPUS) { 1983 /* 1984 * Don't succeed if resizing is disabled, as a reader might be 1985 * manipulating the ring buffer and is expecting a sane state while 1986 * this is true. 
1987 */ 1988 for_each_buffer_cpu(buffer, cpu) { 1989 cpu_buffer = buffer->buffers[cpu]; 1990 if (atomic_read(&cpu_buffer->resize_disabled)) { 1991 err = -EBUSY; 1992 goto out_err_unlock; 1993 } 1994 } 1995 1996 /* calculate the pages to update */ 1997 for_each_buffer_cpu(buffer, cpu) { 1998 cpu_buffer = buffer->buffers[cpu]; 1999 2000 cpu_buffer->nr_pages_to_update = nr_pages - 2001 cpu_buffer->nr_pages; 2002 /* 2003 * nothing more to do for removing pages or no update 2004 */ 2005 if (cpu_buffer->nr_pages_to_update <= 0) 2006 continue; 2007 /* 2008 * to add pages, make sure all new pages can be 2009 * allocated without receiving ENOMEM 2010 */ 2011 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2012 if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update, 2013 &cpu_buffer->new_pages, cpu)) { 2014 /* not enough memory for new pages */ 2015 err = -ENOMEM; 2016 goto out_err; 2017 } 2018 } 2019 2020 get_online_cpus(); 2021 /* 2022 * Fire off all the required work handlers 2023 * We can't schedule on offline CPUs, but it's not necessary 2024 * since we can change their buffer sizes without any race. 2025 */ 2026 for_each_buffer_cpu(buffer, cpu) { 2027 cpu_buffer = buffer->buffers[cpu]; 2028 if (!cpu_buffer->nr_pages_to_update) 2029 continue; 2030 2031 /* Can't run something on an offline CPU. 
*/ 2032 if (!cpu_online(cpu)) { 2033 rb_update_pages(cpu_buffer); 2034 cpu_buffer->nr_pages_to_update = 0; 2035 } else { 2036 schedule_work_on(cpu, 2037 &cpu_buffer->update_pages_work); 2038 } 2039 } 2040 2041 /* wait for all the updates to complete */ 2042 for_each_buffer_cpu(buffer, cpu) { 2043 cpu_buffer = buffer->buffers[cpu]; 2044 if (!cpu_buffer->nr_pages_to_update) 2045 continue; 2046 2047 if (cpu_online(cpu)) 2048 wait_for_completion(&cpu_buffer->update_done); 2049 cpu_buffer->nr_pages_to_update = 0; 2050 } 2051 2052 put_online_cpus(); 2053 } else { 2054 /* Make sure this CPU has been initialized */ 2055 if (!cpumask_test_cpu(cpu_id, buffer->cpumask)) 2056 goto out; 2057 2058 cpu_buffer = buffer->buffers[cpu_id]; 2059 2060 if (nr_pages == cpu_buffer->nr_pages) 2061 goto out; 2062 2063 /* 2064 * Don't succeed if resizing is disabled, as a reader might be 2065 * manipulating the ring buffer and is expecting a sane state while 2066 * this is true. 2067 */ 2068 if (atomic_read(&cpu_buffer->resize_disabled)) { 2069 err = -EBUSY; 2070 goto out_err_unlock; 2071 } 2072 2073 cpu_buffer->nr_pages_to_update = nr_pages - 2074 cpu_buffer->nr_pages; 2075 2076 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2077 if (cpu_buffer->nr_pages_to_update > 0 && 2078 __rb_allocate_pages(cpu_buffer->nr_pages_to_update, 2079 &cpu_buffer->new_pages, cpu_id)) { 2080 err = -ENOMEM; 2081 goto out_err; 2082 } 2083 2084 get_online_cpus(); 2085 2086 /* Can't run something on an offline CPU. */ 2087 if (!cpu_online(cpu_id)) 2088 rb_update_pages(cpu_buffer); 2089 else { 2090 schedule_work_on(cpu_id, 2091 &cpu_buffer->update_pages_work); 2092 wait_for_completion(&cpu_buffer->update_done); 2093 } 2094 2095 cpu_buffer->nr_pages_to_update = 0; 2096 put_online_cpus(); 2097 } 2098 2099 out: 2100 /* 2101 * The ring buffer resize can happen with the ring buffer 2102 * enabled, so that the update disturbs the tracing as little 2103 * as possible. 
But if the buffer is disabled, we do not need 2104 * to worry about that, and we can take the time to verify 2105 * that the buffer is not corrupt. 2106 */ 2107 if (atomic_read(&buffer->record_disabled)) { 2108 atomic_inc(&buffer->record_disabled); 2109 /* 2110 * Even though the buffer was disabled, we must make sure 2111 * that it is truly disabled before calling rb_check_pages. 2112 * There could have been a race between checking 2113 * record_disable and incrementing it. 2114 */ 2115 synchronize_rcu(); 2116 for_each_buffer_cpu(buffer, cpu) { 2117 cpu_buffer = buffer->buffers[cpu]; 2118 rb_check_pages(cpu_buffer); 2119 } 2120 atomic_dec(&buffer->record_disabled); 2121 } 2122 2123 mutex_unlock(&buffer->mutex); 2124 return 0; 2125 2126 out_err: 2127 for_each_buffer_cpu(buffer, cpu) { 2128 struct buffer_page *bpage, *tmp; 2129 2130 cpu_buffer = buffer->buffers[cpu]; 2131 cpu_buffer->nr_pages_to_update = 0; 2132 2133 if (list_empty(&cpu_buffer->new_pages)) 2134 continue; 2135 2136 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2137 list) { 2138 list_del_init(&bpage->list); 2139 free_buffer_page(bpage); 2140 } 2141 } 2142 out_err_unlock: 2143 mutex_unlock(&buffer->mutex); 2144 return err; 2145 } 2146 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2147 2148 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2149 { 2150 mutex_lock(&buffer->mutex); 2151 if (val) 2152 buffer->flags |= RB_FL_OVERWRITE; 2153 else 2154 buffer->flags &= ~RB_FL_OVERWRITE; 2155 mutex_unlock(&buffer->mutex); 2156 } 2157 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2158 2159 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2160 { 2161 return bpage->page->data + index; 2162 } 2163 2164 static __always_inline struct ring_buffer_event * 2165 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2166 { 2167 return __rb_page_index(cpu_buffer->reader_page, 2168 cpu_buffer->reader_page->read); 2169 } 2170 2171 static __always_inline 
unsigned rb_page_commit(struct buffer_page *bpage) 2172 { 2173 return local_read(&bpage->page->commit); 2174 } 2175 2176 static struct ring_buffer_event * 2177 rb_iter_head_event(struct ring_buffer_iter *iter) 2178 { 2179 struct ring_buffer_event *event; 2180 struct buffer_page *iter_head_page = iter->head_page; 2181 unsigned long commit; 2182 unsigned length; 2183 2184 if (iter->head != iter->next_event) 2185 return iter->event; 2186 2187 /* 2188 * When the writer goes across pages, it issues a cmpxchg which 2189 * is a mb(), which will synchronize with the rmb here. 2190 * (see rb_tail_page_update() and __rb_reserve_next()) 2191 */ 2192 commit = rb_page_commit(iter_head_page); 2193 smp_rmb(); 2194 event = __rb_page_index(iter_head_page, iter->head); 2195 length = rb_event_length(event); 2196 2197 /* 2198 * READ_ONCE() doesn't work on functions and we don't want the 2199 * compiler doing any crazy optimizations with length. 2200 */ 2201 barrier(); 2202 2203 if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE) 2204 /* Writer corrupted the read? */ 2205 goto reset; 2206 2207 memcpy(iter->event, event, length); 2208 /* 2209 * If the page stamp is still the same after this rmb() then the 2210 * event was safely copied without the writer entering the page. 
*/
	smp_rmb();

	/* Make sure the page didn't change since we read this */
	if (iter->page_stamp != iter_head_page->page->time_stamp ||
	    commit > rb_page_commit(iter_head_page))
		goto reset;

	/* The copy is good; record where the following event starts */
	iter->next_event = iter->head + length;
	return iter->event;
 reset:
	/* Reset to the beginning */
	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
	iter->next_event = 0;
	iter->missed_events = 1;
	return NULL;
}

/* Size is determined by what has been committed */
static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

/* Return the commit counter of the page cpu_buffer->commit_page points to */
static __always_inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

/*
 * Return the index of @event within its page's data: the offset of the
 * event's address inside the page, minus BUF_PAGE_HDR_SIZE.
 */
static __always_inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

/* Move @iter to the next page and restart reading at offset zero */
static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
*/
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
	iter->next_event = 0;
}

/*
 * rb_handle_head_page - writer hit the head page
 * @cpu_buffer: The per cpu buffer being written to
 * @tail_page: The page the writer is currently on
 * @next_page: The page after @tail_page, which is flagged as the head page
 *
 * Moves the head flag from @next_page to the page following it, so that
 * the writer may move onto @next_page.
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	/* Sampled before the flag update; added to overrun if we own the move */
	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);
		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* unexpected page state */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outer most commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		struct buffer_page *buffer_tail_page;

		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (buffer_tail_page != tail_page &&
		    buffer_tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outer most commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

/*
 * rb_reset_tail - take back a reservation that ran off the tail page
 * @cpu_buffer: The per cpu buffer being written to
 * @tail: The write index at which the failed reservation started
 * @info: Holds the tail page and the length of the failed reservation
 *
 * Subtracts from tail_page->write what the failed reservation added to it
 * (all of it, or only the part beyond the end of the page when the rest of
 * the page is turned into a discarded event), and marks any space left
 * between @tail and the end of the page as padding.
 */
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      unsigned long tail, struct rb_event_info *info)
{
	struct buffer_page *tail_page = info->tail_page;
	struct ring_buffer_event *event;
	unsigned long length = info->length;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == BUF_PAGE_SIZE)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);

	/* account for padding bytes */
	local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add lost event
	 * counter.
*/
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}

static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);

/*
 * rb_move_tail - handle a reservation that did not fit on the tail page
 * @cpu_buffer: The per cpu buffer being written to
 * @tail: The write index at which the reservation started
 * @info: Holds the tail page and the length of the reservation
 *
 * This is the slow path, force gcc not to inline it.
 *
 * Returns ERR_PTR(-EAGAIN) when the caller should try the reservation
 * again, or NULL when the write was only reset (commit overrun, buffer
 * full while not in overwrite mode, or rb_handle_head_page() failed).
 */
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long tail, struct rb_event_info *info)
{
	struct buffer_page *tail_page = info->tail_page;
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct trace_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(cpu_buffer, &next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and warn
	 * about it.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the header page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				local_inc(&cpu_buffer->dropped_events);
				goto out_reset;
			}

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page. We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
			 */
			if (unlikely((cpu_buffer->commit_page !=
				      cpu_buffer->tail_page) &&
				     (cpu_buffer->commit_page ==
				      cpu_buffer->reader_page))) {
				local_inc(&cpu_buffer->commit_overrun);
				goto out_reset;
			}
		}
	}

	rb_tail_page_update(cpu_buffer, tail_page, next_page);

 out_again:

	rb_reset_tail(cpu_buffer, tail, info);

	/* Commit what we have for now.
*/
	rb_end_commit(cpu_buffer);
	/* rb_end_commit() decs committing */
	local_inc(&cpu_buffer->committing);

	/* fail and let the caller try again */
	return ERR_PTR(-EAGAIN);

 out_reset:
	/* reset write */
	rb_reset_tail(cpu_buffer, tail, info);

	return NULL;
}

/*
 * Slow path: turn @event into a time stamp event carrying @delta.
 *
 * @abs selects RINGBUF_TYPE_TIME_STAMP, otherwise RINGBUF_TYPE_TIME_EXTEND
 * is used. The low bits of @delta (TS_MASK) go into time_delta and the
 * bits above TS_SHIFT into array[0]. A non absolute stamp at index zero
 * of a page is stored as zero.
 *
 * Returns the result of skip_time_extend() on @event.
 */
static struct ring_buffer_event *
rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
{
	if (abs)
		event->type_len = RINGBUF_TYPE_TIME_STAMP;
	else
		event->type_len = RINGBUF_TYPE_TIME_EXTEND;

	/* Not the first event on the page, or not delta? */
	if (abs || rb_event_index(event)) {
		event->time_delta = delta & TS_MASK;
		event->array[0] = delta >> TS_SHIFT;
	} else {
		/* nope, just zero it */
		event->time_delta = 0;
		event->array[0] = 0;
	}

	return skip_time_extend(event);
}

static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
				     struct ring_buffer_event *event);

/* Without CONFIG_HAVE_UNSTABLE_SCHED_CLOCK, report the clock as stable */
#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
static inline bool sched_clock_stable(void)
{
	return true;
}
#endif

/*
 * Warn (once) about an unexpectedly large delta, printing the stamps
 * recorded in @info and the current write stamp (0 if it cannot be read).
 * When sched_clock_stable() is false, a hint to switch to the global
 * trace clock is appended.
 */
static void
rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
		   struct rb_event_info *info)
{
	u64 write_stamp;

	WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
		  (unsigned long long)info->delta,
		  (unsigned long long)info->ts,
		  (unsigned long long)info->before,
		  (unsigned long long)info->after,
		  (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
		  sched_clock_stable() ? "" :
		  "If you just came from a suspend/resume,\n"
		  "please switch to the trace global clock:\n"
		  " echo global > /sys/kernel/debug/tracing/trace_clock\n"
		  "or add trace_clock=global to the kernel command line\n");
}

/*
 * rb_add_timestamp - place a time stamp event in front of the event data
 * @cpu_buffer: The per cpu buffer of the event
 * @event: In: start of the reserved space. Out: what rb_add_time_stamp()
 *	   returned for it
 * @info: The reservation info (add_timestamp flags, delta, before/after/ts)
 * @delta: Set to zero, as the time stamp event now carries the delta
 * @length: Reduced by RB_LEN_TIME_EXTEND, the space the stamp used up
 *
 * A delta above 2^59 is reported: with a short pr_warn() (once) when the
 * clock appears to have gone backwards without this event having been
 * interrupted, via rb_check_timestamp() otherwise. Such a delta is then
 * replaced by zero unless an absolute time stamp is being written.
 */
static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
				      struct ring_buffer_event **event,
				      struct rb_event_info *info,
				      u64 *delta,
				      unsigned int *length)
{
	bool abs = info->add_timestamp &
		(RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);

	if (unlikely(info->delta > (1ULL << 59))) {
		/* did the clock go backwards */
		if (info->before == info->after && info->before > info->ts) {
			/* not interrupted */
			static int once;

			/*
			 * This is possible with a recalibrating of the TSC.
			 * Do not produce a call stack, but just report it.
			 */
			if (!once) {
				once++;
				pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
					info->before, info->ts);
			}
		} else
			rb_check_timestamp(cpu_buffer, info);
		if (!abs)
			info->delta = 0;
	}
	*event = rb_add_time_stamp(*event, info->delta, abs);
	*length -= RB_LEN_TIME_EXTEND;
	*delta = 0;
}

/**
 * rb_update_event - update event type and data
 * @cpu_buffer: The per cpu buffer of the @event
 * @event: the event to update
 * @info: The info to update the @event with (contains length and delta)
 *
 * Update the type and data fields of the @event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
*/
static void
rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
		struct ring_buffer_event *event,
		struct rb_event_info *info)
{
	unsigned length = info->length;
	u64 delta = info->delta;

	/*
	 * If we need to add a timestamp, then we
	 * add it to the start of the reserved space.
	 */
	if (unlikely(info->add_timestamp))
		rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);

	event->time_delta = delta;
	/* What is left after the header is the size of the data */
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA) {
		/* Too big for type_len: store the size in array[0] instead */
		event->type_len = 0;
		event->array[0] = length;
	} else
		event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
}

/*
 * rb_calculate_event_length - size to reserve for @length bytes of data
 *
 * Adds the event header (and the array[0] length word for data larger
 * than RB_MAX_SMALL_DATA) and rounds the result up to RB_ALIGNMENT.
 * A zero @length is treated as one byte.
 */
static unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusions */
	if (!length)
		length++;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	/*
	 * In case the time delta is larger than the 27 bits for it
	 * in the header, we need to add a timestamp. If another
	 * event comes in when trying to discard this one to increase
	 * the length, then the timestamp will be added in the allocated
	 * space of this event. If length is bigger than the size needed
	 * for the TIME_EXTEND, then padding has to be used. The events
	 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
	 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
	 * As length is a multiple of 4, we only need to worry if it
	 * is 12 (RB_LEN_TIME_EXTEND + 4).
*/
	if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
		length += RB_ALIGNMENT;

	return length;
}

/*
 * Return true if @event sits exactly at the current commit position:
 * on the commit page, at the index given by that page's commit counter.
 */
static __always_inline bool
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
		   struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	/* Start address of the page that holds the event */
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

/*
 * Return the time delta that @event carries: the full stamp for a
 * TIME_EXTEND event, time_delta for a data event, and zero for
 * everything else (padding, absolute time stamps, unknown types).
 */
static u64 rb_time_delta(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return 0;

	case RINGBUF_TYPE_TIME_EXTEND:
		return ring_buffer_event_time_stamp(event);

	case RINGBUF_TYPE_TIME_STAMP:
		return 0;

	case RINGBUF_TYPE_DATA:
		return event->time_delta;
	default:
		return 0;
	}
}

/*
 * rb_try_to_discard - try to give the space of @event back to the buffer
 * @cpu_buffer: The per cpu buffer of the @event
 * @event: The event to discard
 *
 * This can only succeed while @event is still the last thing written on
 * the tail page. The write stamp is moved back by the event's delta and
 * the page's write index is moved back to the start of the event.
 *
 * Returns 1 if the space was reclaimed, 0 if the event could not be
 * discarded.
 */
static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
		  struct ring_buffer_event *event)
{
	unsigned long new_index, old_index;
	struct buffer_page *bpage;
	unsigned long index;
	unsigned long addr;
	u64 write_stamp;
	u64 delta;

	/* new_index: where the event starts, old_index: where it ends */
	new_index = rb_event_index(event);
	old_index = new_index + rb_event_ts_length(event);
	addr = (unsigned long)event;
	addr &= PAGE_MASK;

	bpage = READ_ONCE(cpu_buffer->tail_page);

	delta = rb_time_delta(event);

	if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
		return 0;

	/* Make sure the write stamp is read before testing the location */
	barrier();

	if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
		/* The bits of the write counter above the index are kept as is */
		unsigned long write_mask =
			local_read(&bpage->write) & ~RB_WRITE_MASK;
		unsigned long event_length = rb_event_length(event);

		/* Something came in, can't discard */
		if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
				       write_stamp,
write_stamp - delta)) 2828 return 0; 2829 2830 /* 2831 * If an event were to come in now, it would see that the 2832 * write_stamp and the before_stamp are different, and assume 2833 * that this event just added itself before updating 2834 * the write stamp. The interrupting event will fix the 2835 * write stamp for us, and use the before stamp as its delta. 2836 */ 2837 2838 /* 2839 * This is on the tail page. It is possible that 2840 * a write could come in and move the tail page 2841 * and write to the next page. That is fine 2842 * because we just shorten what is on this page. 2843 */ 2844 old_index += write_mask; 2845 new_index += write_mask; 2846 index = local_cmpxchg(&bpage->write, old_index, new_index); 2847 if (index == old_index) { 2848 /* update counters */ 2849 local_sub(event_length, &cpu_buffer->entries_bytes); 2850 return 1; 2851 } 2852 } 2853 2854 /* could not discard */ 2855 return 0; 2856 } 2857 2858 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2859 { 2860 local_inc(&cpu_buffer->committing); 2861 local_inc(&cpu_buffer->commits); 2862 } 2863 2864 static __always_inline void 2865 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2866 { 2867 unsigned long max_count; 2868 2869 /* 2870 * We only race with interrupts and NMIs on this CPU. 2871 * If we own the commit event, then we can commit 2872 * all others that interrupted us, since the interruptions 2873 * are in stack format (they finish before they come 2874 * back to us). This allows us to do a simple loop to 2875 * assign the commit to the tail. 
*/
 again:
	/* Bound the page walk below; running out of steps triggers a warning */
	max_count = cpu_buffer->nr_pages * 100;

	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		/* Commit all that was written to this page, then move on */
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	/* Now on the tail page: catch the commit up with the write index */
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
		goto again;
}

/*
 * rb_end_commit - finish a commit started with rb_start_commit()
 *
 * Only when the committing counter is 1 is the commit moved forward to
 * what has been written. The counter is then decremented.
 */
static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
*/
	if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
	    !local_read(&cpu_buffer->committing)) {
		/* Commits were started meanwhile; take the count back and redo */
		local_inc(&cpu_buffer->committing);
		goto again;
	}
}

/*
 * Turn @event into a padding event of the same data length, so that its
 * content is no longer treated as data. A leading time extend is skipped
 * first.
 */
static inline void rb_event_discard(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);

	/* array[0] holds the actual length for the discarded event */
	event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	if (!event->time_delta)
		event->time_delta = 1;
}

/* Count one more entry and end the commit. @event itself is not used. */
static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	local_inc(&cpu_buffer->entries);
	rb_end_commit(cpu_buffer);
}

/*
 * rb_wakeups - queue the irq work that wakes up waiters on the buffer
 * @buffer: The ring buffer that was written to
 * @cpu_buffer: The per cpu buffer that was written to
 *
 * Pending waiters on @buffer and on @cpu_buffer get their irq work
 * queued. For waiters that wait for the buffer to fill up, the irq work
 * is only queued if pages were touched since the last check, the commit
 * page is not the reader page, and the percentage of dirty pages is above
 * cpu_buffer->shortest_full.
 */
static __always_inline void
rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
{
	size_t nr_pages;
	size_t dirty;
	size_t full;

	if (buffer->irq_work.waiters_pending) {
		buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		irq_work_queue(&buffer->irq_work.work);
	}

	if (cpu_buffer->irq_work.waiters_pending) {
		cpu_buffer->irq_work.waiters_pending = false;
		/* irq_work_queue() supplies its own memory barriers */
		irq_work_queue(&cpu_buffer->irq_work.work);
	}

	/* No page was touched since the last time this was checked */
	if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
		return;

	if (cpu_buffer->reader_page == cpu_buffer->commit_page)
		return;

	if (!cpu_buffer->irq_work.full_waiters_pending)
		return;

	cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);

	full = cpu_buffer->shortest_full;
	nr_pages = cpu_buffer->nr_pages;
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu_buffer->cpu);
	/* Not yet more than "full" percent of the pages dirty: no wakeup */
	if (full && nr_pages && (dirty * 100) <= full * nr_pages)
return;

	cpu_buffer->irq_work.wakeup_full = true;
	cpu_buffer->irq_work.full_waiters_pending = false;
	/* irq_work_queue() supplies its own memory barriers */
	irq_work_queue(&cpu_buffer->irq_work.work);
}

/*
 * The lock and unlock are done within a preempt disable section.
 * The current_context per_cpu variable can only be modified
 * by the current task between lock and unlock. But it can
 * be modified more than once via an interrupt. To pass this
 * information from the lock to the unlock without having to
 * access the 'in_interrupt()' functions again (which do show
 * a bit of overhead in something as critical as function tracing),
 * we use a bitmask trick.
 *
 *  bit 1 =  NMI context
 *  bit 2 =  IRQ context
 *  bit 3 =  SoftIRQ context
 *  bit 4 =  normal context.
 *
 * This works because this is the order of contexts that can
 * preempt other contexts. A SoftIRQ never preempts an IRQ
 * context.
 *
 * When the context is determined, the corresponding bit is
 * checked and set (if it was set, then a recursion of that context
 * happened).
 *
 * On unlock, we need to clear this bit. To do so, just subtract
 * 1 from the current_context and AND it to itself.
 *
 * (binary)
 *  101 - 1 = 100
 *  101 & 100 = 100 (clearing bit zero)
 *
 *  1010 - 1 = 1001
 *  1010 & 1001 = 1000 (clearing bit 1)
 *
 * The least significant bit can be cleared this way, and it
 * just so happens that it is the same bit corresponding to
 * the current context.
 *
 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
 * is set when a recursion is detected at the current context, and if
 * the TRANSITION bit is already set, it will fail the recursion.
 * This is needed because there's a lag between the changing of
 * interrupt context and updating the preempt count.
In this case,
 * a false positive will be found. To handle this, one extra recursion
 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
 * bit is already set, then it is considered a recursion and the function
 * ends. Otherwise, the TRANSITION bit is set, and the lock is taken.
 *
 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
 * to be cleared. Even if it wasn't the context that set it. That is,
 * if an interrupt comes in while NORMAL bit is set and the ring buffer
 * is called before preempt_count() is updated, since the check will
 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
 * NMI then comes in, it will set the NMI bit, but when the NMI code
 * does the trace_recursive_unlock() it will clear the TRANSITION bit
 * and leave the NMI bit set. But this is fine, because the interrupt
 * code that set the TRANSITION bit will then clear the NMI bit when it
 * calls trace_recursive_unlock(). If another NMI comes in, it will
 * set the TRANSITION bit and continue.
 *
 * Note: The TRANSITION bit only handles a single transition between context.
 */

/*
 * Returns 0 if the bit of the current context (or, failing that, the
 * TRANSITION bit) was clear and is now set in current_context, and 1 if
 * both were already set, which is treated as a recursion. All bits are
 * shifted by cpu_buffer->nest.
 */
static __always_inline int
trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned int val = cpu_buffer->current_context;
	unsigned long pc = preempt_count();
	int bit;

	/* Pick the bit that stands for the context preempt_count() reports */
	if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
		bit = RB_CTX_NORMAL;
	else
		bit = pc & NMI_MASK ? RB_CTX_NMI :
			pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;

	if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
		/*
		 * It is possible that this was called by transitioning
		 * between interrupt context, and preempt_count() has not
		 * been updated yet. In this case, use the TRANSITION bit.
*/
		bit = RB_CTX_TRANSITION;
		if (val & (1 << (bit + cpu_buffer->nest)))
			return 1;
	}

	val |= (1 << (bit + cpu_buffer->nest));
	cpu_buffer->current_context = val;

	return 0;
}

/*
 * Clear the lowest bit that is set in current_context at or above the
 * current nest shift (x & (x - 1), with the 1 shifted by nest).
 */
static __always_inline void
trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->current_context &=
		cpu_buffer->current_context - (1 << cpu_buffer->nest);
}

/* The recursive locking above uses 5 bits */
#define NESTED_BITS 5

/**
 * ring_buffer_nest_start - Allow to trace while nested
 * @buffer: The ring buffer to modify
 *
 * The ring buffer has a safety mechanism to prevent recursion.
 * But there may be a case where a trace needs to be done while
 * tracing something else. In this case, calling this function
 * will allow this function to nest within a currently active
 * ring_buffer_lock_reserve().
 *
 * Call this function before calling another ring_buffer_lock_reserve() and
 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
 *
 * Preemption is disabled on return and stays disabled until
 * ring_buffer_nest_end() is called.
 */
void ring_buffer_nest_start(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* Enabled by ring_buffer_nest_end() */
	preempt_disable_notrace();
	cpu = raw_smp_processor_id();
	cpu_buffer = buffer->buffers[cpu];
	/* This is the shift value for the above recursive locking */
	cpu_buffer->nest += NESTED_BITS;
}

/**
 * ring_buffer_nest_end - End the nesting started by ring_buffer_nest_start()
 * @buffer: The ring buffer to modify
 *
 * Must be called after ring_buffer_nest_start() and after the
 * ring_buffer_unlock_commit().
*/
void ring_buffer_nest_end(struct trace_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* Preemption was disabled by ring_buffer_nest_start() */
	cpu = raw_smp_processor_id();
	cpu_buffer = buffer->buffers[cpu];
	/* This is the shift value for the above recursive locking */
	cpu_buffer->nest -= NESTED_BITS;
	preempt_enable_notrace();
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 * Waiters are woken up if needed, the recursion protection bit is cleared
 * and preemption is enabled again.
 *
 * Must be paired with ring_buffer_lock_reserve.
 *
 * Returns 0.
 */
int ring_buffer_unlock_commit(struct trace_buffer *buffer,
			      struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	rb_wakeups(buffer, cpu_buffer);

	trace_recursive_unlock(cpu_buffer);

	preempt_enable_notrace();

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  struct rb_event_info *info)
{
	struct ring_buffer_event *event;
	struct buffer_page *tail_page;
	unsigned long tail, write, w;
	bool a_ok;
	bool b_ok;

	/* Don't let the compiler play games with cpu_buffer->tail_page */
	tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);

 /*A*/	w = local_read(&tail_page->write) & RB_WRITE_MASK;
	barrier();
	b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
	a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
	barrier();
	info->ts = rb_time_stamp(cpu_buffer->buffer);

	if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
		info->delta = info->ts;
3208 } else { 3209 /* 3210 * If interrupting an event time update, we may need an 3211 * absolute timestamp. 3212 * Don't bother if this is the start of a new page (w == 0). 3213 */ 3214 if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) { 3215 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3216 info->length += RB_LEN_TIME_EXTEND; 3217 } else { 3218 info->delta = info->ts - info->after; 3219 if (unlikely(test_time_stamp(info->delta))) { 3220 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3221 info->length += RB_LEN_TIME_EXTEND; 3222 } 3223 } 3224 } 3225 3226 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3227 3228 /*C*/ write = local_add_return(info->length, &tail_page->write); 3229 3230 /* set write to only the index of the write */ 3231 write &= RB_WRITE_MASK; 3232 3233 tail = write - info->length; 3234 3235 /* See if we shot pass the end of this buffer page */ 3236 if (unlikely(write > BUF_PAGE_SIZE)) { 3237 /* before and after may now different, fix it up*/ 3238 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3239 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3240 if (a_ok && b_ok && info->before != info->after) 3241 (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, 3242 info->before, info->after); 3243 return rb_move_tail(cpu_buffer, tail, info); 3244 } 3245 3246 if (likely(tail == w)) { 3247 u64 save_before; 3248 bool s_ok; 3249 3250 /* Nothing interrupted us between A and C */ 3251 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3252 barrier(); 3253 /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before); 3254 RB_WARN_ON(cpu_buffer, !s_ok); 3255 if (likely(!(info->add_timestamp & 3256 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3257 /* This did not interrupt any time update */ 3258 info->delta = info->ts - info->after; 3259 else 3260 /* Just use full timestamp for inerrupting event */ 3261 info->delta = info->ts; 3262 barrier(); 3263 if (unlikely(info->ts != 
save_before)) { 3264 /* SLOW PATH - Interrupted between C and E */ 3265 3266 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3267 RB_WARN_ON(cpu_buffer, !a_ok); 3268 3269 /* Write stamp must only go forward */ 3270 if (save_before > info->after) { 3271 /* 3272 * We do not care about the result, only that 3273 * it gets updated atomically. 3274 */ 3275 (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, 3276 info->after, save_before); 3277 } 3278 } 3279 } else { 3280 u64 ts; 3281 /* SLOW PATH - Interrupted between A and C */ 3282 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3283 /* Was interrupted before here, write_stamp must be valid */ 3284 RB_WARN_ON(cpu_buffer, !a_ok); 3285 ts = rb_time_stamp(cpu_buffer->buffer); 3286 barrier(); 3287 /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3288 info->after < ts && 3289 rb_time_cmpxchg(&cpu_buffer->write_stamp, 3290 info->after, ts)) { 3291 /* Nothing came after this event between C and E */ 3292 info->delta = ts - info->after; 3293 info->ts = ts; 3294 } else { 3295 /* 3296 * Interrupted beween C and E: 3297 * Lost the previous events time stamp. Just set the 3298 * delta to zero, and this will be the same time as 3299 * the event this event interrupted. And the events that 3300 * came after this will still be correct (as they would 3301 * have built their delta on the previous event. 3302 */ 3303 info->delta = 0; 3304 } 3305 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3306 } 3307 3308 /* 3309 * If this is the first commit on the page, then it has the same 3310 * timestamp as the page itself. 
3311 */ 3312 if (unlikely(!tail && !(info->add_timestamp & 3313 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3314 info->delta = 0; 3315 3316 /* We reserved something on the buffer */ 3317 3318 event = __rb_page_index(tail_page, tail); 3319 rb_update_event(cpu_buffer, event, info); 3320 3321 local_inc(&tail_page->entries); 3322 3323 /* 3324 * If this is the first commit on the page, then update 3325 * its timestamp. 3326 */ 3327 if (unlikely(!tail)) 3328 tail_page->page->time_stamp = info->ts; 3329 3330 /* account for these added bytes */ 3331 local_add(info->length, &cpu_buffer->entries_bytes); 3332 3333 return event; 3334 } 3335 3336 static __always_inline struct ring_buffer_event * 3337 rb_reserve_next_event(struct trace_buffer *buffer, 3338 struct ring_buffer_per_cpu *cpu_buffer, 3339 unsigned long length) 3340 { 3341 struct ring_buffer_event *event; 3342 struct rb_event_info info; 3343 int nr_loops = 0; 3344 int add_ts_default; 3345 3346 rb_start_commit(cpu_buffer); 3347 /* The commit page can not change after this */ 3348 3349 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3350 /* 3351 * Due to the ability to swap a cpu buffer from a buffer 3352 * it is possible it was swapped before we committed. 3353 * (committing stops a swap). We check for it here and 3354 * if it happened, we have to fail the write. 3355 */ 3356 barrier(); 3357 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3358 local_dec(&cpu_buffer->committing); 3359 local_dec(&cpu_buffer->commits); 3360 return NULL; 3361 } 3362 #endif 3363 3364 info.length = rb_calculate_event_length(length); 3365 3366 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3367 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3368 info.length += RB_LEN_TIME_EXTEND; 3369 } else { 3370 add_ts_default = RB_ADD_STAMP_NONE; 3371 } 3372 3373 again: 3374 info.add_timestamp = add_ts_default; 3375 info.delta = 0; 3376 3377 /* 3378 * We allow for interrupts to reenter here and do a trace. 
3379 * If one does, it will cause this original code to loop 3380 * back here. Even with heavy interrupts happening, this 3381 * should only happen a few times in a row. If this happens 3382 * 1000 times in a row, there must be either an interrupt 3383 * storm or we have something buggy. 3384 * Bail! 3385 */ 3386 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3387 goto out_fail; 3388 3389 event = __rb_reserve_next(cpu_buffer, &info); 3390 3391 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3392 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3393 info.length -= RB_LEN_TIME_EXTEND; 3394 goto again; 3395 } 3396 3397 if (likely(event)) 3398 return event; 3399 out_fail: 3400 rb_end_commit(cpu_buffer); 3401 return NULL; 3402 } 3403 3404 /** 3405 * ring_buffer_lock_reserve - reserve a part of the buffer 3406 * @buffer: the ring buffer to reserve from 3407 * @length: the length of the data to reserve (excluding event header) 3408 * 3409 * Returns a reserved event on the ring buffer to copy directly to. 3410 * The user of this interface will need to get the body to write into 3411 * and can use the ring_buffer_event_data() interface. 3412 * 3413 * The length is the length of the data needed, not the event length 3414 * which also includes the event header. 3415 * 3416 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3417 * If NULL is returned, then nothing has been allocated or locked. 
3418 */ 3419 struct ring_buffer_event * 3420 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3421 { 3422 struct ring_buffer_per_cpu *cpu_buffer; 3423 struct ring_buffer_event *event; 3424 int cpu; 3425 3426 /* If we are tracing schedule, we don't want to recurse */ 3427 preempt_disable_notrace(); 3428 3429 if (unlikely(atomic_read(&buffer->record_disabled))) 3430 goto out; 3431 3432 cpu = raw_smp_processor_id(); 3433 3434 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3435 goto out; 3436 3437 cpu_buffer = buffer->buffers[cpu]; 3438 3439 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3440 goto out; 3441 3442 if (unlikely(length > BUF_MAX_DATA_SIZE)) 3443 goto out; 3444 3445 if (unlikely(trace_recursive_lock(cpu_buffer))) 3446 goto out; 3447 3448 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3449 if (!event) 3450 goto out_unlock; 3451 3452 return event; 3453 3454 out_unlock: 3455 trace_recursive_unlock(cpu_buffer); 3456 out: 3457 preempt_enable_notrace(); 3458 return NULL; 3459 } 3460 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3461 3462 /* 3463 * Decrement the entries to the page that an event is on. 3464 * The event does not even need to exist, only the pointer 3465 * to the page it is on. This may only be called before the commit 3466 * takes place. 3467 */ 3468 static inline void 3469 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3470 struct ring_buffer_event *event) 3471 { 3472 unsigned long addr = (unsigned long)event; 3473 struct buffer_page *bpage = cpu_buffer->commit_page; 3474 struct buffer_page *start; 3475 3476 addr &= PAGE_MASK; 3477 3478 /* Do the likely case first */ 3479 if (likely(bpage->page == (void *)addr)) { 3480 local_dec(&bpage->entries); 3481 return; 3482 } 3483 3484 /* 3485 * Because the commit page may be on the reader page we 3486 * start with the next page and check the end loop there. 
3487 */ 3488 rb_inc_page(cpu_buffer, &bpage); 3489 start = bpage; 3490 do { 3491 if (bpage->page == (void *)addr) { 3492 local_dec(&bpage->entries); 3493 return; 3494 } 3495 rb_inc_page(cpu_buffer, &bpage); 3496 } while (bpage != start); 3497 3498 /* commit not part of this buffer?? */ 3499 RB_WARN_ON(cpu_buffer, 1); 3500 } 3501 3502 /** 3503 * ring_buffer_commit_discard - discard an event that has not been committed 3504 * @buffer: the ring buffer 3505 * @event: non committed event to discard 3506 * 3507 * Sometimes an event that is in the ring buffer needs to be ignored. 3508 * This function lets the user discard an event in the ring buffer 3509 * and then that event will not be read later. 3510 * 3511 * This function only works if it is called before the item has been 3512 * committed. It will try to free the event from the ring buffer 3513 * if another event has not been added behind it. 3514 * 3515 * If another event has been added behind it, it will set the event 3516 * up as discarded, and perform the commit. 3517 * 3518 * If this function is called, do not call ring_buffer_unlock_commit on 3519 * the event. 3520 */ 3521 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3522 struct ring_buffer_event *event) 3523 { 3524 struct ring_buffer_per_cpu *cpu_buffer; 3525 int cpu; 3526 3527 /* The event is discarded regardless */ 3528 rb_event_discard(event); 3529 3530 cpu = smp_processor_id(); 3531 cpu_buffer = buffer->buffers[cpu]; 3532 3533 /* 3534 * This must only be called if the event has not been 3535 * committed yet. Thus we can assume that preemption 3536 * is still disabled. 
3537 */ 3538 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3539 3540 rb_decrement_entry(cpu_buffer, event); 3541 if (rb_try_to_discard(cpu_buffer, event)) 3542 goto out; 3543 3544 out: 3545 rb_end_commit(cpu_buffer); 3546 3547 trace_recursive_unlock(cpu_buffer); 3548 3549 preempt_enable_notrace(); 3550 3551 } 3552 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3553 3554 /** 3555 * ring_buffer_write - write data to the buffer without reserving 3556 * @buffer: The ring buffer to write to. 3557 * @length: The length of the data being written (excluding the event header) 3558 * @data: The data to write to the buffer. 3559 * 3560 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3561 * one function. If you already have the data to write to the buffer, it 3562 * may be easier to simply call this function. 3563 * 3564 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3565 * and not the length of the event which would hold the header. 
3566 */ 3567 int ring_buffer_write(struct trace_buffer *buffer, 3568 unsigned long length, 3569 void *data) 3570 { 3571 struct ring_buffer_per_cpu *cpu_buffer; 3572 struct ring_buffer_event *event; 3573 void *body; 3574 int ret = -EBUSY; 3575 int cpu; 3576 3577 preempt_disable_notrace(); 3578 3579 if (atomic_read(&buffer->record_disabled)) 3580 goto out; 3581 3582 cpu = raw_smp_processor_id(); 3583 3584 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3585 goto out; 3586 3587 cpu_buffer = buffer->buffers[cpu]; 3588 3589 if (atomic_read(&cpu_buffer->record_disabled)) 3590 goto out; 3591 3592 if (length > BUF_MAX_DATA_SIZE) 3593 goto out; 3594 3595 if (unlikely(trace_recursive_lock(cpu_buffer))) 3596 goto out; 3597 3598 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3599 if (!event) 3600 goto out_unlock; 3601 3602 body = rb_event_data(event); 3603 3604 memcpy(body, data, length); 3605 3606 rb_commit(cpu_buffer, event); 3607 3608 rb_wakeups(buffer, cpu_buffer); 3609 3610 ret = 0; 3611 3612 out_unlock: 3613 trace_recursive_unlock(cpu_buffer); 3614 3615 out: 3616 preempt_enable_notrace(); 3617 3618 return ret; 3619 } 3620 EXPORT_SYMBOL_GPL(ring_buffer_write); 3621 3622 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3623 { 3624 struct buffer_page *reader = cpu_buffer->reader_page; 3625 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3626 struct buffer_page *commit = cpu_buffer->commit_page; 3627 3628 /* In case of error, head will be NULL */ 3629 if (unlikely(!head)) 3630 return true; 3631 3632 return reader->read == rb_page_commit(reader) && 3633 (commit == reader || 3634 (commit == head && 3635 head->read == rb_page_commit(commit))); 3636 } 3637 3638 /** 3639 * ring_buffer_record_disable - stop all writes into the buffer 3640 * @buffer: The ring buffer to stop writes to. 3641 * 3642 * This prevents all writes to the buffer. Any attempt to write 3643 * to the buffer after this will fail and return NULL. 
3644 * 3645 * The caller should call synchronize_rcu() after this. 3646 */ 3647 void ring_buffer_record_disable(struct trace_buffer *buffer) 3648 { 3649 atomic_inc(&buffer->record_disabled); 3650 } 3651 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3652 3653 /** 3654 * ring_buffer_record_enable - enable writes to the buffer 3655 * @buffer: The ring buffer to enable writes 3656 * 3657 * Note, multiple disables will need the same number of enables 3658 * to truly enable the writing (much like preempt_disable). 3659 */ 3660 void ring_buffer_record_enable(struct trace_buffer *buffer) 3661 { 3662 atomic_dec(&buffer->record_disabled); 3663 } 3664 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 3665 3666 /** 3667 * ring_buffer_record_off - stop all writes into the buffer 3668 * @buffer: The ring buffer to stop writes to. 3669 * 3670 * This prevents all writes to the buffer. Any attempt to write 3671 * to the buffer after this will fail and return NULL. 3672 * 3673 * This is different than ring_buffer_record_disable() as 3674 * it works like an on/off switch, where as the disable() version 3675 * must be paired with a enable(). 3676 */ 3677 void ring_buffer_record_off(struct trace_buffer *buffer) 3678 { 3679 unsigned int rd; 3680 unsigned int new_rd; 3681 3682 do { 3683 rd = atomic_read(&buffer->record_disabled); 3684 new_rd = rd | RB_BUFFER_OFF; 3685 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3686 } 3687 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 3688 3689 /** 3690 * ring_buffer_record_on - restart writes into the buffer 3691 * @buffer: The ring buffer to start writes to. 3692 * 3693 * This enables all writes to the buffer that was disabled by 3694 * ring_buffer_record_off(). 3695 * 3696 * This is different than ring_buffer_record_enable() as 3697 * it works like an on/off switch, where as the enable() version 3698 * must be paired with a disable(). 
3699 */ 3700 void ring_buffer_record_on(struct trace_buffer *buffer) 3701 { 3702 unsigned int rd; 3703 unsigned int new_rd; 3704 3705 do { 3706 rd = atomic_read(&buffer->record_disabled); 3707 new_rd = rd & ~RB_BUFFER_OFF; 3708 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3709 } 3710 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 3711 3712 /** 3713 * ring_buffer_record_is_on - return true if the ring buffer can write 3714 * @buffer: The ring buffer to see if write is enabled 3715 * 3716 * Returns true if the ring buffer is in a state that it accepts writes. 3717 */ 3718 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 3719 { 3720 return !atomic_read(&buffer->record_disabled); 3721 } 3722 3723 /** 3724 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 3725 * @buffer: The ring buffer to see if write is set enabled 3726 * 3727 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 3728 * Note that this does NOT mean it is in a writable state. 3729 * 3730 * It may return true when the ring buffer has been disabled by 3731 * ring_buffer_record_disable(), as that is a temporary disabling of 3732 * the ring buffer. 3733 */ 3734 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 3735 { 3736 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 3737 } 3738 3739 /** 3740 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 3741 * @buffer: The ring buffer to stop writes to. 3742 * @cpu: The CPU buffer to stop 3743 * 3744 * This prevents all writes to the buffer. Any attempt to write 3745 * to the buffer after this will fail and return NULL. 3746 * 3747 * The caller should call synchronize_rcu() after this. 
3748 */ 3749 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 3750 { 3751 struct ring_buffer_per_cpu *cpu_buffer; 3752 3753 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3754 return; 3755 3756 cpu_buffer = buffer->buffers[cpu]; 3757 atomic_inc(&cpu_buffer->record_disabled); 3758 } 3759 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 3760 3761 /** 3762 * ring_buffer_record_enable_cpu - enable writes to the buffer 3763 * @buffer: The ring buffer to enable writes 3764 * @cpu: The CPU to enable. 3765 * 3766 * Note, multiple disables will need the same number of enables 3767 * to truly enable the writing (much like preempt_disable). 3768 */ 3769 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 3770 { 3771 struct ring_buffer_per_cpu *cpu_buffer; 3772 3773 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3774 return; 3775 3776 cpu_buffer = buffer->buffers[cpu]; 3777 atomic_dec(&cpu_buffer->record_disabled); 3778 } 3779 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 3780 3781 /* 3782 * The total entries in the ring buffer is the running counter 3783 * of entries entered into the ring buffer, minus the sum of 3784 * the entries read from the ring buffer and the number of 3785 * entries that were overwritten. 3786 */ 3787 static inline unsigned long 3788 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 3789 { 3790 return local_read(&cpu_buffer->entries) - 3791 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 3792 } 3793 3794 /** 3795 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 3796 * @buffer: The ring buffer 3797 * @cpu: The per CPU buffer to read from. 
3798 */ 3799 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 3800 { 3801 unsigned long flags; 3802 struct ring_buffer_per_cpu *cpu_buffer; 3803 struct buffer_page *bpage; 3804 u64 ret = 0; 3805 3806 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3807 return 0; 3808 3809 cpu_buffer = buffer->buffers[cpu]; 3810 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3811 /* 3812 * if the tail is on reader_page, oldest time stamp is on the reader 3813 * page 3814 */ 3815 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 3816 bpage = cpu_buffer->reader_page; 3817 else 3818 bpage = rb_set_head_page(cpu_buffer); 3819 if (bpage) 3820 ret = bpage->page->time_stamp; 3821 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3822 3823 return ret; 3824 } 3825 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 3826 3827 /** 3828 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 3829 * @buffer: The ring buffer 3830 * @cpu: The per CPU buffer to read from. 3831 */ 3832 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 3833 { 3834 struct ring_buffer_per_cpu *cpu_buffer; 3835 unsigned long ret; 3836 3837 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3838 return 0; 3839 3840 cpu_buffer = buffer->buffers[cpu]; 3841 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 3842 3843 return ret; 3844 } 3845 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 3846 3847 /** 3848 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 3849 * @buffer: The ring buffer 3850 * @cpu: The per CPU buffer to get the entries from. 
3851 */ 3852 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 3853 { 3854 struct ring_buffer_per_cpu *cpu_buffer; 3855 3856 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3857 return 0; 3858 3859 cpu_buffer = buffer->buffers[cpu]; 3860 3861 return rb_num_of_entries(cpu_buffer); 3862 } 3863 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 3864 3865 /** 3866 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 3867 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 3868 * @buffer: The ring buffer 3869 * @cpu: The per CPU buffer to get the number of overruns from 3870 */ 3871 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 3872 { 3873 struct ring_buffer_per_cpu *cpu_buffer; 3874 unsigned long ret; 3875 3876 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3877 return 0; 3878 3879 cpu_buffer = buffer->buffers[cpu]; 3880 ret = local_read(&cpu_buffer->overrun); 3881 3882 return ret; 3883 } 3884 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 3885 3886 /** 3887 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 3888 * commits failing due to the buffer wrapping around while there are uncommitted 3889 * events, such as during an interrupt storm. 3890 * @buffer: The ring buffer 3891 * @cpu: The per CPU buffer to get the number of overruns from 3892 */ 3893 unsigned long 3894 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 3895 { 3896 struct ring_buffer_per_cpu *cpu_buffer; 3897 unsigned long ret; 3898 3899 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3900 return 0; 3901 3902 cpu_buffer = buffer->buffers[cpu]; 3903 ret = local_read(&cpu_buffer->commit_overrun); 3904 3905 return ret; 3906 } 3907 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 3908 3909 /** 3910 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 3911 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
3912 * @buffer: The ring buffer 3913 * @cpu: The per CPU buffer to get the number of overruns from 3914 */ 3915 unsigned long 3916 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 3917 { 3918 struct ring_buffer_per_cpu *cpu_buffer; 3919 unsigned long ret; 3920 3921 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3922 return 0; 3923 3924 cpu_buffer = buffer->buffers[cpu]; 3925 ret = local_read(&cpu_buffer->dropped_events); 3926 3927 return ret; 3928 } 3929 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 3930 3931 /** 3932 * ring_buffer_read_events_cpu - get the number of events successfully read 3933 * @buffer: The ring buffer 3934 * @cpu: The per CPU buffer to get the number of events read 3935 */ 3936 unsigned long 3937 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 3938 { 3939 struct ring_buffer_per_cpu *cpu_buffer; 3940 3941 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3942 return 0; 3943 3944 cpu_buffer = buffer->buffers[cpu]; 3945 return cpu_buffer->read; 3946 } 3947 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 3948 3949 /** 3950 * ring_buffer_entries - get the number of entries in a buffer 3951 * @buffer: The ring buffer 3952 * 3953 * Returns the total number of entries in the ring buffer 3954 * (all CPU entries) 3955 */ 3956 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 3957 { 3958 struct ring_buffer_per_cpu *cpu_buffer; 3959 unsigned long entries = 0; 3960 int cpu; 3961 3962 /* if you care about this being correct, lock the buffer */ 3963 for_each_buffer_cpu(buffer, cpu) { 3964 cpu_buffer = buffer->buffers[cpu]; 3965 entries += rb_num_of_entries(cpu_buffer); 3966 } 3967 3968 return entries; 3969 } 3970 EXPORT_SYMBOL_GPL(ring_buffer_entries); 3971 3972 /** 3973 * ring_buffer_overruns - get the number of overruns in buffer 3974 * @buffer: The ring buffer 3975 * 3976 * Returns the total number of overruns in the ring buffer 3977 * (all CPU entries) 3978 */ 3979 unsigned long ring_buffer_overruns(struct 
trace_buffer *buffer) 3980 { 3981 struct ring_buffer_per_cpu *cpu_buffer; 3982 unsigned long overruns = 0; 3983 int cpu; 3984 3985 /* if you care about this being correct, lock the buffer */ 3986 for_each_buffer_cpu(buffer, cpu) { 3987 cpu_buffer = buffer->buffers[cpu]; 3988 overruns += local_read(&cpu_buffer->overrun); 3989 } 3990 3991 return overruns; 3992 } 3993 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 3994 3995 static void rb_iter_reset(struct ring_buffer_iter *iter) 3996 { 3997 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3998 3999 /* Iterator usage is expected to have record disabled */ 4000 iter->head_page = cpu_buffer->reader_page; 4001 iter->head = cpu_buffer->reader_page->read; 4002 iter->next_event = iter->head; 4003 4004 iter->cache_reader_page = iter->head_page; 4005 iter->cache_read = cpu_buffer->read; 4006 4007 if (iter->head) { 4008 iter->read_stamp = cpu_buffer->read_stamp; 4009 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4010 } else { 4011 iter->read_stamp = iter->head_page->page->time_stamp; 4012 iter->page_stamp = iter->read_stamp; 4013 } 4014 } 4015 4016 /** 4017 * ring_buffer_iter_reset - reset an iterator 4018 * @iter: The iterator to reset 4019 * 4020 * Resets the iterator, so that it will start from the beginning 4021 * again. 
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;

	/* A NULL iterator is silently ignored */
	if (!iter)
		return;

	cpu_buffer = iter->cpu_buffer;

	/* The actual reset is done under the reader lock */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_iter_reset(iter);
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);

/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 *
 * Returns non-zero if the iterator appears to have nothing left to read,
 * 0 otherwise. No lock is taken here; if the commit page or its time
 * stamp changes while sampling, 0 is returned since there is more data.
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *reader;
	struct buffer_page *head_page;
	struct buffer_page *commit_page;
	struct buffer_page *curr_commit_page;
	unsigned commit;
	u64 curr_commit_ts;
	u64 commit_ts;

	/* Take a first snapshot of the pages and the commit page's stamp */
	cpu_buffer = iter->cpu_buffer;
	reader = cpu_buffer->reader_page;
	head_page = cpu_buffer->head_page;
	commit_page = cpu_buffer->commit_page;
	commit_ts = commit_page->page->time_stamp;

	/*
	 * When the writer goes across pages, it issues a cmpxchg which
	 * is a mb(), which will synchronize with the rmb here.
	 * (see rb_tail_page_update())
	 */
	smp_rmb();
	commit = rb_page_commit(commit_page);
	/* We want to make sure that the commit page doesn't change */
	smp_rmb();

	/* Make sure commit page didn't change */
	curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
	curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);

	/* If the commit page changed, then there's more data */
	if (curr_commit_page != commit_page ||
	    curr_commit_ts != commit_ts)
		return 0;

	/* Still racy, as it may return a false positive, but that's OK */
	return ((iter->head_page == commit_page && iter->head >= commit) ||
		(iter->head_page == reader && commit_page == head_page &&
		 head_page->read == commit &&
		 iter->head == rb_page_commit(cpu_buffer->reader_page)));
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);

/*
 * Fold @event's timing information into cpu_buffer->read_stamp:
 * padding leaves it alone, a time extend or a data event adds a delta,
 * and a time stamp event replaces it outright.
 */
static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = ring_buffer_event_time_stamp(event);
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		delta = ring_buffer_event_time_stamp(event);
		cpu_buffer->read_stamp = delta;
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		/* Any other type_len value is unexpected here */
		RB_WARN_ON(cpu_buffer, 1);
	}
	return;
}

/*
 * Iterator counterpart of rb_update_read_stamp(): applies the same
 * per-type rules to iter->read_stamp instead of the per CPU read stamp.
 */
static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = ring_buffer_event_time_stamp(event);
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		delta = ring_buffer_event_time_stamp(event);
		iter->read_stamp = delta;
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		/* Any other type_len value is unexpected here */
		RB_WARN_ON(iter->cpu_buffer, 1);
	}
	return;
}

/*
 * Get the page the consuming reader should read from.
 *
 * If the current reader page still has unread data it is returned as is.
 * Otherwise the (now emptied) reader page is swapped into the ring in
 * place of the head page, and the old head page becomes the new reader
 * page.
 *
 * Runs with interrupts disabled and cpu_buffer->lock held for the whole
 * operation.
 *
 * Returns the reader page, or NULL when there is nothing to read (the
 * reader caught up with the commit page, or the buffer holds no entries),
 * when no head page could be found, or when the loop guard trips.
 */
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long overwrite;
	unsigned long flags;
	int nr_loops = 0;
	int ret;

	local_irq_save(flags);
	arch_spin_lock(&cpu_buffer->lock);

 again:
	/*
	 * This should normally only loop twice. But because the
	 * start of the reader inserts an empty page, it causes
	 * a case where we will loop three times. There should be no
	 * reason to loop four times (that I know of).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
		reader = NULL;
		goto out;
	}

	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	if (RB_WARN_ON(cpu_buffer,
		       cpu_buffer->reader_page->read > rb_page_size(reader)))
		goto out;

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/* Don't bother swapping if the ring buffer is empty */
	if (rb_num_of_entries(cpu_buffer) == 0)
		goto out;

	/*
	 * Reset the reader page to size zero.
	 */
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);
	cpu_buffer->reader_page->real_end = 0;

 spin:
	/*
	 * Splice the empty reader page into the list around the head.
	 */
	reader = rb_set_head_page(cpu_buffer);
	if (!reader)
		goto out;
	cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	/*
	 * cpu_buffer->pages just needs to point to the buffer, it
	 * has no specific buffer page to point to. Lets move it out
	 * of our way so we don't accidentally swap it.
	 */
	cpu_buffer->pages = reader->list.prev;

	/* The reader page will be pointing to the new head */
	rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);

	/*
	 * We want to make sure we read the overruns after we set up our
	 * pointers to the next object. The writer side does a
	 * cmpxchg to cross pages which acts as the mb on the writer
	 * side. Note, the reader will constantly fail the swap
	 * while the writer is updating the pointers, so this
	 * guarantees that the overwrite recorded here is the one we
	 * want to compare with the last_overrun.
	 */
	smp_mb();
	overwrite = local_read(&(cpu_buffer->overrun));

	/*
	 * Here's the tricky part.
	 *
	 * We need to move the pointer past the header page.
	 * But we can only do that if a writer is not currently
	 * moving it. The page before the header page has the
	 * flag bit '1' set if it is pointing to the page we want.
	 * but if the writer is in the process of moving it
	 * than it will be '2' or already moved '0'.
	 */

	ret = rb_head_page_replace(reader, cpu_buffer->reader_page);

	/*
	 * If we did not convert it, then we must try again.
	 */
	if (!ret)
		goto spin;

	/*
	 * Yay! We succeeded in replacing the page.
	 *
	 * Now make the new head point back to the reader page.
	 */
	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
	rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	local_inc(&cpu_buffer->pages_read);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	cpu_buffer->reader_page->read = 0;

	/* Record how many events were overwritten since the last swap */
	if (overwrite != cpu_buffer->last_overrun) {
		cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
		cpu_buffer->last_overrun = overwrite;
	}

	/* Re-run the checks against the page we just took */
	goto again;

 out:
	/* Update the read_stamp on the first event */
	if (reader && reader->read == 0)
		cpu_buffer->read_stamp = reader->page->time_stamp;

	arch_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	return reader;
}

/*
 * Step the consuming reader past the event at the current read position:
 * count it as read if it is a data event, update the read stamp, and
 * advance the reader page's read offset by the event's length.
 */
static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	if (RB_WARN_ON(cpu_buffer, !reader))
		return;

	event = rb_reader_event(cpu_buffer);

	/* Only events whose type_len is in the data range count as read */
	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		cpu_buffer->read++;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}

/*
 * Move @iter on to its next event. When the end of the current page is
 * reached the iterator moves to the next page, unless it is already on
 * the commit page, in which case it stays put.
 */
static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	/* If head == next_event then we need to jump to the next event */
	if (iter->head == iter->next_event) {
		/* If the event gets overwritten again, there's nothing to do */
		if (rb_iter_head_event(iter) == NULL)
			return;
	}

	iter->head = iter->next_event;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->next_event >= rb_page_size(iter->head_page)) {
		/* discarded commits can make the page empty */
		if (iter->head_page == cpu_buffer->commit_page)
			return;
		rb_inc_iter(iter);
		return;
	}

	rb_update_iter_read_stamp(iter, iter->event);
}

/* Return the lost_events count currently recorded for @cpu_buffer. */
static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	return cpu_buffer->lost_events;
}

static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
	       unsigned long *lost_events)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	int nr_loops = 0;

	if (ts)
		*ts = 0;
 again:
	/*
	 * We repeat when a time extend is encountered.
	 * Since the time extend is always attached to a data event,
	 * we should never loop more than once.
	 * (We never hit the following condition more than twice).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
		return NULL;

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			RB_WARN_ON(cpu_buffer, 1);
		/*
		 * Because the writer could be discarding every
		 * event it creates (which would probably be bad)
		 * if we were to go back to "again" then we may never
		 * catch up, and will trigger the warn on, or lock
		 * the box. Return the padding, and we will release
		 * the current locks, and try again.
4376 */ 4377 return event; 4378 4379 case RINGBUF_TYPE_TIME_EXTEND: 4380 /* Internal data, OK to advance */ 4381 rb_advance_reader(cpu_buffer); 4382 goto again; 4383 4384 case RINGBUF_TYPE_TIME_STAMP: 4385 if (ts) { 4386 *ts = ring_buffer_event_time_stamp(event); 4387 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4388 cpu_buffer->cpu, ts); 4389 } 4390 /* Internal data, OK to advance */ 4391 rb_advance_reader(cpu_buffer); 4392 goto again; 4393 4394 case RINGBUF_TYPE_DATA: 4395 if (ts && !(*ts)) { 4396 *ts = cpu_buffer->read_stamp + event->time_delta; 4397 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4398 cpu_buffer->cpu, ts); 4399 } 4400 if (lost_events) 4401 *lost_events = rb_lost_events(cpu_buffer); 4402 return event; 4403 4404 default: 4405 RB_WARN_ON(cpu_buffer, 1); 4406 } 4407 4408 return NULL; 4409 } 4410 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4411 4412 static struct ring_buffer_event * 4413 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4414 { 4415 struct trace_buffer *buffer; 4416 struct ring_buffer_per_cpu *cpu_buffer; 4417 struct ring_buffer_event *event; 4418 int nr_loops = 0; 4419 4420 if (ts) 4421 *ts = 0; 4422 4423 cpu_buffer = iter->cpu_buffer; 4424 buffer = cpu_buffer->buffer; 4425 4426 /* 4427 * Check if someone performed a consuming read to 4428 * the buffer. A consuming read invalidates the iterator 4429 * and we need to reset the iterator in this case. 4430 */ 4431 if (unlikely(iter->cache_read != cpu_buffer->read || 4432 iter->cache_reader_page != cpu_buffer->reader_page)) 4433 rb_iter_reset(iter); 4434 4435 again: 4436 if (ring_buffer_iter_empty(iter)) 4437 return NULL; 4438 4439 /* 4440 * As the writer can mess with what the iterator is trying 4441 * to read, just give up if we fail to get an event after 4442 * three tries. The iterator is not as reliable when reading 4443 * the ring buffer with an active write as the consumer is. 4444 * Do not warn if the three failures is reached. 
4445 */ 4446 if (++nr_loops > 3) 4447 return NULL; 4448 4449 if (rb_per_cpu_empty(cpu_buffer)) 4450 return NULL; 4451 4452 if (iter->head >= rb_page_size(iter->head_page)) { 4453 rb_inc_iter(iter); 4454 goto again; 4455 } 4456 4457 event = rb_iter_head_event(iter); 4458 if (!event) 4459 goto again; 4460 4461 switch (event->type_len) { 4462 case RINGBUF_TYPE_PADDING: 4463 if (rb_null_event(event)) { 4464 rb_inc_iter(iter); 4465 goto again; 4466 } 4467 rb_advance_iter(iter); 4468 return event; 4469 4470 case RINGBUF_TYPE_TIME_EXTEND: 4471 /* Internal data, OK to advance */ 4472 rb_advance_iter(iter); 4473 goto again; 4474 4475 case RINGBUF_TYPE_TIME_STAMP: 4476 if (ts) { 4477 *ts = ring_buffer_event_time_stamp(event); 4478 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4479 cpu_buffer->cpu, ts); 4480 } 4481 /* Internal data, OK to advance */ 4482 rb_advance_iter(iter); 4483 goto again; 4484 4485 case RINGBUF_TYPE_DATA: 4486 if (ts && !(*ts)) { 4487 *ts = iter->read_stamp + event->time_delta; 4488 ring_buffer_normalize_time_stamp(buffer, 4489 cpu_buffer->cpu, ts); 4490 } 4491 return event; 4492 4493 default: 4494 RB_WARN_ON(cpu_buffer, 1); 4495 } 4496 4497 return NULL; 4498 } 4499 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4500 4501 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4502 { 4503 if (likely(!in_nmi())) { 4504 raw_spin_lock(&cpu_buffer->reader_lock); 4505 return true; 4506 } 4507 4508 /* 4509 * If an NMI die dumps out the content of the ring buffer 4510 * trylock must be used to prevent a deadlock if the NMI 4511 * preempted a task that holds the ring buffer locks. If 4512 * we get the lock then all is fine, if not, then continue 4513 * to do the read, but this can corrupt the ring buffer, 4514 * so it must be permanently disabled from future writes. 4515 * Reading from NMI is a oneshot deal. 
4516 */ 4517 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4518 return true; 4519 4520 /* Continue without locking, but disable the ring buffer */ 4521 atomic_inc(&cpu_buffer->record_disabled); 4522 return false; 4523 } 4524 4525 static inline void 4526 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4527 { 4528 if (likely(locked)) 4529 raw_spin_unlock(&cpu_buffer->reader_lock); 4530 return; 4531 } 4532 4533 /** 4534 * ring_buffer_peek - peek at the next event to be read 4535 * @buffer: The ring buffer to read 4536 * @cpu: The cpu to peak at 4537 * @ts: The timestamp counter of this event. 4538 * @lost_events: a variable to store if events were lost (may be NULL) 4539 * 4540 * This will return the event that will be read next, but does 4541 * not consume the data. 4542 */ 4543 struct ring_buffer_event * 4544 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4545 unsigned long *lost_events) 4546 { 4547 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4548 struct ring_buffer_event *event; 4549 unsigned long flags; 4550 bool dolock; 4551 4552 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4553 return NULL; 4554 4555 again: 4556 local_irq_save(flags); 4557 dolock = rb_reader_lock(cpu_buffer); 4558 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4559 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4560 rb_advance_reader(cpu_buffer); 4561 rb_reader_unlock(cpu_buffer, dolock); 4562 local_irq_restore(flags); 4563 4564 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4565 goto again; 4566 4567 return event; 4568 } 4569 4570 /** ring_buffer_iter_dropped - report if there are dropped events 4571 * @iter: The ring buffer iterator 4572 * 4573 * Returns true if there was dropped events since the last peek. 
4574 */ 4575 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4576 { 4577 bool ret = iter->missed_events != 0; 4578 4579 iter->missed_events = 0; 4580 return ret; 4581 } 4582 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4583 4584 /** 4585 * ring_buffer_iter_peek - peek at the next event to be read 4586 * @iter: The ring buffer iterator 4587 * @ts: The timestamp counter of this event. 4588 * 4589 * This will return the event that will be read next, but does 4590 * not increment the iterator. 4591 */ 4592 struct ring_buffer_event * 4593 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4594 { 4595 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4596 struct ring_buffer_event *event; 4597 unsigned long flags; 4598 4599 again: 4600 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4601 event = rb_iter_peek(iter, ts); 4602 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4603 4604 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4605 goto again; 4606 4607 return event; 4608 } 4609 4610 /** 4611 * ring_buffer_consume - return an event and consume it 4612 * @buffer: The ring buffer to get the next event from 4613 * @cpu: the cpu to read the buffer from 4614 * @ts: a variable to store the timestamp (may be NULL) 4615 * @lost_events: a variable to store if events were lost (may be NULL) 4616 * 4617 * Returns the next event in the ring buffer, and that event is consumed. 4618 * Meaning, that sequential reads will keep returning a different event, 4619 * and eventually empty the ring buffer if the producer is slower. 
4620 */ 4621 struct ring_buffer_event * 4622 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 4623 unsigned long *lost_events) 4624 { 4625 struct ring_buffer_per_cpu *cpu_buffer; 4626 struct ring_buffer_event *event = NULL; 4627 unsigned long flags; 4628 bool dolock; 4629 4630 again: 4631 /* might be called in atomic */ 4632 preempt_disable(); 4633 4634 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4635 goto out; 4636 4637 cpu_buffer = buffer->buffers[cpu]; 4638 local_irq_save(flags); 4639 dolock = rb_reader_lock(cpu_buffer); 4640 4641 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4642 if (event) { 4643 cpu_buffer->lost_events = 0; 4644 rb_advance_reader(cpu_buffer); 4645 } 4646 4647 rb_reader_unlock(cpu_buffer, dolock); 4648 local_irq_restore(flags); 4649 4650 out: 4651 preempt_enable(); 4652 4653 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4654 goto again; 4655 4656 return event; 4657 } 4658 EXPORT_SYMBOL_GPL(ring_buffer_consume); 4659 4660 /** 4661 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 4662 * @buffer: The ring buffer to read from 4663 * @cpu: The cpu buffer to iterate over 4664 * @flags: gfp flags to use for memory allocation 4665 * 4666 * This performs the initial preparations necessary to iterate 4667 * through the buffer. Memory is allocated, buffer recording 4668 * is disabled, and the iterator pointer is returned to the caller. 4669 * 4670 * Disabling buffer recording prevents the reading from being 4671 * corrupted. This is not a consuming read, so a producer is not 4672 * expected. 4673 * 4674 * After a sequence of ring_buffer_read_prepare calls, the user is 4675 * expected to make at least one call to ring_buffer_read_prepare_sync. 4676 * Afterwards, ring_buffer_read_start is invoked to get things going 4677 * for real. 4678 * 4679 * This overall must be paired with ring_buffer_read_finish. 
4680 */ 4681 struct ring_buffer_iter * 4682 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 4683 { 4684 struct ring_buffer_per_cpu *cpu_buffer; 4685 struct ring_buffer_iter *iter; 4686 4687 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4688 return NULL; 4689 4690 iter = kzalloc(sizeof(*iter), flags); 4691 if (!iter) 4692 return NULL; 4693 4694 iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags); 4695 if (!iter->event) { 4696 kfree(iter); 4697 return NULL; 4698 } 4699 4700 cpu_buffer = buffer->buffers[cpu]; 4701 4702 iter->cpu_buffer = cpu_buffer; 4703 4704 atomic_inc(&cpu_buffer->resize_disabled); 4705 4706 return iter; 4707 } 4708 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 4709 4710 /** 4711 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 4712 * 4713 * All previously invoked ring_buffer_read_prepare calls to prepare 4714 * iterators will be synchronized. Afterwards, read_buffer_read_start 4715 * calls on those iterators are allowed. 4716 */ 4717 void 4718 ring_buffer_read_prepare_sync(void) 4719 { 4720 synchronize_rcu(); 4721 } 4722 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 4723 4724 /** 4725 * ring_buffer_read_start - start a non consuming read of the buffer 4726 * @iter: The iterator returned by ring_buffer_read_prepare 4727 * 4728 * This finalizes the startup of an iteration through the buffer. 4729 * The iterator comes from a call to ring_buffer_read_prepare and 4730 * an intervening ring_buffer_read_prepare_sync must have been 4731 * performed. 4732 * 4733 * Must be paired with ring_buffer_read_finish. 
4734 */ 4735 void 4736 ring_buffer_read_start(struct ring_buffer_iter *iter) 4737 { 4738 struct ring_buffer_per_cpu *cpu_buffer; 4739 unsigned long flags; 4740 4741 if (!iter) 4742 return; 4743 4744 cpu_buffer = iter->cpu_buffer; 4745 4746 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4747 arch_spin_lock(&cpu_buffer->lock); 4748 rb_iter_reset(iter); 4749 arch_spin_unlock(&cpu_buffer->lock); 4750 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4751 } 4752 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 4753 4754 /** 4755 * ring_buffer_read_finish - finish reading the iterator of the buffer 4756 * @iter: The iterator retrieved by ring_buffer_start 4757 * 4758 * This re-enables the recording to the buffer, and frees the 4759 * iterator. 4760 */ 4761 void 4762 ring_buffer_read_finish(struct ring_buffer_iter *iter) 4763 { 4764 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4765 unsigned long flags; 4766 4767 /* 4768 * Ring buffer is disabled from recording, here's a good place 4769 * to check the integrity of the ring buffer. 4770 * Must prevent readers from trying to read, as the check 4771 * clears the HEAD page and readers require it. 4772 */ 4773 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4774 rb_check_pages(cpu_buffer); 4775 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4776 4777 atomic_dec(&cpu_buffer->resize_disabled); 4778 kfree(iter->event); 4779 kfree(iter); 4780 } 4781 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 4782 4783 /** 4784 * ring_buffer_iter_advance - advance the iterator to the next location 4785 * @iter: The ring buffer iterator 4786 * 4787 * Move the location of the iterator such that the next read will 4788 * be the next location of the iterator. 
4789 */ 4790 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 4791 { 4792 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4793 unsigned long flags; 4794 4795 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4796 4797 rb_advance_iter(iter); 4798 4799 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4800 } 4801 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 4802 4803 /** 4804 * ring_buffer_size - return the size of the ring buffer (in bytes) 4805 * @buffer: The ring buffer. 4806 * @cpu: The CPU to get ring buffer size from. 4807 */ 4808 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 4809 { 4810 /* 4811 * Earlier, this method returned 4812 * BUF_PAGE_SIZE * buffer->nr_pages 4813 * Since the nr_pages field is now removed, we have converted this to 4814 * return the per cpu buffer value. 4815 */ 4816 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4817 return 0; 4818 4819 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 4820 } 4821 EXPORT_SYMBOL_GPL(ring_buffer_size); 4822 4823 static void 4824 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 4825 { 4826 rb_head_page_deactivate(cpu_buffer); 4827 4828 cpu_buffer->head_page 4829 = list_entry(cpu_buffer->pages, struct buffer_page, list); 4830 local_set(&cpu_buffer->head_page->write, 0); 4831 local_set(&cpu_buffer->head_page->entries, 0); 4832 local_set(&cpu_buffer->head_page->page->commit, 0); 4833 4834 cpu_buffer->head_page->read = 0; 4835 4836 cpu_buffer->tail_page = cpu_buffer->head_page; 4837 cpu_buffer->commit_page = cpu_buffer->head_page; 4838 4839 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 4840 INIT_LIST_HEAD(&cpu_buffer->new_pages); 4841 local_set(&cpu_buffer->reader_page->write, 0); 4842 local_set(&cpu_buffer->reader_page->entries, 0); 4843 local_set(&cpu_buffer->reader_page->page->commit, 0); 4844 cpu_buffer->reader_page->read = 0; 4845 4846 local_set(&cpu_buffer->entries_bytes, 0); 4847 local_set(&cpu_buffer->overrun, 0); 4848 
local_set(&cpu_buffer->commit_overrun, 0); 4849 local_set(&cpu_buffer->dropped_events, 0); 4850 local_set(&cpu_buffer->entries, 0); 4851 local_set(&cpu_buffer->committing, 0); 4852 local_set(&cpu_buffer->commits, 0); 4853 local_set(&cpu_buffer->pages_touched, 0); 4854 local_set(&cpu_buffer->pages_read, 0); 4855 cpu_buffer->last_pages_touch = 0; 4856 cpu_buffer->shortest_full = 0; 4857 cpu_buffer->read = 0; 4858 cpu_buffer->read_bytes = 0; 4859 4860 rb_time_set(&cpu_buffer->write_stamp, 0); 4861 rb_time_set(&cpu_buffer->before_stamp, 0); 4862 4863 cpu_buffer->lost_events = 0; 4864 cpu_buffer->last_overrun = 0; 4865 4866 rb_head_page_activate(cpu_buffer); 4867 } 4868 4869 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 4870 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 4871 { 4872 unsigned long flags; 4873 4874 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4875 4876 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 4877 goto out; 4878 4879 arch_spin_lock(&cpu_buffer->lock); 4880 4881 rb_reset_cpu(cpu_buffer); 4882 4883 arch_spin_unlock(&cpu_buffer->lock); 4884 4885 out: 4886 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4887 } 4888 4889 /** 4890 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 4891 * @buffer: The ring buffer to reset a per cpu buffer of 4892 * @cpu: The CPU buffer to be reset 4893 */ 4894 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 4895 { 4896 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4897 4898 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4899 return; 4900 4901 /* prevent another thread from changing buffer sizes */ 4902 mutex_lock(&buffer->mutex); 4903 4904 atomic_inc(&cpu_buffer->resize_disabled); 4905 atomic_inc(&cpu_buffer->record_disabled); 4906 4907 /* Make sure all commits have finished */ 4908 synchronize_rcu(); 4909 4910 reset_disabled_cpu_buffer(cpu_buffer); 4911 4912 
atomic_dec(&cpu_buffer->record_disabled); 4913 atomic_dec(&cpu_buffer->resize_disabled); 4914 4915 mutex_unlock(&buffer->mutex); 4916 } 4917 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 4918 4919 /** 4920 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 4921 * @buffer: The ring buffer to reset a per cpu buffer of 4922 * @cpu: The CPU buffer to be reset 4923 */ 4924 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 4925 { 4926 struct ring_buffer_per_cpu *cpu_buffer; 4927 int cpu; 4928 4929 /* prevent another thread from changing buffer sizes */ 4930 mutex_lock(&buffer->mutex); 4931 4932 for_each_online_buffer_cpu(buffer, cpu) { 4933 cpu_buffer = buffer->buffers[cpu]; 4934 4935 atomic_inc(&cpu_buffer->resize_disabled); 4936 atomic_inc(&cpu_buffer->record_disabled); 4937 } 4938 4939 /* Make sure all commits have finished */ 4940 synchronize_rcu(); 4941 4942 for_each_online_buffer_cpu(buffer, cpu) { 4943 cpu_buffer = buffer->buffers[cpu]; 4944 4945 reset_disabled_cpu_buffer(cpu_buffer); 4946 4947 atomic_dec(&cpu_buffer->record_disabled); 4948 atomic_dec(&cpu_buffer->resize_disabled); 4949 } 4950 4951 mutex_unlock(&buffer->mutex); 4952 } 4953 4954 /** 4955 * ring_buffer_reset - reset a ring buffer 4956 * @buffer: The ring buffer to reset all cpu buffers 4957 */ 4958 void ring_buffer_reset(struct trace_buffer *buffer) 4959 { 4960 struct ring_buffer_per_cpu *cpu_buffer; 4961 int cpu; 4962 4963 for_each_buffer_cpu(buffer, cpu) { 4964 cpu_buffer = buffer->buffers[cpu]; 4965 4966 atomic_inc(&cpu_buffer->resize_disabled); 4967 atomic_inc(&cpu_buffer->record_disabled); 4968 } 4969 4970 /* Make sure all commits have finished */ 4971 synchronize_rcu(); 4972 4973 for_each_buffer_cpu(buffer, cpu) { 4974 cpu_buffer = buffer->buffers[cpu]; 4975 4976 reset_disabled_cpu_buffer(cpu_buffer); 4977 4978 atomic_dec(&cpu_buffer->record_disabled); 4979 atomic_dec(&cpu_buffer->resize_disabled); 4980 } 4981 } 4982 EXPORT_SYMBOL_GPL(ring_buffer_reset); 4983 4984 /** 4985 
* rind_buffer_empty - is the ring buffer empty? 4986 * @buffer: The ring buffer to test 4987 */ 4988 bool ring_buffer_empty(struct trace_buffer *buffer) 4989 { 4990 struct ring_buffer_per_cpu *cpu_buffer; 4991 unsigned long flags; 4992 bool dolock; 4993 int cpu; 4994 int ret; 4995 4996 /* yes this is racy, but if you don't like the race, lock the buffer */ 4997 for_each_buffer_cpu(buffer, cpu) { 4998 cpu_buffer = buffer->buffers[cpu]; 4999 local_irq_save(flags); 5000 dolock = rb_reader_lock(cpu_buffer); 5001 ret = rb_per_cpu_empty(cpu_buffer); 5002 rb_reader_unlock(cpu_buffer, dolock); 5003 local_irq_restore(flags); 5004 5005 if (!ret) 5006 return false; 5007 } 5008 5009 return true; 5010 } 5011 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5012 5013 /** 5014 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 5015 * @buffer: The ring buffer 5016 * @cpu: The CPU buffer to test 5017 */ 5018 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5019 { 5020 struct ring_buffer_per_cpu *cpu_buffer; 5021 unsigned long flags; 5022 bool dolock; 5023 int ret; 5024 5025 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5026 return true; 5027 5028 cpu_buffer = buffer->buffers[cpu]; 5029 local_irq_save(flags); 5030 dolock = rb_reader_lock(cpu_buffer); 5031 ret = rb_per_cpu_empty(cpu_buffer); 5032 rb_reader_unlock(cpu_buffer, dolock); 5033 local_irq_restore(flags); 5034 5035 return ret; 5036 } 5037 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5038 5039 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5040 /** 5041 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5042 * @buffer_a: One buffer to swap with 5043 * @buffer_b: The other buffer to swap with 5044 * @cpu: the CPU of the buffers to swap 5045 * 5046 * This function is useful for tracers that want to take a "snapshot" 5047 * of a CPU buffer and has another back up buffer lying around. 5048 * it is expected that the tracer handles the cpu buffer not being 5049 * used at the moment. 
5050 */ 5051 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5052 struct trace_buffer *buffer_b, int cpu) 5053 { 5054 struct ring_buffer_per_cpu *cpu_buffer_a; 5055 struct ring_buffer_per_cpu *cpu_buffer_b; 5056 int ret = -EINVAL; 5057 5058 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5059 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5060 goto out; 5061 5062 cpu_buffer_a = buffer_a->buffers[cpu]; 5063 cpu_buffer_b = buffer_b->buffers[cpu]; 5064 5065 /* At least make sure the two buffers are somewhat the same */ 5066 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5067 goto out; 5068 5069 ret = -EAGAIN; 5070 5071 if (atomic_read(&buffer_a->record_disabled)) 5072 goto out; 5073 5074 if (atomic_read(&buffer_b->record_disabled)) 5075 goto out; 5076 5077 if (atomic_read(&cpu_buffer_a->record_disabled)) 5078 goto out; 5079 5080 if (atomic_read(&cpu_buffer_b->record_disabled)) 5081 goto out; 5082 5083 /* 5084 * We can't do a synchronize_rcu here because this 5085 * function can be called in atomic context. 5086 * Normally this will be called from the same CPU as cpu. 5087 * If not it's up to the caller to protect this. 5088 */ 5089 atomic_inc(&cpu_buffer_a->record_disabled); 5090 atomic_inc(&cpu_buffer_b->record_disabled); 5091 5092 ret = -EBUSY; 5093 if (local_read(&cpu_buffer_a->committing)) 5094 goto out_dec; 5095 if (local_read(&cpu_buffer_b->committing)) 5096 goto out_dec; 5097 5098 buffer_a->buffers[cpu] = cpu_buffer_b; 5099 buffer_b->buffers[cpu] = cpu_buffer_a; 5100 5101 cpu_buffer_b->buffer = buffer_a; 5102 cpu_buffer_a->buffer = buffer_b; 5103 5104 ret = 0; 5105 5106 out_dec: 5107 atomic_dec(&cpu_buffer_a->record_disabled); 5108 atomic_dec(&cpu_buffer_b->record_disabled); 5109 out: 5110 return ret; 5111 } 5112 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5113 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5114 5115 /** 5116 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5117 * @buffer: the buffer to allocate for. 
5118 * @cpu: the cpu buffer to allocate. 5119 * 5120 * This function is used in conjunction with ring_buffer_read_page. 5121 * When reading a full page from the ring buffer, these functions 5122 * can be used to speed up the process. The calling function should 5123 * allocate a few pages first with this function. Then when it 5124 * needs to get pages from the ring buffer, it passes the result 5125 * of this function into ring_buffer_read_page, which will swap 5126 * the page that was allocated, with the read page of the buffer. 5127 * 5128 * Returns: 5129 * The page allocated, or ERR_PTR 5130 */ 5131 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5132 { 5133 struct ring_buffer_per_cpu *cpu_buffer; 5134 struct buffer_data_page *bpage = NULL; 5135 unsigned long flags; 5136 struct page *page; 5137 5138 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5139 return ERR_PTR(-ENODEV); 5140 5141 cpu_buffer = buffer->buffers[cpu]; 5142 local_irq_save(flags); 5143 arch_spin_lock(&cpu_buffer->lock); 5144 5145 if (cpu_buffer->free_page) { 5146 bpage = cpu_buffer->free_page; 5147 cpu_buffer->free_page = NULL; 5148 } 5149 5150 arch_spin_unlock(&cpu_buffer->lock); 5151 local_irq_restore(flags); 5152 5153 if (bpage) 5154 goto out; 5155 5156 page = alloc_pages_node(cpu_to_node(cpu), 5157 GFP_KERNEL | __GFP_NORETRY, 0); 5158 if (!page) 5159 return ERR_PTR(-ENOMEM); 5160 5161 bpage = page_address(page); 5162 5163 out: 5164 rb_init_page(bpage); 5165 5166 return bpage; 5167 } 5168 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5169 5170 /** 5171 * ring_buffer_free_read_page - free an allocated read page 5172 * @buffer: the buffer the page was allocate for 5173 * @cpu: the cpu buffer the page came from 5174 * @data: the page to free 5175 * 5176 * Free a page allocated from ring_buffer_alloc_read_page. 
5177 */ 5178 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5179 { 5180 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5181 struct buffer_data_page *bpage = data; 5182 struct page *page = virt_to_page(bpage); 5183 unsigned long flags; 5184 5185 /* If the page is still in use someplace else, we can't reuse it */ 5186 if (page_ref_count(page) > 1) 5187 goto out; 5188 5189 local_irq_save(flags); 5190 arch_spin_lock(&cpu_buffer->lock); 5191 5192 if (!cpu_buffer->free_page) { 5193 cpu_buffer->free_page = bpage; 5194 bpage = NULL; 5195 } 5196 5197 arch_spin_unlock(&cpu_buffer->lock); 5198 local_irq_restore(flags); 5199 5200 out: 5201 free_page((unsigned long)bpage); 5202 } 5203 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5204 5205 /** 5206 * ring_buffer_read_page - extract a page from the ring buffer 5207 * @buffer: buffer to extract from 5208 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5209 * @len: amount to extract 5210 * @cpu: the cpu of the buffer to extract 5211 * @full: should the extraction only happen when the page is full. 5212 * 5213 * This function will pull out a page from the ring buffer and consume it. 5214 * @data_page must be the address of the variable that was returned 5215 * from ring_buffer_alloc_read_page. This is because the page might be used 5216 * to swap with a page in the ring buffer. 5217 * 5218 * for example: 5219 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5220 * if (IS_ERR(rpage)) 5221 * return PTR_ERR(rpage); 5222 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5223 * if (ret >= 0) 5224 * process_page(rpage, ret); 5225 * 5226 * When @full is set, the function will not return true unless 5227 * the writer is off the reader page. 5228 * 5229 * Note: it is up to the calling functions to handle sleeps and wakeups. 5230 * The ring buffer can be used anywhere in the kernel and can not 5231 * blindly call wake_up. 
The layer that uses the ring buffer must be 5232 * responsible for that. 5233 * 5234 * Returns: 5235 * >=0 if data has been transferred, returns the offset of consumed data. 5236 * <0 if no data has been transferred. 5237 */ 5238 int ring_buffer_read_page(struct trace_buffer *buffer, 5239 void **data_page, size_t len, int cpu, int full) 5240 { 5241 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5242 struct ring_buffer_event *event; 5243 struct buffer_data_page *bpage; 5244 struct buffer_page *reader; 5245 unsigned long missed_events; 5246 unsigned long flags; 5247 unsigned int commit; 5248 unsigned int read; 5249 u64 save_timestamp; 5250 int ret = -1; 5251 5252 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5253 goto out; 5254 5255 /* 5256 * If len is not big enough to hold the page header, then 5257 * we can not copy anything. 5258 */ 5259 if (len <= BUF_PAGE_HDR_SIZE) 5260 goto out; 5261 5262 len -= BUF_PAGE_HDR_SIZE; 5263 5264 if (!data_page) 5265 goto out; 5266 5267 bpage = *data_page; 5268 if (!bpage) 5269 goto out; 5270 5271 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5272 5273 reader = rb_get_reader_page(cpu_buffer); 5274 if (!reader) 5275 goto out_unlock; 5276 5277 event = rb_reader_event(cpu_buffer); 5278 5279 read = reader->read; 5280 commit = rb_page_commit(reader); 5281 5282 /* Check if any events were dropped */ 5283 missed_events = cpu_buffer->lost_events; 5284 5285 /* 5286 * If this page has been partially read or 5287 * if len is not big enough to read the rest of the page or 5288 * a writer is still on the page, then 5289 * we must copy the data from the page to the buffer. 5290 * Otherwise, we can simply swap the page with the one passed in. 
5291 */ 5292 if (read || (len < (commit - read)) || 5293 cpu_buffer->reader_page == cpu_buffer->commit_page) { 5294 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5295 unsigned int rpos = read; 5296 unsigned int pos = 0; 5297 unsigned int size; 5298 5299 if (full) 5300 goto out_unlock; 5301 5302 if (len > (commit - read)) 5303 len = (commit - read); 5304 5305 /* Always keep the time extend and data together */ 5306 size = rb_event_ts_length(event); 5307 5308 if (len < size) 5309 goto out_unlock; 5310 5311 /* save the current timestamp, since the user will need it */ 5312 save_timestamp = cpu_buffer->read_stamp; 5313 5314 /* Need to copy one event at a time */ 5315 do { 5316 /* We need the size of one event, because 5317 * rb_advance_reader only advances by one event, 5318 * whereas rb_event_ts_length may include the size of 5319 * one or two events. 5320 * We have already ensured there's enough space if this 5321 * is a time extend. */ 5322 size = rb_event_length(event); 5323 memcpy(bpage->data + pos, rpage->data + rpos, size); 5324 5325 len -= size; 5326 5327 rb_advance_reader(cpu_buffer); 5328 rpos = reader->read; 5329 pos += size; 5330 5331 if (rpos >= commit) 5332 break; 5333 5334 event = rb_reader_event(cpu_buffer); 5335 /* Always keep the time extend and data together */ 5336 size = rb_event_ts_length(event); 5337 } while (len >= size); 5338 5339 /* update bpage */ 5340 local_set(&bpage->commit, pos); 5341 bpage->time_stamp = save_timestamp; 5342 5343 /* we copied everything to the beginning */ 5344 read = 0; 5345 } else { 5346 /* update the entry counter */ 5347 cpu_buffer->read += rb_page_entries(reader); 5348 cpu_buffer->read_bytes += BUF_PAGE_SIZE; 5349 5350 /* swap the pages */ 5351 rb_init_page(bpage); 5352 bpage = reader->page; 5353 reader->page = *data_page; 5354 local_set(&reader->write, 0); 5355 local_set(&reader->entries, 0); 5356 reader->read = 0; 5357 *data_page = bpage; 5358 5359 /* 5360 * Use the real_end for the data size, 
5361 * This gives us a chance to store the lost events 5362 * on the page. 5363 */ 5364 if (reader->real_end) 5365 local_set(&bpage->commit, reader->real_end); 5366 } 5367 ret = read; 5368 5369 cpu_buffer->lost_events = 0; 5370 5371 commit = local_read(&bpage->commit); 5372 /* 5373 * Set a flag in the commit field if we lost events 5374 */ 5375 if (missed_events) { 5376 /* If there is room at the end of the page to save the 5377 * missed events, then record it there. 5378 */ 5379 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 5380 memcpy(&bpage->data[commit], &missed_events, 5381 sizeof(missed_events)); 5382 local_add(RB_MISSED_STORED, &bpage->commit); 5383 commit += sizeof(missed_events); 5384 } 5385 local_add(RB_MISSED_EVENTS, &bpage->commit); 5386 } 5387 5388 /* 5389 * This page may be off to user land. Zero it out here. 5390 */ 5391 if (commit < BUF_PAGE_SIZE) 5392 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 5393 5394 out_unlock: 5395 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5396 5397 out: 5398 return ret; 5399 } 5400 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 5401 5402 /* 5403 * We only allocate new buffers, never free them if the CPU goes down. 5404 * If we were to free the buffer, then the user would lose any trace that was in 5405 * the buffer. 
5406 */ 5407 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 5408 { 5409 struct trace_buffer *buffer; 5410 long nr_pages_same; 5411 int cpu_i; 5412 unsigned long nr_pages; 5413 5414 buffer = container_of(node, struct trace_buffer, node); 5415 if (cpumask_test_cpu(cpu, buffer->cpumask)) 5416 return 0; 5417 5418 nr_pages = 0; 5419 nr_pages_same = 1; 5420 /* check if all cpu sizes are same */ 5421 for_each_buffer_cpu(buffer, cpu_i) { 5422 /* fill in the size from first enabled cpu */ 5423 if (nr_pages == 0) 5424 nr_pages = buffer->buffers[cpu_i]->nr_pages; 5425 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 5426 nr_pages_same = 0; 5427 break; 5428 } 5429 } 5430 /* allocate minimum pages, user can later expand it */ 5431 if (!nr_pages_same) 5432 nr_pages = 2; 5433 buffer->buffers[cpu] = 5434 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 5435 if (!buffer->buffers[cpu]) { 5436 WARN(1, "failed to allocate ring buffer on CPU %u\n", 5437 cpu); 5438 return -ENOMEM; 5439 } 5440 smp_wmb(); 5441 cpumask_set_cpu(cpu, buffer->cpumask); 5442 return 0; 5443 } 5444 5445 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 5446 /* 5447 * This is a basic integrity check of the ring buffer. 5448 * Late in the boot cycle this test will run when configured in. 5449 * It will kick off a thread per CPU that will go into a loop 5450 * writing to the per cpu ring buffer various sizes of data. 5451 * Some of the data will be large items, some small. 5452 * 5453 * Another thread is created that goes into a spin, sending out 5454 * IPIs to the other CPUs to also write into the ring buffer. 5455 * this is to test the nesting ability of the buffer. 5456 * 5457 * Basic stats are recorded and reported. If something in the 5458 * ring buffer should happen that's not expected, a big warning 5459 * is displayed and all ring buffers are disabled. 
5460 */ 5461 static struct task_struct *rb_threads[NR_CPUS] __initdata; 5462 5463 struct rb_test_data { 5464 struct trace_buffer *buffer; 5465 unsigned long events; 5466 unsigned long bytes_written; 5467 unsigned long bytes_alloc; 5468 unsigned long bytes_dropped; 5469 unsigned long events_nested; 5470 unsigned long bytes_written_nested; 5471 unsigned long bytes_alloc_nested; 5472 unsigned long bytes_dropped_nested; 5473 int min_size_nested; 5474 int max_size_nested; 5475 int max_size; 5476 int min_size; 5477 int cpu; 5478 int cnt; 5479 }; 5480 5481 static struct rb_test_data rb_data[NR_CPUS] __initdata; 5482 5483 /* 1 meg per cpu */ 5484 #define RB_TEST_BUFFER_SIZE 1048576 5485 5486 static char rb_string[] __initdata = 5487 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 5488 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 5489 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 5490 5491 static bool rb_test_started __initdata; 5492 5493 struct rb_item { 5494 int size; 5495 char str[]; 5496 }; 5497 5498 static __init int rb_write_something(struct rb_test_data *data, bool nested) 5499 { 5500 struct ring_buffer_event *event; 5501 struct rb_item *item; 5502 bool started; 5503 int event_len; 5504 int size; 5505 int len; 5506 int cnt; 5507 5508 /* Have nested writes different that what is written */ 5509 cnt = data->cnt + (nested ? 27 : 0); 5510 5511 /* Multiply cnt by ~e, to make some unique increment */ 5512 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 5513 5514 len = size + sizeof(struct rb_item); 5515 5516 started = rb_test_started; 5517 /* read rb_test_started before checking buffer enabled */ 5518 smp_rmb(); 5519 5520 event = ring_buffer_lock_reserve(data->buffer, len); 5521 if (!event) { 5522 /* Ignore dropped events before test starts. 
*/ 5523 if (started) { 5524 if (nested) 5525 data->bytes_dropped += len; 5526 else 5527 data->bytes_dropped_nested += len; 5528 } 5529 return len; 5530 } 5531 5532 event_len = ring_buffer_event_length(event); 5533 5534 if (RB_WARN_ON(data->buffer, event_len < len)) 5535 goto out; 5536 5537 item = ring_buffer_event_data(event); 5538 item->size = size; 5539 memcpy(item->str, rb_string, size); 5540 5541 if (nested) { 5542 data->bytes_alloc_nested += event_len; 5543 data->bytes_written_nested += len; 5544 data->events_nested++; 5545 if (!data->min_size_nested || len < data->min_size_nested) 5546 data->min_size_nested = len; 5547 if (len > data->max_size_nested) 5548 data->max_size_nested = len; 5549 } else { 5550 data->bytes_alloc += event_len; 5551 data->bytes_written += len; 5552 data->events++; 5553 if (!data->min_size || len < data->min_size) 5554 data->max_size = len; 5555 if (len > data->max_size) 5556 data->max_size = len; 5557 } 5558 5559 out: 5560 ring_buffer_unlock_commit(data->buffer, event); 5561 5562 return 0; 5563 } 5564 5565 static __init int rb_test(void *arg) 5566 { 5567 struct rb_test_data *data = arg; 5568 5569 while (!kthread_should_stop()) { 5570 rb_write_something(data, false); 5571 data->cnt++; 5572 5573 set_current_state(TASK_INTERRUPTIBLE); 5574 /* Now sleep between a min of 100-300us and a max of 1ms */ 5575 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 5576 } 5577 5578 return 0; 5579 } 5580 5581 static __init void rb_ipi(void *ignore) 5582 { 5583 struct rb_test_data *data; 5584 int cpu = smp_processor_id(); 5585 5586 data = &rb_data[cpu]; 5587 rb_write_something(data, true); 5588 } 5589 5590 static __init int rb_hammer_test(void *arg) 5591 { 5592 while (!kthread_should_stop()) { 5593 5594 /* Send an IPI to all cpus to write data! 
*/ 5595 smp_call_function(rb_ipi, NULL, 1); 5596 /* No sleep, but for non preempt, let others run */ 5597 schedule(); 5598 } 5599 5600 return 0; 5601 } 5602 5603 static __init int test_ringbuffer(void) 5604 { 5605 struct task_struct *rb_hammer; 5606 struct trace_buffer *buffer; 5607 int cpu; 5608 int ret = 0; 5609 5610 if (security_locked_down(LOCKDOWN_TRACEFS)) { 5611 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 5612 return 0; 5613 } 5614 5615 pr_info("Running ring buffer tests...\n"); 5616 5617 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 5618 if (WARN_ON(!buffer)) 5619 return 0; 5620 5621 /* Disable buffer so that threads can't write to it yet */ 5622 ring_buffer_record_off(buffer); 5623 5624 for_each_online_cpu(cpu) { 5625 rb_data[cpu].buffer = buffer; 5626 rb_data[cpu].cpu = cpu; 5627 rb_data[cpu].cnt = cpu; 5628 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu], 5629 "rbtester/%d", cpu); 5630 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 5631 pr_cont("FAILED\n"); 5632 ret = PTR_ERR(rb_threads[cpu]); 5633 goto out_free; 5634 } 5635 5636 kthread_bind(rb_threads[cpu], cpu); 5637 wake_up_process(rb_threads[cpu]); 5638 } 5639 5640 /* Now create the rb hammer! */ 5641 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 5642 if (WARN_ON(IS_ERR(rb_hammer))) { 5643 pr_cont("FAILED\n"); 5644 ret = PTR_ERR(rb_hammer); 5645 goto out_free; 5646 } 5647 5648 ring_buffer_record_on(buffer); 5649 /* 5650 * Show buffer is enabled before setting rb_test_started. 5651 * Yes there's a small race window where events could be 5652 * dropped and the thread wont catch it. But when a ring 5653 * buffer gets enabled, there will always be some kind of 5654 * delay before other CPUs see it. Thus, we don't care about 5655 * those dropped events. We care about events dropped after 5656 * the threads see that the buffer is active. 
5657 */ 5658 smp_wmb(); 5659 rb_test_started = true; 5660 5661 set_current_state(TASK_INTERRUPTIBLE); 5662 /* Just run for 10 seconds */; 5663 schedule_timeout(10 * HZ); 5664 5665 kthread_stop(rb_hammer); 5666 5667 out_free: 5668 for_each_online_cpu(cpu) { 5669 if (!rb_threads[cpu]) 5670 break; 5671 kthread_stop(rb_threads[cpu]); 5672 } 5673 if (ret) { 5674 ring_buffer_free(buffer); 5675 return ret; 5676 } 5677 5678 /* Report! */ 5679 pr_info("finished\n"); 5680 for_each_online_cpu(cpu) { 5681 struct ring_buffer_event *event; 5682 struct rb_test_data *data = &rb_data[cpu]; 5683 struct rb_item *item; 5684 unsigned long total_events; 5685 unsigned long total_dropped; 5686 unsigned long total_written; 5687 unsigned long total_alloc; 5688 unsigned long total_read = 0; 5689 unsigned long total_size = 0; 5690 unsigned long total_len = 0; 5691 unsigned long total_lost = 0; 5692 unsigned long lost; 5693 int big_event_size; 5694 int small_event_size; 5695 5696 ret = -1; 5697 5698 total_events = data->events + data->events_nested; 5699 total_written = data->bytes_written + data->bytes_written_nested; 5700 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 5701 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 5702 5703 big_event_size = data->max_size + data->max_size_nested; 5704 small_event_size = data->min_size + data->min_size_nested; 5705 5706 pr_info("CPU %d:\n", cpu); 5707 pr_info(" events: %ld\n", total_events); 5708 pr_info(" dropped bytes: %ld\n", total_dropped); 5709 pr_info(" alloced bytes: %ld\n", total_alloc); 5710 pr_info(" written bytes: %ld\n", total_written); 5711 pr_info(" biggest event: %d\n", big_event_size); 5712 pr_info(" smallest event: %d\n", small_event_size); 5713 5714 if (RB_WARN_ON(buffer, total_dropped)) 5715 break; 5716 5717 ret = 0; 5718 5719 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 5720 total_lost += lost; 5721 item = ring_buffer_event_data(event); 5722 total_len += 
ring_buffer_event_length(event); 5723 total_size += item->size + sizeof(struct rb_item); 5724 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 5725 pr_info("FAILED!\n"); 5726 pr_info("buffer had: %.*s\n", item->size, item->str); 5727 pr_info("expected: %.*s\n", item->size, rb_string); 5728 RB_WARN_ON(buffer, 1); 5729 ret = -1; 5730 break; 5731 } 5732 total_read++; 5733 } 5734 if (ret) 5735 break; 5736 5737 ret = -1; 5738 5739 pr_info(" read events: %ld\n", total_read); 5740 pr_info(" lost events: %ld\n", total_lost); 5741 pr_info(" total events: %ld\n", total_lost + total_read); 5742 pr_info(" recorded len bytes: %ld\n", total_len); 5743 pr_info(" recorded size bytes: %ld\n", total_size); 5744 if (total_lost) 5745 pr_info(" With dropped events, record len and size may not match\n" 5746 " alloced and written from above\n"); 5747 if (!total_lost) { 5748 if (RB_WARN_ON(buffer, total_len != total_alloc || 5749 total_size != total_written)) 5750 break; 5751 } 5752 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 5753 break; 5754 5755 ret = 0; 5756 } 5757 if (!ret) 5758 pr_info("Ring buffer PASSED!\n"); 5759 5760 ring_buffer_free(buffer); 5761 return 0; 5762 } 5763 5764 late_initcall(test_ringbuffer); 5765 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 5766