1 /* 2 * AioContext multithreading tests 3 * 4 * Copyright Red Hat, Inc. 2016 5 * 6 * Authors: 7 * Paolo Bonzini <pbonzini@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU LGPL, version 2 or later. 10 * See the COPYING.LIB file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "block/aio.h" 15 #include "qemu/coroutine.h" 16 #include "qemu/thread.h" 17 #include "qemu/error-report.h" 18 #include "iothread.h" 19 20 /* AioContext management */ 21 22 #define NUM_CONTEXTS 5 23 24 static IOThread *threads[NUM_CONTEXTS]; 25 static AioContext *ctx[NUM_CONTEXTS]; 26 static __thread int id = -1; 27 28 static QemuEvent done_event; 29 30 /* Run a function synchronously on a remote iothread. */ 31 32 typedef struct CtxRunData { 33 QEMUBHFunc *cb; 34 void *arg; 35 } CtxRunData; 36 37 static void ctx_run_bh_cb(void *opaque) 38 { 39 CtxRunData *data = opaque; 40 41 data->cb(data->arg); 42 qemu_event_set(&done_event); 43 } 44 45 static void ctx_run(int i, QEMUBHFunc *cb, void *opaque) 46 { 47 CtxRunData data = { 48 .cb = cb, 49 .arg = opaque 50 }; 51 52 qemu_event_reset(&done_event); 53 aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data); 54 qemu_event_wait(&done_event); 55 } 56 57 /* Starting the iothreads. */ 58 59 static void set_id_cb(void *opaque) 60 { 61 int *i = opaque; 62 63 id = *i; 64 } 65 66 static void create_aio_contexts(void) 67 { 68 int i; 69 70 for (i = 0; i < NUM_CONTEXTS; i++) { 71 threads[i] = iothread_new(); 72 ctx[i] = iothread_get_aio_context(threads[i]); 73 } 74 75 qemu_event_init(&done_event, false); 76 for (i = 0; i < NUM_CONTEXTS; i++) { 77 ctx_run(i, set_id_cb, &i); 78 } 79 } 80 81 /* Stopping the iothreads. */ 82 83 static void join_aio_contexts(void) 84 { 85 int i; 86 87 for (i = 0; i < NUM_CONTEXTS; i++) { 88 aio_context_ref(ctx[i]); 89 } 90 for (i = 0; i < NUM_CONTEXTS; i++) { 91 iothread_join(threads[i]); 92 } 93 for (i = 0; i < NUM_CONTEXTS; i++) { 94 aio_context_unref(ctx[i]); 95 } 96 qemu_event_destroy(&done_event); 97 } 98 99 /* Basic test for the stuff above. */ 100 101 static void test_lifecycle(void) 102 { 103 create_aio_contexts(); 104 join_aio_contexts(); 105 } 106 107 /* aio_co_schedule test. */ 108 109 static Coroutine *to_schedule[NUM_CONTEXTS]; 110 static bool stop[NUM_CONTEXTS]; 111 112 static int count_retry; 113 static int count_here; 114 static int count_other; 115 116 static bool schedule_next(int n) 117 { 118 Coroutine *co; 119 120 co = qatomic_xchg(&to_schedule[n], NULL); 121 if (!co) { 122 qatomic_inc(&count_retry); 123 return false; 124 } 125 126 if (n == id) { 127 qatomic_inc(&count_here); 128 } else { 129 qatomic_inc(&count_other); 130 } 131 132 aio_co_schedule(ctx[n], co); 133 return true; 134 } 135 136 static void finish_cb(void *opaque) 137 { 138 stop[id] = true; 139 schedule_next(id); 140 } 141 142 static coroutine_fn void test_multi_co_schedule_entry(void *opaque) 143 { 144 g_assert(to_schedule[id] == NULL); 145 146 /* 147 * The next iteration will set to_schedule[id] again, but once finish_cb 148 * is scheduled there is no guarantee that it will actually be woken up, 149 * so at that point it must not go to sleep. 150 */ 151 while (!stop[id]) { 152 int n; 153 154 n = g_test_rand_int_range(0, NUM_CONTEXTS); 155 schedule_next(n); 156 157 qatomic_mb_set(&to_schedule[id], qemu_coroutine_self()); 158 /* finish_cb can run here. */ 159 qemu_coroutine_yield(); 160 g_assert(to_schedule[id] == NULL); 161 } 162 } 163 164 165 static void test_multi_co_schedule(int seconds) 166 { 167 int i; 168 169 count_here = count_other = count_retry = 0; 170 171 create_aio_contexts(); 172 for (i = 0; i < NUM_CONTEXTS; i++) { 173 Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL); 174 aio_co_schedule(ctx[i], co1); 175 } 176 177 g_usleep(seconds * 1000000); 178 179 /* Guarantee that each AioContext is woken up from its last wait. */ 180 for (i = 0; i < NUM_CONTEXTS; i++) { 181 ctx_run(i, finish_cb, NULL); 182 g_assert(to_schedule[i] == NULL); 183 } 184 185 join_aio_contexts(); 186 g_test_message("scheduled %d, queued %d, retry %d, total %d", 187 count_other, count_here, count_retry, 188 count_here + count_other + count_retry); 189 } 190 191 static void test_multi_co_schedule_1(void) 192 { 193 test_multi_co_schedule(1); 194 } 195 196 static void test_multi_co_schedule_10(void) 197 { 198 test_multi_co_schedule(10); 199 } 200 201 /* CoMutex thread-safety. */ 202 203 static uint32_t atomic_counter; 204 static uint32_t running; 205 static uint32_t counter; 206 static CoMutex comutex; 207 static bool now_stopping; 208 209 static void coroutine_fn test_multi_co_mutex_entry(void *opaque) 210 { 211 while (!qatomic_read(&now_stopping)) { 212 qemu_co_mutex_lock(&comutex); 213 counter++; 214 qemu_co_mutex_unlock(&comutex); 215 216 /* Increase atomic_counter *after* releasing the mutex. Otherwise 217 * there is a chance (it happens about 1 in 3 runs) that the iothread 218 * exits before the coroutine is woken up, causing a spurious 219 * assertion failure. 220 */ 221 qatomic_inc(&atomic_counter); 222 } 223 qatomic_dec(&running); 224 } 225 226 static void test_multi_co_mutex(int threads, int seconds) 227 { 228 int i; 229 230 qemu_co_mutex_init(&comutex); 231 counter = 0; 232 atomic_counter = 0; 233 now_stopping = false; 234 235 create_aio_contexts(); 236 assert(threads <= NUM_CONTEXTS); 237 running = threads; 238 for (i = 0; i < threads; i++) { 239 Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL); 240 aio_co_schedule(ctx[i], co1); 241 } 242 243 g_usleep(seconds * 1000000); 244 245 qatomic_set(&now_stopping, true); 246 while (running > 0) { 247 g_usleep(100000); 248 } 249 250 join_aio_contexts(); 251 g_test_message("%d iterations/second", counter / seconds); 252 g_assert_cmpint(counter, ==, atomic_counter); 253 } 254 255 /* Testing with NUM_CONTEXTS threads focuses on the queue. The mutex however 256 * is too contended (and the threads spend too much time in aio_poll) 257 * to actually stress the handoff protocol. 258 */ 259 static void test_multi_co_mutex_1(void) 260 { 261 test_multi_co_mutex(NUM_CONTEXTS, 1); 262 } 263 264 static void test_multi_co_mutex_10(void) 265 { 266 test_multi_co_mutex(NUM_CONTEXTS, 10); 267 } 268 269 /* Testing with fewer threads stresses the handoff protocol too. Still, the 270 * case where the locker _can_ pick up a handoff is very rare, happening 271 * about 10 times in 1 million, so increase the runtime a bit compared to 272 * other "quick" testcases that only run for 1 second. 273 */ 274 static void test_multi_co_mutex_2_3(void) 275 { 276 test_multi_co_mutex(2, 3); 277 } 278 279 static void test_multi_co_mutex_2_30(void) 280 { 281 test_multi_co_mutex(2, 30); 282 } 283 284 /* Same test with fair mutexes, for performance comparison. */ 285 286 #ifdef CONFIG_LINUX 287 #include "qemu/futex.h" 288 289 /* The nodes for the mutex reside in this structure (on which we try to avoid 290 * false sharing). The head of the mutex is in the "mutex_head" variable. 291 */ 292 static struct { 293 int next, locked; 294 int padding[14]; 295 } nodes[NUM_CONTEXTS] __attribute__((__aligned__(64))); 296 297 static int mutex_head = -1; 298 299 static void mcs_mutex_lock(void) 300 { 301 int prev; 302 303 nodes[id].next = -1; 304 nodes[id].locked = 1; 305 prev = qatomic_xchg(&mutex_head, id); 306 if (prev != -1) { 307 qatomic_set(&nodes[prev].next, id); 308 qemu_futex_wait(&nodes[id].locked, 1); 309 } 310 } 311 312 static void mcs_mutex_unlock(void) 313 { 314 int next; 315 if (qatomic_read(&nodes[id].next) == -1) { 316 if (qatomic_read(&mutex_head) == id && 317 qatomic_cmpxchg(&mutex_head, id, -1) == id) { 318 /* Last item in the list, exit. */ 319 return; 320 } 321 while (qatomic_read(&nodes[id].next) == -1) { 322 /* mcs_mutex_lock did the xchg, but has not updated 323 * nodes[prev].next yet. 324 */ 325 } 326 } 327 328 /* Wake up the next in line. */ 329 next = qatomic_read(&nodes[id].next); 330 nodes[next].locked = 0; 331 qemu_futex_wake(&nodes[next].locked, 1); 332 } 333 334 static void test_multi_fair_mutex_entry(void *opaque) 335 { 336 while (!qatomic_read(&now_stopping)) { 337 mcs_mutex_lock(); 338 counter++; 339 mcs_mutex_unlock(); 340 qatomic_inc(&atomic_counter); 341 } 342 qatomic_dec(&running); 343 } 344 345 static void test_multi_fair_mutex(int threads, int seconds) 346 { 347 int i; 348 349 assert(mutex_head == -1); 350 counter = 0; 351 atomic_counter = 0; 352 now_stopping = false; 353 354 create_aio_contexts(); 355 assert(threads <= NUM_CONTEXTS); 356 running = threads; 357 for (i = 0; i < threads; i++) { 358 Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL); 359 aio_co_schedule(ctx[i], co1); 360 } 361 362 g_usleep(seconds * 1000000); 363 364 qatomic_set(&now_stopping, true); 365 while (running > 0) { 366 g_usleep(100000); 367 } 368 369 join_aio_contexts(); 370 g_test_message("%d iterations/second", counter / seconds); 371 g_assert_cmpint(counter, ==, atomic_counter); 372 } 373 374 static void test_multi_fair_mutex_1(void) 375 { 376 test_multi_fair_mutex(NUM_CONTEXTS, 1); 377 } 378 379 static void test_multi_fair_mutex_10(void) 380 { 381 test_multi_fair_mutex(NUM_CONTEXTS, 10); 382 } 383 #endif 384 385 /* Same test with pthread mutexes, for performance comparison and 386 * portability. */ 387 388 static QemuMutex mutex; 389 390 static void test_multi_mutex_entry(void *opaque) 391 { 392 while (!qatomic_read(&now_stopping)) { 393 qemu_mutex_lock(&mutex); 394 counter++; 395 qemu_mutex_unlock(&mutex); 396 qatomic_inc(&atomic_counter); 397 } 398 qatomic_dec(&running); 399 } 400 401 static void test_multi_mutex(int threads, int seconds) 402 { 403 int i; 404 405 qemu_mutex_init(&mutex); 406 counter = 0; 407 atomic_counter = 0; 408 now_stopping = false; 409 410 create_aio_contexts(); 411 assert(threads <= NUM_CONTEXTS); 412 running = threads; 413 for (i = 0; i < threads; i++) { 414 Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL); 415 aio_co_schedule(ctx[i], co1); 416 } 417 418 g_usleep(seconds * 1000000); 419 420 qatomic_set(&now_stopping, true); 421 while (running > 0) { 422 g_usleep(100000); 423 } 424 425 join_aio_contexts(); 426 g_test_message("%d iterations/second", counter / seconds); 427 g_assert_cmpint(counter, ==, atomic_counter); 428 } 429 430 static void test_multi_mutex_1(void) 431 { 432 test_multi_mutex(NUM_CONTEXTS, 1); 433 } 434 435 static void test_multi_mutex_10(void) 436 { 437 test_multi_mutex(NUM_CONTEXTS, 10); 438 } 439 440 /* End of tests. */ 441 442 int main(int argc, char **argv) 443 { 444 init_clocks(NULL); 445 446 g_test_init(&argc, &argv, NULL); 447 g_test_add_func("/aio/multi/lifecycle", test_lifecycle); 448 if (g_test_quick()) { 449 g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1); 450 g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1); 451 g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3); 452 #ifdef CONFIG_LINUX 453 g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1); 454 #endif 455 g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1); 456 } else { 457 g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10); 458 g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10); 459 g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30); 460 #ifdef CONFIG_LINUX 461 g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10); 462 #endif 463 g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10); 464 } 465 return g_test_run(); 466 } 467