1 /*
2  * ring buffer tester and benchmark
3  *
4  * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/completion.h>
8 #include <linux/kthread.h>
9 #include <linux/module.h>
10 #include <linux/time.h>
11 
12 struct rb_page {
13 	u64		ts;
14 	local_t		commit;
15 	char		data[4080];
16 };
17 
18 /* run time and sleep time in seconds */
19 #define RUN_TIME	10
20 #define SLEEP_TIME	10
21 
22 /* number of events for writer to wake up the reader */
23 static int wakeup_interval = 100;
24 
25 static int reader_finish;
26 static struct completion read_start;
27 static struct completion read_done;
28 
29 static struct ring_buffer *buffer;
30 static struct task_struct *producer;
31 static struct task_struct *consumer;
32 static unsigned long read;
33 
34 static int disable_reader;
35 module_param(disable_reader, uint, 0644);
36 MODULE_PARM_DESC(disable_reader, "only run producer");
37 
38 static int read_events;
39 
40 static int kill_test;
41 
42 #define KILL_TEST()				\
43 	do {					\
44 		if (!kill_test) {		\
45 			kill_test = 1;		\
46 			WARN_ON(1);		\
47 		}				\
48 	} while (0)
49 
50 enum event_status {
51 	EVENT_FOUND,
52 	EVENT_DROPPED,
53 };
54 
55 static enum event_status read_event(int cpu)
56 {
57 	struct ring_buffer_event *event;
58 	int *entry;
59 	u64 ts;
60 
61 	event = ring_buffer_consume(buffer, cpu, &ts);
62 	if (!event)
63 		return EVENT_DROPPED;
64 
65 	entry = ring_buffer_event_data(event);
66 	if (*entry != cpu) {
67 		KILL_TEST();
68 		return EVENT_DROPPED;
69 	}
70 
71 	read++;
72 	return EVENT_FOUND;
73 }
74 
75 static enum event_status read_page(int cpu)
76 {
77 	struct ring_buffer_event *event;
78 	struct rb_page *rpage;
79 	unsigned long commit;
80 	void *bpage;
81 	int *entry;
82 	int ret;
83 	int inc;
84 	int i;
85 
86 	bpage = ring_buffer_alloc_read_page(buffer);
87 	if (!bpage)
88 		return EVENT_DROPPED;
89 
90 	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
91 	if (ret >= 0) {
92 		rpage = bpage;
93 		commit = local_read(&rpage->commit);
94 		for (i = 0; i < commit && !kill_test; i += inc) {
95 
96 			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
97 				KILL_TEST();
98 				break;
99 			}
100 
101 			inc = -1;
102 			event = (void *)&rpage->data[i];
103 			switch (event->type_len) {
104 			case RINGBUF_TYPE_PADDING:
105 				/* failed writes may be discarded events */
106 				if (!event->time_delta)
107 					KILL_TEST();
108 				inc = event->array[0] + 4;
109 				break;
110 			case RINGBUF_TYPE_TIME_EXTEND:
111 				inc = 8;
112 				break;
113 			case 0:
114 				entry = ring_buffer_event_data(event);
115 				if (*entry != cpu) {
116 					KILL_TEST();
117 					break;
118 				}
119 				read++;
120 				if (!event->array[0]) {
121 					KILL_TEST();
122 					break;
123 				}
124 				inc = event->array[0] + 4;
125 				break;
126 			default:
127 				entry = ring_buffer_event_data(event);
128 				if (*entry != cpu) {
129 					KILL_TEST();
130 					break;
131 				}
132 				read++;
133 				inc = ((event->type_len + 1) * 4);
134 			}
135 			if (kill_test)
136 				break;
137 
138 			if (inc <= 0) {
139 				KILL_TEST();
140 				break;
141 			}
142 		}
143 	}
144 	ring_buffer_free_read_page(buffer, bpage);
145 
146 	if (ret < 0)
147 		return EVENT_DROPPED;
148 	return EVENT_FOUND;
149 }
150 
151 static void ring_buffer_consumer(void)
152 {
153 	/* toggle between reading pages and events */
154 	read_events ^= 1;
155 
156 	read = 0;
157 	while (!reader_finish && !kill_test) {
158 		int found;
159 
160 		do {
161 			int cpu;
162 
163 			found = 0;
164 			for_each_online_cpu(cpu) {
165 				enum event_status stat;
166 
167 				if (read_events)
168 					stat = read_event(cpu);
169 				else
170 					stat = read_page(cpu);
171 
172 				if (kill_test)
173 					break;
174 				if (stat == EVENT_FOUND)
175 					found = 1;
176 			}
177 		} while (found && !kill_test);
178 
179 		set_current_state(TASK_INTERRUPTIBLE);
180 		if (reader_finish)
181 			break;
182 
183 		schedule();
184 		__set_current_state(TASK_RUNNING);
185 	}
186 	reader_finish = 0;
187 	complete(&read_done);
188 }
189 
190 static void ring_buffer_producer(void)
191 {
192 	struct timeval start_tv;
193 	struct timeval end_tv;
194 	unsigned long long time;
195 	unsigned long long entries;
196 	unsigned long long overruns;
197 	unsigned long missed = 0;
198 	unsigned long hit = 0;
199 	unsigned long avg;
200 	int cnt = 0;
201 
202 	/*
203 	 * Hammer the buffer for 10 secs (this may
204 	 * make the system stall)
205 	 */
206 	trace_printk("Starting ring buffer hammer\n");
207 	do_gettimeofday(&start_tv);
208 	do {
209 		struct ring_buffer_event *event;
210 		int *entry;
211 
212 		event = ring_buffer_lock_reserve(buffer, 10);
213 		if (!event) {
214 			missed++;
215 		} else {
216 			hit++;
217 			entry = ring_buffer_event_data(event);
218 			*entry = smp_processor_id();
219 			ring_buffer_unlock_commit(buffer, event);
220 		}
221 		do_gettimeofday(&end_tv);
222 
223 		cnt++;
224 		if (consumer && !(cnt % wakeup_interval))
225 			wake_up_process(consumer);
226 
227 #ifndef CONFIG_PREEMPT
228 		/*
229 		 * If we are a non preempt kernel, the 10 second run will
230 		 * stop everything while it runs. Instead, we will call
231 		 * cond_resched and also add any time that was lost by a
232 		 * rescedule.
233 		 *
234 		 * Do a cond resched at the same frequency we would wake up
235 		 * the reader.
236 		 */
237 		if (cnt % wakeup_interval)
238 			cond_resched();
239 #endif
240 
241 	} while (end_tv.tv_sec < (start_tv.tv_sec + RUN_TIME) && !kill_test);
242 	trace_printk("End ring buffer hammer\n");
243 
244 	if (consumer) {
245 		/* Init both completions here to avoid races */
246 		init_completion(&read_start);
247 		init_completion(&read_done);
248 		/* the completions must be visible before the finish var */
249 		smp_wmb();
250 		reader_finish = 1;
251 		/* finish var visible before waking up the consumer */
252 		smp_wmb();
253 		wake_up_process(consumer);
254 		wait_for_completion(&read_done);
255 	}
256 
257 	time = end_tv.tv_sec - start_tv.tv_sec;
258 	time *= USEC_PER_SEC;
259 	time += (long long)((long)end_tv.tv_usec - (long)start_tv.tv_usec);
260 
261 	entries = ring_buffer_entries(buffer);
262 	overruns = ring_buffer_overruns(buffer);
263 
264 	if (kill_test)
265 		trace_printk("ERROR!\n");
266 	trace_printk("Time:     %lld (usecs)\n", time);
267 	trace_printk("Overruns: %lld\n", overruns);
268 	if (disable_reader)
269 		trace_printk("Read:     (reader disabled)\n");
270 	else
271 		trace_printk("Read:     %ld  (by %s)\n", read,
272 			read_events ? "events" : "pages");
273 	trace_printk("Entries:  %lld\n", entries);
274 	trace_printk("Total:    %lld\n", entries + overruns + read);
275 	trace_printk("Missed:   %ld\n", missed);
276 	trace_printk("Hit:      %ld\n", hit);
277 
278 	/* Convert time from usecs to millisecs */
279 	do_div(time, USEC_PER_MSEC);
280 	if (time)
281 		hit /= (long)time;
282 	else
283 		trace_printk("TIME IS ZERO??\n");
284 
285 	trace_printk("Entries per millisec: %ld\n", hit);
286 
287 	if (hit) {
288 		/* Calculate the average time in nanosecs */
289 		avg = NSEC_PER_MSEC / hit;
290 		trace_printk("%ld ns per entry\n", avg);
291 	}
292 
293 	if (missed) {
294 		if (time)
295 			missed /= (long)time;
296 
297 		trace_printk("Total iterations per millisec: %ld\n",
298 			     hit + missed);
299 
300 		/* it is possible that hit + missed will overflow and be zero */
301 		if (!(hit + missed)) {
302 			trace_printk("hit + missed overflowed and totalled zero!\n");
303 			hit--; /* make it non zero */
304 		}
305 
306 		/* Caculate the average time in nanosecs */
307 		avg = NSEC_PER_MSEC / (hit + missed);
308 		trace_printk("%ld ns per entry\n", avg);
309 	}
310 }
311 
312 static void wait_to_die(void)
313 {
314 	set_current_state(TASK_INTERRUPTIBLE);
315 	while (!kthread_should_stop()) {
316 		schedule();
317 		set_current_state(TASK_INTERRUPTIBLE);
318 	}
319 	__set_current_state(TASK_RUNNING);
320 }
321 
322 static int ring_buffer_consumer_thread(void *arg)
323 {
324 	while (!kthread_should_stop() && !kill_test) {
325 		complete(&read_start);
326 
327 		ring_buffer_consumer();
328 
329 		set_current_state(TASK_INTERRUPTIBLE);
330 		if (kthread_should_stop() || kill_test)
331 			break;
332 
333 		schedule();
334 		__set_current_state(TASK_RUNNING);
335 	}
336 	__set_current_state(TASK_RUNNING);
337 
338 	if (kill_test)
339 		wait_to_die();
340 
341 	return 0;
342 }
343 
344 static int ring_buffer_producer_thread(void *arg)
345 {
346 	init_completion(&read_start);
347 
348 	while (!kthread_should_stop() && !kill_test) {
349 		ring_buffer_reset(buffer);
350 
351 		if (consumer) {
352 			smp_wmb();
353 			wake_up_process(consumer);
354 			wait_for_completion(&read_start);
355 		}
356 
357 		ring_buffer_producer();
358 
359 		trace_printk("Sleeping for 10 secs\n");
360 		set_current_state(TASK_INTERRUPTIBLE);
361 		schedule_timeout(HZ * SLEEP_TIME);
362 		__set_current_state(TASK_RUNNING);
363 	}
364 
365 	if (kill_test)
366 		wait_to_die();
367 
368 	return 0;
369 }
370 
371 static int __init ring_buffer_benchmark_init(void)
372 {
373 	int ret;
374 
375 	/* make a one meg buffer in overwite mode */
376 	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
377 	if (!buffer)
378 		return -ENOMEM;
379 
380 	if (!disable_reader) {
381 		consumer = kthread_create(ring_buffer_consumer_thread,
382 					  NULL, "rb_consumer");
383 		ret = PTR_ERR(consumer);
384 		if (IS_ERR(consumer))
385 			goto out_fail;
386 	}
387 
388 	producer = kthread_run(ring_buffer_producer_thread,
389 			       NULL, "rb_producer");
390 	ret = PTR_ERR(producer);
391 
392 	if (IS_ERR(producer))
393 		goto out_kill;
394 
395 	return 0;
396 
397  out_kill:
398 	if (consumer)
399 		kthread_stop(consumer);
400 
401  out_fail:
402 	ring_buffer_free(buffer);
403 	return ret;
404 }
405 
406 static void __exit ring_buffer_benchmark_exit(void)
407 {
408 	kthread_stop(producer);
409 	if (consumer)
410 		kthread_stop(consumer);
411 	ring_buffer_free(buffer);
412 }
413 
414 module_init(ring_buffer_benchmark_init);
415 module_exit(ring_buffer_benchmark_exit);
416 
417 MODULE_AUTHOR("Steven Rostedt");
418 MODULE_DESCRIPTION("ring_buffer_benchmark");
419 MODULE_LICENSE("GPL");
420