1 /* 2 * AioContext multithreading tests 3 * 4 * Copyright Red Hat, Inc. 2016 5 * 6 * Authors: 7 * Paolo Bonzini <pbonzini@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU LGPL, version 2 or later. 10 * See the COPYING.LIB file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "block/aio.h" 15 #include "qemu/coroutine.h" 16 #include "qemu/thread.h" 17 #include "qemu/error-report.h" 18 #include "iothread.h" 19 20 /* AioContext management */ 21 22 #define NUM_CONTEXTS 5 23 24 static IOThread *threads[NUM_CONTEXTS]; 25 static AioContext *ctx[NUM_CONTEXTS]; 26 static __thread int id = -1; 27 28 static QemuEvent done_event; 29 30 /* Run a function synchronously on a remote iothread. */ 31 32 typedef struct CtxRunData { 33 QEMUBHFunc *cb; 34 void *arg; 35 } CtxRunData; 36 37 static void ctx_run_bh_cb(void *opaque) 38 { 39 CtxRunData *data = opaque; 40 41 data->cb(data->arg); 42 qemu_event_set(&done_event); 43 } 44 45 static void ctx_run(int i, QEMUBHFunc *cb, void *opaque) 46 { 47 CtxRunData data = { 48 .cb = cb, 49 .arg = opaque 50 }; 51 52 qemu_event_reset(&done_event); 53 aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data); 54 qemu_event_wait(&done_event); 55 } 56 57 /* Starting the iothreads. */ 58 59 static void set_id_cb(void *opaque) 60 { 61 int *i = opaque; 62 63 id = *i; 64 } 65 66 static void create_aio_contexts(void) 67 { 68 int i; 69 70 for (i = 0; i < NUM_CONTEXTS; i++) { 71 threads[i] = iothread_new(); 72 ctx[i] = iothread_get_aio_context(threads[i]); 73 } 74 75 qemu_event_init(&done_event, false); 76 for (i = 0; i < NUM_CONTEXTS; i++) { 77 ctx_run(i, set_id_cb, &i); 78 } 79 } 80 81 /* Stopping the iothreads. 
*/ 82 83 static void join_aio_contexts(void) 84 { 85 int i; 86 87 for (i = 0; i < NUM_CONTEXTS; i++) { 88 aio_context_ref(ctx[i]); 89 } 90 for (i = 0; i < NUM_CONTEXTS; i++) { 91 iothread_join(threads[i]); 92 } 93 for (i = 0; i < NUM_CONTEXTS; i++) { 94 aio_context_unref(ctx[i]); 95 } 96 qemu_event_destroy(&done_event); 97 } 98 99 /* Basic test for the stuff above. */ 100 101 static void test_lifecycle(void) 102 { 103 create_aio_contexts(); 104 join_aio_contexts(); 105 } 106 107 /* aio_co_schedule test. */ 108 109 static Coroutine *to_schedule[NUM_CONTEXTS]; 110 111 static bool now_stopping; 112 113 static int count_retry; 114 static int count_here; 115 static int count_other; 116 117 static bool schedule_next(int n) 118 { 119 Coroutine *co; 120 121 co = qatomic_xchg(&to_schedule[n], NULL); 122 if (!co) { 123 qatomic_inc(&count_retry); 124 return false; 125 } 126 127 if (n == id) { 128 qatomic_inc(&count_here); 129 } else { 130 qatomic_inc(&count_other); 131 } 132 133 aio_co_schedule(ctx[n], co); 134 return true; 135 } 136 137 static void finish_cb(void *opaque) 138 { 139 schedule_next(id); 140 } 141 142 static coroutine_fn void test_multi_co_schedule_entry(void *opaque) 143 { 144 g_assert(to_schedule[id] == NULL); 145 146 while (!qatomic_mb_read(&now_stopping)) { 147 int n; 148 149 n = g_test_rand_int_range(0, NUM_CONTEXTS); 150 schedule_next(n); 151 152 qatomic_mb_set(&to_schedule[id], qemu_coroutine_self()); 153 qemu_coroutine_yield(); 154 g_assert(to_schedule[id] == NULL); 155 } 156 } 157 158 159 static void test_multi_co_schedule(int seconds) 160 { 161 int i; 162 163 count_here = count_other = count_retry = 0; 164 now_stopping = false; 165 166 create_aio_contexts(); 167 for (i = 0; i < NUM_CONTEXTS; i++) { 168 Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL); 169 aio_co_schedule(ctx[i], co1); 170 } 171 172 g_usleep(seconds * 1000000); 173 174 qatomic_mb_set(&now_stopping, true); 175 for (i = 0; i < NUM_CONTEXTS; i++) { 176 ctx_run(i, 
finish_cb, NULL); 177 to_schedule[i] = NULL; 178 } 179 180 join_aio_contexts(); 181 g_test_message("scheduled %d, queued %d, retry %d, total %d", 182 count_other, count_here, count_retry, 183 count_here + count_other + count_retry); 184 } 185 186 static void test_multi_co_schedule_1(void) 187 { 188 test_multi_co_schedule(1); 189 } 190 191 static void test_multi_co_schedule_10(void) 192 { 193 test_multi_co_schedule(10); 194 } 195 196 /* CoMutex thread-safety. */ 197 198 static uint32_t atomic_counter; 199 static uint32_t running; 200 static uint32_t counter; 201 static CoMutex comutex; 202 203 static void coroutine_fn test_multi_co_mutex_entry(void *opaque) 204 { 205 while (!qatomic_mb_read(&now_stopping)) { 206 qemu_co_mutex_lock(&comutex); 207 counter++; 208 qemu_co_mutex_unlock(&comutex); 209 210 /* Increase atomic_counter *after* releasing the mutex. Otherwise 211 * there is a chance (it happens about 1 in 3 runs) that the iothread 212 * exits before the coroutine is woken up, causing a spurious 213 * assertion failure. 214 */ 215 qatomic_inc(&atomic_counter); 216 } 217 qatomic_dec(&running); 218 } 219 220 static void test_multi_co_mutex(int threads, int seconds) 221 { 222 int i; 223 224 qemu_co_mutex_init(&comutex); 225 counter = 0; 226 atomic_counter = 0; 227 now_stopping = false; 228 229 create_aio_contexts(); 230 assert(threads <= NUM_CONTEXTS); 231 running = threads; 232 for (i = 0; i < threads; i++) { 233 Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL); 234 aio_co_schedule(ctx[i], co1); 235 } 236 237 g_usleep(seconds * 1000000); 238 239 qatomic_mb_set(&now_stopping, true); 240 while (running > 0) { 241 g_usleep(100000); 242 } 243 244 join_aio_contexts(); 245 g_test_message("%d iterations/second", counter / seconds); 246 g_assert_cmpint(counter, ==, atomic_counter); 247 } 248 249 /* Testing with NUM_CONTEXTS threads focuses on the queue. 
The mutex however 250 * is too contended (and the threads spend too much time in aio_poll) 251 * to actually stress the handoff protocol. 252 */ 253 static void test_multi_co_mutex_1(void) 254 { 255 test_multi_co_mutex(NUM_CONTEXTS, 1); 256 } 257 258 static void test_multi_co_mutex_10(void) 259 { 260 test_multi_co_mutex(NUM_CONTEXTS, 10); 261 } 262 263 /* Testing with fewer threads stresses the handoff protocol too. Still, the 264 * case where the locker _can_ pick up a handoff is very rare, happening 265 * about 10 times in 1 million, so increase the runtime a bit compared to 266 * other "quick" testcases that only run for 1 second. 267 */ 268 static void test_multi_co_mutex_2_3(void) 269 { 270 test_multi_co_mutex(2, 3); 271 } 272 273 static void test_multi_co_mutex_2_30(void) 274 { 275 test_multi_co_mutex(2, 30); 276 } 277 278 /* Same test with fair mutexes, for performance comparison. */ 279 280 #ifdef CONFIG_LINUX 281 #include "qemu/futex.h" 282 283 /* The nodes for the mutex reside in this structure (on which we try to avoid 284 * false sharing). The head of the mutex is in the "mutex_head" variable. 285 */ 286 static struct { 287 int next, locked; 288 int padding[14]; 289 } nodes[NUM_CONTEXTS] __attribute__((__aligned__(64))); 290 291 static int mutex_head = -1; 292 293 static void mcs_mutex_lock(void) 294 { 295 int prev; 296 297 nodes[id].next = -1; 298 nodes[id].locked = 1; 299 prev = qatomic_xchg(&mutex_head, id); 300 if (prev != -1) { 301 qatomic_set(&nodes[prev].next, id); 302 qemu_futex_wait(&nodes[id].locked, 1); 303 } 304 } 305 306 static void mcs_mutex_unlock(void) 307 { 308 int next; 309 if (qatomic_read(&nodes[id].next) == -1) { 310 if (qatomic_read(&mutex_head) == id && 311 qatomic_cmpxchg(&mutex_head, id, -1) == id) { 312 /* Last item in the list, exit. */ 313 return; 314 } 315 while (qatomic_read(&nodes[id].next) == -1) { 316 /* mcs_mutex_lock did the xchg, but has not updated 317 * nodes[prev].next yet. 
318 */ 319 } 320 } 321 322 /* Wake up the next in line. */ 323 next = qatomic_read(&nodes[id].next); 324 nodes[next].locked = 0; 325 qemu_futex_wake(&nodes[next].locked, 1); 326 } 327 328 static void test_multi_fair_mutex_entry(void *opaque) 329 { 330 while (!qatomic_mb_read(&now_stopping)) { 331 mcs_mutex_lock(); 332 counter++; 333 mcs_mutex_unlock(); 334 qatomic_inc(&atomic_counter); 335 } 336 qatomic_dec(&running); 337 } 338 339 static void test_multi_fair_mutex(int threads, int seconds) 340 { 341 int i; 342 343 assert(mutex_head == -1); 344 counter = 0; 345 atomic_counter = 0; 346 now_stopping = false; 347 348 create_aio_contexts(); 349 assert(threads <= NUM_CONTEXTS); 350 running = threads; 351 for (i = 0; i < threads; i++) { 352 Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL); 353 aio_co_schedule(ctx[i], co1); 354 } 355 356 g_usleep(seconds * 1000000); 357 358 qatomic_mb_set(&now_stopping, true); 359 while (running > 0) { 360 g_usleep(100000); 361 } 362 363 join_aio_contexts(); 364 g_test_message("%d iterations/second", counter / seconds); 365 g_assert_cmpint(counter, ==, atomic_counter); 366 } 367 368 static void test_multi_fair_mutex_1(void) 369 { 370 test_multi_fair_mutex(NUM_CONTEXTS, 1); 371 } 372 373 static void test_multi_fair_mutex_10(void) 374 { 375 test_multi_fair_mutex(NUM_CONTEXTS, 10); 376 } 377 #endif 378 379 /* Same test with pthread mutexes, for performance comparison and 380 * portability. 
*/ 381 382 static QemuMutex mutex; 383 384 static void test_multi_mutex_entry(void *opaque) 385 { 386 while (!qatomic_mb_read(&now_stopping)) { 387 qemu_mutex_lock(&mutex); 388 counter++; 389 qemu_mutex_unlock(&mutex); 390 qatomic_inc(&atomic_counter); 391 } 392 qatomic_dec(&running); 393 } 394 395 static void test_multi_mutex(int threads, int seconds) 396 { 397 int i; 398 399 qemu_mutex_init(&mutex); 400 counter = 0; 401 atomic_counter = 0; 402 now_stopping = false; 403 404 create_aio_contexts(); 405 assert(threads <= NUM_CONTEXTS); 406 running = threads; 407 for (i = 0; i < threads; i++) { 408 Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL); 409 aio_co_schedule(ctx[i], co1); 410 } 411 412 g_usleep(seconds * 1000000); 413 414 qatomic_mb_set(&now_stopping, true); 415 while (running > 0) { 416 g_usleep(100000); 417 } 418 419 join_aio_contexts(); 420 g_test_message("%d iterations/second", counter / seconds); 421 g_assert_cmpint(counter, ==, atomic_counter); 422 } 423 424 static void test_multi_mutex_1(void) 425 { 426 test_multi_mutex(NUM_CONTEXTS, 1); 427 } 428 429 static void test_multi_mutex_10(void) 430 { 431 test_multi_mutex(NUM_CONTEXTS, 10); 432 } 433 434 /* End of tests. 
*/

/*
 * Register the test cases and run them.
 *
 * Every path is registered exactly once.  When g_test_quick() is true the
 * short-running variant of each timed test is used, otherwise the
 * long-running one.  The MCS mutex test is only built and registered when
 * CONFIG_LINUX is defined.
 */
int main(int argc, char **argv)
{
    bool quick;

    init_clocks(NULL);

    g_test_init(&argc, &argv, NULL);
    g_test_add_func("/aio/multi/lifecycle", test_lifecycle);

    quick = g_test_quick();

    g_test_add_func("/aio/multi/schedule",
                    quick ? test_multi_co_schedule_1
                          : test_multi_co_schedule_10);
    g_test_add_func("/aio/multi/mutex/contended",
                    quick ? test_multi_co_mutex_1
                          : test_multi_co_mutex_10);
    g_test_add_func("/aio/multi/mutex/handoff",
                    quick ? test_multi_co_mutex_2_3
                          : test_multi_co_mutex_2_30);
#ifdef CONFIG_LINUX
    g_test_add_func("/aio/multi/mutex/mcs",
                    quick ? test_multi_fair_mutex_1
                          : test_multi_fair_mutex_10);
#endif
    g_test_add_func("/aio/multi/mutex/pthread",
                    quick ? test_multi_mutex_1
                          : test_multi_mutex_10);

    return g_test_run();
}