// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
	trace_seq_puts(s, "\tarray       :   32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding     : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len  == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */
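/*
 * Illustrative sketch (not code the kernel runs as-is): the swap described
 * above boils down to splicing the reader's private page in place of the
 * current head page.  Roughly, ignoring the cmpxchg/flag details handled
 * later in this file:
 *
 *	reader->list.next = head->list.next;	// reader page points past head
 *	reader->list.prev = head->list.prev;	// and back to the page before it
 *	head->list.prev->next = &reader->list;	// ring now goes through reader page
 *	head->list.next->prev = &reader->list;
 *	// the old head page is now private to the reader
 *
 * The real implementation must also cope with a writer racing on the head
 * page, which is what the HEAD/UPDATE pointer flags described further down
 * are for.
 */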
/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */
#define RB_ALIGN_DATA		__aligned(RB_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 *   or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}
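/*
 * Illustrative example (for clarity only, not code used by the kernel):
 * how the compressed header above encodes a data event's size.  A 12-byte
 * payload fits in type_len, a 200-byte payload does not:
 *
 *	12-byte payload:  type_len = 12 / RB_ALIGNMENT = 3,
 *			  decoded size = 3 * RB_ALIGNMENT + RB_EVNT_HDR_SIZE
 *	200-byte payload: type_len = 0, array[0] holds the length,
 *			  decoded size = array[0] + RB_EVNT_HDR_SIZE
 *
 * rb_event_data_length() above implements exactly this decode.
 */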
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/**
 * ring_buffer_event_time_stamp - return the event's extended timestamp
 * @event: the event to get the timestamp of
 *
 * Returns the extended timestamp associated with a data event.
 * An extended time_stamp is a 64-bit timestamp represented
 * internally in a special way that makes the best use of space
 * contained within a ring buffer event.  This function decodes
 * it and maps it to a straight u64 value.
 */
u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_FLAGS		(RB_MISSED_EVENTS|RB_MISSED_STORED)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)
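/*
 * Illustrative sketch (an assumption, not code the kernel runs as-is):
 * how the packed counter above is used.  The low 20 bits are the write
 * index on the page, the upper bits count interrupting updaters:
 *
 *	index    = local_read(&bpage->write) & RB_WRITE_MASK;
 *	updaters = local_read(&bpage->write) >> 20;
 *
 * An interrupting writer bumps the counter with RB_WRITE_INTCNT, so a
 * later cmpxchg by the interrupted (outer) writer fails and it knows the
 * page state was already handled.  rb_tail_page_update() below relies on
 * this scheme.
 */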
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for which event context the event is in.
 *  NMI     = 0
 *  IRQ     = 1
 *  SOFTIRQ = 2
 *  NORMAL  = 3
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};
/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	struct ring_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	u64				write_stamp;
	u64				read_stamp;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct ring_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	u64				read_stamp;
};

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct ring_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct ring_buffer *buffer, int cpu)
{
	size_t read;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);
	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct ring_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer);
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}


	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;
			size_t nr_pages;
			size_t dirty;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			nr_pages = cpu_buffer->nr_pages;
			dirty = ring_buffer_nr_dirty_pages(buffer, cpu);
			if (!cpu_buffer->shortest_full ||
			    cpu_buffer->shortest_full < full)
				cpu_buffer->shortest_full = full;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			if (!pagebusy &&
			    (!nr_pages || (dirty * 100) > full * nr_pages))
				break;
		}

		schedule();
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS)
		work = &buffer->irq_work;
	else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	poll_wait(filp, &work->waiters, poll_table);
	work->waiters_pending = true;
	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty.  Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path.  We only need a memory barrier when
	 * the buffer goes from empty to having content.  But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}
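/*
 * Illustrative usage sketch (hypothetical caller, not part of this file):
 * a file_operations ->poll() handler for a trace file would typically just
 * forward to the helper above:
 *
 *	static __poll_t my_trace_poll(struct file *filp, poll_table *pt)
 *	{
 *		struct my_iter *iter = filp->private_data;
 *
 *		return ring_buffer_poll_wait(iter->buffer, iter->cpu_file,
 *					     filp, pt);
 *	}
 *
 * See trace_poll() in kernel/trace/trace.c for a real caller.
 */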
/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 * -------                       -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/**
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	/* Reset the head page if it exists */
	if (cpu_buffer->head_page)
		rb_set_head_page(cpu_buffer);

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * If a user thread allocates too much, and si_mem_available()
	 * reports there's enough memory, even though there is not.
	 * Make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    mflags, cpu_to_node(cpu));
		if (!bpage)
			goto free_pages;

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu), mflags, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	free_buffer_page(cpu_buffer->reader_page);

	rb_head_page_deactivate(cpu_buffer);

	if (head) {
		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
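/*
 * Illustrative usage sketch (hypothetical caller, not part of this file):
 * users normally go through the ring_buffer_alloc() wrapper macro from
 * <linux/ring_buffer.h>, which supplies the lock_class_key:
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);	// ~1MB per CPU
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);
 *
 * The requested size is rounded up to whole buffer pages (BUF_PAGE_SIZE),
 * with a minimum of two pages per CPU, as the code above shows.
 */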
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct ring_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct ring_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static int
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
					       head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						   struct buffer_page, list);

	/*
	 * change read pointer to make sure any read iterators reset
	 * themselves
	 */
	cpu_buffer->read = 0;

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
			       list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		to_remove_page = tmp_iter_page;
		rb_inc_page(cpu_buffer, &tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

static int
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	int retries, success;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from head page to the
	 *    start of new pages list.
	 * 3. Finally, we update the head->prev to the end of new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * spinning.
	 */
	retries = 10;
	success = 0;
	while (retries--) {
		struct list_head *head_page, *prev_page, *r;
		struct list_head *last_page, *first_page;
		struct list_head *head_page_with_bit;

		head_page = &rb_set_head_page(cpu_buffer)->list;
		if (!head_page)
			break;
		prev_page = head_page->prev;

		first_page = pages->next;
		last_page  = pages->prev;

		head_page_with_bit = (struct list_head *)
				     ((unsigned long)head_page | RB_PAGE_HEAD);

		last_page->next = head_page_with_bit;
		first_page->prev = prev_page;

		r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);

		if (r == head_page_with_bit) {
			/*
			 * yay, we replaced the page pointer to our new list,
			 * now, we just have to update to head page's prev
			 * pointer to point to end of list
			 */
			head_page->prev = last_page;
			success = 1;
			break;
		}
	}

	if (success)
		INIT_LIST_HEAD(pages);
	/*
	 * If we weren't successful in adding in new pages, warn and stop
	 * tracing
	 */
	RB_WARN_ON(cpu_buffer, !success);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	/* free pages if they weren't inserted */
	if (!success) {
		struct buffer_page *bpage, *tmp;
		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	return success;
}

static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	int success;

	if (cpu_buffer->nr_pages_to_update > 0)
		success = rb_insert_pages(cpu_buffer);
	else
		success = rb_remove_pages(cpu_buffer,
					  -cpu_buffer->nr_pages_to_update);

	if (success)
		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

static void update_pages_handler(struct work_struct *work)
{
	struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
			struct ring_buffer_per_cpu, update_pages_work);
	rb_update_pages(cpu_buffer);
	complete(&cpu_buffer->update_done);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 * @cpu_id: the cpu buffer to resize
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns the new (page aligned) size on success and < 0 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
		       int cpu_id)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long nr_pages;
	int cpu, err = 0;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	/* Make sure the requested buffer exists */
	if (cpu_id != RING_BUFFER_ALL_CPUS &&
	    !cpumask_test_cpu(cpu_id, buffer->cpumask))
		return size;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	/* we need a minimum of two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	size = nr_pages * BUF_PAGE_SIZE;

	/*
	 * Don't succeed if resizing is disabled, as a reader might be
	 * manipulating the ring buffer and is expecting a sane state while
	 * this is true.
	 */
	if (atomic_read(&buffer->resize_disabled))
		return -EBUSY;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

	if (cpu_id == RING_BUFFER_ALL_CPUS) {
		/* calculate the pages to update */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];

			cpu_buffer->nr_pages_to_update = nr_pages -
							cpu_buffer->nr_pages;
			/*
			 * nothing more to do for removing pages or no update
			 */
			if (cpu_buffer->nr_pages_to_update <= 0)
				continue;
			/*
			 * to add pages, make sure all new pages can be
			 * allocated without receiving ENOMEM
			 */
			INIT_LIST_HEAD(&cpu_buffer->new_pages);
			if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
						&cpu_buffer->new_pages, cpu)) {
				/* not enough memory for new pages */
				err = -ENOMEM;
				goto out_err;
			}
		}

		get_online_cpus();
		/*
		 * Fire off all the required work handlers
		 * We can't schedule on offline CPUs, but it's not necessary
		 * since we can change their buffer sizes without any race.
		 */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			/* Can't run something on an offline CPU. */
			if (!cpu_online(cpu)) {
				rb_update_pages(cpu_buffer);
				cpu_buffer->nr_pages_to_update = 0;
			} else {
				schedule_work_on(cpu,
						 &cpu_buffer->update_pages_work);
			}
		}

		/* wait for all the updates to complete */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			if (cpu_online(cpu))
				wait_for_completion(&cpu_buffer->update_done);
			cpu_buffer->nr_pages_to_update = 0;
		}

		put_online_cpus();
	} else {
		/* Make sure this CPU has been initialized */
		if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
			goto out;

		cpu_buffer = buffer->buffers[cpu_id];

		if (nr_pages == cpu_buffer->nr_pages)
			goto out;

		cpu_buffer->nr_pages_to_update = nr_pages -
						cpu_buffer->nr_pages;

		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (cpu_buffer->nr_pages_to_update > 0 &&
		    __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
					&cpu_buffer->new_pages, cpu_id)) {
			err = -ENOMEM;
			goto out_err;
		}

		get_online_cpus();

		/* Can't run something on an offline CPU. */
		if (!cpu_online(cpu_id))
			rb_update_pages(cpu_buffer);
		else {
			schedule_work_on(cpu_id,
					 &cpu_buffer->update_pages_work);
			wait_for_completion(&cpu_buffer->update_done);
		}

		cpu_buffer->nr_pages_to_update = 0;
		put_online_cpus();
	}

 out:
	/*
	 * The ring buffer resize can happen with the ring buffer
	 * enabled, so that the update disturbs the tracing as little
	 * as possible. But if the buffer is disabled, we do not need
	 * to worry about that, and we can take the time to verify
	 * that the buffer is not corrupt.
	 */
	if (atomic_read(&buffer->record_disabled)) {
		atomic_inc(&buffer->record_disabled);
		/*
		 * Even though the buffer was disabled, we must make sure
		 * that it is truly disabled before calling rb_check_pages.
		 * There could have been a race between checking
		 * record_disable and incrementing it.
		 */
		synchronize_rcu();
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_check_pages(cpu_buffer);
		}
		atomic_dec(&buffer->record_disabled);
	}

	mutex_unlock(&buffer->mutex);
	return size;

 out_err:
	for_each_buffer_cpu(buffer, cpu) {
		struct buffer_page *bpage, *tmp;

		cpu_buffer = buffer->buffers[cpu];
		cpu_buffer->nr_pages_to_update = 0;

		if (list_empty(&cpu_buffer->new_pages))
			continue;

		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	mutex_unlock(&buffer->mutex);
	return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
{
	mutex_lock(&buffer->mutex);
	if (val)
		buffer->flags |= RB_FL_OVERWRITE;
	else
		buffer->flags &= ~RB_FL_OVERWRITE;
	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);

static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static __always_inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static __always_inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

/* Size is determined by what has been committed */
static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static __always_inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static __always_inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
We need to move the head 1988 * forward, and protect against both readers on 1989 * other CPUs and writers coming in via interrupts. 1990 */ 1991 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 1992 RB_PAGE_HEAD); 1993 1994 /* 1995 * type can be one of four: 1996 * NORMAL - an interrupt already moved it for us 1997 * HEAD - we are the first to get here. 1998 * UPDATE - we are the interrupt interrupting 1999 * a current move. 2000 * MOVED - a reader on another CPU moved the next 2001 * pointer to its reader page. Give up 2002 * and try again. 2003 */ 2004 2005 switch (type) { 2006 case RB_PAGE_HEAD: 2007 /* 2008 * We changed the head to UPDATE, thus 2009 * it is our responsibility to update 2010 * the counters. 2011 */ 2012 local_add(entries, &cpu_buffer->overrun); 2013 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes); 2014 2015 /* 2016 * The entries will be zeroed out when we move the 2017 * tail page. 2018 */ 2019 2020 /* still more to do */ 2021 break; 2022 2023 case RB_PAGE_UPDATE: 2024 /* 2025 * This is an interrupt that interrupt the 2026 * previous update. Still more to do. 2027 */ 2028 break; 2029 case RB_PAGE_NORMAL: 2030 /* 2031 * An interrupt came in before the update 2032 * and processed this for us. 2033 * Nothing left to do. 2034 */ 2035 return 1; 2036 case RB_PAGE_MOVED: 2037 /* 2038 * The reader is on another CPU and just did 2039 * a swap with our next_page. 2040 * Try again. 2041 */ 2042 return 1; 2043 default: 2044 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2045 return -1; 2046 } 2047 2048 /* 2049 * Now that we are here, the old head pointer is 2050 * set to UPDATE. This will keep the reader from 2051 * swapping the head page with the reader page. 2052 * The reader (on another CPU) will spin till 2053 * we are finished. 2054 * 2055 * We just need to protect against interrupts 2056 * doing the job. We will set the next pointer 2057 * to HEAD. After that, we set the old pointer 2058 * to NORMAL, but only if it was HEAD before. 2059 * otherwise we are an interrupt, and only 2060 * want the outer most commit to reset it. 2061 */ 2062 new_head = next_page; 2063 rb_inc_page(cpu_buffer, &new_head); 2064 2065 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2066 RB_PAGE_NORMAL); 2067 2068 /* 2069 * Valid returns are: 2070 * HEAD - an interrupt came in and already set it. 2071 * NORMAL - One of two things: 2072 * 1) We really set it. 2073 * 2) A bunch of interrupts came in and moved 2074 * the page forward again. 2075 */ 2076 switch (ret) { 2077 case RB_PAGE_HEAD: 2078 case RB_PAGE_NORMAL: 2079 /* OK */ 2080 break; 2081 default: 2082 RB_WARN_ON(cpu_buffer, 1); 2083 return -1; 2084 } 2085 2086 /* 2087 * It is possible that an interrupt came in, 2088 * set the head up, then more interrupts came in 2089 * and moved it again. When we get back here, 2090 * the page would have been set to NORMAL but we 2091 * just set it back to HEAD. 2092 * 2093 * How do you detect this? Well, if that happened 2094 * the tail page would have moved. 2095 */ 2096 if (ret == RB_PAGE_NORMAL) { 2097 struct buffer_page *buffer_tail_page; 2098 2099 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2100 /* 2101 * If the tail had moved passed next, then we need 2102 * to reset the pointer. 
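	 *
	 * In that case some other (interrupt) commit has already taken
	 * the head past new_head, so drop the HEAD flag we just set and
	 * put the page back to NORMAL, letting that update stand.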
2103 */ 2104 if (buffer_tail_page != tail_page && 2105 buffer_tail_page != next_page) 2106 rb_head_page_set_normal(cpu_buffer, new_head, 2107 next_page, 2108 RB_PAGE_HEAD); 2109 } 2110 2111 /* 2112 * If this was the outer most commit (the one that 2113 * changed the original pointer from HEAD to UPDATE), 2114 * then it is up to us to reset it to NORMAL. 2115 */ 2116 if (type == RB_PAGE_HEAD) { 2117 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2118 tail_page, 2119 RB_PAGE_UPDATE); 2120 if (RB_WARN_ON(cpu_buffer, 2121 ret != RB_PAGE_UPDATE)) 2122 return -1; 2123 } 2124 2125 return 0; 2126 } 2127 2128 static inline void 2129 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2130 unsigned long tail, struct rb_event_info *info) 2131 { 2132 struct buffer_page *tail_page = info->tail_page; 2133 struct ring_buffer_event *event; 2134 unsigned long length = info->length; 2135 2136 /* 2137 * Only the event that crossed the page boundary 2138 * must fill the old tail_page with padding. 2139 */ 2140 if (tail >= BUF_PAGE_SIZE) { 2141 /* 2142 * If the page was filled, then we still need 2143 * to update the real_end. Reset it to zero 2144 * and the reader will ignore it. 2145 */ 2146 if (tail == BUF_PAGE_SIZE) 2147 tail_page->real_end = 0; 2148 2149 local_sub(length, &tail_page->write); 2150 return; 2151 } 2152 2153 event = __rb_page_index(tail_page, tail); 2154 2155 /* account for padding bytes */ 2156 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes); 2157 2158 /* 2159 * Save the original length to the meta data. 2160 * This will be used by the reader to add lost event 2161 * counter. 2162 */ 2163 tail_page->real_end = tail; 2164 2165 /* 2166 * If this event is bigger than the minimum size, then 2167 * we need to be careful that we don't subtract the 2168 * write counter enough to allow another writer to slip 2169 * in on this page. 2170 * We put in a discarded commit instead, to make sure 2171 * that this space is not used again. 2172 * 2173 * If we are less than the minimum size, we don't need to 2174 * worry about it. 2175 */ 2176 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) { 2177 /* No room for any events */ 2178 2179 /* Mark the rest of the page with padding */ 2180 rb_event_set_padding(event); 2181 2182 /* Set the write back to the previous setting */ 2183 local_sub(length, &tail_page->write); 2184 return; 2185 } 2186 2187 /* Put in a discarded event */ 2188 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE; 2189 event->type_len = RINGBUF_TYPE_PADDING; 2190 /* time delta must be non zero */ 2191 event->time_delta = 1; 2192 2193 /* Set write to end of buffer */ 2194 length = (tail + length) - BUF_PAGE_SIZE; 2195 local_sub(length, &tail_page->write); 2196 } 2197 2198 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2199 2200 /* 2201 * This is the slow path, force gcc not to inline it. 2202 */ 2203 static noinline struct ring_buffer_event * 2204 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2205 unsigned long tail, struct rb_event_info *info) 2206 { 2207 struct buffer_page *tail_page = info->tail_page; 2208 struct buffer_page *commit_page = cpu_buffer->commit_page; 2209 struct ring_buffer *buffer = cpu_buffer->buffer; 2210 struct buffer_page *next_page; 2211 int ret; 2212 2213 next_page = tail_page; 2214 2215 rb_inc_page(cpu_buffer, &next_page); 2216 2217 /* 2218 * If for some reason, we had an interrupt storm that made 2219 * it all the way around the buffer, bail, and warn 2220 * about it. 
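	 *
	 * The dropped write is accounted for in the commit_overrun
	 * counter (see ring_buffer_commit_overrun_cpu()).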
2221 */ 2222 if (unlikely(next_page == commit_page)) { 2223 local_inc(&cpu_buffer->commit_overrun); 2224 goto out_reset; 2225 } 2226 2227 /* 2228 * This is where the fun begins! 2229 * 2230 * We are fighting against races between a reader that 2231 * could be on another CPU trying to swap its reader 2232 * page with the buffer head. 2233 * 2234 * We are also fighting against interrupts coming in and 2235 * moving the head or tail on us as well. 2236 * 2237 * If the next page is the head page then we have filled 2238 * the buffer, unless the commit page is still on the 2239 * reader page. 2240 */ 2241 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) { 2242 2243 /* 2244 * If the commit is not on the reader page, then 2245 * move the header page. 2246 */ 2247 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2248 /* 2249 * If we are not in overwrite mode, 2250 * this is easy, just stop here. 2251 */ 2252 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2253 local_inc(&cpu_buffer->dropped_events); 2254 goto out_reset; 2255 } 2256 2257 ret = rb_handle_head_page(cpu_buffer, 2258 tail_page, 2259 next_page); 2260 if (ret < 0) 2261 goto out_reset; 2262 if (ret) 2263 goto out_again; 2264 } else { 2265 /* 2266 * We need to be careful here too. The 2267 * commit page could still be on the reader 2268 * page. We could have a small buffer, and 2269 * have filled up the buffer with events 2270 * from interrupts and such, and wrapped. 2271 * 2272 * Note, if the tail page is also the on the 2273 * reader_page, we let it move out. 2274 */ 2275 if (unlikely((cpu_buffer->commit_page != 2276 cpu_buffer->tail_page) && 2277 (cpu_buffer->commit_page == 2278 cpu_buffer->reader_page))) { 2279 local_inc(&cpu_buffer->commit_overrun); 2280 goto out_reset; 2281 } 2282 } 2283 } 2284 2285 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2286 2287 out_again: 2288 2289 rb_reset_tail(cpu_buffer, tail, info); 2290 2291 /* Commit what we have for now. */ 2292 rb_end_commit(cpu_buffer); 2293 /* rb_end_commit() decs committing */ 2294 local_inc(&cpu_buffer->committing); 2295 2296 /* fail and let the caller try again */ 2297 return ERR_PTR(-EAGAIN); 2298 2299 out_reset: 2300 /* reset write */ 2301 rb_reset_tail(cpu_buffer, tail, info); 2302 2303 return NULL; 2304 } 2305 2306 /* Slow path, do not inline */ 2307 static noinline struct ring_buffer_event * 2308 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) 2309 { 2310 if (abs) 2311 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2312 else 2313 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2314 2315 /* Not the first event on the page, or not delta? */ 2316 if (abs || rb_event_index(event)) { 2317 event->time_delta = delta & TS_MASK; 2318 event->array[0] = delta >> TS_SHIFT; 2319 } else { 2320 /* nope, just zero it */ 2321 event->time_delta = 0; 2322 event->array[0] = 0; 2323 } 2324 2325 return skip_time_extend(event); 2326 } 2327 2328 static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 2329 struct ring_buffer_event *event); 2330 2331 /** 2332 * rb_update_event - update event type and data 2333 * @event: the event to update 2334 * @type: the type of event 2335 * @length: the size of the event field in the ring buffer 2336 * 2337 * Update the type and data fields of the event. The length 2338 * is the actual size that is written to the ring buffer, 2339 * and with this, we can determine what to place into the 2340 * data field. 
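 *
 * Payloads that fit in RB_MAX_SMALL_DATA bytes encode their size directly
 * in type_len (in units of RB_ALIGNMENT); larger payloads set type_len to
 * zero and store the byte count in array[0].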
2341 */ 2342 static void 2343 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2344 struct ring_buffer_event *event, 2345 struct rb_event_info *info) 2346 { 2347 unsigned length = info->length; 2348 u64 delta = info->delta; 2349 2350 /* Only a commit updates the timestamp */ 2351 if (unlikely(!rb_event_is_commit(cpu_buffer, event))) 2352 delta = 0; 2353 2354 /* 2355 * If we need to add a timestamp, then we 2356 * add it to the start of the reserved space. 2357 */ 2358 if (unlikely(info->add_timestamp)) { 2359 bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer); 2360 2361 event = rb_add_time_stamp(event, info->delta, abs); 2362 length -= RB_LEN_TIME_EXTEND; 2363 delta = 0; 2364 } 2365 2366 event->time_delta = delta; 2367 length -= RB_EVNT_HDR_SIZE; 2368 if (length > RB_MAX_SMALL_DATA) { 2369 event->type_len = 0; 2370 event->array[0] = length; 2371 } else 2372 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2373 } 2374 2375 static unsigned rb_calculate_event_length(unsigned length) 2376 { 2377 struct ring_buffer_event event; /* Used only for sizeof array */ 2378 2379 /* zero length can cause confusions */ 2380 if (!length) 2381 length++; 2382 2383 if (length > RB_MAX_SMALL_DATA) 2384 length += sizeof(event.array[0]); 2385 2386 length += RB_EVNT_HDR_SIZE; 2387 length = ALIGN(length, RB_ALIGNMENT); 2388 2389 /* 2390 * In case the time delta is larger than the 27 bits for it 2391 * in the header, we need to add a timestamp. If another 2392 * event comes in when trying to discard this one to increase 2393 * the length, then the timestamp will be added in the allocated 2394 * space of this event. If length is bigger than the size needed 2395 * for the TIME_EXTEND, then padding has to be used. The events 2396 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2397 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2398 * As length is a multiple of 4, we only need to worry if it 2399 * is 12 (RB_LEN_TIME_EXTEND + 4). 2400 */ 2401 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2402 length += RB_ALIGNMENT; 2403 2404 return length; 2405 } 2406 2407 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2408 static inline bool sched_clock_stable(void) 2409 { 2410 return true; 2411 } 2412 #endif 2413 2414 static inline int 2415 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2416 struct ring_buffer_event *event) 2417 { 2418 unsigned long new_index, old_index; 2419 struct buffer_page *bpage; 2420 unsigned long index; 2421 unsigned long addr; 2422 2423 new_index = rb_event_index(event); 2424 old_index = new_index + rb_event_ts_length(event); 2425 addr = (unsigned long)event; 2426 addr &= PAGE_MASK; 2427 2428 bpage = READ_ONCE(cpu_buffer->tail_page); 2429 2430 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2431 unsigned long write_mask = 2432 local_read(&bpage->write) & ~RB_WRITE_MASK; 2433 unsigned long event_length = rb_event_length(event); 2434 /* 2435 * This is on the tail page. It is possible that 2436 * a write could come in and move the tail page 2437 * and write to the next page. That is fine 2438 * because we just shorten what is on this page. 
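	 *
	 * The cmpxchg below tries to move the page's write index back
	 * from the end of this event to its start. It only succeeds if
	 * no other write was appended behind us, in which case the
	 * reserved space is reclaimed.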
2439 */ 2440 old_index += write_mask; 2441 new_index += write_mask; 2442 index = local_cmpxchg(&bpage->write, old_index, new_index); 2443 if (index == old_index) { 2444 /* update counters */ 2445 local_sub(event_length, &cpu_buffer->entries_bytes); 2446 return 1; 2447 } 2448 } 2449 2450 /* could not discard */ 2451 return 0; 2452 } 2453 2454 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2455 { 2456 local_inc(&cpu_buffer->committing); 2457 local_inc(&cpu_buffer->commits); 2458 } 2459 2460 static __always_inline void 2461 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2462 { 2463 unsigned long max_count; 2464 2465 /* 2466 * We only race with interrupts and NMIs on this CPU. 2467 * If we own the commit event, then we can commit 2468 * all others that interrupted us, since the interruptions 2469 * are in stack format (they finish before they come 2470 * back to us). This allows us to do a simple loop to 2471 * assign the commit to the tail. 2472 */ 2473 again: 2474 max_count = cpu_buffer->nr_pages * 100; 2475 2476 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 2477 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 2478 return; 2479 if (RB_WARN_ON(cpu_buffer, 2480 rb_is_reader_page(cpu_buffer->tail_page))) 2481 return; 2482 local_set(&cpu_buffer->commit_page->page->commit, 2483 rb_page_write(cpu_buffer->commit_page)); 2484 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); 2485 /* Only update the write stamp if the page has an event */ 2486 if (rb_page_write(cpu_buffer->commit_page)) 2487 cpu_buffer->write_stamp = 2488 cpu_buffer->commit_page->page->time_stamp; 2489 /* add barrier to keep gcc from optimizing too much */ 2490 barrier(); 2491 } 2492 while (rb_commit_index(cpu_buffer) != 2493 rb_page_write(cpu_buffer->commit_page)) { 2494 2495 local_set(&cpu_buffer->commit_page->page->commit, 2496 rb_page_write(cpu_buffer->commit_page)); 2497 RB_WARN_ON(cpu_buffer, 2498 local_read(&cpu_buffer->commit_page->page->commit) & 2499 ~RB_WRITE_MASK); 2500 barrier(); 2501 } 2502 2503 /* again, keep gcc from optimizing */ 2504 barrier(); 2505 2506 /* 2507 * If an interrupt came in just after the first while loop 2508 * and pushed the tail page forward, we will be left with 2509 * a dangling commit that will never go forward. 2510 */ 2511 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 2512 goto again; 2513 } 2514 2515 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 2516 { 2517 unsigned long commits; 2518 2519 if (RB_WARN_ON(cpu_buffer, 2520 !local_read(&cpu_buffer->committing))) 2521 return; 2522 2523 again: 2524 commits = local_read(&cpu_buffer->commits); 2525 /* synchronize with interrupts */ 2526 barrier(); 2527 if (local_read(&cpu_buffer->committing) == 1) 2528 rb_set_commit_to_write(cpu_buffer); 2529 2530 local_dec(&cpu_buffer->committing); 2531 2532 /* synchronize with interrupts */ 2533 barrier(); 2534 2535 /* 2536 * Need to account for interrupts coming in between the 2537 * updating of the commit page and the clearing of the 2538 * committing counter. 
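	 *
	 * If the commits count changed while committing dropped to zero,
	 * an interrupt slipped in an event that nobody will push out to
	 * the commit page, so take the commit over again and redo the loop.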
2539 */ 2540 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 2541 !local_read(&cpu_buffer->committing)) { 2542 local_inc(&cpu_buffer->committing); 2543 goto again; 2544 } 2545 } 2546 2547 static inline void rb_event_discard(struct ring_buffer_event *event) 2548 { 2549 if (extended_time(event)) 2550 event = skip_time_extend(event); 2551 2552 /* array[0] holds the actual length for the discarded event */ 2553 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 2554 event->type_len = RINGBUF_TYPE_PADDING; 2555 /* time delta must be non zero */ 2556 if (!event->time_delta) 2557 event->time_delta = 1; 2558 } 2559 2560 static __always_inline bool 2561 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, 2562 struct ring_buffer_event *event) 2563 { 2564 unsigned long addr = (unsigned long)event; 2565 unsigned long index; 2566 2567 index = rb_event_index(event); 2568 addr &= PAGE_MASK; 2569 2570 return cpu_buffer->commit_page->page == (void *)addr && 2571 rb_commit_index(cpu_buffer) == index; 2572 } 2573 2574 static __always_inline void 2575 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2576 struct ring_buffer_event *event) 2577 { 2578 u64 delta; 2579 2580 /* 2581 * The event first in the commit queue updates the 2582 * time stamp. 2583 */ 2584 if (rb_event_is_commit(cpu_buffer, event)) { 2585 /* 2586 * A commit event that is first on a page 2587 * updates the write timestamp with the page stamp 2588 */ 2589 if (!rb_event_index(event)) 2590 cpu_buffer->write_stamp = 2591 cpu_buffer->commit_page->page->time_stamp; 2592 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { 2593 delta = ring_buffer_event_time_stamp(event); 2594 cpu_buffer->write_stamp += delta; 2595 } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 2596 delta = ring_buffer_event_time_stamp(event); 2597 cpu_buffer->write_stamp = delta; 2598 } else 2599 cpu_buffer->write_stamp += event->time_delta; 2600 } 2601 } 2602 2603 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, 2604 struct ring_buffer_event *event) 2605 { 2606 local_inc(&cpu_buffer->entries); 2607 rb_update_write_stamp(cpu_buffer, event); 2608 rb_end_commit(cpu_buffer); 2609 } 2610 2611 static __always_inline void 2612 rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 2613 { 2614 size_t nr_pages; 2615 size_t dirty; 2616 size_t full; 2617 2618 if (buffer->irq_work.waiters_pending) { 2619 buffer->irq_work.waiters_pending = false; 2620 /* irq_work_queue() supplies it's own memory barriers */ 2621 irq_work_queue(&buffer->irq_work.work); 2622 } 2623 2624 if (cpu_buffer->irq_work.waiters_pending) { 2625 cpu_buffer->irq_work.waiters_pending = false; 2626 /* irq_work_queue() supplies it's own memory barriers */ 2627 irq_work_queue(&cpu_buffer->irq_work.work); 2628 } 2629 2630 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 2631 return; 2632 2633 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 2634 return; 2635 2636 if (!cpu_buffer->irq_work.full_waiters_pending) 2637 return; 2638 2639 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 2640 2641 full = cpu_buffer->shortest_full; 2642 nr_pages = cpu_buffer->nr_pages; 2643 dirty = ring_buffer_nr_dirty_pages(buffer, cpu_buffer->cpu); 2644 if (full && nr_pages && (dirty * 100) <= full * nr_pages) 2645 return; 2646 2647 cpu_buffer->irq_work.wakeup_full = true; 2648 cpu_buffer->irq_work.full_waiters_pending = false; 2649 /* irq_work_queue() supplies it's own memory barriers */ 2650 
	irq_work_queue(&cpu_buffer->irq_work.work);
}

/*
 * The lock and unlock are done within a preempt disable section.
 * The current_context per_cpu variable can only be modified
 * by the current task between lock and unlock. But it can
 * be modified more than once via an interrupt. To pass this
 * information from the lock to the unlock without having to
 * access the 'in_interrupt()' functions again (which do show
 * a bit of overhead in something as critical as function tracing),
 * we use a bitmask trick.
 *
 *  bit 0 =  NMI context
 *  bit 1 =  IRQ context
 *  bit 2 =  SoftIRQ context
 *  bit 3 =  normal context.
 *
 * This works because this is the order of contexts that can
 * preempt other contexts. A SoftIRQ never preempts an IRQ
 * context.
 *
 * When the context is determined, the corresponding bit is
 * checked and set (if it was set, then a recursion of that context
 * happened).
 *
 * On unlock, we need to clear this bit. To do so, just subtract
 * 1 from the current_context and AND it to itself.
 *
 * (binary)
 *  101 - 1 = 100
 *  101 & 100 = 100 (clearing bit zero)
 *
 *  1010 - 1 = 1001
 *  1010 & 1001 = 1000 (clearing bit 1)
 *
 * The least significant bit can be cleared this way, and it
 * just so happens that it is the same bit corresponding to
 * the current context.
 */

static __always_inline int
trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
{
	unsigned int val = cpu_buffer->current_context;
	unsigned long pc = preempt_count();
	int bit;

	if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
		bit = RB_CTX_NORMAL;
	else
		bit = pc & NMI_MASK ? RB_CTX_NMI :
			pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;

	if (unlikely(val & (1 << (bit + cpu_buffer->nest))))
		return 1;

	val |= (1 << (bit + cpu_buffer->nest));
	cpu_buffer->current_context = val;

	return 0;
}

static __always_inline void
trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->current_context &=
		cpu_buffer->current_context - (1 << cpu_buffer->nest);
}

/* The recursive locking above uses 4 bits */
#define NESTED_BITS 4

/**
 * ring_buffer_nest_start - Allow tracing while nested
 * @buffer: The ring buffer to modify
 *
 * The ring buffer has a safety mechanism to prevent recursion.
 * But there may be a case where a trace needs to be done while
 * tracing something else. In this case, calling this function
 * will allow a subsequent ring_buffer_lock_reserve() to nest within
 * a currently active one.
 *
 * Call this function before calling another ring_buffer_lock_reserve() and
 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
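 *
 * For illustration only (the names "buffer", "event" and "length" are
 * placeholders, not taken from a particular caller), a nested reserve
 * would look roughly like:
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, length);
 *	if (event)
 *		ring_buffer_unlock_commit(buffer, event);
 *	ring_buffer_nest_end(buffer);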
2735 */ 2736 void ring_buffer_nest_start(struct ring_buffer *buffer) 2737 { 2738 struct ring_buffer_per_cpu *cpu_buffer; 2739 int cpu; 2740 2741 /* Enabled by ring_buffer_nest_end() */ 2742 preempt_disable_notrace(); 2743 cpu = raw_smp_processor_id(); 2744 cpu_buffer = buffer->buffers[cpu]; 2745 /* This is the shift value for the above recursive locking */ 2746 cpu_buffer->nest += NESTED_BITS; 2747 } 2748 2749 /** 2750 * ring_buffer_nest_end - Allow to trace while nested 2751 * @buffer: The ring buffer to modify 2752 * 2753 * Must be called after ring_buffer_nest_start() and after the 2754 * ring_buffer_unlock_commit(). 2755 */ 2756 void ring_buffer_nest_end(struct ring_buffer *buffer) 2757 { 2758 struct ring_buffer_per_cpu *cpu_buffer; 2759 int cpu; 2760 2761 /* disabled by ring_buffer_nest_start() */ 2762 cpu = raw_smp_processor_id(); 2763 cpu_buffer = buffer->buffers[cpu]; 2764 /* This is the shift value for the above recursive locking */ 2765 cpu_buffer->nest -= NESTED_BITS; 2766 preempt_enable_notrace(); 2767 } 2768 2769 /** 2770 * ring_buffer_unlock_commit - commit a reserved 2771 * @buffer: The buffer to commit to 2772 * @event: The event pointer to commit. 2773 * 2774 * This commits the data to the ring buffer, and releases any locks held. 2775 * 2776 * Must be paired with ring_buffer_lock_reserve. 2777 */ 2778 int ring_buffer_unlock_commit(struct ring_buffer *buffer, 2779 struct ring_buffer_event *event) 2780 { 2781 struct ring_buffer_per_cpu *cpu_buffer; 2782 int cpu = raw_smp_processor_id(); 2783 2784 cpu_buffer = buffer->buffers[cpu]; 2785 2786 rb_commit(cpu_buffer, event); 2787 2788 rb_wakeups(buffer, cpu_buffer); 2789 2790 trace_recursive_unlock(cpu_buffer); 2791 2792 preempt_enable_notrace(); 2793 2794 return 0; 2795 } 2796 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 2797 2798 static noinline void 2799 rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2800 struct rb_event_info *info) 2801 { 2802 WARN_ONCE(info->delta > (1ULL << 59), 2803 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s", 2804 (unsigned long long)info->delta, 2805 (unsigned long long)info->ts, 2806 (unsigned long long)cpu_buffer->write_stamp, 2807 sched_clock_stable() ? "" : 2808 "If you just came from a suspend/resume,\n" 2809 "please switch to the trace global clock:\n" 2810 " echo global > /sys/kernel/debug/tracing/trace_clock\n" 2811 "or add trace_clock=global to the kernel command line\n"); 2812 info->add_timestamp = 1; 2813 } 2814 2815 static struct ring_buffer_event * 2816 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 2817 struct rb_event_info *info) 2818 { 2819 struct ring_buffer_event *event; 2820 struct buffer_page *tail_page; 2821 unsigned long tail, write; 2822 2823 /* 2824 * If the time delta since the last event is too big to 2825 * hold in the time field of the event, then we append a 2826 * TIME EXTEND event ahead of the data event. 2827 */ 2828 if (unlikely(info->add_timestamp)) 2829 info->length += RB_LEN_TIME_EXTEND; 2830 2831 /* Don't let the compiler play games with cpu_buffer->tail_page */ 2832 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 2833 write = local_add_return(info->length, &tail_page->write); 2834 2835 /* set write to only the index of the write */ 2836 write &= RB_WRITE_MASK; 2837 tail = write - info->length; 2838 2839 /* 2840 * If this is the first commit on the page, then it has the same 2841 * timestamp as the page itself. 
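	 *
	 * So with a non-absolute clock the delta is forced to zero here,
	 * and the page's own timestamp is set to info->ts further down.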
2842 */ 2843 if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer)) 2844 info->delta = 0; 2845 2846 /* See if we shot pass the end of this buffer page */ 2847 if (unlikely(write > BUF_PAGE_SIZE)) 2848 return rb_move_tail(cpu_buffer, tail, info); 2849 2850 /* We reserved something on the buffer */ 2851 2852 event = __rb_page_index(tail_page, tail); 2853 rb_update_event(cpu_buffer, event, info); 2854 2855 local_inc(&tail_page->entries); 2856 2857 /* 2858 * If this is the first commit on the page, then update 2859 * its timestamp. 2860 */ 2861 if (!tail) 2862 tail_page->page->time_stamp = info->ts; 2863 2864 /* account for these added bytes */ 2865 local_add(info->length, &cpu_buffer->entries_bytes); 2866 2867 return event; 2868 } 2869 2870 static __always_inline struct ring_buffer_event * 2871 rb_reserve_next_event(struct ring_buffer *buffer, 2872 struct ring_buffer_per_cpu *cpu_buffer, 2873 unsigned long length) 2874 { 2875 struct ring_buffer_event *event; 2876 struct rb_event_info info; 2877 int nr_loops = 0; 2878 u64 diff; 2879 2880 rb_start_commit(cpu_buffer); 2881 2882 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 2883 /* 2884 * Due to the ability to swap a cpu buffer from a buffer 2885 * it is possible it was swapped before we committed. 2886 * (committing stops a swap). We check for it here and 2887 * if it happened, we have to fail the write. 2888 */ 2889 barrier(); 2890 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 2891 local_dec(&cpu_buffer->committing); 2892 local_dec(&cpu_buffer->commits); 2893 return NULL; 2894 } 2895 #endif 2896 2897 info.length = rb_calculate_event_length(length); 2898 again: 2899 info.add_timestamp = 0; 2900 info.delta = 0; 2901 2902 /* 2903 * We allow for interrupts to reenter here and do a trace. 2904 * If one does, it will cause this original code to loop 2905 * back here. Even with heavy interrupts happening, this 2906 * should only happen a few times in a row. If this happens 2907 * 1000 times in a row, there must be either an interrupt 2908 * storm or we have something buggy. 2909 * Bail! 2910 */ 2911 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 2912 goto out_fail; 2913 2914 info.ts = rb_time_stamp(cpu_buffer->buffer); 2915 diff = info.ts - cpu_buffer->write_stamp; 2916 2917 /* make sure this diff is calculated here */ 2918 barrier(); 2919 2920 if (ring_buffer_time_stamp_abs(buffer)) { 2921 info.delta = info.ts; 2922 rb_handle_timestamp(cpu_buffer, &info); 2923 } else /* Did the write stamp get updated already? */ 2924 if (likely(info.ts >= cpu_buffer->write_stamp)) { 2925 info.delta = diff; 2926 if (unlikely(test_time_stamp(info.delta))) 2927 rb_handle_timestamp(cpu_buffer, &info); 2928 } 2929 2930 event = __rb_reserve_next(cpu_buffer, &info); 2931 2932 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 2933 if (info.add_timestamp) 2934 info.length -= RB_LEN_TIME_EXTEND; 2935 goto again; 2936 } 2937 2938 if (!event) 2939 goto out_fail; 2940 2941 return event; 2942 2943 out_fail: 2944 rb_end_commit(cpu_buffer); 2945 return NULL; 2946 } 2947 2948 /** 2949 * ring_buffer_lock_reserve - reserve a part of the buffer 2950 * @buffer: the ring buffer to reserve from 2951 * @length: the length of the data to reserve (excluding event header) 2952 * 2953 * Returns a reserved event on the ring buffer to copy directly to. 2954 * The user of this interface will need to get the body to write into 2955 * and can use the ring_buffer_event_data() interface. 
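 *
 * A minimal usage sketch (the "my_data" payload is made up for
 * illustration):
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(my_data));
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), &my_data, sizeof(my_data));
 *		ring_buffer_unlock_commit(buffer, event);
 *	}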
2956 * 2957 * The length is the length of the data needed, not the event length 2958 * which also includes the event header. 2959 * 2960 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 2961 * If NULL is returned, then nothing has been allocated or locked. 2962 */ 2963 struct ring_buffer_event * 2964 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length) 2965 { 2966 struct ring_buffer_per_cpu *cpu_buffer; 2967 struct ring_buffer_event *event; 2968 int cpu; 2969 2970 /* If we are tracing schedule, we don't want to recurse */ 2971 preempt_disable_notrace(); 2972 2973 if (unlikely(atomic_read(&buffer->record_disabled))) 2974 goto out; 2975 2976 cpu = raw_smp_processor_id(); 2977 2978 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 2979 goto out; 2980 2981 cpu_buffer = buffer->buffers[cpu]; 2982 2983 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 2984 goto out; 2985 2986 if (unlikely(length > BUF_MAX_DATA_SIZE)) 2987 goto out; 2988 2989 if (unlikely(trace_recursive_lock(cpu_buffer))) 2990 goto out; 2991 2992 event = rb_reserve_next_event(buffer, cpu_buffer, length); 2993 if (!event) 2994 goto out_unlock; 2995 2996 return event; 2997 2998 out_unlock: 2999 trace_recursive_unlock(cpu_buffer); 3000 out: 3001 preempt_enable_notrace(); 3002 return NULL; 3003 } 3004 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3005 3006 /* 3007 * Decrement the entries to the page that an event is on. 3008 * The event does not even need to exist, only the pointer 3009 * to the page it is on. This may only be called before the commit 3010 * takes place. 3011 */ 3012 static inline void 3013 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3014 struct ring_buffer_event *event) 3015 { 3016 unsigned long addr = (unsigned long)event; 3017 struct buffer_page *bpage = cpu_buffer->commit_page; 3018 struct buffer_page *start; 3019 3020 addr &= PAGE_MASK; 3021 3022 /* Do the likely case first */ 3023 if (likely(bpage->page == (void *)addr)) { 3024 local_dec(&bpage->entries); 3025 return; 3026 } 3027 3028 /* 3029 * Because the commit page may be on the reader page we 3030 * start with the next page and check the end loop there. 3031 */ 3032 rb_inc_page(cpu_buffer, &bpage); 3033 start = bpage; 3034 do { 3035 if (bpage->page == (void *)addr) { 3036 local_dec(&bpage->entries); 3037 return; 3038 } 3039 rb_inc_page(cpu_buffer, &bpage); 3040 } while (bpage != start); 3041 3042 /* commit not part of this buffer?? */ 3043 RB_WARN_ON(cpu_buffer, 1); 3044 } 3045 3046 /** 3047 * ring_buffer_commit_discard - discard an event that has not been committed 3048 * @buffer: the ring buffer 3049 * @event: non committed event to discard 3050 * 3051 * Sometimes an event that is in the ring buffer needs to be ignored. 3052 * This function lets the user discard an event in the ring buffer 3053 * and then that event will not be read later. 3054 * 3055 * This function only works if it is called before the item has been 3056 * committed. It will try to free the event from the ring buffer 3057 * if another event has not been added behind it. 3058 * 3059 * If another event has been added behind it, it will set the event 3060 * up as discarded, and perform the commit. 3061 * 3062 * If this function is called, do not call ring_buffer_unlock_commit on 3063 * the event. 
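 *
 * Internally the event is first turned into padding; if it is still the
 * last thing on the tail page, the reserved space is reclaimed outright,
 * otherwise the padding remains and readers skip over it.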
3064 */ 3065 void ring_buffer_discard_commit(struct ring_buffer *buffer, 3066 struct ring_buffer_event *event) 3067 { 3068 struct ring_buffer_per_cpu *cpu_buffer; 3069 int cpu; 3070 3071 /* The event is discarded regardless */ 3072 rb_event_discard(event); 3073 3074 cpu = smp_processor_id(); 3075 cpu_buffer = buffer->buffers[cpu]; 3076 3077 /* 3078 * This must only be called if the event has not been 3079 * committed yet. Thus we can assume that preemption 3080 * is still disabled. 3081 */ 3082 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3083 3084 rb_decrement_entry(cpu_buffer, event); 3085 if (rb_try_to_discard(cpu_buffer, event)) 3086 goto out; 3087 3088 /* 3089 * The commit is still visible by the reader, so we 3090 * must still update the timestamp. 3091 */ 3092 rb_update_write_stamp(cpu_buffer, event); 3093 out: 3094 rb_end_commit(cpu_buffer); 3095 3096 trace_recursive_unlock(cpu_buffer); 3097 3098 preempt_enable_notrace(); 3099 3100 } 3101 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3102 3103 /** 3104 * ring_buffer_write - write data to the buffer without reserving 3105 * @buffer: The ring buffer to write to. 3106 * @length: The length of the data being written (excluding the event header) 3107 * @data: The data to write to the buffer. 3108 * 3109 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3110 * one function. If you already have the data to write to the buffer, it 3111 * may be easier to simply call this function. 3112 * 3113 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3114 * and not the length of the event which would hold the header. 3115 */ 3116 int ring_buffer_write(struct ring_buffer *buffer, 3117 unsigned long length, 3118 void *data) 3119 { 3120 struct ring_buffer_per_cpu *cpu_buffer; 3121 struct ring_buffer_event *event; 3122 void *body; 3123 int ret = -EBUSY; 3124 int cpu; 3125 3126 preempt_disable_notrace(); 3127 3128 if (atomic_read(&buffer->record_disabled)) 3129 goto out; 3130 3131 cpu = raw_smp_processor_id(); 3132 3133 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3134 goto out; 3135 3136 cpu_buffer = buffer->buffers[cpu]; 3137 3138 if (atomic_read(&cpu_buffer->record_disabled)) 3139 goto out; 3140 3141 if (length > BUF_MAX_DATA_SIZE) 3142 goto out; 3143 3144 if (unlikely(trace_recursive_lock(cpu_buffer))) 3145 goto out; 3146 3147 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3148 if (!event) 3149 goto out_unlock; 3150 3151 body = rb_event_data(event); 3152 3153 memcpy(body, data, length); 3154 3155 rb_commit(cpu_buffer, event); 3156 3157 rb_wakeups(buffer, cpu_buffer); 3158 3159 ret = 0; 3160 3161 out_unlock: 3162 trace_recursive_unlock(cpu_buffer); 3163 3164 out: 3165 preempt_enable_notrace(); 3166 3167 return ret; 3168 } 3169 EXPORT_SYMBOL_GPL(ring_buffer_write); 3170 3171 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3172 { 3173 struct buffer_page *reader = cpu_buffer->reader_page; 3174 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3175 struct buffer_page *commit = cpu_buffer->commit_page; 3176 3177 /* In case of error, head will be NULL */ 3178 if (unlikely(!head)) 3179 return true; 3180 3181 return reader->read == rb_page_commit(reader) && 3182 (commit == reader || 3183 (commit == head && 3184 head->read == rb_page_commit(commit))); 3185 } 3186 3187 /** 3188 * ring_buffer_record_disable - stop all writes into the buffer 3189 * @buffer: The ring buffer to stop writes to. 3190 * 3191 * This prevents all writes to the buffer. 
Any attempt to write 3192 * to the buffer after this will fail and return NULL. 3193 * 3194 * The caller should call synchronize_rcu() after this. 3195 */ 3196 void ring_buffer_record_disable(struct ring_buffer *buffer) 3197 { 3198 atomic_inc(&buffer->record_disabled); 3199 } 3200 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3201 3202 /** 3203 * ring_buffer_record_enable - enable writes to the buffer 3204 * @buffer: The ring buffer to enable writes 3205 * 3206 * Note, multiple disables will need the same number of enables 3207 * to truly enable the writing (much like preempt_disable). 3208 */ 3209 void ring_buffer_record_enable(struct ring_buffer *buffer) 3210 { 3211 atomic_dec(&buffer->record_disabled); 3212 } 3213 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 3214 3215 /** 3216 * ring_buffer_record_off - stop all writes into the buffer 3217 * @buffer: The ring buffer to stop writes to. 3218 * 3219 * This prevents all writes to the buffer. Any attempt to write 3220 * to the buffer after this will fail and return NULL. 3221 * 3222 * This is different than ring_buffer_record_disable() as 3223 * it works like an on/off switch, where as the disable() version 3224 * must be paired with a enable(). 3225 */ 3226 void ring_buffer_record_off(struct ring_buffer *buffer) 3227 { 3228 unsigned int rd; 3229 unsigned int new_rd; 3230 3231 do { 3232 rd = atomic_read(&buffer->record_disabled); 3233 new_rd = rd | RB_BUFFER_OFF; 3234 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3235 } 3236 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 3237 3238 /** 3239 * ring_buffer_record_on - restart writes into the buffer 3240 * @buffer: The ring buffer to start writes to. 3241 * 3242 * This enables all writes to the buffer that was disabled by 3243 * ring_buffer_record_off(). 3244 * 3245 * This is different than ring_buffer_record_enable() as 3246 * it works like an on/off switch, where as the enable() version 3247 * must be paired with a disable(). 3248 */ 3249 void ring_buffer_record_on(struct ring_buffer *buffer) 3250 { 3251 unsigned int rd; 3252 unsigned int new_rd; 3253 3254 do { 3255 rd = atomic_read(&buffer->record_disabled); 3256 new_rd = rd & ~RB_BUFFER_OFF; 3257 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd); 3258 } 3259 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 3260 3261 /** 3262 * ring_buffer_record_is_on - return true if the ring buffer can write 3263 * @buffer: The ring buffer to see if write is enabled 3264 * 3265 * Returns true if the ring buffer is in a state that it accepts writes. 3266 */ 3267 bool ring_buffer_record_is_on(struct ring_buffer *buffer) 3268 { 3269 return !atomic_read(&buffer->record_disabled); 3270 } 3271 3272 /** 3273 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 3274 * @buffer: The ring buffer to see if write is set enabled 3275 * 3276 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 3277 * Note that this does NOT mean it is in a writable state. 3278 * 3279 * It may return true when the ring buffer has been disabled by 3280 * ring_buffer_record_disable(), as that is a temporary disabling of 3281 * the ring buffer. 3282 */ 3283 bool ring_buffer_record_is_set_on(struct ring_buffer *buffer) 3284 { 3285 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 3286 } 3287 3288 /** 3289 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 3290 * @buffer: The ring buffer to stop writes to. 
3291 * @cpu: The CPU buffer to stop 3292 * 3293 * This prevents all writes to the buffer. Any attempt to write 3294 * to the buffer after this will fail and return NULL. 3295 * 3296 * The caller should call synchronize_rcu() after this. 3297 */ 3298 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu) 3299 { 3300 struct ring_buffer_per_cpu *cpu_buffer; 3301 3302 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3303 return; 3304 3305 cpu_buffer = buffer->buffers[cpu]; 3306 atomic_inc(&cpu_buffer->record_disabled); 3307 } 3308 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 3309 3310 /** 3311 * ring_buffer_record_enable_cpu - enable writes to the buffer 3312 * @buffer: The ring buffer to enable writes 3313 * @cpu: The CPU to enable. 3314 * 3315 * Note, multiple disables will need the same number of enables 3316 * to truly enable the writing (much like preempt_disable). 3317 */ 3318 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu) 3319 { 3320 struct ring_buffer_per_cpu *cpu_buffer; 3321 3322 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3323 return; 3324 3325 cpu_buffer = buffer->buffers[cpu]; 3326 atomic_dec(&cpu_buffer->record_disabled); 3327 } 3328 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 3329 3330 /* 3331 * The total entries in the ring buffer is the running counter 3332 * of entries entered into the ring buffer, minus the sum of 3333 * the entries read from the ring buffer and the number of 3334 * entries that were overwritten. 3335 */ 3336 static inline unsigned long 3337 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 3338 { 3339 return local_read(&cpu_buffer->entries) - 3340 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 3341 } 3342 3343 /** 3344 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 3345 * @buffer: The ring buffer 3346 * @cpu: The per CPU buffer to read from. 3347 */ 3348 u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu) 3349 { 3350 unsigned long flags; 3351 struct ring_buffer_per_cpu *cpu_buffer; 3352 struct buffer_page *bpage; 3353 u64 ret = 0; 3354 3355 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3356 return 0; 3357 3358 cpu_buffer = buffer->buffers[cpu]; 3359 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3360 /* 3361 * if the tail is on reader_page, oldest time stamp is on the reader 3362 * page 3363 */ 3364 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 3365 bpage = cpu_buffer->reader_page; 3366 else 3367 bpage = rb_set_head_page(cpu_buffer); 3368 if (bpage) 3369 ret = bpage->page->time_stamp; 3370 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3371 3372 return ret; 3373 } 3374 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 3375 3376 /** 3377 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer 3378 * @buffer: The ring buffer 3379 * @cpu: The per CPU buffer to read from. 3380 */ 3381 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu) 3382 { 3383 struct ring_buffer_per_cpu *cpu_buffer; 3384 unsigned long ret; 3385 3386 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3387 return 0; 3388 3389 cpu_buffer = buffer->buffers[cpu]; 3390 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 3391 3392 return ret; 3393 } 3394 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 3395 3396 /** 3397 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 3398 * @buffer: The ring buffer 3399 * @cpu: The per CPU buffer to get the entries from. 
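 *
 * Returns the number of events written to this CPU buffer that have not
 * yet been read and were not overwritten (see rb_num_of_entries()).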
3400 */ 3401 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu) 3402 { 3403 struct ring_buffer_per_cpu *cpu_buffer; 3404 3405 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3406 return 0; 3407 3408 cpu_buffer = buffer->buffers[cpu]; 3409 3410 return rb_num_of_entries(cpu_buffer); 3411 } 3412 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 3413 3414 /** 3415 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 3416 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 3417 * @buffer: The ring buffer 3418 * @cpu: The per CPU buffer to get the number of overruns from 3419 */ 3420 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu) 3421 { 3422 struct ring_buffer_per_cpu *cpu_buffer; 3423 unsigned long ret; 3424 3425 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3426 return 0; 3427 3428 cpu_buffer = buffer->buffers[cpu]; 3429 ret = local_read(&cpu_buffer->overrun); 3430 3431 return ret; 3432 } 3433 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 3434 3435 /** 3436 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 3437 * commits failing due to the buffer wrapping around while there are uncommitted 3438 * events, such as during an interrupt storm. 3439 * @buffer: The ring buffer 3440 * @cpu: The per CPU buffer to get the number of overruns from 3441 */ 3442 unsigned long 3443 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu) 3444 { 3445 struct ring_buffer_per_cpu *cpu_buffer; 3446 unsigned long ret; 3447 3448 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3449 return 0; 3450 3451 cpu_buffer = buffer->buffers[cpu]; 3452 ret = local_read(&cpu_buffer->commit_overrun); 3453 3454 return ret; 3455 } 3456 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 3457 3458 /** 3459 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 3460 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
3461 * @buffer: The ring buffer 3462 * @cpu: The per CPU buffer to get the number of overruns from 3463 */ 3464 unsigned long 3465 ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu) 3466 { 3467 struct ring_buffer_per_cpu *cpu_buffer; 3468 unsigned long ret; 3469 3470 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3471 return 0; 3472 3473 cpu_buffer = buffer->buffers[cpu]; 3474 ret = local_read(&cpu_buffer->dropped_events); 3475 3476 return ret; 3477 } 3478 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 3479 3480 /** 3481 * ring_buffer_read_events_cpu - get the number of events successfully read 3482 * @buffer: The ring buffer 3483 * @cpu: The per CPU buffer to get the number of events read 3484 */ 3485 unsigned long 3486 ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu) 3487 { 3488 struct ring_buffer_per_cpu *cpu_buffer; 3489 3490 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3491 return 0; 3492 3493 cpu_buffer = buffer->buffers[cpu]; 3494 return cpu_buffer->read; 3495 } 3496 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 3497 3498 /** 3499 * ring_buffer_entries - get the number of entries in a buffer 3500 * @buffer: The ring buffer 3501 * 3502 * Returns the total number of entries in the ring buffer 3503 * (all CPU entries) 3504 */ 3505 unsigned long ring_buffer_entries(struct ring_buffer *buffer) 3506 { 3507 struct ring_buffer_per_cpu *cpu_buffer; 3508 unsigned long entries = 0; 3509 int cpu; 3510 3511 /* if you care about this being correct, lock the buffer */ 3512 for_each_buffer_cpu(buffer, cpu) { 3513 cpu_buffer = buffer->buffers[cpu]; 3514 entries += rb_num_of_entries(cpu_buffer); 3515 } 3516 3517 return entries; 3518 } 3519 EXPORT_SYMBOL_GPL(ring_buffer_entries); 3520 3521 /** 3522 * ring_buffer_overruns - get the number of overruns in buffer 3523 * @buffer: The ring buffer 3524 * 3525 * Returns the total number of overruns in the ring buffer 3526 * (all CPU entries) 3527 */ 3528 unsigned long ring_buffer_overruns(struct ring_buffer *buffer) 3529 { 3530 struct ring_buffer_per_cpu *cpu_buffer; 3531 unsigned long overruns = 0; 3532 int cpu; 3533 3534 /* if you care about this being correct, lock the buffer */ 3535 for_each_buffer_cpu(buffer, cpu) { 3536 cpu_buffer = buffer->buffers[cpu]; 3537 overruns += local_read(&cpu_buffer->overrun); 3538 } 3539 3540 return overruns; 3541 } 3542 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 3543 3544 static void rb_iter_reset(struct ring_buffer_iter *iter) 3545 { 3546 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3547 3548 /* Iterator usage is expected to have record disabled */ 3549 iter->head_page = cpu_buffer->reader_page; 3550 iter->head = cpu_buffer->reader_page->read; 3551 3552 iter->cache_reader_page = iter->head_page; 3553 iter->cache_read = cpu_buffer->read; 3554 3555 if (iter->head) 3556 iter->read_stamp = cpu_buffer->read_stamp; 3557 else 3558 iter->read_stamp = iter->head_page->page->time_stamp; 3559 } 3560 3561 /** 3562 * ring_buffer_iter_reset - reset an iterator 3563 * @iter: The iterator to reset 3564 * 3565 * Resets the iterator, so that it will start from the beginning 3566 * again. 
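 *
 * The reset is done under the per-CPU reader_lock, so it does not race
 * with readers of the same buffer.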
3567 */ 3568 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 3569 { 3570 struct ring_buffer_per_cpu *cpu_buffer; 3571 unsigned long flags; 3572 3573 if (!iter) 3574 return; 3575 3576 cpu_buffer = iter->cpu_buffer; 3577 3578 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 3579 rb_iter_reset(iter); 3580 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 3581 } 3582 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 3583 3584 /** 3585 * ring_buffer_iter_empty - check if an iterator has no more to read 3586 * @iter: The iterator to check 3587 */ 3588 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 3589 { 3590 struct ring_buffer_per_cpu *cpu_buffer; 3591 struct buffer_page *reader; 3592 struct buffer_page *head_page; 3593 struct buffer_page *commit_page; 3594 unsigned commit; 3595 3596 cpu_buffer = iter->cpu_buffer; 3597 3598 /* Remember, trace recording is off when iterator is in use */ 3599 reader = cpu_buffer->reader_page; 3600 head_page = cpu_buffer->head_page; 3601 commit_page = cpu_buffer->commit_page; 3602 commit = rb_page_commit(commit_page); 3603 3604 return ((iter->head_page == commit_page && iter->head == commit) || 3605 (iter->head_page == reader && commit_page == head_page && 3606 head_page->read == commit && 3607 iter->head == rb_page_commit(cpu_buffer->reader_page))); 3608 } 3609 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 3610 3611 static void 3612 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3613 struct ring_buffer_event *event) 3614 { 3615 u64 delta; 3616 3617 switch (event->type_len) { 3618 case RINGBUF_TYPE_PADDING: 3619 return; 3620 3621 case RINGBUF_TYPE_TIME_EXTEND: 3622 delta = ring_buffer_event_time_stamp(event); 3623 cpu_buffer->read_stamp += delta; 3624 return; 3625 3626 case RINGBUF_TYPE_TIME_STAMP: 3627 delta = ring_buffer_event_time_stamp(event); 3628 cpu_buffer->read_stamp = delta; 3629 return; 3630 3631 case RINGBUF_TYPE_DATA: 3632 cpu_buffer->read_stamp += event->time_delta; 3633 return; 3634 3635 default: 3636 BUG(); 3637 } 3638 return; 3639 } 3640 3641 static void 3642 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 3643 struct ring_buffer_event *event) 3644 { 3645 u64 delta; 3646 3647 switch (event->type_len) { 3648 case RINGBUF_TYPE_PADDING: 3649 return; 3650 3651 case RINGBUF_TYPE_TIME_EXTEND: 3652 delta = ring_buffer_event_time_stamp(event); 3653 iter->read_stamp += delta; 3654 return; 3655 3656 case RINGBUF_TYPE_TIME_STAMP: 3657 delta = ring_buffer_event_time_stamp(event); 3658 iter->read_stamp = delta; 3659 return; 3660 3661 case RINGBUF_TYPE_DATA: 3662 iter->read_stamp += event->time_delta; 3663 return; 3664 3665 default: 3666 BUG(); 3667 } 3668 return; 3669 } 3670 3671 static struct buffer_page * 3672 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 3673 { 3674 struct buffer_page *reader = NULL; 3675 unsigned long overwrite; 3676 unsigned long flags; 3677 int nr_loops = 0; 3678 int ret; 3679 3680 local_irq_save(flags); 3681 arch_spin_lock(&cpu_buffer->lock); 3682 3683 again: 3684 /* 3685 * This should normally only loop twice. But because the 3686 * start of the reader inserts an empty page, it causes 3687 * a case where we will loop three times. There should be no 3688 * reason to loop four times (that I know of). 
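	 *
	 * Each successful swap of the reader page comes back through
	 * 'again' to re-check the newly installed page, which is why
	 * more than one pass is normal.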
3689 */ 3690 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 3691 reader = NULL; 3692 goto out; 3693 } 3694 3695 reader = cpu_buffer->reader_page; 3696 3697 /* If there's more to read, return this page */ 3698 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 3699 goto out; 3700 3701 /* Never should we have an index greater than the size */ 3702 if (RB_WARN_ON(cpu_buffer, 3703 cpu_buffer->reader_page->read > rb_page_size(reader))) 3704 goto out; 3705 3706 /* check if we caught up to the tail */ 3707 reader = NULL; 3708 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 3709 goto out; 3710 3711 /* Don't bother swapping if the ring buffer is empty */ 3712 if (rb_num_of_entries(cpu_buffer) == 0) 3713 goto out; 3714 3715 /* 3716 * Reset the reader page to size zero. 3717 */ 3718 local_set(&cpu_buffer->reader_page->write, 0); 3719 local_set(&cpu_buffer->reader_page->entries, 0); 3720 local_set(&cpu_buffer->reader_page->page->commit, 0); 3721 cpu_buffer->reader_page->real_end = 0; 3722 3723 spin: 3724 /* 3725 * Splice the empty reader page into the list around the head. 3726 */ 3727 reader = rb_set_head_page(cpu_buffer); 3728 if (!reader) 3729 goto out; 3730 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 3731 cpu_buffer->reader_page->list.prev = reader->list.prev; 3732 3733 /* 3734 * cpu_buffer->pages just needs to point to the buffer, it 3735 * has no specific buffer page to point to. Lets move it out 3736 * of our way so we don't accidentally swap it. 3737 */ 3738 cpu_buffer->pages = reader->list.prev; 3739 3740 /* The reader page will be pointing to the new head */ 3741 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); 3742 3743 /* 3744 * We want to make sure we read the overruns after we set up our 3745 * pointers to the next object. The writer side does a 3746 * cmpxchg to cross pages which acts as the mb on the writer 3747 * side. Note, the reader will constantly fail the swap 3748 * while the writer is updating the pointers, so this 3749 * guarantees that the overwrite recorded here is the one we 3750 * want to compare with the last_overrun. 3751 */ 3752 smp_mb(); 3753 overwrite = local_read(&(cpu_buffer->overrun)); 3754 3755 /* 3756 * Here's the tricky part. 3757 * 3758 * We need to move the pointer past the header page. 3759 * But we can only do that if a writer is not currently 3760 * moving it. The page before the header page has the 3761 * flag bit '1' set if it is pointing to the page we want. 3762 * but if the writer is in the process of moving it 3763 * than it will be '2' or already moved '0'. 3764 */ 3765 3766 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 3767 3768 /* 3769 * If we did not convert it, then we must try again. 3770 */ 3771 if (!ret) 3772 goto spin; 3773 3774 /* 3775 * Yay! We succeeded in replacing the page. 3776 * 3777 * Now make the new head point back to the reader page. 
3778 */ 3779 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 3780 rb_inc_page(cpu_buffer, &cpu_buffer->head_page); 3781 3782 local_inc(&cpu_buffer->pages_read); 3783 3784 /* Finally update the reader page to the new head */ 3785 cpu_buffer->reader_page = reader; 3786 cpu_buffer->reader_page->read = 0; 3787 3788 if (overwrite != cpu_buffer->last_overrun) { 3789 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 3790 cpu_buffer->last_overrun = overwrite; 3791 } 3792 3793 goto again; 3794 3795 out: 3796 /* Update the read_stamp on the first event */ 3797 if (reader && reader->read == 0) 3798 cpu_buffer->read_stamp = reader->page->time_stamp; 3799 3800 arch_spin_unlock(&cpu_buffer->lock); 3801 local_irq_restore(flags); 3802 3803 return reader; 3804 } 3805 3806 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 3807 { 3808 struct ring_buffer_event *event; 3809 struct buffer_page *reader; 3810 unsigned length; 3811 3812 reader = rb_get_reader_page(cpu_buffer); 3813 3814 /* This function should not be called when buffer is empty */ 3815 if (RB_WARN_ON(cpu_buffer, !reader)) 3816 return; 3817 3818 event = rb_reader_event(cpu_buffer); 3819 3820 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 3821 cpu_buffer->read++; 3822 3823 rb_update_read_stamp(cpu_buffer, event); 3824 3825 length = rb_event_length(event); 3826 cpu_buffer->reader_page->read += length; 3827 } 3828 3829 static void rb_advance_iter(struct ring_buffer_iter *iter) 3830 { 3831 struct ring_buffer_per_cpu *cpu_buffer; 3832 struct ring_buffer_event *event; 3833 unsigned length; 3834 3835 cpu_buffer = iter->cpu_buffer; 3836 3837 /* 3838 * Check if we are at the end of the buffer. 3839 */ 3840 if (iter->head >= rb_page_size(iter->head_page)) { 3841 /* discarded commits can make the page empty */ 3842 if (iter->head_page == cpu_buffer->commit_page) 3843 return; 3844 rb_inc_iter(iter); 3845 return; 3846 } 3847 3848 event = rb_iter_head_event(iter); 3849 3850 length = rb_event_length(event); 3851 3852 /* 3853 * This should not be called to advance the header if we are 3854 * at the tail of the buffer. 3855 */ 3856 if (RB_WARN_ON(cpu_buffer, 3857 (iter->head_page == cpu_buffer->commit_page) && 3858 (iter->head + length > rb_commit_index(cpu_buffer)))) 3859 return; 3860 3861 rb_update_iter_read_stamp(iter, event); 3862 3863 iter->head += length; 3864 3865 /* check for end of page padding */ 3866 if ((iter->head >= rb_page_size(iter->head_page)) && 3867 (iter->head_page != cpu_buffer->commit_page)) 3868 rb_inc_iter(iter); 3869 } 3870 3871 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 3872 { 3873 return cpu_buffer->lost_events; 3874 } 3875 3876 static struct ring_buffer_event * 3877 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 3878 unsigned long *lost_events) 3879 { 3880 struct ring_buffer_event *event; 3881 struct buffer_page *reader; 3882 int nr_loops = 0; 3883 3884 if (ts) 3885 *ts = 0; 3886 again: 3887 /* 3888 * We repeat when a time extend is encountered. 3889 * Since the time extend is always attached to a data event, 3890 * we should never loop more than once. 3891 * (We never hit the following condition more than twice). 
3892 */ 3893 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 3894 return NULL; 3895 3896 reader = rb_get_reader_page(cpu_buffer); 3897 if (!reader) 3898 return NULL; 3899 3900 event = rb_reader_event(cpu_buffer); 3901 3902 switch (event->type_len) { 3903 case RINGBUF_TYPE_PADDING: 3904 if (rb_null_event(event)) 3905 RB_WARN_ON(cpu_buffer, 1); 3906 /* 3907 * Because the writer could be discarding every 3908 * event it creates (which would probably be bad) 3909 * if we were to go back to "again" then we may never 3910 * catch up, and will trigger the warn on, or lock 3911 * the box. Return the padding, and we will release 3912 * the current locks, and try again. 3913 */ 3914 return event; 3915 3916 case RINGBUF_TYPE_TIME_EXTEND: 3917 /* Internal data, OK to advance */ 3918 rb_advance_reader(cpu_buffer); 3919 goto again; 3920 3921 case RINGBUF_TYPE_TIME_STAMP: 3922 if (ts) { 3923 *ts = ring_buffer_event_time_stamp(event); 3924 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3925 cpu_buffer->cpu, ts); 3926 } 3927 /* Internal data, OK to advance */ 3928 rb_advance_reader(cpu_buffer); 3929 goto again; 3930 3931 case RINGBUF_TYPE_DATA: 3932 if (ts && !(*ts)) { 3933 *ts = cpu_buffer->read_stamp + event->time_delta; 3934 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 3935 cpu_buffer->cpu, ts); 3936 } 3937 if (lost_events) 3938 *lost_events = rb_lost_events(cpu_buffer); 3939 return event; 3940 3941 default: 3942 BUG(); 3943 } 3944 3945 return NULL; 3946 } 3947 EXPORT_SYMBOL_GPL(ring_buffer_peek); 3948 3949 static struct ring_buffer_event * 3950 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 3951 { 3952 struct ring_buffer *buffer; 3953 struct ring_buffer_per_cpu *cpu_buffer; 3954 struct ring_buffer_event *event; 3955 int nr_loops = 0; 3956 3957 if (ts) 3958 *ts = 0; 3959 3960 cpu_buffer = iter->cpu_buffer; 3961 buffer = cpu_buffer->buffer; 3962 3963 /* 3964 * Check if someone performed a consuming read to 3965 * the buffer. A consuming read invalidates the iterator 3966 * and we need to reset the iterator in this case. 3967 */ 3968 if (unlikely(iter->cache_read != cpu_buffer->read || 3969 iter->cache_reader_page != cpu_buffer->reader_page)) 3970 rb_iter_reset(iter); 3971 3972 again: 3973 if (ring_buffer_iter_empty(iter)) 3974 return NULL; 3975 3976 /* 3977 * We repeat when a time extend is encountered or we hit 3978 * the end of the page. Since the time extend is always attached 3979 * to a data event, we should never loop more than three times. 3980 * Once for going to next page, once on time extend, and 3981 * finally once to get the event. 3982 * (We never hit the following condition more than thrice). 
3983 */ 3984 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) 3985 return NULL; 3986 3987 if (rb_per_cpu_empty(cpu_buffer)) 3988 return NULL; 3989 3990 if (iter->head >= rb_page_size(iter->head_page)) { 3991 rb_inc_iter(iter); 3992 goto again; 3993 } 3994 3995 event = rb_iter_head_event(iter); 3996 3997 switch (event->type_len) { 3998 case RINGBUF_TYPE_PADDING: 3999 if (rb_null_event(event)) { 4000 rb_inc_iter(iter); 4001 goto again; 4002 } 4003 rb_advance_iter(iter); 4004 return event; 4005 4006 case RINGBUF_TYPE_TIME_EXTEND: 4007 /* Internal data, OK to advance */ 4008 rb_advance_iter(iter); 4009 goto again; 4010 4011 case RINGBUF_TYPE_TIME_STAMP: 4012 if (ts) { 4013 *ts = ring_buffer_event_time_stamp(event); 4014 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4015 cpu_buffer->cpu, ts); 4016 } 4017 /* Internal data, OK to advance */ 4018 rb_advance_iter(iter); 4019 goto again; 4020 4021 case RINGBUF_TYPE_DATA: 4022 if (ts && !(*ts)) { 4023 *ts = iter->read_stamp + event->time_delta; 4024 ring_buffer_normalize_time_stamp(buffer, 4025 cpu_buffer->cpu, ts); 4026 } 4027 return event; 4028 4029 default: 4030 BUG(); 4031 } 4032 4033 return NULL; 4034 } 4035 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4036 4037 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4038 { 4039 if (likely(!in_nmi())) { 4040 raw_spin_lock(&cpu_buffer->reader_lock); 4041 return true; 4042 } 4043 4044 /* 4045 * If an NMI die dumps out the content of the ring buffer 4046 * trylock must be used to prevent a deadlock if the NMI 4047 * preempted a task that holds the ring buffer locks. If 4048 * we get the lock then all is fine, if not, then continue 4049 * to do the read, but this can corrupt the ring buffer, 4050 * so it must be permanently disabled from future writes. 4051 * Reading from NMI is a oneshot deal. 4052 */ 4053 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4054 return true; 4055 4056 /* Continue without locking, but disable the ring buffer */ 4057 atomic_inc(&cpu_buffer->record_disabled); 4058 return false; 4059 } 4060 4061 static inline void 4062 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4063 { 4064 if (likely(locked)) 4065 raw_spin_unlock(&cpu_buffer->reader_lock); 4066 return; 4067 } 4068 4069 /** 4070 * ring_buffer_peek - peek at the next event to be read 4071 * @buffer: The ring buffer to read 4072 * @cpu: The cpu to peak at 4073 * @ts: The timestamp counter of this event. 4074 * @lost_events: a variable to store if events were lost (may be NULL) 4075 * 4076 * This will return the event that will be read next, but does 4077 * not consume the data. 
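 *
 * A minimal usage sketch (the variable names and inspect() are only
 * illustrative):
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event)
 *		inspect(ring_buffer_event_data(event));
 *
 * The same event is returned on every call until it is consumed, for
 * example by ring_buffer_consume().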
4078 */ 4079 struct ring_buffer_event * 4080 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts, 4081 unsigned long *lost_events) 4082 { 4083 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4084 struct ring_buffer_event *event; 4085 unsigned long flags; 4086 bool dolock; 4087 4088 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4089 return NULL; 4090 4091 again: 4092 local_irq_save(flags); 4093 dolock = rb_reader_lock(cpu_buffer); 4094 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4095 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4096 rb_advance_reader(cpu_buffer); 4097 rb_reader_unlock(cpu_buffer, dolock); 4098 local_irq_restore(flags); 4099 4100 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4101 goto again; 4102 4103 return event; 4104 } 4105 4106 /** 4107 * ring_buffer_iter_peek - peek at the next event to be read 4108 * @iter: The ring buffer iterator 4109 * @ts: The timestamp counter of this event. 4110 * 4111 * This will return the event that will be read next, but does 4112 * not increment the iterator. 4113 */ 4114 struct ring_buffer_event * 4115 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4116 { 4117 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4118 struct ring_buffer_event *event; 4119 unsigned long flags; 4120 4121 again: 4122 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4123 event = rb_iter_peek(iter, ts); 4124 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4125 4126 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4127 goto again; 4128 4129 return event; 4130 } 4131 4132 /** 4133 * ring_buffer_consume - return an event and consume it 4134 * @buffer: The ring buffer to get the next event from 4135 * @cpu: the cpu to read the buffer from 4136 * @ts: a variable to store the timestamp (may be NULL) 4137 * @lost_events: a variable to store if events were lost (may be NULL) 4138 * 4139 * Returns the next event in the ring buffer, and that event is consumed. 4140 * Meaning, that sequential reads will keep returning a different event, 4141 * and eventually empty the ring buffer if the producer is slower. 4142 */ 4143 struct ring_buffer_event * 4144 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts, 4145 unsigned long *lost_events) 4146 { 4147 struct ring_buffer_per_cpu *cpu_buffer; 4148 struct ring_buffer_event *event = NULL; 4149 unsigned long flags; 4150 bool dolock; 4151 4152 again: 4153 /* might be called in atomic */ 4154 preempt_disable(); 4155 4156 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4157 goto out; 4158 4159 cpu_buffer = buffer->buffers[cpu]; 4160 local_irq_save(flags); 4161 dolock = rb_reader_lock(cpu_buffer); 4162 4163 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4164 if (event) { 4165 cpu_buffer->lost_events = 0; 4166 rb_advance_reader(cpu_buffer); 4167 } 4168 4169 rb_reader_unlock(cpu_buffer, dolock); 4170 local_irq_restore(flags); 4171 4172 out: 4173 preempt_enable(); 4174 4175 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4176 goto again; 4177 4178 return event; 4179 } 4180 EXPORT_SYMBOL_GPL(ring_buffer_consume); 4181 4182 /** 4183 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 4184 * @buffer: The ring buffer to read from 4185 * @cpu: The cpu buffer to iterate over 4186 * @flags: gfp flags to use for memory allocation 4187 * 4188 * This performs the initial preparations necessary to iterate 4189 * through the buffer. 
Memory is allocated, buffer recording
4190 * is disabled, and the iterator pointer is returned to the caller.
4191 *
4192 * Disabling buffer recording prevents the reading from being
4193 * corrupted. This is not a consuming read, so a producer is not
4194 * expected.
4195 *
4196 * After a sequence of ring_buffer_read_prepare calls, the user is
4197 * expected to make at least one call to ring_buffer_read_prepare_sync.
4198 * Afterwards, ring_buffer_read_start is invoked to get things going
4199 * for real.
4200 *
4201 * This overall must be paired with ring_buffer_read_finish.
4202 */
4203 struct ring_buffer_iter *
4204 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu, gfp_t flags)
4205 {
4206 struct ring_buffer_per_cpu *cpu_buffer;
4207 struct ring_buffer_iter *iter;
4208
4209 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4210 return NULL;
4211
4212 iter = kmalloc(sizeof(*iter), flags);
4213 if (!iter)
4214 return NULL;
4215
4216 cpu_buffer = buffer->buffers[cpu];
4217
4218 iter->cpu_buffer = cpu_buffer;
4219
4220 atomic_inc(&buffer->resize_disabled);
4221 atomic_inc(&cpu_buffer->record_disabled);
4222
4223 return iter;
4224 }
4225 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
4226
4227 /**
4228 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4229 *
4230 * All previously invoked ring_buffer_read_prepare calls to prepare
4231 * iterators will be synchronized. Afterwards, ring_buffer_read_start
4232 * calls on those iterators are allowed.
4233 */
4234 void
4235 ring_buffer_read_prepare_sync(void)
4236 {
4237 synchronize_rcu();
4238 }
4239 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4240
4241 /**
4242 * ring_buffer_read_start - start a non consuming read of the buffer
4243 * @iter: The iterator returned by ring_buffer_read_prepare
4244 *
4245 * This finalizes the startup of an iteration through the buffer.
4246 * The iterator comes from a call to ring_buffer_read_prepare and
4247 * an intervening ring_buffer_read_prepare_sync must have been
4248 * performed.
4249 *
4250 * Must be paired with ring_buffer_read_finish.
4251 */
4252 void
4253 ring_buffer_read_start(struct ring_buffer_iter *iter)
4254 {
4255 struct ring_buffer_per_cpu *cpu_buffer;
4256 unsigned long flags;
4257
4258 if (!iter)
4259 return;
4260
4261 cpu_buffer = iter->cpu_buffer;
4262
4263 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4264 arch_spin_lock(&cpu_buffer->lock);
4265 rb_iter_reset(iter);
4266 arch_spin_unlock(&cpu_buffer->lock);
4267 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4268 }
4269 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4270
4271 /**
4272 * ring_buffer_read_finish - finish reading the iterator of the buffer
4273 * @iter: The iterator retrieved by ring_buffer_read_prepare
4274 *
4275 * This re-enables the recording to the buffer, and frees the
4276 * iterator.
4277 */
4278 void
4279 ring_buffer_read_finish(struct ring_buffer_iter *iter)
4280 {
4281 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4282 unsigned long flags;
4283
4284 /*
4285 * Ring buffer is disabled from recording, here's a good place
4286 * to check the integrity of the ring buffer.
4287 * Must prevent readers from trying to read, as the check
4288 * clears the HEAD page and readers require it.
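 *
 * For reference, this function pairs with ring_buffer_read_prepare():
 * the atomic_dec()s below undo the atomic_inc()s done there.  A full
 * non-consuming read therefore looks roughly like this (a sketch only,
 * error handling omitted):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(event);
 *	ring_buffer_read_finish(iter);
 *
 * where 'process' stands in for whatever the caller does with each
 * event.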
4289 */ 4290 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4291 rb_check_pages(cpu_buffer); 4292 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4293 4294 atomic_dec(&cpu_buffer->record_disabled); 4295 atomic_dec(&cpu_buffer->buffer->resize_disabled); 4296 kfree(iter); 4297 } 4298 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 4299 4300 /** 4301 * ring_buffer_read - read the next item in the ring buffer by the iterator 4302 * @iter: The ring buffer iterator 4303 * @ts: The time stamp of the event read. 4304 * 4305 * This reads the next event in the ring buffer and increments the iterator. 4306 */ 4307 struct ring_buffer_event * 4308 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) 4309 { 4310 struct ring_buffer_event *event; 4311 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4312 unsigned long flags; 4313 4314 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4315 again: 4316 event = rb_iter_peek(iter, ts); 4317 if (!event) 4318 goto out; 4319 4320 if (event->type_len == RINGBUF_TYPE_PADDING) 4321 goto again; 4322 4323 rb_advance_iter(iter); 4324 out: 4325 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4326 4327 return event; 4328 } 4329 EXPORT_SYMBOL_GPL(ring_buffer_read); 4330 4331 /** 4332 * ring_buffer_size - return the size of the ring buffer (in bytes) 4333 * @buffer: The ring buffer. 4334 */ 4335 unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu) 4336 { 4337 /* 4338 * Earlier, this method returned 4339 * BUF_PAGE_SIZE * buffer->nr_pages 4340 * Since the nr_pages field is now removed, we have converted this to 4341 * return the per cpu buffer value. 4342 */ 4343 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4344 return 0; 4345 4346 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages; 4347 } 4348 EXPORT_SYMBOL_GPL(ring_buffer_size); 4349 4350 static void 4351 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 4352 { 4353 rb_head_page_deactivate(cpu_buffer); 4354 4355 cpu_buffer->head_page 4356 = list_entry(cpu_buffer->pages, struct buffer_page, list); 4357 local_set(&cpu_buffer->head_page->write, 0); 4358 local_set(&cpu_buffer->head_page->entries, 0); 4359 local_set(&cpu_buffer->head_page->page->commit, 0); 4360 4361 cpu_buffer->head_page->read = 0; 4362 4363 cpu_buffer->tail_page = cpu_buffer->head_page; 4364 cpu_buffer->commit_page = cpu_buffer->head_page; 4365 4366 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 4367 INIT_LIST_HEAD(&cpu_buffer->new_pages); 4368 local_set(&cpu_buffer->reader_page->write, 0); 4369 local_set(&cpu_buffer->reader_page->entries, 0); 4370 local_set(&cpu_buffer->reader_page->page->commit, 0); 4371 cpu_buffer->reader_page->read = 0; 4372 4373 local_set(&cpu_buffer->entries_bytes, 0); 4374 local_set(&cpu_buffer->overrun, 0); 4375 local_set(&cpu_buffer->commit_overrun, 0); 4376 local_set(&cpu_buffer->dropped_events, 0); 4377 local_set(&cpu_buffer->entries, 0); 4378 local_set(&cpu_buffer->committing, 0); 4379 local_set(&cpu_buffer->commits, 0); 4380 local_set(&cpu_buffer->pages_touched, 0); 4381 local_set(&cpu_buffer->pages_read, 0); 4382 cpu_buffer->last_pages_touch = 0; 4383 cpu_buffer->shortest_full = 0; 4384 cpu_buffer->read = 0; 4385 cpu_buffer->read_bytes = 0; 4386 4387 cpu_buffer->write_stamp = 0; 4388 cpu_buffer->read_stamp = 0; 4389 4390 cpu_buffer->lost_events = 0; 4391 cpu_buffer->last_overrun = 0; 4392 4393 rb_head_page_activate(cpu_buffer); 4394 } 4395 4396 /** 4397 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 4398 * @buffer: The ring buffer to reset a 
per cpu buffer of 4399 * @cpu: The CPU buffer to be reset 4400 */ 4401 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu) 4402 { 4403 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4404 unsigned long flags; 4405 4406 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4407 return; 4408 4409 atomic_inc(&buffer->resize_disabled); 4410 atomic_inc(&cpu_buffer->record_disabled); 4411 4412 /* Make sure all commits have finished */ 4413 synchronize_rcu(); 4414 4415 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4416 4417 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 4418 goto out; 4419 4420 arch_spin_lock(&cpu_buffer->lock); 4421 4422 rb_reset_cpu(cpu_buffer); 4423 4424 arch_spin_unlock(&cpu_buffer->lock); 4425 4426 out: 4427 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4428 4429 atomic_dec(&cpu_buffer->record_disabled); 4430 atomic_dec(&buffer->resize_disabled); 4431 } 4432 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 4433 4434 /** 4435 * ring_buffer_reset - reset a ring buffer 4436 * @buffer: The ring buffer to reset all cpu buffers 4437 */ 4438 void ring_buffer_reset(struct ring_buffer *buffer) 4439 { 4440 int cpu; 4441 4442 for_each_buffer_cpu(buffer, cpu) 4443 ring_buffer_reset_cpu(buffer, cpu); 4444 } 4445 EXPORT_SYMBOL_GPL(ring_buffer_reset); 4446 4447 /** 4448 * rind_buffer_empty - is the ring buffer empty? 4449 * @buffer: The ring buffer to test 4450 */ 4451 bool ring_buffer_empty(struct ring_buffer *buffer) 4452 { 4453 struct ring_buffer_per_cpu *cpu_buffer; 4454 unsigned long flags; 4455 bool dolock; 4456 int cpu; 4457 int ret; 4458 4459 /* yes this is racy, but if you don't like the race, lock the buffer */ 4460 for_each_buffer_cpu(buffer, cpu) { 4461 cpu_buffer = buffer->buffers[cpu]; 4462 local_irq_save(flags); 4463 dolock = rb_reader_lock(cpu_buffer); 4464 ret = rb_per_cpu_empty(cpu_buffer); 4465 rb_reader_unlock(cpu_buffer, dolock); 4466 local_irq_restore(flags); 4467 4468 if (!ret) 4469 return false; 4470 } 4471 4472 return true; 4473 } 4474 EXPORT_SYMBOL_GPL(ring_buffer_empty); 4475 4476 /** 4477 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 4478 * @buffer: The ring buffer 4479 * @cpu: The CPU buffer to test 4480 */ 4481 bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu) 4482 { 4483 struct ring_buffer_per_cpu *cpu_buffer; 4484 unsigned long flags; 4485 bool dolock; 4486 int ret; 4487 4488 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4489 return true; 4490 4491 cpu_buffer = buffer->buffers[cpu]; 4492 local_irq_save(flags); 4493 dolock = rb_reader_lock(cpu_buffer); 4494 ret = rb_per_cpu_empty(cpu_buffer); 4495 rb_reader_unlock(cpu_buffer, dolock); 4496 local_irq_restore(flags); 4497 4498 return ret; 4499 } 4500 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 4501 4502 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4503 /** 4504 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 4505 * @buffer_a: One buffer to swap with 4506 * @buffer_b: The other buffer to swap with 4507 * 4508 * This function is useful for tracers that want to take a "snapshot" 4509 * of a CPU buffer and has another back up buffer lying around. 4510 * it is expected that the tracer handles the cpu buffer not being 4511 * used at the moment. 
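 *
 * A rough sketch of that snapshot pattern (the buffer names are only
 * illustrative; both buffers must have the same number of pages):
 *
 *	ret = ring_buffer_swap_cpu(snapshot_buffer, live_buffer, cpu);
 *
 * On success the snapshot buffer's per cpu buffer now holds what the
 * live buffer had for @cpu (and vice versa); on failure nothing was
 * swapped and ret is -EINVAL, -EAGAIN or -EBUSY as set below.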
4512 */ 4513 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a, 4514 struct ring_buffer *buffer_b, int cpu) 4515 { 4516 struct ring_buffer_per_cpu *cpu_buffer_a; 4517 struct ring_buffer_per_cpu *cpu_buffer_b; 4518 int ret = -EINVAL; 4519 4520 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 4521 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 4522 goto out; 4523 4524 cpu_buffer_a = buffer_a->buffers[cpu]; 4525 cpu_buffer_b = buffer_b->buffers[cpu]; 4526 4527 /* At least make sure the two buffers are somewhat the same */ 4528 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 4529 goto out; 4530 4531 ret = -EAGAIN; 4532 4533 if (atomic_read(&buffer_a->record_disabled)) 4534 goto out; 4535 4536 if (atomic_read(&buffer_b->record_disabled)) 4537 goto out; 4538 4539 if (atomic_read(&cpu_buffer_a->record_disabled)) 4540 goto out; 4541 4542 if (atomic_read(&cpu_buffer_b->record_disabled)) 4543 goto out; 4544 4545 /* 4546 * We can't do a synchronize_rcu here because this 4547 * function can be called in atomic context. 4548 * Normally this will be called from the same CPU as cpu. 4549 * If not it's up to the caller to protect this. 4550 */ 4551 atomic_inc(&cpu_buffer_a->record_disabled); 4552 atomic_inc(&cpu_buffer_b->record_disabled); 4553 4554 ret = -EBUSY; 4555 if (local_read(&cpu_buffer_a->committing)) 4556 goto out_dec; 4557 if (local_read(&cpu_buffer_b->committing)) 4558 goto out_dec; 4559 4560 buffer_a->buffers[cpu] = cpu_buffer_b; 4561 buffer_b->buffers[cpu] = cpu_buffer_a; 4562 4563 cpu_buffer_b->buffer = buffer_a; 4564 cpu_buffer_a->buffer = buffer_b; 4565 4566 ret = 0; 4567 4568 out_dec: 4569 atomic_dec(&cpu_buffer_a->record_disabled); 4570 atomic_dec(&cpu_buffer_b->record_disabled); 4571 out: 4572 return ret; 4573 } 4574 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 4575 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 4576 4577 /** 4578 * ring_buffer_alloc_read_page - allocate a page to read from buffer 4579 * @buffer: the buffer to allocate for. 4580 * @cpu: the cpu buffer to allocate. 4581 * 4582 * This function is used in conjunction with ring_buffer_read_page. 4583 * When reading a full page from the ring buffer, these functions 4584 * can be used to speed up the process. The calling function should 4585 * allocate a few pages first with this function. Then when it 4586 * needs to get pages from the ring buffer, it passes the result 4587 * of this function into ring_buffer_read_page, which will swap 4588 * the page that was allocated, with the read page of the buffer. 
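 *
 * Pages obtained here should eventually be handed back with
 * ring_buffer_free_read_page(), whether or not they were ever passed
 * to ring_buffer_read_page(), so that they can be recycled through the
 * per cpu free page cache rather than simply discarded.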
4589 * 4590 * Returns: 4591 * The page allocated, or ERR_PTR 4592 */ 4593 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu) 4594 { 4595 struct ring_buffer_per_cpu *cpu_buffer; 4596 struct buffer_data_page *bpage = NULL; 4597 unsigned long flags; 4598 struct page *page; 4599 4600 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4601 return ERR_PTR(-ENODEV); 4602 4603 cpu_buffer = buffer->buffers[cpu]; 4604 local_irq_save(flags); 4605 arch_spin_lock(&cpu_buffer->lock); 4606 4607 if (cpu_buffer->free_page) { 4608 bpage = cpu_buffer->free_page; 4609 cpu_buffer->free_page = NULL; 4610 } 4611 4612 arch_spin_unlock(&cpu_buffer->lock); 4613 local_irq_restore(flags); 4614 4615 if (bpage) 4616 goto out; 4617 4618 page = alloc_pages_node(cpu_to_node(cpu), 4619 GFP_KERNEL | __GFP_NORETRY, 0); 4620 if (!page) 4621 return ERR_PTR(-ENOMEM); 4622 4623 bpage = page_address(page); 4624 4625 out: 4626 rb_init_page(bpage); 4627 4628 return bpage; 4629 } 4630 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 4631 4632 /** 4633 * ring_buffer_free_read_page - free an allocated read page 4634 * @buffer: the buffer the page was allocate for 4635 * @cpu: the cpu buffer the page came from 4636 * @data: the page to free 4637 * 4638 * Free a page allocated from ring_buffer_alloc_read_page. 4639 */ 4640 void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data) 4641 { 4642 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4643 struct buffer_data_page *bpage = data; 4644 struct page *page = virt_to_page(bpage); 4645 unsigned long flags; 4646 4647 /* If the page is still in use someplace else, we can't reuse it */ 4648 if (page_ref_count(page) > 1) 4649 goto out; 4650 4651 local_irq_save(flags); 4652 arch_spin_lock(&cpu_buffer->lock); 4653 4654 if (!cpu_buffer->free_page) { 4655 cpu_buffer->free_page = bpage; 4656 bpage = NULL; 4657 } 4658 4659 arch_spin_unlock(&cpu_buffer->lock); 4660 local_irq_restore(flags); 4661 4662 out: 4663 free_page((unsigned long)bpage); 4664 } 4665 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 4666 4667 /** 4668 * ring_buffer_read_page - extract a page from the ring buffer 4669 * @buffer: buffer to extract from 4670 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 4671 * @len: amount to extract 4672 * @cpu: the cpu of the buffer to extract 4673 * @full: should the extraction only happen when the page is full. 4674 * 4675 * This function will pull out a page from the ring buffer and consume it. 4676 * @data_page must be the address of the variable that was returned 4677 * from ring_buffer_alloc_read_page. This is because the page might be used 4678 * to swap with a page in the ring buffer. 4679 * 4680 * for example: 4681 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 4682 * if (IS_ERR(rpage)) 4683 * return PTR_ERR(rpage); 4684 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0); 4685 * if (ret >= 0) 4686 * process_page(rpage, ret); 4687 * 4688 * When @full is set, the function will not return true unless 4689 * the writer is off the reader page. 4690 * 4691 * Note: it is up to the calling functions to handle sleeps and wakeups. 4692 * The ring buffer can be used anywhere in the kernel and can not 4693 * blindly call wake_up. The layer that uses the ring buffer must be 4694 * responsible for that. 4695 * 4696 * Returns: 4697 * >=0 if data has been transferred, returns the offset of consumed data. 4698 * <0 if no data has been transferred. 
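 *
 * On return the page's commit field may additionally carry the
 * RB_MISSED_EVENTS flag if events were lost before this page, and
 * RB_MISSED_STORED if the lost event count was appended after the
 * data; see the end of this function for how those bits are set.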
4699 */ 4700 int ring_buffer_read_page(struct ring_buffer *buffer, 4701 void **data_page, size_t len, int cpu, int full) 4702 { 4703 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4704 struct ring_buffer_event *event; 4705 struct buffer_data_page *bpage; 4706 struct buffer_page *reader; 4707 unsigned long missed_events; 4708 unsigned long flags; 4709 unsigned int commit; 4710 unsigned int read; 4711 u64 save_timestamp; 4712 int ret = -1; 4713 4714 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4715 goto out; 4716 4717 /* 4718 * If len is not big enough to hold the page header, then 4719 * we can not copy anything. 4720 */ 4721 if (len <= BUF_PAGE_HDR_SIZE) 4722 goto out; 4723 4724 len -= BUF_PAGE_HDR_SIZE; 4725 4726 if (!data_page) 4727 goto out; 4728 4729 bpage = *data_page; 4730 if (!bpage) 4731 goto out; 4732 4733 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4734 4735 reader = rb_get_reader_page(cpu_buffer); 4736 if (!reader) 4737 goto out_unlock; 4738 4739 event = rb_reader_event(cpu_buffer); 4740 4741 read = reader->read; 4742 commit = rb_page_commit(reader); 4743 4744 /* Check if any events were dropped */ 4745 missed_events = cpu_buffer->lost_events; 4746 4747 /* 4748 * If this page has been partially read or 4749 * if len is not big enough to read the rest of the page or 4750 * a writer is still on the page, then 4751 * we must copy the data from the page to the buffer. 4752 * Otherwise, we can simply swap the page with the one passed in. 4753 */ 4754 if (read || (len < (commit - read)) || 4755 cpu_buffer->reader_page == cpu_buffer->commit_page) { 4756 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 4757 unsigned int rpos = read; 4758 unsigned int pos = 0; 4759 unsigned int size; 4760 4761 if (full) 4762 goto out_unlock; 4763 4764 if (len > (commit - read)) 4765 len = (commit - read); 4766 4767 /* Always keep the time extend and data together */ 4768 size = rb_event_ts_length(event); 4769 4770 if (len < size) 4771 goto out_unlock; 4772 4773 /* save the current timestamp, since the user will need it */ 4774 save_timestamp = cpu_buffer->read_stamp; 4775 4776 /* Need to copy one event at a time */ 4777 do { 4778 /* We need the size of one event, because 4779 * rb_advance_reader only advances by one event, 4780 * whereas rb_event_ts_length may include the size of 4781 * one or two events. 4782 * We have already ensured there's enough space if this 4783 * is a time extend. 
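 * (For a time extend this means the extend itself is copied on this
 * pass and its data event on the next pass of the loop.)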
*/ 4784 size = rb_event_length(event); 4785 memcpy(bpage->data + pos, rpage->data + rpos, size); 4786 4787 len -= size; 4788 4789 rb_advance_reader(cpu_buffer); 4790 rpos = reader->read; 4791 pos += size; 4792 4793 if (rpos >= commit) 4794 break; 4795 4796 event = rb_reader_event(cpu_buffer); 4797 /* Always keep the time extend and data together */ 4798 size = rb_event_ts_length(event); 4799 } while (len >= size); 4800 4801 /* update bpage */ 4802 local_set(&bpage->commit, pos); 4803 bpage->time_stamp = save_timestamp; 4804 4805 /* we copied everything to the beginning */ 4806 read = 0; 4807 } else { 4808 /* update the entry counter */ 4809 cpu_buffer->read += rb_page_entries(reader); 4810 cpu_buffer->read_bytes += BUF_PAGE_SIZE; 4811 4812 /* swap the pages */ 4813 rb_init_page(bpage); 4814 bpage = reader->page; 4815 reader->page = *data_page; 4816 local_set(&reader->write, 0); 4817 local_set(&reader->entries, 0); 4818 reader->read = 0; 4819 *data_page = bpage; 4820 4821 /* 4822 * Use the real_end for the data size, 4823 * This gives us a chance to store the lost events 4824 * on the page. 4825 */ 4826 if (reader->real_end) 4827 local_set(&bpage->commit, reader->real_end); 4828 } 4829 ret = read; 4830 4831 cpu_buffer->lost_events = 0; 4832 4833 commit = local_read(&bpage->commit); 4834 /* 4835 * Set a flag in the commit field if we lost events 4836 */ 4837 if (missed_events) { 4838 /* If there is room at the end of the page to save the 4839 * missed events, then record it there. 4840 */ 4841 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) { 4842 memcpy(&bpage->data[commit], &missed_events, 4843 sizeof(missed_events)); 4844 local_add(RB_MISSED_STORED, &bpage->commit); 4845 commit += sizeof(missed_events); 4846 } 4847 local_add(RB_MISSED_EVENTS, &bpage->commit); 4848 } 4849 4850 /* 4851 * This page may be off to user land. Zero it out here. 4852 */ 4853 if (commit < BUF_PAGE_SIZE) 4854 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit); 4855 4856 out_unlock: 4857 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4858 4859 out: 4860 return ret; 4861 } 4862 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 4863 4864 /* 4865 * We only allocate new buffers, never free them if the CPU goes down. 4866 * If we were to free the buffer, then the user would lose any trace that was in 4867 * the buffer. 4868 */ 4869 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 4870 { 4871 struct ring_buffer *buffer; 4872 long nr_pages_same; 4873 int cpu_i; 4874 unsigned long nr_pages; 4875 4876 buffer = container_of(node, struct ring_buffer, node); 4877 if (cpumask_test_cpu(cpu, buffer->cpumask)) 4878 return 0; 4879 4880 nr_pages = 0; 4881 nr_pages_same = 1; 4882 /* check if all cpu sizes are same */ 4883 for_each_buffer_cpu(buffer, cpu_i) { 4884 /* fill in the size from first enabled cpu */ 4885 if (nr_pages == 0) 4886 nr_pages = buffer->buffers[cpu_i]->nr_pages; 4887 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 4888 nr_pages_same = 0; 4889 break; 4890 } 4891 } 4892 /* allocate minimum pages, user can later expand it */ 4893 if (!nr_pages_same) 4894 nr_pages = 2; 4895 buffer->buffers[cpu] = 4896 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 4897 if (!buffer->buffers[cpu]) { 4898 WARN(1, "failed to allocate ring buffer on CPU %u\n", 4899 cpu); 4900 return -ENOMEM; 4901 } 4902 smp_wmb(); 4903 cpumask_set_cpu(cpu, buffer->cpumask); 4904 return 0; 4905 } 4906 4907 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 4908 /* 4909 * This is a basic integrity check of the ring buffer. 
4910 * Late in the boot cycle this test will run when configured in. 4911 * It will kick off a thread per CPU that will go into a loop 4912 * writing to the per cpu ring buffer various sizes of data. 4913 * Some of the data will be large items, some small. 4914 * 4915 * Another thread is created that goes into a spin, sending out 4916 * IPIs to the other CPUs to also write into the ring buffer. 4917 * this is to test the nesting ability of the buffer. 4918 * 4919 * Basic stats are recorded and reported. If something in the 4920 * ring buffer should happen that's not expected, a big warning 4921 * is displayed and all ring buffers are disabled. 4922 */ 4923 static struct task_struct *rb_threads[NR_CPUS] __initdata; 4924 4925 struct rb_test_data { 4926 struct ring_buffer *buffer; 4927 unsigned long events; 4928 unsigned long bytes_written; 4929 unsigned long bytes_alloc; 4930 unsigned long bytes_dropped; 4931 unsigned long events_nested; 4932 unsigned long bytes_written_nested; 4933 unsigned long bytes_alloc_nested; 4934 unsigned long bytes_dropped_nested; 4935 int min_size_nested; 4936 int max_size_nested; 4937 int max_size; 4938 int min_size; 4939 int cpu; 4940 int cnt; 4941 }; 4942 4943 static struct rb_test_data rb_data[NR_CPUS] __initdata; 4944 4945 /* 1 meg per cpu */ 4946 #define RB_TEST_BUFFER_SIZE 1048576 4947 4948 static char rb_string[] __initdata = 4949 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 4950 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 4951 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 4952 4953 static bool rb_test_started __initdata; 4954 4955 struct rb_item { 4956 int size; 4957 char str[]; 4958 }; 4959 4960 static __init int rb_write_something(struct rb_test_data *data, bool nested) 4961 { 4962 struct ring_buffer_event *event; 4963 struct rb_item *item; 4964 bool started; 4965 int event_len; 4966 int size; 4967 int len; 4968 int cnt; 4969 4970 /* Have nested writes different that what is written */ 4971 cnt = data->cnt + (nested ? 27 : 0); 4972 4973 /* Multiply cnt by ~e, to make some unique increment */ 4974 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 4975 4976 len = size + sizeof(struct rb_item); 4977 4978 started = rb_test_started; 4979 /* read rb_test_started before checking buffer enabled */ 4980 smp_rmb(); 4981 4982 event = ring_buffer_lock_reserve(data->buffer, len); 4983 if (!event) { 4984 /* Ignore dropped events before test starts. 
*/ 4985 if (started) { 4986 if (nested) 4987 data->bytes_dropped += len; 4988 else 4989 data->bytes_dropped_nested += len; 4990 } 4991 return len; 4992 } 4993 4994 event_len = ring_buffer_event_length(event); 4995 4996 if (RB_WARN_ON(data->buffer, event_len < len)) 4997 goto out; 4998 4999 item = ring_buffer_event_data(event); 5000 item->size = size; 5001 memcpy(item->str, rb_string, size); 5002 5003 if (nested) { 5004 data->bytes_alloc_nested += event_len; 5005 data->bytes_written_nested += len; 5006 data->events_nested++; 5007 if (!data->min_size_nested || len < data->min_size_nested) 5008 data->min_size_nested = len; 5009 if (len > data->max_size_nested) 5010 data->max_size_nested = len; 5011 } else { 5012 data->bytes_alloc += event_len; 5013 data->bytes_written += len; 5014 data->events++; 5015 if (!data->min_size || len < data->min_size) 5016 data->max_size = len; 5017 if (len > data->max_size) 5018 data->max_size = len; 5019 } 5020 5021 out: 5022 ring_buffer_unlock_commit(data->buffer, event); 5023 5024 return 0; 5025 } 5026 5027 static __init int rb_test(void *arg) 5028 { 5029 struct rb_test_data *data = arg; 5030 5031 while (!kthread_should_stop()) { 5032 rb_write_something(data, false); 5033 data->cnt++; 5034 5035 set_current_state(TASK_INTERRUPTIBLE); 5036 /* Now sleep between a min of 100-300us and a max of 1ms */ 5037 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 5038 } 5039 5040 return 0; 5041 } 5042 5043 static __init void rb_ipi(void *ignore) 5044 { 5045 struct rb_test_data *data; 5046 int cpu = smp_processor_id(); 5047 5048 data = &rb_data[cpu]; 5049 rb_write_something(data, true); 5050 } 5051 5052 static __init int rb_hammer_test(void *arg) 5053 { 5054 while (!kthread_should_stop()) { 5055 5056 /* Send an IPI to all cpus to write data! */ 5057 smp_call_function(rb_ipi, NULL, 1); 5058 /* No sleep, but for non preempt, let others run */ 5059 schedule(); 5060 } 5061 5062 return 0; 5063 } 5064 5065 static __init int test_ringbuffer(void) 5066 { 5067 struct task_struct *rb_hammer; 5068 struct ring_buffer *buffer; 5069 int cpu; 5070 int ret = 0; 5071 5072 if (security_locked_down(LOCKDOWN_TRACEFS)) { 5073 pr_warning("Lockdown is enabled, skipping ring buffer tests\n"); 5074 return 0; 5075 } 5076 5077 pr_info("Running ring buffer tests...\n"); 5078 5079 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 5080 if (WARN_ON(!buffer)) 5081 return 0; 5082 5083 /* Disable buffer so that threads can't write to it yet */ 5084 ring_buffer_record_off(buffer); 5085 5086 for_each_online_cpu(cpu) { 5087 rb_data[cpu].buffer = buffer; 5088 rb_data[cpu].cpu = cpu; 5089 rb_data[cpu].cnt = cpu; 5090 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu], 5091 "rbtester/%d", cpu); 5092 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 5093 pr_cont("FAILED\n"); 5094 ret = PTR_ERR(rb_threads[cpu]); 5095 goto out_free; 5096 } 5097 5098 kthread_bind(rb_threads[cpu], cpu); 5099 wake_up_process(rb_threads[cpu]); 5100 } 5101 5102 /* Now create the rb hammer! */ 5103 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 5104 if (WARN_ON(IS_ERR(rb_hammer))) { 5105 pr_cont("FAILED\n"); 5106 ret = PTR_ERR(rb_hammer); 5107 goto out_free; 5108 } 5109 5110 ring_buffer_record_on(buffer); 5111 /* 5112 * Show buffer is enabled before setting rb_test_started. 5113 * Yes there's a small race window where events could be 5114 * dropped and the thread wont catch it. But when a ring 5115 * buffer gets enabled, there will always be some kind of 5116 * delay before other CPUs see it. 
Thus, we don't care about 5117 * those dropped events. We care about events dropped after 5118 * the threads see that the buffer is active. 5119 */ 5120 smp_wmb(); 5121 rb_test_started = true; 5122 5123 set_current_state(TASK_INTERRUPTIBLE); 5124 /* Just run for 10 seconds */; 5125 schedule_timeout(10 * HZ); 5126 5127 kthread_stop(rb_hammer); 5128 5129 out_free: 5130 for_each_online_cpu(cpu) { 5131 if (!rb_threads[cpu]) 5132 break; 5133 kthread_stop(rb_threads[cpu]); 5134 } 5135 if (ret) { 5136 ring_buffer_free(buffer); 5137 return ret; 5138 } 5139 5140 /* Report! */ 5141 pr_info("finished\n"); 5142 for_each_online_cpu(cpu) { 5143 struct ring_buffer_event *event; 5144 struct rb_test_data *data = &rb_data[cpu]; 5145 struct rb_item *item; 5146 unsigned long total_events; 5147 unsigned long total_dropped; 5148 unsigned long total_written; 5149 unsigned long total_alloc; 5150 unsigned long total_read = 0; 5151 unsigned long total_size = 0; 5152 unsigned long total_len = 0; 5153 unsigned long total_lost = 0; 5154 unsigned long lost; 5155 int big_event_size; 5156 int small_event_size; 5157 5158 ret = -1; 5159 5160 total_events = data->events + data->events_nested; 5161 total_written = data->bytes_written + data->bytes_written_nested; 5162 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 5163 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 5164 5165 big_event_size = data->max_size + data->max_size_nested; 5166 small_event_size = data->min_size + data->min_size_nested; 5167 5168 pr_info("CPU %d:\n", cpu); 5169 pr_info(" events: %ld\n", total_events); 5170 pr_info(" dropped bytes: %ld\n", total_dropped); 5171 pr_info(" alloced bytes: %ld\n", total_alloc); 5172 pr_info(" written bytes: %ld\n", total_written); 5173 pr_info(" biggest event: %d\n", big_event_size); 5174 pr_info(" smallest event: %d\n", small_event_size); 5175 5176 if (RB_WARN_ON(buffer, total_dropped)) 5177 break; 5178 5179 ret = 0; 5180 5181 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 5182 total_lost += lost; 5183 item = ring_buffer_event_data(event); 5184 total_len += ring_buffer_event_length(event); 5185 total_size += item->size + sizeof(struct rb_item); 5186 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 5187 pr_info("FAILED!\n"); 5188 pr_info("buffer had: %.*s\n", item->size, item->str); 5189 pr_info("expected: %.*s\n", item->size, rb_string); 5190 RB_WARN_ON(buffer, 1); 5191 ret = -1; 5192 break; 5193 } 5194 total_read++; 5195 } 5196 if (ret) 5197 break; 5198 5199 ret = -1; 5200 5201 pr_info(" read events: %ld\n", total_read); 5202 pr_info(" lost events: %ld\n", total_lost); 5203 pr_info(" total events: %ld\n", total_lost + total_read); 5204 pr_info(" recorded len bytes: %ld\n", total_len); 5205 pr_info(" recorded size bytes: %ld\n", total_size); 5206 if (total_lost) 5207 pr_info(" With dropped events, record len and size may not match\n" 5208 " alloced and written from above\n"); 5209 if (!total_lost) { 5210 if (RB_WARN_ON(buffer, total_len != total_alloc || 5211 total_size != total_written)) 5212 break; 5213 } 5214 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 5215 break; 5216 5217 ret = 0; 5218 } 5219 if (!ret) 5220 pr_info("Ring buffer PASSED!\n"); 5221 5222 ring_buffer_free(buffer); 5223 return 0; 5224 } 5225 5226 late_initcall(test_ringbuffer); 5227 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 5228