1 /* 2 * Event loop thread implementation for unit tests 3 * 4 * Copyright Red Hat Inc., 2013, 2016 5 * 6 * Authors: 7 * Stefan Hajnoczi <stefanha@redhat.com> 8 * Paolo Bonzini <pbonzini@redhat.com> 9 * 10 * This work is licensed under the terms of the GNU GPL, version 2 or later. 11 * See the COPYING file in the top-level directory. 12 * 13 */ 14 15 #include "qemu/osdep.h" 16 #include "qapi/error.h" 17 #include "block/aio.h" 18 #include "qemu/main-loop.h" 19 #include "qemu/rcu.h" 20 #include "iothread.h" 21 22 struct IOThread { 23 AioContext *ctx; 24 GMainContext *worker_context; 25 GMainLoop *main_loop; 26 27 QemuThread thread; 28 QemuMutex init_done_lock; 29 QemuCond init_done_cond; /* is thread initialization done? */ 30 bool stopping; 31 }; 32 33 static void iothread_init_gcontext(IOThread *iothread) 34 { 35 GSource *source; 36 37 iothread->worker_context = g_main_context_new(); 38 source = aio_get_g_source(iothread_get_aio_context(iothread)); 39 g_source_attach(source, iothread->worker_context); 40 g_source_unref(source); 41 iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE); 42 } 43 44 static void *iothread_run(void *opaque) 45 { 46 IOThread *iothread = opaque; 47 48 rcu_register_thread(); 49 50 qemu_mutex_lock(&iothread->init_done_lock); 51 iothread->ctx = aio_context_new(&error_abort); 52 qemu_set_current_aio_context(iothread->ctx); 53 54 /* 55 * We must connect the ctx to a GMainContext, because in older versions 56 * of glib the g_source_ref()/unref() functions are not threadsafe 57 * on sources without a context. 58 */ 59 iothread_init_gcontext(iothread); 60 61 /* 62 * g_main_context_push_thread_default() must be called before anything 63 * in this new thread uses glib. 64 */ 65 g_main_context_push_thread_default(iothread->worker_context); 66 67 qemu_cond_signal(&iothread->init_done_cond); 68 qemu_mutex_unlock(&iothread->init_done_lock); 69 70 while (!qatomic_read(&iothread->stopping)) { 71 aio_poll(iothread->ctx, true); 72 } 73 74 g_main_context_pop_thread_default(iothread->worker_context); 75 rcu_unregister_thread(); 76 return NULL; 77 } 78 79 static void iothread_stop_bh(void *opaque) 80 { 81 IOThread *iothread = opaque; 82 83 iothread->stopping = true; 84 } 85 86 void iothread_join(IOThread *iothread) 87 { 88 aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread); 89 qemu_thread_join(&iothread->thread); 90 g_main_context_unref(iothread->worker_context); 91 g_main_loop_unref(iothread->main_loop); 92 qemu_cond_destroy(&iothread->init_done_cond); 93 qemu_mutex_destroy(&iothread->init_done_lock); 94 aio_context_unref(iothread->ctx); 95 g_free(iothread); 96 } 97 98 IOThread *iothread_new(void) 99 { 100 IOThread *iothread = g_new0(IOThread, 1); 101 102 qemu_mutex_init(&iothread->init_done_lock); 103 qemu_cond_init(&iothread->init_done_cond); 104 qemu_thread_create(&iothread->thread, NULL, iothread_run, 105 iothread, QEMU_THREAD_JOINABLE); 106 107 /* Wait for initialization to complete */ 108 qemu_mutex_lock(&iothread->init_done_lock); 109 while (iothread->ctx == NULL) { 110 qemu_cond_wait(&iothread->init_done_cond, 111 &iothread->init_done_lock); 112 } 113 qemu_mutex_unlock(&iothread->init_done_lock); 114 return iothread; 115 } 116 117 AioContext *iothread_get_aio_context(IOThread *iothread) 118 { 119 return iothread->ctx; 120 } 121