1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * ring buffer tester and benchmark
4  *
5  * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
6  */
7 #include <linux/ring_buffer.h>
8 #include <linux/completion.h>
9 #include <linux/kthread.h>
10 #include <uapi/linux/sched/types.h>
11 #include <linux/module.h>
12 #include <linux/ktime.h>
13 #include <asm/local.h>
14 
15 struct rb_page {
16 	u64		ts;
17 	local_t		commit;
18 	char		data[4080];
19 };
20 
/* Length of the write phase and of the pause between runs, in seconds */
#define RUN_TIME	10ULL
#define SLEEP_TIME	10

/* The ring buffer under test and the two kthreads that exercise it */
static struct ring_buffer *buffer;
static struct task_struct *producer;
static struct task_struct *consumer;

/*
 * The producer wakes the consumer once every wakeup_interval write
 * batches (one batch is write_iteration reservations).
 */
static int wakeup_interval = 100;

/* Set by the producer to end a read pass, cleared again by the consumer */
static int reader_finish;

/* Handshake between the threads: consumer is running / consumer is done */
static DECLARE_COMPLETION(read_start);
static DECLARE_COMPLETION(read_done);

/* Number of events the consumer has read during the current run */
static unsigned long read;
37 static unsigned int disable_reader;
38 module_param(disable_reader, uint, 0644);
39 MODULE_PARM_DESC(disable_reader, "only run producer");
40 
41 static unsigned int write_iteration = 50;
42 module_param(write_iteration, uint, 0644);
43 MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");
44 
45 static int producer_nice = MAX_NICE;
46 static int consumer_nice = MAX_NICE;
47 
48 static int producer_fifo = -1;
49 static int consumer_fifo = -1;
50 
51 module_param(producer_nice, int, 0644);
52 MODULE_PARM_DESC(producer_nice, "nice prio for producer");
53 
54 module_param(consumer_nice, int, 0644);
55 MODULE_PARM_DESC(consumer_nice, "nice prio for consumer");
56 
57 module_param(producer_fifo, int, 0644);
58 MODULE_PARM_DESC(producer_fifo, "fifo prio for producer");
59 
60 module_param(consumer_fifo, int, 0644);
61 MODULE_PARM_DESC(consumer_fifo, "fifo prio for consumer");
62 
/* Toggled on every consumer pass: non-zero reads by event, zero by page */
static int read_events;

/* Latched to 1 by the first TEST_ERROR(); makes break_test() return true */
static int test_error;

/*
 * Record a test failure. Only the first failure sets the flag and emits
 * a warning; later calls do nothing.
 */
#define TEST_ERROR()				\
	do {					\
		if (test_error)			\
			break;			\
		test_error = 1;			\
		WARN_ON(1);			\
	} while (0)
74 
/* Outcome of one read attempt made by read_event() or read_page() */
enum event_status {
	EVENT_FOUND	= 0,	/* data was read */
	EVENT_DROPPED	= 1,	/* nothing was read, or the data was bad */
};
79 
80 static bool break_test(void)
81 {
82 	return test_error || kthread_should_stop();
83 }
84 
85 static enum event_status read_event(int cpu)
86 {
87 	struct ring_buffer_event *event;
88 	int *entry;
89 	u64 ts;
90 
91 	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
92 	if (!event)
93 		return EVENT_DROPPED;
94 
95 	entry = ring_buffer_event_data(event);
96 	if (*entry != cpu) {
97 		TEST_ERROR();
98 		return EVENT_DROPPED;
99 	}
100 
101 	read++;
102 	return EVENT_FOUND;
103 }
104 
105 static enum event_status read_page(int cpu)
106 {
107 	struct ring_buffer_event *event;
108 	struct rb_page *rpage;
109 	unsigned long commit;
110 	void *bpage;
111 	int *entry;
112 	int ret;
113 	int inc;
114 	int i;
115 
116 	bpage = ring_buffer_alloc_read_page(buffer, cpu);
117 	if (IS_ERR(bpage))
118 		return EVENT_DROPPED;
119 
120 	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
121 	if (ret >= 0) {
122 		rpage = bpage;
123 		/* The commit may have missed event flags set, clear them */
124 		commit = local_read(&rpage->commit) & 0xfffff;
125 		for (i = 0; i < commit && !test_error ; i += inc) {
126 
127 			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
128 				TEST_ERROR();
129 				break;
130 			}
131 
132 			inc = -1;
133 			event = (void *)&rpage->data[i];
134 			switch (event->type_len) {
135 			case RINGBUF_TYPE_PADDING:
136 				/* failed writes may be discarded events */
137 				if (!event->time_delta)
138 					TEST_ERROR();
139 				inc = event->array[0] + 4;
140 				break;
141 			case RINGBUF_TYPE_TIME_EXTEND:
142 				inc = 8;
143 				break;
144 			case 0:
145 				entry = ring_buffer_event_data(event);
146 				if (*entry != cpu) {
147 					TEST_ERROR();
148 					break;
149 				}
150 				read++;
151 				if (!event->array[0]) {
152 					TEST_ERROR();
153 					break;
154 				}
155 				inc = event->array[0] + 4;
156 				break;
157 			default:
158 				entry = ring_buffer_event_data(event);
159 				if (*entry != cpu) {
160 					TEST_ERROR();
161 					break;
162 				}
163 				read++;
164 				inc = ((event->type_len + 1) * 4);
165 			}
166 			if (test_error)
167 				break;
168 
169 			if (inc <= 0) {
170 				TEST_ERROR();
171 				break;
172 			}
173 		}
174 	}
175 	ring_buffer_free_read_page(buffer, cpu, bpage);
176 
177 	if (ret < 0)
178 		return EVENT_DROPPED;
179 	return EVENT_FOUND;
180 }
181 
182 static void ring_buffer_consumer(void)
183 {
184 	/* toggle between reading pages and events */
185 	read_events ^= 1;
186 
187 	read = 0;
188 	/*
189 	 * Continue running until the producer specifically asks to stop
190 	 * and is ready for the completion.
191 	 */
192 	while (!READ_ONCE(reader_finish)) {
193 		int found = 1;
194 
195 		while (found && !test_error) {
196 			int cpu;
197 
198 			found = 0;
199 			for_each_online_cpu(cpu) {
200 				enum event_status stat;
201 
202 				if (read_events)
203 					stat = read_event(cpu);
204 				else
205 					stat = read_page(cpu);
206 
207 				if (test_error)
208 					break;
209 
210 				if (stat == EVENT_FOUND)
211 					found = 1;
212 
213 			}
214 		}
215 
216 		/* Wait till the producer wakes us up when there is more data
217 		 * available or when the producer wants us to finish reading.
218 		 */
219 		set_current_state(TASK_INTERRUPTIBLE);
220 		if (reader_finish)
221 			break;
222 
223 		schedule();
224 	}
225 	__set_current_state(TASK_RUNNING);
226 	reader_finish = 0;
227 	complete(&read_done);
228 }
229 
230 static void ring_buffer_producer(void)
231 {
232 	ktime_t start_time, end_time, timeout;
233 	unsigned long long time;
234 	unsigned long long entries;
235 	unsigned long long overruns;
236 	unsigned long missed = 0;
237 	unsigned long hit = 0;
238 	unsigned long avg;
239 	int cnt = 0;
240 
241 	/*
242 	 * Hammer the buffer for 10 secs (this may
243 	 * make the system stall)
244 	 */
245 	trace_printk("Starting ring buffer hammer\n");
246 	start_time = ktime_get();
247 	timeout = ktime_add_ns(start_time, RUN_TIME * NSEC_PER_SEC);
248 	do {
249 		struct ring_buffer_event *event;
250 		int *entry;
251 		int i;
252 
253 		for (i = 0; i < write_iteration; i++) {
254 			event = ring_buffer_lock_reserve(buffer, 10);
255 			if (!event) {
256 				missed++;
257 			} else {
258 				hit++;
259 				entry = ring_buffer_event_data(event);
260 				*entry = smp_processor_id();
261 				ring_buffer_unlock_commit(buffer, event);
262 			}
263 		}
264 		end_time = ktime_get();
265 
266 		cnt++;
267 		if (consumer && !(cnt % wakeup_interval))
268 			wake_up_process(consumer);
269 
270 #ifndef CONFIG_PREEMPT
271 		/*
272 		 * If we are a non preempt kernel, the 10 second run will
273 		 * stop everything while it runs. Instead, we will call
274 		 * cond_resched and also add any time that was lost by a
275 		 * rescedule.
276 		 *
277 		 * Do a cond resched at the same frequency we would wake up
278 		 * the reader.
279 		 */
280 		if (cnt % wakeup_interval)
281 			cond_resched();
282 #endif
283 	} while (ktime_before(end_time, timeout) && !break_test());
284 	trace_printk("End ring buffer hammer\n");
285 
286 	if (consumer) {
287 		/* Init both completions here to avoid races */
288 		init_completion(&read_start);
289 		init_completion(&read_done);
290 		/* the completions must be visible before the finish var */
291 		smp_wmb();
292 		reader_finish = 1;
293 		wake_up_process(consumer);
294 		wait_for_completion(&read_done);
295 	}
296 
297 	time = ktime_us_delta(end_time, start_time);
298 
299 	entries = ring_buffer_entries(buffer);
300 	overruns = ring_buffer_overruns(buffer);
301 
302 	if (test_error)
303 		trace_printk("ERROR!\n");
304 
305 	if (!disable_reader) {
306 		if (consumer_fifo < 0)
307 			trace_printk("Running Consumer at nice: %d\n",
308 				     consumer_nice);
309 		else
310 			trace_printk("Running Consumer at SCHED_FIFO %d\n",
311 				     consumer_fifo);
312 	}
313 	if (producer_fifo < 0)
314 		trace_printk("Running Producer at nice: %d\n",
315 			     producer_nice);
316 	else
317 		trace_printk("Running Producer at SCHED_FIFO %d\n",
318 			     producer_fifo);
319 
320 	/* Let the user know that the test is running at low priority */
321 	if (producer_fifo < 0 && consumer_fifo < 0 &&
322 	    producer_nice == MAX_NICE && consumer_nice == MAX_NICE)
323 		trace_printk("WARNING!!! This test is running at lowest priority.\n");
324 
325 	trace_printk("Time:     %lld (usecs)\n", time);
326 	trace_printk("Overruns: %lld\n", overruns);
327 	if (disable_reader)
328 		trace_printk("Read:     (reader disabled)\n");
329 	else
330 		trace_printk("Read:     %ld  (by %s)\n", read,
331 			read_events ? "events" : "pages");
332 	trace_printk("Entries:  %lld\n", entries);
333 	trace_printk("Total:    %lld\n", entries + overruns + read);
334 	trace_printk("Missed:   %ld\n", missed);
335 	trace_printk("Hit:      %ld\n", hit);
336 
337 	/* Convert time from usecs to millisecs */
338 	do_div(time, USEC_PER_MSEC);
339 	if (time)
340 		hit /= (long)time;
341 	else
342 		trace_printk("TIME IS ZERO??\n");
343 
344 	trace_printk("Entries per millisec: %ld\n", hit);
345 
346 	if (hit) {
347 		/* Calculate the average time in nanosecs */
348 		avg = NSEC_PER_MSEC / hit;
349 		trace_printk("%ld ns per entry\n", avg);
350 	}
351 
352 	if (missed) {
353 		if (time)
354 			missed /= (long)time;
355 
356 		trace_printk("Total iterations per millisec: %ld\n",
357 			     hit + missed);
358 
359 		/* it is possible that hit + missed will overflow and be zero */
360 		if (!(hit + missed)) {
361 			trace_printk("hit + missed overflowed and totalled zero!\n");
362 			hit--; /* make it non zero */
363 		}
364 
365 		/* Caculate the average time in nanosecs */
366 		avg = NSEC_PER_MSEC / (hit + missed);
367 		trace_printk("%ld ns per entry\n", avg);
368 	}
369 }
370 
371 static void wait_to_die(void)
372 {
373 	set_current_state(TASK_INTERRUPTIBLE);
374 	while (!kthread_should_stop()) {
375 		schedule();
376 		set_current_state(TASK_INTERRUPTIBLE);
377 	}
378 	__set_current_state(TASK_RUNNING);
379 }
380 
381 static int ring_buffer_consumer_thread(void *arg)
382 {
383 	while (!break_test()) {
384 		complete(&read_start);
385 
386 		ring_buffer_consumer();
387 
388 		set_current_state(TASK_INTERRUPTIBLE);
389 		if (break_test())
390 			break;
391 		schedule();
392 	}
393 	__set_current_state(TASK_RUNNING);
394 
395 	if (!kthread_should_stop())
396 		wait_to_die();
397 
398 	return 0;
399 }
400 
401 static int ring_buffer_producer_thread(void *arg)
402 {
403 	while (!break_test()) {
404 		ring_buffer_reset(buffer);
405 
406 		if (consumer) {
407 			wake_up_process(consumer);
408 			wait_for_completion(&read_start);
409 		}
410 
411 		ring_buffer_producer();
412 		if (break_test())
413 			goto out_kill;
414 
415 		trace_printk("Sleeping for 10 secs\n");
416 		set_current_state(TASK_INTERRUPTIBLE);
417 		if (break_test())
418 			goto out_kill;
419 		schedule_timeout(HZ * SLEEP_TIME);
420 	}
421 
422 out_kill:
423 	__set_current_state(TASK_RUNNING);
424 	if (!kthread_should_stop())
425 		wait_to_die();
426 
427 	return 0;
428 }
429 
430 static int __init ring_buffer_benchmark_init(void)
431 {
432 	int ret;
433 
434 	/* make a one meg buffer in overwite mode */
435 	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
436 	if (!buffer)
437 		return -ENOMEM;
438 
439 	if (!disable_reader) {
440 		consumer = kthread_create(ring_buffer_consumer_thread,
441 					  NULL, "rb_consumer");
442 		ret = PTR_ERR(consumer);
443 		if (IS_ERR(consumer))
444 			goto out_fail;
445 	}
446 
447 	producer = kthread_run(ring_buffer_producer_thread,
448 			       NULL, "rb_producer");
449 	ret = PTR_ERR(producer);
450 
451 	if (IS_ERR(producer))
452 		goto out_kill;
453 
454 	/*
455 	 * Run them as low-prio background tasks by default:
456 	 */
457 	if (!disable_reader) {
458 		if (consumer_fifo >= 0) {
459 			struct sched_param param = {
460 				.sched_priority = consumer_fifo
461 			};
462 			sched_setscheduler(consumer, SCHED_FIFO, &param);
463 		} else
464 			set_user_nice(consumer, consumer_nice);
465 	}
466 
467 	if (producer_fifo >= 0) {
468 		struct sched_param param = {
469 			.sched_priority = producer_fifo
470 		};
471 		sched_setscheduler(producer, SCHED_FIFO, &param);
472 	} else
473 		set_user_nice(producer, producer_nice);
474 
475 	return 0;
476 
477  out_kill:
478 	if (consumer)
479 		kthread_stop(consumer);
480 
481  out_fail:
482 	ring_buffer_free(buffer);
483 	return ret;
484 }
485 
486 static void __exit ring_buffer_benchmark_exit(void)
487 {
488 	kthread_stop(producer);
489 	if (consumer)
490 		kthread_stop(consumer);
491 	ring_buffer_free(buffer);
492 }
493 
494 module_init(ring_buffer_benchmark_init);
495 module_exit(ring_buffer_benchmark_exit);
496 
497 MODULE_AUTHOR("Steven Rostedt");
498 MODULE_DESCRIPTION("ring_buffer_benchmark");
499 MODULE_LICENSE("GPL");
500