1 /*
2 * Event loop thread implementation for unit tests
3 *
4 * Copyright Red Hat Inc., 2013, 2016
5 *
6 * Authors:
7 * Stefan Hajnoczi <stefanha@redhat.com>
8 * Paolo Bonzini <pbonzini@redhat.com>
9 *
10 * This work is licensed under the terms of the GNU GPL, version 2 or later.
11 * See the COPYING file in the top-level directory.
12 *
13 */
14
15 #include "qemu/osdep.h"
16 #include "qapi/error.h"
17 #include "block/aio.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/rcu.h"
20 #include "iothread.h"
21
/*
 * State of one event loop thread used by the unit tests.
 *
 * Instances are allocated by iothread_new() and released by iothread_join().
 */
struct IOThread {
    /* AioContext created by the thread itself in iothread_run() */
    AioContext *ctx;
    /* GMainContext that ctx's GSource is attached to */
    GMainContext *worker_context;
    /* GMainLoop created on worker_context; only created/unref'd in this file */
    GMainLoop *main_loop;

    /* Thread running iothread_run() */
    QemuThread thread;
    /* Protects the initialization handshake with iothread_new() */
    QemuMutex init_done_lock;
    QemuCond init_done_cond;    /* is thread initialization done? */
    /* Set by iothread_stop_bh(); polled by the loop in iothread_run() */
    bool stopping;
};
32
/*
 * Create the GLib side of the event loop thread.
 *
 * Allocates a fresh GMainContext, attaches the GSource of the thread's
 * AioContext to it, and creates a GMainLoop on top of that context.
 * The results are stored in iothread->worker_context and
 * iothread->main_loop.
 *
 * @iothread: thread state; its AioContext must already have been created.
 */
static void iothread_init_gcontext(IOThread *iothread)
{
    GMainContext *gctx = g_main_context_new();
    AioContext *aio_ctx = iothread_get_aio_context(iothread);
    GSource *aio_source;

    iothread->worker_context = gctx;

    aio_source = aio_get_g_source(aio_ctx);
    g_source_attach(aio_source, gctx);
    /* Drop our local reference now that the source has been attached */
    g_source_unref(aio_source);

    iothread->main_loop = g_main_loop_new(gctx, TRUE);
}
43
iothread_run(void * opaque)44 static void *iothread_run(void *opaque)
45 {
46 IOThread *iothread = opaque;
47
48 rcu_register_thread();
49
50 qemu_mutex_lock(&iothread->init_done_lock);
51 iothread->ctx = aio_context_new(&error_abort);
52 qemu_set_current_aio_context(iothread->ctx);
53
54 /*
55 * We must connect the ctx to a GMainContext, because in older versions
56 * of glib the g_source_ref()/unref() functions are not threadsafe
57 * on sources without a context.
58 */
59 iothread_init_gcontext(iothread);
60
61 /*
62 * g_main_context_push_thread_default() must be called before anything
63 * in this new thread uses glib.
64 */
65 g_main_context_push_thread_default(iothread->worker_context);
66
67 qemu_cond_signal(&iothread->init_done_cond);
68 qemu_mutex_unlock(&iothread->init_done_lock);
69
70 while (!qatomic_read(&iothread->stopping)) {
71 aio_poll(iothread->ctx, true);
72 }
73
74 g_main_context_pop_thread_default(iothread->worker_context);
75 rcu_unregister_thread();
76 return NULL;
77 }
78
iothread_stop_bh(void * opaque)79 static void iothread_stop_bh(void *opaque)
80 {
81 IOThread *iothread = opaque;
82
83 iothread->stopping = true;
84 }
85
/*
 * Stop the event loop thread, wait for it to exit and free @iothread.
 *
 * @iothread: thread previously returned by iothread_new(); it must not be
 *            used after this call because the structure is freed.
 */
void iothread_join(IOThread *iothread)
{
    AioContext *ctx = iothread->ctx;

    /* Ask the loop to stop, then wait until the thread has returned */
    aio_bh_schedule_oneshot(ctx, iothread_stop_bh, iothread);
    qemu_thread_join(&iothread->thread);

    /* GLib objects created by iothread_init_gcontext() */
    g_main_context_unref(iothread->worker_context);
    g_main_loop_unref(iothread->main_loop);

    /* Synchronization primitives created by iothread_new() */
    qemu_cond_destroy(&iothread->init_done_cond);
    qemu_mutex_destroy(&iothread->init_done_lock);

    aio_context_unref(ctx);
    g_free(iothread);
}
97
iothread_new(void)98 IOThread *iothread_new(void)
99 {
100 IOThread *iothread = g_new0(IOThread, 1);
101
102 qemu_mutex_init(&iothread->init_done_lock);
103 qemu_cond_init(&iothread->init_done_cond);
104 qemu_thread_create(&iothread->thread, NULL, iothread_run,
105 iothread, QEMU_THREAD_JOINABLE);
106
107 /* Wait for initialization to complete */
108 qemu_mutex_lock(&iothread->init_done_lock);
109 while (iothread->ctx == NULL) {
110 qemu_cond_wait(&iothread->init_done_cond,
111 &iothread->init_done_lock);
112 }
113 qemu_mutex_unlock(&iothread->init_done_lock);
114 return iothread;
115 }
116
iothread_get_aio_context(IOThread * iothread)117 AioContext *iothread_get_aio_context(IOThread *iothread)
118 {
119 return iothread->ctx;
120 }
121