1 /* 2 * Generic ring buffer 3 * 4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 5 */ 6 #include <linux/trace_events.h> 7 #include <linux/ring_buffer.h> 8 #include <linux/trace_clock.h> 9 #include <linux/sched/clock.h> 10 #include <linux/trace_seq.h> 11 #include <linux/spinlock.h> 12 #include <linux/irq_work.h> 13 #include <linux/uaccess.h> 14 #include <linux/hardirq.h> 15 #include <linux/kthread.h> /* for self test */ 16 #include <linux/kmemcheck.h> 17 #include <linux/module.h> 18 #include <linux/percpu.h> 19 #include <linux/mutex.h> 20 #include <linux/delay.h> 21 #include <linux/slab.h> 22 #include <linux/init.h> 23 #include <linux/hash.h> 24 #include <linux/list.h> 25 #include <linux/cpu.h> 26 27 #include <asm/local.h> 28 29 static void update_pages_handler(struct work_struct *work); 30 31 /* 32 * The ring buffer header is special. We must manually up keep it. 33 */ 34 int ring_buffer_print_entry_header(struct trace_seq *s) 35 { 36 trace_seq_puts(s, "# compressed entry header\n"); 37 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 38 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 39 trace_seq_puts(s, "\tarray : 32 bits\n"); 40 trace_seq_putc(s, '\n'); 41 trace_seq_printf(s, "\tpadding : type == %d\n", 42 RINGBUF_TYPE_PADDING); 43 trace_seq_printf(s, "\ttime_extend : type == %d\n", 44 RINGBUF_TYPE_TIME_EXTEND); 45 trace_seq_printf(s, "\tdata max type_len == %d\n", 46 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 47 48 return !trace_seq_has_overflowed(s); 49 } 50 51 /* 52 * The ring buffer is made up of a list of pages. A separate list of pages is 53 * allocated for each CPU. A writer may only write to a buffer that is 54 * associated with the CPU it is currently executing on. A reader may read 55 * from any per cpu buffer. 56 * 57 * The reader is special. For each per cpu buffer, the reader has its own 58 * reader page. When a reader has read the entire reader page, this reader 59 * page is swapped with another page in the ring buffer. 60 * 61 * Now, as long as the writer is off the reader page, the reader can do what 62 * ever it wants with that page. The writer will never write to that page 63 * again (as long as it is out of the ring buffer). 64 * 65 * Here's some silly ASCII art. 66 * 67 * +------+ 68 * |reader| RING BUFFER 69 * |page | 70 * +------+ +---+ +---+ +---+ 71 * | |-->| |-->| | 72 * +---+ +---+ +---+ 73 * ^ | 74 * | | 75 * +---------------+ 76 * 77 * 78 * +------+ 79 * |reader| RING BUFFER 80 * |page |------------------v 81 * +------+ +---+ +---+ +---+ 82 * | |-->| |-->| | 83 * +---+ +---+ +---+ 84 * ^ | 85 * | | 86 * +---------------+ 87 * 88 * 89 * +------+ 90 * |reader| RING BUFFER 91 * |page |------------------v 92 * +------+ +---+ +---+ +---+ 93 * ^ | |-->| |-->| | 94 * | +---+ +---+ +---+ 95 * | | 96 * | | 97 * +------------------------------+ 98 * 99 * 100 * +------+ 101 * |buffer| RING BUFFER 102 * |page |------------------v 103 * +------+ +---+ +---+ +---+ 104 * ^ | | | |-->| | 105 * | New +---+ +---+ +---+ 106 * | Reader------^ | 107 * | page | 108 * +------------------------------+ 109 * 110 * 111 * After we make this swap, the reader can hand this page off to the splice 112 * code and be done with it. It can even allocate a new page if it needs to 113 * and swap that into the ring buffer. 114 * 115 * We will be using cmpxchg soon to make all this lockless. 116 * 117 */ 118 119 /* Used for individual buffers (after the counter) */ 120 #define RB_BUFFER_OFF (1 << 20) 121 122 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 123 124 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 125 #define RB_ALIGNMENT 4U 126 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 127 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 128 129 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 130 # define RB_FORCE_8BYTE_ALIGNMENT 0 131 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 132 #else 133 # define RB_FORCE_8BYTE_ALIGNMENT 1 134 # define RB_ARCH_ALIGNMENT 8U 135 #endif 136 137 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 138 139 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 140 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 141 142 enum { 143 RB_LEN_TIME_EXTEND = 8, 144 RB_LEN_TIME_STAMP = 16, 145 }; 146 147 #define skip_time_extend(event) \ 148 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 149 150 static inline int rb_null_event(struct ring_buffer_event *event) 151 { 152 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 153 } 154 155 static void rb_event_set_padding(struct ring_buffer_event *event) 156 { 157 /* padding has a NULL time_delta */ 158 event->type_len = RINGBUF_TYPE_PADDING; 159 event->time_delta = 0; 160 } 161 162 static unsigned 163 rb_event_data_length(struct ring_buffer_event *event) 164 { 165 unsigned length; 166 167 if (event->type_len) 168 length = event->type_len * RB_ALIGNMENT; 169 else 170 length = event->array[0]; 171 return length + RB_EVNT_HDR_SIZE; 172 } 173 174 /* 175 * Return the length of the given event. Will return 176 * the length of the time extend if the event is a 177 * time extend. 178 */ 179 static inline unsigned 180 rb_event_length(struct ring_buffer_event *event) 181 { 182 switch (event->type_len) { 183 case RINGBUF_TYPE_PADDING: 184 if (rb_null_event(event)) 185 /* undefined */ 186 return -1; 187 return event->array[0] + RB_EVNT_HDR_SIZE; 188 189 case RINGBUF_TYPE_TIME_EXTEND: 190 return RB_LEN_TIME_EXTEND; 191 192 case RINGBUF_TYPE_TIME_STAMP: 193 return RB_LEN_TIME_STAMP; 194 195 case RINGBUF_TYPE_DATA: 196 return rb_event_data_length(event); 197 default: 198 BUG(); 199 } 200 /* not hit */ 201 return 0; 202 } 203 204 /* 205 * Return total length of time extend and data, 206 * or just the event length for all other events. 207 */ 208 static inline unsigned 209 rb_event_ts_length(struct ring_buffer_event *event) 210 { 211 unsigned len = 0; 212 213 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { 214 /* time extends include the data event after it */ 215 len = RB_LEN_TIME_EXTEND; 216 event = skip_time_extend(event); 217 } 218 return len + rb_event_length(event); 219 } 220 221 /** 222 * ring_buffer_event_length - return the length of the event 223 * @event: the event to get the length of 224 * 225 * Returns the size of the data load of a data event. 226 * If the event is something other than a data event, it 227 * returns the size of the event itself. With the exception 228 * of a TIME EXTEND, where it still returns the size of the 229 * data load of the data event after it. 230 */ 231 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 232 { 233 unsigned length; 234 235 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 236 event = skip_time_extend(event); 237 238 length = rb_event_length(event); 239 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 240 return length; 241 length -= RB_EVNT_HDR_SIZE; 242 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 243 length -= sizeof(event->array[0]); 244 return length; 245 } 246 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 247 248 /* inline for ring buffer fast paths */ 249 static __always_inline void * 250 rb_event_data(struct ring_buffer_event *event) 251 { 252 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 253 event = skip_time_extend(event); 254 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 255 /* If length is in len field, then array[0] has the data */ 256 if (event->type_len) 257 return (void *)&event->array[0]; 258 /* Otherwise length is in array[0] and array[1] has the data */ 259 return (void *)&event->array[1]; 260 } 261 262 /** 263 * ring_buffer_event_data - return the data of the event 264 * @event: the event to get the data from 265 */ 266 void *ring_buffer_event_data(struct ring_buffer_event *event) 267 { 268 return rb_event_data(event); 269 } 270 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 271 272 #define for_each_buffer_cpu(buffer, cpu) \ 273 for_each_cpu(cpu, buffer->cpumask) 274 275 #define TS_SHIFT 27 276 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 277 #define TS_DELTA_TEST (~TS_MASK) 278 279 /* Flag when events were overwritten */ 280 #define RB_MISSED_EVENTS (1 << 31) 281 /* Missed count stored at end */ 282 #define RB_MISSED_STORED (1 << 30) 283 284 struct buffer_data_page { 285 u64 time_stamp; /* page time stamp */ 286 local_t commit; /* write committed index */ 287 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 288 }; 289 290 /* 291 * Note, the buffer_page list must be first. The buffer pages 292 * are allocated in cache lines, which means that each buffer 293 * page will be at the beginning of a cache line, and thus 294 * the least significant bits will be zero. We use this to 295 * add flags in the list struct pointers, to make the ring buffer 296 * lockless. 297 */ 298 struct buffer_page { 299 struct list_head list; /* list of buffer pages */ 300 local_t write; /* index for next write */ 301 unsigned read; /* index for next read */ 302 local_t entries; /* entries on this page */ 303 unsigned long real_end; /* real end of data */ 304 struct buffer_data_page *page; /* Actual data page */ 305 }; 306 307 /* 308 * The buffer page counters, write and entries, must be reset 309 * atomically when crossing page boundaries. To synchronize this 310 * update, two counters are inserted into the number. One is 311 * the actual counter for the write position or count on the page. 312 * 313 * The other is a counter of updaters. Before an update happens 314 * the update partition of the counter is incremented. This will 315 * allow the updater to update the counter atomically. 316 * 317 * The counter is 20 bits, and the state data is 12. 318 */ 319 #define RB_WRITE_MASK 0xfffff 320 #define RB_WRITE_INTCNT (1 << 20) 321 322 static void rb_init_page(struct buffer_data_page *bpage) 323 { 324 local_set(&bpage->commit, 0); 325 } 326 327 /** 328 * ring_buffer_page_len - the size of data on the page. 329 * @page: The page to read 330 * 331 * Returns the amount of data on the page, including buffer page header. 332 */ 333 size_t ring_buffer_page_len(void *page) 334 { 335 return local_read(&((struct buffer_data_page *)page)->commit) 336 + BUF_PAGE_HDR_SIZE; 337 } 338 339 /* 340 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing 341 * this issue out. 342 */ 343 static void free_buffer_page(struct buffer_page *bpage) 344 { 345 free_page((unsigned long)bpage->page); 346 kfree(bpage); 347 } 348 349 /* 350 * We need to fit the time_stamp delta into 27 bits. 351 */ 352 static inline int test_time_stamp(u64 delta) 353 { 354 if (delta & TS_DELTA_TEST) 355 return 1; 356 return 0; 357 } 358 359 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE) 360 361 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */ 362 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2)) 363 364 int ring_buffer_print_page_header(struct trace_seq *s) 365 { 366 struct buffer_data_page field; 367 368 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 369 "offset:0;\tsize:%u;\tsigned:%u;\n", 370 (unsigned int)sizeof(field.time_stamp), 371 (unsigned int)is_signed_type(u64)); 372 373 trace_seq_printf(s, "\tfield: local_t commit;\t" 374 "offset:%u;\tsize:%u;\tsigned:%u;\n", 375 (unsigned int)offsetof(typeof(field), commit), 376 (unsigned int)sizeof(field.commit), 377 (unsigned int)is_signed_type(long)); 378 379 trace_seq_printf(s, "\tfield: int overwrite;\t" 380 "offset:%u;\tsize:%u;\tsigned:%u;\n", 381 (unsigned int)offsetof(typeof(field), commit), 382 1, 383 (unsigned int)is_signed_type(long)); 384 385 trace_seq_printf(s, "\tfield: char data;\t" 386 "offset:%u;\tsize:%u;\tsigned:%u;\n", 387 (unsigned int)offsetof(typeof(field), data), 388 (unsigned int)BUF_PAGE_SIZE, 389 (unsigned int)is_signed_type(char)); 390 391 return !trace_seq_has_overflowed(s); 392 } 393 394 struct rb_irq_work { 395 struct irq_work work; 396 wait_queue_head_t waiters; 397 wait_queue_head_t full_waiters; 398 bool waiters_pending; 399 bool full_waiters_pending; 400 bool wakeup_full; 401 }; 402 403 /* 404 * Structure to hold event state and handle nested events. 405 */ 406 struct rb_event_info { 407 u64 ts; 408 u64 delta; 409 unsigned long length; 410 struct buffer_page *tail_page; 411 int add_timestamp; 412 }; 413 414 /* 415 * Used for which event context the event is in. 416 * NMI = 0 417 * IRQ = 1 418 * SOFTIRQ = 2 419 * NORMAL = 3 420 * 421 * See trace_recursive_lock() comment below for more details. 422 */ 423 enum { 424 RB_CTX_NMI, 425 RB_CTX_IRQ, 426 RB_CTX_SOFTIRQ, 427 RB_CTX_NORMAL, 428 RB_CTX_MAX 429 }; 430 431 /* 432 * head_page == tail_page && head == tail then buffer is empty. 433 */ 434 struct ring_buffer_per_cpu { 435 int cpu; 436 atomic_t record_disabled; 437 struct ring_buffer *buffer; 438 raw_spinlock_t reader_lock; /* serialize readers */ 439 arch_spinlock_t lock; 440 struct lock_class_key lock_key; 441 struct buffer_data_page *free_page; 442 unsigned long nr_pages; 443 unsigned int current_context; 444 struct list_head *pages; 445 struct buffer_page *head_page; /* read from head */ 446 struct buffer_page *tail_page; /* write to tail */ 447 struct buffer_page *commit_page; /* committed pages */ 448 struct buffer_page *reader_page; 449 unsigned long lost_events; 450 unsigned long last_overrun; 451 local_t entries_bytes; 452 local_t entries; 453 local_t overrun; 454 local_t commit_overrun; 455 local_t dropped_events; 456 local_t committing; 457 local_t commits; 458 unsigned long read; 459 unsigned long read_bytes; 460 u64 write_stamp; 461 u64 read_stamp; 462 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 463 long nr_pages_to_update; 464 struct list_head new_pages; /* new pages to add */ 465 struct work_struct update_pages_work; 466 struct completion update_done; 467 468 struct rb_irq_work irq_work; 469 }; 470 471 struct ring_buffer { 472 unsigned flags; 473 int cpus; 474 atomic_t record_disabled; 475 atomic_t resize_disabled; 476 cpumask_var_t cpumask; 477 478 struct lock_class_key *reader_lock_key; 479 480 struct mutex mutex; 481 482 struct ring_buffer_per_cpu **buffers; 483 484 struct hlist_node node; 485 u64 (*clock)(void); 486 487 struct rb_irq_work irq_work; 488 }; 489 490 struct ring_buffer_iter { 491 struct ring_buffer_per_cpu *cpu_buffer; 492 unsigned long head; 493 struct buffer_page *head_page; 494 struct buffer_page *cache_reader_page; 495 unsigned long cache_read; 496 u64 read_stamp; 497 }; 498 499 /* 500 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 501 * 502 * Schedules a delayed work to wake up any task that is blocked on the 503 * ring buffer waiters queue. 504 */ 505 static void rb_wake_up_waiters(struct irq_work *work) 506 { 507 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 508 509 wake_up_all(&rbwork->waiters); 510 if (rbwork->wakeup_full) { 511 rbwork->wakeup_full = false; 512 wake_up_all(&rbwork->full_waiters); 513 } 514 } 515 516 /** 517 * ring_buffer_wait - wait for input to the ring buffer 518 * @buffer: buffer to wait on 519 * @cpu: the cpu buffer to wait on 520 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS 521 * 522 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 523 * as data is added to any of the @buffer's cpu buffers. Otherwise 524 * it will wait for data to be added to a specific cpu buffer. 525 */ 526 int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full) 527 { 528 struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer); 529 DEFINE_WAIT(wait); 530 struct rb_irq_work *work; 531 int ret = 0; 532 533 /* 534 * Depending on what the caller is waiting for, either any 535 * data in any cpu buffer, or a specific buffer, put the 536 * caller on the appropriate wait queue. 537 */ 538 if (cpu == RING_BUFFER_ALL_CPUS) { 539 work = &buffer->irq_work; 540 /* Full only makes sense on per cpu reads */ 541 full = false; 542 } else { 543 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 544 return -ENODEV; 545 cpu_buffer = buffer->buffers[cpu]; 546 work = &cpu_buffer->irq_work; 547 } 548 549 550 while (true) { 551 if (full) 552 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE); 553 else 554 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE); 555 556 /* 557 * The events can happen in critical sections where 558 * checking a work queue can cause deadlocks. 559 * After adding a task to the queue, this flag is set 560 * only to notify events to try to wake up the queue 561 * using irq_work. 562 * 563 * We don't clear it even if the buffer is no longer 564 * empty. The flag only causes the next event to run 565 * irq_work to do the work queue wake up. The worse 566 * that can happen if we race with !trace_empty() is that 567 * an event will cause an irq_work to try to wake up 568 * an empty queue. 569 * 570 * There's no reason to protect this flag either, as 571 * the work queue and irq_work logic will do the necessary 572 * synchronization for the wake ups. The only thing 573 * that is necessary is that the wake up happens after 574 * a task has been queued. It's OK for spurious wake ups. 575 */ 576 if (full) 577 work->full_waiters_pending = true; 578 else 579 work->waiters_pending = true; 580 581 if (signal_pending(current)) { 582 ret = -EINTR; 583 break; 584 } 585 586 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) 587 break; 588 589 if (cpu != RING_BUFFER_ALL_CPUS && 590 !ring_buffer_empty_cpu(buffer, cpu)) { 591 unsigned long flags; 592 bool pagebusy; 593 594 if (!full) 595 break; 596 597 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 598 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 599 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 600 601 if (!pagebusy) 602 break; 603 } 604 605 schedule(); 606 } 607 608 if (full) 609 finish_wait(&work->full_waiters, &wait); 610 else 611 finish_wait(&work->waiters, &wait); 612 613 return ret; 614 } 615 616 /** 617 * ring_buffer_poll_wait - poll on buffer input 618 * @buffer: buffer to wait on 619 * @cpu: the cpu buffer to wait on 620 * @filp: the file descriptor 621 * @poll_table: The poll descriptor 622 * 623 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 624 * as data is added to any of the @buffer's cpu buffers. Otherwise 625 * it will wait for data to be added to a specific cpu buffer. 626 * 627 * Returns POLLIN | POLLRDNORM if data exists in the buffers, 628 * zero otherwise. 629 */ 630 int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu, 631 struct file *filp, poll_table *poll_table) 632 { 633 struct ring_buffer_per_cpu *cpu_buffer; 634 struct rb_irq_work *work; 635 636 if (cpu == RING_BUFFER_ALL_CPUS) 637 work = &buffer->irq_work; 638 else { 639 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 640 return -EINVAL; 641 642 cpu_buffer = buffer->buffers[cpu]; 643 work = &cpu_buffer->irq_work; 644 } 645 646 poll_wait(filp, &work->waiters, poll_table); 647 work->waiters_pending = true; 648 /* 649 * There's a tight race between setting the waiters_pending and 650 * checking if the ring buffer is empty. Once the waiters_pending bit 651 * is set, the next event will wake the task up, but we can get stuck 652 * if there's only a single event in. 653 * 654 * FIXME: Ideally, we need a memory barrier on the writer side as well, 655 * but adding a memory barrier to all events will cause too much of a 656 * performance hit in the fast path. We only need a memory barrier when 657 * the buffer goes from empty to having content. But as this race is 658 * extremely small, and it's not a problem if another event comes in, we 659 * will fix it later. 660 */ 661 smp_mb(); 662 663 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 664 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 665 return POLLIN | POLLRDNORM; 666 return 0; 667 } 668 669 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 670 #define RB_WARN_ON(b, cond) \ 671 ({ \ 672 int _____ret = unlikely(cond); \ 673 if (_____ret) { \ 674 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 675 struct ring_buffer_per_cpu *__b = \ 676 (void *)b; \ 677 atomic_inc(&__b->buffer->record_disabled); \ 678 } else \ 679 atomic_inc(&b->record_disabled); \ 680 WARN_ON(1); \ 681 } \ 682 _____ret; \ 683 }) 684 685 /* Up this if you want to test the TIME_EXTENTS and normalization */ 686 #define DEBUG_SHIFT 0 687 688 static inline u64 rb_time_stamp(struct ring_buffer *buffer) 689 { 690 /* shift to debug/test normalization and TIME_EXTENTS */ 691 return buffer->clock() << DEBUG_SHIFT; 692 } 693 694 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu) 695 { 696 u64 time; 697 698 preempt_disable_notrace(); 699 time = rb_time_stamp(buffer); 700 preempt_enable_no_resched_notrace(); 701 702 return time; 703 } 704 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 705 706 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer, 707 int cpu, u64 *ts) 708 { 709 /* Just stupid testing the normalize function and deltas */ 710 *ts >>= DEBUG_SHIFT; 711 } 712 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 713 714 /* 715 * Making the ring buffer lockless makes things tricky. 716 * Although writes only happen on the CPU that they are on, 717 * and they only need to worry about interrupts. Reads can 718 * happen on any CPU. 719 * 720 * The reader page is always off the ring buffer, but when the 721 * reader finishes with a page, it needs to swap its page with 722 * a new one from the buffer. The reader needs to take from 723 * the head (writes go to the tail). But if a writer is in overwrite 724 * mode and wraps, it must push the head page forward. 725 * 726 * Here lies the problem. 727 * 728 * The reader must be careful to replace only the head page, and 729 * not another one. As described at the top of the file in the 730 * ASCII art, the reader sets its old page to point to the next 731 * page after head. It then sets the page after head to point to 732 * the old reader page. But if the writer moves the head page 733 * during this operation, the reader could end up with the tail. 734 * 735 * We use cmpxchg to help prevent this race. We also do something 736 * special with the page before head. We set the LSB to 1. 737 * 738 * When the writer must push the page forward, it will clear the 739 * bit that points to the head page, move the head, and then set 740 * the bit that points to the new head page. 741 * 742 * We also don't want an interrupt coming in and moving the head 743 * page on another writer. Thus we use the second LSB to catch 744 * that too. Thus: 745 * 746 * head->list->prev->next bit 1 bit 0 747 * ------- ------- 748 * Normal page 0 0 749 * Points to head page 0 1 750 * New head page 1 0 751 * 752 * Note we can not trust the prev pointer of the head page, because: 753 * 754 * +----+ +-----+ +-----+ 755 * | |------>| T |---X--->| N | 756 * | |<------| | | | 757 * +----+ +-----+ +-----+ 758 * ^ ^ | 759 * | +-----+ | | 760 * +----------| R |----------+ | 761 * | |<-----------+ 762 * +-----+ 763 * 764 * Key: ---X--> HEAD flag set in pointer 765 * T Tail page 766 * R Reader page 767 * N Next page 768 * 769 * (see __rb_reserve_next() to see where this happens) 770 * 771 * What the above shows is that the reader just swapped out 772 * the reader page with a page in the buffer, but before it 773 * could make the new header point back to the new page added 774 * it was preempted by a writer. The writer moved forward onto 775 * the new page added by the reader and is about to move forward 776 * again. 777 * 778 * You can see, it is legitimate for the previous pointer of 779 * the head (or any page) not to point back to itself. But only 780 * temporarially. 781 */ 782 783 #define RB_PAGE_NORMAL 0UL 784 #define RB_PAGE_HEAD 1UL 785 #define RB_PAGE_UPDATE 2UL 786 787 788 #define RB_FLAG_MASK 3UL 789 790 /* PAGE_MOVED is not part of the mask */ 791 #define RB_PAGE_MOVED 4UL 792 793 /* 794 * rb_list_head - remove any bit 795 */ 796 static struct list_head *rb_list_head(struct list_head *list) 797 { 798 unsigned long val = (unsigned long)list; 799 800 return (struct list_head *)(val & ~RB_FLAG_MASK); 801 } 802 803 /* 804 * rb_is_head_page - test if the given page is the head page 805 * 806 * Because the reader may move the head_page pointer, we can 807 * not trust what the head page is (it may be pointing to 808 * the reader page). But if the next page is a header page, 809 * its flags will be non zero. 810 */ 811 static inline int 812 rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer, 813 struct buffer_page *page, struct list_head *list) 814 { 815 unsigned long val; 816 817 val = (unsigned long)list->next; 818 819 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 820 return RB_PAGE_MOVED; 821 822 return val & RB_FLAG_MASK; 823 } 824 825 /* 826 * rb_is_reader_page 827 * 828 * The unique thing about the reader page, is that, if the 829 * writer is ever on it, the previous pointer never points 830 * back to the reader page. 831 */ 832 static bool rb_is_reader_page(struct buffer_page *page) 833 { 834 struct list_head *list = page->list.prev; 835 836 return rb_list_head(list->next) != &page->list; 837 } 838 839 /* 840 * rb_set_list_to_head - set a list_head to be pointing to head. 841 */ 842 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer, 843 struct list_head *list) 844 { 845 unsigned long *ptr; 846 847 ptr = (unsigned long *)&list->next; 848 *ptr |= RB_PAGE_HEAD; 849 *ptr &= ~RB_PAGE_UPDATE; 850 } 851 852 /* 853 * rb_head_page_activate - sets up head page 854 */ 855 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 856 { 857 struct buffer_page *head; 858 859 head = cpu_buffer->head_page; 860 if (!head) 861 return; 862 863 /* 864 * Set the previous list pointer to have the HEAD flag. 865 */ 866 rb_set_list_to_head(cpu_buffer, head->list.prev); 867 } 868 869 static void rb_list_head_clear(struct list_head *list) 870 { 871 unsigned long *ptr = (unsigned long *)&list->next; 872 873 *ptr &= ~RB_FLAG_MASK; 874 } 875 876 /* 877 * rb_head_page_dactivate - clears head page ptr (for free list) 878 */ 879 static void 880 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 881 { 882 struct list_head *hd; 883 884 /* Go through the whole list and clear any pointers found. */ 885 rb_list_head_clear(cpu_buffer->pages); 886 887 list_for_each(hd, cpu_buffer->pages) 888 rb_list_head_clear(hd); 889 } 890 891 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 892 struct buffer_page *head, 893 struct buffer_page *prev, 894 int old_flag, int new_flag) 895 { 896 struct list_head *list; 897 unsigned long val = (unsigned long)&head->list; 898 unsigned long ret; 899 900 list = &prev->list; 901 902 val &= ~RB_FLAG_MASK; 903 904 ret = cmpxchg((unsigned long *)&list->next, 905 val | old_flag, val | new_flag); 906 907 /* check if the reader took the page */ 908 if ((ret & ~RB_FLAG_MASK) != val) 909 return RB_PAGE_MOVED; 910 911 return ret & RB_FLAG_MASK; 912 } 913 914 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 915 struct buffer_page *head, 916 struct buffer_page *prev, 917 int old_flag) 918 { 919 return rb_head_page_set(cpu_buffer, head, prev, 920 old_flag, RB_PAGE_UPDATE); 921 } 922 923 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 924 struct buffer_page *head, 925 struct buffer_page *prev, 926 int old_flag) 927 { 928 return rb_head_page_set(cpu_buffer, head, prev, 929 old_flag, RB_PAGE_HEAD); 930 } 931 932 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 933 struct buffer_page *head, 934 struct buffer_page *prev, 935 int old_flag) 936 { 937 return rb_head_page_set(cpu_buffer, head, prev, 938 old_flag, RB_PAGE_NORMAL); 939 } 940 941 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, 942 struct buffer_page **bpage) 943 { 944 struct list_head *p = rb_list_head((*bpage)->list.next); 945 946 *bpage = list_entry(p, struct buffer_page, list); 947 } 948 949 static struct buffer_page * 950 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 951 { 952 struct buffer_page *head; 953 struct buffer_page *page; 954 struct list_head *list; 955 int i; 956 957 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 958 return NULL; 959 960 /* sanity check */ 961 list = cpu_buffer->pages; 962 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 963 return NULL; 964 965 page = head = cpu_buffer->head_page; 966 /* 967 * It is possible that the writer moves the header behind 968 * where we started, and we miss in one loop. 969 * A second loop should grab the header, but we'll do 970 * three loops just because I'm paranoid. 971 */ 972 for (i = 0; i < 3; i++) { 973 do { 974 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) { 975 cpu_buffer->head_page = page; 976 return page; 977 } 978 rb_inc_page(cpu_buffer, &page); 979 } while (page != head); 980 } 981 982 RB_WARN_ON(cpu_buffer, 1); 983 984 return NULL; 985 } 986 987 static int rb_head_page_replace(struct buffer_page *old, 988 struct buffer_page *new) 989 { 990 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 991 unsigned long val; 992 unsigned long ret; 993 994 val = *ptr & ~RB_FLAG_MASK; 995 val |= RB_PAGE_HEAD; 996 997 ret = cmpxchg(ptr, val, (unsigned long)&new->list); 998 999 return ret == val; 1000 } 1001 1002 /* 1003 * rb_tail_page_update - move the tail page forward 1004 */ 1005 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1006 struct buffer_page *tail_page, 1007 struct buffer_page *next_page) 1008 { 1009 unsigned long old_entries; 1010 unsigned long old_write; 1011 1012 /* 1013 * The tail page now needs to be moved forward. 1014 * 1015 * We need to reset the tail page, but without messing 1016 * with possible erasing of data brought in by interrupts 1017 * that have moved the tail page and are currently on it. 1018 * 1019 * We add a counter to the write field to denote this. 1020 */ 1021 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1022 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1023 1024 /* 1025 * Just make sure we have seen our old_write and synchronize 1026 * with any interrupts that come in. 1027 */ 1028 barrier(); 1029 1030 /* 1031 * If the tail page is still the same as what we think 1032 * it is, then it is up to us to update the tail 1033 * pointer. 1034 */ 1035 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1036 /* Zero the write counter */ 1037 unsigned long val = old_write & ~RB_WRITE_MASK; 1038 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1039 1040 /* 1041 * This will only succeed if an interrupt did 1042 * not come in and change it. In which case, we 1043 * do not want to modify it. 1044 * 1045 * We add (void) to let the compiler know that we do not care 1046 * about the return value of these functions. We use the 1047 * cmpxchg to only update if an interrupt did not already 1048 * do it for us. If the cmpxchg fails, we don't care. 1049 */ 1050 (void)local_cmpxchg(&next_page->write, old_write, val); 1051 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1052 1053 /* 1054 * No need to worry about races with clearing out the commit. 1055 * it only can increment when a commit takes place. But that 1056 * only happens in the outer most nested commit. 1057 */ 1058 local_set(&next_page->page->commit, 0); 1059 1060 /* Again, either we update tail_page or an interrupt does */ 1061 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page); 1062 } 1063 } 1064 1065 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1066 struct buffer_page *bpage) 1067 { 1068 unsigned long val = (unsigned long)bpage; 1069 1070 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK)) 1071 return 1; 1072 1073 return 0; 1074 } 1075 1076 /** 1077 * rb_check_list - make sure a pointer to a list has the last bits zero 1078 */ 1079 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer, 1080 struct list_head *list) 1081 { 1082 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev)) 1083 return 1; 1084 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next)) 1085 return 1; 1086 return 0; 1087 } 1088 1089 /** 1090 * rb_check_pages - integrity check of buffer pages 1091 * @cpu_buffer: CPU buffer with pages to test 1092 * 1093 * As a safety measure we check to make sure the data pages have not 1094 * been corrupted. 1095 */ 1096 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1097 { 1098 struct list_head *head = cpu_buffer->pages; 1099 struct buffer_page *bpage, *tmp; 1100 1101 /* Reset the head page if it exists */ 1102 if (cpu_buffer->head_page) 1103 rb_set_head_page(cpu_buffer); 1104 1105 rb_head_page_deactivate(cpu_buffer); 1106 1107 if (RB_WARN_ON(cpu_buffer, head->next->prev != head)) 1108 return -1; 1109 if (RB_WARN_ON(cpu_buffer, head->prev->next != head)) 1110 return -1; 1111 1112 if (rb_check_list(cpu_buffer, head)) 1113 return -1; 1114 1115 list_for_each_entry_safe(bpage, tmp, head, list) { 1116 if (RB_WARN_ON(cpu_buffer, 1117 bpage->list.next->prev != &bpage->list)) 1118 return -1; 1119 if (RB_WARN_ON(cpu_buffer, 1120 bpage->list.prev->next != &bpage->list)) 1121 return -1; 1122 if (rb_check_list(cpu_buffer, &bpage->list)) 1123 return -1; 1124 } 1125 1126 rb_head_page_activate(cpu_buffer); 1127 1128 return 0; 1129 } 1130 1131 static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu) 1132 { 1133 struct buffer_page *bpage, *tmp; 1134 long i; 1135 1136 for (i = 0; i < nr_pages; i++) { 1137 struct page *page; 1138 /* 1139 * __GFP_NORETRY flag makes sure that the allocation fails 1140 * gracefully without invoking oom-killer and the system is 1141 * not destabilized. 1142 */ 1143 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1144 GFP_KERNEL | __GFP_NORETRY, 1145 cpu_to_node(cpu)); 1146 if (!bpage) 1147 goto free_pages; 1148 1149 list_add(&bpage->list, pages); 1150 1151 page = alloc_pages_node(cpu_to_node(cpu), 1152 GFP_KERNEL | __GFP_NORETRY, 0); 1153 if (!page) 1154 goto free_pages; 1155 bpage->page = page_address(page); 1156 rb_init_page(bpage->page); 1157 } 1158 1159 return 0; 1160 1161 free_pages: 1162 list_for_each_entry_safe(bpage, tmp, pages, list) { 1163 list_del_init(&bpage->list); 1164 free_buffer_page(bpage); 1165 } 1166 1167 return -ENOMEM; 1168 } 1169 1170 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1171 unsigned long nr_pages) 1172 { 1173 LIST_HEAD(pages); 1174 1175 WARN_ON(!nr_pages); 1176 1177 if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu)) 1178 return -ENOMEM; 1179 1180 /* 1181 * The ring buffer page list is a circular list that does not 1182 * start and end with a list head. All page list items point to 1183 * other pages. 1184 */ 1185 cpu_buffer->pages = pages.next; 1186 list_del(&pages); 1187 1188 cpu_buffer->nr_pages = nr_pages; 1189 1190 rb_check_pages(cpu_buffer); 1191 1192 return 0; 1193 } 1194 1195 static struct ring_buffer_per_cpu * 1196 rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu) 1197 { 1198 struct ring_buffer_per_cpu *cpu_buffer; 1199 struct buffer_page *bpage; 1200 struct page *page; 1201 int ret; 1202 1203 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1204 GFP_KERNEL, cpu_to_node(cpu)); 1205 if (!cpu_buffer) 1206 return NULL; 1207 1208 cpu_buffer->cpu = cpu; 1209 cpu_buffer->buffer = buffer; 1210 raw_spin_lock_init(&cpu_buffer->reader_lock); 1211 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1212 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1213 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 1214 init_completion(&cpu_buffer->update_done); 1215 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 1216 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 1217 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 1218 1219 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1220 GFP_KERNEL, cpu_to_node(cpu)); 1221 if (!bpage) 1222 goto fail_free_buffer; 1223 1224 rb_check_bpage(cpu_buffer, bpage); 1225 1226 cpu_buffer->reader_page = bpage; 1227 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0); 1228 if (!page) 1229 goto fail_free_reader; 1230 bpage->page = page_address(page); 1231 rb_init_page(bpage->page); 1232 1233 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1234 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1235 1236 ret = rb_allocate_pages(cpu_buffer, nr_pages); 1237 if (ret < 0) 1238 goto fail_free_reader; 1239 1240 cpu_buffer->head_page 1241 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1242 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 1243 1244 rb_head_page_activate(cpu_buffer); 1245 1246 return cpu_buffer; 1247 1248 fail_free_reader: 1249 free_buffer_page(cpu_buffer->reader_page); 1250 1251 fail_free_buffer: 1252 kfree(cpu_buffer); 1253 return NULL; 1254 } 1255 1256 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1257 { 1258 struct list_head *head = cpu_buffer->pages; 1259 struct buffer_page *bpage, *tmp; 1260 1261 free_buffer_page(cpu_buffer->reader_page); 1262 1263 rb_head_page_deactivate(cpu_buffer); 1264 1265 if (head) { 1266 list_for_each_entry_safe(bpage, tmp, head, list) { 1267 list_del_init(&bpage->list); 1268 free_buffer_page(bpage); 1269 } 1270 bpage = list_entry(head, struct buffer_page, list); 1271 free_buffer_page(bpage); 1272 } 1273 1274 kfree(cpu_buffer); 1275 } 1276 1277 /** 1278 * __ring_buffer_alloc - allocate a new ring_buffer 1279 * @size: the size in bytes per cpu that is needed. 1280 * @flags: attributes to set for the ring buffer. 1281 * 1282 * Currently the only flag that is available is the RB_FL_OVERWRITE 1283 * flag. This flag means that the buffer will overwrite old data 1284 * when the buffer wraps. If this flag is not set, the buffer will 1285 * drop data when the tail hits the head. 1286 */ 1287 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1288 struct lock_class_key *key) 1289 { 1290 struct ring_buffer *buffer; 1291 long nr_pages; 1292 int bsize; 1293 int cpu; 1294 int ret; 1295 1296 /* keep it in its own cache line */ 1297 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1298 GFP_KERNEL); 1299 if (!buffer) 1300 return NULL; 1301 1302 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1303 goto fail_free_buffer; 1304 1305 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1306 buffer->flags = flags; 1307 buffer->clock = trace_clock_local; 1308 buffer->reader_lock_key = key; 1309 1310 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 1311 init_waitqueue_head(&buffer->irq_work.waiters); 1312 1313 /* need at least two pages */ 1314 if (nr_pages < 2) 1315 nr_pages = 2; 1316 1317 buffer->cpus = nr_cpu_ids; 1318 1319 bsize = sizeof(void *) * nr_cpu_ids; 1320 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1321 GFP_KERNEL); 1322 if (!buffer->buffers) 1323 goto fail_free_cpumask; 1324 1325 cpu = raw_smp_processor_id(); 1326 cpumask_set_cpu(cpu, buffer->cpumask); 1327 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 1328 if (!buffer->buffers[cpu]) 1329 goto fail_free_buffers; 1330 1331 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1332 if (ret < 0) 1333 goto fail_free_buffers; 1334 1335 mutex_init(&buffer->mutex); 1336 1337 return buffer; 1338 1339 fail_free_buffers: 1340 for_each_buffer_cpu(buffer, cpu) { 1341 if (buffer->buffers[cpu]) 1342 rb_free_cpu_buffer(buffer->buffers[cpu]); 1343 } 1344 kfree(buffer->buffers); 1345 1346 fail_free_cpumask: 1347 free_cpumask_var(buffer->cpumask); 1348 1349 fail_free_buffer: 1350 kfree(buffer); 1351 return NULL; 1352 } 1353 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1354 1355 /** 1356 * ring_buffer_free - free a ring buffer. 1357 * @buffer: the buffer to free. 1358 */ 1359 void 1360 ring_buffer_free(struct ring_buffer *buffer) 1361 { 1362 int cpu; 1363 1364 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1365 1366 for_each_buffer_cpu(buffer, cpu) 1367 rb_free_cpu_buffer(buffer->buffers[cpu]); 1368 1369 kfree(buffer->buffers); 1370 free_cpumask_var(buffer->cpumask); 1371 1372 kfree(buffer); 1373 } 1374 EXPORT_SYMBOL_GPL(ring_buffer_free); 1375 1376 void ring_buffer_set_clock(struct ring_buffer *buffer, 1377 u64 (*clock)(void)) 1378 { 1379 buffer->clock = clock; 1380 } 1381 1382 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer); 1383 1384 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1385 { 1386 return local_read(&bpage->entries) & RB_WRITE_MASK; 1387 } 1388 1389 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1390 { 1391 return local_read(&bpage->write) & RB_WRITE_MASK; 1392 } 1393 1394 static int 1395 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 1396 { 1397 struct list_head *tail_page, *to_remove, *next_page; 1398 struct buffer_page *to_remove_page, *tmp_iter_page; 1399 struct buffer_page *last_page, *first_page; 1400 unsigned long nr_removed; 1401 unsigned long head_bit; 1402 int page_entries; 1403 1404 head_bit = 0; 1405 1406 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1407 atomic_inc(&cpu_buffer->record_disabled); 1408 /* 1409 * We don't race with the readers since we have acquired the reader 1410 * lock. We also don't race with writers after disabling recording. 1411 * This makes it easy to figure out the first and the last page to be 1412 * removed from the list. We unlink all the pages in between including 1413 * the first and last pages. This is done in a busy loop so that we 1414 * lose the least number of traces. 1415 * The pages are freed after we restart recording and unlock readers. 1416 */ 1417 tail_page = &cpu_buffer->tail_page->list; 1418 1419 /* 1420 * tail page might be on reader page, we remove the next page 1421 * from the ring buffer 1422 */ 1423 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 1424 tail_page = rb_list_head(tail_page->next); 1425 to_remove = tail_page; 1426 1427 /* start of pages to remove */ 1428 first_page = list_entry(rb_list_head(to_remove->next), 1429 struct buffer_page, list); 1430 1431 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 1432 to_remove = rb_list_head(to_remove)->next; 1433 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 1434 } 1435 1436 next_page = rb_list_head(to_remove)->next; 1437 1438 /* 1439 * Now we remove all pages between tail_page and next_page. 1440 * Make sure that we have head_bit value preserved for the 1441 * next page 1442 */ 1443 tail_page->next = (struct list_head *)((unsigned long)next_page | 1444 head_bit); 1445 next_page = rb_list_head(next_page); 1446 next_page->prev = tail_page; 1447 1448 /* make sure pages points to a valid page in the ring buffer */ 1449 cpu_buffer->pages = next_page; 1450 1451 /* update head page */ 1452 if (head_bit) 1453 cpu_buffer->head_page = list_entry(next_page, 1454 struct buffer_page, list); 1455 1456 /* 1457 * change read pointer to make sure any read iterators reset 1458 * themselves 1459 */ 1460 cpu_buffer->read = 0; 1461 1462 /* pages are removed, resume tracing and then free the pages */ 1463 atomic_dec(&cpu_buffer->record_disabled); 1464 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1465 1466 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1467 1468 /* last buffer page to remove */ 1469 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1470 list); 1471 tmp_iter_page = first_page; 1472 1473 do { 1474 to_remove_page = tmp_iter_page; 1475 rb_inc_page(cpu_buffer, &tmp_iter_page); 1476 1477 /* update the counters */ 1478 page_entries = rb_page_entries(to_remove_page); 1479 if (page_entries) { 1480 /* 1481 * If something was added to this page, it was full 1482 * since it is not the tail page. So we deduct the 1483 * bytes consumed in ring buffer from here. 1484 * Increment overrun to account for the lost events. 1485 */ 1486 local_add(page_entries, &cpu_buffer->overrun); 1487 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 1488 } 1489 1490 /* 1491 * We have already removed references to this list item, just 1492 * free up the buffer_page and its page 1493 */ 1494 free_buffer_page(to_remove_page); 1495 nr_removed--; 1496 1497 } while (to_remove_page != last_page); 1498 1499 RB_WARN_ON(cpu_buffer, nr_removed); 1500 1501 return nr_removed == 0; 1502 } 1503 1504 static int 1505 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 1506 { 1507 struct list_head *pages = &cpu_buffer->new_pages; 1508 int retries, success; 1509 1510 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1511 /* 1512 * We are holding the reader lock, so the reader page won't be swapped 1513 * in the ring buffer. Now we are racing with the writer trying to 1514 * move head page and the tail page. 1515 * We are going to adapt the reader page update process where: 1516 * 1. We first splice the start and end of list of new pages between 1517 * the head page and its previous page. 1518 * 2. We cmpxchg the prev_page->next to point from head page to the 1519 * start of new pages list. 1520 * 3. Finally, we update the head->prev to the end of new list. 1521 * 1522 * We will try this process 10 times, to make sure that we don't keep 1523 * spinning. 1524 */ 1525 retries = 10; 1526 success = 0; 1527 while (retries--) { 1528 struct list_head *head_page, *prev_page, *r; 1529 struct list_head *last_page, *first_page; 1530 struct list_head *head_page_with_bit; 1531 1532 head_page = &rb_set_head_page(cpu_buffer)->list; 1533 if (!head_page) 1534 break; 1535 prev_page = head_page->prev; 1536 1537 first_page = pages->next; 1538 last_page = pages->prev; 1539 1540 head_page_with_bit = (struct list_head *) 1541 ((unsigned long)head_page | RB_PAGE_HEAD); 1542 1543 last_page->next = head_page_with_bit; 1544 first_page->prev = prev_page; 1545 1546 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page); 1547 1548 if (r == head_page_with_bit) { 1549 /* 1550 * yay, we replaced the page pointer to our new list, 1551 * now, we just have to update to head page's prev 1552 * pointer to point to end of list 1553 */ 1554 head_page->prev = last_page; 1555 success = 1; 1556 break; 1557 } 1558 } 1559 1560 if (success) 1561 INIT_LIST_HEAD(pages); 1562 /* 1563 * If we weren't successful in adding in new pages, warn and stop 1564 * tracing 1565 */ 1566 RB_WARN_ON(cpu_buffer, !success); 1567 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1568 1569 /* free pages if they weren't inserted */ 1570 if (!success) { 1571 struct buffer_page *bpage, *tmp; 1572 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1573 list) { 1574 list_del_init(&bpage->list); 1575 free_buffer_page(bpage); 1576 } 1577 } 1578 return success; 1579 } 1580 1581 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 1582 { 1583 int success; 1584 1585 if (cpu_buffer->nr_pages_to_update > 0) 1586 success = rb_insert_pages(cpu_buffer); 1587 else 1588 success = rb_remove_pages(cpu_buffer, 1589 -cpu_buffer->nr_pages_to_update); 1590 1591 if (success) 1592 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 1593 } 1594 1595 static void update_pages_handler(struct work_struct *work) 1596 { 1597 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 1598 struct ring_buffer_per_cpu, update_pages_work); 1599 rb_update_pages(cpu_buffer); 1600 complete(&cpu_buffer->update_done); 1601 } 1602 1603 /** 1604 * ring_buffer_resize - resize the ring buffer 1605 * @buffer: the buffer to resize. 1606 * @size: the new size. 1607 * @cpu_id: the cpu buffer to resize 1608 * 1609 * Minimum size is 2 * BUF_PAGE_SIZE. 1610 * 1611 * Returns 0 on success and < 0 on failure. 1612 */ 1613 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size, 1614 int cpu_id) 1615 { 1616 struct ring_buffer_per_cpu *cpu_buffer; 1617 unsigned long nr_pages; 1618 int cpu, err = 0; 1619 1620 /* 1621 * Always succeed at resizing a non-existent buffer: 1622 */ 1623 if (!buffer) 1624 return size; 1625 1626 /* Make sure the requested buffer exists */ 1627 if (cpu_id != RING_BUFFER_ALL_CPUS && 1628 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 1629 return size; 1630 1631 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); 1632 1633 /* we need a minimum of two pages */ 1634 if (nr_pages < 2) 1635 nr_pages = 2; 1636 1637 size = nr_pages * BUF_PAGE_SIZE; 1638 1639 /* 1640 * Don't succeed if resizing is disabled, as a reader might be 1641 * manipulating the ring buffer and is expecting a sane state while 1642 * this is true. 1643 */ 1644 if (atomic_read(&buffer->resize_disabled)) 1645 return -EBUSY; 1646 1647 /* prevent another thread from changing buffer sizes */ 1648 mutex_lock(&buffer->mutex); 1649 1650 if (cpu_id == RING_BUFFER_ALL_CPUS) { 1651 /* calculate the pages to update */ 1652 for_each_buffer_cpu(buffer, cpu) { 1653 cpu_buffer = buffer->buffers[cpu]; 1654 1655 cpu_buffer->nr_pages_to_update = nr_pages - 1656 cpu_buffer->nr_pages; 1657 /* 1658 * nothing more to do for removing pages or no update 1659 */ 1660 if (cpu_buffer->nr_pages_to_update <= 0) 1661 continue; 1662 /* 1663 * to add pages, make sure all new pages can be 1664 * allocated without receiving ENOMEM 1665 */ 1666 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1667 if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update, 1668 &cpu_buffer->new_pages, cpu)) { 1669 /* not enough memory for new pages */ 1670 err = -ENOMEM; 1671 goto out_err; 1672 } 1673 } 1674 1675 get_online_cpus(); 1676 /* 1677 * Fire off all the required work handlers 1678 * We can't schedule on offline CPUs, but it's not necessary 1679 * since we can change their buffer sizes without any race. 1680 */ 1681 for_each_buffer_cpu(buffer, cpu) { 1682 cpu_buffer = buffer->buffers[cpu]; 1683 if (!cpu_buffer->nr_pages_to_update) 1684 continue; 1685 1686 /* Can't run something on an offline CPU. */ 1687 if (!cpu_online(cpu)) { 1688 rb_update_pages(cpu_buffer); 1689 cpu_buffer->nr_pages_to_update = 0; 1690 } else { 1691 schedule_work_on(cpu, 1692 &cpu_buffer->update_pages_work); 1693 } 1694 } 1695 1696 /* wait for all the updates to complete */ 1697 for_each_buffer_cpu(buffer, cpu) { 1698 cpu_buffer = buffer->buffers[cpu]; 1699 if (!cpu_buffer->nr_pages_to_update) 1700 continue; 1701 1702 if (cpu_online(cpu)) 1703 wait_for_completion(&cpu_buffer->update_done); 1704 cpu_buffer->nr_pages_to_update = 0; 1705 } 1706 1707 put_online_cpus(); 1708 } else { 1709 /* Make sure this CPU has been intitialized */ 1710 if (!cpumask_test_cpu(cpu_id, buffer->cpumask)) 1711 goto out; 1712 1713 cpu_buffer = buffer->buffers[cpu_id]; 1714 1715 if (nr_pages == cpu_buffer->nr_pages) 1716 goto out; 1717 1718 cpu_buffer->nr_pages_to_update = nr_pages - 1719 cpu_buffer->nr_pages; 1720 1721 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1722 if (cpu_buffer->nr_pages_to_update > 0 && 1723 __rb_allocate_pages(cpu_buffer->nr_pages_to_update, 1724 &cpu_buffer->new_pages, cpu_id)) { 1725 err = -ENOMEM; 1726 goto out_err; 1727 } 1728 1729 get_online_cpus(); 1730 1731 /* Can't run something on an offline CPU. */ 1732 if (!cpu_online(cpu_id)) 1733 rb_update_pages(cpu_buffer); 1734 else { 1735 schedule_work_on(cpu_id, 1736 &cpu_buffer->update_pages_work); 1737 wait_for_completion(&cpu_buffer->update_done); 1738 } 1739 1740 cpu_buffer->nr_pages_to_update = 0; 1741 put_online_cpus(); 1742 } 1743 1744 out: 1745 /* 1746 * The ring buffer resize can happen with the ring buffer 1747 * enabled, so that the update disturbs the tracing as little 1748 * as possible. But if the buffer is disabled, we do not need 1749 * to worry about that, and we can take the time to verify 1750 * that the buffer is not corrupt. 1751 */ 1752 if (atomic_read(&buffer->record_disabled)) { 1753 atomic_inc(&buffer->record_disabled); 1754 /* 1755 * Even though the buffer was disabled, we must make sure 1756 * that it is truly disabled before calling rb_check_pages. 1757 * There could have been a race between checking 1758 * record_disable and incrementing it. 1759 */ 1760 synchronize_sched(); 1761 for_each_buffer_cpu(buffer, cpu) { 1762 cpu_buffer = buffer->buffers[cpu]; 1763 rb_check_pages(cpu_buffer); 1764 } 1765 atomic_dec(&buffer->record_disabled); 1766 } 1767 1768 mutex_unlock(&buffer->mutex); 1769 return size; 1770 1771 out_err: 1772 for_each_buffer_cpu(buffer, cpu) { 1773 struct buffer_page *bpage, *tmp; 1774 1775 cpu_buffer = buffer->buffers[cpu]; 1776 cpu_buffer->nr_pages_to_update = 0; 1777 1778 if (list_empty(&cpu_buffer->new_pages)) 1779 continue; 1780 1781 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1782 list) { 1783 list_del_init(&bpage->list); 1784 free_buffer_page(bpage); 1785 } 1786 } 1787 mutex_unlock(&buffer->mutex); 1788 return err; 1789 } 1790 EXPORT_SYMBOL_GPL(ring_buffer_resize); 1791 1792 void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val) 1793 { 1794 mutex_lock(&buffer->mutex); 1795 if (val) 1796 buffer->flags |= RB_FL_OVERWRITE; 1797 else 1798 buffer->flags &= ~RB_FL_OVERWRITE; 1799 mutex_unlock(&buffer->mutex); 1800 } 1801 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 1802 1803 static __always_inline void * 1804 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index) 1805 { 1806 return bpage->data + index; 1807 } 1808 1809 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 1810 { 1811 return bpage->page->data + index; 1812 } 1813 1814 static __always_inline struct ring_buffer_event * 1815 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 1816 { 1817 return __rb_page_index(cpu_buffer->reader_page, 1818 cpu_buffer->reader_page->read); 1819 } 1820 1821 static __always_inline struct ring_buffer_event * 1822 rb_iter_head_event(struct ring_buffer_iter *iter) 1823 { 1824 return __rb_page_index(iter->head_page, iter->head); 1825 } 1826 1827 static __always_inline unsigned rb_page_commit(struct buffer_page *bpage) 1828 { 1829 return local_read(&bpage->page->commit); 1830 } 1831 1832 /* Size is determined by what has been committed */ 1833 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 1834 { 1835 return rb_page_commit(bpage); 1836 } 1837 1838 static __always_inline unsigned 1839 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 1840 { 1841 return rb_page_commit(cpu_buffer->commit_page); 1842 } 1843 1844 static __always_inline unsigned 1845 rb_event_index(struct ring_buffer_event *event) 1846 { 1847 unsigned long addr = (unsigned long)event; 1848 1849 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE; 1850 } 1851 1852 static void rb_inc_iter(struct ring_buffer_iter *iter) 1853 { 1854 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 1855 1856 /* 1857 * The iterator could be on the reader page (it starts there). 1858 * But the head could have moved, since the reader was 1859 * found. Check for this case and assign the iterator 1860 * to the head page instead of next. 1861 */ 1862 if (iter->head_page == cpu_buffer->reader_page) 1863 iter->head_page = rb_set_head_page(cpu_buffer); 1864 else 1865 rb_inc_page(cpu_buffer, &iter->head_page); 1866 1867 iter->read_stamp = iter->head_page->page->time_stamp; 1868 iter->head = 0; 1869 } 1870 1871 /* 1872 * rb_handle_head_page - writer hit the head page 1873 * 1874 * Returns: +1 to retry page 1875 * 0 to continue 1876 * -1 on error 1877 */ 1878 static int 1879 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 1880 struct buffer_page *tail_page, 1881 struct buffer_page *next_page) 1882 { 1883 struct buffer_page *new_head; 1884 int entries; 1885 int type; 1886 int ret; 1887 1888 entries = rb_page_entries(next_page); 1889 1890 /* 1891 * The hard part is here. We need to move the head 1892 * forward, and protect against both readers on 1893 * other CPUs and writers coming in via interrupts. 1894 */ 1895 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 1896 RB_PAGE_HEAD); 1897 1898 /* 1899 * type can be one of four: 1900 * NORMAL - an interrupt already moved it for us 1901 * HEAD - we are the first to get here. 1902 * UPDATE - we are the interrupt interrupting 1903 * a current move. 1904 * MOVED - a reader on another CPU moved the next 1905 * pointer to its reader page. Give up 1906 * and try again. 1907 */ 1908 1909 switch (type) { 1910 case RB_PAGE_HEAD: 1911 /* 1912 * We changed the head to UPDATE, thus 1913 * it is our responsibility to update 1914 * the counters. 1915 */ 1916 local_add(entries, &cpu_buffer->overrun); 1917 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 1918 1919 /* 1920 * The entries will be zeroed out when we move the 1921 * tail page. 1922 */ 1923 1924 /* still more to do */ 1925 break; 1926 1927 case RB_PAGE_UPDATE: 1928 /* 1929 * This is an interrupt that interrupt the 1930 * previous update. Still more to do. 1931 */ 1932 break; 1933 case RB_PAGE_NORMAL: 1934 /* 1935 * An interrupt came in before the update 1936 * and processed this for us. 1937 * Nothing left to do. 1938 */ 1939 return 1; 1940 case RB_PAGE_MOVED: 1941 /* 1942 * The reader is on another CPU and just did 1943 * a swap with our next_page. 1944 * Try again. 1945 */ 1946 return 1; 1947 default: 1948 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 1949 return -1; 1950 } 1951 1952 /* 1953 * Now that we are here, the old head pointer is 1954 * set to UPDATE. This will keep the reader from 1955 * swapping the head page with the reader page. 1956 * The reader (on another CPU) will spin till 1957 * we are finished. 1958 * 1959 * We just need to protect against interrupts 1960 * doing the job. We will set the next pointer 1961 * to HEAD. After that, we set the old pointer 1962 * to NORMAL, but only if it was HEAD before. 1963 * otherwise we are an interrupt, and only 1964 * want the outer most commit to reset it. 1965 */ 1966 new_head = next_page; 1967 rb_inc_page(cpu_buffer, &new_head); 1968 1969 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 1970 RB_PAGE_NORMAL); 1971 1972 /* 1973 * Valid returns are: 1974 * HEAD - an interrupt came in and already set it. 1975 * NORMAL - One of two things: 1976 * 1) We really set it. 1977 * 2) A bunch of interrupts came in and moved 1978 * the page forward again. 1979 */ 1980 switch (ret) { 1981 case RB_PAGE_HEAD: 1982 case RB_PAGE_NORMAL: 1983 /* OK */ 1984 break; 1985 default: 1986 RB_WARN_ON(cpu_buffer, 1); 1987 return -1; 1988 } 1989 1990 /* 1991 * It is possible that an interrupt came in, 1992 * set the head up, then more interrupts came in 1993 * and moved it again. When we get back here, 1994 * the page would have been set to NORMAL but we 1995 * just set it back to HEAD. 1996 * 1997 * How do you detect this? Well, if that happened 1998 * the tail page would have moved. 1999 */ 2000 if (ret == RB_PAGE_NORMAL) { 2001 struct buffer_page *buffer_tail_page; 2002 2003 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2004 /* 2005 * If the tail had moved passed next, then we need 2006 * to reset the pointer. 2007 */ 2008 if (buffer_tail_page != tail_page && 2009 buffer_tail_page != next_page) 2010 rb_head_page_set_normal(cpu_buffer, new_head, 2011 next_page, 2012 RB_PAGE_HEAD); 2013 } 2014 2015 /* 2016 * If this was the outer most commit (the one that 2017 * changed the original pointer from HEAD to UPDATE), 2018 * then it is up to us to reset it to NORMAL. 2019 */ 2020 if (type == RB_PAGE_HEAD) { 2021 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2022 tail_page, 2023 RB_PAGE_UPDATE); 2024 if (RB_WARN_ON(cpu_buffer, 2025 ret != RB_PAGE_UPDATE)) 2026 return -1; 2027 } 2028 2029 return 0; 2030 } 2031 2032 static inline void 2033 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2034 unsigned long tail, struct rb_event_info *info) 2035 { 2036 struct buffer_page *tail_page = info->tail_page; 2037 struct ring_buffer_event *event; 2038 unsigned long length = info->length; 2039 2040 /* 2041 * Only the event that crossed the page boundary 2042 * must fill the old tail_page with padding. 2043 */ 2044 if (tail >= BUF_PAGE_SIZE) { 2045 /* 2046 * If the page was filled, then we still need 2047 * to update the real_end. Reset it to zero 2048 * and the reader will ignore it. 2049 */ 2050 if (tail == BUF_PAGE_SIZE) 2051 tail_page->real_end = 0; 2052 2053 local_sub(length, &tail_page->write); 2054 return; 2055 } 2056 2057 event = __rb_page_index(tail_page, tail); 2058 kmemcheck_annotate_bitfield(event, bitfield); 2059 2060 /* account for padding bytes */ 2061 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); 2062 2063 /* 2064 * Save the original length to the meta data. 2065 * This will be used by the reader to add lost event 2066 * counter. 2067 */ 2068 tail_page->real_end = tail; 2069 2070 /* 2071 * If this event is bigger than the minimum size, then 2072 * we need to be careful that we don't subtract the 2073 * write counter enough to allow another writer to slip 2074 * in on this page. 2075 * We put in a discarded commit instead, to make sure 2076 * that this space is not used again. 2077 * 2078 * If we are less than the minimum size, we don't need to 2079 * worry about it. 2080 */ 2081 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 2082 /* No room for any events */ 2083 2084 /* Mark the rest of the page with padding */ 2085 rb_event_set_padding(event); 2086 2087 /* Set the write back to the previous setting */ 2088 local_sub(length, &tail_page->write); 2089 return; 2090 } 2091 2092 /* Put in a discarded event */ 2093 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 2094 event->type_len = RINGBUF_TYPE_PADDING; 2095 /* time delta must be non zero */ 2096 event->time_delta = 1; 2097 2098 /* Set write to end of buffer */ 2099 length = (tail + length) - BUF_PAGE_SIZE; 2100 local_sub(length, &tail_page->write); 2101 } 2102 2103 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2104 2105 /* 2106 * This is the slow path, force gcc not to inline it. 2107 */ 2108 static noinline struct ring_buffer_event * 2109 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2110 unsigned long tail, struct rb_event_info *info) 2111 { 2112 struct buffer_page *tail_page = info->tail_page; 2113 struct buffer_page *commit_page = cpu_buffer->commit_page; 2114 struct ring_buffer *buffer = cpu_buffer->buffer; 2115 struct buffer_page *next_page; 2116 int ret; 2117 2118 next_page = tail_page; 2119 2120 rb_inc_page(cpu_buffer, &next_page); 2121 2122 /* 2123 * If for some reason, we had an interrupt storm that made 2124 * it all the way around the buffer, bail, and warn 2125 * about it. 2126 */ 2127 if (unlikely(next_page == commit_page)) { 2128 local_inc(&cpu_buffer->commit_overrun); 2129 goto out_reset; 2130 } 2131 2132 /* 2133 * This is where the fun begins! 2134 * 2135 * We are fighting against races between a reader that 2136 * could be on another CPU trying to swap its reader 2137 * page with the buffer head. 2138 * 2139 * We are also fighting against interrupts coming in and 2140 * moving the head or tail on us as well. 2141 * 2142 * If the next page is the head page then we have filled 2143 * the buffer, unless the commit page is still on the 2144 * reader page. 2145 */ 2146 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) { 2147 2148 /* 2149 * If the commit is not on the reader page, then 2150 * move the header page. 2151 */ 2152 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2153 /* 2154 * If we are not in overwrite mode, 2155 * this is easy, just stop here. 2156 */ 2157 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2158 local_inc(&cpu_buffer->dropped_events); 2159 goto out_reset; 2160 } 2161 2162 ret = rb_handle_head_page(cpu_buffer, 2163 tail_page, 2164 next_page); 2165 if (ret < 0) 2166 goto out_reset; 2167 if (ret) 2168 goto out_again; 2169 } else { 2170 /* 2171 * We need to be careful here too. The 2172 * commit page could still be on the reader 2173 * page. We could have a small buffer, and 2174 * have filled up the buffer with events 2175 * from interrupts and such, and wrapped. 2176 * 2177 * Note, if the tail page is also the on the 2178 * reader_page, we let it move out. 2179 */ 2180 if (unlikely((cpu_buffer->commit_page != 2181 cpu_buffer->tail_page) && 2182 (cpu_buffer->commit_page == 2183 cpu_buffer->reader_page))) { 2184 local_inc(&cpu_buffer->commit_overrun); 2185 goto out_reset; 2186 } 2187 } 2188 } 2189 2190 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2191 2192 out_again: 2193 2194 rb_reset_tail(cpu_buffer, tail, info); 2195 2196 /* Commit what we have for now. */ 2197 rb_end_commit(cpu_buffer); 2198 /* rb_end_commit() decs committing */ 2199 local_inc(&cpu_buffer->committing); 2200 2201 /* fail and let the caller try again */ 2202 return ERR_PTR(-EAGAIN); 2203 2204 out_reset: 2205 /* reset write */ 2206 rb_reset_tail(cpu_buffer, tail, info); 2207 2208 return NULL; 2209 } 2210 2211 /* Slow path, do not inline */ 2212 static noinline struct ring_buffer_event * 2213 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta) 2214 { 2215 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2216 2217 /* Not the first event on the page? */ 2218 if (rb_event_index(event)) { 2219 event->time_delta = delta & TS_MASK; 2220 event->array[0] = delta >> TS_SHIFT; 2221 } else { 2222 /* nope, just zero it */ 2223 event->time_delta = 0; 2224 event->array[0] = 0; 2225 } 2226 2227 return skip_time_extend(event); 2228 } 2229 2230 static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 2231 struct ring_buffer_event *event); 2232 2233 /** 2234 * rb_update_event - update event type and data 2235 * @event: the event to update 2236 * @type: the type of event 2237 * @length: the size of the event field in the ring buffer 2238 * 2239 * Update the type and data fields of the event. The length 2240 * is the actual size that is written to the ring buffer, 2241 * and with this, we can determine what to place into the 2242 * data field. 2243 */ 2244 static void 2245 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2246 struct ring_buffer_event *event, 2247 struct rb_event_info *info) 2248 { 2249 unsigned length = info->length; 2250 u64 delta = info->delta; 2251 2252 /* Only a commit updates the timestamp */ 2253 if (unlikely(!rb_event_is_commit(cpu_buffer, event))) 2254 delta = 0; 2255 2256 /* 2257 * If we need to add a timestamp, then we 2258 * add it to the start of the resevered space. 2259 */ 2260 if (unlikely(info->add_timestamp)) { 2261 event = rb_add_time_stamp(event, delta); 2262 length -= RB_LEN_TIME_EXTEND; 2263 delta = 0; 2264 } 2265 2266 event->time_delta = delta; 2267 length -= RB_EVNT_HDR_SIZE; 2268 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2269 event->type_len = 0; 2270 event->array[0] = length; 2271 } else 2272 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2273 } 2274 2275 static unsigned rb_calculate_event_length(unsigned length) 2276 { 2277 struct ring_buffer_event event; /* Used only for sizeof array */ 2278 2279 /* zero length can cause confusions */ 2280 if (!length) 2281 length++; 2282 2283 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2284 length += sizeof(event.array[0]); 2285 2286 length += RB_EVNT_HDR_SIZE; 2287 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2288 2289 /* 2290 * In case the time delta is larger than the 27 bits for it 2291 * in the header, we need to add a timestamp. If another 2292 * event comes in when trying to discard this one to increase 2293 * the length, then the timestamp will be added in the allocated 2294 * space of this event. If length is bigger than the size needed 2295 * for the TIME_EXTEND, then padding has to be used. The events 2296 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2297 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2298 * As length is a multiple of 4, we only need to worry if it 2299 * is 12 (RB_LEN_TIME_EXTEND + 4). 2300 */ 2301 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2302 length += RB_ALIGNMENT; 2303 2304 return length; 2305 } 2306 2307 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2308 static inline bool sched_clock_stable(void) 2309 { 2310 return true; 2311 } 2312 #endif 2313 2314 static inline int 2315 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2316 struct ring_buffer_event *event) 2317 { 2318 unsigned long new_index, old_index; 2319 struct buffer_page *bpage; 2320 unsigned long index; 2321 unsigned long addr; 2322 2323 new_index = rb_event_index(event); 2324 old_index = new_index + rb_event_ts_length(event); 2325 addr = (unsigned long)event; 2326 addr &= PAGE_MASK; 2327 2328 bpage = READ_ONCE(cpu_buffer->tail_page); 2329 2330 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2331 unsigned long write_mask = 2332 local_read(&bpage->write) & ~RB_WRITE_MASK; 2333 unsigned long event_length = rb_event_length(event); 2334 /* 2335 * This is on the tail page. It is possible that 2336 * a write could come in and move the tail page 2337 * and write to the next page. That is fine 2338 * because we just shorten what is on this page. 2339 */ 2340 old_index += write_mask; 2341 new_index += write_mask; 2342 index = local_cmpxchg(&bpage->write, old_index, new_index); 2343 if (index == old_index) { 2344 /* update counters */ 2345 local_sub(event_length, &cpu_buffer->entries_bytes); 2346 return 1; 2347 } 2348 } 2349 2350 /* could not discard */ 2351 return 0; 2352 } 2353 2354 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2355 { 2356 local_inc(&cpu_buffer->committing); 2357 local_inc(&cpu_buffer->commits); 2358 } 2359 2360 static __always_inline void 2361 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2362 { 2363 unsigned long max_count; 2364 2365 /* 2366 * We only race with interrupts and NMIs on this CPU. 2367 * If we own the commit event, then we can commit 2368 * all others that interrupted us, since the interruptions 2369 * are in stack format (they finish before they come 2370 * back to us). This allows us to do a simple loop to 2371 * assign the commit to the tail. 2372 */ 2373 again: 2374 max_count = cpu_buffer->nr_pages * 100; 2375 2376 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 2377 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 2378 return; 2379 if (RB_WARN_ON(cpu_buffer, 2380 rb_is_reader_page(cpu_buffer->tail_page))) 2381 return; 2382 local_set(&cpu_buffer->commit_page->page->commit, 2383 rb_page_write(cpu_buffer->commit_page)); 2384 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 2385 /* Only update the write stamp if the page has an event */ 2386 if (rb_page_write(cpu_buffer->commit_page)) 2387 cpu_buffer->write_stamp = 2388 cpu_buffer->commit_page->page->time_stamp; 2389 /* add barrier to keep gcc from optimizing too much */ 2390 barrier(); 2391 } 2392 while (rb_commit_index(cpu_buffer) != 2393 rb_page_write(cpu_buffer->commit_page)) { 2394 2395 local_set(&cpu_buffer->commit_page->page->commit, 2396 rb_page_write(cpu_buffer->commit_page)); 2397 RB_WARN_ON(cpu_buffer, 2398 local_read(&cpu_buffer->commit_page->page->commit) & 2399 ~RB_WRITE_MASK); 2400 barrier(); 2401 } 2402 2403 /* again, keep gcc from optimizing */ 2404 barrier(); 2405 2406 /* 2407 * If an interrupt came in just after the first while loop 2408 * and pushed the tail page forward, we will be left with 2409 * a dangling commit that will never go forward. 2410 */ 2411 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 2412 goto again; 2413 } 2414 2415 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2416 { 2417 unsigned long commits; 2418 2419 if (RB_WARN_ON(cpu_buffer, 2420 !local_read(&cpu_buffer->committing))) 2421 return; 2422 2423 again: 2424 commits = local_read(&cpu_buffer->commits); 2425 /* synchronize with interrupts */ 2426 barrier(); 2427 if (local_read(&cpu_buffer->committing) == 1) 2428 rb_set_commit_to_write(cpu_buffer); 2429 2430 local_dec(&cpu_buffer->committing); 2431 2432 /* synchronize with interrupts */ 2433 barrier(); 2434 2435 /* 2436 * Need to account for interrupts coming in between the 2437 * updating of the commit page and the clearing of the 2438 * committing counter. 2439 */ 2440 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2441 !local_read(&cpu_buffer->committing)) { 2442 local_inc(&cpu_buffer->committing); 2443 goto again; 2444 } 2445 } 2446 2447 static inline void rb_event_discard(struct ring_buffer_event *event) 2448 { 2449 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) 2450 event = skip_time_extend(event); 2451 2452 /* array[0] holds the actual length for the discarded event */ 2453 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2454 event->type_len = RINGBUF_TYPE_PADDING; 2455 /* time delta must be non zero */ 2456 if (!event->time_delta) 2457 event->time_delta = 1; 2458 } 2459 2460 static __always_inline bool 2461 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 2462 struct ring_buffer_event *event) 2463 { 2464 unsigned long addr = (unsigned long)event; 2465 unsigned long index; 2466 2467 index = rb_event_index(event); 2468 addr &= PAGE_MASK; 2469 2470 return cpu_buffer->commit_page->page == (void *)addr && 2471 rb_commit_index(cpu_buffer) == index; 2472 } 2473 2474 static __always_inline void 2475 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2476 struct ring_buffer_event *event) 2477 { 2478 u64 delta; 2479 2480 /* 2481 * The event first in the commit queue updates the 2482 * time stamp. 2483 */ 2484 if (rb_event_is_commit(cpu_buffer, event)) { 2485 /* 2486 * A commit event that is first on a page 2487 * updates the write timestamp with the page stamp 2488 */ 2489 if (!rb_event_index(event)) 2490 cpu_buffer->write_stamp = 2491 cpu_buffer->commit_page->page->time_stamp; 2492 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { 2493 delta = event->array[0]; 2494 delta <<= TS_SHIFT; 2495 delta += event->time_delta; 2496 cpu_buffer->write_stamp += delta; 2497 } else 2498 cpu_buffer->write_stamp += event->time_delta; 2499 } 2500 } 2501 2502 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2503 struct ring_buffer_event *event) 2504 { 2505 local_inc(&cpu_buffer->entries); 2506 rb_update_write_stamp(cpu_buffer, event); 2507 rb_end_commit(cpu_buffer); 2508 } 2509 2510 static __always_inline void 2511 rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 2512 { 2513 bool pagebusy; 2514 2515 if (buffer->irq_work.waiters_pending) { 2516 buffer->irq_work.waiters_pending = false; 2517 /* irq_work_queue() supplies it's own memory barriers */ 2518 irq_work_queue(&buffer->irq_work.work); 2519 } 2520 2521 if (cpu_buffer->irq_work.waiters_pending) { 2522 cpu_buffer->irq_work.waiters_pending = false; 2523 /* irq_work_queue() supplies it's own memory barriers */ 2524 irq_work_queue(&cpu_buffer->irq_work.work); 2525 } 2526 2527 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 2528 2529 if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) { 2530 cpu_buffer->irq_work.wakeup_full = true; 2531 cpu_buffer->irq_work.full_waiters_pending = false; 2532 /* irq_work_queue() supplies it's own memory barriers */ 2533 irq_work_queue(&cpu_buffer->irq_work.work); 2534 } 2535 } 2536 2537 /* 2538 * The lock and unlock are done within a preempt disable section. 2539 * The current_context per_cpu variable can only be modified 2540 * by the current task between lock and unlock. But it can 2541 * be modified more than once via an interrupt. To pass this 2542 * information from the lock to the unlock without having to 2543 * access the 'in_interrupt()' functions again (which do show 2544 * a bit of overhead in something as critical as function tracing, 2545 * we use a bitmask trick. 2546 * 2547 * bit 0 = NMI context 2548 * bit 1 = IRQ context 2549 * bit 2 = SoftIRQ context 2550 * bit 3 = normal context. 2551 * 2552 * This works because this is the order of contexts that can 2553 * preempt other contexts. A SoftIRQ never preempts an IRQ 2554 * context. 2555 * 2556 * When the context is determined, the corresponding bit is 2557 * checked and set (if it was set, then a recursion of that context 2558 * happened). 2559 * 2560 * On unlock, we need to clear this bit. To do so, just subtract 2561 * 1 from the current_context and AND it to itself. 2562 * 2563 * (binary) 2564 * 101 - 1 = 100 2565 * 101 & 100 = 100 (clearing bit zero) 2566 * 2567 * 1010 - 1 = 1001 2568 * 1010 & 1001 = 1000 (clearing bit 1) 2569 * 2570 * The least significant bit can be cleared this way, and it 2571 * just so happens that it is the same bit corresponding to 2572 * the current context. 2573 */ 2574 2575 static __always_inline int 2576 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 2577 { 2578 unsigned int val = cpu_buffer->current_context; 2579 int bit; 2580 2581 if (in_interrupt()) { 2582 if (in_nmi()) 2583 bit = RB_CTX_NMI; 2584 else if (in_irq()) 2585 bit = RB_CTX_IRQ; 2586 else 2587 bit = RB_CTX_SOFTIRQ; 2588 } else 2589 bit = RB_CTX_NORMAL; 2590 2591 if (unlikely(val & (1 << bit))) 2592 return 1; 2593 2594 val |= (1 << bit); 2595 cpu_buffer->current_context = val; 2596 2597 return 0; 2598 } 2599 2600 static __always_inline void 2601 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 2602 { 2603 cpu_buffer->current_context &= cpu_buffer->current_context - 1; 2604 } 2605 2606 /** 2607 * ring_buffer_unlock_commit - commit a reserved 2608 * @buffer: The buffer to commit to 2609 * @event: The event pointer to commit. 2610 * 2611 * This commits the data to the ring buffer, and releases any locks held. 2612 * 2613 * Must be paired with ring_buffer_lock_reserve. 2614 */ 2615 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2616 struct ring_buffer_event *event) 2617 { 2618 struct ring_buffer_per_cpu *cpu_buffer; 2619 int cpu = raw_smp_processor_id(); 2620 2621 cpu_buffer = buffer->buffers[cpu]; 2622 2623 rb_commit(cpu_buffer, event); 2624 2625 rb_wakeups(buffer, cpu_buffer); 2626 2627 trace_recursive_unlock(cpu_buffer); 2628 2629 preempt_enable_notrace(); 2630 2631 return 0; 2632 } 2633 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2634 2635 static noinline void 2636 rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2637 struct rb_event_info *info) 2638 { 2639 WARN_ONCE(info->delta > (1ULL << 59), 2640 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s", 2641 (unsigned long long)info->delta, 2642 (unsigned long long)info->ts, 2643 (unsigned long long)cpu_buffer->write_stamp, 2644 sched_clock_stable() ? "" : 2645 "If you just came from a suspend/resume,\n" 2646 "please switch to the trace global clock:\n" 2647 " echo global > /sys/kernel/debug/tracing/trace_clock\n"); 2648 info->add_timestamp = 1; 2649 } 2650 2651 static struct ring_buffer_event * 2652 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 2653 struct rb_event_info *info) 2654 { 2655 struct ring_buffer_event *event; 2656 struct buffer_page *tail_page; 2657 unsigned long tail, write; 2658 2659 /* 2660 * If the time delta since the last event is too big to 2661 * hold in the time field of the event, then we append a 2662 * TIME EXTEND event ahead of the data event. 2663 */ 2664 if (unlikely(info->add_timestamp)) 2665 info->length += RB_LEN_TIME_EXTEND; 2666 2667 /* Don't let the compiler play games with cpu_buffer->tail_page */ 2668 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 2669 write = local_add_return(info->length, &tail_page->write); 2670 2671 /* set write to only the index of the write */ 2672 write &= RB_WRITE_MASK; 2673 tail = write - info->length; 2674 2675 /* 2676 * If this is the first commit on the page, then it has the same 2677 * timestamp as the page itself. 2678 */ 2679 if (!tail) 2680 info->delta = 0; 2681 2682 /* See if we shot pass the end of this buffer page */ 2683 if (unlikely(write > BUF_PAGE_SIZE)) 2684 return rb_move_tail(cpu_buffer, tail, info); 2685 2686 /* We reserved something on the buffer */ 2687 2688 event = __rb_page_index(tail_page, tail); 2689 kmemcheck_annotate_bitfield(event, bitfield); 2690 rb_update_event(cpu_buffer, event, info); 2691 2692 local_inc(&tail_page->entries); 2693 2694 /* 2695 * If this is the first commit on the page, then update 2696 * its timestamp. 2697 */ 2698 if (!tail) 2699 tail_page->page->time_stamp = info->ts; 2700 2701 /* account for these added bytes */ 2702 local_add(info->length, &cpu_buffer->entries_bytes); 2703 2704 return event; 2705 } 2706 2707 static __always_inline struct ring_buffer_event * 2708 rb_reserve_next_event(struct ring_buffer *buffer, 2709 struct ring_buffer_per_cpu *cpu_buffer, 2710 unsigned long length) 2711 { 2712 struct ring_buffer_event *event; 2713 struct rb_event_info info; 2714 int nr_loops = 0; 2715 u64 diff; 2716 2717 rb_start_commit(cpu_buffer); 2718 2719 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 2720 /* 2721 * Due to the ability to swap a cpu buffer from a buffer 2722 * it is possible it was swapped before we committed. 2723 * (committing stops a swap). We check for it here and 2724 * if it happened, we have to fail the write. 2725 */ 2726 barrier(); 2727 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) { 2728 local_dec(&cpu_buffer->committing); 2729 local_dec(&cpu_buffer->commits); 2730 return NULL; 2731 } 2732 #endif 2733 2734 info.length = rb_calculate_event_length(length); 2735 again: 2736 info.add_timestamp = 0; 2737 info.delta = 0; 2738 2739 /* 2740 * We allow for interrupts to reenter here and do a trace. 2741 * If one does, it will cause this original code to loop 2742 * back here. Even with heavy interrupts happening, this 2743 * should only happen a few times in a row. If this happens 2744 * 1000 times in a row, there must be either an interrupt 2745 * storm or we have something buggy. 2746 * Bail! 2747 */ 2748 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2749 goto out_fail; 2750 2751 info.ts = rb_time_stamp(cpu_buffer->buffer); 2752 diff = info.ts - cpu_buffer->write_stamp; 2753 2754 /* make sure this diff is calculated here */ 2755 barrier(); 2756 2757 /* Did the write stamp get updated already? */ 2758 if (likely(info.ts >= cpu_buffer->write_stamp)) { 2759 info.delta = diff; 2760 if (unlikely(test_time_stamp(info.delta))) 2761 rb_handle_timestamp(cpu_buffer, &info); 2762 } 2763 2764 event = __rb_reserve_next(cpu_buffer, &info); 2765 2766 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 2767 if (info.add_timestamp) 2768 info.length -= RB_LEN_TIME_EXTEND; 2769 goto again; 2770 } 2771 2772 if (!event) 2773 goto out_fail; 2774 2775 return event; 2776 2777 out_fail: 2778 rb_end_commit(cpu_buffer); 2779 return NULL; 2780 } 2781 2782 /** 2783 * ring_buffer_lock_reserve - reserve a part of the buffer 2784 * @buffer: the ring buffer to reserve from 2785 * @length: the length of the data to reserve (excluding event header) 2786 * 2787 * Returns a reseverd event on the ring buffer to copy directly to. 2788 * The user of this interface will need to get the body to write into 2789 * and can use the ring_buffer_event_data() interface. 2790 * 2791 * The length is the length of the data needed, not the event length 2792 * which also includes the event header. 2793 * 2794 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2795 * If NULL is returned, then nothing has been allocated or locked. 2796 */ 2797 struct ring_buffer_event * 2798 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2799 { 2800 struct ring_buffer_per_cpu *cpu_buffer; 2801 struct ring_buffer_event *event; 2802 int cpu; 2803 2804 /* If we are tracing schedule, we don't want to recurse */ 2805 preempt_disable_notrace(); 2806 2807 if (unlikely(atomic_read(&buffer->record_disabled))) 2808 goto out; 2809 2810 cpu = raw_smp_processor_id(); 2811 2812 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 2813 goto out; 2814 2815 cpu_buffer = buffer->buffers[cpu]; 2816 2817 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 2818 goto out; 2819 2820 if (unlikely(length > BUF_MAX_DATA_SIZE)) 2821 goto out; 2822 2823 if (unlikely(trace_recursive_lock(cpu_buffer))) 2824 goto out; 2825 2826 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2827 if (!event) 2828 goto out_unlock; 2829 2830 return event; 2831 2832 out_unlock: 2833 trace_recursive_unlock(cpu_buffer); 2834 out: 2835 preempt_enable_notrace(); 2836 return NULL; 2837 } 2838 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 2839 2840 /* 2841 * Decrement the entries to the page that an event is on. 2842 * The event does not even need to exist, only the pointer 2843 * to the page it is on. This may only be called before the commit 2844 * takes place. 2845 */ 2846 static inline void 2847 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 2848 struct ring_buffer_event *event) 2849 { 2850 unsigned long addr = (unsigned long)event; 2851 struct buffer_page *bpage = cpu_buffer->commit_page; 2852 struct buffer_page *start; 2853 2854 addr &= PAGE_MASK; 2855 2856 /* Do the likely case first */ 2857 if (likely(bpage->page == (void *)addr)) { 2858 local_dec(&bpage->entries); 2859 return; 2860 } 2861 2862 /* 2863 * Because the commit page may be on the reader page we 2864 * start with the next page and check the end loop there. 2865 */ 2866 rb_inc_page(cpu_buffer, &bpage); 2867 start = bpage; 2868 do { 2869 if (bpage->page == (void *)addr) { 2870 local_dec(&bpage->entries); 2871 return; 2872 } 2873 rb_inc_page(cpu_buffer, &bpage); 2874 } while (bpage != start); 2875 2876 /* commit not part of this buffer?? */ 2877 RB_WARN_ON(cpu_buffer, 1); 2878 } 2879 2880 /** 2881 * ring_buffer_commit_discard - discard an event that has not been committed 2882 * @buffer: the ring buffer 2883 * @event: non committed event to discard 2884 * 2885 * Sometimes an event that is in the ring buffer needs to be ignored. 2886 * This function lets the user discard an event in the ring buffer 2887 * and then that event will not be read later. 2888 * 2889 * This function only works if it is called before the the item has been 2890 * committed. It will try to free the event from the ring buffer 2891 * if another event has not been added behind it. 2892 * 2893 * If another event has been added behind it, it will set the event 2894 * up as discarded, and perform the commit. 2895 * 2896 * If this function is called, do not call ring_buffer_unlock_commit on 2897 * the event. 2898 */ 2899 void ring_buffer_discard_commit(struct ring_buffer *buffer, 2900 struct ring_buffer_event *event) 2901 { 2902 struct ring_buffer_per_cpu *cpu_buffer; 2903 int cpu; 2904 2905 /* The event is discarded regardless */ 2906 rb_event_discard(event); 2907 2908 cpu = smp_processor_id(); 2909 cpu_buffer = buffer->buffers[cpu]; 2910 2911 /* 2912 * This must only be called if the event has not been 2913 * committed yet. Thus we can assume that preemption 2914 * is still disabled. 2915 */ 2916 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 2917 2918 rb_decrement_entry(cpu_buffer, event); 2919 if (rb_try_to_discard(cpu_buffer, event)) 2920 goto out; 2921 2922 /* 2923 * The commit is still visible by the reader, so we 2924 * must still update the timestamp. 2925 */ 2926 rb_update_write_stamp(cpu_buffer, event); 2927 out: 2928 rb_end_commit(cpu_buffer); 2929 2930 trace_recursive_unlock(cpu_buffer); 2931 2932 preempt_enable_notrace(); 2933 2934 } 2935 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 2936 2937 /** 2938 * ring_buffer_write - write data to the buffer without reserving 2939 * @buffer: The ring buffer to write to. 2940 * @length: The length of the data being written (excluding the event header) 2941 * @data: The data to write to the buffer. 2942 * 2943 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 2944 * one function. If you already have the data to write to the buffer, it 2945 * may be easier to simply call this function. 2946 * 2947 * Note, like ring_buffer_lock_reserve, the length is the length of the data 2948 * and not the length of the event which would hold the header. 2949 */ 2950 int ring_buffer_write(struct ring_buffer *buffer, 2951 unsigned long length, 2952 void *data) 2953 { 2954 struct ring_buffer_per_cpu *cpu_buffer; 2955 struct ring_buffer_event *event; 2956 void *body; 2957 int ret = -EBUSY; 2958 int cpu; 2959 2960 preempt_disable_notrace(); 2961 2962 if (atomic_read(&buffer->record_disabled)) 2963 goto out; 2964 2965 cpu = raw_smp_processor_id(); 2966 2967 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 2968 goto out; 2969 2970 cpu_buffer = buffer->buffers[cpu]; 2971 2972 if (atomic_read(&cpu_buffer->record_disabled)) 2973 goto out; 2974 2975 if (length > BUF_MAX_DATA_SIZE) 2976 goto out; 2977 2978 if (unlikely(trace_recursive_lock(cpu_buffer))) 2979 goto out; 2980 2981 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2982 if (!event) 2983 goto out_unlock; 2984 2985 body = rb_event_data(event); 2986 2987 memcpy(body, data, length); 2988 2989 rb_commit(cpu_buffer, event); 2990 2991 rb_wakeups(buffer, cpu_buffer); 2992 2993 ret = 0; 2994 2995 out_unlock: 2996 trace_recursive_unlock(cpu_buffer); 2997 2998 out: 2999 preempt_enable_notrace(); 3000 3001 return ret; 3002 } 3003 EXPORT_SYMBOL_GPL(ring_buffer_write); 3004 3005 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3006 { 3007 struct buffer_page *reader = cpu_buffer->reader_page; 3008 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3009 struct buffer_page *commit = cpu_buffer->commit_page; 3010 3011 /* In case of error, head will be NULL */ 3012 if (unlikely(!head)) 3013 return true; 3014 3015 return reader->read == rb_page_commit(reader) && 3016 (commit == reader || 3017 (commit == head && 3018 head->read == rb_page_commit(commit))); 3019 } 3020 3021 /** 3022 * ring_buffer_record_disable - stop all writes into the buffer 3023 * @buffer: The ring buffer to stop writes to. 3024 * 3025 * This prevents all writes to the buffer. Any attempt to write 3026 * to the buffer after this will fail and return NULL. 3027 * 3028 * The caller should call synchronize_sched() after this. 3029 */ 3030 void ring_buffer_record_disable(struct ring_buffer *buffer) 3031 { 3032 atomic_inc(&buffer->record_disabled); 3033 } 3034 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3035 3036 /** 3037 * ring_buffer_record_enable - enable writes to the buffer 3038 * @buffer: The ring buffer to enable writes 3039 * 3040 * Note, multiple disables will need the same number of enables 3041 * to truly enable the writing (much like preempt_disable). 3042 */ 3043 void ring_buffer_record_enable(struct ring_buffer *buffer) 3044 { 3045 atomic_dec(&buffer->record_disabled); 3046 } 3047 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 3048 3049 /** 3050 * ring_buffer_record_off - stop all writes into the buffer 3051 * @buffer: The ring buffer to stop writes to. 3052 * 3053 * This prevents all writes to the buffer. Any attempt to write 3054 * to the buffer after this will fail and return NULL. 3055 * 3056 * This is different than ring_buffer_record_disable() as 3057 * it works like an on/off switch, where as the disable() version 3058 * must be paired with a enable(). 3059 */ 3060 void ring_buffer_record_off(struct ring_buffer *buffer) 3061 { 3062 unsigned int rd; 3063 unsigned int new_rd; 3064 3065 do { 3066 rd = atomic_read(&buffer->record_disabled); 3067 new_rd = rd | RB_BUFFER_OFF; 3068 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3069 } 3070 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 3071 3072 /** 3073 * ring_buffer_record_on - restart writes into the buffer 3074 * @buffer: The ring buffer to start writes to. 3075 * 3076 * This enables all writes to the buffer that was disabled by 3077 * ring_buffer_record_off(). 3078 * 3079 * This is different than ring_buffer_record_enable() as 3080 * it works like an on/off switch, where as the enable() version 3081 * must be paired with a disable(). 3082 */ 3083 void ring_buffer_record_on(struct ring_buffer *buffer) 3084 { 3085 unsigned int rd; 3086 unsigned int new_rd; 3087 3088 do { 3089 rd = atomic_read(&buffer->record_disabled); 3090 new_rd = rd & ~RB_BUFFER_OFF; 3091 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3092 } 3093 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 3094 3095 /** 3096 * ring_buffer_record_is_on - return true if the ring buffer can write 3097 * @buffer: The ring buffer to see if write is enabled 3098 * 3099 * Returns true if the ring buffer is in a state that it accepts writes. 3100 */ 3101 int ring_buffer_record_is_on(struct ring_buffer *buffer) 3102 { 3103 return !atomic_read(&buffer->record_disabled); 3104 } 3105 3106 /** 3107 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 3108 * @buffer: The ring buffer to stop writes to. 3109 * @cpu: The CPU buffer to stop 3110 * 3111 * This prevents all writes to the buffer. Any attempt to write 3112 * to the buffer after this will fail and return NULL. 3113 * 3114 * The caller should call synchronize_sched() after this. 3115 */ 3116 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 3117 { 3118 struct ring_buffer_per_cpu *cpu_buffer; 3119 3120 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3121 return; 3122 3123 cpu_buffer = buffer->buffers[cpu]; 3124 atomic_inc(&cpu_buffer->record_disabled); 3125 } 3126 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 3127 3128 /** 3129 * ring_buffer_record_enable_cpu - enable writes to the buffer 3130 * @buffer: The ring buffer to enable writes 3131 * @cpu: The CPU to enable. 3132 * 3133 * Note, multiple disables will need the same number of enables 3134 * to truly enable the writing (much like preempt_disable). 3135 */ 3136 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 3137 { 3138 struct ring_buffer_per_cpu *cpu_buffer; 3139 3140 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3141 return; 3142 3143 cpu_buffer = buffer->buffers[cpu]; 3144 atomic_dec(&cpu_buffer->record_disabled); 3145 } 3146 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 3147 3148 /* 3149 * The total entries in the ring buffer is the running counter 3150 * of entries entered into the ring buffer, minus the sum of 3151 * the entries read from the ring buffer and the number of 3152 * entries that were overwritten. 3153 */ 3154 static inline unsigned long 3155 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 3156 { 3157 return local_read(&cpu_buffer->entries) - 3158 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 3159 } 3160 3161 /** 3162 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 3163 * @buffer: The ring buffer 3164 * @cpu: The per CPU buffer to read from. 3165 */ 3166 u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu) 3167 { 3168 unsigned long flags; 3169 struct ring_buffer_per_cpu *cpu_buffer; 3170 struct buffer_page *bpage; 3171 u64 ret = 0; 3172 3173 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3174 return 0; 3175 3176 cpu_buffer = buffer->buffers[cpu]; 3177 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3178 /* 3179 * if the tail is on reader_page, oldest time stamp is on the reader 3180 * page 3181 */ 3182 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 3183 bpage = cpu_buffer->reader_page; 3184 else 3185 bpage = rb_set_head_page(cpu_buffer); 3186 if (bpage) 3187 ret = bpage->page->time_stamp; 3188 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3189 3190 return ret; 3191 } 3192 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 3193 3194 /** 3195 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 3196 * @buffer: The ring buffer 3197 * @cpu: The per CPU buffer to read from. 3198 */ 3199 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu) 3200 { 3201 struct ring_buffer_per_cpu *cpu_buffer; 3202 unsigned long ret; 3203 3204 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3205 return 0; 3206 3207 cpu_buffer = buffer->buffers[cpu]; 3208 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 3209 3210 return ret; 3211 } 3212 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 3213 3214 /** 3215 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 3216 * @buffer: The ring buffer 3217 * @cpu: The per CPU buffer to get the entries from. 3218 */ 3219 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 3220 { 3221 struct ring_buffer_per_cpu *cpu_buffer; 3222 3223 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3224 return 0; 3225 3226 cpu_buffer = buffer->buffers[cpu]; 3227 3228 return rb_num_of_entries(cpu_buffer); 3229 } 3230 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 3231 3232 /** 3233 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 3234 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 3235 * @buffer: The ring buffer 3236 * @cpu: The per CPU buffer to get the number of overruns from 3237 */ 3238 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 3239 { 3240 struct ring_buffer_per_cpu *cpu_buffer; 3241 unsigned long ret; 3242 3243 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3244 return 0; 3245 3246 cpu_buffer = buffer->buffers[cpu]; 3247 ret = local_read(&cpu_buffer->overrun); 3248 3249 return ret; 3250 } 3251 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 3252 3253 /** 3254 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 3255 * commits failing due to the buffer wrapping around while there are uncommitted 3256 * events, such as during an interrupt storm. 3257 * @buffer: The ring buffer 3258 * @cpu: The per CPU buffer to get the number of overruns from 3259 */ 3260 unsigned long 3261 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 3262 { 3263 struct ring_buffer_per_cpu *cpu_buffer; 3264 unsigned long ret; 3265 3266 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3267 return 0; 3268 3269 cpu_buffer = buffer->buffers[cpu]; 3270 ret = local_read(&cpu_buffer->commit_overrun); 3271 3272 return ret; 3273 } 3274 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 3275 3276 /** 3277 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 3278 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 3279 * @buffer: The ring buffer 3280 * @cpu: The per CPU buffer to get the number of overruns from 3281 */ 3282 unsigned long 3283 ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu) 3284 { 3285 struct ring_buffer_per_cpu *cpu_buffer; 3286 unsigned long ret; 3287 3288 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3289 return 0; 3290 3291 cpu_buffer = buffer->buffers[cpu]; 3292 ret = local_read(&cpu_buffer->dropped_events); 3293 3294 return ret; 3295 } 3296 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 3297 3298 /** 3299 * ring_buffer_read_events_cpu - get the number of events successfully read 3300 * @buffer: The ring buffer 3301 * @cpu: The per CPU buffer to get the number of events read 3302 */ 3303 unsigned long 3304 ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu) 3305 { 3306 struct ring_buffer_per_cpu *cpu_buffer; 3307 3308 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3309 return 0; 3310 3311 cpu_buffer = buffer->buffers[cpu]; 3312 return cpu_buffer->read; 3313 } 3314 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 3315 3316 /** 3317 * ring_buffer_entries - get the number of entries in a buffer 3318 * @buffer: The ring buffer 3319 * 3320 * Returns the total number of entries in the ring buffer 3321 * (all CPU entries) 3322 */ 3323 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 3324 { 3325 struct ring_buffer_per_cpu *cpu_buffer; 3326 unsigned long entries = 0; 3327 int cpu; 3328 3329 /* if you care about this being correct, lock the buffer */ 3330 for_each_buffer_cpu(buffer, cpu) { 3331 cpu_buffer = buffer->buffers[cpu]; 3332 entries += rb_num_of_entries(cpu_buffer); 3333 } 3334 3335 return entries; 3336 } 3337 EXPORT_SYMBOL_GPL(ring_buffer_entries); 3338 3339 /** 3340 * ring_buffer_overruns - get the number of overruns in buffer 3341 * @buffer: The ring buffer 3342 * 3343 * Returns the total number of overruns in the ring buffer 3344 * (all CPU entries) 3345 */ 3346 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 3347 { 3348 struct ring_buffer_per_cpu *cpu_buffer; 3349 unsigned long overruns = 0; 3350 int cpu; 3351 3352 /* if you care about this being correct, lock the buffer */ 3353 for_each_buffer_cpu(buffer, cpu) { 3354 cpu_buffer = buffer->buffers[cpu]; 3355 overruns += local_read(&cpu_buffer->overrun); 3356 } 3357 3358 return overruns; 3359 } 3360 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 3361 3362 static void rb_iter_reset(struct ring_buffer_iter *iter) 3363 { 3364 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3365 3366 /* Iterator usage is expected to have record disabled */ 3367 iter->head_page = cpu_buffer->reader_page; 3368 iter->head = cpu_buffer->reader_page->read; 3369 3370 iter->cache_reader_page = iter->head_page; 3371 iter->cache_read = cpu_buffer->read; 3372 3373 if (iter->head) 3374 iter->read_stamp = cpu_buffer->read_stamp; 3375 else 3376 iter->read_stamp = iter->head_page->page->time_stamp; 3377 } 3378 3379 /** 3380 * ring_buffer_iter_reset - reset an iterator 3381 * @iter: The iterator to reset 3382 * 3383 * Resets the iterator, so that it will start from the beginning 3384 * again. 3385 */ 3386 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 3387 { 3388 struct ring_buffer_per_cpu *cpu_buffer; 3389 unsigned long flags; 3390 3391 if (!iter) 3392 return; 3393 3394 cpu_buffer = iter->cpu_buffer; 3395 3396 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3397 rb_iter_reset(iter); 3398 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3399 } 3400 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 3401 3402 /** 3403 * ring_buffer_iter_empty - check if an iterator has no more to read 3404 * @iter: The iterator to check 3405 */ 3406 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 3407 { 3408 struct ring_buffer_per_cpu *cpu_buffer; 3409 struct buffer_page *reader; 3410 struct buffer_page *head_page; 3411 struct buffer_page *commit_page; 3412 unsigned commit; 3413 3414 cpu_buffer = iter->cpu_buffer; 3415 3416 /* Remember, trace recording is off when iterator is in use */ 3417 reader = cpu_buffer->reader_page; 3418 head_page = cpu_buffer->head_page; 3419 commit_page = cpu_buffer->commit_page; 3420 commit = rb_page_commit(commit_page); 3421 3422 return ((iter->head_page == commit_page && iter->head == commit) || 3423 (iter->head_page == reader && commit_page == head_page && 3424 head_page->read == commit && 3425 iter->head == rb_page_commit(cpu_buffer->reader_page))); 3426 } 3427 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 3428 3429 static void 3430 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3431 struct ring_buffer_event *event) 3432 { 3433 u64 delta; 3434 3435 switch (event->type_len) { 3436 case RINGBUF_TYPE_PADDING: 3437 return; 3438 3439 case RINGBUF_TYPE_TIME_EXTEND: 3440 delta = event->array[0]; 3441 delta <<= TS_SHIFT; 3442 delta += event->time_delta; 3443 cpu_buffer->read_stamp += delta; 3444 return; 3445 3446 case RINGBUF_TYPE_TIME_STAMP: 3447 /* FIXME: not implemented */ 3448 return; 3449 3450 case RINGBUF_TYPE_DATA: 3451 cpu_buffer->read_stamp += event->time_delta; 3452 return; 3453 3454 default: 3455 BUG(); 3456 } 3457 return; 3458 } 3459 3460 static void 3461 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 3462 struct ring_buffer_event *event) 3463 { 3464 u64 delta; 3465 3466 switch (event->type_len) { 3467 case RINGBUF_TYPE_PADDING: 3468 return; 3469 3470 case RINGBUF_TYPE_TIME_EXTEND: 3471 delta = event->array[0]; 3472 delta <<= TS_SHIFT; 3473 delta += event->time_delta; 3474 iter->read_stamp += delta; 3475 return; 3476 3477 case RINGBUF_TYPE_TIME_STAMP: 3478 /* FIXME: not implemented */ 3479 return; 3480 3481 case RINGBUF_TYPE_DATA: 3482 iter->read_stamp += event->time_delta; 3483 return; 3484 3485 default: 3486 BUG(); 3487 } 3488 return; 3489 } 3490 3491 static struct buffer_page * 3492 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 3493 { 3494 struct buffer_page *reader = NULL; 3495 unsigned long overwrite; 3496 unsigned long flags; 3497 int nr_loops = 0; 3498 int ret; 3499 3500 local_irq_save(flags); 3501 arch_spin_lock(&cpu_buffer->lock); 3502 3503 again: 3504 /* 3505 * This should normally only loop twice. But because the 3506 * start of the reader inserts an empty page, it causes 3507 * a case where we will loop three times. There should be no 3508 * reason to loop four times (that I know of). 3509 */ 3510 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 3511 reader = NULL; 3512 goto out; 3513 } 3514 3515 reader = cpu_buffer->reader_page; 3516 3517 /* If there's more to read, return this page */ 3518 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 3519 goto out; 3520 3521 /* Never should we have an index greater than the size */ 3522 if (RB_WARN_ON(cpu_buffer, 3523 cpu_buffer->reader_page->read > rb_page_size(reader))) 3524 goto out; 3525 3526 /* check if we caught up to the tail */ 3527 reader = NULL; 3528 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 3529 goto out; 3530 3531 /* Don't bother swapping if the ring buffer is empty */ 3532 if (rb_num_of_entries(cpu_buffer) == 0) 3533 goto out; 3534 3535 /* 3536 * Reset the reader page to size zero. 3537 */ 3538 local_set(&cpu_buffer->reader_page->write, 0); 3539 local_set(&cpu_buffer->reader_page->entries, 0); 3540 local_set(&cpu_buffer->reader_page->page->commit, 0); 3541 cpu_buffer->reader_page->real_end = 0; 3542 3543 spin: 3544 /* 3545 * Splice the empty reader page into the list around the head. 3546 */ 3547 reader = rb_set_head_page(cpu_buffer); 3548 if (!reader) 3549 goto out; 3550 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 3551 cpu_buffer->reader_page->list.prev = reader->list.prev; 3552 3553 /* 3554 * cpu_buffer->pages just needs to point to the buffer, it 3555 * has no specific buffer page to point to. Lets move it out 3556 * of our way so we don't accidentally swap it. 3557 */ 3558 cpu_buffer->pages = reader->list.prev; 3559 3560 /* The reader page will be pointing to the new head */ 3561 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 3562 3563 /* 3564 * We want to make sure we read the overruns after we set up our 3565 * pointers to the next object. The writer side does a 3566 * cmpxchg to cross pages which acts as the mb on the writer 3567 * side. Note, the reader will constantly fail the swap 3568 * while the writer is updating the pointers, so this 3569 * guarantees that the overwrite recorded here is the one we 3570 * want to compare with the last_overrun. 3571 */ 3572 smp_mb(); 3573 overwrite = local_read(&(cpu_buffer->overrun)); 3574 3575 /* 3576 * Here's the tricky part. 3577 * 3578 * We need to move the pointer past the header page. 3579 * But we can only do that if a writer is not currently 3580 * moving it. The page before the header page has the 3581 * flag bit '1' set if it is pointing to the page we want. 3582 * but if the writer is in the process of moving it 3583 * than it will be '2' or already moved '0'. 3584 */ 3585 3586 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 3587 3588 /* 3589 * If we did not convert it, then we must try again. 3590 */ 3591 if (!ret) 3592 goto spin; 3593 3594 /* 3595 * Yeah! We succeeded in replacing the page. 3596 * 3597 * Now make the new head point back to the reader page. 3598 */ 3599 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 3600 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 3601 3602 /* Finally update the reader page to the new head */ 3603 cpu_buffer->reader_page = reader; 3604 cpu_buffer->reader_page->read = 0; 3605 3606 if (overwrite != cpu_buffer->last_overrun) { 3607 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 3608 cpu_buffer->last_overrun = overwrite; 3609 } 3610 3611 goto again; 3612 3613 out: 3614 /* Update the read_stamp on the first event */ 3615 if (reader && reader->read == 0) 3616 cpu_buffer->read_stamp = reader->page->time_stamp; 3617 3618 arch_spin_unlock(&cpu_buffer->lock); 3619 local_irq_restore(flags); 3620 3621 return reader; 3622 } 3623 3624 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 3625 { 3626 struct ring_buffer_event *event; 3627 struct buffer_page *reader; 3628 unsigned length; 3629 3630 reader = rb_get_reader_page(cpu_buffer); 3631 3632 /* This function should not be called when buffer is empty */ 3633 if (RB_WARN_ON(cpu_buffer, !reader)) 3634 return; 3635 3636 event = rb_reader_event(cpu_buffer); 3637 3638 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 3639 cpu_buffer->read++; 3640 3641 rb_update_read_stamp(cpu_buffer, event); 3642 3643 length = rb_event_length(event); 3644 cpu_buffer->reader_page->read += length; 3645 } 3646 3647 static void rb_advance_iter(struct ring_buffer_iter *iter) 3648 { 3649 struct ring_buffer_per_cpu *cpu_buffer; 3650 struct ring_buffer_event *event; 3651 unsigned length; 3652 3653 cpu_buffer = iter->cpu_buffer; 3654 3655 /* 3656 * Check if we are at the end of the buffer. 3657 */ 3658 if (iter->head >= rb_page_size(iter->head_page)) { 3659 /* discarded commits can make the page empty */ 3660 if (iter->head_page == cpu_buffer->commit_page) 3661 return; 3662 rb_inc_iter(iter); 3663 return; 3664 } 3665 3666 event = rb_iter_head_event(iter); 3667 3668 length = rb_event_length(event); 3669 3670 /* 3671 * This should not be called to advance the header if we are 3672 * at the tail of the buffer. 3673 */ 3674 if (RB_WARN_ON(cpu_buffer, 3675 (iter->head_page == cpu_buffer->commit_page) && 3676 (iter->head + length > rb_commit_index(cpu_buffer)))) 3677 return; 3678 3679 rb_update_iter_read_stamp(iter, event); 3680 3681 iter->head += length; 3682 3683 /* check for end of page padding */ 3684 if ((iter->head >= rb_page_size(iter->head_page)) && 3685 (iter->head_page != cpu_buffer->commit_page)) 3686 rb_inc_iter(iter); 3687 } 3688 3689 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 3690 { 3691 return cpu_buffer->lost_events; 3692 } 3693 3694 static struct ring_buffer_event * 3695 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 3696 unsigned long *lost_events) 3697 { 3698 struct ring_buffer_event *event; 3699 struct buffer_page *reader; 3700 int nr_loops = 0; 3701 3702 again: 3703 /* 3704 * We repeat when a time extend is encountered. 3705 * Since the time extend is always attached to a data event, 3706 * we should never loop more than once. 3707 * (We never hit the following condition more than twice). 3708 */ 3709 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3710 return NULL; 3711 3712 reader = rb_get_reader_page(cpu_buffer); 3713 if (!reader) 3714 return NULL; 3715 3716 event = rb_reader_event(cpu_buffer); 3717 3718 switch (event->type_len) { 3719 case RINGBUF_TYPE_PADDING: 3720 if (rb_null_event(event)) 3721 RB_WARN_ON(cpu_buffer, 1); 3722 /* 3723 * Because the writer could be discarding every 3724 * event it creates (which would probably be bad) 3725 * if we were to go back to "again" then we may never 3726 * catch up, and will trigger the warn on, or lock 3727 * the box. Return the padding, and we will release 3728 * the current locks, and try again. 3729 */ 3730 return event; 3731 3732 case RINGBUF_TYPE_TIME_EXTEND: 3733 /* Internal data, OK to advance */ 3734 rb_advance_reader(cpu_buffer); 3735 goto again; 3736 3737 case RINGBUF_TYPE_TIME_STAMP: 3738 /* FIXME: not implemented */ 3739 rb_advance_reader(cpu_buffer); 3740 goto again; 3741 3742 case RINGBUF_TYPE_DATA: 3743 if (ts) { 3744 *ts = cpu_buffer->read_stamp + event->time_delta; 3745 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3746 cpu_buffer->cpu, ts); 3747 } 3748 if (lost_events) 3749 *lost_events = rb_lost_events(cpu_buffer); 3750 return event; 3751 3752 default: 3753 BUG(); 3754 } 3755 3756 return NULL; 3757 } 3758 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3759 3760 static struct ring_buffer_event * 3761 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3762 { 3763 struct ring_buffer *buffer; 3764 struct ring_buffer_per_cpu *cpu_buffer; 3765 struct ring_buffer_event *event; 3766 int nr_loops = 0; 3767 3768 cpu_buffer = iter->cpu_buffer; 3769 buffer = cpu_buffer->buffer; 3770 3771 /* 3772 * Check if someone performed a consuming read to 3773 * the buffer. A consuming read invalidates the iterator 3774 * and we need to reset the iterator in this case. 3775 */ 3776 if (unlikely(iter->cache_read != cpu_buffer->read || 3777 iter->cache_reader_page != cpu_buffer->reader_page)) 3778 rb_iter_reset(iter); 3779 3780 again: 3781 if (ring_buffer_iter_empty(iter)) 3782 return NULL; 3783 3784 /* 3785 * We repeat when a time extend is encountered or we hit 3786 * the end of the page. Since the time extend is always attached 3787 * to a data event, we should never loop more than three times. 3788 * Once for going to next page, once on time extend, and 3789 * finally once to get the event. 3790 * (We never hit the following condition more than thrice). 3791 */ 3792 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) 3793 return NULL; 3794 3795 if (rb_per_cpu_empty(cpu_buffer)) 3796 return NULL; 3797 3798 if (iter->head >= rb_page_size(iter->head_page)) { 3799 rb_inc_iter(iter); 3800 goto again; 3801 } 3802 3803 event = rb_iter_head_event(iter); 3804 3805 switch (event->type_len) { 3806 case RINGBUF_TYPE_PADDING: 3807 if (rb_null_event(event)) { 3808 rb_inc_iter(iter); 3809 goto again; 3810 } 3811 rb_advance_iter(iter); 3812 return event; 3813 3814 case RINGBUF_TYPE_TIME_EXTEND: 3815 /* Internal data, OK to advance */ 3816 rb_advance_iter(iter); 3817 goto again; 3818 3819 case RINGBUF_TYPE_TIME_STAMP: 3820 /* FIXME: not implemented */ 3821 rb_advance_iter(iter); 3822 goto again; 3823 3824 case RINGBUF_TYPE_DATA: 3825 if (ts) { 3826 *ts = iter->read_stamp + event->time_delta; 3827 ring_buffer_normalize_time_stamp(buffer, 3828 cpu_buffer->cpu, ts); 3829 } 3830 return event; 3831 3832 default: 3833 BUG(); 3834 } 3835 3836 return NULL; 3837 } 3838 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 3839 3840 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 3841 { 3842 if (likely(!in_nmi())) { 3843 raw_spin_lock(&cpu_buffer->reader_lock); 3844 return true; 3845 } 3846 3847 /* 3848 * If an NMI die dumps out the content of the ring buffer 3849 * trylock must be used to prevent a deadlock if the NMI 3850 * preempted a task that holds the ring buffer locks. If 3851 * we get the lock then all is fine, if not, then continue 3852 * to do the read, but this can corrupt the ring buffer, 3853 * so it must be permanently disabled from future writes. 3854 * Reading from NMI is a oneshot deal. 3855 */ 3856 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 3857 return true; 3858 3859 /* Continue without locking, but disable the ring buffer */ 3860 atomic_inc(&cpu_buffer->record_disabled); 3861 return false; 3862 } 3863 3864 static inline void 3865 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 3866 { 3867 if (likely(locked)) 3868 raw_spin_unlock(&cpu_buffer->reader_lock); 3869 return; 3870 } 3871 3872 /** 3873 * ring_buffer_peek - peek at the next event to be read 3874 * @buffer: The ring buffer to read 3875 * @cpu: The cpu to peak at 3876 * @ts: The timestamp counter of this event. 3877 * @lost_events: a variable to store if events were lost (may be NULL) 3878 * 3879 * This will return the event that will be read next, but does 3880 * not consume the data. 3881 */ 3882 struct ring_buffer_event * 3883 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts, 3884 unsigned long *lost_events) 3885 { 3886 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 3887 struct ring_buffer_event *event; 3888 unsigned long flags; 3889 bool dolock; 3890 3891 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3892 return NULL; 3893 3894 again: 3895 local_irq_save(flags); 3896 dolock = rb_reader_lock(cpu_buffer); 3897 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3898 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3899 rb_advance_reader(cpu_buffer); 3900 rb_reader_unlock(cpu_buffer, dolock); 3901 local_irq_restore(flags); 3902 3903 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3904 goto again; 3905 3906 return event; 3907 } 3908 3909 /** 3910 * ring_buffer_iter_peek - peek at the next event to be read 3911 * @iter: The ring buffer iterator 3912 * @ts: The timestamp counter of this event. 3913 * 3914 * This will return the event that will be read next, but does 3915 * not increment the iterator. 3916 */ 3917 struct ring_buffer_event * 3918 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3919 { 3920 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3921 struct ring_buffer_event *event; 3922 unsigned long flags; 3923 3924 again: 3925 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3926 event = rb_iter_peek(iter, ts); 3927 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3928 3929 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3930 goto again; 3931 3932 return event; 3933 } 3934 3935 /** 3936 * ring_buffer_consume - return an event and consume it 3937 * @buffer: The ring buffer to get the next event from 3938 * @cpu: the cpu to read the buffer from 3939 * @ts: a variable to store the timestamp (may be NULL) 3940 * @lost_events: a variable to store if events were lost (may be NULL) 3941 * 3942 * Returns the next event in the ring buffer, and that event is consumed. 3943 * Meaning, that sequential reads will keep returning a different event, 3944 * and eventually empty the ring buffer if the producer is slower. 3945 */ 3946 struct ring_buffer_event * 3947 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts, 3948 unsigned long *lost_events) 3949 { 3950 struct ring_buffer_per_cpu *cpu_buffer; 3951 struct ring_buffer_event *event = NULL; 3952 unsigned long flags; 3953 bool dolock; 3954 3955 again: 3956 /* might be called in atomic */ 3957 preempt_disable(); 3958 3959 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3960 goto out; 3961 3962 cpu_buffer = buffer->buffers[cpu]; 3963 local_irq_save(flags); 3964 dolock = rb_reader_lock(cpu_buffer); 3965 3966 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 3967 if (event) { 3968 cpu_buffer->lost_events = 0; 3969 rb_advance_reader(cpu_buffer); 3970 } 3971 3972 rb_reader_unlock(cpu_buffer, dolock); 3973 local_irq_restore(flags); 3974 3975 out: 3976 preempt_enable(); 3977 3978 if (event && event->type_len == RINGBUF_TYPE_PADDING) 3979 goto again; 3980 3981 return event; 3982 } 3983 EXPORT_SYMBOL_GPL(ring_buffer_consume); 3984 3985 /** 3986 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 3987 * @buffer: The ring buffer to read from 3988 * @cpu: The cpu buffer to iterate over 3989 * 3990 * This performs the initial preparations necessary to iterate 3991 * through the buffer. Memory is allocated, buffer recording 3992 * is disabled, and the iterator pointer is returned to the caller. 3993 * 3994 * Disabling buffer recordng prevents the reading from being 3995 * corrupted. This is not a consuming read, so a producer is not 3996 * expected. 3997 * 3998 * After a sequence of ring_buffer_read_prepare calls, the user is 3999 * expected to make at least one call to ring_buffer_read_prepare_sync. 4000 * Afterwards, ring_buffer_read_start is invoked to get things going 4001 * for real. 4002 * 4003 * This overall must be paired with ring_buffer_read_finish. 4004 */ 4005 struct ring_buffer_iter * 4006 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu) 4007 { 4008 struct ring_buffer_per_cpu *cpu_buffer; 4009 struct ring_buffer_iter *iter; 4010 4011 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4012 return NULL; 4013 4014 iter = kmalloc(sizeof(*iter), GFP_KERNEL); 4015 if (!iter) 4016 return NULL; 4017 4018 cpu_buffer = buffer->buffers[cpu]; 4019 4020 iter->cpu_buffer = cpu_buffer; 4021 4022 atomic_inc(&buffer->resize_disabled); 4023 atomic_inc(&cpu_buffer->record_disabled); 4024 4025 return iter; 4026 } 4027 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 4028 4029 /** 4030 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 4031 * 4032 * All previously invoked ring_buffer_read_prepare calls to prepare 4033 * iterators will be synchronized. Afterwards, read_buffer_read_start 4034 * calls on those iterators are allowed. 4035 */ 4036 void 4037 ring_buffer_read_prepare_sync(void) 4038 { 4039 synchronize_sched(); 4040 } 4041 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 4042 4043 /** 4044 * ring_buffer_read_start - start a non consuming read of the buffer 4045 * @iter: The iterator returned by ring_buffer_read_prepare 4046 * 4047 * This finalizes the startup of an iteration through the buffer. 4048 * The iterator comes from a call to ring_buffer_read_prepare and 4049 * an intervening ring_buffer_read_prepare_sync must have been 4050 * performed. 4051 * 4052 * Must be paired with ring_buffer_read_finish. 4053 */ 4054 void 4055 ring_buffer_read_start(struct ring_buffer_iter *iter) 4056 { 4057 struct ring_buffer_per_cpu *cpu_buffer; 4058 unsigned long flags; 4059 4060 if (!iter) 4061 return; 4062 4063 cpu_buffer = iter->cpu_buffer; 4064 4065 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4066 arch_spin_lock(&cpu_buffer->lock); 4067 rb_iter_reset(iter); 4068 arch_spin_unlock(&cpu_buffer->lock); 4069 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4070 } 4071 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 4072 4073 /** 4074 * ring_buffer_read_finish - finish reading the iterator of the buffer 4075 * @iter: The iterator retrieved by ring_buffer_start 4076 * 4077 * This re-enables the recording to the buffer, and frees the 4078 * iterator. 4079 */ 4080 void 4081 ring_buffer_read_finish(struct ring_buffer_iter *iter) 4082 { 4083 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4084 unsigned long flags; 4085 4086 /* 4087 * Ring buffer is disabled from recording, here's a good place 4088 * to check the integrity of the ring buffer. 4089 * Must prevent readers from trying to read, as the check 4090 * clears the HEAD page and readers require it. 4091 */ 4092 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4093 rb_check_pages(cpu_buffer); 4094 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4095 4096 atomic_dec(&cpu_buffer->record_disabled); 4097 atomic_dec(&cpu_buffer->buffer->resize_disabled); 4098 kfree(iter); 4099 } 4100 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 4101 4102 /** 4103 * ring_buffer_read - read the next item in the ring buffer by the iterator 4104 * @iter: The ring buffer iterator 4105 * @ts: The time stamp of the event read. 4106 * 4107 * This reads the next event in the ring buffer and increments the iterator. 4108 */ 4109 struct ring_buffer_event * 4110 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 4111 { 4112 struct ring_buffer_event *event; 4113 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4114 unsigned long flags; 4115 4116 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4117 again: 4118 event = rb_iter_peek(iter, ts); 4119 if (!event) 4120 goto out; 4121 4122 if (event->type_len == RINGBUF_TYPE_PADDING) 4123 goto again; 4124 4125 rb_advance_iter(iter); 4126 out: 4127 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4128 4129 return event; 4130 } 4131 EXPORT_SYMBOL_GPL(ring_buffer_read); 4132 4133 /** 4134 * ring_buffer_size - return the size of the ring buffer (in bytes) 4135 * @buffer: The ring buffer. 4136 */ 4137 unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu) 4138 { 4139 /* 4140 * Earlier, this method returned 4141 * BUF_PAGE_SIZE * buffer->nr_pages 4142 * Since the nr_pages field is now removed, we have converted this to 4143 * return the per cpu buffer value. 4144 */ 4145 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4146 return 0; 4147 4148 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 4149 } 4150 EXPORT_SYMBOL_GPL(ring_buffer_size); 4151 4152 static void 4153 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 4154 { 4155 rb_head_page_deactivate(cpu_buffer); 4156 4157 cpu_buffer->head_page 4158 = list_entry(cpu_buffer->pages, struct buffer_page, list); 4159 local_set(&cpu_buffer->head_page->write, 0); 4160 local_set(&cpu_buffer->head_page->entries, 0); 4161 local_set(&cpu_buffer->head_page->page->commit, 0); 4162 4163 cpu_buffer->head_page->read = 0; 4164 4165 cpu_buffer->tail_page = cpu_buffer->head_page; 4166 cpu_buffer->commit_page = cpu_buffer->head_page; 4167 4168 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 4169 INIT_LIST_HEAD(&cpu_buffer->new_pages); 4170 local_set(&cpu_buffer->reader_page->write, 0); 4171 local_set(&cpu_buffer->reader_page->entries, 0); 4172 local_set(&cpu_buffer->reader_page->page->commit, 0); 4173 cpu_buffer->reader_page->read = 0; 4174 4175 local_set(&cpu_buffer->entries_bytes, 0); 4176 local_set(&cpu_buffer->overrun, 0); 4177 local_set(&cpu_buffer->commit_overrun, 0); 4178 local_set(&cpu_buffer->dropped_events, 0); 4179 local_set(&cpu_buffer->entries, 0); 4180 local_set(&cpu_buffer->committing, 0); 4181 local_set(&cpu_buffer->commits, 0); 4182 cpu_buffer->read = 0; 4183 cpu_buffer->read_bytes = 0; 4184 4185 cpu_buffer->write_stamp = 0; 4186 cpu_buffer->read_stamp = 0; 4187 4188 cpu_buffer->lost_events = 0; 4189 cpu_buffer->last_overrun = 0; 4190 4191 rb_head_page_activate(cpu_buffer); 4192 } 4193 4194 /** 4195 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 4196 * @buffer: The ring buffer to reset a per cpu buffer of 4197 * @cpu: The CPU buffer to be reset 4198 */ 4199 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 4200 { 4201 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4202 unsigned long flags; 4203 4204 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4205 return; 4206 4207 atomic_inc(&buffer->resize_disabled); 4208 atomic_inc(&cpu_buffer->record_disabled); 4209 4210 /* Make sure all commits have finished */ 4211 synchronize_sched(); 4212 4213 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4214 4215 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 4216 goto out; 4217 4218 arch_spin_lock(&cpu_buffer->lock); 4219 4220 rb_reset_cpu(cpu_buffer); 4221 4222 arch_spin_unlock(&cpu_buffer->lock); 4223 4224 out: 4225 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4226 4227 atomic_dec(&cpu_buffer->record_disabled); 4228 atomic_dec(&buffer->resize_disabled); 4229 } 4230 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 4231 4232 /** 4233 * ring_buffer_reset - reset a ring buffer 4234 * @buffer: The ring buffer to reset all cpu buffers 4235 */ 4236 void ring_buffer_reset(struct ring_buffer *buffer) 4237 { 4238 int cpu; 4239 4240 for_each_buffer_cpu(buffer, cpu) 4241 ring_buffer_reset_cpu(buffer, cpu); 4242 } 4243 EXPORT_SYMBOL_GPL(ring_buffer_reset); 4244 4245 /** 4246 * rind_buffer_empty - is the ring buffer empty? 4247 * @buffer: The ring buffer to test 4248 */ 4249 bool ring_buffer_empty(struct ring_buffer *buffer) 4250 { 4251 struct ring_buffer_per_cpu *cpu_buffer; 4252 unsigned long flags; 4253 bool dolock; 4254 int cpu; 4255 int ret; 4256 4257 /* yes this is racy, but if you don't like the race, lock the buffer */ 4258 for_each_buffer_cpu(buffer, cpu) { 4259 cpu_buffer = buffer->buffers[cpu]; 4260 local_irq_save(flags); 4261 dolock = rb_reader_lock(cpu_buffer); 4262 ret = rb_per_cpu_empty(cpu_buffer); 4263 rb_reader_unlock(cpu_buffer, dolock); 4264 local_irq_restore(flags); 4265 4266 if (!ret) 4267 return false; 4268 } 4269 4270 return true; 4271 } 4272 EXPORT_SYMBOL_GPL(ring_buffer_empty); 4273 4274 /** 4275 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 4276 * @buffer: The ring buffer 4277 * @cpu: The CPU buffer to test 4278 */ 4279 bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 4280 { 4281 struct ring_buffer_per_cpu *cpu_buffer; 4282 unsigned long flags; 4283 bool dolock; 4284 int ret; 4285 4286 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4287 return true; 4288 4289 cpu_buffer = buffer->buffers[cpu]; 4290 local_irq_save(flags); 4291 dolock = rb_reader_lock(cpu_buffer); 4292 ret = rb_per_cpu_empty(cpu_buffer); 4293 rb_reader_unlock(cpu_buffer, dolock); 4294 local_irq_restore(flags); 4295 4296 return ret; 4297 } 4298 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 4299 4300 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4301 /** 4302 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 4303 * @buffer_a: One buffer to swap with 4304 * @buffer_b: The other buffer to swap with 4305 * 4306 * This function is useful for tracers that want to take a "snapshot" 4307 * of a CPU buffer and has another back up buffer lying around. 4308 * it is expected that the tracer handles the cpu buffer not being 4309 * used at the moment. 4310 */ 4311 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 4312 struct ring_buffer *buffer_b, int cpu) 4313 { 4314 struct ring_buffer_per_cpu *cpu_buffer_a; 4315 struct ring_buffer_per_cpu *cpu_buffer_b; 4316 int ret = -EINVAL; 4317 4318 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 4319 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 4320 goto out; 4321 4322 cpu_buffer_a = buffer_a->buffers[cpu]; 4323 cpu_buffer_b = buffer_b->buffers[cpu]; 4324 4325 /* At least make sure the two buffers are somewhat the same */ 4326 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 4327 goto out; 4328 4329 ret = -EAGAIN; 4330 4331 if (atomic_read(&buffer_a->record_disabled)) 4332 goto out; 4333 4334 if (atomic_read(&buffer_b->record_disabled)) 4335 goto out; 4336 4337 if (atomic_read(&cpu_buffer_a->record_disabled)) 4338 goto out; 4339 4340 if (atomic_read(&cpu_buffer_b->record_disabled)) 4341 goto out; 4342 4343 /* 4344 * We can't do a synchronize_sched here because this 4345 * function can be called in atomic context. 4346 * Normally this will be called from the same CPU as cpu. 4347 * If not it's up to the caller to protect this. 4348 */ 4349 atomic_inc(&cpu_buffer_a->record_disabled); 4350 atomic_inc(&cpu_buffer_b->record_disabled); 4351 4352 ret = -EBUSY; 4353 if (local_read(&cpu_buffer_a->committing)) 4354 goto out_dec; 4355 if (local_read(&cpu_buffer_b->committing)) 4356 goto out_dec; 4357 4358 buffer_a->buffers[cpu] = cpu_buffer_b; 4359 buffer_b->buffers[cpu] = cpu_buffer_a; 4360 4361 cpu_buffer_b->buffer = buffer_a; 4362 cpu_buffer_a->buffer = buffer_b; 4363 4364 ret = 0; 4365 4366 out_dec: 4367 atomic_dec(&cpu_buffer_a->record_disabled); 4368 atomic_dec(&cpu_buffer_b->record_disabled); 4369 out: 4370 return ret; 4371 } 4372 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 4373 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 4374 4375 /** 4376 * ring_buffer_alloc_read_page - allocate a page to read from buffer 4377 * @buffer: the buffer to allocate for. 4378 * @cpu: the cpu buffer to allocate. 4379 * 4380 * This function is used in conjunction with ring_buffer_read_page. 4381 * When reading a full page from the ring buffer, these functions 4382 * can be used to speed up the process. The calling function should 4383 * allocate a few pages first with this function. Then when it 4384 * needs to get pages from the ring buffer, it passes the result 4385 * of this function into ring_buffer_read_page, which will swap 4386 * the page that was allocated, with the read page of the buffer. 4387 * 4388 * Returns: 4389 * The page allocated, or NULL on error. 4390 */ 4391 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu) 4392 { 4393 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4394 struct buffer_data_page *bpage = NULL; 4395 unsigned long flags; 4396 struct page *page; 4397 4398 local_irq_save(flags); 4399 arch_spin_lock(&cpu_buffer->lock); 4400 4401 if (cpu_buffer->free_page) { 4402 bpage = cpu_buffer->free_page; 4403 cpu_buffer->free_page = NULL; 4404 } 4405 4406 arch_spin_unlock(&cpu_buffer->lock); 4407 local_irq_restore(flags); 4408 4409 if (bpage) 4410 goto out; 4411 4412 page = alloc_pages_node(cpu_to_node(cpu), 4413 GFP_KERNEL | __GFP_NORETRY, 0); 4414 if (!page) 4415 return NULL; 4416 4417 bpage = page_address(page); 4418 4419 out: 4420 rb_init_page(bpage); 4421 4422 return bpage; 4423 } 4424 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 4425 4426 /** 4427 * ring_buffer_free_read_page - free an allocated read page 4428 * @buffer: the buffer the page was allocate for 4429 * @cpu: the cpu buffer the page came from 4430 * @data: the page to free 4431 * 4432 * Free a page allocated from ring_buffer_alloc_read_page. 4433 */ 4434 void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data) 4435 { 4436 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4437 struct buffer_data_page *bpage = data; 4438 unsigned long flags; 4439 4440 local_irq_save(flags); 4441 arch_spin_lock(&cpu_buffer->lock); 4442 4443 if (!cpu_buffer->free_page) { 4444 cpu_buffer->free_page = bpage; 4445 bpage = NULL; 4446 } 4447 4448 arch_spin_unlock(&cpu_buffer->lock); 4449 local_irq_restore(flags); 4450 4451 free_page((unsigned long)bpage); 4452 } 4453 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 4454 4455 /** 4456 * ring_buffer_read_page - extract a page from the ring buffer 4457 * @buffer: buffer to extract from 4458 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 4459 * @len: amount to extract 4460 * @cpu: the cpu of the buffer to extract 4461 * @full: should the extraction only happen when the page is full. 4462 * 4463 * This function will pull out a page from the ring buffer and consume it. 4464 * @data_page must be the address of the variable that was returned 4465 * from ring_buffer_alloc_read_page. This is because the page might be used 4466 * to swap with a page in the ring buffer. 4467 * 4468 * for example: 4469 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 4470 * if (!rpage) 4471 * return error; 4472 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 4473 * if (ret >= 0) 4474 * process_page(rpage, ret); 4475 * 4476 * When @full is set, the function will not return true unless 4477 * the writer is off the reader page. 4478 * 4479 * Note: it is up to the calling functions to handle sleeps and wakeups. 4480 * The ring buffer can be used anywhere in the kernel and can not 4481 * blindly call wake_up. The layer that uses the ring buffer must be 4482 * responsible for that. 4483 * 4484 * Returns: 4485 * >=0 if data has been transferred, returns the offset of consumed data. 4486 * <0 if no data has been transferred. 4487 */ 4488 int ring_buffer_read_page(struct ring_buffer *buffer, 4489 void **data_page, size_t len, int cpu, int full) 4490 { 4491 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4492 struct ring_buffer_event *event; 4493 struct buffer_data_page *bpage; 4494 struct buffer_page *reader; 4495 unsigned long missed_events; 4496 unsigned long flags; 4497 unsigned int commit; 4498 unsigned int read; 4499 u64 save_timestamp; 4500 int ret = -1; 4501 4502 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4503 goto out; 4504 4505 /* 4506 * If len is not big enough to hold the page header, then 4507 * we can not copy anything. 4508 */ 4509 if (len <= BUF_PAGE_HDR_SIZE) 4510 goto out; 4511 4512 len -= BUF_PAGE_HDR_SIZE; 4513 4514 if (!data_page) 4515 goto out; 4516 4517 bpage = *data_page; 4518 if (!bpage) 4519 goto out; 4520 4521 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4522 4523 reader = rb_get_reader_page(cpu_buffer); 4524 if (!reader) 4525 goto out_unlock; 4526 4527 event = rb_reader_event(cpu_buffer); 4528 4529 read = reader->read; 4530 commit = rb_page_commit(reader); 4531 4532 /* Check if any events were dropped */ 4533 missed_events = cpu_buffer->lost_events; 4534 4535 /* 4536 * If this page has been partially read or 4537 * if len is not big enough to read the rest of the page or 4538 * a writer is still on the page, then 4539 * we must copy the data from the page to the buffer. 4540 * Otherwise, we can simply swap the page with the one passed in. 4541 */ 4542 if (read || (len < (commit - read)) || 4543 cpu_buffer->reader_page == cpu_buffer->commit_page) { 4544 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 4545 unsigned int rpos = read; 4546 unsigned int pos = 0; 4547 unsigned int size; 4548 4549 if (full) 4550 goto out_unlock; 4551 4552 if (len > (commit - read)) 4553 len = (commit - read); 4554 4555 /* Always keep the time extend and data together */ 4556 size = rb_event_ts_length(event); 4557 4558 if (len < size) 4559 goto out_unlock; 4560 4561 /* save the current timestamp, since the user will need it */ 4562 save_timestamp = cpu_buffer->read_stamp; 4563 4564 /* Need to copy one event at a time */ 4565 do { 4566 /* We need the size of one event, because 4567 * rb_advance_reader only advances by one event, 4568 * whereas rb_event_ts_length may include the size of 4569 * one or two events. 4570 * We have already ensured there's enough space if this 4571 * is a time extend. */ 4572 size = rb_event_length(event); 4573 memcpy(bpage->data + pos, rpage->data + rpos, size); 4574 4575 len -= size; 4576 4577 rb_advance_reader(cpu_buffer); 4578 rpos = reader->read; 4579 pos += size; 4580 4581 if (rpos >= commit) 4582 break; 4583 4584 event = rb_reader_event(cpu_buffer); 4585 /* Always keep the time extend and data together */ 4586 size = rb_event_ts_length(event); 4587 } while (len >= size); 4588 4589 /* update bpage */ 4590 local_set(&bpage->commit, pos); 4591 bpage->time_stamp = save_timestamp; 4592 4593 /* we copied everything to the beginning */ 4594 read = 0; 4595 } else { 4596 /* update the entry counter */ 4597 cpu_buffer->read += rb_page_entries(reader); 4598 cpu_buffer->read_bytes += BUF_PAGE_SIZE; 4599 4600 /* swap the pages */ 4601 rb_init_page(bpage); 4602 bpage = reader->page; 4603 reader->page = *data_page; 4604 local_set(&reader->write, 0); 4605 local_set(&reader->entries, 0); 4606 reader->read = 0; 4607 *data_page = bpage; 4608 4609 /* 4610 * Use the real_end for the data size, 4611 * This gives us a chance to store the lost events 4612 * on the page. 4613 */ 4614 if (reader->real_end) 4615 local_set(&bpage->commit, reader->real_end); 4616 } 4617 ret = read; 4618 4619 cpu_buffer->lost_events = 0; 4620 4621 commit = local_read(&bpage->commit); 4622 /* 4623 * Set a flag in the commit field if we lost events 4624 */ 4625 if (missed_events) { 4626 /* If there is room at the end of the page to save the 4627 * missed events, then record it there. 4628 */ 4629 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 4630 memcpy(&bpage->data[commit], &missed_events, 4631 sizeof(missed_events)); 4632 local_add(RB_MISSED_STORED, &bpage->commit); 4633 commit += sizeof(missed_events); 4634 } 4635 local_add(RB_MISSED_EVENTS, &bpage->commit); 4636 } 4637 4638 /* 4639 * This page may be off to user land. Zero it out here. 4640 */ 4641 if (commit < BUF_PAGE_SIZE) 4642 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 4643 4644 out_unlock: 4645 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4646 4647 out: 4648 return ret; 4649 } 4650 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 4651 4652 /* 4653 * We only allocate new buffers, never free them if the CPU goes down. 4654 * If we were to free the buffer, then the user would lose any trace that was in 4655 * the buffer. 4656 */ 4657 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 4658 { 4659 struct ring_buffer *buffer; 4660 long nr_pages_same; 4661 int cpu_i; 4662 unsigned long nr_pages; 4663 4664 buffer = container_of(node, struct ring_buffer, node); 4665 if (cpumask_test_cpu(cpu, buffer->cpumask)) 4666 return 0; 4667 4668 nr_pages = 0; 4669 nr_pages_same = 1; 4670 /* check if all cpu sizes are same */ 4671 for_each_buffer_cpu(buffer, cpu_i) { 4672 /* fill in the size from first enabled cpu */ 4673 if (nr_pages == 0) 4674 nr_pages = buffer->buffers[cpu_i]->nr_pages; 4675 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 4676 nr_pages_same = 0; 4677 break; 4678 } 4679 } 4680 /* allocate minimum pages, user can later expand it */ 4681 if (!nr_pages_same) 4682 nr_pages = 2; 4683 buffer->buffers[cpu] = 4684 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 4685 if (!buffer->buffers[cpu]) { 4686 WARN(1, "failed to allocate ring buffer on CPU %u\n", 4687 cpu); 4688 return -ENOMEM; 4689 } 4690 smp_wmb(); 4691 cpumask_set_cpu(cpu, buffer->cpumask); 4692 return 0; 4693 } 4694 4695 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 4696 /* 4697 * This is a basic integrity check of the ring buffer. 4698 * Late in the boot cycle this test will run when configured in. 4699 * It will kick off a thread per CPU that will go into a loop 4700 * writing to the per cpu ring buffer various sizes of data. 4701 * Some of the data will be large items, some small. 4702 * 4703 * Another thread is created that goes into a spin, sending out 4704 * IPIs to the other CPUs to also write into the ring buffer. 4705 * this is to test the nesting ability of the buffer. 4706 * 4707 * Basic stats are recorded and reported. If something in the 4708 * ring buffer should happen that's not expected, a big warning 4709 * is displayed and all ring buffers are disabled. 4710 */ 4711 static struct task_struct *rb_threads[NR_CPUS] __initdata; 4712 4713 struct rb_test_data { 4714 struct ring_buffer *buffer; 4715 unsigned long events; 4716 unsigned long bytes_written; 4717 unsigned long bytes_alloc; 4718 unsigned long bytes_dropped; 4719 unsigned long events_nested; 4720 unsigned long bytes_written_nested; 4721 unsigned long bytes_alloc_nested; 4722 unsigned long bytes_dropped_nested; 4723 int min_size_nested; 4724 int max_size_nested; 4725 int max_size; 4726 int min_size; 4727 int cpu; 4728 int cnt; 4729 }; 4730 4731 static struct rb_test_data rb_data[NR_CPUS] __initdata; 4732 4733 /* 1 meg per cpu */ 4734 #define RB_TEST_BUFFER_SIZE 1048576 4735 4736 static char rb_string[] __initdata = 4737 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 4738 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 4739 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 4740 4741 static bool rb_test_started __initdata; 4742 4743 struct rb_item { 4744 int size; 4745 char str[]; 4746 }; 4747 4748 static __init int rb_write_something(struct rb_test_data *data, bool nested) 4749 { 4750 struct ring_buffer_event *event; 4751 struct rb_item *item; 4752 bool started; 4753 int event_len; 4754 int size; 4755 int len; 4756 int cnt; 4757 4758 /* Have nested writes different that what is written */ 4759 cnt = data->cnt + (nested ? 27 : 0); 4760 4761 /* Multiply cnt by ~e, to make some unique increment */ 4762 size = (data->cnt * 68 / 25) % (sizeof(rb_string) - 1); 4763 4764 len = size + sizeof(struct rb_item); 4765 4766 started = rb_test_started; 4767 /* read rb_test_started before checking buffer enabled */ 4768 smp_rmb(); 4769 4770 event = ring_buffer_lock_reserve(data->buffer, len); 4771 if (!event) { 4772 /* Ignore dropped events before test starts. */ 4773 if (started) { 4774 if (nested) 4775 data->bytes_dropped += len; 4776 else 4777 data->bytes_dropped_nested += len; 4778 } 4779 return len; 4780 } 4781 4782 event_len = ring_buffer_event_length(event); 4783 4784 if (RB_WARN_ON(data->buffer, event_len < len)) 4785 goto out; 4786 4787 item = ring_buffer_event_data(event); 4788 item->size = size; 4789 memcpy(item->str, rb_string, size); 4790 4791 if (nested) { 4792 data->bytes_alloc_nested += event_len; 4793 data->bytes_written_nested += len; 4794 data->events_nested++; 4795 if (!data->min_size_nested || len < data->min_size_nested) 4796 data->min_size_nested = len; 4797 if (len > data->max_size_nested) 4798 data->max_size_nested = len; 4799 } else { 4800 data->bytes_alloc += event_len; 4801 data->bytes_written += len; 4802 data->events++; 4803 if (!data->min_size || len < data->min_size) 4804 data->max_size = len; 4805 if (len > data->max_size) 4806 data->max_size = len; 4807 } 4808 4809 out: 4810 ring_buffer_unlock_commit(data->buffer, event); 4811 4812 return 0; 4813 } 4814 4815 static __init int rb_test(void *arg) 4816 { 4817 struct rb_test_data *data = arg; 4818 4819 while (!kthread_should_stop()) { 4820 rb_write_something(data, false); 4821 data->cnt++; 4822 4823 set_current_state(TASK_INTERRUPTIBLE); 4824 /* Now sleep between a min of 100-300us and a max of 1ms */ 4825 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 4826 } 4827 4828 return 0; 4829 } 4830 4831 static __init void rb_ipi(void *ignore) 4832 { 4833 struct rb_test_data *data; 4834 int cpu = smp_processor_id(); 4835 4836 data = &rb_data[cpu]; 4837 rb_write_something(data, true); 4838 } 4839 4840 static __init int rb_hammer_test(void *arg) 4841 { 4842 while (!kthread_should_stop()) { 4843 4844 /* Send an IPI to all cpus to write data! */ 4845 smp_call_function(rb_ipi, NULL, 1); 4846 /* No sleep, but for non preempt, let others run */ 4847 schedule(); 4848 } 4849 4850 return 0; 4851 } 4852 4853 static __init int test_ringbuffer(void) 4854 { 4855 struct task_struct *rb_hammer; 4856 struct ring_buffer *buffer; 4857 int cpu; 4858 int ret = 0; 4859 4860 pr_info("Running ring buffer tests...\n"); 4861 4862 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 4863 if (WARN_ON(!buffer)) 4864 return 0; 4865 4866 /* Disable buffer so that threads can't write to it yet */ 4867 ring_buffer_record_off(buffer); 4868 4869 for_each_online_cpu(cpu) { 4870 rb_data[cpu].buffer = buffer; 4871 rb_data[cpu].cpu = cpu; 4872 rb_data[cpu].cnt = cpu; 4873 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu], 4874 "rbtester/%d", cpu); 4875 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 4876 pr_cont("FAILED\n"); 4877 ret = PTR_ERR(rb_threads[cpu]); 4878 goto out_free; 4879 } 4880 4881 kthread_bind(rb_threads[cpu], cpu); 4882 wake_up_process(rb_threads[cpu]); 4883 } 4884 4885 /* Now create the rb hammer! */ 4886 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 4887 if (WARN_ON(IS_ERR(rb_hammer))) { 4888 pr_cont("FAILED\n"); 4889 ret = PTR_ERR(rb_hammer); 4890 goto out_free; 4891 } 4892 4893 ring_buffer_record_on(buffer); 4894 /* 4895 * Show buffer is enabled before setting rb_test_started. 4896 * Yes there's a small race window where events could be 4897 * dropped and the thread wont catch it. But when a ring 4898 * buffer gets enabled, there will always be some kind of 4899 * delay before other CPUs see it. Thus, we don't care about 4900 * those dropped events. We care about events dropped after 4901 * the threads see that the buffer is active. 4902 */ 4903 smp_wmb(); 4904 rb_test_started = true; 4905 4906 set_current_state(TASK_INTERRUPTIBLE); 4907 /* Just run for 10 seconds */; 4908 schedule_timeout(10 * HZ); 4909 4910 kthread_stop(rb_hammer); 4911 4912 out_free: 4913 for_each_online_cpu(cpu) { 4914 if (!rb_threads[cpu]) 4915 break; 4916 kthread_stop(rb_threads[cpu]); 4917 } 4918 if (ret) { 4919 ring_buffer_free(buffer); 4920 return ret; 4921 } 4922 4923 /* Report! */ 4924 pr_info("finished\n"); 4925 for_each_online_cpu(cpu) { 4926 struct ring_buffer_event *event; 4927 struct rb_test_data *data = &rb_data[cpu]; 4928 struct rb_item *item; 4929 unsigned long total_events; 4930 unsigned long total_dropped; 4931 unsigned long total_written; 4932 unsigned long total_alloc; 4933 unsigned long total_read = 0; 4934 unsigned long total_size = 0; 4935 unsigned long total_len = 0; 4936 unsigned long total_lost = 0; 4937 unsigned long lost; 4938 int big_event_size; 4939 int small_event_size; 4940 4941 ret = -1; 4942 4943 total_events = data->events + data->events_nested; 4944 total_written = data->bytes_written + data->bytes_written_nested; 4945 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 4946 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 4947 4948 big_event_size = data->max_size + data->max_size_nested; 4949 small_event_size = data->min_size + data->min_size_nested; 4950 4951 pr_info("CPU %d:\n", cpu); 4952 pr_info(" events: %ld\n", total_events); 4953 pr_info(" dropped bytes: %ld\n", total_dropped); 4954 pr_info(" alloced bytes: %ld\n", total_alloc); 4955 pr_info(" written bytes: %ld\n", total_written); 4956 pr_info(" biggest event: %d\n", big_event_size); 4957 pr_info(" smallest event: %d\n", small_event_size); 4958 4959 if (RB_WARN_ON(buffer, total_dropped)) 4960 break; 4961 4962 ret = 0; 4963 4964 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 4965 total_lost += lost; 4966 item = ring_buffer_event_data(event); 4967 total_len += ring_buffer_event_length(event); 4968 total_size += item->size + sizeof(struct rb_item); 4969 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 4970 pr_info("FAILED!\n"); 4971 pr_info("buffer had: %.*s\n", item->size, item->str); 4972 pr_info("expected: %.*s\n", item->size, rb_string); 4973 RB_WARN_ON(buffer, 1); 4974 ret = -1; 4975 break; 4976 } 4977 total_read++; 4978 } 4979 if (ret) 4980 break; 4981 4982 ret = -1; 4983 4984 pr_info(" read events: %ld\n", total_read); 4985 pr_info(" lost events: %ld\n", total_lost); 4986 pr_info(" total events: %ld\n", total_lost + total_read); 4987 pr_info(" recorded len bytes: %ld\n", total_len); 4988 pr_info(" recorded size bytes: %ld\n", total_size); 4989 if (total_lost) 4990 pr_info(" With dropped events, record len and size may not match\n" 4991 " alloced and written from above\n"); 4992 if (!total_lost) { 4993 if (RB_WARN_ON(buffer, total_len != total_alloc || 4994 total_size != total_written)) 4995 break; 4996 } 4997 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 4998 break; 4999 5000 ret = 0; 5001 } 5002 if (!ret) 5003 pr_info("Ring buffer PASSED!\n"); 5004 5005 ring_buffer_free(buffer); 5006 return 0; 5007 } 5008 5009 late_initcall(test_ringbuffer); 5010 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 5011