/*
 * Event loop thread implementation for unit tests
 *
 * Copyright Red Hat Inc., 2013, 2016
 *
 * Authors:
 *  Stefan Hajnoczi   <stefanha@redhat.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 *
 */
14da668aa1SThomas Huth
15da668aa1SThomas Huth #include "qemu/osdep.h"
16da668aa1SThomas Huth #include "qapi/error.h"
17da668aa1SThomas Huth #include "block/aio.h"
18da668aa1SThomas Huth #include "qemu/main-loop.h"
19da668aa1SThomas Huth #include "qemu/rcu.h"
20da668aa1SThomas Huth #include "iothread.h"
21da668aa1SThomas Huth
22da668aa1SThomas Huth struct IOThread {
23da668aa1SThomas Huth AioContext *ctx;
24da668aa1SThomas Huth GMainContext *worker_context;
25da668aa1SThomas Huth GMainLoop *main_loop;
26da668aa1SThomas Huth
27da668aa1SThomas Huth QemuThread thread;
28da668aa1SThomas Huth QemuMutex init_done_lock;
29da668aa1SThomas Huth QemuCond init_done_cond; /* is thread initialization done? */
30da668aa1SThomas Huth bool stopping;
31da668aa1SThomas Huth };
32da668aa1SThomas Huth
/*
 * Set up the GLib side of @iothread: allocate a new GMainContext, attach
 * the GSource of the thread's AioContext to it, and create a GMainLoop for
 * that context.  The results are stored in @iothread->worker_context and
 * @iothread->main_loop.
 */
static void iothread_init_gcontext(IOThread *iothread)
{
    GMainContext *worker = g_main_context_new();
    GSource *aio_source;

    iothread->worker_context = worker;

    aio_source = aio_get_g_source(iothread_get_aio_context(iothread));
    g_source_attach(aio_source, worker);
    /* The local reference is no longer needed once the source is attached */
    g_source_unref(aio_source);

    iothread->main_loop = g_main_loop_new(worker, TRUE);
}
43da668aa1SThomas Huth
iothread_run(void * opaque)44da668aa1SThomas Huth static void *iothread_run(void *opaque)
45da668aa1SThomas Huth {
46da668aa1SThomas Huth IOThread *iothread = opaque;
47da668aa1SThomas Huth
48da668aa1SThomas Huth rcu_register_thread();
49da668aa1SThomas Huth
50da668aa1SThomas Huth qemu_mutex_lock(&iothread->init_done_lock);
51da668aa1SThomas Huth iothread->ctx = aio_context_new(&error_abort);
52*5f50be9bSPaolo Bonzini qemu_set_current_aio_context(iothread->ctx);
53da668aa1SThomas Huth
54da668aa1SThomas Huth /*
55da668aa1SThomas Huth * We must connect the ctx to a GMainContext, because in older versions
56da668aa1SThomas Huth * of glib the g_source_ref()/unref() functions are not threadsafe
57da668aa1SThomas Huth * on sources without a context.
58da668aa1SThomas Huth */
59da668aa1SThomas Huth iothread_init_gcontext(iothread);
60da668aa1SThomas Huth
61da668aa1SThomas Huth /*
62da668aa1SThomas Huth * g_main_context_push_thread_default() must be called before anything
63da668aa1SThomas Huth * in this new thread uses glib.
64da668aa1SThomas Huth */
65da668aa1SThomas Huth g_main_context_push_thread_default(iothread->worker_context);
66da668aa1SThomas Huth
67da668aa1SThomas Huth qemu_cond_signal(&iothread->init_done_cond);
68da668aa1SThomas Huth qemu_mutex_unlock(&iothread->init_done_lock);
69da668aa1SThomas Huth
70da668aa1SThomas Huth while (!qatomic_read(&iothread->stopping)) {
71da668aa1SThomas Huth aio_poll(iothread->ctx, true);
72da668aa1SThomas Huth }
73da668aa1SThomas Huth
74da668aa1SThomas Huth g_main_context_pop_thread_default(iothread->worker_context);
75da668aa1SThomas Huth rcu_unregister_thread();
76da668aa1SThomas Huth return NULL;
77da668aa1SThomas Huth }
78da668aa1SThomas Huth
iothread_stop_bh(void * opaque)79da668aa1SThomas Huth static void iothread_stop_bh(void *opaque)
80da668aa1SThomas Huth {
81da668aa1SThomas Huth IOThread *iothread = opaque;
82da668aa1SThomas Huth
83da668aa1SThomas Huth iothread->stopping = true;
84da668aa1SThomas Huth }
85da668aa1SThomas Huth
/*
 * Stop @iothread, wait for its thread to exit and release every resource
 * it owns, including @iothread itself.  The pointer must not be used after
 * this function returns.
 */
void iothread_join(IOThread *iothread)
{
    AioContext *ctx = iothread->ctx;

    /* Request the stop through a one-shot BH, then wait for the thread */
    aio_bh_schedule_oneshot(ctx, iothread_stop_bh, iothread);
    qemu_thread_join(&iothread->thread);

    /* The thread has exited: tear down the GLib objects ... */
    g_main_context_unref(iothread->worker_context);
    g_main_loop_unref(iothread->main_loop);

    /* ... the handshake primitives ... */
    qemu_cond_destroy(&iothread->init_done_cond);
    qemu_mutex_destroy(&iothread->init_done_lock);

    /* ... and finally the AioContext reference and the object itself */
    aio_context_unref(ctx);
    g_free(iothread);
}
97da668aa1SThomas Huth
iothread_new(void)98da668aa1SThomas Huth IOThread *iothread_new(void)
99da668aa1SThomas Huth {
100da668aa1SThomas Huth IOThread *iothread = g_new0(IOThread, 1);
101da668aa1SThomas Huth
102da668aa1SThomas Huth qemu_mutex_init(&iothread->init_done_lock);
103da668aa1SThomas Huth qemu_cond_init(&iothread->init_done_cond);
104da668aa1SThomas Huth qemu_thread_create(&iothread->thread, NULL, iothread_run,
105da668aa1SThomas Huth iothread, QEMU_THREAD_JOINABLE);
106da668aa1SThomas Huth
107da668aa1SThomas Huth /* Wait for initialization to complete */
108da668aa1SThomas Huth qemu_mutex_lock(&iothread->init_done_lock);
109da668aa1SThomas Huth while (iothread->ctx == NULL) {
110da668aa1SThomas Huth qemu_cond_wait(&iothread->init_done_cond,
111da668aa1SThomas Huth &iothread->init_done_lock);
112da668aa1SThomas Huth }
113da668aa1SThomas Huth qemu_mutex_unlock(&iothread->init_done_lock);
114da668aa1SThomas Huth return iothread;
115da668aa1SThomas Huth }
116da668aa1SThomas Huth
iothread_get_aio_context(IOThread * iothread)117da668aa1SThomas Huth AioContext *iothread_get_aio_context(IOThread *iothread)
118da668aa1SThomas Huth {
119da668aa1SThomas Huth return iothread->ctx;
120da668aa1SThomas Huth }
121