/*
 * ring buffer tester and benchmark
 *
 * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/completion.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/time.h>

struct rb_page {
	u64		ts;
	local_t		commit;
	char		data[4080];
};

/* run time and sleep time in seconds */
#define RUN_TIME	10
#define SLEEP_TIME	10

/* number of events for writer to wake up the reader */
static int wakeup_interval = 100;

static int reader_finish;
static struct completion read_start;
static struct completion read_done;

static struct ring_buffer *buffer;
static struct task_struct *producer;
static struct task_struct *consumer;
static unsigned long read;

static int disable_reader;
module_param(disable_reader, uint, 0644);
MODULE_PARM_DESC(disable_reader, "only run producer");

static int write_iteration = 50;
module_param(write_iteration, uint, 0644);
MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings");

static int producer_nice = 19;
static int consumer_nice = 19;

static int producer_fifo = -1;
static int consumer_fifo = -1;

module_param(producer_nice, int, 0644);
MODULE_PARM_DESC(producer_nice, "nice prio for producer");

module_param(consumer_nice, int, 0644);
MODULE_PARM_DESC(consumer_nice, "nice prio for consumer");

module_param(producer_fifo, int, 0644);
MODULE_PARM_DESC(producer_fifo, "fifo prio for producer");

module_param(consumer_fifo, int, 0644);
MODULE_PARM_DESC(consumer_fifo, "fifo prio for consumer");

static int read_events;

static int kill_test;

#define KILL_TEST()				\
	do {					\
		if (!kill_test) {		\
			kill_test = 1;		\
			WARN_ON(1);		\
		}				\
	} while (0)

enum event_status {
	EVENT_FOUND,
	EVENT_DROPPED,
};

static enum event_status read_event(int cpu)
{
	struct ring_buffer_event *event;
	int *entry;
	u64 ts;

	event = ring_buffer_consume(buffer, cpu, &ts);
	if (!event)
		return EVENT_DROPPED;

	entry = ring_buffer_event_data(event);
	if (*entry != cpu) {
		KILL_TEST();
		return EVENT_DROPPED;
	}

	read++;
	return EVENT_FOUND;
}

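/*
 * Read one full sub-buffer from the given CPU by hand: swap out a page
 * with ring_buffer_read_page() and walk its raw events, decoding each
 * header's type_len field to find the size of the next event and
 * checking that every data event carries the id of the CPU that wrote it.
 */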
static enum event_status read_page(int cpu)
{
	struct ring_buffer_event *event;
	struct rb_page *rpage;
	unsigned long commit;
	void *bpage;
	int *entry;
	int ret;
	int inc;
	int i;

	bpage = ring_buffer_alloc_read_page(buffer);
	if (!bpage)
		return EVENT_DROPPED;

	ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1);
	if (ret >= 0) {
		rpage = bpage;
		commit = local_read(&rpage->commit);
		for (i = 0; i < commit && !kill_test; i += inc) {

			if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) {
				KILL_TEST();
				break;
			}

			inc = -1;
			event = (void *)&rpage->data[i];
			switch (event->type_len) {
			case RINGBUF_TYPE_PADDING:
				/* failed writes may be discarded events */
				if (!event->time_delta)
					KILL_TEST();
				inc = event->array[0] + 4;
				break;
			case RINGBUF_TYPE_TIME_EXTEND:
				inc = 8;
				break;
			case 0:
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					KILL_TEST();
					break;
				}
				read++;
				if (!event->array[0]) {
					KILL_TEST();
					break;
				}
				inc = event->array[0] + 4;
				break;
			default:
				entry = ring_buffer_event_data(event);
				if (*entry != cpu) {
					KILL_TEST();
					break;
				}
				read++;
				inc = ((event->type_len + 1) * 4);
			}
			if (kill_test)
				break;

			if (inc <= 0) {
				KILL_TEST();
				break;
			}
		}
	}
	ring_buffer_free_read_page(buffer, bpage);

	if (ret < 0)
		return EVENT_DROPPED;
	return EVENT_FOUND;
}

static void ring_buffer_consumer(void)
{
	/* toggle between reading pages and events */
	read_events ^= 1;

	read = 0;
	while (!reader_finish && !kill_test) {
		int found;

		do {
			int cpu;

			found = 0;
			for_each_online_cpu(cpu) {
				enum event_status stat;

				if (read_events)
					stat = read_event(cpu);
				else
					stat = read_page(cpu);

				if (kill_test)
					break;
				if (stat == EVENT_FOUND)
					found = 1;
			}
		} while (found && !kill_test);

		set_current_state(TASK_INTERRUPTIBLE);
		if (reader_finish)
			break;

		schedule();
		__set_current_state(TASK_RUNNING);
	}
	reader_finish = 0;
	complete(&read_done);
}

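/*
 * The benchmark proper: reserve and commit small events as fast as
 * possible for RUN_TIME seconds, waking the consumer every
 * wakeup_interval iterations, then report hits, misses, overruns and
 * the average cost per entry through trace_printk().
 */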
static void ring_buffer_producer(void)
{
	struct timeval start_tv;
	struct timeval end_tv;
	unsigned long long time;
	unsigned long long entries;
	unsigned long long overruns;
	unsigned long missed = 0;
	unsigned long hit = 0;
	unsigned long avg;
	int cnt = 0;

	/*
	 * Hammer the buffer for 10 secs (this may
	 * make the system stall)
	 */
	trace_printk("Starting ring buffer hammer\n");
	do_gettimeofday(&start_tv);
	do {
		struct ring_buffer_event *event;
		int *entry;
		int i;

		for (i = 0; i < write_iteration; i++) {
			event = ring_buffer_lock_reserve(buffer, 10);
			if (!event) {
				missed++;
			} else {
				hit++;
				entry = ring_buffer_event_data(event);
				*entry = smp_processor_id();
				ring_buffer_unlock_commit(buffer, event);
			}
		}
		do_gettimeofday(&end_tv);

		cnt++;
		if (consumer && !(cnt % wakeup_interval))
			wake_up_process(consumer);

#ifndef CONFIG_PREEMPT
		/*
		 * If we are a non preempt kernel, the 10 second run will
		 * stop everything while it runs. Instead, we will call
		 * cond_resched and also add any time that was lost by a
		 * reschedule.
		 *
		 * Do a cond resched at the same frequency we would wake up
		 * the reader.
		 */
		if (cnt % wakeup_interval)
			cond_resched();
#endif

	} while (end_tv.tv_sec < (start_tv.tv_sec + RUN_TIME) && !kill_test);
	trace_printk("End ring buffer hammer\n");

	if (consumer) {
		/* Init both completions here to avoid races */
		init_completion(&read_start);
		init_completion(&read_done);
		/* the completions must be visible before the finish var */
		smp_wmb();
		reader_finish = 1;
		/* finish var visible before waking up the consumer */
		smp_wmb();
		wake_up_process(consumer);
		wait_for_completion(&read_done);
	}

	time = end_tv.tv_sec - start_tv.tv_sec;
	time *= USEC_PER_SEC;
	time += (long long)((long)end_tv.tv_usec - (long)start_tv.tv_usec);

	entries = ring_buffer_entries(buffer);
	overruns = ring_buffer_overruns(buffer);

	if (kill_test)
		trace_printk("ERROR!\n");

	if (!disable_reader) {
		if (consumer_fifo < 0)
			trace_printk("Running Consumer at nice: %d\n",
				     consumer_nice);
		else
			trace_printk("Running Consumer at SCHED_FIFO %d\n",
				     consumer_fifo);
	}
	if (producer_fifo < 0)
		trace_printk("Running Producer at nice: %d\n",
			     producer_nice);
	else
		trace_printk("Running Producer at SCHED_FIFO %d\n",
			     producer_fifo);

	/* Let the user know that the test is running at low priority */
	if (producer_fifo < 0 && consumer_fifo < 0 &&
	    producer_nice == 19 && consumer_nice == 19)
		trace_printk("WARNING!!! This test is running at lowest priority.\n");

	trace_printk("Time: %lld (usecs)\n", time);
	trace_printk("Overruns: %lld\n", overruns);
	if (disable_reader)
		trace_printk("Read: (reader disabled)\n");
	else
		trace_printk("Read: %ld (by %s)\n", read,
			     read_events ? "events" : "pages");
	trace_printk("Entries: %lld\n", entries);
	trace_printk("Total: %lld\n", entries + overruns + read);
	trace_printk("Missed: %ld\n", missed);
	trace_printk("Hit: %ld\n", hit);

	/* Convert time from usecs to millisecs */
	do_div(time, USEC_PER_MSEC);
	if (time)
		hit /= (long)time;
	else
		trace_printk("TIME IS ZERO??\n");

	trace_printk("Entries per millisec: %ld\n", hit);

	if (hit) {
		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / hit;
		trace_printk("%ld ns per entry\n", avg);
	}

	if (missed) {
		if (time)
			missed /= (long)time;

		trace_printk("Total iterations per millisec: %ld\n",
			     hit + missed);

		/* it is possible that hit + missed will overflow and be zero */
		if (!(hit + missed)) {
			trace_printk("hit + missed overflowed and totalled zero!\n");
			hit--; /* make it non zero */
		}

		/* Calculate the average time in nanosecs */
		avg = NSEC_PER_MSEC / (hit + missed);
		trace_printk("%ld ns per entry\n", avg);
	}
}

static void wait_to_die(void)
{
	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		schedule();
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
}

static int ring_buffer_consumer_thread(void *arg)
{
	while (!kthread_should_stop() && !kill_test) {
		complete(&read_start);

		ring_buffer_consumer();

		set_current_state(TASK_INTERRUPTIBLE);
		if (kthread_should_stop() || kill_test)
			break;

		schedule();
		__set_current_state(TASK_RUNNING);
	}
	__set_current_state(TASK_RUNNING);

	if (kill_test)
		wait_to_die();

	return 0;
}

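/*
 * The producer thread drives each pass of the test: reset the buffer,
 * hand-shake with the consumer through the read_start completion, run
 * the hammer loop for RUN_TIME seconds, then sleep for SLEEP_TIME
 * seconds before starting the next pass.
 */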
static int
ring_buffer_producer_thread(void *arg)
{
	init_completion(&read_start);

	while (!kthread_should_stop() && !kill_test) {
		ring_buffer_reset(buffer);

		if (consumer) {
			smp_wmb();
			wake_up_process(consumer);
			wait_for_completion(&read_start);
		}

		ring_buffer_producer();

		trace_printk("Sleeping for 10 secs\n");
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ * SLEEP_TIME);
		__set_current_state(TASK_RUNNING);
	}

	if (kill_test)
		wait_to_die();

	return 0;
}

static int __init ring_buffer_benchmark_init(void)
{
	int ret;

	/* make a one meg buffer in overwrite mode */
	buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE);
	if (!buffer)
		return -ENOMEM;

	if (!disable_reader) {
		consumer = kthread_create(ring_buffer_consumer_thread,
					  NULL, "rb_consumer");
		ret = PTR_ERR(consumer);
		if (IS_ERR(consumer))
			goto out_fail;
	}

	producer = kthread_run(ring_buffer_producer_thread,
			       NULL, "rb_producer");
	ret = PTR_ERR(producer);

	if (IS_ERR(producer))
		goto out_kill;

	/*
	 * Run them as low-prio background tasks by default:
	 */
	if (!disable_reader) {
		if (consumer_fifo >= 0) {
			struct sched_param param = {
				.sched_priority = consumer_fifo
			};
			sched_setscheduler(consumer, SCHED_FIFO, &param);
		} else
			set_user_nice(consumer, consumer_nice);
	}

	if (producer_fifo >= 0) {
		struct sched_param param = {
			.sched_priority = producer_fifo
		};
		sched_setscheduler(producer, SCHED_FIFO, &param);
	} else
		set_user_nice(producer, producer_nice);

	return 0;

 out_kill:
	if (consumer)
		kthread_stop(consumer);

 out_fail:
	ring_buffer_free(buffer);
	return ret;
}

static void __exit ring_buffer_benchmark_exit(void)
{
	kthread_stop(producer);
	if (consumer)
		kthread_stop(consumer);
	ring_buffer_free(buffer);
}

module_init(ring_buffer_benchmark_init);
module_exit(ring_buffer_benchmark_exit);

MODULE_AUTHOR("Steven Rostedt");
MODULE_DESCRIPTION("ring_buffer_benchmark");
MODULE_LICENSE("GPL");
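
/*
 * Example usage (a sketch; the tracing mount point below is an assumption
 * and may be /sys/kernel/tracing or /sys/kernel/debug/tracing depending
 * on the system):
 *
 *	insmod ring_buffer_benchmark.ko producer_fifo=10 write_iteration=100
 *	cat /sys/kernel/debug/tracing/trace
 *
 * The results are emitted with trace_printk(), so they show up in the
 * trace file rather than in the kernel log.
 */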