/*
 * ring buffer tester and benchmark
 *
 * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/completion.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/ktime.h>
#include <asm/local.h>

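/*
 * Local mirror of the ring buffer's per-page header, used by
 * read_page() to walk events after ring_buffer_read_page(). The
 * data[] size below assumes 4K pages (PAGE_SIZE minus the ts and
 * commit fields).
 */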
struct rb_page {
	u64		ts;
	local_t		commit;
	char		data[4080];
};

/* run time and sleep time in seconds */
#define RUN_TIME	10ULL
#define SLEEP_TIME	10

/* number of writer loop iterations between reader wakeups */
static int wakeup_interval = 100;

static int reader_finish;
static DECLARE_COMPLETION(read_start);
static DECLARE_COMPLETION(read_done);

static struct ring_buffer *buffer;
static struct task_struct *producer;
static struct task_struct *consumer;
static unsigned long read;

static unsigned int disable_reader;
module_param(disable_reader, uint, 0644);
MODULE_PARM_DESC(disable_reader, "only run producer");

static unsigned int write_iteration = 50;
module_param(write_iteration, uint, 0644);
MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");

static int producer_nice = MAX_NICE;
static int consumer_nice = MAX_NICE;

static int producer_fifo = -1;
static int consumer_fifo = -1;

module_param(producer_nice, int, 0644);
MODULE_PARM_DESC(producer_nice, "nice prio for producer");

module_param(consumer_nice, int, 0644);
MODULE_PARM_DESC(consumer_nice, "nice prio for consumer");

module_param(producer_fifo, int, 0644);
MODULE_PARM_DESC(producer_fifo, "fifo prio for producer");

module_param(consumer_fifo, int, 0644);
MODULE_PARM_DESC(consumer_fifo, "fifo prio for consumer");
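
/*
 * Example usage (a sketch; assumes the module is built as
 * ring_buffer_benchmark.ko):
 *
 *   modprobe ring_buffer_benchmark producer_fifo=10 disable_reader=1
 *
 * Results are reported via trace_printk() and can be read from the
 * tracefs 'trace' file, e.g. /sys/kernel/debug/tracing/trace.
 */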

static int read_events;

static int test_error;

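/*
 * Flag the first failure with a WARN and remember it in test_error;
 * break_test() checks this so both threads can wind down.
 */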
#define TEST_ERROR()				\
	do {					\
		if (!test_error) {		\
			test_error = 1;		\
			WARN_ON(1);		\
		}				\
	} while (0)

enum event_status {
	EVENT_FOUND,
	EVENT_DROPPED,
};

static bool break_test(void)
{
	return test_error || kthread_should_stop();
}

static enum event_status read_event(int cpu)
{
	struct ring_buffer_event *event;
	int *entry;
	u64 ts;

	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
	if (!event)
		return EVENT_DROPPED;

	entry = ring_buffer_event_data(event);
	if (*entry != cpu) {
		TEST_ERROR();
		return EVENT_DROPPED;
	}

	read++;
	return EVENT_FOUND;
}

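/*
 * Read a whole page with ring_buffer_read_page() and walk its events
 * by hand, validating each event header as we go. Any malformed
 * entry, length or overrun trips TEST_ERROR().
 */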
static enum event_status read_page(int cpu)
{
	struct ring_buffer_event *event;
	struct rb_page *rpage;
	unsigned long commit;
	void *bpage;
	int *entry;
	int ret;
	int inc;
	int i;

	bpage = ring_buffer_alloc_read_page(buffer, cpu);
	if (!bpage)
		return EVENT_DROPPED;

	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
	if (ret >= 0) {
		rpage = bpage;
		/* The commit may have missed-event flags set in the high bits; clear them */
		commit = local_read(&rpage->commit) & 0xfffff;
		for (i = 0; i < commit && !test_error; i += inc) {

			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
				TEST_ERROR();
				break;
			}

			inc = -1;
			event = (void *)&rpage->data[i];
			switch (event->type_len) {
			case RINGBUF_TYPE_PADDING:
				/* failed writes may be discarded events */
				if (!event->time_delta)
					TEST_ERROR();
				inc = event->array[0] + 4;
				break;
			case RINGBUF_TYPE_TIME_EXTEND:
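				/* an extended time delta is an 8 byte event */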
				inc = 8;
				break;
			case 0:
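				/* type_len 0: event length is stored in array[0] */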
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					TEST_ERROR();
					break;
				}
				read++;
				if (!event->array[0]) {
					TEST_ERROR();
					break;
				}
				inc = event->array[0] + 4;
				break;
			default:
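				/* small data event: the length is encoded in type_len itself */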
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					TEST_ERROR();
					break;
				}
				read++;
				inc = ((event->type_len + 1) * 4);
			}
			if (test_error)
				break;

			if (inc <= 0) {
				TEST_ERROR();
				break;
			}
		}
	}
	ring_buffer_free_read_page(buffer, bpage);

	if (ret < 0)
		return EVENT_DROPPED;
	return EVENT_FOUND;
}

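/*
 * Drain events from every online CPU, alternating between the event
 * and page read paths on successive runs, until the producer sets
 * reader_finish.
 */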
static void ring_buffer_consumer(void)
{
	/* toggle between reading pages and events */
	read_events ^= 1;

	read = 0;
	/*
	 * Continue running until the producer specifically asks to stop
	 * and is ready for the completion.
	 */
	while (!READ_ONCE(reader_finish)) {
		int found = 1;

		while (found && !test_error) {
			int cpu;

			found = 0;
			for_each_online_cpu(cpu) {
				enum event_status stat;

				if (read_events)
					stat = read_event(cpu);
				else
					stat = read_page(cpu);

				if (test_error)
					break;

				if (stat == EVENT_FOUND)
					found = 1;
			}
		}

		/*
		 * Wait till the producer wakes us up when there is more data
		 * available or when the producer wants us to finish reading.
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		if (reader_finish)
			break;

		schedule();
	}
	__set_current_state(TASK_RUNNING);
	reader_finish = 0;
	complete(&read_done);
}

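/*
 * Hammer the buffer for RUN_TIME seconds, waking the reader every
 * wakeup_interval iterations, then stop the consumer and report the
 * throughput numbers via trace_printk().
 */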
static void ring_buffer_producer(void)
{
	ktime_t start_time, end_time, timeout;
	unsigned long long time;
	unsigned long long entries;
	unsigned long long overruns;
	unsigned long missed = 0;
	unsigned long hit = 0;
	unsigned long avg;
	int cnt = 0;

	/*
	 * Hammer the buffer for 10 secs (this may
	 * make the system stall)
	 */
	trace_printk("Starting ring buffer hammer\n");
	start_time = ktime_get();
	timeout = ktime_add_ns(start_time, RUN_TIME * NSEC_PER_SEC);
	do {
		struct ring_buffer_event *event;
		int *entry;
		int i;

		for (i = 0; i < write_iteration; i++) {
			event = ring_buffer_lock_reserve(buffer, 10);
			if (!event) {
				missed++;
			} else {
				hit++;
				entry = ring_buffer_event_data(event);
				*entry = smp_processor_id();
				ring_buffer_unlock_commit(buffer, event);
			}
		}
		end_time = ktime_get();

		cnt++;
		if (consumer && !(cnt % wakeup_interval))
			wake_up_process(consumer);

#ifndef CONFIG_PREEMPT
		/*
		 * If we are a non-preempt kernel, the 10 second run will
		 * stop everything while it runs. Instead, we will call
		 * cond_resched and also add any time that was lost by a
		 * reschedule.
		 *
		 * Do a cond resched at the same frequency we would wake up
		 * the reader.
		 */
		if (cnt % wakeup_interval)
			cond_resched();
#endif
	} while (ktime_before(end_time, timeout) && !break_test());
	trace_printk("End ring buffer hammer\n");

	if (consumer) {
		/* Init both completions here to avoid races */
		init_completion(&read_start);
		init_completion(&read_done);
		/* the completions must be visible before the finish var */
		smp_wmb();
		reader_finish = 1;
		wake_up_process(consumer);
		wait_for_completion(&read_done);
	}

	time = ktime_us_delta(end_time, start_time);

	entries = ring_buffer_entries(buffer);
	overruns = ring_buffer_overruns(buffer);

	if (test_error)
		trace_printk("ERROR!\n");

	if (!disable_reader) {
		if (consumer_fifo < 0)
			trace_printk("Running Consumer at nice: %d\n",
				     consumer_nice);
		else
			trace_printk("Running Consumer at SCHED_FIFO %d\n",
				     consumer_fifo);
	}
	if (producer_fifo < 0)
		trace_printk("Running Producer at nice: %d\n",
			     producer_nice);
	else
		trace_printk("Running Producer at SCHED_FIFO %d\n",
			     producer_fifo);

	/* Let the user know that the test is running at low priority */
	if (producer_fifo < 0 && consumer_fifo < 0 &&
	    producer_nice == MAX_NICE && consumer_nice == MAX_NICE)
		trace_printk("WARNING!!! This test is running at lowest priority.\n");

	trace_printk("Time:     %lld (usecs)\n", time);
	trace_printk("Overruns: %lld\n", overruns);
	if (disable_reader)
		trace_printk("Read:     (reader disabled)\n");
	else
		trace_printk("Read:     %ld  (by %s)\n", read,
			read_events ? "events" : "pages");
	trace_printk("Entries:  %lld\n", entries);
	trace_printk("Total:    %lld\n", entries + overruns + read);
	trace_printk("Missed:   %ld\n", missed);
	trace_printk("Hit:      %ld\n", hit);

	/* Convert time from usecs to millisecs */
	do_div(time, USEC_PER_MSEC);
	if (time)
		hit /= (long)time;
	else
		trace_printk("TIME IS ZERO??\n");

	trace_printk("Entries per millisec: %ld\n", hit);

	if (hit) {
		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / hit;
		trace_printk("%ld ns per entry\n", avg);
	}

	if (missed) {
		if (time)
			missed /= (long)time;

		trace_printk("Total iterations per millisec: %ld\n",
			     hit + missed);

		/* it is possible that hit + missed will overflow and be zero */
		if (!(hit + missed)) {
			trace_printk("hit + missed overflowed and totalled zero!\n");
			hit--; /* make it non zero */
		}
		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / (hit + missed);
		trace_printk("%ld ns per entry\n", avg);
	}
}

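/* Park the thread until kthread_stop() is called */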
static void wait_to_die(void)
{
	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		schedule();
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
}

static int ring_buffer_consumer_thread(void *arg)
{
	while (!break_test()) {
		complete(&read_start);

		ring_buffer_consumer();

		set_current_state(TASK_INTERRUPTIBLE);
		if (break_test())
			break;
		schedule();
	}
	__set_current_state(TASK_RUNNING);

	if (!kthread_should_stop())
		wait_to_die();

	return 0;
}

static int ring_buffer_producer_thread(void *arg)
{
	while (!break_test()) {
		ring_buffer_reset(buffer);

		if (consumer) {
			wake_up_process(consumer);
			wait_for_completion(&read_start);
		}

		ring_buffer_producer();
		if (break_test())
			goto out_kill;

		trace_printk("Sleeping for 10 secs\n");
		set_current_state(TASK_INTERRUPTIBLE);
		if (break_test())
			goto out_kill;
		schedule_timeout(HZ * SLEEP_TIME);
	}

out_kill:
	__set_current_state(TASK_RUNNING);
	if (!kthread_should_stop())
		wait_to_die();

	return 0;
}

static int __init ring_buffer_benchmark_init(void)
{
	int ret;

	/* make a one meg buffer in overwrite mode */
	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	if (!disable_reader) {
		consumer = kthread_create(ring_buffer_consumer_thread,
					  NULL, "rb_consumer");
		ret = PTR_ERR(consumer);
		if (IS_ERR(consumer))
			goto out_fail;
	}

	producer = kthread_run(ring_buffer_producer_thread,
			       NULL, "rb_producer");
	ret = PTR_ERR(producer);

	if (IS_ERR(producer))
		goto out_kill;

	/*
	 * Run them as low-prio background tasks by default:
	 */
	if (!disable_reader) {
		if (consumer_fifo >= 0) {
			struct sched_param param = {
				.sched_priority = consumer_fifo
			};
			sched_setscheduler(consumer, SCHED_FIFO, &param);
		} else
			set_user_nice(consumer, consumer_nice);
	}

	if (producer_fifo >= 0) {
		struct sched_param param = {
			.sched_priority = producer_fifo
		};
		sched_setscheduler(producer, SCHED_FIFO, &param);
	} else
		set_user_nice(producer, producer_nice);

	return 0;

 out_kill:
	if (consumer)
		kthread_stop(consumer);

 out_fail:
	ring_buffer_free(buffer);
	return ret;
}

static void __exit ring_buffer_benchmark_exit(void)
{
	kthread_stop(producer);
	if (consumer)
		kthread_stop(consumer);
	ring_buffer_free(buffer);
}

module_init(ring_buffer_benchmark_init);
module_exit(ring_buffer_benchmark_exit);

MODULE_AUTHOR("Steven Rostedt");
MODULE_DESCRIPTION("ring_buffer_benchmark");
MODULE_LICENSE("GPL");