1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * ring buffer tester and benchmark 4 * 5 * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com> 6 */ 7 #include <linux/ring_buffer.h> 8 #include <linux/completion.h> 9 #include <linux/kthread.h> 10 #include <uapi/linux/sched/types.h> 11 #include <linux/module.h> 12 #include <linux/ktime.h> 13 #include <asm/local.h> 14 15 struct rb_page { 16 u64 ts; 17 local_t commit; 18 char data[4080]; 19 }; 20 21 /* run time and sleep time in seconds */ 22 #define RUN_TIME 10ULL 23 #define SLEEP_TIME 10 24 25 /* number of events for writer to wake up the reader */ 26 static int wakeup_interval = 100; 27 28 static int reader_finish; 29 static DECLARE_COMPLETION(read_start); 30 static DECLARE_COMPLETION(read_done); 31 32 static struct trace_buffer *buffer; 33 static struct task_struct *producer; 34 static struct task_struct *consumer; 35 static unsigned long read; 36 37 static unsigned int disable_reader; 38 module_param(disable_reader, uint, 0644); 39 MODULE_PARM_DESC(disable_reader, "only run producer"); 40 41 static unsigned int write_iteration = 50; 42 module_param(write_iteration, uint, 0644); 43 MODULE_PARM_DESC(write_iteration, "# of writes between timestamp readings"); 44 45 static int producer_nice = MAX_NICE; 46 static int consumer_nice = MAX_NICE; 47 48 static int producer_fifo; 49 static int consumer_fifo; 50 51 module_param(producer_nice, int, 0644); 52 MODULE_PARM_DESC(producer_nice, "nice prio for producer"); 53 54 module_param(consumer_nice, int, 0644); 55 MODULE_PARM_DESC(consumer_nice, "nice prio for consumer"); 56 57 module_param(producer_fifo, int, 0644); 58 MODULE_PARM_DESC(producer_fifo, "use fifo for producer: 0 - disabled, 1 - low prio, 2 - fifo"); 59 60 module_param(consumer_fifo, int, 0644); 61 MODULE_PARM_DESC(consumer_fifo, "use fifo for consumer: 0 - disabled, 1 - low prio, 2 - fifo"); 62 63 static int read_events; 64 65 static int test_error; 66 67 #define TEST_ERROR() \ 68 do { \ 69 if (!test_error) { \ 70 test_error = 1; \ 71 WARN_ON(1); \ 72 } \ 73 } while (0) 74 75 enum event_status { 76 EVENT_FOUND, 77 EVENT_DROPPED, 78 }; 79 80 static bool break_test(void) 81 { 82 return test_error || kthread_should_stop(); 83 } 84 85 static enum event_status read_event(int cpu) 86 { 87 struct ring_buffer_event *event; 88 int *entry; 89 u64 ts; 90 91 event = ring_buffer_consume(buffer, cpu, &ts, NULL); 92 if (!event) 93 return EVENT_DROPPED; 94 95 entry = ring_buffer_event_data(event); 96 if (*entry != cpu) { 97 TEST_ERROR(); 98 return EVENT_DROPPED; 99 } 100 101 read++; 102 return EVENT_FOUND; 103 } 104 105 static enum event_status read_page(int cpu) 106 { 107 struct ring_buffer_event *event; 108 struct rb_page *rpage; 109 unsigned long commit; 110 void *bpage; 111 int *entry; 112 int ret; 113 int inc; 114 int i; 115 116 bpage = ring_buffer_alloc_read_page(buffer, cpu); 117 if (IS_ERR(bpage)) 118 return EVENT_DROPPED; 119 120 ret = ring_buffer_read_page(buffer, &bpage, PAGE_SIZE, cpu, 1); 121 if (ret >= 0) { 122 rpage = bpage; 123 /* The commit may have missed event flags set, clear them */ 124 commit = local_read(&rpage->commit) & 0xfffff; 125 for (i = 0; i < commit && !test_error ; i += inc) { 126 127 if (i >= (PAGE_SIZE - offsetof(struct rb_page, data))) { 128 TEST_ERROR(); 129 break; 130 } 131 132 inc = -1; 133 event = (void *)&rpage->data[i]; 134 switch (event->type_len) { 135 case RINGBUF_TYPE_PADDING: 136 /* failed writes may be discarded events */ 137 if (!event->time_delta) 138 TEST_ERROR(); 139 inc = event->array[0] + 4; 140 break; 141 case RINGBUF_TYPE_TIME_EXTEND: 142 inc = 8; 143 break; 144 case 0: 145 entry = ring_buffer_event_data(event); 146 if (*entry != cpu) { 147 TEST_ERROR(); 148 break; 149 } 150 read++; 151 if (!event->array[0]) { 152 TEST_ERROR(); 153 break; 154 } 155 inc = event->array[0] + 4; 156 break; 157 default: 158 entry = ring_buffer_event_data(event); 159 if (*entry != cpu) { 160 TEST_ERROR(); 161 break; 162 } 163 read++; 164 inc = ((event->type_len + 1) * 4); 165 } 166 if (test_error) 167 break; 168 169 if (inc <= 0) { 170 TEST_ERROR(); 171 break; 172 } 173 } 174 } 175 ring_buffer_free_read_page(buffer, cpu, bpage); 176 177 if (ret < 0) 178 return EVENT_DROPPED; 179 return EVENT_FOUND; 180 } 181 182 static void ring_buffer_consumer(void) 183 { 184 /* toggle between reading pages and events */ 185 read_events ^= 1; 186 187 read = 0; 188 /* 189 * Continue running until the producer specifically asks to stop 190 * and is ready for the completion. 191 */ 192 while (!READ_ONCE(reader_finish)) { 193 int found = 1; 194 195 while (found && !test_error) { 196 int cpu; 197 198 found = 0; 199 for_each_online_cpu(cpu) { 200 enum event_status stat; 201 202 if (read_events) 203 stat = read_event(cpu); 204 else 205 stat = read_page(cpu); 206 207 if (test_error) 208 break; 209 210 if (stat == EVENT_FOUND) 211 found = 1; 212 213 } 214 } 215 216 /* Wait till the producer wakes us up when there is more data 217 * available or when the producer wants us to finish reading. 218 */ 219 set_current_state(TASK_INTERRUPTIBLE); 220 if (reader_finish) 221 break; 222 223 schedule(); 224 } 225 __set_current_state(TASK_RUNNING); 226 reader_finish = 0; 227 complete(&read_done); 228 } 229 230 static void ring_buffer_producer(void) 231 { 232 ktime_t start_time, end_time, timeout; 233 unsigned long long time; 234 unsigned long long entries; 235 unsigned long long overruns; 236 unsigned long missed = 0; 237 unsigned long hit = 0; 238 unsigned long avg; 239 int cnt = 0; 240 241 /* 242 * Hammer the buffer for 10 secs (this may 243 * make the system stall) 244 */ 245 trace_printk("Starting ring buffer hammer\n"); 246 start_time = ktime_get(); 247 timeout = ktime_add_ns(start_time, RUN_TIME * NSEC_PER_SEC); 248 do { 249 struct ring_buffer_event *event; 250 int *entry; 251 int i; 252 253 for (i = 0; i < write_iteration; i++) { 254 event = ring_buffer_lock_reserve(buffer, 10); 255 if (!event) { 256 missed++; 257 } else { 258 hit++; 259 entry = ring_buffer_event_data(event); 260 *entry = smp_processor_id(); 261 ring_buffer_unlock_commit(buffer); 262 } 263 } 264 end_time = ktime_get(); 265 266 cnt++; 267 if (consumer && !(cnt % wakeup_interval)) 268 wake_up_process(consumer); 269 270 #ifndef CONFIG_PREEMPTION 271 /* 272 * If we are a non preempt kernel, the 10 seconds run will 273 * stop everything while it runs. Instead, we will call 274 * cond_resched and also add any time that was lost by a 275 * reschedule. 276 * 277 * Do a cond resched at the same frequency we would wake up 278 * the reader. 279 */ 280 if (cnt % wakeup_interval) 281 cond_resched(); 282 #endif 283 } while (ktime_before(end_time, timeout) && !break_test()); 284 trace_printk("End ring buffer hammer\n"); 285 286 if (consumer) { 287 /* Init both completions here to avoid races */ 288 init_completion(&read_start); 289 init_completion(&read_done); 290 /* the completions must be visible before the finish var */ 291 smp_wmb(); 292 reader_finish = 1; 293 wake_up_process(consumer); 294 wait_for_completion(&read_done); 295 } 296 297 time = ktime_us_delta(end_time, start_time); 298 299 entries = ring_buffer_entries(buffer); 300 overruns = ring_buffer_overruns(buffer); 301 302 if (test_error) 303 trace_printk("ERROR!\n"); 304 305 if (!disable_reader) { 306 if (consumer_fifo) 307 trace_printk("Running Consumer at SCHED_FIFO %s\n", 308 consumer_fifo == 1 ? "low" : "high"); 309 else 310 trace_printk("Running Consumer at nice: %d\n", 311 consumer_nice); 312 } 313 if (producer_fifo) 314 trace_printk("Running Producer at SCHED_FIFO %s\n", 315 producer_fifo == 1 ? "low" : "high"); 316 else 317 trace_printk("Running Producer at nice: %d\n", 318 producer_nice); 319 320 /* Let the user know that the test is running at low priority */ 321 if (!producer_fifo && !consumer_fifo && 322 producer_nice == MAX_NICE && consumer_nice == MAX_NICE) 323 trace_printk("WARNING!!! This test is running at lowest priority.\n"); 324 325 trace_printk("Time: %lld (usecs)\n", time); 326 trace_printk("Overruns: %lld\n", overruns); 327 if (disable_reader) 328 trace_printk("Read: (reader disabled)\n"); 329 else 330 trace_printk("Read: %ld (by %s)\n", read, 331 read_events ? "events" : "pages"); 332 trace_printk("Entries: %lld\n", entries); 333 trace_printk("Total: %lld\n", entries + overruns + read); 334 trace_printk("Missed: %ld\n", missed); 335 trace_printk("Hit: %ld\n", hit); 336 337 /* Convert time from usecs to millisecs */ 338 do_div(time, USEC_PER_MSEC); 339 if (time) 340 hit /= (long)time; 341 else 342 trace_printk("TIME IS ZERO??\n"); 343 344 trace_printk("Entries per millisec: %ld\n", hit); 345 346 if (hit) { 347 /* Calculate the average time in nanosecs */ 348 avg = NSEC_PER_MSEC / hit; 349 trace_printk("%ld ns per entry\n", avg); 350 } 351 352 if (missed) { 353 if (time) 354 missed /= (long)time; 355 356 trace_printk("Total iterations per millisec: %ld\n", 357 hit + missed); 358 359 /* it is possible that hit + missed will overflow and be zero */ 360 if (!(hit + missed)) { 361 trace_printk("hit + missed overflowed and totalled zero!\n"); 362 hit--; /* make it non zero */ 363 } 364 365 /* Calculate the average time in nanosecs */ 366 avg = NSEC_PER_MSEC / (hit + missed); 367 trace_printk("%ld ns per entry\n", avg); 368 } 369 } 370 371 static void wait_to_die(void) 372 { 373 set_current_state(TASK_INTERRUPTIBLE); 374 while (!kthread_should_stop()) { 375 schedule(); 376 set_current_state(TASK_INTERRUPTIBLE); 377 } 378 __set_current_state(TASK_RUNNING); 379 } 380 381 static int ring_buffer_consumer_thread(void *arg) 382 { 383 while (!break_test()) { 384 complete(&read_start); 385 386 ring_buffer_consumer(); 387 388 set_current_state(TASK_INTERRUPTIBLE); 389 if (break_test()) 390 break; 391 schedule(); 392 } 393 __set_current_state(TASK_RUNNING); 394 395 if (!kthread_should_stop()) 396 wait_to_die(); 397 398 return 0; 399 } 400 401 static int ring_buffer_producer_thread(void *arg) 402 { 403 while (!break_test()) { 404 ring_buffer_reset(buffer); 405 406 if (consumer) { 407 wake_up_process(consumer); 408 wait_for_completion(&read_start); 409 } 410 411 ring_buffer_producer(); 412 if (break_test()) 413 goto out_kill; 414 415 trace_printk("Sleeping for 10 secs\n"); 416 set_current_state(TASK_INTERRUPTIBLE); 417 if (break_test()) 418 goto out_kill; 419 schedule_timeout(HZ * SLEEP_TIME); 420 } 421 422 out_kill: 423 __set_current_state(TASK_RUNNING); 424 if (!kthread_should_stop()) 425 wait_to_die(); 426 427 return 0; 428 } 429 430 static int __init ring_buffer_benchmark_init(void) 431 { 432 int ret; 433 434 /* make a one meg buffer in overwite mode */ 435 buffer = ring_buffer_alloc(1000000, RB_FL_OVERWRITE); 436 if (!buffer) 437 return -ENOMEM; 438 439 if (!disable_reader) { 440 consumer = kthread_create(ring_buffer_consumer_thread, 441 NULL, "rb_consumer"); 442 ret = PTR_ERR(consumer); 443 if (IS_ERR(consumer)) 444 goto out_fail; 445 } 446 447 producer = kthread_run(ring_buffer_producer_thread, 448 NULL, "rb_producer"); 449 ret = PTR_ERR(producer); 450 451 if (IS_ERR(producer)) 452 goto out_kill; 453 454 /* 455 * Run them as low-prio background tasks by default: 456 */ 457 if (!disable_reader) { 458 if (consumer_fifo >= 2) 459 sched_set_fifo(consumer); 460 else if (consumer_fifo == 1) 461 sched_set_fifo_low(consumer); 462 else 463 set_user_nice(consumer, consumer_nice); 464 } 465 466 if (producer_fifo >= 2) 467 sched_set_fifo(producer); 468 else if (producer_fifo == 1) 469 sched_set_fifo_low(producer); 470 else 471 set_user_nice(producer, producer_nice); 472 473 return 0; 474 475 out_kill: 476 if (consumer) 477 kthread_stop(consumer); 478 479 out_fail: 480 ring_buffer_free(buffer); 481 return ret; 482 } 483 484 static void __exit ring_buffer_benchmark_exit(void) 485 { 486 kthread_stop(producer); 487 if (consumer) 488 kthread_stop(consumer); 489 ring_buffer_free(buffer); 490 } 491 492 module_init(ring_buffer_benchmark_init); 493 module_exit(ring_buffer_benchmark_exit); 494 495 MODULE_AUTHOR("Steven Rostedt"); 496 MODULE_DESCRIPTION("ring_buffer_benchmark"); 497 MODULE_LICENSE("GPL"); 498