1 /*
2  * ring buffer tester and benchmark
3  *
4  * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/completion.h>
8 #include <linux/kthread.h>
9 #include <uapi/linux/sched/types.h>
10 #include <linux/module.h>
11 #include <linux/ktime.h>
12 #include <asm/local.h>
13 
14 struct rb_page {
15 	u64		ts;
16 	local_t		commit;
17 	char		data[4080];
18 };
19 
/*
 * Length of the two benchmark phases, in seconds: RUN_TIME is how long the
 * producer hammers the buffer, SLEEP_TIME is how long it rests afterwards.
 */
#define RUN_TIME	10ULL	/* 64-bit so RUN_TIME * NSEC_PER_SEC is not truncated */
#define SLEEP_TIME	10
23 
/* the ring buffer under test and the two kthreads that exercise it */
static struct ring_buffer *buffer;
static struct task_struct *producer;
static struct task_struct *consumer;

/* events consumed by the reader during the current run */
static unsigned long read;

/* the writer wakes the reader once every this many write batches */
static int wakeup_interval = 100;

/*
 * Producer/consumer handshake: the producer sets reader_finish to ask the
 * consumer to stop; read_start and read_done signal the start and the end
 * of a consumer run.
 */
static int reader_finish;
static DECLARE_COMPLETION(read_start);
static DECLARE_COMPLETION(read_done);
35 
36 static unsigned int disable_reader;
37 module_param(disable_reader, uint, 0644);
38 MODULE_PARM_DESC(disable_reader, "only run producer");
39 
40 static unsigned int write_iteration = 50;
41 module_param(write_iteration, uint, 0644);
42 MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");
43 
44 static int producer_nice = MAX_NICE;
45 static int consumer_nice = MAX_NICE;
46 
47 static int producer_fifo = -1;
48 static int consumer_fifo = -1;
49 
50 module_param(producer_nice, int, 0644);
51 MODULE_PARM_DESC(producer_nice, "nice prio for producer");
52 
53 module_param(consumer_nice, int, 0644);
54 MODULE_PARM_DESC(consumer_nice, "nice prio for consumer");
55 
56 module_param(producer_fifo, int, 0644);
57 MODULE_PARM_DESC(producer_fifo, "fifo prio for producer");
58 
59 module_param(consumer_fifo, int, 0644);
60 MODULE_PARM_DESC(consumer_fifo, "fifo prio for consumer");
61 
/*
 * Flipped at the start of every consumer run: non-zero reads one event at
 * a time, zero reads whole pages.
 */
static int read_events;

/* set to 1 by the first TEST_ERROR() and never cleared in this file */
static int test_error;

/* Flag a test failure; only the first failure raises a warning. */
#define TEST_ERROR()				\
	do {					\
		if (test_error)			\
			break;			\
		test_error = 1;			\
		WARN_ON(1);			\
	} while (0)
73 
/* Result of a single read attempt made by read_event() or read_page() */
enum event_status {
	EVENT_FOUND	= 0,	/* data was read and passed the checks */
	EVENT_DROPPED	= 1,	/* nothing was read, or a check failed */
};
78 
79 static bool break_test(void)
80 {
81 	return test_error || kthread_should_stop();
82 }
83 
84 static enum event_status read_event(int cpu)
85 {
86 	struct ring_buffer_event *event;
87 	int *entry;
88 	u64 ts;
89 
90 	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
91 	if (!event)
92 		return EVENT_DROPPED;
93 
94 	entry = ring_buffer_event_data(event);
95 	if (*entry != cpu) {
96 		TEST_ERROR();
97 		return EVENT_DROPPED;
98 	}
99 
100 	read++;
101 	return EVENT_FOUND;
102 }
103 
104 static enum event_status read_page(int cpu)
105 {
106 	struct ring_buffer_event *event;
107 	struct rb_page *rpage;
108 	unsigned long commit;
109 	void *bpage;
110 	int *entry;
111 	int ret;
112 	int inc;
113 	int i;
114 
115 	bpage = ring_buffer_alloc_read_page(buffer, cpu);
116 	if (IS_ERR(bpage))
117 		return EVENT_DROPPED;
118 
119 	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
120 	if (ret >= 0) {
121 		rpage = bpage;
122 		/* The commit may have missed event flags set, clear them */
123 		commit = local_read(&rpage->commit) & 0xfffff;
124 		for (i = 0; i < commit && !test_error ; i += inc) {
125 
126 			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
127 				TEST_ERROR();
128 				break;
129 			}
130 
131 			inc = -1;
132 			event = (void *)&rpage->data[i];
133 			switch (event->type_len) {
134 			case RINGBUF_TYPE_PADDING:
135 				/* failed writes may be discarded events */
136 				if (!event->time_delta)
137 					TEST_ERROR();
138 				inc = event->array[0] + 4;
139 				break;
140 			case RINGBUF_TYPE_TIME_EXTEND:
141 				inc = 8;
142 				break;
143 			case 0:
144 				entry = ring_buffer_event_data(event);
145 				if (*entry != cpu) {
146 					TEST_ERROR();
147 					break;
148 				}
149 				read++;
150 				if (!event->array[0]) {
151 					TEST_ERROR();
152 					break;
153 				}
154 				inc = event->array[0] + 4;
155 				break;
156 			default:
157 				entry = ring_buffer_event_data(event);
158 				if (*entry != cpu) {
159 					TEST_ERROR();
160 					break;
161 				}
162 				read++;
163 				inc = ((event->type_len + 1) * 4);
164 			}
165 			if (test_error)
166 				break;
167 
168 			if (inc <= 0) {
169 				TEST_ERROR();
170 				break;
171 			}
172 		}
173 	}
174 	ring_buffer_free_read_page(buffer, cpu, bpage);
175 
176 	if (ret < 0)
177 		return EVENT_DROPPED;
178 	return EVENT_FOUND;
179 }
180 
181 static void ring_buffer_consumer(void)
182 {
183 	/* toggle between reading pages and events */
184 	read_events ^= 1;
185 
186 	read = 0;
187 	/*
188 	 * Continue running until the producer specifically asks to stop
189 	 * and is ready for the completion.
190 	 */
191 	while (!READ_ONCE(reader_finish)) {
192 		int found = 1;
193 
194 		while (found && !test_error) {
195 			int cpu;
196 
197 			found = 0;
198 			for_each_online_cpu(cpu) {
199 				enum event_status stat;
200 
201 				if (read_events)
202 					stat = read_event(cpu);
203 				else
204 					stat = read_page(cpu);
205 
206 				if (test_error)
207 					break;
208 
209 				if (stat == EVENT_FOUND)
210 					found = 1;
211 
212 			}
213 		}
214 
215 		/* Wait till the producer wakes us up when there is more data
216 		 * available or when the producer wants us to finish reading.
217 		 */
218 		set_current_state(TASK_INTERRUPTIBLE);
219 		if (reader_finish)
220 			break;
221 
222 		schedule();
223 	}
224 	__set_current_state(TASK_RUNNING);
225 	reader_finish = 0;
226 	complete(&read_done);
227 }
228 
229 static void ring_buffer_producer(void)
230 {
231 	ktime_t start_time, end_time, timeout;
232 	unsigned long long time;
233 	unsigned long long entries;
234 	unsigned long long overruns;
235 	unsigned long missed = 0;
236 	unsigned long hit = 0;
237 	unsigned long avg;
238 	int cnt = 0;
239 
240 	/*
241 	 * Hammer the buffer for 10 secs (this may
242 	 * make the system stall)
243 	 */
244 	trace_printk("Starting ring buffer hammer\n");
245 	start_time = ktime_get();
246 	timeout = ktime_add_ns(start_time, RUN_TIME * NSEC_PER_SEC);
247 	do {
248 		struct ring_buffer_event *event;
249 		int *entry;
250 		int i;
251 
252 		for (i = 0; i < write_iteration; i++) {
253 			event = ring_buffer_lock_reserve(buffer, 10);
254 			if (!event) {
255 				missed++;
256 			} else {
257 				hit++;
258 				entry = ring_buffer_event_data(event);
259 				*entry = smp_processor_id();
260 				ring_buffer_unlock_commit(buffer, event);
261 			}
262 		}
263 		end_time = ktime_get();
264 
265 		cnt++;
266 		if (consumer && !(cnt % wakeup_interval))
267 			wake_up_process(consumer);
268 
269 #ifndef CONFIG_PREEMPT
270 		/*
271 		 * If we are a non preempt kernel, the 10 second run will
272 		 * stop everything while it runs. Instead, we will call
273 		 * cond_resched and also add any time that was lost by a
274 		 * rescedule.
275 		 *
276 		 * Do a cond resched at the same frequency we would wake up
277 		 * the reader.
278 		 */
279 		if (cnt % wakeup_interval)
280 			cond_resched();
281 #endif
282 	} while (ktime_before(end_time, timeout) && !break_test());
283 	trace_printk("End ring buffer hammer\n");
284 
285 	if (consumer) {
286 		/* Init both completions here to avoid races */
287 		init_completion(&read_start);
288 		init_completion(&read_done);
289 		/* the completions must be visible before the finish var */
290 		smp_wmb();
291 		reader_finish = 1;
292 		wake_up_process(consumer);
293 		wait_for_completion(&read_done);
294 	}
295 
296 	time = ktime_us_delta(end_time, start_time);
297 
298 	entries = ring_buffer_entries(buffer);
299 	overruns = ring_buffer_overruns(buffer);
300 
301 	if (test_error)
302 		trace_printk("ERROR!\n");
303 
304 	if (!disable_reader) {
305 		if (consumer_fifo < 0)
306 			trace_printk("Running Consumer at nice: %d\n",
307 				     consumer_nice);
308 		else
309 			trace_printk("Running Consumer at SCHED_FIFO %d\n",
310 				     consumer_fifo);
311 	}
312 	if (producer_fifo < 0)
313 		trace_printk("Running Producer at nice: %d\n",
314 			     producer_nice);
315 	else
316 		trace_printk("Running Producer at SCHED_FIFO %d\n",
317 			     producer_fifo);
318 
319 	/* Let the user know that the test is running at low priority */
320 	if (producer_fifo < 0 && consumer_fifo < 0 &&
321 	    producer_nice == MAX_NICE && consumer_nice == MAX_NICE)
322 		trace_printk("WARNING!!! This test is running at lowest priority.\n");
323 
324 	trace_printk("Time:     %lld (usecs)\n", time);
325 	trace_printk("Overruns: %lld\n", overruns);
326 	if (disable_reader)
327 		trace_printk("Read:     (reader disabled)\n");
328 	else
329 		trace_printk("Read:     %ld  (by %s)\n", read,
330 			read_events ? "events" : "pages");
331 	trace_printk("Entries:  %lld\n", entries);
332 	trace_printk("Total:    %lld\n", entries + overruns + read);
333 	trace_printk("Missed:   %ld\n", missed);
334 	trace_printk("Hit:      %ld\n", hit);
335 
336 	/* Convert time from usecs to millisecs */
337 	do_div(time, USEC_PER_MSEC);
338 	if (time)
339 		hit /= (long)time;
340 	else
341 		trace_printk("TIME IS ZERO??\n");
342 
343 	trace_printk("Entries per millisec: %ld\n", hit);
344 
345 	if (hit) {
346 		/* Calculate the average time in nanosecs */
347 		avg = NSEC_PER_MSEC / hit;
348 		trace_printk("%ld ns per entry\n", avg);
349 	}
350 
351 	if (missed) {
352 		if (time)
353 			missed /= (long)time;
354 
355 		trace_printk("Total iterations per millisec: %ld\n",
356 			     hit + missed);
357 
358 		/* it is possible that hit + missed will overflow and be zero */
359 		if (!(hit + missed)) {
360 			trace_printk("hit + missed overflowed and totalled zero!\n");
361 			hit--; /* make it non zero */
362 		}
363 
364 		/* Caculate the average time in nanosecs */
365 		avg = NSEC_PER_MSEC / (hit + missed);
366 		trace_printk("%ld ns per entry\n", avg);
367 	}
368 }
369 
370 static void wait_to_die(void)
371 {
372 	set_current_state(TASK_INTERRUPTIBLE);
373 	while (!kthread_should_stop()) {
374 		schedule();
375 		set_current_state(TASK_INTERRUPTIBLE);
376 	}
377 	__set_current_state(TASK_RUNNING);
378 }
379 
380 static int ring_buffer_consumer_thread(void *arg)
381 {
382 	while (!break_test()) {
383 		complete(&read_start);
384 
385 		ring_buffer_consumer();
386 
387 		set_current_state(TASK_INTERRUPTIBLE);
388 		if (break_test())
389 			break;
390 		schedule();
391 	}
392 	__set_current_state(TASK_RUNNING);
393 
394 	if (!kthread_should_stop())
395 		wait_to_die();
396 
397 	return 0;
398 }
399 
400 static int ring_buffer_producer_thread(void *arg)
401 {
402 	while (!break_test()) {
403 		ring_buffer_reset(buffer);
404 
405 		if (consumer) {
406 			wake_up_process(consumer);
407 			wait_for_completion(&read_start);
408 		}
409 
410 		ring_buffer_producer();
411 		if (break_test())
412 			goto out_kill;
413 
414 		trace_printk("Sleeping for 10 secs\n");
415 		set_current_state(TASK_INTERRUPTIBLE);
416 		if (break_test())
417 			goto out_kill;
418 		schedule_timeout(HZ * SLEEP_TIME);
419 	}
420 
421 out_kill:
422 	__set_current_state(TASK_RUNNING);
423 	if (!kthread_should_stop())
424 		wait_to_die();
425 
426 	return 0;
427 }
428 
429 static int __init ring_buffer_benchmark_init(void)
430 {
431 	int ret;
432 
433 	/* make a one meg buffer in overwite mode */
434 	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
435 	if (!buffer)
436 		return -ENOMEM;
437 
438 	if (!disable_reader) {
439 		consumer = kthread_create(ring_buffer_consumer_thread,
440 					  NULL, "rb_consumer");
441 		ret = PTR_ERR(consumer);
442 		if (IS_ERR(consumer))
443 			goto out_fail;
444 	}
445 
446 	producer = kthread_run(ring_buffer_producer_thread,
447 			       NULL, "rb_producer");
448 	ret = PTR_ERR(producer);
449 
450 	if (IS_ERR(producer))
451 		goto out_kill;
452 
453 	/*
454 	 * Run them as low-prio background tasks by default:
455 	 */
456 	if (!disable_reader) {
457 		if (consumer_fifo >= 0) {
458 			struct sched_param param = {
459 				.sched_priority = consumer_fifo
460 			};
461 			sched_setscheduler(consumer, SCHED_FIFO, &param);
462 		} else
463 			set_user_nice(consumer, consumer_nice);
464 	}
465 
466 	if (producer_fifo >= 0) {
467 		struct sched_param param = {
468 			.sched_priority = producer_fifo
469 		};
470 		sched_setscheduler(producer, SCHED_FIFO, &param);
471 	} else
472 		set_user_nice(producer, producer_nice);
473 
474 	return 0;
475 
476  out_kill:
477 	if (consumer)
478 		kthread_stop(consumer);
479 
480  out_fail:
481 	ring_buffer_free(buffer);
482 	return ret;
483 }
484 
485 static void __exit ring_buffer_benchmark_exit(void)
486 {
487 	kthread_stop(producer);
488 	if (consumer)
489 		kthread_stop(consumer);
490 	ring_buffer_free(buffer);
491 }
492 
493 module_init(ring_buffer_benchmark_init);
494 module_exit(ring_buffer_benchmark_exit);
495 
496 MODULE_AUTHOR("Steven Rostedt");
497 MODULE_DESCRIPTION("ring_buffer_benchmark");
498 MODULE_LICENSE("GPL");
499