1 /* 2 * Event loop thread implementation for unit tests 3 * 4 * Copyright Red Hat Inc., 2013, 2016 5 * 6 * Authors: 7 * Stefan Hajnoczi <stefanha@redhat.com> 8 * Paolo Bonzini <pbonzini@redhat.com> 9 * 10 * This work is licensed under the terms of the GNU GPL, version 2 or later. 11 * See the COPYING file in the top-level directory. 12 * 13 */ 14 15 #include "qemu/osdep.h" 16 #include "qapi/error.h" 17 #include "block/aio.h" 18 #include "qemu/main-loop.h" 19 #include "qemu/rcu.h" 20 #include "iothread.h" 21 22 struct IOThread { 23 AioContext *ctx; 24 GMainContext *worker_context; 25 GMainLoop *main_loop; 26 27 QemuThread thread; 28 QemuMutex init_done_lock; 29 QemuCond init_done_cond; /* is thread initialization done? */ 30 bool stopping; 31 }; 32 33 static __thread IOThread *my_iothread; 34 35 AioContext *qemu_get_current_aio_context(void) 36 { 37 return my_iothread ? my_iothread->ctx : qemu_get_aio_context(); 38 } 39 40 static void iothread_init_gcontext(IOThread *iothread) 41 { 42 GSource *source; 43 44 iothread->worker_context = g_main_context_new(); 45 source = aio_get_g_source(iothread_get_aio_context(iothread)); 46 g_source_attach(source, iothread->worker_context); 47 g_source_unref(source); 48 iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE); 49 } 50 51 static void *iothread_run(void *opaque) 52 { 53 IOThread *iothread = opaque; 54 55 rcu_register_thread(); 56 57 my_iothread = iothread; 58 qemu_mutex_lock(&iothread->init_done_lock); 59 iothread->ctx = aio_context_new(&error_abort); 60 61 /* 62 * We must connect the ctx to a GMainContext, because in older versions 63 * of glib the g_source_ref()/unref() functions are not threadsafe 64 * on sources without a context. 65 */ 66 iothread_init_gcontext(iothread); 67 68 /* 69 * g_main_context_push_thread_default() must be called before anything 70 * in this new thread uses glib. 71 */ 72 g_main_context_push_thread_default(iothread->worker_context); 73 74 qemu_cond_signal(&iothread->init_done_cond); 75 qemu_mutex_unlock(&iothread->init_done_lock); 76 77 while (!qatomic_read(&iothread->stopping)) { 78 aio_poll(iothread->ctx, true); 79 } 80 81 g_main_context_pop_thread_default(iothread->worker_context); 82 rcu_unregister_thread(); 83 return NULL; 84 } 85 86 static void iothread_stop_bh(void *opaque) 87 { 88 IOThread *iothread = opaque; 89 90 iothread->stopping = true; 91 } 92 93 void iothread_join(IOThread *iothread) 94 { 95 aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread); 96 qemu_thread_join(&iothread->thread); 97 g_main_context_unref(iothread->worker_context); 98 g_main_loop_unref(iothread->main_loop); 99 qemu_cond_destroy(&iothread->init_done_cond); 100 qemu_mutex_destroy(&iothread->init_done_lock); 101 aio_context_unref(iothread->ctx); 102 g_free(iothread); 103 } 104 105 IOThread *iothread_new(void) 106 { 107 IOThread *iothread = g_new0(IOThread, 1); 108 109 qemu_mutex_init(&iothread->init_done_lock); 110 qemu_cond_init(&iothread->init_done_cond); 111 qemu_thread_create(&iothread->thread, NULL, iothread_run, 112 iothread, QEMU_THREAD_JOINABLE); 113 114 /* Wait for initialization to complete */ 115 qemu_mutex_lock(&iothread->init_done_lock); 116 while (iothread->ctx == NULL) { 117 qemu_cond_wait(&iothread->init_done_cond, 118 &iothread->init_done_lock); 119 } 120 qemu_mutex_unlock(&iothread->init_done_lock); 121 return iothread; 122 } 123 124 AioContext *iothread_get_aio_context(IOThread *iothread) 125 { 126 return iothread->ctx; 127 } 128