1 /* 2 * Event loop thread 3 * 4 * Copyright Red Hat Inc., 2013 5 * 6 * Authors: 7 * Stefan Hajnoczi <stefanha@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 * 12 */ 13 14 #include "qemu/osdep.h" 15 #include "qom/object.h" 16 #include "qom/object_interfaces.h" 17 #include "qemu/module.h" 18 #include "block/aio.h" 19 #include "sysemu/iothread.h" 20 #include "qmp-commands.h" 21 #include "qemu/error-report.h" 22 #include "qemu/rcu.h" 23 24 typedef ObjectClass IOThreadClass; 25 26 #define IOTHREAD_GET_CLASS(obj) \ 27 OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD) 28 #define IOTHREAD_CLASS(klass) \ 29 OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD) 30 31 static void *iothread_run(void *opaque) 32 { 33 IOThread *iothread = opaque; 34 bool blocking; 35 36 rcu_register_thread(); 37 38 qemu_mutex_lock(&iothread->init_done_lock); 39 iothread->thread_id = qemu_get_thread_id(); 40 qemu_cond_signal(&iothread->init_done_cond); 41 qemu_mutex_unlock(&iothread->init_done_lock); 42 43 while (!iothread->stopping) { 44 aio_context_acquire(iothread->ctx); 45 blocking = true; 46 while (!iothread->stopping && aio_poll(iothread->ctx, blocking)) { 47 /* Progress was made, keep going */ 48 blocking = false; 49 } 50 aio_context_release(iothread->ctx); 51 } 52 53 rcu_unregister_thread(); 54 return NULL; 55 } 56 57 static int iothread_stop(Object *object, void *opaque) 58 { 59 IOThread *iothread; 60 61 iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD); 62 if (!iothread || !iothread->ctx) { 63 return 0; 64 } 65 iothread->stopping = true; 66 aio_notify(iothread->ctx); 67 qemu_thread_join(&iothread->thread); 68 return 0; 69 } 70 71 static void iothread_instance_finalize(Object *obj) 72 { 73 IOThread *iothread = IOTHREAD(obj); 74 75 iothread_stop(obj, NULL); 76 qemu_cond_destroy(&iothread->init_done_cond); 77 qemu_mutex_destroy(&iothread->init_done_lock); 78 aio_context_unref(iothread->ctx); 79 } 80 81 static void iothread_complete(UserCreatable *obj, Error **errp) 82 { 83 Error *local_error = NULL; 84 IOThread *iothread = IOTHREAD(obj); 85 char *name, *thread_name; 86 87 iothread->stopping = false; 88 iothread->thread_id = -1; 89 iothread->ctx = aio_context_new(&local_error); 90 if (!iothread->ctx) { 91 error_propagate(errp, local_error); 92 return; 93 } 94 95 qemu_mutex_init(&iothread->init_done_lock); 96 qemu_cond_init(&iothread->init_done_cond); 97 98 /* This assumes we are called from a thread with useful CPU affinity for us 99 * to inherit. 100 */ 101 name = object_get_canonical_path_component(OBJECT(obj)); 102 thread_name = g_strdup_printf("IO %s", name); 103 qemu_thread_create(&iothread->thread, thread_name, iothread_run, 104 iothread, QEMU_THREAD_JOINABLE); 105 g_free(thread_name); 106 g_free(name); 107 108 /* Wait for initialization to complete */ 109 qemu_mutex_lock(&iothread->init_done_lock); 110 while (iothread->thread_id == -1) { 111 qemu_cond_wait(&iothread->init_done_cond, 112 &iothread->init_done_lock); 113 } 114 qemu_mutex_unlock(&iothread->init_done_lock); 115 } 116 117 static void iothread_class_init(ObjectClass *klass, void *class_data) 118 { 119 UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass); 120 ucc->complete = iothread_complete; 121 } 122 123 static const TypeInfo iothread_info = { 124 .name = TYPE_IOTHREAD, 125 .parent = TYPE_OBJECT, 126 .class_init = iothread_class_init, 127 .instance_size = sizeof(IOThread), 128 .instance_finalize = iothread_instance_finalize, 129 .interfaces = (InterfaceInfo[]) { 130 {TYPE_USER_CREATABLE}, 131 {} 132 }, 133 }; 134 135 static void iothread_register_types(void) 136 { 137 type_register_static(&iothread_info); 138 } 139 140 type_init(iothread_register_types) 141 142 char *iothread_get_id(IOThread *iothread) 143 { 144 return object_get_canonical_path_component(OBJECT(iothread)); 145 } 146 147 AioContext *iothread_get_aio_context(IOThread *iothread) 148 { 149 return iothread->ctx; 150 } 151 152 static int query_one_iothread(Object *object, void *opaque) 153 { 154 IOThreadInfoList ***prev = opaque; 155 IOThreadInfoList *elem; 156 IOThreadInfo *info; 157 IOThread *iothread; 158 159 iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD); 160 if (!iothread) { 161 return 0; 162 } 163 164 info = g_new0(IOThreadInfo, 1); 165 info->id = iothread_get_id(iothread); 166 info->thread_id = iothread->thread_id; 167 168 elem = g_new0(IOThreadInfoList, 1); 169 elem->value = info; 170 elem->next = NULL; 171 172 **prev = elem; 173 *prev = &elem->next; 174 return 0; 175 } 176 177 IOThreadInfoList *qmp_query_iothreads(Error **errp) 178 { 179 IOThreadInfoList *head = NULL; 180 IOThreadInfoList **prev = &head; 181 Object *container = object_get_objects_root(); 182 183 object_child_foreach(container, query_one_iothread, &prev); 184 return head; 185 } 186 187 void iothread_stop_all(void) 188 { 189 Object *container = object_get_objects_root(); 190 191 object_child_foreach(container, iothread_stop, NULL); 192 } 193