xref: /openbmc/linux/kernel/trace/ring_buffer.c (revision b8bb76713ec50df2f11efee386e16f93d51e1076)
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/fs.h>

#include "trace.h"

/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 *  This has two bits: ON, DISABLED
 *
 *  ON   DISABLED
 * ---- ----------
 *   0      0        : ring buffers are off
 *   1      0        : ring buffers are on
 *   X      1        : ring buffers are permanently disabled
 */

enum {
	RB_BUFFERS_ON_BIT	= 0,
	RB_BUFFERS_DISABLED_BIT	= 1,
};

enum {
	RB_BUFFERS_ON		= 1 << RB_BUFFERS_ON_BIT,
	RB_BUFFERS_DISABLED	= 1 << RB_BUFFERS_DISABLED_BIT,
};

static long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;

/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
	set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}
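
/*
 * Illustrative sketch of the global switch in use (hypothetical
 * caller, not part of the ring buffer API; the name
 * example_quiesce() is made up). tracing_off() closes layer 1 of
 * the three layers described above, so writers fail fast while the
 * buffer contents are inspected.
 */
#if 0	/* usage example only, not compiled */
static void example_quiesce(struct ring_buffer *buffer)
{
	tracing_off();		/* gate all ring buffers globally */
	/* ... dump or inspect buffer contents without new writes ... */
	tracing_on();		/* re-open the global gate */
}
#endif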
/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

/* FIXME!!! */
u64 ring_buffer_time_stamp(int cpu)
{
	u64 time;

	preempt_disable_notrace();
	/* shift to debug/test normalization and TIME_EXTENTS */
	time = sched_clock() << DEBUG_SHIFT;
	preempt_enable_no_resched_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT	2
#define RB_ALIGNMENT		(1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA	28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};

/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length = rb_event_length(event);
	if (event->type != RINGBUF_TYPE_DATA)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
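
/*
 * Illustrative sketch of how the DATA encoding above plays out
 * (hypothetical reader, not part of this API). A payload of up to
 * RB_MAX_SMALL_DATA (28) bytes is stored in the 3-bit len field in
 * RB_ALIGNMENT (4 byte) units; anything larger sets len to 0 and
 * keeps the size in array[0]. A reader needs only the two accessors:
 */
#if 0	/* usage example only, not compiled */
static void example_inspect_event(struct ring_buffer_event *event)
{
	unsigned length = ring_buffer_event_length(event); /* payload bytes */
	void *body = ring_buffer_event_data(event);	   /* payload start */

	/* a 12-byte payload: event->len == 3 (3 * 4 == 12) */
	/* a 100-byte payload: event->len == 0, size kept in event->array[0];
	 * ring_buffer_event_length() recovers the original 100 */
	print_hex_dump_bytes("rb: ", DUMP_PREFIX_OFFSET, body, length);
}
#endif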

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[];	/* data of buffer page */
};

struct buffer_page {
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	struct list_head list;		/* list of buffer pages */
	struct buffer_data_page *page;	/* Actual data page */
};

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}
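
/*
 * Worked example (illustrative): with sched_clock() counting in
 * nanoseconds, a 27-bit delta covers 2^27 ns, roughly 134 ms. Two
 * events 10 us apart keep a small delta (test_time_stamp() == 0)
 * that fits in the event header; a 200 ms gap overflows the field
 * (test_time_stamp() == 1) and forces a RINGBUF_TYPE_TIME_EXTEND
 * event, which carries the upper bits of the delta in array[0].
 */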

#define BUF_PAGE_SIZE (PAGE_SIZE - offsetof(struct buffer_data_page, data))

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	struct ring_buffer		*buffer;
	spinlock_t			reader_lock; /* serialize readers */
	raw_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			overrun;
	unsigned long			entries;
	u64				write_stamp;
	u64				read_stamp;
	atomic_t			record_disabled;
};

struct ring_buffer {
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	cpumask_var_t			cpumask;
	atomic_t			record_disabled;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(buffer, cond)				\
	({							\
		int _____ret = unlikely(cond);			\
		if (_____ret) {					\
			atomic_inc(&buffer->record_disabled);	\
			WARN_ON(1);				\
		}						\
		_____ret;					\
	})

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
	}

	return 0;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned nr_pages)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;
	unsigned long addr;
	LIST_HEAD(pages);
	unsigned i;

	for (i = 0; i < nr_pages; i++) {
		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;
		list_add(&bpage->list, &pages);

		addr = __get_free_page(GFP_KERNEL);
		if (!addr)
			goto free_pages;
		bpage->page = (void *)addr;
		rb_init_page(bpage->page);
	}

	list_splice(&pages, head);

	rb_check_pages(cpu_buffer);

	return 0;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	unsigned long addr;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	spin_lock_init(&cpu_buffer->reader_lock);
	cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
	INIT_LIST_HEAD(&cpu_buffer->pages);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	cpu_buffer->reader_page = bpage;
	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		goto fail_free_reader;
	bpage->page = (void *)addr;
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

	ret = rb_allocate_pages(cpu_buffer, buffer->pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = &cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	list_del_init(&cpu_buffer->reader_page->list);
	free_buffer_page(cpu_buffer->reader_page);

	list_for_each_entry_safe(bpage, tmp, head, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	kfree(cpu_buffer);
}

/*
 * Causes compile errors if the struct buffer_page gets bigger
 * than the struct page.
 */
extern int ring_buffer_page_too_big(void);

/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
{
	struct ring_buffer *buffer;
	int bsize;
	int cpu;

	/* Paranoid! Optimizes out when all is well */
	if (sizeof(struct buffer_page) > sizeof(struct page))
		ring_buffer_page_too_big();


	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;

	/* need at least two pages */
	if (buffer->pages == 1)
		buffer->pages++;

	cpumask_copy(buffer->cpumask, cpu_possible_mask);
	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	for_each_buffer_cpu(buffer, cpu) {
		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu])
			goto fail_free_buffers;
	}

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc);
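
/*
 * Illustrative sketch of a typical allocation (hypothetical caller;
 * the 1 MB size is an arbitrary example value). With RB_FL_OVERWRITE
 * the buffer overwrites old events when it wraps.
 */
#if 0	/* usage example only, not compiled */
static int example_setup(void)
{
	struct ring_buffer *buffer;

	/* one megabyte per cpu, rounded up to whole buffer pages */
	buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	/* ... write and read events ... */

	ring_buffer_free(buffer);
	return 0;
}
#endif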

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
			return;
		p = cpu_buffer->pages.next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
		return;

	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
		struct list_head *pages, unsigned nr_pages)
{
	struct buffer_page *bpage;
	struct list_head *p;
	unsigned i;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	for (i = 0; i < nr_pages; i++) {
		if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
			return;
		p = pages->next;
		bpage = list_entry(p, struct buffer_page, list);
		list_del_init(&bpage->list);
		list_add_tail(&bpage->list, &cpu_buffer->pages);
	}
	rb_reset_cpu(cpu_buffer);

	rb_check_pages(cpu_buffer);

	atomic_dec(&cpu_buffer->record_disabled);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * The tracer is responsible for making sure that the buffer is
 * not being used while changing the size.
 * Note: We may be able to change the above requirement by using
 *  RCU synchronizations.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned nr_pages, rm_pages, new_pages;
	struct buffer_page *bpage, *tmp;
	unsigned long buffer_size;
	unsigned long addr;
	LIST_HEAD(pages);
	int i, cpu;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	size *= BUF_PAGE_SIZE;
	buffer_size = buffer->pages * BUF_PAGE_SIZE;

	/* we need a minimum of two pages */
	if (size < BUF_PAGE_SIZE * 2)
		size = BUF_PAGE_SIZE * 2;

	if (size == buffer_size)
		return size;

	mutex_lock(&buffer->mutex);

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	if (size < buffer_size) {

		/* easy case, just free pages */
		if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
			mutex_unlock(&buffer->mutex);
			return -1;
		}

		rm_pages = buffer->pages - nr_pages;

		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_remove_pages(cpu_buffer, rm_pages);
		}
		goto out;
	}

	/*
	 * This is a bit more difficult. We only want to add pages
	 * when we can allocate enough for all CPUs. We do this
	 * by allocating all the pages and storing them on a local
	 * linked list. If we succeed in our allocation, then we
	 * add these pages to the cpu_buffers. Otherwise we just free
	 * them all and return -ENOMEM.
	 */
	if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
		mutex_unlock(&buffer->mutex);
		return -1;
	}

	new_pages = nr_pages - buffer->pages;

	for_each_buffer_cpu(buffer, cpu) {
		for (i = 0; i < new_pages; i++) {
			bpage = kzalloc_node(ALIGN(sizeof(*bpage),
						  cache_line_size()),
					    GFP_KERNEL, cpu_to_node(cpu));
			if (!bpage)
				goto free_pages;
			list_add(&bpage->list, &pages);
			addr = __get_free_page(GFP_KERNEL);
			if (!addr)
				goto free_pages;
			bpage->page = (void *)addr;
			rb_init_page(bpage->page);
		}
	}

	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		rb_insert_pages(cpu_buffer, &pages, new_pages);
	}

	if (RB_WARN_ON(buffer, !list_empty(&pages))) {
		mutex_unlock(&buffer->mutex);
		return -1;
	}

 out:
	buffer->pages = nr_pages;
	mutex_unlock(&buffer->mutex);

	return size;

 free_pages:
	list_for_each_entry_safe(bpage, tmp, &pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	mutex_unlock(&buffer->mutex);
	return -ENOMEM;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);
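
/*
 * Illustrative sketch (hypothetical caller): the requested size is
 * rounded up to whole buffer pages, so the value returned on success
 * may be larger than what was asked for.
 */
#if 0	/* usage example only, not compiled */
static int example_grow(struct ring_buffer *buffer)
{
	int ret;

	ret = ring_buffer_resize(buffer, 2 << 20);	/* ask for 2 MB per cpu */
	if (ret < 0)
		return ret;	/* -1 on misuse, -ENOMEM if pages ran out */
	pr_info("ring buffer now %d bytes per cpu\n", ret);
	return 0;
}
#endif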

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type == RINGBUF_TYPE_PADDING;
}

static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
	return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->head_page,
			       cpu_buffer->head_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write);
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->head_page);
}

/*
 * When the tail hits the head and the buffer is in overwrite mode,
 * the head jumps to the next page and all content on the previous
 * page is discarded. But before doing so, we update the overrun
 * variable of the buffer.
 */
static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	unsigned long head;

	for (head = 0; head < rb_head_size(cpu_buffer);
	     head += rb_event_length(event)) {

		event = __rb_page_index(cpu_buffer->head_page, head);
		if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
			return;
		/* Only count data entries */
		if (event->type != RINGBUF_TYPE_DATA)
			continue;
		cpu_buffer->overrun++;
		cpu_buffer->entries--;
	}
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = (*bpage)->list.next;

	if (p == &cpu_buffer->pages)
		p = p->next;

	*bpage = list_entry(p, struct buffer_page, list);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
}

static inline int
rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
	     struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	return cpu_buffer->commit_page->page == (void *)addr &&
		rb_commit_index(cpu_buffer) == index;
}

static inline void
rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
		    struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;
	unsigned long index;

	index = rb_event_index(event);
	addr &= PAGE_MASK;

	while (cpu_buffer->commit_page->page != (void *)addr) {
		if (RB_WARN_ON(cpu_buffer,
			  cpu_buffer->commit_page == cpu_buffer->tail_page))
			return;
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
	}

	/* Now set the commit to the event's index */
	local_set(&cpu_buffer->commit_page->page->commit, index);
}

static inline void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
	/*
	 * We only race with interrupts and NMIs on this CPU.
	 * If we own the commit event, then we can commit
	 * all others that interrupted us, since the interruptions
	 * are in stack format (they finish before they come
	 * back to us). This allows us to do a simple loop to
	 * assign the commit to the tail.
	 */
 again:
	while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
		cpu_buffer->write_stamp =
			cpu_buffer->commit_page->page->time_stamp;
		/* add barrier to keep gcc from optimizing too much */
		barrier();
	}
	while (rb_commit_index(cpu_buffer) !=
	       rb_page_write(cpu_buffer->commit_page)) {
		cpu_buffer->commit_page->page->commit =
			cpu_buffer->commit_page->write;
		barrier();
	}

	/* again, keep gcc from optimizing */
	barrier();

	/*
	 * If an interrupt came in just after the first while loop
	 * and pushed the tail page forward, we will be left with
	 * a dangling commit that will never go forward.
	 */
	if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
		goto again;
}

static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
	cpu_buffer->reader_page->read = 0;
}

static inline void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = cpu_buffer->head_page;
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/**
 * rb_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static inline void
rb_update_event(struct ring_buffer_event *event,
			 unsigned type, unsigned length)
{
	event->type = type;

	switch (type) {

	case RINGBUF_TYPE_PADDING:
		break;

	case RINGBUF_TYPE_TIME_EXTEND:
		event->len =
			(RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_TIME_STAMP:
		event->len =
			(RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
			>> RB_ALIGNMENT_SHIFT;
		break;

	case RINGBUF_TYPE_DATA:
		length -= RB_EVNT_HDR_SIZE;
		if (length > RB_MAX_SMALL_DATA) {
			event->len = 0;
			event->array[0] = length;
		} else
			event->len =
				(length + (RB_ALIGNMENT-1))
				>> RB_ALIGNMENT_SHIFT;
		break;
	default:
		BUG();
	}
}

static inline unsigned rb_calculate_event_length(unsigned length)
{
	struct ring_buffer_event event; /* Used only for sizeof array */

	/* zero length can cause confusion */
	if (!length)
		length = 1;

	if (length > RB_MAX_SMALL_DATA)
		length += sizeof(event.array[0]);

	length += RB_EVNT_HDR_SIZE;
	length = ALIGN(length, RB_ALIGNMENT);

	return length;
}
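
/*
 * Worked example (illustrative): a request for 10 bytes of data
 * becomes 10 + 4 (event header) = 14, aligned up to 16 bytes on the
 * page. A request for 100 bytes exceeds RB_MAX_SMALL_DATA, so an
 * extra 4-byte array[0] slot is reserved for the length:
 * 100 + 4 + 4 = 108, which is already 4-byte aligned.
 */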

static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
		  unsigned type, unsigned long length, u64 *ts)
{
	struct buffer_page *tail_page, *head_page, *reader_page, *commit_page;
	unsigned long tail, write;
	struct ring_buffer *buffer = cpu_buffer->buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

	commit_page = cpu_buffer->commit_page;
	/* we just need to protect against interrupts */
	barrier();
	tail_page = cpu_buffer->tail_page;
	write = local_add_return(length, &tail_page->write);
	tail = write - length;

	/* See if we shot past the end of this buffer page */
	if (write > BUF_PAGE_SIZE) {
		struct buffer_page *next_page = tail_page;

		local_irq_save(flags);
		__raw_spin_lock(&cpu_buffer->lock);

		rb_inc_page(cpu_buffer, &next_page);

		head_page = cpu_buffer->head_page;
		reader_page = cpu_buffer->reader_page;

		/* we grabbed the lock before incrementing */
		if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
			goto out_unlock;

		/*
		 * If for some reason, we had an interrupt storm that made
		 * it all the way around the buffer, bail, and warn
		 * about it.
		 */
		if (unlikely(next_page == commit_page)) {
			WARN_ON_ONCE(1);
			goto out_unlock;
		}

		if (next_page == head_page) {
			if (!(buffer->flags & RB_FL_OVERWRITE))
				goto out_unlock;

			/* tail_page has not moved yet? */
			if (tail_page == cpu_buffer->tail_page) {
				/* count overflows */
				rb_update_overflow(cpu_buffer);

				rb_inc_page(cpu_buffer, &head_page);
				cpu_buffer->head_page = head_page;
				cpu_buffer->head_page->read = 0;
			}
		}

		/*
		 * If the tail page is still the same as what we think
		 * it is, then it is up to us to update the tail
		 * pointer.
		 */
		if (tail_page == cpu_buffer->tail_page) {
			local_set(&next_page->write, 0);
			local_set(&next_page->page->commit, 0);
			cpu_buffer->tail_page = next_page;

			/* reread the time stamp */
			*ts = ring_buffer_time_stamp(cpu_buffer->cpu);
			cpu_buffer->tail_page->page->time_stamp = *ts;
		}

		/*
		 * The actual tail page has moved forward.
		 */
		if (tail < BUF_PAGE_SIZE) {
			/* Mark the rest of the page with padding */
			event = __rb_page_index(tail_page, tail);
			event->type = RINGBUF_TYPE_PADDING;
		}

		if (tail <= BUF_PAGE_SIZE)
			/* Set the write back to the previous setting */
			local_set(&tail_page->write, tail);

		/*
		 * If this was a commit entry that failed,
		 * increment that too
		 */
		if (tail_page == cpu_buffer->commit_page &&
		    tail == rb_commit_index(cpu_buffer)) {
			rb_set_commit_to_write(cpu_buffer);
		}

		__raw_spin_unlock(&cpu_buffer->lock);
		local_irq_restore(flags);

		/* fail and let the caller try again */
		return ERR_PTR(-EAGAIN);
	}

	/* We reserved something on the buffer */

	if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
		return NULL;

	event = __rb_page_index(tail_page, tail);
	rb_update_event(event, type, length);

	/*
	 * If this is a commit and the tail is zero, then update
	 * this page's time stamp.
	 */
	if (!tail && rb_is_commit(cpu_buffer, event))
		cpu_buffer->commit_page->page->time_stamp = *ts;

	return event;

 out_unlock:
	/* reset write */
	if (tail <= BUF_PAGE_SIZE)
		local_set(&tail_page->write, tail);

	__raw_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);
	return NULL;
}

static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		  u64 *ts, u64 *delta)
{
	struct ring_buffer_event *event;
	static int once;
	int ret;

	if (unlikely(*delta > (1ULL << 59) && !once++)) {
		printk(KERN_WARNING "Delta way too big! %llu"
		       " ts=%llu write stamp = %llu\n",
		       (unsigned long long)*delta,
		       (unsigned long long)*ts,
		       (unsigned long long)cpu_buffer->write_stamp);
		WARN_ON(1);
	}

	/*
	 * The delta is too big; we need to add a
	 * new timestamp.
	 */
	event = __rb_reserve_next(cpu_buffer,
				  RINGBUF_TYPE_TIME_EXTEND,
				  RB_LEN_TIME_EXTEND,
				  ts);
	if (!event)
		return -EBUSY;

	if (PTR_ERR(event) == -EAGAIN)
		return -EAGAIN;

	/* Only a committed time event can update the write stamp */
	if (rb_is_commit(cpu_buffer, event)) {
		/*
		 * If this is the first on the page, then we need to
		 * update the page itself, and just put in a zero.
		 */
		if (rb_event_index(event)) {
			event->time_delta = *delta & TS_MASK;
			event->array[0] = *delta >> TS_SHIFT;
		} else {
			cpu_buffer->commit_page->page->time_stamp = *ts;
			event->time_delta = 0;
			event->array[0] = 0;
		}
		cpu_buffer->write_stamp = *ts;
		/* let the caller know this was the commit */
		ret = 1;
	} else {
		/* Darn, this is just wasted space */
		event->time_delta = 0;
		event->array[0] = 0;
		ret = 0;
	}

	*delta = 0;

	return ret;
}

static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
		      unsigned type, unsigned long length)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int commit = 0;
	int nr_loops = 0;

 again:
	/*
	 * We allow for interrupts to reenter here and do a trace.
	 * If one does, it will cause this original code to loop
	 * back here. Even with heavy interrupts happening, this
	 * should only happen a few times in a row. If this happens
	 * 1000 times in a row, there must be either an interrupt
	 * storm or we have something buggy.
	 * Bail!
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
		return NULL;

	ts = ring_buffer_time_stamp(cpu_buffer->cpu);

	/*
	 * Only the first commit can update the timestamp.
	 * Yes there is a race here. If an interrupt comes in
	 * just after the conditional and it traces too, then it
	 * will also check the deltas. More than one timestamp may
	 * also be made. But only the entry that did the actual
	 * commit will be something other than zero.
	 */
	if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
	    rb_page_write(cpu_buffer->tail_page) ==
	    rb_commit_index(cpu_buffer)) {

		delta = ts - cpu_buffer->write_stamp;

		/* make sure this delta is calculated here */
		barrier();

		/* Did the write stamp get updated already? */
		if (unlikely(ts < cpu_buffer->write_stamp))
			delta = 0;

		if (test_time_stamp(delta)) {

			commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);

			if (commit == -EBUSY)
				return NULL;

			if (commit == -EAGAIN)
				goto again;

			RB_WARN_ON(cpu_buffer, commit < 0);
		}
	} else
		/* Non commits have zero deltas */
		delta = 0;

	event = __rb_reserve_next(cpu_buffer, type, length, &ts);
	if (PTR_ERR(event) == -EAGAIN)
		goto again;

	if (!event) {
		if (unlikely(commit))
			/*
			 * Ouch! We needed a timestamp and it was committed. But
			 * we didn't get our event reserved.
			 */
			rb_set_commit_to_write(cpu_buffer);
		return NULL;
	}

	/*
	 * If the timestamp was committed, make the commit our entry
	 * now so that we will update it when needed.
	 */
	if (commit)
		rb_set_commit_event(cpu_buffer, event);
	else if (!rb_is_commit(cpu_buffer, event))
		delta = 0;

	event->time_delta = delta;

	return event;
}

static DEFINE_PER_CPU(int, rb_need_resched);

/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 * @flags: a pointer to save the interrupt flags
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer,
			 unsigned long length,
			 unsigned long *flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return NULL;

	if (atomic_read(&buffer->record_disabled))
		return NULL;

	/* If we are tracing schedule, we don't want to recurse */
	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	length = rb_calculate_event_length(length);
	if (length > BUF_PAGE_SIZE)
		goto out;

	event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
	if (!event)
		goto out;

	/*
	 * Need to store resched state on this cpu.
	 * Only the first needs to.
	 */

	if (preempt_count() == 1)
		per_cpu(rb_need_resched, cpu) = resched;

	return event;

 out:
	ftrace_preempt_enable(resched);
	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);

static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
		      struct ring_buffer_event *event)
{
	cpu_buffer->entries++;

	/* Only process further if we own the commit */
	if (!rb_is_commit(cpu_buffer, event))
		return;

	cpu_buffer->write_stamp += event->time_delta;

	rb_set_commit_to_write(cpu_buffer);
}

/**
 * ring_buffer_unlock_commit - commit a reserved event
 * @buffer: The buffer to commit to
 * @event: The event pointer to commit.
 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
 *
 * This commits the data to the ring buffer, and releases any locks held.
 *
 * Must be paired with ring_buffer_lock_reserve.
 */
int ring_buffer_unlock_commit(struct ring_buffer *buffer,
			      struct ring_buffer_event *event,
			      unsigned long flags)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu = raw_smp_processor_id();

	cpu_buffer = buffer->buffers[cpu];

	rb_commit(cpu_buffer, event);

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1)
		ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
	else
		preempt_enable_no_resched_notrace();

	return 0;
}
EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
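
/*
 * Illustrative sketch of the canonical reserve/fill/commit sequence
 * for a producer. The struct example_entry payload and the function
 * name are made up for the example.
 */
#if 0	/* usage example only, not compiled */
struct example_entry {
	unsigned long	ip;
	unsigned long	parent_ip;
};

static int example_trace(struct ring_buffer *buffer,
			 unsigned long ip, unsigned long parent_ip)
{
	struct ring_buffer_event *event;
	struct example_entry *entry;
	unsigned long flags;

	event = ring_buffer_lock_reserve(buffer, sizeof(*entry), &flags);
	if (!event)
		return -EBUSY;	/* buffers off, disabled, or record too big */

	entry = ring_buffer_event_data(event);
	entry->ip = ip;
	entry->parent_ip = parent_ip;

	return ring_buffer_unlock_commit(buffer, event, flags);
}
#endif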

/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
			unsigned long length,
			void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long event_length;
	void *body;
	int ret = -EBUSY;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return -EBUSY;

	if (atomic_read(&buffer->record_disabled))
		return -EBUSY;

	resched = ftrace_preempt_disable();

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	event_length = rb_calculate_event_length(length);
	event = rb_reserve_next_event(cpu_buffer,
				      RINGBUF_TYPE_DATA, event_length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	ftrace_preempt_enable(resched);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_write);
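
/*
 * Illustrative sketch of the one-shot form, for when the payload is
 * already assembled (hypothetical caller).
 */
#if 0	/* usage example only, not compiled */
static int example_log_string(struct ring_buffer *buffer, const char *msg)
{
	/* copies the bytes in and commits in one call; -EBUSY on failure */
	return ring_buffer_write(buffer, strlen(msg) + 1, (void *)msg);
}
#endif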

static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = cpu_buffer->reader_page;
	struct buffer_page *head = cpu_buffer->head_page;
	struct buffer_page *commit = cpu_buffer->commit_page;

	return reader->read == rb_page_commit(reader) &&
		(commit == reader ||
		 (commit == head &&
		  head->read == rb_page_commit(commit)));
}

/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable);

/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
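
/*
 * Illustrative sketch of quiescing writers before a stable read
 * (hypothetical caller). synchronize_sched() waits for writers
 * already inside the reserve/commit path, as the comments above ask.
 */
#if 0	/* usage example only, not compiled */
static void example_stable_read(struct ring_buffer *buffer)
{
	ring_buffer_record_disable(buffer);
	synchronize_sched();	/* let in-flight writers drain */
	/* ... read or snapshot the buffer here ... */
	ring_buffer_record_enable(buffer);
}
#endif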

/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);

/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);

/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);

/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	return cpu_buffer->overrun;
}
EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);

/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += cpu_buffer->entries;
	}

	return entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries);

/**
 * ring_buffer_overruns - get the number of overruns in the buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries)
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += cpu_buffer->overrun;
	}

	return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);
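
/*
 * Illustrative sketch of dumping the per-cpu statistics exposed by
 * the accessors above (hypothetical caller).
 */
#if 0	/* usage example only, not compiled */
static void example_dump_stats(struct ring_buffer *buffer)
{
	int cpu;

	for_each_possible_cpu(cpu)
		pr_info("cpu%d: %lu entries, %lu overruns\n", cpu,
			ring_buffer_entries_cpu(buffer, cpu),
			ring_buffer_overrun_cpu(buffer, cpu));
}
#endif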
1602 
1603 static void rb_iter_reset(struct ring_buffer_iter *iter)
1604 {
1605 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1606 
1607 	/* Iterator usage is expected to have record disabled */
1608 	if (list_empty(&cpu_buffer->reader_page->list)) {
1609 		iter->head_page = cpu_buffer->head_page;
1610 		iter->head = cpu_buffer->head_page->read;
1611 	} else {
1612 		iter->head_page = cpu_buffer->reader_page;
1613 		iter->head = cpu_buffer->reader_page->read;
1614 	}
1615 	if (iter->head)
1616 		iter->read_stamp = cpu_buffer->read_stamp;
1617 	else
1618 		iter->read_stamp = iter->head_page->page->time_stamp;
1619 }
1620 
1621 /**
1622  * ring_buffer_iter_reset - reset an iterator
1623  * @iter: The iterator to reset
1624  *
1625  * Resets the iterator, so that it will start from the beginning
1626  * again.
1627  */
1628 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1629 {
1630 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1631 	unsigned long flags;
1632 
1633 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1634 	rb_iter_reset(iter);
1635 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1636 }
1637 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
1638 
1639 /**
1640  * ring_buffer_iter_empty - check if an iterator has no more to read
1641  * @iter: The iterator to check
1642  */
1643 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1644 {
1645 	struct ring_buffer_per_cpu *cpu_buffer;
1646 
1647 	cpu_buffer = iter->cpu_buffer;
1648 
1649 	return iter->head_page == cpu_buffer->commit_page &&
1650 		iter->head == rb_commit_index(cpu_buffer);
1651 }
1652 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
1653 
1654 static void
1655 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1656 		     struct ring_buffer_event *event)
1657 {
1658 	u64 delta;
1659 
1660 	switch (event->type) {
1661 	case RINGBUF_TYPE_PADDING:
1662 		return;
1663 
1664 	case RINGBUF_TYPE_TIME_EXTEND:
1665 		delta = event->array[0];
1666 		delta <<= TS_SHIFT;
1667 		delta += event->time_delta;
1668 		cpu_buffer->read_stamp += delta;
1669 		return;
1670 
1671 	case RINGBUF_TYPE_TIME_STAMP:
1672 		/* FIXME: not implemented */
1673 		return;
1674 
1675 	case RINGBUF_TYPE_DATA:
1676 		cpu_buffer->read_stamp += event->time_delta;
1677 		return;
1678 
1679 	default:
1680 		BUG();
1681 	}
1682 	return;
1683 }
1684 
1685 static void
1686 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1687 			  struct ring_buffer_event *event)
1688 {
1689 	u64 delta;
1690 
1691 	switch (event->type) {
1692 	case RINGBUF_TYPE_PADDING:
1693 		return;
1694 
1695 	case RINGBUF_TYPE_TIME_EXTEND:
1696 		delta = event->array[0];
1697 		delta <<= TS_SHIFT;
1698 		delta += event->time_delta;
1699 		iter->read_stamp += delta;
1700 		return;
1701 
1702 	case RINGBUF_TYPE_TIME_STAMP:
1703 		/* FIXME: not implemented */
1704 		return;
1705 
1706 	case RINGBUF_TYPE_DATA:
1707 		iter->read_stamp += event->time_delta;
1708 		return;
1709 
1710 	default:
1711 		BUG();
1712 	}
1713 	return;
1714 }
1715 
1716 static struct buffer_page *
1717 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1718 {
1719 	struct buffer_page *reader = NULL;
1720 	unsigned long flags;
1721 	int nr_loops = 0;
1722 
1723 	local_irq_save(flags);
1724 	__raw_spin_lock(&cpu_buffer->lock);
1725 
1726  again:
1727 	/*
1728 	 * This should normally only loop twice. But because the
1729 	 * start of the reader inserts an empty page, it causes
1730 	 * a case where we will loop three times. There should be no
1731 	 * reason to loop four times (that I know of).
1732 	 */
1733 	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
1734 		reader = NULL;
1735 		goto out;
1736 	}
1737 
1738 	reader = cpu_buffer->reader_page;
1739 
1740 	/* If there's more to read, return this page */
1741 	if (cpu_buffer->reader_page->read < rb_page_size(reader))
1742 		goto out;
1743 
1744 	/* Never should we have an index greater than the size */
1745 	if (RB_WARN_ON(cpu_buffer,
1746 		       cpu_buffer->reader_page->read > rb_page_size(reader)))
1747 		goto out;
1748 
1749 	/* check if we caught up to the tail */
1750 	reader = NULL;
1751 	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
1752 		goto out;
1753 
1754 	/*
1755 	 * Splice the empty reader page into the list around the head.
1756 	 * Reset the reader page to size zero.
1757 	 */
1758 
1759 	reader = cpu_buffer->head_page;
1760 	cpu_buffer->reader_page->list.next = reader->list.next;
1761 	cpu_buffer->reader_page->list.prev = reader->list.prev;
1762 
1763 	local_set(&cpu_buffer->reader_page->write, 0);
1764 	local_set(&cpu_buffer->reader_page->page->commit, 0);
1765 
1766 	/* Make the reader page now replace the head */
1767 	reader->list.prev->next = &cpu_buffer->reader_page->list;
1768 	reader->list.next->prev = &cpu_buffer->reader_page->list;
1769 
1770 	/*
1771 	 * If the tail is on the reader, then we must set the head
1772 	 * to the inserted page, otherwise we set it one before.
1773 	 */
1774 	cpu_buffer->head_page = cpu_buffer->reader_page;
1775 
1776 	if (cpu_buffer->commit_page != reader)
1777 		rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1778 
1779 	/* Finally update the reader page to the new head */
1780 	cpu_buffer->reader_page = reader;
1781 	rb_reset_reader_page(cpu_buffer);
1782 
1783 	goto again;
1784 
1785  out:
1786 	__raw_spin_unlock(&cpu_buffer->lock);
1787 	local_irq_restore(flags);
1788 
1789 	return reader;
1790 }
1791 
1792 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1793 {
1794 	struct ring_buffer_event *event;
1795 	struct buffer_page *reader;
1796 	unsigned length;
1797 
1798 	reader = rb_get_reader_page(cpu_buffer);
1799 
1800 	/* This function should not be called when buffer is empty */
1801 	if (RB_WARN_ON(cpu_buffer, !reader))
1802 		return;
1803 
1804 	event = rb_reader_event(cpu_buffer);
1805 
1806 	if (event->type == RINGBUF_TYPE_DATA)
1807 		cpu_buffer->entries--;
1808 
1809 	rb_update_read_stamp(cpu_buffer, event);
1810 
1811 	length = rb_event_length(event);
1812 	cpu_buffer->reader_page->read += length;
1813 }
1814 
1815 static void rb_advance_iter(struct ring_buffer_iter *iter)
1816 {
1817 	struct ring_buffer *buffer;
1818 	struct ring_buffer_per_cpu *cpu_buffer;
1819 	struct ring_buffer_event *event;
1820 	unsigned length;
1821 
1822 	cpu_buffer = iter->cpu_buffer;
1823 	buffer = cpu_buffer->buffer;
1824 
1825 	/*
1826 	 * Check if we are at the end of the buffer.
1827 	 */
1828 	if (iter->head >= rb_page_size(iter->head_page)) {
1829 		if (RB_WARN_ON(buffer,
1830 			       iter->head_page == cpu_buffer->commit_page))
1831 			return;
1832 		rb_inc_iter(iter);
1833 		return;
1834 	}
1835 
1836 	event = rb_iter_head_event(iter);
1837 
1838 	length = rb_event_length(event);
1839 
1840 	/*
1841 	 * This should not be called to advance the header if we are
1842 	 * at the tail of the buffer.
1843 	 */
1844 	if (RB_WARN_ON(cpu_buffer,
1845 		       (iter->head_page == cpu_buffer->commit_page) &&
1846 		       (iter->head + length > rb_commit_index(cpu_buffer))))
1847 		return;
1848 
1849 	rb_update_iter_read_stamp(iter, event);
1850 
1851 	iter->head += length;
1852 
1853 	/* check for end of page padding */
1854 	if ((iter->head >= rb_page_size(iter->head_page)) &&
1855 	    (iter->head_page != cpu_buffer->commit_page))
1856 		rb_advance_iter(iter);
1857 }
1858 
1859 static struct ring_buffer_event *
1860 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1861 {
1862 	struct ring_buffer_per_cpu *cpu_buffer;
1863 	struct ring_buffer_event *event;
1864 	struct buffer_page *reader;
1865 	int nr_loops = 0;
1866 
1867 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
1868 		return NULL;
1869 
1870 	cpu_buffer = buffer->buffers[cpu];
1871 
1872  again:
1873 	/*
1874 	 * We repeat when a timestamp is encountered. It is possible
1875 	 * to get multiple timestamps from an interrupt entering just
1876 	 * as one timestamp is about to be written. The max times
1877 	 * that this can happen is the number of nested interrupts we
1878 	 * can have.  Nesting 10 deep of interrupts is clearly
1879 	 * an anomaly.
1880 	 */
1881 	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1882 		return NULL;
1883 
1884 	reader = rb_get_reader_page(cpu_buffer);
1885 	if (!reader)
1886 		return NULL;
1887 
1888 	event = rb_reader_event(cpu_buffer);
1889 
1890 	switch (event->type) {
1891 	case RINGBUF_TYPE_PADDING:
1892 		RB_WARN_ON(cpu_buffer, 1);
1893 		rb_advance_reader(cpu_buffer);
1894 		return NULL;
1895 
1896 	case RINGBUF_TYPE_TIME_EXTEND:
1897 		/* Internal data, OK to advance */
1898 		rb_advance_reader(cpu_buffer);
1899 		goto again;
1900 
1901 	case RINGBUF_TYPE_TIME_STAMP:
1902 		/* FIXME: not implemented */
1903 		rb_advance_reader(cpu_buffer);
1904 		goto again;
1905 
1906 	case RINGBUF_TYPE_DATA:
1907 		if (ts) {
1908 			*ts = cpu_buffer->read_stamp + event->time_delta;
1909 			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1910 		}
1911 		return event;
1912 
1913 	default:
1914 		BUG();
1915 	}
1916 
1917 	return NULL;
1918 }
1919 EXPORT_SYMBOL_GPL(ring_buffer_peek);
1920 
1921 static struct ring_buffer_event *
1922 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1923 {
1924 	struct ring_buffer *buffer;
1925 	struct ring_buffer_per_cpu *cpu_buffer;
1926 	struct ring_buffer_event *event;
1927 	int nr_loops = 0;
1928 
1929 	if (ring_buffer_iter_empty(iter))
1930 		return NULL;
1931 
1932 	cpu_buffer = iter->cpu_buffer;
1933 	buffer = cpu_buffer->buffer;
1934 
1935  again:
1936 	/*
1937 	 * We repeat when a timestamp is encountered. It is possible
1938 	 * to get multiple timestamps from an interrupt entering just
1939 	 * as one timestamp is about to be written. The max times
1940 	 * that this can happen is the number of nested interrupts we
1941 	 * can have. Nesting 10 deep of interrupts is clearly
1942 	 * an anomaly.
1943 	 */
1944 	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1945 		return NULL;
1946 
1947 	if (rb_per_cpu_empty(cpu_buffer))
1948 		return NULL;
1949 
1950 	event = rb_iter_head_event(iter);
1951 
1952 	switch (event->type) {
1953 	case RINGBUF_TYPE_PADDING:
1954 		rb_inc_iter(iter);
1955 		goto again;
1956 
1957 	case RINGBUF_TYPE_TIME_EXTEND:
1958 		/* Internal data, OK to advance */
1959 		rb_advance_iter(iter);
1960 		goto again;
1961 
1962 	case RINGBUF_TYPE_TIME_STAMP:
1963 		/* FIXME: not implemented */
1964 		rb_advance_iter(iter);
1965 		goto again;
1966 
1967 	case RINGBUF_TYPE_DATA:
1968 		if (ts) {
1969 			*ts = iter->read_stamp + event->time_delta;
1970 			ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1971 		}
1972 		return event;
1973 
1974 	default:
1975 		BUG();
1976 	}
1977 
1978 	return NULL;
1979 }
1981 
1982 /**
1983  * ring_buffer_peek - peek at the next event to be read
1984  * @buffer: The ring buffer to read
1985  * @cpu: The cpu to peek at
1986  * @ts: The timestamp counter of this event.
1987  *
1988  * This will return the event that will be read next, but does
1989  * not consume the data.
1990  */
1991 struct ring_buffer_event *
1992 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1993 {
1994 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1995 	struct ring_buffer_event *event;
1996 	unsigned long flags;
1997 
	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

1998 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1999 	event = rb_buffer_peek(buffer, cpu, ts);
2000 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2001 
2002 	return event;
2003 }
EXPORT_SYMBOL_GPL(ring_buffer_peek);
2004 
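/*
 * Example (a sketch; "buffer", "cpu" and "deadline" are assumed to be
 * supplied by the caller): peek at the oldest pending event without
 * consuming it, and only consume it once it is old enough.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts);
 *	if (event && ts < deadline)
 *		event = ring_buffer_consume(buffer, cpu, &ts);
 */
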
2005 /**
2006  * ring_buffer_iter_peek - peek at the next event to be read
2007  * @iter: The ring buffer iterator
2008  * @ts: The timestamp counter of this event.
2009  *
2010  * This will return the event that will be read next, but does
2011  * not increment the iterator.
2012  */
2013 struct ring_buffer_event *
2014 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2015 {
2016 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2017 	struct ring_buffer_event *event;
2018 	unsigned long flags;
2019 
2020 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2021 	event = rb_iter_peek(iter, ts);
2022 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2023 
2024 	return event;
2025 }
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
2026 
2027 /**
2028  * ring_buffer_consume - return an event and consume it
2029  * @buffer: The ring buffer to get the next event from
 * @cpu: The cpu to read the buffer from
 * @ts: The timestamp counter of the event read
2030  *
2031  * Returns the next event in the ring buffer, and that event is consumed.
2032  * Meaning that sequential reads will keep returning a different event,
2033  * and eventually empty the ring buffer if the producer is slower.
2034  */
2035 struct ring_buffer_event *
2036 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2037 {
2038 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2039 	struct ring_buffer_event *event;
2040 	unsigned long flags;
2041 
2042 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
2043 		return NULL;
2044 
2045 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2046 
2047 	event = rb_buffer_peek(buffer, cpu, ts);
2048 	if (!event)
2049 		goto out;
2050 
2051 	rb_advance_reader(cpu_buffer);
2052 
2053  out:
2054 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2055 
2056 	return event;
2057 }
2058 EXPORT_SYMBOL_GPL(ring_buffer_consume);
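
/*
 * A consuming-read sketch ("handle_event" is a placeholder for the
 * caller's own routine): sequential calls drain the per cpu buffer.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
 *		handle_event(ring_buffer_event_data(event),
 *			     ring_buffer_event_length(event), ts);
 */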
2059 
2060 /**
2061  * ring_buffer_read_start - start a non consuming read of the buffer
2062  * @buffer: The ring buffer to read from
2063  * @cpu: The cpu buffer to iterate over
2064  *
2065  * This starts up an iteration through the buffer. It also disables
2066  * the recording to the buffer until the reading is finished.
2067  * This prevents the reading from being corrupted. This is not
2068  * a consuming read, so a producer is not expected.
2069  *
2070  * Must be paired with ring_buffer_read_finish.
2071  */
2072 struct ring_buffer_iter *
2073 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
2074 {
2075 	struct ring_buffer_per_cpu *cpu_buffer;
2076 	struct ring_buffer_iter *iter;
2077 	unsigned long flags;
2078 
2079 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
2080 		return NULL;
2081 
2082 	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
2083 	if (!iter)
2084 		return NULL;
2085 
2086 	cpu_buffer = buffer->buffers[cpu];
2087 
2088 	iter->cpu_buffer = cpu_buffer;
2089 
2090 	atomic_inc(&cpu_buffer->record_disabled);
2091 	synchronize_sched();
2092 
2093 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2094 	__raw_spin_lock(&cpu_buffer->lock);
2095 	rb_iter_reset(iter);
2096 	__raw_spin_unlock(&cpu_buffer->lock);
2097 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2098 
2099 	return iter;
2100 }
2101 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
2102 
2103 /**
2104  * ring_buffer_read_finish - finish reading the iterator of the buffer
2105  * @iter: The iterator retrieved by ring_buffer_read_start
2106  *
2107  * This re-enables the recording to the buffer, and frees the
2108  * iterator.
2109  */
2110 void
2111 ring_buffer_read_finish(struct ring_buffer_iter *iter)
2112 {
2113 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2114 
2115 	atomic_dec(&cpu_buffer->record_disabled);
2116 	kfree(iter);
2117 }
2118 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
2119 
2120 /**
2121  * ring_buffer_read - read the next item in the ring buffer by the iterator
2122  * @iter: The ring buffer iterator
2123  * @ts: The time stamp of the event read.
2124  *
2125  * This reads the next event in the ring buffer and increments the iterator.
2126  */
2127 struct ring_buffer_event *
2128 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2129 {
2130 	struct ring_buffer_event *event;
2131 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2132 	unsigned long flags;
2133 
2134 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2135 	event = rb_iter_peek(iter, ts);
2136 	if (!event)
2137 		goto out;
2138 
2139 	rb_advance_iter(iter);
2140  out:
2141 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2142 
2143 	return event;
2144 }
2145 EXPORT_SYMBOL_GPL(ring_buffer_read);
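
/*
 * Putting the iterator API together (a sketch; "buffer" and "cpu" come
 * from the caller). Note that writers stay disabled for the whole walk:
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		pr_info("event of length %u at %llu\n",
 *			ring_buffer_event_length(event),
 *			(unsigned long long)ts);
 *	ring_buffer_read_finish(iter);
 */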
2146 
2147 /**
2148  * ring_buffer_size - return the size of the ring buffer (in bytes)
2149  * @buffer: The ring buffer.
2150  */
2151 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2152 {
2153 	return BUF_PAGE_SIZE * buffer->pages;
2154 }
2155 EXPORT_SYMBOL_GPL(ring_buffer_size);
2156 
2157 static void
2158 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2159 {
2160 	cpu_buffer->head_page
2161 		= list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2162 	local_set(&cpu_buffer->head_page->write, 0);
2163 	local_set(&cpu_buffer->head_page->page->commit, 0);
2164 
2165 	cpu_buffer->head_page->read = 0;
2166 
2167 	cpu_buffer->tail_page = cpu_buffer->head_page;
2168 	cpu_buffer->commit_page = cpu_buffer->head_page;
2169 
2170 	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2171 	local_set(&cpu_buffer->reader_page->write, 0);
2172 	local_set(&cpu_buffer->reader_page->page->commit, 0);
2173 	cpu_buffer->reader_page->read = 0;
2174 
2175 	cpu_buffer->overrun = 0;
2176 	cpu_buffer->entries = 0;
2177 
2178 	cpu_buffer->write_stamp = 0;
2179 	cpu_buffer->read_stamp = 0;
2180 }
2181 
2182 /**
2183  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2184  * @buffer: The ring buffer to reset a per cpu buffer of
2185  * @cpu: The CPU buffer to be reset
2186  */
2187 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2188 {
2189 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2190 	unsigned long flags;
2191 
2192 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
2193 		return;
2194 
2195 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2196 
2197 	__raw_spin_lock(&cpu_buffer->lock);
2198 
2199 	rb_reset_cpu(cpu_buffer);
2200 
2201 	__raw_spin_unlock(&cpu_buffer->lock);
2202 
2203 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2204 }
2205 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
2206 
2207 /**
2208  * ring_buffer_reset - reset a ring buffer
2209  * @buffer: The ring buffer to reset all cpu buffers
2210  */
2211 void ring_buffer_reset(struct ring_buffer *buffer)
2212 {
2213 	int cpu;
2214 
2215 	for_each_buffer_cpu(buffer, cpu)
2216 		ring_buffer_reset_cpu(buffer, cpu);
2217 }
2218 EXPORT_SYMBOL_GPL(ring_buffer_reset);
2219 
2220 /**
2221  * ring_buffer_empty - is the ring buffer empty?
2222  * @buffer: The ring buffer to test
2223  */
2224 int ring_buffer_empty(struct ring_buffer *buffer)
2225 {
2226 	struct ring_buffer_per_cpu *cpu_buffer;
2227 	int cpu;
2228 
2229 	/* yes this is racy, but if you don't like the race, lock the buffer */
2230 	for_each_buffer_cpu(buffer, cpu) {
2231 		cpu_buffer = buffer->buffers[cpu];
2232 		if (!rb_per_cpu_empty(cpu_buffer))
2233 			return 0;
2234 	}
2235 	return 1;
2236 }
2237 EXPORT_SYMBOL_GPL(ring_buffer_empty);
2238 
2239 /**
2240  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2241  * @buffer: The ring buffer
2242  * @cpu: The CPU buffer to test
2243  */
2244 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2245 {
2246 	struct ring_buffer_per_cpu *cpu_buffer;
2247 
2248 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
2249 		return 1;
2250 
2251 	cpu_buffer = buffer->buffers[cpu];
2252 	return rb_per_cpu_empty(cpu_buffer);
2253 }
2254 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
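
/*
 * e.g. a reader thread might use these (racy, as noted above) checks
 * to avoid taking locks when there is nothing to do ("do_read" is a
 * placeholder):
 *
 *	if (!ring_buffer_empty_cpu(buffer, cpu))
 *		do_read(buffer, cpu);
 */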
2255 
2256 /**
2257  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2258  * @buffer_a: One buffer to swap with
2259  * @buffer_b: The other buffer to swap with
 * @cpu: The CPU of the buffers to swap
2260  *
2261  * This function is useful for tracers that want to take a "snapshot"
2262  * of a CPU buffer and have another backup buffer lying around.
2263  * It is expected that the tracer handles the cpu buffer not being
2264  * used at the moment.
2265  */
2266 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2267 			 struct ring_buffer *buffer_b, int cpu)
2268 {
2269 	struct ring_buffer_per_cpu *cpu_buffer_a;
2270 	struct ring_buffer_per_cpu *cpu_buffer_b;
2271 
2272 	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
2273 	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
2274 		return -EINVAL;
2275 
2276 	/* At least make sure the two buffers are somewhat the same */
2277 	if (buffer_a->pages != buffer_b->pages)
2278 		return -EINVAL;
2279 
2280 	cpu_buffer_a = buffer_a->buffers[cpu];
2281 	cpu_buffer_b = buffer_b->buffers[cpu];
2282 
2283 	/*
2284 	 * We can't do a synchronize_sched here because this
2285 	 * function can be called in atomic context.
2286 	 * Normally this will be called from the same CPU as cpu.
2287 	 * If not it's up to the caller to protect this.
2288 	 */
2289 	atomic_inc(&cpu_buffer_a->record_disabled);
2290 	atomic_inc(&cpu_buffer_b->record_disabled);
2291 
2292 	buffer_a->buffers[cpu] = cpu_buffer_b;
2293 	buffer_b->buffers[cpu] = cpu_buffer_a;
2294 
2295 	cpu_buffer_b->buffer = buffer_a;
2296 	cpu_buffer_a->buffer = buffer_b;
2297 
2298 	atomic_dec(&cpu_buffer_a->record_disabled);
2299 	atomic_dec(&cpu_buffer_b->record_disabled);
2300 
2301 	return 0;
2302 }
2303 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
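
/*
 * Snapshot sketch: "live" and "snap" are two buffers the caller
 * allocated with the same number of pages; "read_snapshot" is a
 * placeholder. Tracing continues into the swapped-in pages while the
 * captured ones are read at leisure.
 *
 *	if (!ring_buffer_swap_cpu(live, snap, cpu))
 *		read_snapshot(snap, cpu);
 */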
2304 
2305 static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
2306 			      struct buffer_data_page *bpage)
2307 {
2308 	struct ring_buffer_event *event;
2309 	unsigned long head;
2310 
2311 	__raw_spin_lock(&cpu_buffer->lock);
2312 	for (head = 0; head < local_read(&bpage->commit);
2313 	     head += rb_event_length(event)) {
2314 
2315 		event = __rb_data_page_index(bpage, head);
2316 		if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
			break;	/* must not return with cpu_buffer->lock held */
2318 		/* Only count data entries */
2319 		if (event->type != RINGBUF_TYPE_DATA)
2320 			continue;
2321 		cpu_buffer->entries--;
2322 	}
2323 	__raw_spin_unlock(&cpu_buffer->lock);
2324 }
2325 
2326 /**
2327  * ring_buffer_alloc_read_page - allocate a page to read from buffer
2328  * @buffer: the buffer to allocate for.
2329  *
2330  * This function is used in conjunction with ring_buffer_read_page.
2331  * When reading a full page from the ring buffer, these functions
2332  * can be used to speed up the process. The calling function should
2333  * allocate a few pages first with this function. Then when it
2334  * needs to get pages from the ring buffer, it passes the result
2335  * of this function into ring_buffer_read_page, which will swap
2336  * the page that was allocated, with the read page of the buffer.
2337  *
2338  * Returns:
2339  *  The page allocated, or NULL on error.
2340  */
2341 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
2342 {
2343 	unsigned long addr;
2344 	struct buffer_data_page *bpage;
2345 
2346 	addr = __get_free_page(GFP_KERNEL);
2347 	if (!addr)
2348 		return NULL;
2349 
2350 	bpage = (void *)addr;
2351 
2352 	return bpage;
2353 }
2354 
2355 /**
2356  * ring_buffer_free_read_page - free an allocated read page
2357  * @buffer: the buffer the page was allocated for
2358  * @data: the page to free
2359  *
2360  * Free a page allocated from ring_buffer_alloc_read_page.
2361  */
2362 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
2363 {
2364 	free_page((unsigned long)data);
2365 }
2366 
2367 /**
2368  * ring_buffer_read_page - extract a page from the ring buffer
2369  * @buffer: buffer to extract from
2370  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
2371  * @cpu: the cpu of the buffer to extract
2372  * @full: should the extraction only happen when the page is full.
2373  *
2374  * This function will pull out a page from the ring buffer and consume it.
2375  * @data_page must be the address of the variable that was returned
2376  * from ring_buffer_alloc_read_page. This is because the page might be used
2377  * to swap with a page in the ring buffer.
2378  *
2379  * for example:
2380  *	rpage = ring_buffer_alloc_read_page(buffer);
2381  *	if (!rpage)
2382  *		return error;
2383  *	ret = ring_buffer_read_page(buffer, &rpage, cpu, 0);
2384  *	if (ret)
2385  *		process_page(rpage);
2386  *
2387  * When @full is set, the function will not return 1 unless
2388  * the writer has moved off the reader page.
2389  *
2390  * Note: it is up to the calling functions to handle sleeps and wakeups.
2391  *  The ring buffer can be used anywhere in the kernel and can not
2392  *  blindly call wake_up. The layer that uses the ring buffer must be
2393  *  responsible for that.
2394  *
2395  * Returns:
2396  *  1 if data has been transferred
2397  *  0 if no data has been transferred.
2398  */
2399 int ring_buffer_read_page(struct ring_buffer *buffer,
2400 			    void **data_page, int cpu, int full)
2401 {
2402 	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2403 	struct ring_buffer_event *event;
2404 	struct buffer_data_page *bpage;
2405 	unsigned long flags;
2406 	int ret = 0;
2407 
2408 	if (!data_page)
2409 		return 0;
2410 
2411 	bpage = *data_page;
2412 	if (!bpage)
2413 		return 0;
2414 
2415 	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2416 
2417 	/*
2418 	 * rb_buffer_peek will advance to the next reader page if
2419 	 * the current reader page is empty.
2420 	 */
2421 	event = rb_buffer_peek(buffer, cpu, NULL);
2422 	if (!event)
2423 		goto out;
2424 
2425 	/* check for data */
2426 	if (!local_read(&cpu_buffer->reader_page->page->commit))
2427 		goto out;
2428 	/*
2429 	 * If the writer is already off of the read page, then simply
2430 	 * switch the read page with the given page. Otherwise
2431 	 * we need to copy the data from the reader to the writer.
2432 	 */
2433 	if (cpu_buffer->reader_page == cpu_buffer->commit_page) {
2434 		unsigned int read = cpu_buffer->reader_page->read;
2435 
2436 		if (full)
2437 			goto out;
		/*
		 * The writer is still on the reader page; copy the
		 * unread data out into the page supplied by the caller
		 * (bpage still points at *data_page here).
		 */
		memcpy(bpage->data,
		       cpu_buffer->reader_page->page->data + read,
		       local_read(&cpu_buffer->reader_page->page->commit) - read);
		local_set(&bpage->commit,
			  local_read(&cpu_buffer->reader_page->page->commit) - read);

		/* consume what was read: advance the read offset, not the page */
		cpu_buffer->reader_page->read =
			local_read(&cpu_buffer->reader_page->page->commit);
2446 
2447 	} else {
2448 		/* swap the pages */
2449 		rb_init_page(bpage);
2450 		bpage = cpu_buffer->reader_page->page;
2451 		cpu_buffer->reader_page->page = *data_page;
2452 		cpu_buffer->reader_page->read = 0;
2453 		*data_page = bpage;
2454 	}
2455 	ret = 1;
2456 
2457 	/* update the entry counter */
2458 	rb_remove_entries(cpu_buffer, bpage);
2459  out:
2460 	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2461 
2462 	return ret;
2463 }
2464 
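/*
 * A full read-page cycle, including the matching free (a sketch;
 * "process_page" is a placeholder). Note that ring_buffer_read_page
 * may swap the page, so the page to free is whatever *data_page
 * points at afterwards:
 *
 *	void *rpage = ring_buffer_alloc_read_page(buffer);
 *
 *	if (rpage) {
 *		if (ring_buffer_read_page(buffer, &rpage, cpu, 0))
 *			process_page(rpage);
 *		ring_buffer_free_read_page(buffer, rpage);
 *	}
 */
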
2465 static ssize_t
2466 rb_simple_read(struct file *filp, char __user *ubuf,
2467 	       size_t cnt, loff_t *ppos)
2468 {
2469 	long *p = filp->private_data;
2470 	char buf[64];
2471 	int r;
2472 
2473 	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
2474 		r = sprintf(buf, "permanently disabled\n");
2475 	else
2476 		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));
2477 
2478 	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
2479 }
2480 
2481 static ssize_t
2482 rb_simple_write(struct file *filp, const char __user *ubuf,
2483 		size_t cnt, loff_t *ppos)
2484 {
2485 	long *p = filp->private_data;
2486 	char buf[64];
2487 	long val;
2488 	int ret;
2489 
2490 	if (cnt >= sizeof(buf))
2491 		return -EINVAL;
2492 
2493 	if (copy_from_user(&buf, ubuf, cnt))
2494 		return -EFAULT;
2495 
2496 	buf[cnt] = 0;
2497 
2498 	ret = strict_strtoul(buf, 10, &val);
2499 	if (ret < 0)
2500 		return ret;
2501 
2502 	if (val)
2503 		set_bit(RB_BUFFERS_ON_BIT, p);
2504 	else
2505 		clear_bit(RB_BUFFERS_ON_BIT, p);
2506 
2507 	(*ppos)++;
2508 
2509 	return cnt;
2510 }
2511 
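/*
 * The "tracing_on" debugfs file registered below exposes the
 * RB_BUFFERS_ON_BIT switch to user space. Assuming debugfs is mounted
 * at /sys/kernel/debug:
 *
 *	echo 0 > /sys/kernel/debug/tracing/tracing_on
 *	echo 1 > /sys/kernel/debug/tracing/tracing_on
 *	cat /sys/kernel/debug/tracing/tracing_on
 */
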
2512 static const struct file_operations rb_simple_fops = {
2513 	.open		= tracing_open_generic,
2514 	.read		= rb_simple_read,
2515 	.write		= rb_simple_write,
2516 };
2517 
2518 
2519 static __init int rb_init_debugfs(void)
2520 {
2521 	struct dentry *d_tracer;
2522 	struct dentry *entry;
2523 
2524 	d_tracer = tracing_init_dentry();
2525 
2526 	entry = debugfs_create_file("tracing_on", 0644, d_tracer,
2527 				    &ring_buffer_flags, &rb_simple_fops);
2528 	if (!entry)
2529 		pr_warning("Could not create debugfs 'tracing_on' entry\n");
2530 
2531 	return 0;
2532 }
2533 
2534 fs_initcall(rb_init_debugfs);
2535