xref: /openbmc/qemu/iothread.c (revision 709395f8)
1 /*
2  * Event loop thread
3  *
4  * Copyright Red Hat Inc., 2013
5  *
6  * Authors:
7  *  Stefan Hajnoczi   <stefanha@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 #include "qom/object.h"
16 #include "qom/object_interfaces.h"
17 #include "qemu/module.h"
18 #include "block/aio.h"
19 #include "block/block.h"
20 #include "sysemu/iothread.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-commands-misc.h"
23 #include "qemu/error-report.h"
24 #include "qemu/rcu.h"
25 #include "qemu/main-loop.h"
26 
27 typedef ObjectClass IOThreadClass;
28 
29 #define IOTHREAD_GET_CLASS(obj) \
30    OBJECT_GET_CLASS(IOThreadClass, obj, TYPE_IOTHREAD)
31 #define IOTHREAD_CLASS(klass) \
32    OBJECT_CLASS_CHECK(IOThreadClass, klass, TYPE_IOTHREAD)
33 
34 #ifdef CONFIG_POSIX
35 /* Benchmark results from 2016 on NVMe SSD drives show max polling times around
36  * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
37  * workloads.
38  */
39 #define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
40 #else
41 #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
42 #endif
43 
44 static __thread IOThread *my_iothread;
45 
46 AioContext *qemu_get_current_aio_context(void)
47 {
48     return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
49 }
50 
51 static void *iothread_run(void *opaque)
52 {
53     IOThread *iothread = opaque;
54 
55     rcu_register_thread();
56 
57     my_iothread = iothread;
58     qemu_mutex_lock(&iothread->init_done_lock);
59     iothread->thread_id = qemu_get_thread_id();
60     qemu_cond_signal(&iothread->init_done_cond);
61     qemu_mutex_unlock(&iothread->init_done_lock);
62 
63     while (iothread->running) {
64         aio_poll(iothread->ctx, true);
65 
66         /*
67          * We must check the running state again in case it was
68          * changed in previous aio_poll()
69          */
70         if (iothread->running && atomic_read(&iothread->worker_context)) {
71             GMainLoop *loop;
72 
73             g_main_context_push_thread_default(iothread->worker_context);
74             iothread->main_loop =
75                 g_main_loop_new(iothread->worker_context, TRUE);
76             loop = iothread->main_loop;
77 
78             g_main_loop_run(iothread->main_loop);
79             iothread->main_loop = NULL;
80             g_main_loop_unref(loop);
81 
82             g_main_context_pop_thread_default(iothread->worker_context);
83         }
84     }
85 
86     rcu_unregister_thread();
87     return NULL;
88 }
89 
90 /* Runs in iothread_run() thread */
91 static void iothread_stop_bh(void *opaque)
92 {
93     IOThread *iothread = opaque;
94 
95     iothread->running = false; /* stop iothread_run() */
96 
97     if (iothread->main_loop) {
98         g_main_loop_quit(iothread->main_loop);
99     }
100 }
101 
102 void iothread_stop(IOThread *iothread)
103 {
104     if (!iothread->ctx || iothread->stopping) {
105         return;
106     }
107     iothread->stopping = true;
108     aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
109     qemu_thread_join(&iothread->thread);
110 }
111 
112 static void iothread_instance_init(Object *obj)
113 {
114     IOThread *iothread = IOTHREAD(obj);
115 
116     iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
117     iothread->thread_id = -1;
118 }
119 
120 static void iothread_instance_finalize(Object *obj)
121 {
122     IOThread *iothread = IOTHREAD(obj);
123 
124     iothread_stop(iothread);
125 
126     if (iothread->thread_id != -1) {
127         qemu_cond_destroy(&iothread->init_done_cond);
128         qemu_mutex_destroy(&iothread->init_done_lock);
129     }
130     /*
131      * Before glib2 2.33.10, there is a glib2 bug that GSource context
132      * pointer may not be cleared even if the context has already been
133      * destroyed (while it should).  Here let's free the AIO context
134      * earlier to bypass that glib bug.
135      *
136      * We can remove this comment after the minimum supported glib2
137      * version boosts to 2.33.10.  Before that, let's free the
138      * GSources first before destroying any GMainContext.
139      */
140     if (iothread->ctx) {
141         aio_context_unref(iothread->ctx);
142         iothread->ctx = NULL;
143     }
144     if (iothread->worker_context) {
145         g_main_context_unref(iothread->worker_context);
146         iothread->worker_context = NULL;
147     }
148 }
149 
150 static void iothread_complete(UserCreatable *obj, Error **errp)
151 {
152     Error *local_error = NULL;
153     IOThread *iothread = IOTHREAD(obj);
154     char *name, *thread_name;
155 
156     iothread->stopping = false;
157     iothread->running = true;
158     iothread->ctx = aio_context_new(&local_error);
159     if (!iothread->ctx) {
160         error_propagate(errp, local_error);
161         return;
162     }
163 
164     aio_context_set_poll_params(iothread->ctx,
165                                 iothread->poll_max_ns,
166                                 iothread->poll_grow,
167                                 iothread->poll_shrink,
168                                 &local_error);
169     if (local_error) {
170         error_propagate(errp, local_error);
171         aio_context_unref(iothread->ctx);
172         iothread->ctx = NULL;
173         return;
174     }
175 
176     qemu_mutex_init(&iothread->init_done_lock);
177     qemu_cond_init(&iothread->init_done_cond);
178     iothread->once = (GOnce) G_ONCE_INIT;
179 
180     /* This assumes we are called from a thread with useful CPU affinity for us
181      * to inherit.
182      */
183     name = object_get_canonical_path_component(OBJECT(obj));
184     thread_name = g_strdup_printf("IO %s", name);
185     qemu_thread_create(&iothread->thread, thread_name, iothread_run,
186                        iothread, QEMU_THREAD_JOINABLE);
187     g_free(thread_name);
188     g_free(name);
189 
190     /* Wait for initialization to complete */
191     qemu_mutex_lock(&iothread->init_done_lock);
192     while (iothread->thread_id == -1) {
193         qemu_cond_wait(&iothread->init_done_cond,
194                        &iothread->init_done_lock);
195     }
196     qemu_mutex_unlock(&iothread->init_done_lock);
197 }
198 
199 typedef struct {
200     const char *name;
201     ptrdiff_t offset; /* field's byte offset in IOThread struct */
202 } PollParamInfo;
203 
204 static PollParamInfo poll_max_ns_info = {
205     "poll-max-ns", offsetof(IOThread, poll_max_ns),
206 };
207 static PollParamInfo poll_grow_info = {
208     "poll-grow", offsetof(IOThread, poll_grow),
209 };
210 static PollParamInfo poll_shrink_info = {
211     "poll-shrink", offsetof(IOThread, poll_shrink),
212 };
213 
214 static void iothread_get_poll_param(Object *obj, Visitor *v,
215         const char *name, void *opaque, Error **errp)
216 {
217     IOThread *iothread = IOTHREAD(obj);
218     PollParamInfo *info = opaque;
219     int64_t *field = (void *)iothread + info->offset;
220 
221     visit_type_int64(v, name, field, errp);
222 }
223 
224 static void iothread_set_poll_param(Object *obj, Visitor *v,
225         const char *name, void *opaque, Error **errp)
226 {
227     IOThread *iothread = IOTHREAD(obj);
228     PollParamInfo *info = opaque;
229     int64_t *field = (void *)iothread + info->offset;
230     Error *local_err = NULL;
231     int64_t value;
232 
233     visit_type_int64(v, name, &value, &local_err);
234     if (local_err) {
235         goto out;
236     }
237 
238     if (value < 0) {
239         error_setg(&local_err, "%s value must be in range [0, %"PRId64"]",
240                    info->name, INT64_MAX);
241         goto out;
242     }
243 
244     *field = value;
245 
246     if (iothread->ctx) {
247         aio_context_set_poll_params(iothread->ctx,
248                                     iothread->poll_max_ns,
249                                     iothread->poll_grow,
250                                     iothread->poll_shrink,
251                                     &local_err);
252     }
253 
254 out:
255     error_propagate(errp, local_err);
256 }
257 
258 static void iothread_class_init(ObjectClass *klass, void *class_data)
259 {
260     UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
261     ucc->complete = iothread_complete;
262 
263     object_class_property_add(klass, "poll-max-ns", "int",
264                               iothread_get_poll_param,
265                               iothread_set_poll_param,
266                               NULL, &poll_max_ns_info, &error_abort);
267     object_class_property_add(klass, "poll-grow", "int",
268                               iothread_get_poll_param,
269                               iothread_set_poll_param,
270                               NULL, &poll_grow_info, &error_abort);
271     object_class_property_add(klass, "poll-shrink", "int",
272                               iothread_get_poll_param,
273                               iothread_set_poll_param,
274                               NULL, &poll_shrink_info, &error_abort);
275 }
276 
277 static const TypeInfo iothread_info = {
278     .name = TYPE_IOTHREAD,
279     .parent = TYPE_OBJECT,
280     .class_init = iothread_class_init,
281     .instance_size = sizeof(IOThread),
282     .instance_init = iothread_instance_init,
283     .instance_finalize = iothread_instance_finalize,
284     .interfaces = (InterfaceInfo[]) {
285         {TYPE_USER_CREATABLE},
286         {}
287     },
288 };
289 
290 static void iothread_register_types(void)
291 {
292     type_register_static(&iothread_info);
293 }
294 
295 type_init(iothread_register_types)
296 
297 char *iothread_get_id(IOThread *iothread)
298 {
299     return object_get_canonical_path_component(OBJECT(iothread));
300 }
301 
302 AioContext *iothread_get_aio_context(IOThread *iothread)
303 {
304     return iothread->ctx;
305 }
306 
307 static int query_one_iothread(Object *object, void *opaque)
308 {
309     IOThreadInfoList ***prev = opaque;
310     IOThreadInfoList *elem;
311     IOThreadInfo *info;
312     IOThread *iothread;
313 
314     iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
315     if (!iothread) {
316         return 0;
317     }
318 
319     info = g_new0(IOThreadInfo, 1);
320     info->id = iothread_get_id(iothread);
321     info->thread_id = iothread->thread_id;
322     info->poll_max_ns = iothread->poll_max_ns;
323     info->poll_grow = iothread->poll_grow;
324     info->poll_shrink = iothread->poll_shrink;
325 
326     elem = g_new0(IOThreadInfoList, 1);
327     elem->value = info;
328     elem->next = NULL;
329 
330     **prev = elem;
331     *prev = &elem->next;
332     return 0;
333 }
334 
335 IOThreadInfoList *qmp_query_iothreads(Error **errp)
336 {
337     IOThreadInfoList *head = NULL;
338     IOThreadInfoList **prev = &head;
339     Object *container = object_get_objects_root();
340 
341     object_child_foreach(container, query_one_iothread, &prev);
342     return head;
343 }
344 
345 static gpointer iothread_g_main_context_init(gpointer opaque)
346 {
347     AioContext *ctx;
348     IOThread *iothread = opaque;
349     GSource *source;
350 
351     iothread->worker_context = g_main_context_new();
352 
353     ctx = iothread_get_aio_context(iothread);
354     source = aio_get_g_source(ctx);
355     g_source_attach(source, iothread->worker_context);
356     g_source_unref(source);
357 
358     aio_notify(iothread->ctx);
359     return NULL;
360 }
361 
362 GMainContext *iothread_get_g_main_context(IOThread *iothread)
363 {
364     g_once(&iothread->once, iothread_g_main_context_init, iothread);
365 
366     return iothread->worker_context;
367 }
368 
369 IOThread *iothread_create(const char *id, Error **errp)
370 {
371     Object *obj;
372 
373     obj = object_new_with_props(TYPE_IOTHREAD,
374                                 object_get_internal_root(),
375                                 id, errp, NULL);
376 
377     return IOTHREAD(obj);
378 }
379 
380 void iothread_destroy(IOThread *iothread)
381 {
382     object_unparent(OBJECT(iothread));
383 }
384 
385 /* Lookup IOThread by its id.  Only finds user-created objects, not internal
386  * iothread_create() objects. */
387 IOThread *iothread_by_id(const char *id)
388 {
389     return IOTHREAD(object_resolve_path_type(id, TYPE_IOTHREAD, NULL));
390 }
391