/*
 * ring buffer tester and benchmark
 *
 * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/completion.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/time.h>

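/*
 * Mirror of the ring buffer's data page layout (timestamp, commit
 * counter, data) so that read_page() can walk the events of a page
 * returned by ring_buffer_read_page().
 */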
struct rb_page {
	u64		ts;
	local_t		commit;
	char		data[4080];
};

/* run time and sleep time in seconds */
#define RUN_TIME	10
#define SLEEP_TIME	10

/* number of events for writer to wake up the reader */
static int wakeup_interval = 100;

static int reader_finish;
static struct completion read_start;
static struct completion read_done;

static struct ring_buffer *buffer;
static struct task_struct *producer;
static struct task_struct *consumer;
static unsigned long read;

static int disable_reader;
module_param(disable_reader, uint, 0644);
MODULE_PARM_DESC(disable_reader, "only run producer");

static int read_events;

static int kill_test;

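/*
 * Mark the test as failed (only once) and emit a warning.  Both the
 * reader and the writer poll kill_test and bail out when it is set.
 */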
#define KILL_TEST()				\
	do {					\
		if (!kill_test) {		\
			kill_test = 1;		\
			WARN_ON(1);		\
		}				\
	} while (0)

enum event_status {
	EVENT_FOUND,
	EVENT_DROPPED,
};

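/*
 * Consume a single event from @cpu's buffer and verify that its
 * payload is the cpu number the producer wrote into it.
 */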
static enum event_status read_event(int cpu)
{
	struct ring_buffer_event *event;
	int *entry;
	u64 ts;

	event = ring_buffer_consume(buffer, cpu, &ts);
	if (!event)
		return EVENT_DROPPED;

	entry = ring_buffer_event_data(event);
	if (*entry != cpu) {
		KILL_TEST();
		return EVENT_DROPPED;
	}

	read++;
	return EVENT_FOUND;
}

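/*
 * Read a whole page from @cpu's buffer and walk its events by hand,
 * checking each event's type, length and payload.
 */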
static enum event_status read_page(int cpu)
{
	struct ring_buffer_event *event;
	struct rb_page *rpage;
	unsigned long commit;
	void *bpage;
	int *entry;
	int ret;
	int inc;
	int i;

	bpage = ring_buffer_alloc_read_page(buffer);
	if (!bpage)
		return EVENT_DROPPED;

	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
	if (ret >= 0) {
		rpage = bpage;
		commit = local_read(&rpage->commit);
		for (i = 0; i < commit && !kill_test; i += inc) {

			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
				KILL_TEST();
				break;
			}

			inc = -1;
			event = (void *)&rpage->data[i];
			switch (event->type_len) {
			case RINGBUF_TYPE_PADDING:
				/* We don't expect any padding */
				KILL_TEST();
				break;
			case RINGBUF_TYPE_TIME_EXTEND:
				/* a time extend is 8 bytes: header plus one word */
				inc = 8;
				break;
			case 0:
				/* type_len 0: length of the rest of the event is in array[0] */
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					KILL_TEST();
					break;
				}
				read++;
				if (!event->array[0]) {
					KILL_TEST();
					break;
				}
				/* skip the 4 byte header plus array[0] bytes */
				inc = event->array[0] + 4;
				break;
			default:
				/* non-zero type_len encodes the data length in 4 byte words */
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					KILL_TEST();
					break;
				}
				read++;
				inc = ((event->type_len + 1) * 4);
			}
			if (kill_test)
				break;

			if (inc <= 0) {
				KILL_TEST();
				break;
			}
		}
	}
	ring_buffer_free_read_page(buffer, bpage);

	if (ret < 0)
		return EVENT_DROPPED;
	return EVENT_FOUND;
}

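/*
 * Reader loop: drain events from every online cpu until the producer
 * sets reader_finish, alternating between per-event and per-page reads
 * on successive runs.  Sleeps when the buffer is empty until woken by
 * the producer.
 */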
static void ring_buffer_consumer(void)
{
	/* toggle between reading pages and events */
	read_events ^= 1;

	read = 0;
	while (!reader_finish && !kill_test) {
		int found;

		do {
			int cpu;

			found = 0;
			for_each_online_cpu(cpu) {
				enum event_status stat;

				if (read_events)
					stat = read_event(cpu);
				else
					stat = read_page(cpu);

				if (kill_test)
					break;
				if (stat == EVENT_FOUND)
					found = 1;
			}
		} while (found && !kill_test);

		set_current_state(TASK_INTERRUPTIBLE);
		if (reader_finish)
			break;

		schedule();
		__set_current_state(TASK_RUNNING);
	}
	reader_finish = 0;
	complete(&read_done);
}

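/*
 * Writer loop: hammer the buffer with small events for RUN_TIME
 * seconds, periodically waking the consumer, then shut the reader down
 * and print the statistics for this run.
 */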
static void ring_buffer_producer(void)
{
	struct timeval start_tv;
	struct timeval end_tv;
	unsigned long long time;
	unsigned long long entries;
	unsigned long long overruns;
	unsigned long missed = 0;
	unsigned long hit = 0;
	unsigned long avg;
	int cnt = 0;

	/*
	 * Hammer the buffer for 10 secs (this may
	 * make the system stall)
	 */
	pr_info("Starting ring buffer hammer\n");
	do_gettimeofday(&start_tv);
	do {
		struct ring_buffer_event *event;
		int *entry;

		event = ring_buffer_lock_reserve(buffer, 10);
		if (!event) {
			missed++;
		} else {
			hit++;
			entry = ring_buffer_event_data(event);
			*entry = smp_processor_id();
			ring_buffer_unlock_commit(buffer, event);
		}
		do_gettimeofday(&end_tv);

		cnt++;
		if (consumer && !(cnt % wakeup_interval))
			wake_up_process(consumer);

#ifndef CONFIG_PREEMPT
		/*
		 * If we are running a non-preempt kernel, the 10 second run
		 * will stop everything while it runs. Instead, we will call
		 * cond_resched and also add any time that was lost by a
		 * reschedule.
		 *
		 * Do a cond resched at the same frequency we would wake up
		 * the reader.
		 */
		if (cnt % wakeup_interval)
			cond_resched();
#endif

	} while (end_tv.tv_sec < (start_tv.tv_sec + RUN_TIME) && !kill_test);
	pr_info("End ring buffer hammer\n");

	if (consumer) {
		/* Init both completions here to avoid races */
		init_completion(&read_start);
		init_completion(&read_done);
		/* the completions must be visible before the finish var */
		smp_wmb();
		reader_finish = 1;
		/* finish var visible before waking up the consumer */
		smp_wmb();
		wake_up_process(consumer);
		wait_for_completion(&read_done);
	}

	time = end_tv.tv_sec - start_tv.tv_sec;
	time *= USEC_PER_SEC;
	time += (long long)((long)end_tv.tv_usec - (long)start_tv.tv_usec);

	entries = ring_buffer_entries(buffer);
	overruns = ring_buffer_overruns(buffer);

	if (kill_test)
		pr_info("ERROR!\n");
	pr_info("Time:     %lld (usecs)\n", time);
	pr_info("Overruns: %lld\n", overruns);
	if (disable_reader)
		pr_info("Read:     (reader disabled)\n");
	else
		pr_info("Read:     %ld  (by %s)\n", read,
			read_events ? "events" : "pages");
	pr_info("Entries:  %lld\n", entries);
	pr_info("Total:    %lld\n", entries + overruns + read);
	pr_info("Missed:   %ld\n", missed);
	pr_info("Hit:      %ld\n", hit);

	/* Convert time from usecs to millisecs */
	do_div(time, USEC_PER_MSEC);
	if (time)
		hit /= (long)time;
	else
		pr_info("TIME IS ZERO??\n");

	pr_info("Entries per millisec: %ld\n", hit);

	if (hit) {
		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / hit;
		pr_info("%ld ns per entry\n", avg);
	}

	if (missed) {
		if (time)
			missed /= (long)time;

		pr_info("Total iterations per millisec: %ld\n", hit + missed);

		/* it is possible that hit + missed will overflow and be zero */
		if (!(hit + missed)) {
			pr_info("hit + missed overflowed and totalled zero!\n");
			hit--; /* make it non zero */
		}

		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / (hit + missed);
		pr_info("%ld ns per entry\n", avg);
	}
}

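/*
 * After a test failure, park here until kthread_stop() is called.
 */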
static void wait_to_die(void)
{
	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		schedule();
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
}

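/*
 * Consumer kthread: tell the producer we are ready, run one consumer
 * pass, then sleep until the producer wakes us for the next run.
 */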
static int ring_buffer_consumer_thread(void *arg)
{
	while (!kthread_should_stop() && !kill_test) {
		complete(&read_start);

		ring_buffer_consumer();

		set_current_state(TASK_INTERRUPTIBLE);
		if (kthread_should_stop() || kill_test)
			break;

		schedule();
		__set_current_state(TASK_RUNNING);
	}
	__set_current_state(TASK_RUNNING);

	if (kill_test)
		wait_to_die();

	return 0;
}

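/*
 * Producer kthread: reset the buffer, wait for the consumer to be
 * ready, run one RUN_TIME second test cycle, then sleep for SLEEP_TIME
 * seconds before repeating.
 */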
static int ring_buffer_producer_thread(void *arg)
{
	init_completion(&read_start);

	while (!kthread_should_stop() && !kill_test) {
		ring_buffer_reset(buffer);

		if (consumer) {
			smp_wmb();
			wake_up_process(consumer);
			wait_for_completion(&read_start);
		}

		ring_buffer_producer();

		pr_info("Sleeping for 10 secs\n");
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ * SLEEP_TIME);
		__set_current_state(TASK_RUNNING);
	}

	if (kill_test)
		wait_to_die();

	return 0;
}

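/*
 * Allocate the ring buffer and start the consumer (unless the reader
 * is disabled) and producer kthreads.
 */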
static int __init ring_buffer_benchmark_init(void)
{
	int ret;

	/* make a one meg buffer in overwrite mode */
	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	if (!disable_reader) {
		consumer = kthread_create(ring_buffer_consumer_thread,
					  NULL, "rb_consumer");
		ret = PTR_ERR(consumer);
		if (IS_ERR(consumer))
			goto out_fail;
	}

	producer = kthread_run(ring_buffer_producer_thread,
			       NULL, "rb_producer");
	ret = PTR_ERR(producer);

	if (IS_ERR(producer))
		goto out_kill;

	return 0;

 out_kill:
	if (consumer)
		kthread_stop(consumer);

 out_fail:
	ring_buffer_free(buffer);
	return ret;
}

static void __exit ring_buffer_benchmark_exit(void)
{
	kthread_stop(producer);
	if (consumer)
		kthread_stop(consumer);
	ring_buffer_free(buffer);
}

module_init(ring_buffer_benchmark_init);
module_exit(ring_buffer_benchmark_exit);

MODULE_AUTHOR("Steven Rostedt");
MODULE_DESCRIPTION("ring_buffer_benchmark");
MODULE_LICENSE("GPL");