1 /*
2  * AioContext multithreading tests
3  *
4  * Copyright Red Hat, Inc. 2016
5  *
6  * Authors:
7  *  Paolo Bonzini    <pbonzini@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10  * See the COPYING.LIB file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "block/aio.h"
15 #include "qemu/coroutine.h"
16 #include "qemu/thread.h"
17 #include "qemu/error-report.h"
18 #include "iothread.h"
19 
20 /* AioContext management */
21 
22 #define NUM_CONTEXTS 5
23 
24 static IOThread *threads[NUM_CONTEXTS];
25 static AioContext *ctx[NUM_CONTEXTS];
26 static __thread int id = -1;
27 
28 static QemuEvent done_event;
29 
30 /* Run a function synchronously on a remote iothread. */
31 
32 typedef struct CtxRunData {
33     QEMUBHFunc *cb;
34     void *arg;
35 } CtxRunData;
36 
37 static void ctx_run_bh_cb(void *opaque)
38 {
39     CtxRunData *data = opaque;
40 
41     data->cb(data->arg);
42     qemu_event_set(&done_event);
43 }
44 
45 static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
46 {
47     CtxRunData data = {
48         .cb = cb,
49         .arg = opaque
50     };
51 
52     qemu_event_reset(&done_event);
53     aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
54     qemu_event_wait(&done_event);
55 }
56 
57 /* Starting the iothreads. */
58 
59 static void set_id_cb(void *opaque)
60 {
61     int *i = opaque;
62 
63     id = *i;
64 }
65 
66 static void create_aio_contexts(void)
67 {
68     int i;
69 
70     for (i = 0; i < NUM_CONTEXTS; i++) {
71         threads[i] = iothread_new();
72         ctx[i] = iothread_get_aio_context(threads[i]);
73     }
74 
75     qemu_event_init(&done_event, false);
76     for (i = 0; i < NUM_CONTEXTS; i++) {
77         ctx_run(i, set_id_cb, &i);
78     }
79 }
80 
81 /* Stopping the iothreads. */
82 
83 static void join_aio_contexts(void)
84 {
85     int i;
86 
87     for (i = 0; i < NUM_CONTEXTS; i++) {
88         aio_context_ref(ctx[i]);
89     }
90     for (i = 0; i < NUM_CONTEXTS; i++) {
91         iothread_join(threads[i]);
92     }
93     for (i = 0; i < NUM_CONTEXTS; i++) {
94         aio_context_unref(ctx[i]);
95     }
96     qemu_event_destroy(&done_event);
97 }
98 
99 /* Basic test for the stuff above. */
100 
101 static void test_lifecycle(void)
102 {
103     create_aio_contexts();
104     join_aio_contexts();
105 }
106 
107 /* aio_co_schedule test.  */
108 
109 static Coroutine *to_schedule[NUM_CONTEXTS];
110 
111 static bool now_stopping;
112 
113 static int count_retry;
114 static int count_here;
115 static int count_other;
116 
117 static bool schedule_next(int n)
118 {
119     Coroutine *co;
120 
121     co = qatomic_xchg(&to_schedule[n], NULL);
122     if (!co) {
123         qatomic_inc(&count_retry);
124         return false;
125     }
126 
127     if (n == id) {
128         qatomic_inc(&count_here);
129     } else {
130         qatomic_inc(&count_other);
131     }
132 
133     aio_co_schedule(ctx[n], co);
134     return true;
135 }
136 
137 static void finish_cb(void *opaque)
138 {
139     schedule_next(id);
140 }
141 
142 static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
143 {
144     g_assert(to_schedule[id] == NULL);
145 
146     while (!qatomic_mb_read(&now_stopping)) {
147         int n;
148 
149         n = g_test_rand_int_range(0, NUM_CONTEXTS);
150         schedule_next(n);
151 
152         qatomic_mb_set(&to_schedule[id], qemu_coroutine_self());
153         qemu_coroutine_yield();
154         g_assert(to_schedule[id] == NULL);
155     }
156 }
157 
158 
159 static void test_multi_co_schedule(int seconds)
160 {
161     int i;
162 
163     count_here = count_other = count_retry = 0;
164     now_stopping = false;
165 
166     create_aio_contexts();
167     for (i = 0; i < NUM_CONTEXTS; i++) {
168         Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
169         aio_co_schedule(ctx[i], co1);
170     }
171 
172     g_usleep(seconds * 1000000);
173 
174     qatomic_mb_set(&now_stopping, true);
175     for (i = 0; i < NUM_CONTEXTS; i++) {
176         ctx_run(i, finish_cb, NULL);
177         to_schedule[i] = NULL;
178     }
179 
180     join_aio_contexts();
181     g_test_message("scheduled %d, queued %d, retry %d, total %d",
182                   count_other, count_here, count_retry,
183                   count_here + count_other + count_retry);
184 }
185 
186 static void test_multi_co_schedule_1(void)
187 {
188     test_multi_co_schedule(1);
189 }
190 
191 static void test_multi_co_schedule_10(void)
192 {
193     test_multi_co_schedule(10);
194 }
195 
196 /* CoMutex thread-safety.  */
197 
198 static uint32_t atomic_counter;
199 static uint32_t running;
200 static uint32_t counter;
201 static CoMutex comutex;
202 
203 static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
204 {
205     while (!qatomic_mb_read(&now_stopping)) {
206         qemu_co_mutex_lock(&comutex);
207         counter++;
208         qemu_co_mutex_unlock(&comutex);
209 
210         /* Increase atomic_counter *after* releasing the mutex.  Otherwise
211          * there is a chance (it happens about 1 in 3 runs) that the iothread
212          * exits before the coroutine is woken up, causing a spurious
213          * assertion failure.
214          */
215         qatomic_inc(&atomic_counter);
216     }
217     qatomic_dec(&running);
218 }
219 
220 static void test_multi_co_mutex(int threads, int seconds)
221 {
222     int i;
223 
224     qemu_co_mutex_init(&comutex);
225     counter = 0;
226     atomic_counter = 0;
227     now_stopping = false;
228 
229     create_aio_contexts();
230     assert(threads <= NUM_CONTEXTS);
231     running = threads;
232     for (i = 0; i < threads; i++) {
233         Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL);
234         aio_co_schedule(ctx[i], co1);
235     }
236 
237     g_usleep(seconds * 1000000);
238 
239     qatomic_mb_set(&now_stopping, true);
240     while (running > 0) {
241         g_usleep(100000);
242     }
243 
244     join_aio_contexts();
245     g_test_message("%d iterations/second", counter / seconds);
246     g_assert_cmpint(counter, ==, atomic_counter);
247 }
248 
249 /* Testing with NUM_CONTEXTS threads focuses on the queue.  The mutex however
250  * is too contended (and the threads spend too much time in aio_poll)
251  * to actually stress the handoff protocol.
252  */
253 static void test_multi_co_mutex_1(void)
254 {
255     test_multi_co_mutex(NUM_CONTEXTS, 1);
256 }
257 
258 static void test_multi_co_mutex_10(void)
259 {
260     test_multi_co_mutex(NUM_CONTEXTS, 10);
261 }
262 
263 /* Testing with fewer threads stresses the handoff protocol too.  Still, the
264  * case where the locker _can_ pick up a handoff is very rare, happening
265  * about 10 times in 1 million, so increase the runtime a bit compared to
266  * other "quick" testcases that only run for 1 second.
267  */
268 static void test_multi_co_mutex_2_3(void)
269 {
270     test_multi_co_mutex(2, 3);
271 }
272 
273 static void test_multi_co_mutex_2_30(void)
274 {
275     test_multi_co_mutex(2, 30);
276 }
277 
278 /* Same test with fair mutexes, for performance comparison.  */
279 
280 #ifdef CONFIG_LINUX
281 #include "qemu/futex.h"
282 
283 /* The nodes for the mutex reside in this structure (on which we try to avoid
284  * false sharing).  The head of the mutex is in the "mutex_head" variable.
285  */
286 static struct {
287     int next, locked;
288     int padding[14];
289 } nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
290 
291 static int mutex_head = -1;
292 
293 static void mcs_mutex_lock(void)
294 {
295     int prev;
296 
297     nodes[id].next = -1;
298     nodes[id].locked = 1;
299     prev = qatomic_xchg(&mutex_head, id);
300     if (prev != -1) {
301         qatomic_set(&nodes[prev].next, id);
302         qemu_futex_wait(&nodes[id].locked, 1);
303     }
304 }
305 
306 static void mcs_mutex_unlock(void)
307 {
308     int next;
309     if (qatomic_read(&nodes[id].next) == -1) {
310         if (qatomic_read(&mutex_head) == id &&
311             qatomic_cmpxchg(&mutex_head, id, -1) == id) {
312             /* Last item in the list, exit.  */
313             return;
314         }
315         while (qatomic_read(&nodes[id].next) == -1) {
316             /* mcs_mutex_lock did the xchg, but has not updated
317              * nodes[prev].next yet.
318              */
319         }
320     }
321 
322     /* Wake up the next in line.  */
323     next = qatomic_read(&nodes[id].next);
324     nodes[next].locked = 0;
325     qemu_futex_wake(&nodes[next].locked, 1);
326 }
327 
328 static void test_multi_fair_mutex_entry(void *opaque)
329 {
330     while (!qatomic_mb_read(&now_stopping)) {
331         mcs_mutex_lock();
332         counter++;
333         mcs_mutex_unlock();
334         qatomic_inc(&atomic_counter);
335     }
336     qatomic_dec(&running);
337 }
338 
339 static void test_multi_fair_mutex(int threads, int seconds)
340 {
341     int i;
342 
343     assert(mutex_head == -1);
344     counter = 0;
345     atomic_counter = 0;
346     now_stopping = false;
347 
348     create_aio_contexts();
349     assert(threads <= NUM_CONTEXTS);
350     running = threads;
351     for (i = 0; i < threads; i++) {
352         Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL);
353         aio_co_schedule(ctx[i], co1);
354     }
355 
356     g_usleep(seconds * 1000000);
357 
358     qatomic_mb_set(&now_stopping, true);
359     while (running > 0) {
360         g_usleep(100000);
361     }
362 
363     join_aio_contexts();
364     g_test_message("%d iterations/second", counter / seconds);
365     g_assert_cmpint(counter, ==, atomic_counter);
366 }
367 
368 static void test_multi_fair_mutex_1(void)
369 {
370     test_multi_fair_mutex(NUM_CONTEXTS, 1);
371 }
372 
373 static void test_multi_fair_mutex_10(void)
374 {
375     test_multi_fair_mutex(NUM_CONTEXTS, 10);
376 }
377 #endif
378 
379 /* Same test with pthread mutexes, for performance comparison and
380  * portability.  */
381 
382 static QemuMutex mutex;
383 
384 static void test_multi_mutex_entry(void *opaque)
385 {
386     while (!qatomic_mb_read(&now_stopping)) {
387         qemu_mutex_lock(&mutex);
388         counter++;
389         qemu_mutex_unlock(&mutex);
390         qatomic_inc(&atomic_counter);
391     }
392     qatomic_dec(&running);
393 }
394 
395 static void test_multi_mutex(int threads, int seconds)
396 {
397     int i;
398 
399     qemu_mutex_init(&mutex);
400     counter = 0;
401     atomic_counter = 0;
402     now_stopping = false;
403 
404     create_aio_contexts();
405     assert(threads <= NUM_CONTEXTS);
406     running = threads;
407     for (i = 0; i < threads; i++) {
408         Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL);
409         aio_co_schedule(ctx[i], co1);
410     }
411 
412     g_usleep(seconds * 1000000);
413 
414     qatomic_mb_set(&now_stopping, true);
415     while (running > 0) {
416         g_usleep(100000);
417     }
418 
419     join_aio_contexts();
420     g_test_message("%d iterations/second", counter / seconds);
421     g_assert_cmpint(counter, ==, atomic_counter);
422 }
423 
424 static void test_multi_mutex_1(void)
425 {
426     test_multi_mutex(NUM_CONTEXTS, 1);
427 }
428 
429 static void test_multi_mutex_10(void)
430 {
431     test_multi_mutex(NUM_CONTEXTS, 10);
432 }
433 
434 /* End of tests.  */
435 
436 int main(int argc, char **argv)
437 {
438     init_clocks(NULL);
439 
440     g_test_init(&argc, &argv, NULL);
441     g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
442     if (g_test_quick()) {
443         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
444         g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1);
445         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
446 #ifdef CONFIG_LINUX
447         g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
448 #endif
449         g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
450     } else {
451         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
452         g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10);
453         g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
454 #ifdef CONFIG_LINUX
455         g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
456 #endif
457         g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
458     }
459     return g_test_run();
460 }
461