1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 6 */ 7 #include <linux/trace_events.h> 8 #include <linux/ring_buffer.h> 9 #include <linux/trace_clock.h> 10 #include <linux/sched/clock.h> 11 #include <linux/trace_seq.h> 12 #include <linux/spinlock.h> 13 #include <linux/irq_work.h> 14 #include <linux/security.h> 15 #include <linux/uaccess.h> 16 #include <linux/hardirq.h> 17 #include <linux/kthread.h> /* for self test */ 18 #include <linux/module.h> 19 #include <linux/percpu.h> 20 #include <linux/mutex.h> 21 #include <linux/delay.h> 22 #include <linux/slab.h> 23 #include <linux/init.h> 24 #include <linux/hash.h> 25 #include <linux/list.h> 26 #include <linux/cpu.h> 27 #include <linux/oom.h> 28 29 #include <asm/local.h> 30 31 static void update_pages_handler(struct work_struct *work); 32 33 /* 34 * The ring buffer header is special. We must manually up keep it. 35 */ 36 int ring_buffer_print_entry_header(struct trace_seq *s) 37 { 38 trace_seq_puts(s, "# compressed entry header\n"); 39 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 40 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 41 trace_seq_puts(s, "\tarray : 32 bits\n"); 42 trace_seq_putc(s, '\n'); 43 trace_seq_printf(s, "\tpadding : type == %d\n", 44 RINGBUF_TYPE_PADDING); 45 trace_seq_printf(s, "\ttime_extend : type == %d\n", 46 RINGBUF_TYPE_TIME_EXTEND); 47 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 48 RINGBUF_TYPE_TIME_STAMP); 49 trace_seq_printf(s, "\tdata max type_len == %d\n", 50 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 51 52 return !trace_seq_has_overflowed(s); 53 } 54 55 /* 56 * The ring buffer is made up of a list of pages. A separate list of pages is 57 * allocated for each CPU. A writer may only write to a buffer that is 58 * associated with the CPU it is currently executing on. A reader may read 59 * from any per cpu buffer. 60 * 61 * The reader is special. For each per cpu buffer, the reader has its own 62 * reader page. 
When a reader has read the entire reader page, this reader 63 * page is swapped with another page in the ring buffer. 64 * 65 * Now, as long as the writer is off the reader page, the reader can do what 66 * ever it wants with that page. The writer will never write to that page 67 * again (as long as it is out of the ring buffer). 68 * 69 * Here's some silly ASCII art. 70 * 71 * +------+ 72 * |reader| RING BUFFER 73 * |page | 74 * +------+ +---+ +---+ +---+ 75 * | |-->| |-->| | 76 * +---+ +---+ +---+ 77 * ^ | 78 * | | 79 * +---------------+ 80 * 81 * 82 * +------+ 83 * |reader| RING BUFFER 84 * |page |------------------v 85 * +------+ +---+ +---+ +---+ 86 * | |-->| |-->| | 87 * +---+ +---+ +---+ 88 * ^ | 89 * | | 90 * +---------------+ 91 * 92 * 93 * +------+ 94 * |reader| RING BUFFER 95 * |page |------------------v 96 * +------+ +---+ +---+ +---+ 97 * ^ | |-->| |-->| | 98 * | +---+ +---+ +---+ 99 * | | 100 * | | 101 * +------------------------------+ 102 * 103 * 104 * +------+ 105 * |buffer| RING BUFFER 106 * |page |------------------v 107 * +------+ +---+ +---+ +---+ 108 * ^ | | | |-->| | 109 * | New +---+ +---+ +---+ 110 * | Reader------^ | 111 * | page | 112 * +------------------------------+ 113 * 114 * 115 * After we make this swap, the reader can hand this page off to the splice 116 * code and be done with it. It can even allocate a new page if it needs to 117 * and swap that into the ring buffer. 118 * 119 * We will be using cmpxchg soon to make all this lockless. 
120 * 121 */ 122 123 /* Used for individual buffers (after the counter) */ 124 #define RB_BUFFER_OFF (1 << 20) 125 126 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 127 128 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 129 #define RB_ALIGNMENT 4U 130 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 131 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 132 #define RB_ALIGN_DATA __aligned(RB_ALIGNMENT) 133 134 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 135 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 136 137 enum { 138 RB_LEN_TIME_EXTEND = 8, 139 RB_LEN_TIME_STAMP = 8, 140 }; 141 142 #define skip_time_extend(event) \ 143 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 144 145 #define extended_time(event) \ 146 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 147 148 static inline int rb_null_event(struct ring_buffer_event *event) 149 { 150 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 151 } 152 153 static void rb_event_set_padding(struct ring_buffer_event *event) 154 { 155 /* padding has a NULL time_delta */ 156 event->type_len = RINGBUF_TYPE_PADDING; 157 event->time_delta = 0; 158 } 159 160 static unsigned 161 rb_event_data_length(struct ring_buffer_event *event) 162 { 163 unsigned length; 164 165 if (event->type_len) 166 length = event->type_len * RB_ALIGNMENT; 167 else 168 length = event->array[0]; 169 return length + RB_EVNT_HDR_SIZE; 170 } 171 172 /* 173 * Return the length of the given event. Will return 174 * the length of the time extend if the event is a 175 * time extend. 
176 */ 177 static inline unsigned 178 rb_event_length(struct ring_buffer_event *event) 179 { 180 switch (event->type_len) { 181 case RINGBUF_TYPE_PADDING: 182 if (rb_null_event(event)) 183 /* undefined */ 184 return -1; 185 return event->array[0] + RB_EVNT_HDR_SIZE; 186 187 case RINGBUF_TYPE_TIME_EXTEND: 188 return RB_LEN_TIME_EXTEND; 189 190 case RINGBUF_TYPE_TIME_STAMP: 191 return RB_LEN_TIME_STAMP; 192 193 case RINGBUF_TYPE_DATA: 194 return rb_event_data_length(event); 195 default: 196 WARN_ON_ONCE(1); 197 } 198 /* not hit */ 199 return 0; 200 } 201 202 /* 203 * Return total length of time extend and data, 204 * or just the event length for all other events. 205 */ 206 static inline unsigned 207 rb_event_ts_length(struct ring_buffer_event *event) 208 { 209 unsigned len = 0; 210 211 if (extended_time(event)) { 212 /* time extends include the data event after it */ 213 len = RB_LEN_TIME_EXTEND; 214 event = skip_time_extend(event); 215 } 216 return len + rb_event_length(event); 217 } 218 219 /** 220 * ring_buffer_event_length - return the length of the event 221 * @event: the event to get the length of 222 * 223 * Returns the size of the data load of a data event. 224 * If the event is something other than a data event, it 225 * returns the size of the event itself. With the exception 226 * of a TIME EXTEND, where it still returns the size of the 227 * data load of the data event after it. 
228 */ 229 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 230 { 231 unsigned length; 232 233 if (extended_time(event)) 234 event = skip_time_extend(event); 235 236 length = rb_event_length(event); 237 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 238 return length; 239 length -= RB_EVNT_HDR_SIZE; 240 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 241 length -= sizeof(event->array[0]); 242 return length; 243 } 244 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 245 246 /* inline for ring buffer fast paths */ 247 static __always_inline void * 248 rb_event_data(struct ring_buffer_event *event) 249 { 250 if (extended_time(event)) 251 event = skip_time_extend(event); 252 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 253 /* If length is in len field, then array[0] has the data */ 254 if (event->type_len) 255 return (void *)&event->array[0]; 256 /* Otherwise length is in array[0] and array[1] has the data */ 257 return (void *)&event->array[1]; 258 } 259 260 /** 261 * ring_buffer_event_data - return the data of the event 262 * @event: the event to get the data from 263 */ 264 void *ring_buffer_event_data(struct ring_buffer_event *event) 265 { 266 return rb_event_data(event); 267 } 268 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 269 270 #define for_each_buffer_cpu(buffer, cpu) \ 271 for_each_cpu(cpu, buffer->cpumask) 272 273 #define for_each_online_buffer_cpu(buffer, cpu) \ 274 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 275 276 #define TS_SHIFT 27 277 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 278 #define TS_DELTA_TEST (~TS_MASK) 279 280 /** 281 * ring_buffer_event_time_stamp - return the event's extended timestamp 282 * @event: the event to get the timestamp of 283 * 284 * Returns the extended timestamp associated with a data event. 285 * An extended time_stamp is a 64-bit timestamp represented 286 * internally in a special way that makes the best use of space 287 * contained within a ring buffer event. 
This function decodes 288 * it and maps it to a straight u64 value. 289 */ 290 u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event) 291 { 292 u64 ts; 293 294 ts = event->array[0]; 295 ts <<= TS_SHIFT; 296 ts += event->time_delta; 297 298 return ts; 299 } 300 301 /* Flag when events were overwritten */ 302 #define RB_MISSED_EVENTS (1 << 31) 303 /* Missed count stored at end */ 304 #define RB_MISSED_STORED (1 << 30) 305 306 struct buffer_data_page { 307 u64 time_stamp; /* page time stamp */ 308 local_t commit; /* write committed index */ 309 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 310 }; 311 312 /* 313 * Note, the buffer_page list must be first. The buffer pages 314 * are allocated in cache lines, which means that each buffer 315 * page will be at the beginning of a cache line, and thus 316 * the least significant bits will be zero. We use this to 317 * add flags in the list struct pointers, to make the ring buffer 318 * lockless. 319 */ 320 struct buffer_page { 321 struct list_head list; /* list of buffer pages */ 322 local_t write; /* index for next write */ 323 unsigned read; /* index for next read */ 324 local_t entries; /* entries on this page */ 325 unsigned long real_end; /* real end of data */ 326 struct buffer_data_page *page; /* Actual data page */ 327 }; 328 329 /* 330 * The buffer page counters, write and entries, must be reset 331 * atomically when crossing page boundaries. To synchronize this 332 * update, two counters are inserted into the number. One is 333 * the actual counter for the write position or count on the page. 334 * 335 * The other is a counter of updaters. Before an update happens 336 * the update partition of the counter is incremented. This will 337 * allow the updater to update the counter atomically. 338 * 339 * The counter is 20 bits, and the state data is 12. 
340 */ 341 #define RB_WRITE_MASK 0xfffff 342 #define RB_WRITE_INTCNT (1 << 20) 343 344 static void rb_init_page(struct buffer_data_page *bpage) 345 { 346 local_set(&bpage->commit, 0); 347 } 348 349 /* 350 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 351 * this issue out. 352 */ 353 static void free_buffer_page(struct buffer_page *bpage) 354 { 355 free_page((unsigned long)bpage->page); 356 kfree(bpage); 357 } 358 359 /* 360 * We need to fit the time_stamp delta into 27 bits. 361 */ 362 static inline int test_time_stamp(u64 delta) 363 { 364 if (delta & TS_DELTA_TEST) 365 return 1; 366 return 0; 367 } 368 369 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 370 371 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 372 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 373 374 int ring_buffer_print_page_header(struct trace_seq *s) 375 { 376 struct buffer_data_page field; 377 378 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 379 "offset:0;\tsize:%u;\tsigned:%u;\n", 380 (unsigned int)sizeof(field.time_stamp), 381 (unsigned int)is_signed_type(u64)); 382 383 trace_seq_printf(s, "\tfield: local_t commit;\t" 384 "offset:%u;\tsize:%u;\tsigned:%u;\n", 385 (unsigned int)offsetof(typeof(field), commit), 386 (unsigned int)sizeof(field.commit), 387 (unsigned int)is_signed_type(long)); 388 389 trace_seq_printf(s, "\tfield: int overwrite;\t" 390 "offset:%u;\tsize:%u;\tsigned:%u;\n", 391 (unsigned int)offsetof(typeof(field), commit), 392 1, 393 (unsigned int)is_signed_type(long)); 394 395 trace_seq_printf(s, "\tfield: char data;\t" 396 "offset:%u;\tsize:%u;\tsigned:%u;\n", 397 (unsigned int)offsetof(typeof(field), data), 398 (unsigned int)BUF_PAGE_SIZE, 399 (unsigned int)is_signed_type(char)); 400 401 return !trace_seq_has_overflowed(s); 402 } 403 404 struct rb_irq_work { 405 struct irq_work work; 406 wait_queue_head_t waiters; 407 wait_queue_head_t full_waiters; 408 bool waiters_pending; 409 bool full_waiters_pending; 410 bool 
wakeup_full; 411 }; 412 413 /* 414 * Structure to hold event state and handle nested events. 415 */ 416 struct rb_event_info { 417 u64 ts; 418 u64 delta; 419 u64 before; 420 u64 after; 421 unsigned long length; 422 struct buffer_page *tail_page; 423 int add_timestamp; 424 }; 425 426 /* 427 * Used for the add_timestamp 428 * NONE 429 * EXTEND - wants a time extend 430 * ABSOLUTE - the buffer requests all events to have absolute time stamps 431 * FORCE - force a full time stamp. 432 */ 433 enum { 434 RB_ADD_STAMP_NONE = 0, 435 RB_ADD_STAMP_EXTEND = BIT(1), 436 RB_ADD_STAMP_ABSOLUTE = BIT(2), 437 RB_ADD_STAMP_FORCE = BIT(3) 438 }; 439 /* 440 * Used for which event context the event is in. 441 * TRANSITION = 0 442 * NMI = 1 443 * IRQ = 2 444 * SOFTIRQ = 3 445 * NORMAL = 4 446 * 447 * See trace_recursive_lock() comment below for more details. 448 */ 449 enum { 450 RB_CTX_TRANSITION, 451 RB_CTX_NMI, 452 RB_CTX_IRQ, 453 RB_CTX_SOFTIRQ, 454 RB_CTX_NORMAL, 455 RB_CTX_MAX 456 }; 457 458 #if BITS_PER_LONG == 32 459 #define RB_TIME_32 460 #endif 461 462 /* To test on 64 bit machines */ 463 //#define RB_TIME_32 464 465 #ifdef RB_TIME_32 466 467 struct rb_time_struct { 468 local_t cnt; 469 local_t top; 470 local_t bottom; 471 }; 472 #else 473 #include <asm/local64.h> 474 struct rb_time_struct { 475 local64_t time; 476 }; 477 #endif 478 typedef struct rb_time_struct rb_time_t; 479 480 /* 481 * head_page == tail_page && head == tail then buffer is empty. 
482 */ 483 struct ring_buffer_per_cpu { 484 int cpu; 485 atomic_t record_disabled; 486 atomic_t resize_disabled; 487 struct trace_buffer *buffer; 488 raw_spinlock_t reader_lock; /* serialize readers */ 489 arch_spinlock_t lock; 490 struct lock_class_key lock_key; 491 struct buffer_data_page *free_page; 492 unsigned long nr_pages; 493 unsigned int current_context; 494 struct list_head *pages; 495 struct buffer_page *head_page; /* read from head */ 496 struct buffer_page *tail_page; /* write to tail */ 497 struct buffer_page *commit_page; /* committed pages */ 498 struct buffer_page *reader_page; 499 unsigned long lost_events; 500 unsigned long last_overrun; 501 unsigned long nest; 502 local_t entries_bytes; 503 local_t entries; 504 local_t overrun; 505 local_t commit_overrun; 506 local_t dropped_events; 507 local_t committing; 508 local_t commits; 509 local_t pages_touched; 510 local_t pages_read; 511 long last_pages_touch; 512 size_t shortest_full; 513 unsigned long read; 514 unsigned long read_bytes; 515 rb_time_t write_stamp; 516 rb_time_t before_stamp; 517 u64 read_stamp; 518 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 519 long nr_pages_to_update; 520 struct list_head new_pages; /* new pages to add */ 521 struct work_struct update_pages_work; 522 struct completion update_done; 523 524 struct rb_irq_work irq_work; 525 }; 526 527 struct trace_buffer { 528 unsigned flags; 529 int cpus; 530 atomic_t record_disabled; 531 cpumask_var_t cpumask; 532 533 struct lock_class_key *reader_lock_key; 534 535 struct mutex mutex; 536 537 struct ring_buffer_per_cpu **buffers; 538 539 struct hlist_node node; 540 u64 (*clock)(void); 541 542 struct rb_irq_work irq_work; 543 bool time_stamp_abs; 544 }; 545 546 struct ring_buffer_iter { 547 struct ring_buffer_per_cpu *cpu_buffer; 548 unsigned long head; 549 unsigned long next_event; 550 struct buffer_page *head_page; 551 struct buffer_page *cache_reader_page; 552 unsigned long cache_read; 553 u64 read_stamp; 554 u64 
page_stamp; 555 struct ring_buffer_event *event; 556 int missed_events; 557 }; 558 559 #ifdef RB_TIME_32 560 561 /* 562 * On 32 bit machines, local64_t is very expensive. As the ring 563 * buffer doesn't need all the features of a true 64 bit atomic, 564 * on 32 bit, it uses these functions (64 still uses local64_t). 565 * 566 * For the ring buffer, 64 bit required operations for the time is 567 * the following: 568 * 569 * - Only need 59 bits (uses 60 to make it even). 570 * - Reads may fail if it interrupted a modification of the time stamp. 571 * It will succeed if it did not interrupt another write even if 572 * the read itself is interrupted by a write. 573 * It returns whether it was successful or not. 574 * 575 * - Writes always succeed and will overwrite other writes and writes 576 * that were done by events interrupting the current write. 577 * 578 * - A write followed by a read of the same time stamp will always succeed, 579 * but may not contain the same value. 580 * 581 * - A cmpxchg will fail if it interrupted another write or cmpxchg. 582 * Other than that, it acts like a normal cmpxchg. 583 * 584 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half 585 * (bottom being the least significant 30 bits of the 60 bit time stamp). 586 * 587 * The two most significant bits of each half holds a 2 bit counter (0-3). 588 * Each update will increment this counter by one. 589 * When reading the top and bottom, if the two counter bits match then the 590 * top and bottom together make a valid 60 bit number. 
591 */ 592 #define RB_TIME_SHIFT 30 593 #define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1) 594 595 static inline int rb_time_cnt(unsigned long val) 596 { 597 return (val >> RB_TIME_SHIFT) & 3; 598 } 599 600 static inline u64 rb_time_val(unsigned long top, unsigned long bottom) 601 { 602 u64 val; 603 604 val = top & RB_TIME_VAL_MASK; 605 val <<= RB_TIME_SHIFT; 606 val |= bottom & RB_TIME_VAL_MASK; 607 608 return val; 609 } 610 611 static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt) 612 { 613 unsigned long top, bottom; 614 unsigned long c; 615 616 /* 617 * If the read is interrupted by a write, then the cnt will 618 * be different. Loop until both top and bottom have been read 619 * without interruption. 620 */ 621 do { 622 c = local_read(&t->cnt); 623 top = local_read(&t->top); 624 bottom = local_read(&t->bottom); 625 } while (c != local_read(&t->cnt)); 626 627 *cnt = rb_time_cnt(top); 628 629 /* If top and bottom counts don't match, this interrupted a write */ 630 if (*cnt != rb_time_cnt(bottom)) 631 return false; 632 633 *ret = rb_time_val(top, bottom); 634 return true; 635 } 636 637 static bool rb_time_read(rb_time_t *t, u64 *ret) 638 { 639 unsigned long cnt; 640 641 return __rb_time_read(t, ret, &cnt); 642 } 643 644 static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt) 645 { 646 return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT); 647 } 648 649 static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom) 650 { 651 *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK); 652 *bottom = (unsigned long)(val & RB_TIME_VAL_MASK); 653 } 654 655 static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt) 656 { 657 val = rb_time_val_cnt(val, cnt); 658 local_set(t, val); 659 } 660 661 static void rb_time_set(rb_time_t *t, u64 val) 662 { 663 unsigned long cnt, top, bottom; 664 665 rb_time_split(val, &top, &bottom); 666 667 /* Writes always succeed 
with a valid number even if it gets interrupted. */ 668 do { 669 cnt = local_inc_return(&t->cnt); 670 rb_time_val_set(&t->top, top, cnt); 671 rb_time_val_set(&t->bottom, bottom, cnt); 672 } while (cnt != local_read(&t->cnt)); 673 } 674 675 static inline bool 676 rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set) 677 { 678 unsigned long ret; 679 680 ret = local_cmpxchg(l, expect, set); 681 return ret == expect; 682 } 683 684 static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) 685 { 686 unsigned long cnt, top, bottom; 687 unsigned long cnt2, top2, bottom2; 688 u64 val; 689 690 /* The cmpxchg always fails if it interrupted an update */ 691 if (!__rb_time_read(t, &val, &cnt2)) 692 return false; 693 694 if (val != expect) 695 return false; 696 697 cnt = local_read(&t->cnt); 698 if ((cnt & 3) != cnt2) 699 return false; 700 701 cnt2 = cnt + 1; 702 703 rb_time_split(val, &top, &bottom); 704 top = rb_time_val_cnt(top, cnt); 705 bottom = rb_time_val_cnt(bottom, cnt); 706 707 rb_time_split(set, &top2, &bottom2); 708 top2 = rb_time_val_cnt(top2, cnt2); 709 bottom2 = rb_time_val_cnt(bottom2, cnt2); 710 711 if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2)) 712 return false; 713 if (!rb_time_read_cmpxchg(&t->top, top, top2)) 714 return false; 715 if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2)) 716 return false; 717 return true; 718 } 719 720 #else /* 64 bits */ 721 722 /* local64_t always succeeds */ 723 724 static inline bool rb_time_read(rb_time_t *t, u64 *ret) 725 { 726 *ret = local64_read(&t->time); 727 return true; 728 } 729 static void rb_time_set(rb_time_t *t, u64 val) 730 { 731 local64_set(&t->time, val); 732 } 733 734 static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) 735 { 736 u64 val; 737 val = local64_cmpxchg(&t->time, expect, set); 738 return val == expect; 739 } 740 #endif 741 742 /** 743 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer 744 * @buffer: The ring_buffer to get the number of 
pages from 745 * @cpu: The cpu of the ring_buffer to get the number of pages from 746 * 747 * Returns the number of pages used by a per_cpu buffer of the ring buffer. 748 */ 749 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu) 750 { 751 return buffer->buffers[cpu]->nr_pages; 752 } 753 754 /** 755 * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer 756 * @buffer: The ring_buffer to get the number of pages from 757 * @cpu: The cpu of the ring_buffer to get the number of pages from 758 * 759 * Returns the number of pages that have content in the ring buffer. 760 */ 761 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 762 { 763 size_t read; 764 size_t cnt; 765 766 read = local_read(&buffer->buffers[cpu]->pages_read); 767 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 768 /* The reader can read an empty page, but not more than that */ 769 if (cnt < read) { 770 WARN_ON_ONCE(read > cnt + 1); 771 return 0; 772 } 773 774 return cnt - read; 775 } 776 777 /* 778 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 779 * 780 * Schedules a delayed work to wake up any task that is blocked on the 781 * ring buffer waiters queue. 782 */ 783 static void rb_wake_up_waiters(struct irq_work *work) 784 { 785 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 786 787 wake_up_all(&rbwork->waiters); 788 if (rbwork->wakeup_full) { 789 rbwork->wakeup_full = false; 790 wake_up_all(&rbwork->full_waiters); 791 } 792 } 793 794 /** 795 * ring_buffer_wait - wait for input to the ring buffer 796 * @buffer: buffer to wait on 797 * @cpu: the cpu buffer to wait on 798 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 799 * 800 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 801 * as data is added to any of the @buffer's cpu buffers. Otherwise 802 * it will wait for data to be added to a specific cpu buffer. 
803 */ 804 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) 805 { 806 struct ring_buffer_per_cpu *cpu_buffer; 807 DEFINE_WAIT(wait); 808 struct rb_irq_work *work; 809 int ret = 0; 810 811 /* 812 * Depending on what the caller is waiting for, either any 813 * data in any cpu buffer, or a specific buffer, put the 814 * caller on the appropriate wait queue. 815 */ 816 if (cpu == RING_BUFFER_ALL_CPUS) { 817 work = &buffer->irq_work; 818 /* Full only makes sense on per cpu reads */ 819 full = 0; 820 } else { 821 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 822 return -ENODEV; 823 cpu_buffer = buffer->buffers[cpu]; 824 work = &cpu_buffer->irq_work; 825 } 826 827 828 while (true) { 829 if (full) 830 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE); 831 else 832 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); 833 834 /* 835 * The events can happen in critical sections where 836 * checking a work queue can cause deadlocks. 837 * After adding a task to the queue, this flag is set 838 * only to notify events to try to wake up the queue 839 * using irq_work. 840 * 841 * We don't clear it even if the buffer is no longer 842 * empty. The flag only causes the next event to run 843 * irq_work to do the work queue wake up. The worse 844 * that can happen if we race with !trace_empty() is that 845 * an event will cause an irq_work to try to wake up 846 * an empty queue. 847 * 848 * There's no reason to protect this flag either, as 849 * the work queue and irq_work logic will do the necessary 850 * synchronization for the wake ups. The only thing 851 * that is necessary is that the wake up happens after 852 * a task has been queued. It's OK for spurious wake ups. 
853 */ 854 if (full) 855 work->full_waiters_pending = true; 856 else 857 work->waiters_pending = true; 858 859 if (signal_pending(current)) { 860 ret = -EINTR; 861 break; 862 } 863 864 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) 865 break; 866 867 if (cpu != RING_BUFFER_ALL_CPUS && 868 !ring_buffer_empty_cpu(buffer, cpu)) { 869 unsigned long flags; 870 bool pagebusy; 871 size_t nr_pages; 872 size_t dirty; 873 874 if (!full) 875 break; 876 877 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 878 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 879 nr_pages = cpu_buffer->nr_pages; 880 dirty = ring_buffer_nr_dirty_pages(buffer, cpu); 881 if (!cpu_buffer->shortest_full || 882 cpu_buffer->shortest_full < full) 883 cpu_buffer->shortest_full = full; 884 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 885 if (!pagebusy && 886 (!nr_pages || (dirty * 100) > full * nr_pages)) 887 break; 888 } 889 890 schedule(); 891 } 892 893 if (full) 894 finish_wait(&work->full_waiters, &wait); 895 else 896 finish_wait(&work->waiters, &wait); 897 898 return ret; 899 } 900 901 /** 902 * ring_buffer_poll_wait - poll on buffer input 903 * @buffer: buffer to wait on 904 * @cpu: the cpu buffer to wait on 905 * @filp: the file descriptor 906 * @poll_table: The poll descriptor 907 * 908 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 909 * as data is added to any of the @buffer's cpu buffers. Otherwise 910 * it will wait for data to be added to a specific cpu buffer. 911 * 912 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 913 * zero otherwise. 
914 */ 915 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 916 struct file *filp, poll_table *poll_table) 917 { 918 struct ring_buffer_per_cpu *cpu_buffer; 919 struct rb_irq_work *work; 920 921 if (cpu == RING_BUFFER_ALL_CPUS) 922 work = &buffer->irq_work; 923 else { 924 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 925 return -EINVAL; 926 927 cpu_buffer = buffer->buffers[cpu]; 928 work = &cpu_buffer->irq_work; 929 } 930 931 poll_wait(filp, &work->waiters, poll_table); 932 work->waiters_pending = true; 933 /* 934 * There's a tight race between setting the waiters_pending and 935 * checking if the ring buffer is empty. Once the waiters_pending bit 936 * is set, the next event will wake the task up, but we can get stuck 937 * if there's only a single event in. 938 * 939 * FIXME: Ideally, we need a memory barrier on the writer side as well, 940 * but adding a memory barrier to all events will cause too much of a 941 * performance hit in the fast path. We only need a memory barrier when 942 * the buffer goes from empty to having content. But as this race is 943 * extremely small, and it's not a problem if another event comes in, we 944 * will fix it later. 
945 */ 946 smp_mb(); 947 948 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 949 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 950 return EPOLLIN | EPOLLRDNORM; 951 return 0; 952 } 953 954 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 955 #define RB_WARN_ON(b, cond) \ 956 ({ \ 957 int _____ret = unlikely(cond); \ 958 if (_____ret) { \ 959 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 960 struct ring_buffer_per_cpu *__b = \ 961 (void *)b; \ 962 atomic_inc(&__b->buffer->record_disabled); \ 963 } else \ 964 atomic_inc(&b->record_disabled); \ 965 WARN_ON(1); \ 966 } \ 967 _____ret; \ 968 }) 969 970 /* Up this if you want to test the TIME_EXTENTS and normalization */ 971 #define DEBUG_SHIFT 0 972 973 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 974 { 975 u64 ts; 976 977 /* Skip retpolines :-( */ 978 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 979 ts = trace_clock_local(); 980 else 981 ts = buffer->clock(); 982 983 /* shift to debug/test normalization and TIME_EXTENTS */ 984 return ts << DEBUG_SHIFT; 985 } 986 987 u64 ring_buffer_time_stamp(struct trace_buffer *buffer, int cpu) 988 { 989 u64 time; 990 991 preempt_disable_notrace(); 992 time = rb_time_stamp(buffer); 993 preempt_enable_notrace(); 994 995 return time; 996 } 997 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 998 999 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1000 int cpu, u64 *ts) 1001 { 1002 /* Just stupid testing the normalize function and deltas */ 1003 *ts >>= DEBUG_SHIFT; 1004 } 1005 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1006 1007 /* 1008 * Making the ring buffer lockless makes things tricky. 1009 * Although writes only happen on the CPU that they are on, 1010 * and they only need to worry about interrupts. Reads can 1011 * happen on any CPU. 
1012 * 1013 * The reader page is always off the ring buffer, but when the 1014 * reader finishes with a page, it needs to swap its page with 1015 * a new one from the buffer. The reader needs to take from 1016 * the head (writes go to the tail). But if a writer is in overwrite 1017 * mode and wraps, it must push the head page forward. 1018 * 1019 * Here lies the problem. 1020 * 1021 * The reader must be careful to replace only the head page, and 1022 * not another one. As described at the top of the file in the 1023 * ASCII art, the reader sets its old page to point to the next 1024 * page after head. It then sets the page after head to point to 1025 * the old reader page. But if the writer moves the head page 1026 * during this operation, the reader could end up with the tail. 1027 * 1028 * We use cmpxchg to help prevent this race. We also do something 1029 * special with the page before head. We set the LSB to 1. 1030 * 1031 * When the writer must push the page forward, it will clear the 1032 * bit that points to the head page, move the head, and then set 1033 * the bit that points to the new head page. 1034 * 1035 * We also don't want an interrupt coming in and moving the head 1036 * page on another writer. Thus we use the second LSB to catch 1037 * that too. 
Thus: 1038 * 1039 * head->list->prev->next bit 1 bit 0 1040 * ------- ------- 1041 * Normal page 0 0 1042 * Points to head page 0 1 1043 * New head page 1 0 1044 * 1045 * Note we can not trust the prev pointer of the head page, because: 1046 * 1047 * +----+ +-----+ +-----+ 1048 * | |------>| T |---X--->| N | 1049 * | |<------| | | | 1050 * +----+ +-----+ +-----+ 1051 * ^ ^ | 1052 * | +-----+ | | 1053 * +----------| R |----------+ | 1054 * | |<-----------+ 1055 * +-----+ 1056 * 1057 * Key: ---X--> HEAD flag set in pointer 1058 * T Tail page 1059 * R Reader page 1060 * N Next page 1061 * 1062 * (see __rb_reserve_next() to see where this happens) 1063 * 1064 * What the above shows is that the reader just swapped out 1065 * the reader page with a page in the buffer, but before it 1066 * could make the new header point back to the new page added 1067 * it was preempted by a writer. The writer moved forward onto 1068 * the new page added by the reader and is about to move forward 1069 * again. 1070 * 1071 * You can see, it is legitimate for the previous pointer of 1072 * the head (or any page) not to point back to itself. But only 1073 * temporarily. 1074 */ 1075 1076 #define RB_PAGE_NORMAL 0UL 1077 #define RB_PAGE_HEAD 1UL 1078 #define RB_PAGE_UPDATE 2UL 1079 1080 1081 #define RB_FLAG_MASK 3UL 1082 1083 /* PAGE_MOVED is not part of the mask */ 1084 #define RB_PAGE_MOVED 4UL 1085 1086 /* 1087 * rb_list_head - remove any bit 1088 */ 1089 static struct list_head *rb_list_head(struct list_head *list) 1090 { 1091 unsigned long val = (unsigned long)list; 1092 1093 return (struct list_head *)(val & ~RB_FLAG_MASK); 1094 } 1095 1096 /* 1097 * rb_is_head_page - test if the given page is the head page 1098 * 1099 * Because the reader may move the head_page pointer, we can 1100 * not trust what the head page is (it may be pointing to 1101 * the reader page). But if the next page is a header page, 1102 * its flags will be non zero. 
 */
/*
 * @list is the list node whose ->next is inspected (callers in this file
 * pass the node preceding @page).  Returns RB_PAGE_MOVED if that pointer,
 * with the flag bits stripped, is not &@page->list; otherwise returns the
 * flag bits found in it.  @cpu_buffer is not used.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 *
 * Returns true when page->list.prev->next (flag bits stripped) is not
 * &page->list.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 *
 * Sets RB_PAGE_HEAD and clears RB_PAGE_UPDATE in @list->next using plain
 * (non-cmpxchg) read-modify-write stores.  @cpu_buffer is not used.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 *
 * Does nothing when cpu_buffer->head_page is NULL.
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

/*
 * rb_list_head_clear - clear both flag bits in @list->next
 */
static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 *
 * Dereferences cpu_buffer->pages unconditionally, so the caller must make
 * sure it is not NULL.
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

/*
 * rb_head_page_set - atomically change the flag bits on the pointer to @head
 *
 * Attempts a cmpxchg on @prev->list.next from (&head->list | @old_flag) to
 * (&head->list | @new_flag).
 *
 * Returns RB_PAGE_MOVED if the pointer found there no longer refers to
 * @head; otherwise returns the flag bits that were found (equal to
 * @old_flag when the exchange took place).  @cpu_buffer is not used.
 */
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

/* rb_head_page_set() with the new flag fixed to RB_PAGE_UPDATE */
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

/* rb_head_page_set() with the new flag fixed to RB_PAGE_HEAD */
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

/* rb_head_page_set() with the new flag fixed to RB_PAGE_NORMAL */
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

/*
 * rb_inc_page - advance *@bpage to the next page in the list
 *
 * The flag bits are stripped from the next pointer before it is converted
 * back to a buffer_page.  @cpu_buffer is not used.
 */
static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

/*
 * rb_set_head_page - locate the page currently flagged as head
 *
 * Walks the list starting at cpu_buffer->head_page looking for a page whose
 * predecessor's next pointer carries a non-zero flag.  On success the page
 * is stored in cpu_buffer->head_page and returned.  Returns NULL (after an
 * RB_WARN_ON) if head_page is NULL, the list sanity check fails, or no
 * flagged page is found in three passes.
 */
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

/*
 * rb_head_page_replace - swing the HEAD-flagged pointer from @old to @new
 *
 * Expects old->list.prev->next to be (&old->list | RB_PAGE_HEAD) and tries
 * to cmpxchg it to the address of @new->list (stored as-is, no flag bits
 * are OR'ed in).  Returns non-zero if the exchange happened, 0 otherwise.
 */
static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

/*
 * rb_check_bpage - warn if a buffer_page address has either flag bit set
 *
 * Returns 1 (after an RB_WARN_ON) if @bpage's address overlaps
 * RB_FLAG_MASK, 0 otherwise.
 */
static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			 struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 * @cpu_buffer: CPU buffer passed to RB_WARN_ON() on failure
 * @list: list node whose prev and next pointers are checked
 *
 * Returns 1 if either pointer has a flag bit set, 0 otherwise.
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
1389 */ 1390 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1391 { 1392 struct list_head *head = cpu_buffer->pages; 1393 struct buffer_page *bpage, *tmp; 1394 1395 /* Reset the head page if it exists */ 1396 if (cpu_buffer->head_page) 1397 rb_set_head_page(cpu_buffer); 1398 1399 rb_head_page_deactivate(cpu_buffer); 1400 1401 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 1402 return -1; 1403 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 1404 return -1; 1405 1406 if (rb_check_list(cpu_buffer, head)) 1407 return -1; 1408 1409 list_for_each_entry_safe(bpage, tmp, head, list) { 1410 if (RB_WARN_ON(cpu_buffer, 1411 bpage->list.next->prev != &bpage->list)) 1412 return -1; 1413 if (RB_WARN_ON(cpu_buffer, 1414 bpage->list.prev->next != &bpage->list)) 1415 return -1; 1416 if (rb_check_list(cpu_buffer, &bpage->list)) 1417 return -1; 1418 } 1419 1420 rb_head_page_activate(cpu_buffer); 1421 1422 return 0; 1423 } 1424 1425 static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu) 1426 { 1427 struct buffer_page *bpage, *tmp; 1428 bool user_thread = current->mm != NULL; 1429 gfp_t mflags; 1430 long i; 1431 1432 /* 1433 * Check if the available memory is there first. 1434 * Note, si_mem_available() only gives us a rough estimate of available 1435 * memory. It may not be accurate. But we don't care, we just want 1436 * to prevent doing any allocation when it is obvious that it is 1437 * not going to succeed. 1438 */ 1439 i = si_mem_available(); 1440 if (i < nr_pages) 1441 return -ENOMEM; 1442 1443 /* 1444 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 1445 * gracefully without invoking oom-killer and the system is not 1446 * destabilized. 1447 */ 1448 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 1449 1450 /* 1451 * If a user thread allocates too much, and si_mem_available() 1452 * reports there's enough memory, even though there is not. 1453 * Make sure the OOM killer kills this thread. 
This can happen 1454 * even with RETRY_MAYFAIL because another task may be doing 1455 * an allocation after this task has taken all memory. 1456 * This is the task the OOM killer needs to take out during this 1457 * loop, even if it was triggered by an allocation somewhere else. 1458 */ 1459 if (user_thread) 1460 set_current_oom_origin(); 1461 for (i = 0; i < nr_pages; i++) { 1462 struct page *page; 1463 1464 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1465 mflags, cpu_to_node(cpu)); 1466 if (!bpage) 1467 goto free_pages; 1468 1469 list_add(&bpage->list, pages); 1470 1471 page = alloc_pages_node(cpu_to_node(cpu), mflags, 0); 1472 if (!page) 1473 goto free_pages; 1474 bpage->page = page_address(page); 1475 rb_init_page(bpage->page); 1476 1477 if (user_thread && fatal_signal_pending(current)) 1478 goto free_pages; 1479 } 1480 if (user_thread) 1481 clear_current_oom_origin(); 1482 1483 return 0; 1484 1485 free_pages: 1486 list_for_each_entry_safe(bpage, tmp, pages, list) { 1487 list_del_init(&bpage->list); 1488 free_buffer_page(bpage); 1489 } 1490 if (user_thread) 1491 clear_current_oom_origin(); 1492 1493 return -ENOMEM; 1494 } 1495 1496 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1497 unsigned long nr_pages) 1498 { 1499 LIST_HEAD(pages); 1500 1501 WARN_ON(!nr_pages); 1502 1503 if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu)) 1504 return -ENOMEM; 1505 1506 /* 1507 * The ring buffer page list is a circular list that does not 1508 * start and end with a list head. All page list items point to 1509 * other pages. 
1510 */ 1511 cpu_buffer->pages = pages.next; 1512 list_del(&pages); 1513 1514 cpu_buffer->nr_pages = nr_pages; 1515 1516 rb_check_pages(cpu_buffer); 1517 1518 return 0; 1519 } 1520 1521 static struct ring_buffer_per_cpu * 1522 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 1523 { 1524 struct ring_buffer_per_cpu *cpu_buffer; 1525 struct buffer_page *bpage; 1526 struct page *page; 1527 int ret; 1528 1529 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1530 GFP_KERNEL, cpu_to_node(cpu)); 1531 if (!cpu_buffer) 1532 return NULL; 1533 1534 cpu_buffer->cpu = cpu; 1535 cpu_buffer->buffer = buffer; 1536 raw_spin_lock_init(&cpu_buffer->reader_lock); 1537 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1538 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1539 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 1540 init_completion(&cpu_buffer->update_done); 1541 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 1542 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 1543 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 1544 1545 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1546 GFP_KERNEL, cpu_to_node(cpu)); 1547 if (!bpage) 1548 goto fail_free_buffer; 1549 1550 rb_check_bpage(cpu_buffer, bpage); 1551 1552 cpu_buffer->reader_page = bpage; 1553 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0); 1554 if (!page) 1555 goto fail_free_reader; 1556 bpage->page = page_address(page); 1557 rb_init_page(bpage->page); 1558 1559 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1560 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1561 1562 ret = rb_allocate_pages(cpu_buffer, nr_pages); 1563 if (ret < 0) 1564 goto fail_free_reader; 1565 1566 cpu_buffer->head_page 1567 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1568 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1569 1570 rb_head_page_activate(cpu_buffer); 1571 
1572 return cpu_buffer; 1573 1574 fail_free_reader: 1575 free_buffer_page(cpu_buffer->reader_page); 1576 1577 fail_free_buffer: 1578 kfree(cpu_buffer); 1579 return NULL; 1580 } 1581 1582 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1583 { 1584 struct list_head *head = cpu_buffer->pages; 1585 struct buffer_page *bpage, *tmp; 1586 1587 free_buffer_page(cpu_buffer->reader_page); 1588 1589 rb_head_page_deactivate(cpu_buffer); 1590 1591 if (head) { 1592 list_for_each_entry_safe(bpage, tmp, head, list) { 1593 list_del_init(&bpage->list); 1594 free_buffer_page(bpage); 1595 } 1596 bpage = list_entry(head, struct buffer_page, list); 1597 free_buffer_page(bpage); 1598 } 1599 1600 kfree(cpu_buffer); 1601 } 1602 1603 /** 1604 * __ring_buffer_alloc - allocate a new ring_buffer 1605 * @size: the size in bytes per cpu that is needed. 1606 * @flags: attributes to set for the ring buffer. 1607 * @key: ring buffer reader_lock_key. 1608 * 1609 * Currently the only flag that is available is the RB_FL_OVERWRITE 1610 * flag. This flag means that the buffer will overwrite old data 1611 * when the buffer wraps. If this flag is not set, the buffer will 1612 * drop data when the tail hits the head. 
1613 */ 1614 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1615 struct lock_class_key *key) 1616 { 1617 struct trace_buffer *buffer; 1618 long nr_pages; 1619 int bsize; 1620 int cpu; 1621 int ret; 1622 1623 /* keep it in its own cache line */ 1624 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1625 GFP_KERNEL); 1626 if (!buffer) 1627 return NULL; 1628 1629 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1630 goto fail_free_buffer; 1631 1632 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1633 buffer->flags = flags; 1634 buffer->clock = trace_clock_local; 1635 buffer->reader_lock_key = key; 1636 1637 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 1638 init_waitqueue_head(&buffer->irq_work.waiters); 1639 1640 /* need at least two pages */ 1641 if (nr_pages < 2) 1642 nr_pages = 2; 1643 1644 buffer->cpus = nr_cpu_ids; 1645 1646 bsize = sizeof(void *) * nr_cpu_ids; 1647 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1648 GFP_KERNEL); 1649 if (!buffer->buffers) 1650 goto fail_free_cpumask; 1651 1652 cpu = raw_smp_processor_id(); 1653 cpumask_set_cpu(cpu, buffer->cpumask); 1654 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 1655 if (!buffer->buffers[cpu]) 1656 goto fail_free_buffers; 1657 1658 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1659 if (ret < 0) 1660 goto fail_free_buffers; 1661 1662 mutex_init(&buffer->mutex); 1663 1664 return buffer; 1665 1666 fail_free_buffers: 1667 for_each_buffer_cpu(buffer, cpu) { 1668 if (buffer->buffers[cpu]) 1669 rb_free_cpu_buffer(buffer->buffers[cpu]); 1670 } 1671 kfree(buffer->buffers); 1672 1673 fail_free_cpumask: 1674 free_cpumask_var(buffer->cpumask); 1675 1676 fail_free_buffer: 1677 kfree(buffer); 1678 return NULL; 1679 } 1680 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1681 1682 /** 1683 * ring_buffer_free - free a ring buffer. 1684 * @buffer: the buffer to free. 
1685 */ 1686 void 1687 ring_buffer_free(struct trace_buffer *buffer) 1688 { 1689 int cpu; 1690 1691 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1692 1693 for_each_buffer_cpu(buffer, cpu) 1694 rb_free_cpu_buffer(buffer->buffers[cpu]); 1695 1696 kfree(buffer->buffers); 1697 free_cpumask_var(buffer->cpumask); 1698 1699 kfree(buffer); 1700 } 1701 EXPORT_SYMBOL_GPL(ring_buffer_free); 1702 1703 void ring_buffer_set_clock(struct trace_buffer *buffer, 1704 u64 (*clock)(void)) 1705 { 1706 buffer->clock = clock; 1707 } 1708 1709 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 1710 { 1711 buffer->time_stamp_abs = abs; 1712 } 1713 1714 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 1715 { 1716 return buffer->time_stamp_abs; 1717 } 1718 1719 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1720 1721 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1722 { 1723 return local_read(&bpage->entries) & RB_WRITE_MASK; 1724 } 1725 1726 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1727 { 1728 return local_read(&bpage->write) & RB_WRITE_MASK; 1729 } 1730 1731 static int 1732 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 1733 { 1734 struct list_head *tail_page, *to_remove, *next_page; 1735 struct buffer_page *to_remove_page, *tmp_iter_page; 1736 struct buffer_page *last_page, *first_page; 1737 unsigned long nr_removed; 1738 unsigned long head_bit; 1739 int page_entries; 1740 1741 head_bit = 0; 1742 1743 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1744 atomic_inc(&cpu_buffer->record_disabled); 1745 /* 1746 * We don't race with the readers since we have acquired the reader 1747 * lock. We also don't race with writers after disabling recording. 1748 * This makes it easy to figure out the first and the last page to be 1749 * removed from the list. We unlink all the pages in between including 1750 * the first and last pages. 
This is done in a busy loop so that we 1751 * lose the least number of traces. 1752 * The pages are freed after we restart recording and unlock readers. 1753 */ 1754 tail_page = &cpu_buffer->tail_page->list; 1755 1756 /* 1757 * tail page might be on reader page, we remove the next page 1758 * from the ring buffer 1759 */ 1760 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 1761 tail_page = rb_list_head(tail_page->next); 1762 to_remove = tail_page; 1763 1764 /* start of pages to remove */ 1765 first_page = list_entry(rb_list_head(to_remove->next), 1766 struct buffer_page, list); 1767 1768 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 1769 to_remove = rb_list_head(to_remove)->next; 1770 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 1771 } 1772 1773 next_page = rb_list_head(to_remove)->next; 1774 1775 /* 1776 * Now we remove all pages between tail_page and next_page. 1777 * Make sure that we have head_bit value preserved for the 1778 * next page 1779 */ 1780 tail_page->next = (struct list_head *)((unsigned long)next_page | 1781 head_bit); 1782 next_page = rb_list_head(next_page); 1783 next_page->prev = tail_page; 1784 1785 /* make sure pages points to a valid page in the ring buffer */ 1786 cpu_buffer->pages = next_page; 1787 1788 /* update head page */ 1789 if (head_bit) 1790 cpu_buffer->head_page = list_entry(next_page, 1791 struct buffer_page, list); 1792 1793 /* 1794 * change read pointer to make sure any read iterators reset 1795 * themselves 1796 */ 1797 cpu_buffer->read = 0; 1798 1799 /* pages are removed, resume tracing and then free the pages */ 1800 atomic_dec(&cpu_buffer->record_disabled); 1801 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1802 1803 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1804 1805 /* last buffer page to remove */ 1806 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1807 list); 1808 tmp_iter_page = first_page; 1809 1810 do { 1811 cond_resched(); 1812 1813 to_remove_page = 
tmp_iter_page; 1814 rb_inc_page(cpu_buffer, &tmp_iter_page); 1815 1816 /* update the counters */ 1817 page_entries = rb_page_entries(to_remove_page); 1818 if (page_entries) { 1819 /* 1820 * If something was added to this page, it was full 1821 * since it is not the tail page. So we deduct the 1822 * bytes consumed in ring buffer from here. 1823 * Increment overrun to account for the lost events. 1824 */ 1825 local_add(page_entries, &cpu_buffer->overrun); 1826 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 1827 } 1828 1829 /* 1830 * We have already removed references to this list item, just 1831 * free up the buffer_page and its page 1832 */ 1833 free_buffer_page(to_remove_page); 1834 nr_removed--; 1835 1836 } while (to_remove_page != last_page); 1837 1838 RB_WARN_ON(cpu_buffer, nr_removed); 1839 1840 return nr_removed == 0; 1841 } 1842 1843 static int 1844 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 1845 { 1846 struct list_head *pages = &cpu_buffer->new_pages; 1847 int retries, success; 1848 1849 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1850 /* 1851 * We are holding the reader lock, so the reader page won't be swapped 1852 * in the ring buffer. Now we are racing with the writer trying to 1853 * move head page and the tail page. 1854 * We are going to adapt the reader page update process where: 1855 * 1. We first splice the start and end of list of new pages between 1856 * the head page and its previous page. 1857 * 2. We cmpxchg the prev_page->next to point from head page to the 1858 * start of new pages list. 1859 * 3. Finally, we update the head->prev to the end of new list. 1860 * 1861 * We will try this process 10 times, to make sure that we don't keep 1862 * spinning. 
1863 */ 1864 retries = 10; 1865 success = 0; 1866 while (retries--) { 1867 struct list_head *head_page, *prev_page, *r; 1868 struct list_head *last_page, *first_page; 1869 struct list_head *head_page_with_bit; 1870 1871 head_page = &rb_set_head_page(cpu_buffer)->list; 1872 if (!head_page) 1873 break; 1874 prev_page = head_page->prev; 1875 1876 first_page = pages->next; 1877 last_page = pages->prev; 1878 1879 head_page_with_bit = (struct list_head *) 1880 ((unsigned long)head_page | RB_PAGE_HEAD); 1881 1882 last_page->next = head_page_with_bit; 1883 first_page->prev = prev_page; 1884 1885 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); 1886 1887 if (r == head_page_with_bit) { 1888 /* 1889 * yay, we replaced the page pointer to our new list, 1890 * now, we just have to update to head page's prev 1891 * pointer to point to end of list 1892 */ 1893 head_page->prev = last_page; 1894 success = 1; 1895 break; 1896 } 1897 } 1898 1899 if (success) 1900 INIT_LIST_HEAD(pages); 1901 /* 1902 * If we weren't successful in adding in new pages, warn and stop 1903 * tracing 1904 */ 1905 RB_WARN_ON(cpu_buffer, !success); 1906 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1907 1908 /* free pages if they weren't inserted */ 1909 if (!success) { 1910 struct buffer_page *bpage, *tmp; 1911 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1912 list) { 1913 list_del_init(&bpage->list); 1914 free_buffer_page(bpage); 1915 } 1916 } 1917 return success; 1918 } 1919 1920 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 1921 { 1922 int success; 1923 1924 if (cpu_buffer->nr_pages_to_update > 0) 1925 success = rb_insert_pages(cpu_buffer); 1926 else 1927 success = rb_remove_pages(cpu_buffer, 1928 -cpu_buffer->nr_pages_to_update); 1929 1930 if (success) 1931 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 1932 } 1933 1934 static void update_pages_handler(struct work_struct *work) 1935 { 1936 struct ring_buffer_per_cpu *cpu_buffer = 
container_of(work, 1937 struct ring_buffer_per_cpu, update_pages_work); 1938 rb_update_pages(cpu_buffer); 1939 complete(&cpu_buffer->update_done); 1940 } 1941 1942 /** 1943 * ring_buffer_resize - resize the ring buffer 1944 * @buffer: the buffer to resize. 1945 * @size: the new size. 1946 * @cpu_id: the cpu buffer to resize 1947 * 1948 * Minimum size is 2 * BUF_PAGE_SIZE. 1949 * 1950 * Returns 0 on success and < 0 on failure. 1951 */ 1952 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 1953 int cpu_id) 1954 { 1955 struct ring_buffer_per_cpu *cpu_buffer; 1956 unsigned long nr_pages; 1957 int cpu, err; 1958 1959 /* 1960 * Always succeed at resizing a non-existent buffer: 1961 */ 1962 if (!buffer) 1963 return 0; 1964 1965 /* Make sure the requested buffer exists */ 1966 if (cpu_id != RING_BUFFER_ALL_CPUS && 1967 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 1968 return 0; 1969 1970 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1971 1972 /* we need a minimum of two pages */ 1973 if (nr_pages < 2) 1974 nr_pages = 2; 1975 1976 size = nr_pages * BUF_PAGE_SIZE; 1977 1978 /* prevent another thread from changing buffer sizes */ 1979 mutex_lock(&buffer->mutex); 1980 1981 1982 if (cpu_id == RING_BUFFER_ALL_CPUS) { 1983 /* 1984 * Don't succeed if resizing is disabled, as a reader might be 1985 * manipulating the ring buffer and is expecting a sane state while 1986 * this is true. 
1987 */ 1988 for_each_buffer_cpu(buffer, cpu) { 1989 cpu_buffer = buffer->buffers[cpu]; 1990 if (atomic_read(&cpu_buffer->resize_disabled)) { 1991 err = -EBUSY; 1992 goto out_err_unlock; 1993 } 1994 } 1995 1996 /* calculate the pages to update */ 1997 for_each_buffer_cpu(buffer, cpu) { 1998 cpu_buffer = buffer->buffers[cpu]; 1999 2000 cpu_buffer->nr_pages_to_update = nr_pages - 2001 cpu_buffer->nr_pages; 2002 /* 2003 * nothing more to do for removing pages or no update 2004 */ 2005 if (cpu_buffer->nr_pages_to_update <= 0) 2006 continue; 2007 /* 2008 * to add pages, make sure all new pages can be 2009 * allocated without receiving ENOMEM 2010 */ 2011 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2012 if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update, 2013 &cpu_buffer->new_pages, cpu)) { 2014 /* not enough memory for new pages */ 2015 err = -ENOMEM; 2016 goto out_err; 2017 } 2018 } 2019 2020 get_online_cpus(); 2021 /* 2022 * Fire off all the required work handlers 2023 * We can't schedule on offline CPUs, but it's not necessary 2024 * since we can change their buffer sizes without any race. 2025 */ 2026 for_each_buffer_cpu(buffer, cpu) { 2027 cpu_buffer = buffer->buffers[cpu]; 2028 if (!cpu_buffer->nr_pages_to_update) 2029 continue; 2030 2031 /* Can't run something on an offline CPU. 
*/ 2032 if (!cpu_online(cpu)) { 2033 rb_update_pages(cpu_buffer); 2034 cpu_buffer->nr_pages_to_update = 0; 2035 } else { 2036 schedule_work_on(cpu, 2037 &cpu_buffer->update_pages_work); 2038 } 2039 } 2040 2041 /* wait for all the updates to complete */ 2042 for_each_buffer_cpu(buffer, cpu) { 2043 cpu_buffer = buffer->buffers[cpu]; 2044 if (!cpu_buffer->nr_pages_to_update) 2045 continue; 2046 2047 if (cpu_online(cpu)) 2048 wait_for_completion(&cpu_buffer->update_done); 2049 cpu_buffer->nr_pages_to_update = 0; 2050 } 2051 2052 put_online_cpus(); 2053 } else { 2054 /* Make sure this CPU has been initialized */ 2055 if (!cpumask_test_cpu(cpu_id, buffer->cpumask)) 2056 goto out; 2057 2058 cpu_buffer = buffer->buffers[cpu_id]; 2059 2060 if (nr_pages == cpu_buffer->nr_pages) 2061 goto out; 2062 2063 /* 2064 * Don't succeed if resizing is disabled, as a reader might be 2065 * manipulating the ring buffer and is expecting a sane state while 2066 * this is true. 2067 */ 2068 if (atomic_read(&cpu_buffer->resize_disabled)) { 2069 err = -EBUSY; 2070 goto out_err_unlock; 2071 } 2072 2073 cpu_buffer->nr_pages_to_update = nr_pages - 2074 cpu_buffer->nr_pages; 2075 2076 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2077 if (cpu_buffer->nr_pages_to_update > 0 && 2078 __rb_allocate_pages(cpu_buffer->nr_pages_to_update, 2079 &cpu_buffer->new_pages, cpu_id)) { 2080 err = -ENOMEM; 2081 goto out_err; 2082 } 2083 2084 get_online_cpus(); 2085 2086 /* Can't run something on an offline CPU. */ 2087 if (!cpu_online(cpu_id)) 2088 rb_update_pages(cpu_buffer); 2089 else { 2090 schedule_work_on(cpu_id, 2091 &cpu_buffer->update_pages_work); 2092 wait_for_completion(&cpu_buffer->update_done); 2093 } 2094 2095 cpu_buffer->nr_pages_to_update = 0; 2096 put_online_cpus(); 2097 } 2098 2099 out: 2100 /* 2101 * The ring buffer resize can happen with the ring buffer 2102 * enabled, so that the update disturbs the tracing as little 2103 * as possible. 
But if the buffer is disabled, we do not need 2104 * to worry about that, and we can take the time to verify 2105 * that the buffer is not corrupt. 2106 */ 2107 if (atomic_read(&buffer->record_disabled)) { 2108 atomic_inc(&buffer->record_disabled); 2109 /* 2110 * Even though the buffer was disabled, we must make sure 2111 * that it is truly disabled before calling rb_check_pages. 2112 * There could have been a race between checking 2113 * record_disable and incrementing it. 2114 */ 2115 synchronize_rcu(); 2116 for_each_buffer_cpu(buffer, cpu) { 2117 cpu_buffer = buffer->buffers[cpu]; 2118 rb_check_pages(cpu_buffer); 2119 } 2120 atomic_dec(&buffer->record_disabled); 2121 } 2122 2123 mutex_unlock(&buffer->mutex); 2124 return 0; 2125 2126 out_err: 2127 for_each_buffer_cpu(buffer, cpu) { 2128 struct buffer_page *bpage, *tmp; 2129 2130 cpu_buffer = buffer->buffers[cpu]; 2131 cpu_buffer->nr_pages_to_update = 0; 2132 2133 if (list_empty(&cpu_buffer->new_pages)) 2134 continue; 2135 2136 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2137 list) { 2138 list_del_init(&bpage->list); 2139 free_buffer_page(bpage); 2140 } 2141 } 2142 out_err_unlock: 2143 mutex_unlock(&buffer->mutex); 2144 return err; 2145 } 2146 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2147 2148 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2149 { 2150 mutex_lock(&buffer->mutex); 2151 if (val) 2152 buffer->flags |= RB_FL_OVERWRITE; 2153 else 2154 buffer->flags &= ~RB_FL_OVERWRITE; 2155 mutex_unlock(&buffer->mutex); 2156 } 2157 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2158 2159 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2160 { 2161 return bpage->page->data + index; 2162 } 2163 2164 static __always_inline struct ring_buffer_event * 2165 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2166 { 2167 return __rb_page_index(cpu_buffer->reader_page, 2168 cpu_buffer->reader_page->read); 2169 } 2170 2171 static __always_inline 
unsigned rb_page_commit(struct buffer_page *bpage) 2172 { 2173 return local_read(&bpage->page->commit); 2174 } 2175 2176 static struct ring_buffer_event * 2177 rb_iter_head_event(struct ring_buffer_iter *iter) 2178 { 2179 struct ring_buffer_event *event; 2180 struct buffer_page *iter_head_page = iter->head_page; 2181 unsigned long commit; 2182 unsigned length; 2183 2184 if (iter->head != iter->next_event) 2185 return iter->event; 2186 2187 /* 2188 * When the writer goes across pages, it issues a cmpxchg which 2189 * is a mb(), which will synchronize with the rmb here. 2190 * (see rb_tail_page_update() and __rb_reserve_next()) 2191 */ 2192 commit = rb_page_commit(iter_head_page); 2193 smp_rmb(); 2194 event = __rb_page_index(iter_head_page, iter->head); 2195 length = rb_event_length(event); 2196 2197 /* 2198 * READ_ONCE() doesn't work on functions and we don't want the 2199 * compiler doing any crazy optimizations with length. 2200 */ 2201 barrier(); 2202 2203 if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE) 2204 /* Writer corrupted the read? */ 2205 goto reset; 2206 2207 memcpy(iter->event, event, length); 2208 /* 2209 * If the page stamp is still the same after this rmb() then the 2210 * event was safely copied without the writer entering the page. 
2211 */ 2212 smp_rmb(); 2213 2214 /* Make sure the page didn't change since we read this */ 2215 if (iter->page_stamp != iter_head_page->page->time_stamp || 2216 commit > rb_page_commit(iter_head_page)) 2217 goto reset; 2218 2219 iter->next_event = iter->head + length; 2220 return iter->event; 2221 reset: 2222 /* Reset to the beginning */ 2223 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2224 iter->head = 0; 2225 iter->next_event = 0; 2226 iter->missed_events = 1; 2227 return NULL; 2228 } 2229 2230 /* Size is determined by what has been committed */ 2231 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2232 { 2233 return rb_page_commit(bpage); 2234 } 2235 2236 static __always_inline unsigned 2237 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2238 { 2239 return rb_page_commit(cpu_buffer->commit_page); 2240 } 2241 2242 static __always_inline unsigned 2243 rb_event_index(struct ring_buffer_event *event) 2244 { 2245 unsigned long addr = (unsigned long)event; 2246 2247 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 2248 } 2249 2250 static void rb_inc_iter(struct ring_buffer_iter *iter) 2251 { 2252 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2253 2254 /* 2255 * The iterator could be on the reader page (it starts there). 2256 * But the head could have moved, since the reader was 2257 * found. Check for this case and assign the iterator 2258 * to the head page instead of next. 
*/
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	/* Restart the time stamp tracking and offsets for the new page */
	iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
	iter->next_event = 0;
}

/*
 * rb_handle_head_page - writer hit the head page
 * @cpu_buffer: The per CPU buffer being written to
 * @tail_page: The page the writer is about to leave
 * @next_page: The page after @tail_page, which the writer wants to enter
 *
 * Pushes the head forward by one page so the writer can use @next_page:
 * the head flag of @next_page goes HEAD -> UPDATE, the page after it
 * becomes the new HEAD, and the outermost writer finally sets
 * @next_page back to NORMAL.
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	/* Sampled first: these are the entries lost when the head moves */
	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);
		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* none of the expected states */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		struct buffer_page *buffer_tail_page;

		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (buffer_tail_page != tail_page &&
		    buffer_tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

/*
 * rb_reset_tail - undo a reservation that ran off the end of the tail page
 * @cpu_buffer: The per CPU buffer being written to
 * @tail: The write index of the page before this reservation was added
 * @info: Reservation info; supplies the tail page and the reserved length
 *
 * Subtracts the failed reservation from tail_page->write. The writer
 * whose event actually crossed the page boundary (@tail below
 * BUF_PAGE_SIZE) also fills the unused remainder of the page with
 * padding and records the real end of the data.
 */
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      unsigned long tail, struct rb_event_info *info)
{
	struct buffer_page *tail_page = info->tail_page;
	struct ring_buffer_event *event;
	unsigned long length = info->length;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == BUF_PAGE_SIZE)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);

	/* account for padding bytes */
	local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add lost event
	 * counter.
*/
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}

static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);

/*
 * rb_move_tail - advance the tail page after a reservation overflowed it
 * @cpu_buffer: The per CPU buffer being written to
 * @tail: The write index of the tail page before the failed reservation
 * @info: Reservation info; supplies the tail page and the reserved length
 *
 * Returns ERR_PTR(-EAGAIN) once the overflowing reservation has been
 * undone and the caller should retry, or NULL when the event has to be
 * dropped (the buffer is full without overwrite mode, the commit page
 * would be overrun, or moving the head page failed).
 *
 * This is the slow path, force gcc not to inline it.
 */
static noinline struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
	     unsigned long tail, struct rb_event_info *info)
{
	struct buffer_page *tail_page = info->tail_page;
	struct buffer_page *commit_page = cpu_buffer->commit_page;
	struct trace_buffer *buffer = cpu_buffer->buffer;
	struct buffer_page *next_page;
	int ret;

	next_page = tail_page;

	rb_inc_page(cpu_buffer, &next_page);

	/*
	 * If for some reason, we had an interrupt storm that made
	 * it all the way around the buffer, bail, and account
	 * it as a commit overrun.
	 */
	if (unlikely(next_page == commit_page)) {
		local_inc(&cpu_buffer->commit_overrun);
		goto out_reset;
	}

	/*
	 * This is where the fun begins!
	 *
	 * We are fighting against races between a reader that
	 * could be on another CPU trying to swap its reader
	 * page with the buffer head.
	 *
	 * We are also fighting against interrupts coming in and
	 * moving the head or tail on us as well.
	 *
	 * If the next page is the head page then we have filled
	 * the buffer, unless the commit page is still on the
	 * reader page.
	 */
	if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

		/*
		 * If the commit is not on the reader page, then
		 * move the head page.
		 */
		if (!rb_is_reader_page(cpu_buffer->commit_page)) {
			/*
			 * If we are not in overwrite mode,
			 * this is easy, just stop here.
			 */
			if (!(buffer->flags & RB_FL_OVERWRITE)) {
				local_inc(&cpu_buffer->dropped_events);
				goto out_reset;
			}

			ret = rb_handle_head_page(cpu_buffer,
						  tail_page,
						  next_page);
			/* negative: error; positive: retry the page */
			if (ret < 0)
				goto out_reset;
			if (ret)
				goto out_again;
		} else {
			/*
			 * We need to be careful here too. The
			 * commit page could still be on the reader
			 * page. We could have a small buffer, and
			 * have filled up the buffer with events
			 * from interrupts and such, and wrapped.
			 *
			 * Note, if the tail page is also on the
			 * reader_page, we let it move out.
			 */
			if (unlikely((cpu_buffer->commit_page !=
				      cpu_buffer->tail_page) &&
				     (cpu_buffer->commit_page ==
				      cpu_buffer->reader_page))) {
				local_inc(&cpu_buffer->commit_overrun);
				goto out_reset;
			}
		}
	}

	rb_tail_page_update(cpu_buffer, tail_page, next_page);

 out_again:

	rb_reset_tail(cpu_buffer, tail, info);

	/* Commit what we have for now.
*/ 2595 rb_end_commit(cpu_buffer); 2596 /* rb_end_commit() decs committing */ 2597 local_inc(&cpu_buffer->committing); 2598 2599 /* fail and let the caller try again */ 2600 return ERR_PTR(-EAGAIN); 2601 2602 out_reset: 2603 /* reset write */ 2604 rb_reset_tail(cpu_buffer, tail, info); 2605 2606 return NULL; 2607 } 2608 2609 /* Slow path */ 2610 static struct ring_buffer_event * 2611 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) 2612 { 2613 if (abs) 2614 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2615 else 2616 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2617 2618 /* Not the first event on the page, or not delta? */ 2619 if (abs || rb_event_index(event)) { 2620 event->time_delta = delta & TS_MASK; 2621 event->array[0] = delta >> TS_SHIFT; 2622 } else { 2623 /* nope, just zero it */ 2624 event->time_delta = 0; 2625 event->array[0] = 0; 2626 } 2627 2628 return skip_time_extend(event); 2629 } 2630 2631 static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 2632 struct ring_buffer_event *event); 2633 2634 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2635 static inline bool sched_clock_stable(void) 2636 { 2637 return true; 2638 } 2639 #endif 2640 2641 static void 2642 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2643 struct rb_event_info *info) 2644 { 2645 u64 write_stamp; 2646 2647 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2648 (unsigned long long)info->delta, 2649 (unsigned long long)info->ts, 2650 (unsigned long long)info->before, 2651 (unsigned long long)info->after, 2652 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0), 2653 sched_clock_stable() ? 
"" : 2654 "If you just came from a suspend/resume,\n" 2655 "please switch to the trace global clock:\n" 2656 " echo global > /sys/kernel/debug/tracing/trace_clock\n" 2657 "or add trace_clock=global to the kernel command line\n"); 2658 } 2659 2660 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2661 struct ring_buffer_event **event, 2662 struct rb_event_info *info, 2663 u64 *delta, 2664 unsigned int *length) 2665 { 2666 bool abs = info->add_timestamp & 2667 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2668 2669 if (unlikely(info->delta > (1ULL << 59))) { 2670 /* did the clock go backwards */ 2671 if (info->before == info->after && info->before > info->ts) { 2672 /* not interrupted */ 2673 static int once; 2674 2675 /* 2676 * This is possible with a recalibrating of the TSC. 2677 * Do not produce a call stack, but just report it. 2678 */ 2679 if (!once) { 2680 once++; 2681 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2682 info->before, info->ts); 2683 } 2684 } else 2685 rb_check_timestamp(cpu_buffer, info); 2686 if (!abs) 2687 info->delta = 0; 2688 } 2689 *event = rb_add_time_stamp(*event, info->delta, abs); 2690 *length -= RB_LEN_TIME_EXTEND; 2691 *delta = 0; 2692 } 2693 2694 /** 2695 * rb_update_event - update event type and data 2696 * @cpu_buffer: The per cpu buffer of the @event 2697 * @event: the event to update 2698 * @info: The info to update the @event with (contains length and delta) 2699 * 2700 * Update the type and data fields of the @event. The length 2701 * is the actual size that is written to the ring buffer, 2702 * and with this, we can determine what to place into the 2703 * data field. 
2704 */ 2705 static void 2706 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2707 struct ring_buffer_event *event, 2708 struct rb_event_info *info) 2709 { 2710 unsigned length = info->length; 2711 u64 delta = info->delta; 2712 2713 /* 2714 * If we need to add a timestamp, then we 2715 * add it to the start of the reserved space. 2716 */ 2717 if (unlikely(info->add_timestamp)) 2718 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2719 2720 event->time_delta = delta; 2721 length -= RB_EVNT_HDR_SIZE; 2722 if (length > RB_MAX_SMALL_DATA) { 2723 event->type_len = 0; 2724 event->array[0] = length; 2725 } else 2726 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2727 } 2728 2729 static unsigned rb_calculate_event_length(unsigned length) 2730 { 2731 struct ring_buffer_event event; /* Used only for sizeof array */ 2732 2733 /* zero length can cause confusions */ 2734 if (!length) 2735 length++; 2736 2737 if (length > RB_MAX_SMALL_DATA) 2738 length += sizeof(event.array[0]); 2739 2740 length += RB_EVNT_HDR_SIZE; 2741 length = ALIGN(length, RB_ALIGNMENT); 2742 2743 /* 2744 * In case the time delta is larger than the 27 bits for it 2745 * in the header, we need to add a timestamp. If another 2746 * event comes in when trying to discard this one to increase 2747 * the length, then the timestamp will be added in the allocated 2748 * space of this event. If length is bigger than the size needed 2749 * for the TIME_EXTEND, then padding has to be used. The events 2750 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2751 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2752 * As length is a multiple of 4, we only need to worry if it 2753 * is 12 (RB_LEN_TIME_EXTEND + 4). 
2754 */ 2755 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2756 length += RB_ALIGNMENT; 2757 2758 return length; 2759 } 2760 2761 static __always_inline bool 2762 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 2763 struct ring_buffer_event *event) 2764 { 2765 unsigned long addr = (unsigned long)event; 2766 unsigned long index; 2767 2768 index = rb_event_index(event); 2769 addr &= PAGE_MASK; 2770 2771 return cpu_buffer->commit_page->page == (void *)addr && 2772 rb_commit_index(cpu_buffer) == index; 2773 } 2774 2775 static u64 rb_time_delta(struct ring_buffer_event *event) 2776 { 2777 switch (event->type_len) { 2778 case RINGBUF_TYPE_PADDING: 2779 return 0; 2780 2781 case RINGBUF_TYPE_TIME_EXTEND: 2782 return ring_buffer_event_time_stamp(event); 2783 2784 case RINGBUF_TYPE_TIME_STAMP: 2785 return 0; 2786 2787 case RINGBUF_TYPE_DATA: 2788 return event->time_delta; 2789 default: 2790 return 0; 2791 } 2792 } 2793 2794 static inline int 2795 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2796 struct ring_buffer_event *event) 2797 { 2798 unsigned long new_index, old_index; 2799 struct buffer_page *bpage; 2800 unsigned long index; 2801 unsigned long addr; 2802 u64 write_stamp; 2803 u64 delta; 2804 2805 new_index = rb_event_index(event); 2806 old_index = new_index + rb_event_ts_length(event); 2807 addr = (unsigned long)event; 2808 addr &= PAGE_MASK; 2809 2810 bpage = READ_ONCE(cpu_buffer->tail_page); 2811 2812 delta = rb_time_delta(event); 2813 2814 if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp)) 2815 return 0; 2816 2817 /* Make sure the write stamp is read before testing the location */ 2818 barrier(); 2819 2820 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2821 unsigned long write_mask = 2822 local_read(&bpage->write) & ~RB_WRITE_MASK; 2823 unsigned long event_length = rb_event_length(event); 2824 2825 /* Something came in, can't discard */ 2826 if (!rb_time_cmpxchg(&cpu_buffer->write_stamp, 2827 write_stamp, 
write_stamp - delta))
			return 0;

		/*
		 * If an event were to come in now, it would see that the
		 * write_stamp and the before_stamp are different, and assume
		 * that this event just added itself before updating
		 * the write stamp. The interrupting event will fix the
		 * write stamp for us, and use the before stamp as its delta.
		 */

		/*
		 * This is on the tail page. It is possible that
		 * a write could come in and move the tail page
		 * and write to the next page. That is fine
		 * because we just shorten what is on this page.
		 */
		old_index += write_mask;
		new_index += write_mask;
		/* Only succeeds if nothing was reserved after this event */
		index = local_cmpxchg(&bpage->write, old_index, new_index);
		if (index == old_index) {
			/* update counters */
			local_sub(event_length, &cpu_buffer->entries_bytes);
			return 1;
		}
	}

	/* could not discard */
	return 0;
}

/*
 * Mark the start of a commit: bumps both the nesting counter
 * (committing) and the running count of commits.
 */
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	local_inc(&cpu_buffer->committing);
	local_inc(&cpu_buffer->commits);
}

/*
 * rb_set_commit_to_write - move the commit pointer up to the write pointer
 * @cpu_buffer: The per CPU buffer to update
 *
 * Walks commit_page forward until it reaches tail_page, setting the
 * commit index of every page on the way to that page's write index,
 * then does the same for the final page.
 */
static __always_inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long max_count;

	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	/* Bound the page walk so a broken list cannot loop forever */
	max_count = cpu_buffer->nr_pages * 100;

	while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
		if (RB_WARN_ON(cpu_buffer, !(--max_count)))
			return;
		if (RB_WARN_ON(cpu_buffer,
			       rb_is_reader_page(cpu_buffer->tail_page)))
			return;
		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	/* Now catch the commit index up with the write index on this page */
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {

		local_set(&cpu_buffer->commit_page->page->commit,
			  rb_page_write(cpu_buffer->commit_page));
		/* The commit index must not carry any bits above the mask */
		RB_WARN_ON(cpu_buffer,
			   local_read(&cpu_buffer->commit_page->page->commit) &
			   ~RB_WRITE_MASK);
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
		goto again;
}

/*
 * rb_end_commit - finish a commit started with rb_start_commit()
 * @cpu_buffer: The per CPU buffer being committed to
 *
 * Only the outermost committer (committing == 1) moves the commit
 * pointer forward. The committing counter is then dropped, and the
 * sequence is repeated if more commits arrived in the meantime.
 */
static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned long commits;

	if (RB_WARN_ON(cpu_buffer,
		       !local_read(&cpu_buffer->committing)))
		return;

 again:
	commits = local_read(&cpu_buffer->commits);
	/* synchronize with interrupts */
	barrier();
	if (local_read(&cpu_buffer->committing) == 1)
		rb_set_commit_to_write(cpu_buffer);

	local_dec(&cpu_buffer->committing);

	/* synchronize with interrupts */
	barrier();

	/*
	 * Need to account for interrupts coming in between the
	 * updating of the commit page and the clearing of the
	 * committing counter.
2939 */ 2940 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2941 !local_read(&cpu_buffer->committing)) { 2942 local_inc(&cpu_buffer->committing); 2943 goto again; 2944 } 2945 } 2946 2947 static inline void rb_event_discard(struct ring_buffer_event *event) 2948 { 2949 if (extended_time(event)) 2950 event = skip_time_extend(event); 2951 2952 /* array[0] holds the actual length for the discarded event */ 2953 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2954 event->type_len = RINGBUF_TYPE_PADDING; 2955 /* time delta must be non zero */ 2956 if (!event->time_delta) 2957 event->time_delta = 1; 2958 } 2959 2960 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2961 struct ring_buffer_event *event) 2962 { 2963 local_inc(&cpu_buffer->entries); 2964 rb_end_commit(cpu_buffer); 2965 } 2966 2967 static __always_inline void 2968 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 2969 { 2970 size_t nr_pages; 2971 size_t dirty; 2972 size_t full; 2973 2974 if (buffer->irq_work.waiters_pending) { 2975 buffer->irq_work.waiters_pending = false; 2976 /* irq_work_queue() supplies it's own memory barriers */ 2977 irq_work_queue(&buffer->irq_work.work); 2978 } 2979 2980 if (cpu_buffer->irq_work.waiters_pending) { 2981 cpu_buffer->irq_work.waiters_pending = false; 2982 /* irq_work_queue() supplies it's own memory barriers */ 2983 irq_work_queue(&cpu_buffer->irq_work.work); 2984 } 2985 2986 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 2987 return; 2988 2989 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 2990 return; 2991 2992 if (!cpu_buffer->irq_work.full_waiters_pending) 2993 return; 2994 2995 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 2996 2997 full = cpu_buffer->shortest_full; 2998 nr_pages = cpu_buffer->nr_pages; 2999 dirty = ring_buffer_nr_dirty_pages(buffer, cpu_buffer->cpu); 3000 if (full && nr_pages && (dirty * 100) <= full * nr_pages) 
3001 return; 3002 3003 cpu_buffer->irq_work.wakeup_full = true; 3004 cpu_buffer->irq_work.full_waiters_pending = false; 3005 /* irq_work_queue() supplies it's own memory barriers */ 3006 irq_work_queue(&cpu_buffer->irq_work.work); 3007 } 3008 3009 /* 3010 * The lock and unlock are done within a preempt disable section. 3011 * The current_context per_cpu variable can only be modified 3012 * by the current task between lock and unlock. But it can 3013 * be modified more than once via an interrupt. To pass this 3014 * information from the lock to the unlock without having to 3015 * access the 'in_interrupt()' functions again (which do show 3016 * a bit of overhead in something as critical as function tracing, 3017 * we use a bitmask trick. 3018 * 3019 * bit 1 = NMI context 3020 * bit 2 = IRQ context 3021 * bit 3 = SoftIRQ context 3022 * bit 4 = normal context. 3023 * 3024 * This works because this is the order of contexts that can 3025 * preempt other contexts. A SoftIRQ never preempts an IRQ 3026 * context. 3027 * 3028 * When the context is determined, the corresponding bit is 3029 * checked and set (if it was set, then a recursion of that context 3030 * happened). 3031 * 3032 * On unlock, we need to clear this bit. To do so, just subtract 3033 * 1 from the current_context and AND it to itself. 3034 * 3035 * (binary) 3036 * 101 - 1 = 100 3037 * 101 & 100 = 100 (clearing bit zero) 3038 * 3039 * 1010 - 1 = 1001 3040 * 1010 & 1001 = 1000 (clearing bit 1) 3041 * 3042 * The least significant bit can be cleared this way, and it 3043 * just so happens that it is the same bit corresponding to 3044 * the current context. 3045 * 3046 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3047 * is set when a recursion is detected at the current context, and if 3048 * the TRANSITION bit is already set, it will fail the recursion. 3049 * This is needed because there's a lag between the changing of 3050 * interrupt context and updating the preempt count. 
In this case, 3051 * a false positive will be found. To handle this, one extra recursion 3052 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3053 * bit is already set, then it is considered a recursion and the function 3054 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3055 * 3056 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3057 * to be cleared. Even if it wasn't the context that set it. That is, 3058 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3059 * is called before preempt_count() is updated, since the check will 3060 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3061 * NMI then comes in, it will set the NMI bit, but when the NMI code 3062 * does the trace_recursive_unlock() it will clear the TRANSTION bit 3063 * and leave the NMI bit set. But this is fine, because the interrupt 3064 * code that set the TRANSITION bit will then clear the NMI bit when it 3065 * calls trace_recursive_unlock(). If another NMI comes in, it will 3066 * set the TRANSITION bit and continue. 3067 * 3068 * Note: The TRANSITION bit only handles a single transition between context. 3069 */ 3070 3071 static __always_inline int 3072 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3073 { 3074 unsigned int val = cpu_buffer->current_context; 3075 unsigned long pc = preempt_count(); 3076 int bit; 3077 3078 if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET))) 3079 bit = RB_CTX_NORMAL; 3080 else 3081 bit = pc & NMI_MASK ? RB_CTX_NMI : 3082 pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ; 3083 3084 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3085 /* 3086 * It is possible that this was called by transitioning 3087 * between interrupt context, and preempt_count() has not 3088 * been updated yet. In this case, use the TRANSITION bit. 
3089 */ 3090 bit = RB_CTX_TRANSITION; 3091 if (val & (1 << (bit + cpu_buffer->nest))) 3092 return 1; 3093 } 3094 3095 val |= (1 << (bit + cpu_buffer->nest)); 3096 cpu_buffer->current_context = val; 3097 3098 return 0; 3099 } 3100 3101 static __always_inline void 3102 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3103 { 3104 cpu_buffer->current_context &= 3105 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3106 } 3107 3108 /* The recursive locking above uses 5 bits */ 3109 #define NESTED_BITS 5 3110 3111 /** 3112 * ring_buffer_nest_start - Allow to trace while nested 3113 * @buffer: The ring buffer to modify 3114 * 3115 * The ring buffer has a safety mechanism to prevent recursion. 3116 * But there may be a case where a trace needs to be done while 3117 * tracing something else. In this case, calling this function 3118 * will allow this function to nest within a currently active 3119 * ring_buffer_lock_reserve(). 3120 * 3121 * Call this function before calling another ring_buffer_lock_reserve() and 3122 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3123 */ 3124 void ring_buffer_nest_start(struct trace_buffer *buffer) 3125 { 3126 struct ring_buffer_per_cpu *cpu_buffer; 3127 int cpu; 3128 3129 /* Enabled by ring_buffer_nest_end() */ 3130 preempt_disable_notrace(); 3131 cpu = raw_smp_processor_id(); 3132 cpu_buffer = buffer->buffers[cpu]; 3133 /* This is the shift value for the above recursive locking */ 3134 cpu_buffer->nest += NESTED_BITS; 3135 } 3136 3137 /** 3138 * ring_buffer_nest_end - Allow to trace while nested 3139 * @buffer: The ring buffer to modify 3140 * 3141 * Must be called after ring_buffer_nest_start() and after the 3142 * ring_buffer_unlock_commit(). 
3143 */ 3144 void ring_buffer_nest_end(struct trace_buffer *buffer) 3145 { 3146 struct ring_buffer_per_cpu *cpu_buffer; 3147 int cpu; 3148 3149 /* disabled by ring_buffer_nest_start() */ 3150 cpu = raw_smp_processor_id(); 3151 cpu_buffer = buffer->buffers[cpu]; 3152 /* This is the shift value for the above recursive locking */ 3153 cpu_buffer->nest -= NESTED_BITS; 3154 preempt_enable_notrace(); 3155 } 3156 3157 /** 3158 * ring_buffer_unlock_commit - commit a reserved 3159 * @buffer: The buffer to commit to 3160 * @event: The event pointer to commit. 3161 * 3162 * This commits the data to the ring buffer, and releases any locks held. 3163 * 3164 * Must be paired with ring_buffer_lock_reserve. 3165 */ 3166 int ring_buffer_unlock_commit(struct trace_buffer *buffer, 3167 struct ring_buffer_event *event) 3168 { 3169 struct ring_buffer_per_cpu *cpu_buffer; 3170 int cpu = raw_smp_processor_id(); 3171 3172 cpu_buffer = buffer->buffers[cpu]; 3173 3174 rb_commit(cpu_buffer, event); 3175 3176 rb_wakeups(buffer, cpu_buffer); 3177 3178 trace_recursive_unlock(cpu_buffer); 3179 3180 preempt_enable_notrace(); 3181 3182 return 0; 3183 } 3184 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3185 3186 static struct ring_buffer_event * 3187 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3188 struct rb_event_info *info) 3189 { 3190 struct ring_buffer_event *event; 3191 struct buffer_page *tail_page; 3192 unsigned long tail, write, w; 3193 bool a_ok; 3194 bool b_ok; 3195 3196 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3197 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3198 3199 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3200 barrier(); 3201 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3202 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3203 barrier(); 3204 info->ts = rb_time_stamp(cpu_buffer->buffer); 3205 3206 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3207 info->delta = info->ts; 
3208 } else { 3209 /* 3210 * If interrupting an event time update, we may need an 3211 * absolute timestamp. 3212 * Don't bother if this is the start of a new page (w == 0). 3213 */ 3214 if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) { 3215 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3216 info->length += RB_LEN_TIME_EXTEND; 3217 } else { 3218 info->delta = info->ts - info->after; 3219 if (unlikely(test_time_stamp(info->delta))) { 3220 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3221 info->length += RB_LEN_TIME_EXTEND; 3222 } 3223 } 3224 } 3225 3226 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3227 3228 /*C*/ write = local_add_return(info->length, &tail_page->write); 3229 3230 /* set write to only the index of the write */ 3231 write &= RB_WRITE_MASK; 3232 3233 tail = write - info->length; 3234 3235 /* See if we shot pass the end of this buffer page */ 3236 if (unlikely(write > BUF_PAGE_SIZE)) { 3237 if (tail != w) { 3238 /* before and after may now different, fix it up*/ 3239 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); 3240 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3241 if (a_ok && b_ok && info->before != info->after) 3242 (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, 3243 info->before, info->after); 3244 } 3245 return rb_move_tail(cpu_buffer, tail, info); 3246 } 3247 3248 if (likely(tail == w)) { 3249 u64 save_before; 3250 bool s_ok; 3251 3252 /* Nothing interrupted us between A and C */ 3253 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3254 barrier(); 3255 /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before); 3256 RB_WARN_ON(cpu_buffer, !s_ok); 3257 if (likely(!(info->add_timestamp & 3258 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3259 /* This did not interrupt any time update */ 3260 info->delta = info->ts - info->after; 3261 else 3262 /* Just use full timestamp for inerrupting event */ 3263 info->delta = info->ts; 3264 barrier(); 3265 if 
(unlikely(info->ts != save_before)) { 3266 /* SLOW PATH - Interrupted between C and E */ 3267 3268 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3269 RB_WARN_ON(cpu_buffer, !a_ok); 3270 3271 /* Write stamp must only go forward */ 3272 if (save_before > info->after) { 3273 /* 3274 * We do not care about the result, only that 3275 * it gets updated atomically. 3276 */ 3277 (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, 3278 info->after, save_before); 3279 } 3280 } 3281 } else { 3282 u64 ts; 3283 /* SLOW PATH - Interrupted between A and C */ 3284 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); 3285 /* Was interrupted before here, write_stamp must be valid */ 3286 RB_WARN_ON(cpu_buffer, !a_ok); 3287 ts = rb_time_stamp(cpu_buffer->buffer); 3288 barrier(); 3289 /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3290 info->after < ts) { 3291 /* Nothing came after this event between C and E */ 3292 info->delta = ts - info->after; 3293 (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, 3294 info->after, info->ts); 3295 info->ts = ts; 3296 } else { 3297 /* 3298 * Interrupted beween C and E: 3299 * Lost the previous events time stamp. Just set the 3300 * delta to zero, and this will be the same time as 3301 * the event this event interrupted. And the events that 3302 * came after this will still be correct (as they would 3303 * have built their delta on the previous event. 3304 */ 3305 info->delta = 0; 3306 } 3307 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3308 } 3309 3310 /* 3311 * If this is the first commit on the page, then it has the same 3312 * timestamp as the page itself. 
3313 */ 3314 if (unlikely(!tail && !(info->add_timestamp & 3315 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3316 info->delta = 0; 3317 3318 /* We reserved something on the buffer */ 3319 3320 event = __rb_page_index(tail_page, tail); 3321 rb_update_event(cpu_buffer, event, info); 3322 3323 local_inc(&tail_page->entries); 3324 3325 /* 3326 * If this is the first commit on the page, then update 3327 * its timestamp. 3328 */ 3329 if (unlikely(!tail)) 3330 tail_page->page->time_stamp = info->ts; 3331 3332 /* account for these added bytes */ 3333 local_add(info->length, &cpu_buffer->entries_bytes); 3334 3335 return event; 3336 } 3337 3338 static __always_inline struct ring_buffer_event * 3339 rb_reserve_next_event(struct trace_buffer *buffer, 3340 struct ring_buffer_per_cpu *cpu_buffer, 3341 unsigned long length) 3342 { 3343 struct ring_buffer_event *event; 3344 struct rb_event_info info; 3345 int nr_loops = 0; 3346 int add_ts_default; 3347 3348 rb_start_commit(cpu_buffer); 3349 /* The commit page can not change after this */ 3350 3351 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3352 /* 3353 * Due to the ability to swap a cpu buffer from a buffer 3354 * it is possible it was swapped before we committed. 3355 * (committing stops a swap). We check for it here and 3356 * if it happened, we have to fail the write. 3357 */ 3358 barrier(); 3359 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3360 local_dec(&cpu_buffer->committing); 3361 local_dec(&cpu_buffer->commits); 3362 return NULL; 3363 } 3364 #endif 3365 3366 info.length = rb_calculate_event_length(length); 3367 3368 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3369 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3370 info.length += RB_LEN_TIME_EXTEND; 3371 } else { 3372 add_ts_default = RB_ADD_STAMP_NONE; 3373 } 3374 3375 again: 3376 info.add_timestamp = add_ts_default; 3377 info.delta = 0; 3378 3379 /* 3380 * We allow for interrupts to reenter here and do a trace. 
3381 * If one does, it will cause this original code to loop 3382 * back here. Even with heavy interrupts happening, this 3383 * should only happen a few times in a row. If this happens 3384 * 1000 times in a row, there must be either an interrupt 3385 * storm or we have something buggy. 3386 * Bail! 3387 */ 3388 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3389 goto out_fail; 3390 3391 event = __rb_reserve_next(cpu_buffer, &info); 3392 3393 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3394 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3395 info.length -= RB_LEN_TIME_EXTEND; 3396 goto again; 3397 } 3398 3399 if (likely(event)) 3400 return event; 3401 out_fail: 3402 rb_end_commit(cpu_buffer); 3403 return NULL; 3404 } 3405 3406 /** 3407 * ring_buffer_lock_reserve - reserve a part of the buffer 3408 * @buffer: the ring buffer to reserve from 3409 * @length: the length of the data to reserve (excluding event header) 3410 * 3411 * Returns a reserved event on the ring buffer to copy directly to. 3412 * The user of this interface will need to get the body to write into 3413 * and can use the ring_buffer_event_data() interface. 3414 * 3415 * The length is the length of the data needed, not the event length 3416 * which also includes the event header. 3417 * 3418 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3419 * If NULL is returned, then nothing has been allocated or locked. 
3420 */ 3421 struct ring_buffer_event * 3422 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3423 { 3424 struct ring_buffer_per_cpu *cpu_buffer; 3425 struct ring_buffer_event *event; 3426 int cpu; 3427 3428 /* If we are tracing schedule, we don't want to recurse */ 3429 preempt_disable_notrace(); 3430 3431 if (unlikely(atomic_read(&buffer->record_disabled))) 3432 goto out; 3433 3434 cpu = raw_smp_processor_id(); 3435 3436 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3437 goto out; 3438 3439 cpu_buffer = buffer->buffers[cpu]; 3440 3441 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3442 goto out; 3443 3444 if (unlikely(length > BUF_MAX_DATA_SIZE)) 3445 goto out; 3446 3447 if (unlikely(trace_recursive_lock(cpu_buffer))) 3448 goto out; 3449 3450 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3451 if (!event) 3452 goto out_unlock; 3453 3454 return event; 3455 3456 out_unlock: 3457 trace_recursive_unlock(cpu_buffer); 3458 out: 3459 preempt_enable_notrace(); 3460 return NULL; 3461 } 3462 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3463 3464 /* 3465 * Decrement the entries to the page that an event is on. 3466 * The event does not even need to exist, only the pointer 3467 * to the page it is on. This may only be called before the commit 3468 * takes place. 3469 */ 3470 static inline void 3471 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3472 struct ring_buffer_event *event) 3473 { 3474 unsigned long addr = (unsigned long)event; 3475 struct buffer_page *bpage = cpu_buffer->commit_page; 3476 struct buffer_page *start; 3477 3478 addr &= PAGE_MASK; 3479 3480 /* Do the likely case first */ 3481 if (likely(bpage->page == (void *)addr)) { 3482 local_dec(&bpage->entries); 3483 return; 3484 } 3485 3486 /* 3487 * Because the commit page may be on the reader page we 3488 * start with the next page and check the end loop there. 
3489 */ 3490 rb_inc_page(cpu_buffer, &bpage); 3491 start = bpage; 3492 do { 3493 if (bpage->page == (void *)addr) { 3494 local_dec(&bpage->entries); 3495 return; 3496 } 3497 rb_inc_page(cpu_buffer, &bpage); 3498 } while (bpage != start); 3499 3500 /* commit not part of this buffer?? */ 3501 RB_WARN_ON(cpu_buffer, 1); 3502 } 3503 3504 /** 3505 * ring_buffer_commit_discard - discard an event that has not been committed 3506 * @buffer: the ring buffer 3507 * @event: non committed event to discard 3508 * 3509 * Sometimes an event that is in the ring buffer needs to be ignored. 3510 * This function lets the user discard an event in the ring buffer 3511 * and then that event will not be read later. 3512 * 3513 * This function only works if it is called before the item has been 3514 * committed. It will try to free the event from the ring buffer 3515 * if another event has not been added behind it. 3516 * 3517 * If another event has been added behind it, it will set the event 3518 * up as discarded, and perform the commit. 3519 * 3520 * If this function is called, do not call ring_buffer_unlock_commit on 3521 * the event. 3522 */ 3523 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3524 struct ring_buffer_event *event) 3525 { 3526 struct ring_buffer_per_cpu *cpu_buffer; 3527 int cpu; 3528 3529 /* The event is discarded regardless */ 3530 rb_event_discard(event); 3531 3532 cpu = smp_processor_id(); 3533 cpu_buffer = buffer->buffers[cpu]; 3534 3535 /* 3536 * This must only be called if the event has not been 3537 * committed yet. Thus we can assume that preemption 3538 * is still disabled. 
3539 */ 3540 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3541 3542 rb_decrement_entry(cpu_buffer, event); 3543 if (rb_try_to_discard(cpu_buffer, event)) 3544 goto out; 3545 3546 out: 3547 rb_end_commit(cpu_buffer); 3548 3549 trace_recursive_unlock(cpu_buffer); 3550 3551 preempt_enable_notrace(); 3552 3553 } 3554 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3555 3556 /** 3557 * ring_buffer_write - write data to the buffer without reserving 3558 * @buffer: The ring buffer to write to. 3559 * @length: The length of the data being written (excluding the event header) 3560 * @data: The data to write to the buffer. 3561 * 3562 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3563 * one function. If you already have the data to write to the buffer, it 3564 * may be easier to simply call this function. 3565 * 3566 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3567 * and not the length of the event which would hold the header. 
3568 */ 3569 int ring_buffer_write(struct trace_buffer *buffer, 3570 unsigned long length, 3571 void *data) 3572 { 3573 struct ring_buffer_per_cpu *cpu_buffer; 3574 struct ring_buffer_event *event; 3575 void *body; 3576 int ret = -EBUSY; 3577 int cpu; 3578 3579 preempt_disable_notrace(); 3580 3581 if (atomic_read(&buffer->record_disabled)) 3582 goto out; 3583 3584 cpu = raw_smp_processor_id(); 3585 3586 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3587 goto out; 3588 3589 cpu_buffer = buffer->buffers[cpu]; 3590 3591 if (atomic_read(&cpu_buffer->record_disabled)) 3592 goto out; 3593 3594 if (length > BUF_MAX_DATA_SIZE) 3595 goto out; 3596 3597 if (unlikely(trace_recursive_lock(cpu_buffer))) 3598 goto out; 3599 3600 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3601 if (!event) 3602 goto out_unlock; 3603 3604 body = rb_event_data(event); 3605 3606 memcpy(body, data, length); 3607 3608 rb_commit(cpu_buffer, event); 3609 3610 rb_wakeups(buffer, cpu_buffer); 3611 3612 ret = 0; 3613 3614 out_unlock: 3615 trace_recursive_unlock(cpu_buffer); 3616 3617 out: 3618 preempt_enable_notrace(); 3619 3620 return ret; 3621 } 3622 EXPORT_SYMBOL_GPL(ring_buffer_write); 3623 3624 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3625 { 3626 struct buffer_page *reader = cpu_buffer->reader_page; 3627 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3628 struct buffer_page *commit = cpu_buffer->commit_page; 3629 3630 /* In case of error, head will be NULL */ 3631 if (unlikely(!head)) 3632 return true; 3633 3634 return reader->read == rb_page_commit(reader) && 3635 (commit == reader || 3636 (commit == head && 3637 head->read == rb_page_commit(commit))); 3638 } 3639 3640 /** 3641 * ring_buffer_record_disable - stop all writes into the buffer 3642 * @buffer: The ring buffer to stop writes to. 3643 * 3644 * This prevents all writes to the buffer. Any attempt to write 3645 * to the buffer after this will fail and return NULL. 
3646 * 3647 * The caller should call synchronize_rcu() after this. 3648 */ 3649 void ring_buffer_record_disable(struct trace_buffer *buffer) 3650 { 3651 atomic_inc(&buffer->record_disabled); 3652 } 3653 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3654 3655 /** 3656 * ring_buffer_record_enable - enable writes to the buffer 3657 * @buffer: The ring buffer to enable writes 3658 * 3659 * Note, multiple disables will need the same number of enables 3660 * to truly enable the writing (much like preempt_disable). 3661 */ 3662 void ring_buffer_record_enable(struct trace_buffer *buffer) 3663 { 3664 atomic_dec(&buffer->record_disabled); 3665 } 3666 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 3667 3668 /** 3669 * ring_buffer_record_off - stop all writes into the buffer 3670 * @buffer: The ring buffer to stop writes to. 3671 * 3672 * This prevents all writes to the buffer. Any attempt to write 3673 * to the buffer after this will fail and return NULL. 3674 * 3675 * This is different than ring_buffer_record_disable() as 3676 * it works like an on/off switch, where as the disable() version 3677 * must be paired with a enable(). 3678 */ 3679 void ring_buffer_record_off(struct trace_buffer *buffer) 3680 { 3681 unsigned int rd; 3682 unsigned int new_rd; 3683 3684 do { 3685 rd = atomic_read(&buffer->record_disabled); 3686 new_rd = rd | RB_BUFFER_OFF; 3687 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3688 } 3689 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 3690 3691 /** 3692 * ring_buffer_record_on - restart writes into the buffer 3693 * @buffer: The ring buffer to start writes to. 3694 * 3695 * This enables all writes to the buffer that was disabled by 3696 * ring_buffer_record_off(). 3697 * 3698 * This is different than ring_buffer_record_enable() as 3699 * it works like an on/off switch, where as the enable() version 3700 * must be paired with a disable(). 
3701 */ 3702 void ring_buffer_record_on(struct trace_buffer *buffer) 3703 { 3704 unsigned int rd; 3705 unsigned int new_rd; 3706 3707 do { 3708 rd = atomic_read(&buffer->record_disabled); 3709 new_rd = rd & ~RB_BUFFER_OFF; 3710 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3711 } 3712 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 3713 3714 /** 3715 * ring_buffer_record_is_on - return true if the ring buffer can write 3716 * @buffer: The ring buffer to see if write is enabled 3717 * 3718 * Returns true if the ring buffer is in a state that it accepts writes. 3719 */ 3720 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 3721 { 3722 return !atomic_read(&buffer->record_disabled); 3723 } 3724 3725 /** 3726 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 3727 * @buffer: The ring buffer to see if write is set enabled 3728 * 3729 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 3730 * Note that this does NOT mean it is in a writable state. 3731 * 3732 * It may return true when the ring buffer has been disabled by 3733 * ring_buffer_record_disable(), as that is a temporary disabling of 3734 * the ring buffer. 3735 */ 3736 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 3737 { 3738 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 3739 } 3740 3741 /** 3742 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 3743 * @buffer: The ring buffer to stop writes to. 3744 * @cpu: The CPU buffer to stop 3745 * 3746 * This prevents all writes to the buffer. Any attempt to write 3747 * to the buffer after this will fail and return NULL. 3748 * 3749 * The caller should call synchronize_rcu() after this. 
3750 */ 3751 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 3752 { 3753 struct ring_buffer_per_cpu *cpu_buffer; 3754 3755 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3756 return; 3757 3758 cpu_buffer = buffer->buffers[cpu]; 3759 atomic_inc(&cpu_buffer->record_disabled); 3760 } 3761 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 3762 3763 /** 3764 * ring_buffer_record_enable_cpu - enable writes to the buffer 3765 * @buffer: The ring buffer to enable writes 3766 * @cpu: The CPU to enable. 3767 * 3768 * Note, multiple disables will need the same number of enables 3769 * to truly enable the writing (much like preempt_disable). 3770 */ 3771 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 3772 { 3773 struct ring_buffer_per_cpu *cpu_buffer; 3774 3775 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3776 return; 3777 3778 cpu_buffer = buffer->buffers[cpu]; 3779 atomic_dec(&cpu_buffer->record_disabled); 3780 } 3781 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 3782 3783 /* 3784 * The total entries in the ring buffer is the running counter 3785 * of entries entered into the ring buffer, minus the sum of 3786 * the entries read from the ring buffer and the number of 3787 * entries that were overwritten. 3788 */ 3789 static inline unsigned long 3790 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 3791 { 3792 return local_read(&cpu_buffer->entries) - 3793 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 3794 } 3795 3796 /** 3797 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 3798 * @buffer: The ring buffer 3799 * @cpu: The per CPU buffer to read from. 
3800 */ 3801 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 3802 { 3803 unsigned long flags; 3804 struct ring_buffer_per_cpu *cpu_buffer; 3805 struct buffer_page *bpage; 3806 u64 ret = 0; 3807 3808 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3809 return 0; 3810 3811 cpu_buffer = buffer->buffers[cpu]; 3812 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3813 /* 3814 * if the tail is on reader_page, oldest time stamp is on the reader 3815 * page 3816 */ 3817 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 3818 bpage = cpu_buffer->reader_page; 3819 else 3820 bpage = rb_set_head_page(cpu_buffer); 3821 if (bpage) 3822 ret = bpage->page->time_stamp; 3823 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3824 3825 return ret; 3826 } 3827 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 3828 3829 /** 3830 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 3831 * @buffer: The ring buffer 3832 * @cpu: The per CPU buffer to read from. 3833 */ 3834 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 3835 { 3836 struct ring_buffer_per_cpu *cpu_buffer; 3837 unsigned long ret; 3838 3839 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3840 return 0; 3841 3842 cpu_buffer = buffer->buffers[cpu]; 3843 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 3844 3845 return ret; 3846 } 3847 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 3848 3849 /** 3850 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 3851 * @buffer: The ring buffer 3852 * @cpu: The per CPU buffer to get the entries from. 
3853 */ 3854 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 3855 { 3856 struct ring_buffer_per_cpu *cpu_buffer; 3857 3858 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3859 return 0; 3860 3861 cpu_buffer = buffer->buffers[cpu]; 3862 3863 return rb_num_of_entries(cpu_buffer); 3864 } 3865 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 3866 3867 /** 3868 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 3869 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 3870 * @buffer: The ring buffer 3871 * @cpu: The per CPU buffer to get the number of overruns from 3872 */ 3873 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 3874 { 3875 struct ring_buffer_per_cpu *cpu_buffer; 3876 unsigned long ret; 3877 3878 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3879 return 0; 3880 3881 cpu_buffer = buffer->buffers[cpu]; 3882 ret = local_read(&cpu_buffer->overrun); 3883 3884 return ret; 3885 } 3886 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 3887 3888 /** 3889 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 3890 * commits failing due to the buffer wrapping around while there are uncommitted 3891 * events, such as during an interrupt storm. 3892 * @buffer: The ring buffer 3893 * @cpu: The per CPU buffer to get the number of overruns from 3894 */ 3895 unsigned long 3896 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 3897 { 3898 struct ring_buffer_per_cpu *cpu_buffer; 3899 unsigned long ret; 3900 3901 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3902 return 0; 3903 3904 cpu_buffer = buffer->buffers[cpu]; 3905 ret = local_read(&cpu_buffer->commit_overrun); 3906 3907 return ret; 3908 } 3909 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 3910 3911 /** 3912 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 3913 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
3914 * @buffer: The ring buffer 3915 * @cpu: The per CPU buffer to get the number of overruns from 3916 */ 3917 unsigned long 3918 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 3919 { 3920 struct ring_buffer_per_cpu *cpu_buffer; 3921 unsigned long ret; 3922 3923 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3924 return 0; 3925 3926 cpu_buffer = buffer->buffers[cpu]; 3927 ret = local_read(&cpu_buffer->dropped_events); 3928 3929 return ret; 3930 } 3931 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 3932 3933 /** 3934 * ring_buffer_read_events_cpu - get the number of events successfully read 3935 * @buffer: The ring buffer 3936 * @cpu: The per CPU buffer to get the number of events read 3937 */ 3938 unsigned long 3939 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 3940 { 3941 struct ring_buffer_per_cpu *cpu_buffer; 3942 3943 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3944 return 0; 3945 3946 cpu_buffer = buffer->buffers[cpu]; 3947 return cpu_buffer->read; 3948 } 3949 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 3950 3951 /** 3952 * ring_buffer_entries - get the number of entries in a buffer 3953 * @buffer: The ring buffer 3954 * 3955 * Returns the total number of entries in the ring buffer 3956 * (all CPU entries) 3957 */ 3958 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 3959 { 3960 struct ring_buffer_per_cpu *cpu_buffer; 3961 unsigned long entries = 0; 3962 int cpu; 3963 3964 /* if you care about this being correct, lock the buffer */ 3965 for_each_buffer_cpu(buffer, cpu) { 3966 cpu_buffer = buffer->buffers[cpu]; 3967 entries += rb_num_of_entries(cpu_buffer); 3968 } 3969 3970 return entries; 3971 } 3972 EXPORT_SYMBOL_GPL(ring_buffer_entries); 3973 3974 /** 3975 * ring_buffer_overruns - get the number of overruns in buffer 3976 * @buffer: The ring buffer 3977 * 3978 * Returns the total number of overruns in the ring buffer 3979 * (all CPU entries) 3980 */ 3981 unsigned long ring_buffer_overruns(struct 
trace_buffer *buffer) 3982 { 3983 struct ring_buffer_per_cpu *cpu_buffer; 3984 unsigned long overruns = 0; 3985 int cpu; 3986 3987 /* if you care about this being correct, lock the buffer */ 3988 for_each_buffer_cpu(buffer, cpu) { 3989 cpu_buffer = buffer->buffers[cpu]; 3990 overruns += local_read(&cpu_buffer->overrun); 3991 } 3992 3993 return overruns; 3994 } 3995 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 3996 3997 static void rb_iter_reset(struct ring_buffer_iter *iter) 3998 { 3999 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4000 4001 /* Iterator usage is expected to have record disabled */ 4002 iter->head_page = cpu_buffer->reader_page; 4003 iter->head = cpu_buffer->reader_page->read; 4004 iter->next_event = iter->head; 4005 4006 iter->cache_reader_page = iter->head_page; 4007 iter->cache_read = cpu_buffer->read; 4008 4009 if (iter->head) { 4010 iter->read_stamp = cpu_buffer->read_stamp; 4011 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4012 } else { 4013 iter->read_stamp = iter->head_page->page->time_stamp; 4014 iter->page_stamp = iter->read_stamp; 4015 } 4016 } 4017 4018 /** 4019 * ring_buffer_iter_reset - reset an iterator 4020 * @iter: The iterator to reset 4021 * 4022 * Resets the iterator, so that it will start from the beginning 4023 * again. 
4024 */ 4025 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4026 { 4027 struct ring_buffer_per_cpu *cpu_buffer; 4028 unsigned long flags; 4029 4030 if (!iter) 4031 return; 4032 4033 cpu_buffer = iter->cpu_buffer; 4034 4035 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4036 rb_iter_reset(iter); 4037 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4038 } 4039 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4040 4041 /** 4042 * ring_buffer_iter_empty - check if an iterator has no more to read 4043 * @iter: The iterator to check 4044 */ 4045 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4046 { 4047 struct ring_buffer_per_cpu *cpu_buffer; 4048 struct buffer_page *reader; 4049 struct buffer_page *head_page; 4050 struct buffer_page *commit_page; 4051 struct buffer_page *curr_commit_page; 4052 unsigned commit; 4053 u64 curr_commit_ts; 4054 u64 commit_ts; 4055 4056 cpu_buffer = iter->cpu_buffer; 4057 reader = cpu_buffer->reader_page; 4058 head_page = cpu_buffer->head_page; 4059 commit_page = cpu_buffer->commit_page; 4060 commit_ts = commit_page->page->time_stamp; 4061 4062 /* 4063 * When the writer goes across pages, it issues a cmpxchg which 4064 * is a mb(), which will synchronize with the rmb here. 
4065 * (see rb_tail_page_update()) 4066 */ 4067 smp_rmb(); 4068 commit = rb_page_commit(commit_page); 4069 /* We want to make sure that the commit page doesn't change */ 4070 smp_rmb(); 4071 4072 /* Make sure commit page didn't change */ 4073 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4074 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4075 4076 /* If the commit page changed, then there's more data */ 4077 if (curr_commit_page != commit_page || 4078 curr_commit_ts != commit_ts) 4079 return 0; 4080 4081 /* Still racy, as it may return a false positive, but that's OK */ 4082 return ((iter->head_page == commit_page && iter->head >= commit) || 4083 (iter->head_page == reader && commit_page == head_page && 4084 head_page->read == commit && 4085 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4086 } 4087 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4088 4089 static void 4090 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4091 struct ring_buffer_event *event) 4092 { 4093 u64 delta; 4094 4095 switch (event->type_len) { 4096 case RINGBUF_TYPE_PADDING: 4097 return; 4098 4099 case RINGBUF_TYPE_TIME_EXTEND: 4100 delta = ring_buffer_event_time_stamp(event); 4101 cpu_buffer->read_stamp += delta; 4102 return; 4103 4104 case RINGBUF_TYPE_TIME_STAMP: 4105 delta = ring_buffer_event_time_stamp(event); 4106 cpu_buffer->read_stamp = delta; 4107 return; 4108 4109 case RINGBUF_TYPE_DATA: 4110 cpu_buffer->read_stamp += event->time_delta; 4111 return; 4112 4113 default: 4114 RB_WARN_ON(cpu_buffer, 1); 4115 } 4116 return; 4117 } 4118 4119 static void 4120 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4121 struct ring_buffer_event *event) 4122 { 4123 u64 delta; 4124 4125 switch (event->type_len) { 4126 case RINGBUF_TYPE_PADDING: 4127 return; 4128 4129 case RINGBUF_TYPE_TIME_EXTEND: 4130 delta = ring_buffer_event_time_stamp(event); 4131 iter->read_stamp += delta; 4132 return; 4133 4134 case RINGBUF_TYPE_TIME_STAMP: 4135 delta = 
ring_buffer_event_time_stamp(event); 4136 iter->read_stamp = delta; 4137 return; 4138 4139 case RINGBUF_TYPE_DATA: 4140 iter->read_stamp += event->time_delta; 4141 return; 4142 4143 default: 4144 RB_WARN_ON(iter->cpu_buffer, 1); 4145 } 4146 return; 4147 } 4148 4149 static struct buffer_page * 4150 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4151 { 4152 struct buffer_page *reader = NULL; 4153 unsigned long overwrite; 4154 unsigned long flags; 4155 int nr_loops = 0; 4156 int ret; 4157 4158 local_irq_save(flags); 4159 arch_spin_lock(&cpu_buffer->lock); 4160 4161 again: 4162 /* 4163 * This should normally only loop twice. But because the 4164 * start of the reader inserts an empty page, it causes 4165 * a case where we will loop three times. There should be no 4166 * reason to loop four times (that I know of). 4167 */ 4168 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4169 reader = NULL; 4170 goto out; 4171 } 4172 4173 reader = cpu_buffer->reader_page; 4174 4175 /* If there's more to read, return this page */ 4176 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4177 goto out; 4178 4179 /* Never should we have an index greater than the size */ 4180 if (RB_WARN_ON(cpu_buffer, 4181 cpu_buffer->reader_page->read > rb_page_size(reader))) 4182 goto out; 4183 4184 /* check if we caught up to the tail */ 4185 reader = NULL; 4186 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4187 goto out; 4188 4189 /* Don't bother swapping if the ring buffer is empty */ 4190 if (rb_num_of_entries(cpu_buffer) == 0) 4191 goto out; 4192 4193 /* 4194 * Reset the reader page to size zero. 4195 */ 4196 local_set(&cpu_buffer->reader_page->write, 0); 4197 local_set(&cpu_buffer->reader_page->entries, 0); 4198 local_set(&cpu_buffer->reader_page->page->commit, 0); 4199 cpu_buffer->reader_page->real_end = 0; 4200 4201 spin: 4202 /* 4203 * Splice the empty reader page into the list around the head. 
4204 */ 4205 reader = rb_set_head_page(cpu_buffer); 4206 if (!reader) 4207 goto out; 4208 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4209 cpu_buffer->reader_page->list.prev = reader->list.prev; 4210 4211 /* 4212 * cpu_buffer->pages just needs to point to the buffer, it 4213 * has no specific buffer page to point to. Lets move it out 4214 * of our way so we don't accidentally swap it. 4215 */ 4216 cpu_buffer->pages = reader->list.prev; 4217 4218 /* The reader page will be pointing to the new head */ 4219 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 4220 4221 /* 4222 * We want to make sure we read the overruns after we set up our 4223 * pointers to the next object. The writer side does a 4224 * cmpxchg to cross pages which acts as the mb on the writer 4225 * side. Note, the reader will constantly fail the swap 4226 * while the writer is updating the pointers, so this 4227 * guarantees that the overwrite recorded here is the one we 4228 * want to compare with the last_overrun. 4229 */ 4230 smp_mb(); 4231 overwrite = local_read(&(cpu_buffer->overrun)); 4232 4233 /* 4234 * Here's the tricky part. 4235 * 4236 * We need to move the pointer past the header page. 4237 * But we can only do that if a writer is not currently 4238 * moving it. The page before the header page has the 4239 * flag bit '1' set if it is pointing to the page we want. 4240 * but if the writer is in the process of moving it 4241 * than it will be '2' or already moved '0'. 4242 */ 4243 4244 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4245 4246 /* 4247 * If we did not convert it, then we must try again. 4248 */ 4249 if (!ret) 4250 goto spin; 4251 4252 /* 4253 * Yay! We succeeded in replacing the page. 4254 * 4255 * Now make the new head point back to the reader page. 
4256 */ 4257 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4258 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 4259 4260 local_inc(&cpu_buffer->pages_read); 4261 4262 /* Finally update the reader page to the new head */ 4263 cpu_buffer->reader_page = reader; 4264 cpu_buffer->reader_page->read = 0; 4265 4266 if (overwrite != cpu_buffer->last_overrun) { 4267 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4268 cpu_buffer->last_overrun = overwrite; 4269 } 4270 4271 goto again; 4272 4273 out: 4274 /* Update the read_stamp on the first event */ 4275 if (reader && reader->read == 0) 4276 cpu_buffer->read_stamp = reader->page->time_stamp; 4277 4278 arch_spin_unlock(&cpu_buffer->lock); 4279 local_irq_restore(flags); 4280 4281 return reader; 4282 } 4283 4284 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4285 { 4286 struct ring_buffer_event *event; 4287 struct buffer_page *reader; 4288 unsigned length; 4289 4290 reader = rb_get_reader_page(cpu_buffer); 4291 4292 /* This function should not be called when buffer is empty */ 4293 if (RB_WARN_ON(cpu_buffer, !reader)) 4294 return; 4295 4296 event = rb_reader_event(cpu_buffer); 4297 4298 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4299 cpu_buffer->read++; 4300 4301 rb_update_read_stamp(cpu_buffer, event); 4302 4303 length = rb_event_length(event); 4304 cpu_buffer->reader_page->read += length; 4305 } 4306 4307 static void rb_advance_iter(struct ring_buffer_iter *iter) 4308 { 4309 struct ring_buffer_per_cpu *cpu_buffer; 4310 4311 cpu_buffer = iter->cpu_buffer; 4312 4313 /* If head == next_event then we need to jump to the next event */ 4314 if (iter->head == iter->next_event) { 4315 /* If the event gets overwritten again, there's nothing to do */ 4316 if (rb_iter_head_event(iter) == NULL) 4317 return; 4318 } 4319 4320 iter->head = iter->next_event; 4321 4322 /* 4323 * Check if we are at the end of the buffer. 
4324 */ 4325 if (iter->next_event >= rb_page_size(iter->head_page)) { 4326 /* discarded commits can make the page empty */ 4327 if (iter->head_page == cpu_buffer->commit_page) 4328 return; 4329 rb_inc_iter(iter); 4330 return; 4331 } 4332 4333 rb_update_iter_read_stamp(iter, iter->event); 4334 } 4335 4336 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4337 { 4338 return cpu_buffer->lost_events; 4339 } 4340 4341 static struct ring_buffer_event * 4342 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4343 unsigned long *lost_events) 4344 { 4345 struct ring_buffer_event *event; 4346 struct buffer_page *reader; 4347 int nr_loops = 0; 4348 4349 if (ts) 4350 *ts = 0; 4351 again: 4352 /* 4353 * We repeat when a time extend is encountered. 4354 * Since the time extend is always attached to a data event, 4355 * we should never loop more than once. 4356 * (We never hit the following condition more than twice). 4357 */ 4358 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4359 return NULL; 4360 4361 reader = rb_get_reader_page(cpu_buffer); 4362 if (!reader) 4363 return NULL; 4364 4365 event = rb_reader_event(cpu_buffer); 4366 4367 switch (event->type_len) { 4368 case RINGBUF_TYPE_PADDING: 4369 if (rb_null_event(event)) 4370 RB_WARN_ON(cpu_buffer, 1); 4371 /* 4372 * Because the writer could be discarding every 4373 * event it creates (which would probably be bad) 4374 * if we were to go back to "again" then we may never 4375 * catch up, and will trigger the warn on, or lock 4376 * the box. Return the padding, and we will release 4377 * the current locks, and try again. 
4378 */ 4379 return event; 4380 4381 case RINGBUF_TYPE_TIME_EXTEND: 4382 /* Internal data, OK to advance */ 4383 rb_advance_reader(cpu_buffer); 4384 goto again; 4385 4386 case RINGBUF_TYPE_TIME_STAMP: 4387 if (ts) { 4388 *ts = ring_buffer_event_time_stamp(event); 4389 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4390 cpu_buffer->cpu, ts); 4391 } 4392 /* Internal data, OK to advance */ 4393 rb_advance_reader(cpu_buffer); 4394 goto again; 4395 4396 case RINGBUF_TYPE_DATA: 4397 if (ts && !(*ts)) { 4398 *ts = cpu_buffer->read_stamp + event->time_delta; 4399 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4400 cpu_buffer->cpu, ts); 4401 } 4402 if (lost_events) 4403 *lost_events = rb_lost_events(cpu_buffer); 4404 return event; 4405 4406 default: 4407 RB_WARN_ON(cpu_buffer, 1); 4408 } 4409 4410 return NULL; 4411 } 4412 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4413 4414 static struct ring_buffer_event * 4415 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4416 { 4417 struct trace_buffer *buffer; 4418 struct ring_buffer_per_cpu *cpu_buffer; 4419 struct ring_buffer_event *event; 4420 int nr_loops = 0; 4421 4422 if (ts) 4423 *ts = 0; 4424 4425 cpu_buffer = iter->cpu_buffer; 4426 buffer = cpu_buffer->buffer; 4427 4428 /* 4429 * Check if someone performed a consuming read to 4430 * the buffer. A consuming read invalidates the iterator 4431 * and we need to reset the iterator in this case. 4432 */ 4433 if (unlikely(iter->cache_read != cpu_buffer->read || 4434 iter->cache_reader_page != cpu_buffer->reader_page)) 4435 rb_iter_reset(iter); 4436 4437 again: 4438 if (ring_buffer_iter_empty(iter)) 4439 return NULL; 4440 4441 /* 4442 * As the writer can mess with what the iterator is trying 4443 * to read, just give up if we fail to get an event after 4444 * three tries. The iterator is not as reliable when reading 4445 * the ring buffer with an active write as the consumer is. 4446 * Do not warn if the three failures is reached. 
4447 */ 4448 if (++nr_loops > 3) 4449 return NULL; 4450 4451 if (rb_per_cpu_empty(cpu_buffer)) 4452 return NULL; 4453 4454 if (iter->head >= rb_page_size(iter->head_page)) { 4455 rb_inc_iter(iter); 4456 goto again; 4457 } 4458 4459 event = rb_iter_head_event(iter); 4460 if (!event) 4461 goto again; 4462 4463 switch (event->type_len) { 4464 case RINGBUF_TYPE_PADDING: 4465 if (rb_null_event(event)) { 4466 rb_inc_iter(iter); 4467 goto again; 4468 } 4469 rb_advance_iter(iter); 4470 return event; 4471 4472 case RINGBUF_TYPE_TIME_EXTEND: 4473 /* Internal data, OK to advance */ 4474 rb_advance_iter(iter); 4475 goto again; 4476 4477 case RINGBUF_TYPE_TIME_STAMP: 4478 if (ts) { 4479 *ts = ring_buffer_event_time_stamp(event); 4480 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4481 cpu_buffer->cpu, ts); 4482 } 4483 /* Internal data, OK to advance */ 4484 rb_advance_iter(iter); 4485 goto again; 4486 4487 case RINGBUF_TYPE_DATA: 4488 if (ts && !(*ts)) { 4489 *ts = iter->read_stamp + event->time_delta; 4490 ring_buffer_normalize_time_stamp(buffer, 4491 cpu_buffer->cpu, ts); 4492 } 4493 return event; 4494 4495 default: 4496 RB_WARN_ON(cpu_buffer, 1); 4497 } 4498 4499 return NULL; 4500 } 4501 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4502 4503 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4504 { 4505 if (likely(!in_nmi())) { 4506 raw_spin_lock(&cpu_buffer->reader_lock); 4507 return true; 4508 } 4509 4510 /* 4511 * If an NMI die dumps out the content of the ring buffer 4512 * trylock must be used to prevent a deadlock if the NMI 4513 * preempted a task that holds the ring buffer locks. If 4514 * we get the lock then all is fine, if not, then continue 4515 * to do the read, but this can corrupt the ring buffer, 4516 * so it must be permanently disabled from future writes. 4517 * Reading from NMI is a oneshot deal. 
4518 */ 4519 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4520 return true; 4521 4522 /* Continue without locking, but disable the ring buffer */ 4523 atomic_inc(&cpu_buffer->record_disabled); 4524 return false; 4525 } 4526 4527 static inline void 4528 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4529 { 4530 if (likely(locked)) 4531 raw_spin_unlock(&cpu_buffer->reader_lock); 4532 return; 4533 } 4534 4535 /** 4536 * ring_buffer_peek - peek at the next event to be read 4537 * @buffer: The ring buffer to read 4538 * @cpu: The cpu to peak at 4539 * @ts: The timestamp counter of this event. 4540 * @lost_events: a variable to store if events were lost (may be NULL) 4541 * 4542 * This will return the event that will be read next, but does 4543 * not consume the data. 4544 */ 4545 struct ring_buffer_event * 4546 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4547 unsigned long *lost_events) 4548 { 4549 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4550 struct ring_buffer_event *event; 4551 unsigned long flags; 4552 bool dolock; 4553 4554 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4555 return NULL; 4556 4557 again: 4558 local_irq_save(flags); 4559 dolock = rb_reader_lock(cpu_buffer); 4560 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4561 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4562 rb_advance_reader(cpu_buffer); 4563 rb_reader_unlock(cpu_buffer, dolock); 4564 local_irq_restore(flags); 4565 4566 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4567 goto again; 4568 4569 return event; 4570 } 4571 4572 /** ring_buffer_iter_dropped - report if there are dropped events 4573 * @iter: The ring buffer iterator 4574 * 4575 * Returns true if there was dropped events since the last peek. 
4576 */ 4577 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4578 { 4579 bool ret = iter->missed_events != 0; 4580 4581 iter->missed_events = 0; 4582 return ret; 4583 } 4584 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4585 4586 /** 4587 * ring_buffer_iter_peek - peek at the next event to be read 4588 * @iter: The ring buffer iterator 4589 * @ts: The timestamp counter of this event. 4590 * 4591 * This will return the event that will be read next, but does 4592 * not increment the iterator. 4593 */ 4594 struct ring_buffer_event * 4595 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4596 { 4597 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4598 struct ring_buffer_event *event; 4599 unsigned long flags; 4600 4601 again: 4602 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4603 event = rb_iter_peek(iter, ts); 4604 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4605 4606 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4607 goto again; 4608 4609 return event; 4610 } 4611 4612 /** 4613 * ring_buffer_consume - return an event and consume it 4614 * @buffer: The ring buffer to get the next event from 4615 * @cpu: the cpu to read the buffer from 4616 * @ts: a variable to store the timestamp (may be NULL) 4617 * @lost_events: a variable to store if events were lost (may be NULL) 4618 * 4619 * Returns the next event in the ring buffer, and that event is consumed. 4620 * Meaning, that sequential reads will keep returning a different event, 4621 * and eventually empty the ring buffer if the producer is slower. 
4622 */ 4623 struct ring_buffer_event * 4624 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 4625 unsigned long *lost_events) 4626 { 4627 struct ring_buffer_per_cpu *cpu_buffer; 4628 struct ring_buffer_event *event = NULL; 4629 unsigned long flags; 4630 bool dolock; 4631 4632 again: 4633 /* might be called in atomic */ 4634 preempt_disable(); 4635 4636 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4637 goto out; 4638 4639 cpu_buffer = buffer->buffers[cpu]; 4640 local_irq_save(flags); 4641 dolock = rb_reader_lock(cpu_buffer); 4642 4643 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4644 if (event) { 4645 cpu_buffer->lost_events = 0; 4646 rb_advance_reader(cpu_buffer); 4647 } 4648 4649 rb_reader_unlock(cpu_buffer, dolock); 4650 local_irq_restore(flags); 4651 4652 out: 4653 preempt_enable(); 4654 4655 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4656 goto again; 4657 4658 return event; 4659 } 4660 EXPORT_SYMBOL_GPL(ring_buffer_consume); 4661 4662 /** 4663 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 4664 * @buffer: The ring buffer to read from 4665 * @cpu: The cpu buffer to iterate over 4666 * @flags: gfp flags to use for memory allocation 4667 * 4668 * This performs the initial preparations necessary to iterate 4669 * through the buffer. Memory is allocated, buffer recording 4670 * is disabled, and the iterator pointer is returned to the caller. 4671 * 4672 * Disabling buffer recording prevents the reading from being 4673 * corrupted. This is not a consuming read, so a producer is not 4674 * expected. 4675 * 4676 * After a sequence of ring_buffer_read_prepare calls, the user is 4677 * expected to make at least one call to ring_buffer_read_prepare_sync. 4678 * Afterwards, ring_buffer_read_start is invoked to get things going 4679 * for real. 4680 * 4681 * This overall must be paired with ring_buffer_read_finish. 
4682 */ 4683 struct ring_buffer_iter * 4684 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 4685 { 4686 struct ring_buffer_per_cpu *cpu_buffer; 4687 struct ring_buffer_iter *iter; 4688 4689 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4690 return NULL; 4691 4692 iter = kzalloc(sizeof(*iter), flags); 4693 if (!iter) 4694 return NULL; 4695 4696 iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags); 4697 if (!iter->event) { 4698 kfree(iter); 4699 return NULL; 4700 } 4701 4702 cpu_buffer = buffer->buffers[cpu]; 4703 4704 iter->cpu_buffer = cpu_buffer; 4705 4706 atomic_inc(&cpu_buffer->resize_disabled); 4707 4708 return iter; 4709 } 4710 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 4711 4712 /** 4713 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 4714 * 4715 * All previously invoked ring_buffer_read_prepare calls to prepare 4716 * iterators will be synchronized. Afterwards, read_buffer_read_start 4717 * calls on those iterators are allowed. 4718 */ 4719 void 4720 ring_buffer_read_prepare_sync(void) 4721 { 4722 synchronize_rcu(); 4723 } 4724 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 4725 4726 /** 4727 * ring_buffer_read_start - start a non consuming read of the buffer 4728 * @iter: The iterator returned by ring_buffer_read_prepare 4729 * 4730 * This finalizes the startup of an iteration through the buffer. 4731 * The iterator comes from a call to ring_buffer_read_prepare and 4732 * an intervening ring_buffer_read_prepare_sync must have been 4733 * performed. 4734 * 4735 * Must be paired with ring_buffer_read_finish. 
4736 */ 4737 void 4738 ring_buffer_read_start(struct ring_buffer_iter *iter) 4739 { 4740 struct ring_buffer_per_cpu *cpu_buffer; 4741 unsigned long flags; 4742 4743 if (!iter) 4744 return; 4745 4746 cpu_buffer = iter->cpu_buffer; 4747 4748 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4749 arch_spin_lock(&cpu_buffer->lock); 4750 rb_iter_reset(iter); 4751 arch_spin_unlock(&cpu_buffer->lock); 4752 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4753 } 4754 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 4755 4756 /** 4757 * ring_buffer_read_finish - finish reading the iterator of the buffer 4758 * @iter: The iterator retrieved by ring_buffer_start 4759 * 4760 * This re-enables the recording to the buffer, and frees the 4761 * iterator. 4762 */ 4763 void 4764 ring_buffer_read_finish(struct ring_buffer_iter *iter) 4765 { 4766 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4767 unsigned long flags; 4768 4769 /* 4770 * Ring buffer is disabled from recording, here's a good place 4771 * to check the integrity of the ring buffer. 4772 * Must prevent readers from trying to read, as the check 4773 * clears the HEAD page and readers require it. 4774 */ 4775 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4776 rb_check_pages(cpu_buffer); 4777 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4778 4779 atomic_dec(&cpu_buffer->resize_disabled); 4780 kfree(iter->event); 4781 kfree(iter); 4782 } 4783 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 4784 4785 /** 4786 * ring_buffer_iter_advance - advance the iterator to the next location 4787 * @iter: The ring buffer iterator 4788 * 4789 * Move the location of the iterator such that the next read will 4790 * be the next location of the iterator. 
4791 */ 4792 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 4793 { 4794 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4795 unsigned long flags; 4796 4797 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4798 4799 rb_advance_iter(iter); 4800 4801 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4802 } 4803 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 4804 4805 /** 4806 * ring_buffer_size - return the size of the ring buffer (in bytes) 4807 * @buffer: The ring buffer. 4808 * @cpu: The CPU to get ring buffer size from. 4809 */ 4810 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 4811 { 4812 /* 4813 * Earlier, this method returned 4814 * BUF_PAGE_SIZE * buffer->nr_pages 4815 * Since the nr_pages field is now removed, we have converted this to 4816 * return the per cpu buffer value. 4817 */ 4818 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4819 return 0; 4820 4821 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 4822 } 4823 EXPORT_SYMBOL_GPL(ring_buffer_size); 4824 4825 static void 4826 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 4827 { 4828 rb_head_page_deactivate(cpu_buffer); 4829 4830 cpu_buffer->head_page 4831 = list_entry(cpu_buffer->pages, struct buffer_page, list); 4832 local_set(&cpu_buffer->head_page->write, 0); 4833 local_set(&cpu_buffer->head_page->entries, 0); 4834 local_set(&cpu_buffer->head_page->page->commit, 0); 4835 4836 cpu_buffer->head_page->read = 0; 4837 4838 cpu_buffer->tail_page = cpu_buffer->head_page; 4839 cpu_buffer->commit_page = cpu_buffer->head_page; 4840 4841 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 4842 INIT_LIST_HEAD(&cpu_buffer->new_pages); 4843 local_set(&cpu_buffer->reader_page->write, 0); 4844 local_set(&cpu_buffer->reader_page->entries, 0); 4845 local_set(&cpu_buffer->reader_page->page->commit, 0); 4846 cpu_buffer->reader_page->read = 0; 4847 4848 local_set(&cpu_buffer->entries_bytes, 0); 4849 local_set(&cpu_buffer->overrun, 0); 4850 
local_set(&cpu_buffer->commit_overrun, 0); 4851 local_set(&cpu_buffer->dropped_events, 0); 4852 local_set(&cpu_buffer->entries, 0); 4853 local_set(&cpu_buffer->committing, 0); 4854 local_set(&cpu_buffer->commits, 0); 4855 local_set(&cpu_buffer->pages_touched, 0); 4856 local_set(&cpu_buffer->pages_read, 0); 4857 cpu_buffer->last_pages_touch = 0; 4858 cpu_buffer->shortest_full = 0; 4859 cpu_buffer->read = 0; 4860 cpu_buffer->read_bytes = 0; 4861 4862 rb_time_set(&cpu_buffer->write_stamp, 0); 4863 rb_time_set(&cpu_buffer->before_stamp, 0); 4864 4865 cpu_buffer->lost_events = 0; 4866 cpu_buffer->last_overrun = 0; 4867 4868 rb_head_page_activate(cpu_buffer); 4869 } 4870 4871 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 4872 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 4873 { 4874 unsigned long flags; 4875 4876 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4877 4878 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 4879 goto out; 4880 4881 arch_spin_lock(&cpu_buffer->lock); 4882 4883 rb_reset_cpu(cpu_buffer); 4884 4885 arch_spin_unlock(&cpu_buffer->lock); 4886 4887 out: 4888 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4889 } 4890 4891 /** 4892 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 4893 * @buffer: The ring buffer to reset a per cpu buffer of 4894 * @cpu: The CPU buffer to be reset 4895 */ 4896 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 4897 { 4898 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4899 4900 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4901 return; 4902 4903 /* prevent another thread from changing buffer sizes */ 4904 mutex_lock(&buffer->mutex); 4905 4906 atomic_inc(&cpu_buffer->resize_disabled); 4907 atomic_inc(&cpu_buffer->record_disabled); 4908 4909 /* Make sure all commits have finished */ 4910 synchronize_rcu(); 4911 4912 reset_disabled_cpu_buffer(cpu_buffer); 4913 4914 
atomic_dec(&cpu_buffer->record_disabled); 4915 atomic_dec(&cpu_buffer->resize_disabled); 4916 4917 mutex_unlock(&buffer->mutex); 4918 } 4919 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 4920 4921 /** 4922 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 4923 * @buffer: The ring buffer to reset a per cpu buffer of 4924 * @cpu: The CPU buffer to be reset 4925 */ 4926 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 4927 { 4928 struct ring_buffer_per_cpu *cpu_buffer; 4929 int cpu; 4930 4931 /* prevent another thread from changing buffer sizes */ 4932 mutex_lock(&buffer->mutex); 4933 4934 for_each_online_buffer_cpu(buffer, cpu) { 4935 cpu_buffer = buffer->buffers[cpu]; 4936 4937 atomic_inc(&cpu_buffer->resize_disabled); 4938 atomic_inc(&cpu_buffer->record_disabled); 4939 } 4940 4941 /* Make sure all commits have finished */ 4942 synchronize_rcu(); 4943 4944 for_each_online_buffer_cpu(buffer, cpu) { 4945 cpu_buffer = buffer->buffers[cpu]; 4946 4947 reset_disabled_cpu_buffer(cpu_buffer); 4948 4949 atomic_dec(&cpu_buffer->record_disabled); 4950 atomic_dec(&cpu_buffer->resize_disabled); 4951 } 4952 4953 mutex_unlock(&buffer->mutex); 4954 } 4955 4956 /** 4957 * ring_buffer_reset - reset a ring buffer 4958 * @buffer: The ring buffer to reset all cpu buffers 4959 */ 4960 void ring_buffer_reset(struct trace_buffer *buffer) 4961 { 4962 struct ring_buffer_per_cpu *cpu_buffer; 4963 int cpu; 4964 4965 for_each_buffer_cpu(buffer, cpu) { 4966 cpu_buffer = buffer->buffers[cpu]; 4967 4968 atomic_inc(&cpu_buffer->resize_disabled); 4969 atomic_inc(&cpu_buffer->record_disabled); 4970 } 4971 4972 /* Make sure all commits have finished */ 4973 synchronize_rcu(); 4974 4975 for_each_buffer_cpu(buffer, cpu) { 4976 cpu_buffer = buffer->buffers[cpu]; 4977 4978 reset_disabled_cpu_buffer(cpu_buffer); 4979 4980 atomic_dec(&cpu_buffer->record_disabled); 4981 atomic_dec(&cpu_buffer->resize_disabled); 4982 } 4983 } 4984 EXPORT_SYMBOL_GPL(ring_buffer_reset); 4985 4986 /** 4987 
* rind_buffer_empty - is the ring buffer empty? 4988 * @buffer: The ring buffer to test 4989 */ 4990 bool ring_buffer_empty(struct trace_buffer *buffer) 4991 { 4992 struct ring_buffer_per_cpu *cpu_buffer; 4993 unsigned long flags; 4994 bool dolock; 4995 int cpu; 4996 int ret; 4997 4998 /* yes this is racy, but if you don't like the race, lock the buffer */ 4999 for_each_buffer_cpu(buffer, cpu) { 5000 cpu_buffer = buffer->buffers[cpu]; 5001 local_irq_save(flags); 5002 dolock = rb_reader_lock(cpu_buffer); 5003 ret = rb_per_cpu_empty(cpu_buffer); 5004 rb_reader_unlock(cpu_buffer, dolock); 5005 local_irq_restore(flags); 5006 5007 if (!ret) 5008 return false; 5009 } 5010 5011 return true; 5012 } 5013 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5014 5015 /** 5016 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 5017 * @buffer: The ring buffer 5018 * @cpu: The CPU buffer to test 5019 */ 5020 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5021 { 5022 struct ring_buffer_per_cpu *cpu_buffer; 5023 unsigned long flags; 5024 bool dolock; 5025 int ret; 5026 5027 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5028 return true; 5029 5030 cpu_buffer = buffer->buffers[cpu]; 5031 local_irq_save(flags); 5032 dolock = rb_reader_lock(cpu_buffer); 5033 ret = rb_per_cpu_empty(cpu_buffer); 5034 rb_reader_unlock(cpu_buffer, dolock); 5035 local_irq_restore(flags); 5036 5037 return ret; 5038 } 5039 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5040 5041 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5042 /** 5043 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5044 * @buffer_a: One buffer to swap with 5045 * @buffer_b: The other buffer to swap with 5046 * @cpu: the CPU of the buffers to swap 5047 * 5048 * This function is useful for tracers that want to take a "snapshot" 5049 * of a CPU buffer and has another back up buffer lying around. 5050 * it is expected that the tracer handles the cpu buffer not being 5051 * used at the moment. 
5052 */ 5053 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5054 struct trace_buffer *buffer_b, int cpu) 5055 { 5056 struct ring_buffer_per_cpu *cpu_buffer_a; 5057 struct ring_buffer_per_cpu *cpu_buffer_b; 5058 int ret = -EINVAL; 5059 5060 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5061 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5062 goto out; 5063 5064 cpu_buffer_a = buffer_a->buffers[cpu]; 5065 cpu_buffer_b = buffer_b->buffers[cpu]; 5066 5067 /* At least make sure the two buffers are somewhat the same */ 5068 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5069 goto out; 5070 5071 ret = -EAGAIN; 5072 5073 if (atomic_read(&buffer_a->record_disabled)) 5074 goto out; 5075 5076 if (atomic_read(&buffer_b->record_disabled)) 5077 goto out; 5078 5079 if (atomic_read(&cpu_buffer_a->record_disabled)) 5080 goto out; 5081 5082 if (atomic_read(&cpu_buffer_b->record_disabled)) 5083 goto out; 5084 5085 /* 5086 * We can't do a synchronize_rcu here because this 5087 * function can be called in atomic context. 5088 * Normally this will be called from the same CPU as cpu. 5089 * If not it's up to the caller to protect this. 5090 */ 5091 atomic_inc(&cpu_buffer_a->record_disabled); 5092 atomic_inc(&cpu_buffer_b->record_disabled); 5093 5094 ret = -EBUSY; 5095 if (local_read(&cpu_buffer_a->committing)) 5096 goto out_dec; 5097 if (local_read(&cpu_buffer_b->committing)) 5098 goto out_dec; 5099 5100 buffer_a->buffers[cpu] = cpu_buffer_b; 5101 buffer_b->buffers[cpu] = cpu_buffer_a; 5102 5103 cpu_buffer_b->buffer = buffer_a; 5104 cpu_buffer_a->buffer = buffer_b; 5105 5106 ret = 0; 5107 5108 out_dec: 5109 atomic_dec(&cpu_buffer_a->record_disabled); 5110 atomic_dec(&cpu_buffer_b->record_disabled); 5111 out: 5112 return ret; 5113 } 5114 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5115 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5116 5117 /** 5118 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5119 * @buffer: the buffer to allocate for. 
5120 * @cpu: the cpu buffer to allocate. 5121 * 5122 * This function is used in conjunction with ring_buffer_read_page. 5123 * When reading a full page from the ring buffer, these functions 5124 * can be used to speed up the process. The calling function should 5125 * allocate a few pages first with this function. Then when it 5126 * needs to get pages from the ring buffer, it passes the result 5127 * of this function into ring_buffer_read_page, which will swap 5128 * the page that was allocated, with the read page of the buffer. 5129 * 5130 * Returns: 5131 * The page allocated, or ERR_PTR 5132 */ 5133 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5134 { 5135 struct ring_buffer_per_cpu *cpu_buffer; 5136 struct buffer_data_page *bpage = NULL; 5137 unsigned long flags; 5138 struct page *page; 5139 5140 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5141 return ERR_PTR(-ENODEV); 5142 5143 cpu_buffer = buffer->buffers[cpu]; 5144 local_irq_save(flags); 5145 arch_spin_lock(&cpu_buffer->lock); 5146 5147 if (cpu_buffer->free_page) { 5148 bpage = cpu_buffer->free_page; 5149 cpu_buffer->free_page = NULL; 5150 } 5151 5152 arch_spin_unlock(&cpu_buffer->lock); 5153 local_irq_restore(flags); 5154 5155 if (bpage) 5156 goto out; 5157 5158 page = alloc_pages_node(cpu_to_node(cpu), 5159 GFP_KERNEL | __GFP_NORETRY, 0); 5160 if (!page) 5161 return ERR_PTR(-ENOMEM); 5162 5163 bpage = page_address(page); 5164 5165 out: 5166 rb_init_page(bpage); 5167 5168 return bpage; 5169 } 5170 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5171 5172 /** 5173 * ring_buffer_free_read_page - free an allocated read page 5174 * @buffer: the buffer the page was allocate for 5175 * @cpu: the cpu buffer the page came from 5176 * @data: the page to free 5177 * 5178 * Free a page allocated from ring_buffer_alloc_read_page. 
5179 */ 5180 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data) 5181 { 5182 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5183 struct buffer_data_page *bpage = data; 5184 struct page *page = virt_to_page(bpage); 5185 unsigned long flags; 5186 5187 /* If the page is still in use someplace else, we can't reuse it */ 5188 if (page_ref_count(page) > 1) 5189 goto out; 5190 5191 local_irq_save(flags); 5192 arch_spin_lock(&cpu_buffer->lock); 5193 5194 if (!cpu_buffer->free_page) { 5195 cpu_buffer->free_page = bpage; 5196 bpage = NULL; 5197 } 5198 5199 arch_spin_unlock(&cpu_buffer->lock); 5200 local_irq_restore(flags); 5201 5202 out: 5203 free_page((unsigned long)bpage); 5204 } 5205 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5206 5207 /** 5208 * ring_buffer_read_page - extract a page from the ring buffer 5209 * @buffer: buffer to extract from 5210 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5211 * @len: amount to extract 5212 * @cpu: the cpu of the buffer to extract 5213 * @full: should the extraction only happen when the page is full. 5214 * 5215 * This function will pull out a page from the ring buffer and consume it. 5216 * @data_page must be the address of the variable that was returned 5217 * from ring_buffer_alloc_read_page. This is because the page might be used 5218 * to swap with a page in the ring buffer. 5219 * 5220 * for example: 5221 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5222 * if (IS_ERR(rpage)) 5223 * return PTR_ERR(rpage); 5224 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 5225 * if (ret >= 0) 5226 * process_page(rpage, ret); 5227 * 5228 * When @full is set, the function will not return true unless 5229 * the writer is off the reader page. 5230 * 5231 * Note: it is up to the calling functions to handle sleeps and wakeups. 5232 * The ring buffer can be used anywhere in the kernel and can not 5233 * blindly call wake_up. 
The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct trace_buffer *buffer,
			  void **data_page, size_t len, int cpu, int full)
{
	/* Only dereferenced after the cpumask check below has passed */
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	struct buffer_page *reader;
	unsigned long missed_events;
	unsigned long flags;
	unsigned int commit;
	unsigned int read;
	u64 save_timestamp;
	int ret = -1;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	/*
	 * If len is not big enough to hold the page header, then
	 * we can not copy anything.
	 */
	if (len <= BUF_PAGE_HDR_SIZE)
		goto out;

	/* From here on, len only counts the bytes available for event data */
	len -= BUF_PAGE_HDR_SIZE;

	if (!data_page)
		goto out;

	bpage = *data_page;
	if (!bpage)
		goto out;

	/* Everything below runs with the per cpu reader_lock held */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		goto out_unlock;

	event = rb_reader_event(cpu_buffer);

	/* Snapshot how far the page has been read and how far it is committed */
	read = reader->read;
	commit = rb_page_commit(reader);

	/* Check if any events were dropped */
	missed_events = cpu_buffer->lost_events;

	/*
	 * If this page has been partially read or
	 * if len is not big enough to read the rest of the page or
	 * a writer is still on the page, then
	 * we must copy the data from the page to the buffer.
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
		unsigned int size;

		/*
		 * The caller only wants a page handed over by the swap path;
		 * bail out with ret still at -1 instead of copying.
		 */
		if (full)
			goto out_unlock;

		/* Never copy more than what is left unread on the page */
		if (len > (commit - read))
			len = (commit - read);

		/* Always keep the time extend and data together */
		size = rb_event_ts_length(event);

		/* Not even the first event fits: nothing is transferred */
		if (len < size)
			goto out_unlock;

		/* save the current timestamp, since the user will need it */
		save_timestamp = cpu_buffer->read_stamp;

		/* Need to copy one event at a time */
		do {
			/* We need the size of one event, because
			 * rb_advance_reader only advances by one event,
			 * whereas rb_event_ts_length may include the size of
			 * one or two events.
			 * We have already ensured there's enough space if this
			 * is a time extend. */
			size = rb_event_length(event);
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			/* Re-read the source offset after the reader advanced */
			rpos = reader->read;
			pos += size;

			/* Stop once everything committed has been consumed */
			if (rpos >= commit)
				break;

			event = rb_reader_event(cpu_buffer);
			/* Always keep the time extend and data together */
			size = rb_event_ts_length(event);
		} while (len >= size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += BUF_PAGE_SIZE;

		/*
		 * swap the pages: the caller's page (re-initialized) becomes
		 * the reader's data page, and the old reader data page is
		 * handed back through *data_page.
		 */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;

		/*
		 * Use the real_end for the data size,
		 * This gives us a chance to store the lost events
		 * on the page.
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	/*
	 * read is 0 on both paths: the copy path resets it above and the
	 * swap path is only taken when nothing had been read yet.
	 */
	ret = read;

	/* The count was captured in missed_events above; start over */
	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			/* Keep the stored count out of the zeroing below */
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < BUF_PAGE_SIZE)
		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);

/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
 * the buffer.
5408 */ 5409 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 5410 { 5411 struct trace_buffer *buffer; 5412 long nr_pages_same; 5413 int cpu_i; 5414 unsigned long nr_pages; 5415 5416 buffer = container_of(node, struct trace_buffer, node); 5417 if (cpumask_test_cpu(cpu, buffer->cpumask)) 5418 return 0; 5419 5420 nr_pages = 0; 5421 nr_pages_same = 1; 5422 /* check if all cpu sizes are same */ 5423 for_each_buffer_cpu(buffer, cpu_i) { 5424 /* fill in the size from first enabled cpu */ 5425 if (nr_pages == 0) 5426 nr_pages = buffer->buffers[cpu_i]->nr_pages; 5427 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 5428 nr_pages_same = 0; 5429 break; 5430 } 5431 } 5432 /* allocate minimum pages, user can later expand it */ 5433 if (!nr_pages_same) 5434 nr_pages = 2; 5435 buffer->buffers[cpu] = 5436 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 5437 if (!buffer->buffers[cpu]) { 5438 WARN(1, "failed to allocate ring buffer on CPU %u\n", 5439 cpu); 5440 return -ENOMEM; 5441 } 5442 smp_wmb(); 5443 cpumask_set_cpu(cpu, buffer->cpumask); 5444 return 0; 5445 } 5446 5447 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 5448 /* 5449 * This is a basic integrity check of the ring buffer. 5450 * Late in the boot cycle this test will run when configured in. 5451 * It will kick off a thread per CPU that will go into a loop 5452 * writing to the per cpu ring buffer various sizes of data. 5453 * Some of the data will be large items, some small. 5454 * 5455 * Another thread is created that goes into a spin, sending out 5456 * IPIs to the other CPUs to also write into the ring buffer. 5457 * this is to test the nesting ability of the buffer. 5458 * 5459 * Basic stats are recorded and reported. If something in the 5460 * ring buffer should happen that's not expected, a big warning 5461 * is displayed and all ring buffers are disabled. 
5462 */ 5463 static struct task_struct *rb_threads[NR_CPUS] __initdata; 5464 5465 struct rb_test_data { 5466 struct trace_buffer *buffer; 5467 unsigned long events; 5468 unsigned long bytes_written; 5469 unsigned long bytes_alloc; 5470 unsigned long bytes_dropped; 5471 unsigned long events_nested; 5472 unsigned long bytes_written_nested; 5473 unsigned long bytes_alloc_nested; 5474 unsigned long bytes_dropped_nested; 5475 int min_size_nested; 5476 int max_size_nested; 5477 int max_size; 5478 int min_size; 5479 int cpu; 5480 int cnt; 5481 }; 5482 5483 static struct rb_test_data rb_data[NR_CPUS] __initdata; 5484 5485 /* 1 meg per cpu */ 5486 #define RB_TEST_BUFFER_SIZE 1048576 5487 5488 static char rb_string[] __initdata = 5489 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 5490 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 5491 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 5492 5493 static bool rb_test_started __initdata; 5494 5495 struct rb_item { 5496 int size; 5497 char str[]; 5498 }; 5499 5500 static __init int rb_write_something(struct rb_test_data *data, bool nested) 5501 { 5502 struct ring_buffer_event *event; 5503 struct rb_item *item; 5504 bool started; 5505 int event_len; 5506 int size; 5507 int len; 5508 int cnt; 5509 5510 /* Have nested writes different that what is written */ 5511 cnt = data->cnt + (nested ? 27 : 0); 5512 5513 /* Multiply cnt by ~e, to make some unique increment */ 5514 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 5515 5516 len = size + sizeof(struct rb_item); 5517 5518 started = rb_test_started; 5519 /* read rb_test_started before checking buffer enabled */ 5520 smp_rmb(); 5521 5522 event = ring_buffer_lock_reserve(data->buffer, len); 5523 if (!event) { 5524 /* Ignore dropped events before test starts. 
*/ 5525 if (started) { 5526 if (nested) 5527 data->bytes_dropped += len; 5528 else 5529 data->bytes_dropped_nested += len; 5530 } 5531 return len; 5532 } 5533 5534 event_len = ring_buffer_event_length(event); 5535 5536 if (RB_WARN_ON(data->buffer, event_len < len)) 5537 goto out; 5538 5539 item = ring_buffer_event_data(event); 5540 item->size = size; 5541 memcpy(item->str, rb_string, size); 5542 5543 if (nested) { 5544 data->bytes_alloc_nested += event_len; 5545 data->bytes_written_nested += len; 5546 data->events_nested++; 5547 if (!data->min_size_nested || len < data->min_size_nested) 5548 data->min_size_nested = len; 5549 if (len > data->max_size_nested) 5550 data->max_size_nested = len; 5551 } else { 5552 data->bytes_alloc += event_len; 5553 data->bytes_written += len; 5554 data->events++; 5555 if (!data->min_size || len < data->min_size) 5556 data->max_size = len; 5557 if (len > data->max_size) 5558 data->max_size = len; 5559 } 5560 5561 out: 5562 ring_buffer_unlock_commit(data->buffer, event); 5563 5564 return 0; 5565 } 5566 5567 static __init int rb_test(void *arg) 5568 { 5569 struct rb_test_data *data = arg; 5570 5571 while (!kthread_should_stop()) { 5572 rb_write_something(data, false); 5573 data->cnt++; 5574 5575 set_current_state(TASK_INTERRUPTIBLE); 5576 /* Now sleep between a min of 100-300us and a max of 1ms */ 5577 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 5578 } 5579 5580 return 0; 5581 } 5582 5583 static __init void rb_ipi(void *ignore) 5584 { 5585 struct rb_test_data *data; 5586 int cpu = smp_processor_id(); 5587 5588 data = &rb_data[cpu]; 5589 rb_write_something(data, true); 5590 } 5591 5592 static __init int rb_hammer_test(void *arg) 5593 { 5594 while (!kthread_should_stop()) { 5595 5596 /* Send an IPI to all cpus to write data! 
*/ 5597 smp_call_function(rb_ipi, NULL, 1); 5598 /* No sleep, but for non preempt, let others run */ 5599 schedule(); 5600 } 5601 5602 return 0; 5603 } 5604 5605 static __init int test_ringbuffer(void) 5606 { 5607 struct task_struct *rb_hammer; 5608 struct trace_buffer *buffer; 5609 int cpu; 5610 int ret = 0; 5611 5612 if (security_locked_down(LOCKDOWN_TRACEFS)) { 5613 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 5614 return 0; 5615 } 5616 5617 pr_info("Running ring buffer tests...\n"); 5618 5619 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 5620 if (WARN_ON(!buffer)) 5621 return 0; 5622 5623 /* Disable buffer so that threads can't write to it yet */ 5624 ring_buffer_record_off(buffer); 5625 5626 for_each_online_cpu(cpu) { 5627 rb_data[cpu].buffer = buffer; 5628 rb_data[cpu].cpu = cpu; 5629 rb_data[cpu].cnt = cpu; 5630 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu], 5631 "rbtester/%d", cpu); 5632 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 5633 pr_cont("FAILED\n"); 5634 ret = PTR_ERR(rb_threads[cpu]); 5635 goto out_free; 5636 } 5637 5638 kthread_bind(rb_threads[cpu], cpu); 5639 wake_up_process(rb_threads[cpu]); 5640 } 5641 5642 /* Now create the rb hammer! */ 5643 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 5644 if (WARN_ON(IS_ERR(rb_hammer))) { 5645 pr_cont("FAILED\n"); 5646 ret = PTR_ERR(rb_hammer); 5647 goto out_free; 5648 } 5649 5650 ring_buffer_record_on(buffer); 5651 /* 5652 * Show buffer is enabled before setting rb_test_started. 5653 * Yes there's a small race window where events could be 5654 * dropped and the thread wont catch it. But when a ring 5655 * buffer gets enabled, there will always be some kind of 5656 * delay before other CPUs see it. Thus, we don't care about 5657 * those dropped events. We care about events dropped after 5658 * the threads see that the buffer is active. 
5659 */ 5660 smp_wmb(); 5661 rb_test_started = true; 5662 5663 set_current_state(TASK_INTERRUPTIBLE); 5664 /* Just run for 10 seconds */; 5665 schedule_timeout(10 * HZ); 5666 5667 kthread_stop(rb_hammer); 5668 5669 out_free: 5670 for_each_online_cpu(cpu) { 5671 if (!rb_threads[cpu]) 5672 break; 5673 kthread_stop(rb_threads[cpu]); 5674 } 5675 if (ret) { 5676 ring_buffer_free(buffer); 5677 return ret; 5678 } 5679 5680 /* Report! */ 5681 pr_info("finished\n"); 5682 for_each_online_cpu(cpu) { 5683 struct ring_buffer_event *event; 5684 struct rb_test_data *data = &rb_data[cpu]; 5685 struct rb_item *item; 5686 unsigned long total_events; 5687 unsigned long total_dropped; 5688 unsigned long total_written; 5689 unsigned long total_alloc; 5690 unsigned long total_read = 0; 5691 unsigned long total_size = 0; 5692 unsigned long total_len = 0; 5693 unsigned long total_lost = 0; 5694 unsigned long lost; 5695 int big_event_size; 5696 int small_event_size; 5697 5698 ret = -1; 5699 5700 total_events = data->events + data->events_nested; 5701 total_written = data->bytes_written + data->bytes_written_nested; 5702 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 5703 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 5704 5705 big_event_size = data->max_size + data->max_size_nested; 5706 small_event_size = data->min_size + data->min_size_nested; 5707 5708 pr_info("CPU %d:\n", cpu); 5709 pr_info(" events: %ld\n", total_events); 5710 pr_info(" dropped bytes: %ld\n", total_dropped); 5711 pr_info(" alloced bytes: %ld\n", total_alloc); 5712 pr_info(" written bytes: %ld\n", total_written); 5713 pr_info(" biggest event: %d\n", big_event_size); 5714 pr_info(" smallest event: %d\n", small_event_size); 5715 5716 if (RB_WARN_ON(buffer, total_dropped)) 5717 break; 5718 5719 ret = 0; 5720 5721 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 5722 total_lost += lost; 5723 item = ring_buffer_event_data(event); 5724 total_len += 
ring_buffer_event_length(event); 5725 total_size += item->size + sizeof(struct rb_item); 5726 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 5727 pr_info("FAILED!\n"); 5728 pr_info("buffer had: %.*s\n", item->size, item->str); 5729 pr_info("expected: %.*s\n", item->size, rb_string); 5730 RB_WARN_ON(buffer, 1); 5731 ret = -1; 5732 break; 5733 } 5734 total_read++; 5735 } 5736 if (ret) 5737 break; 5738 5739 ret = -1; 5740 5741 pr_info(" read events: %ld\n", total_read); 5742 pr_info(" lost events: %ld\n", total_lost); 5743 pr_info(" total events: %ld\n", total_lost + total_read); 5744 pr_info(" recorded len bytes: %ld\n", total_len); 5745 pr_info(" recorded size bytes: %ld\n", total_size); 5746 if (total_lost) 5747 pr_info(" With dropped events, record len and size may not match\n" 5748 " alloced and written from above\n"); 5749 if (!total_lost) { 5750 if (RB_WARN_ON(buffer, total_len != total_alloc || 5751 total_size != total_written)) 5752 break; 5753 } 5754 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 5755 break; 5756 5757 ret = 0; 5758 } 5759 if (!ret) 5760 pr_info("Ring buffer PASSED!\n"); 5761 5762 ring_buffer_free(buffer); 5763 return 0; 5764 } 5765 5766 late_initcall(test_ringbuffer); 5767 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 5768