1 /* 2 * Event loop thread 3 * 4 * Copyright Red Hat Inc., 2013 5 * 6 * Authors: 7 * Stefan Hajnoczi <stefanha@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 * 12 */ 13 14 #include "qemu/osdep.h" 15 #include "qom/object.h" 16 #include "qom/object_interfaces.h" 17 #include "qemu/module.h" 18 #include "block/aio.h" 19 #include "block/block.h" 20 #include "sysemu/iothread.h" 21 #include "qmp-commands.h" 22 #include "qemu/error-report.h" 23 #include "qemu/rcu.h" 24 #include "qemu/main-loop.h" 25 26 typedef ObjectClass IOThreadClass; 27 28 #define IOTHREAD_GET_CLASS(obj) \ 29 OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD) 30 #define IOTHREAD_CLASS(klass) \ 31 OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD) 32 33 /* Benchmark results from 2016 on NVMe SSD drives show max polling times around 34 * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32 35 * workloads. 36 */ 37 #define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL 38 39 static __thread IOThread *my_iothread; 40 41 AioContext *qemu_get_current_aio_context(void) 42 { 43 return my_iothread ? my_iothread->ctx : qemu_get_aio_context(); 44 } 45 46 static void *iothread_run(void *opaque) 47 { 48 IOThread *iothread = opaque; 49 50 rcu_register_thread(); 51 52 my_iothread = iothread; 53 qemu_mutex_lock(&iothread->init_done_lock); 54 iothread->thread_id = qemu_get_thread_id(); 55 qemu_cond_signal(&iothread->init_done_cond); 56 qemu_mutex_unlock(&iothread->init_done_lock); 57 58 while (!atomic_read(&iothread->stopping)) { 59 aio_poll(iothread->ctx, true); 60 61 if (atomic_read(&iothread->worker_context)) { 62 GMainLoop *loop; 63 64 g_main_context_push_thread_default(iothread->worker_context); 65 iothread->main_loop = 66 g_main_loop_new(iothread->worker_context, TRUE); 67 loop = iothread->main_loop; 68 69 g_main_loop_run(iothread->main_loop); 70 iothread->main_loop = NULL; 71 g_main_loop_unref(loop); 72 73 g_main_context_pop_thread_default(iothread->worker_context); 74 } 75 } 76 77 rcu_unregister_thread(); 78 return NULL; 79 } 80 81 void iothread_stop(IOThread *iothread) 82 { 83 if (!iothread->ctx || iothread->stopping) { 84 return; 85 } 86 iothread->stopping = true; 87 aio_notify(iothread->ctx); 88 if (atomic_read(&iothread->main_loop)) { 89 g_main_loop_quit(iothread->main_loop); 90 } 91 qemu_thread_join(&iothread->thread); 92 } 93 94 static int iothread_stop_iter(Object *object, void *opaque) 95 { 96 IOThread *iothread; 97 98 iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD); 99 if (!iothread) { 100 return 0; 101 } 102 iothread_stop(iothread); 103 return 0; 104 } 105 106 static void iothread_instance_init(Object *obj) 107 { 108 IOThread *iothread = IOTHREAD(obj); 109 110 iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT; 111 } 112 113 static void iothread_instance_finalize(Object *obj) 114 { 115 IOThread *iothread = IOTHREAD(obj); 116 117 iothread_stop(iothread); 118 if (iothread->worker_context) { 119 g_main_context_unref(iothread->worker_context); 120 iothread->worker_context = NULL; 121 } 122 qemu_cond_destroy(&iothread->init_done_cond); 123 qemu_mutex_destroy(&iothread->init_done_lock); 124 if (!iothread->ctx) { 125 return; 126 } 127 aio_context_unref(iothread->ctx); 128 } 129 130 static void iothread_complete(UserCreatable *obj, Error **errp) 131 { 132 Error *local_error = NULL; 133 IOThread *iothread = IOTHREAD(obj); 134 char *name, *thread_name; 135 136 iothread->stopping = false; 137 iothread->thread_id = -1; 138 iothread->ctx = aio_context_new(&local_error); 139 if (!iothread->ctx) { 140 error_propagate(errp, local_error); 141 return; 142 } 143 144 aio_context_set_poll_params(iothread->ctx, 145 iothread->poll_max_ns, 146 iothread->poll_grow, 147 iothread->poll_shrink, 148 &local_error); 149 if (local_error) { 150 error_propagate(errp, local_error); 151 aio_context_unref(iothread->ctx); 152 iothread->ctx = NULL; 153 return; 154 } 155 156 qemu_mutex_init(&iothread->init_done_lock); 157 qemu_cond_init(&iothread->init_done_cond); 158 iothread->once = (GOnce) G_ONCE_INIT; 159 160 /* This assumes we are called from a thread with useful CPU affinity for us 161 * to inherit. 162 */ 163 name = object_get_canonical_path_component(OBJECT(obj)); 164 thread_name = g_strdup_printf("IO %s", name); 165 qemu_thread_create(&iothread->thread, thread_name, iothread_run, 166 iothread, QEMU_THREAD_JOINABLE); 167 g_free(thread_name); 168 g_free(name); 169 170 /* Wait for initialization to complete */ 171 qemu_mutex_lock(&iothread->init_done_lock); 172 while (iothread->thread_id == -1) { 173 qemu_cond_wait(&iothread->init_done_cond, 174 &iothread->init_done_lock); 175 } 176 qemu_mutex_unlock(&iothread->init_done_lock); 177 } 178 179 typedef struct { 180 const char *name; 181 ptrdiff_t offset; /* field's byte offset in IOThread struct */ 182 } PollParamInfo; 183 184 static PollParamInfo poll_max_ns_info = { 185 "poll-max-ns", offsetof(IOThread, poll_max_ns), 186 }; 187 static PollParamInfo poll_grow_info = { 188 "poll-grow", offsetof(IOThread, poll_grow), 189 }; 190 static PollParamInfo poll_shrink_info = { 191 "poll-shrink", offsetof(IOThread, poll_shrink), 192 }; 193 194 static void iothread_get_poll_param(Object *obj, Visitor *v, 195 const char *name, void *opaque, Error **errp) 196 { 197 IOThread *iothread = IOTHREAD(obj); 198 PollParamInfo *info = opaque; 199 int64_t *field = (void *)iothread + info->offset; 200 201 visit_type_int64(v, name, field, errp); 202 } 203 204 static void iothread_set_poll_param(Object *obj, Visitor *v, 205 const char *name, void *opaque, Error **errp) 206 { 207 IOThread *iothread = IOTHREAD(obj); 208 PollParamInfo *info = opaque; 209 int64_t *field = (void *)iothread + info->offset; 210 Error *local_err = NULL; 211 int64_t value; 212 213 visit_type_int64(v, name, &value, &local_err); 214 if (local_err) { 215 goto out; 216 } 217 218 if (value < 0) { 219 error_setg(&local_err, "%s value must be in range [0, %"PRId64"]", 220 info->name, INT64_MAX); 221 goto out; 222 } 223 224 *field = value; 225 226 if (iothread->ctx) { 227 aio_context_set_poll_params(iothread->ctx, 228 iothread->poll_max_ns, 229 iothread->poll_grow, 230 iothread->poll_shrink, 231 &local_err); 232 } 233 234 out: 235 error_propagate(errp, local_err); 236 } 237 238 static void iothread_class_init(ObjectClass *klass, void *class_data) 239 { 240 UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass); 241 ucc->complete = iothread_complete; 242 243 object_class_property_add(klass, "poll-max-ns", "int", 244 iothread_get_poll_param, 245 iothread_set_poll_param, 246 NULL, &poll_max_ns_info, &error_abort); 247 object_class_property_add(klass, "poll-grow", "int", 248 iothread_get_poll_param, 249 iothread_set_poll_param, 250 NULL, &poll_grow_info, &error_abort); 251 object_class_property_add(klass, "poll-shrink", "int", 252 iothread_get_poll_param, 253 iothread_set_poll_param, 254 NULL, &poll_shrink_info, &error_abort); 255 } 256 257 static const TypeInfo iothread_info = { 258 .name = TYPE_IOTHREAD, 259 .parent = TYPE_OBJECT, 260 .class_init = iothread_class_init, 261 .instance_size = sizeof(IOThread), 262 .instance_init = iothread_instance_init, 263 .instance_finalize = iothread_instance_finalize, 264 .interfaces = (InterfaceInfo[]) { 265 {TYPE_USER_CREATABLE}, 266 {} 267 }, 268 }; 269 270 static void iothread_register_types(void) 271 { 272 type_register_static(&iothread_info); 273 } 274 275 type_init(iothread_register_types) 276 277 char *iothread_get_id(IOThread *iothread) 278 { 279 return object_get_canonical_path_component(OBJECT(iothread)); 280 } 281 282 AioContext *iothread_get_aio_context(IOThread *iothread) 283 { 284 return iothread->ctx; 285 } 286 287 static int query_one_iothread(Object *object, void *opaque) 288 { 289 IOThreadInfoList ***prev = opaque; 290 IOThreadInfoList *elem; 291 IOThreadInfo *info; 292 IOThread *iothread; 293 294 iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD); 295 if (!iothread) { 296 return 0; 297 } 298 299 info = g_new0(IOThreadInfo, 1); 300 info->id = iothread_get_id(iothread); 301 info->thread_id = iothread->thread_id; 302 info->poll_max_ns = iothread->poll_max_ns; 303 info->poll_grow = iothread->poll_grow; 304 info->poll_shrink = iothread->poll_shrink; 305 306 elem = g_new0(IOThreadInfoList, 1); 307 elem->value = info; 308 elem->next = NULL; 309 310 **prev = elem; 311 *prev = &elem->next; 312 return 0; 313 } 314 315 IOThreadInfoList *qmp_query_iothreads(Error **errp) 316 { 317 IOThreadInfoList *head = NULL; 318 IOThreadInfoList **prev = &head; 319 Object *container = object_get_objects_root(); 320 321 object_child_foreach(container, query_one_iothread, &prev); 322 return head; 323 } 324 325 void iothread_stop_all(void) 326 { 327 Object *container = object_get_objects_root(); 328 BlockDriverState *bs; 329 BdrvNextIterator it; 330 331 for (bs = bdrv_first(&it); bs; bs = bdrv_next(&it)) { 332 AioContext *ctx = bdrv_get_aio_context(bs); 333 if (ctx == qemu_get_aio_context()) { 334 continue; 335 } 336 aio_context_acquire(ctx); 337 bdrv_set_aio_context(bs, qemu_get_aio_context()); 338 aio_context_release(ctx); 339 } 340 341 object_child_foreach(container, iothread_stop_iter, NULL); 342 } 343 344 static gpointer iothread_g_main_context_init(gpointer opaque) 345 { 346 AioContext *ctx; 347 IOThread *iothread = opaque; 348 GSource *source; 349 350 iothread->worker_context = g_main_context_new(); 351 352 ctx = iothread_get_aio_context(iothread); 353 source = aio_get_g_source(ctx); 354 g_source_attach(source, iothread->worker_context); 355 g_source_unref(source); 356 357 aio_notify(iothread->ctx); 358 return NULL; 359 } 360 361 GMainContext *iothread_get_g_main_context(IOThread *iothread) 362 { 363 g_once(&iothread->once, iothread_g_main_context_init, iothread); 364 365 return iothread->worker_context; 366 } 367 368 IOThread *iothread_create(const char *id, Error **errp) 369 { 370 Object *obj; 371 372 obj = object_new_with_props(TYPE_IOTHREAD, 373 object_get_internal_root(), 374 id, errp, NULL); 375 376 return IOTHREAD(obj); 377 } 378 379 void iothread_destroy(IOThread *iothread) 380 { 381 object_unparent(OBJECT(iothread)); 382 } 383