xref: /openbmc/qemu/iothread.c (revision 99747b71baf278068b5938ccdc66d6c906ed437e)
1  /*
2   * Event loop thread
3   *
4   * Copyright Red Hat Inc., 2013, 2020
5   *
6   * Authors:
7   *  Stefan Hajnoczi   <stefanha@redhat.com>
8   *
9   * This work is licensed under the terms of the GNU GPL, version 2 or later.
10   * See the COPYING file in the top-level directory.
11   *
12   */
13  
14  #include "qemu/osdep.h"
15  #include "qom/object.h"
16  #include "qom/object_interfaces.h"
17  #include "qemu/module.h"
18  #include "block/aio.h"
19  #include "block/block.h"
20  #include "sysemu/event-loop-base.h"
21  #include "sysemu/iothread.h"
22  #include "qapi/error.h"
23  #include "qapi/qapi-commands-misc.h"
24  #include "qemu/error-report.h"
25  #include "qemu/rcu.h"
26  #include "qemu/main-loop.h"
27  
28  
29  #ifdef CONFIG_POSIX
30  /* Benchmark results from 2016 on NVMe SSD drives show max polling times around
31   * 16-32 microseconds yield IOPS improvements for both iodepth=1 and iodepth=32
32   * workloads.
33   */
34  #define IOTHREAD_POLL_MAX_NS_DEFAULT 32768ULL
35  #else
36  #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
37  #endif
38  
39  static void *iothread_run(void *opaque)
40  {
41      IOThread *iothread = opaque;
42  
43      rcu_register_thread();
44      /*
45       * g_main_context_push_thread_default() must be called before anything
46       * in this new thread uses glib.
47       */
48      g_main_context_push_thread_default(iothread->worker_context);
49      qemu_set_current_aio_context(iothread->ctx);
50      iothread->thread_id = qemu_get_thread_id();
51      qemu_sem_post(&iothread->init_done_sem);
52  
53      while (iothread->running) {
54          /*
55           * Note: from functional-wise the g_main_loop_run() below can
56           * already cover the aio_poll() events, but we can't run the
57           * main loop unconditionally because explicit aio_poll() here
58           * is faster than g_main_loop_run() when we do not need the
59           * gcontext at all (e.g., pure block layer iothreads).  In
60           * other words, when we want to run the gcontext with the
61           * iothread we need to pay some performance for functionality.
62           */
63          aio_poll(iothread->ctx, true);
64  
65          /*
66           * We must check the running state again in case it was
67           * changed in previous aio_poll()
68           */
69          if (iothread->running && qatomic_read(&iothread->run_gcontext)) {
70              g_main_loop_run(iothread->main_loop);
71          }
72      }
73  
74      g_main_context_pop_thread_default(iothread->worker_context);
75      rcu_unregister_thread();
76      return NULL;
77  }
78  
79  /* Runs in iothread_run() thread */
80  static void iothread_stop_bh(void *opaque)
81  {
82      IOThread *iothread = opaque;
83  
84      iothread->running = false; /* stop iothread_run() */
85  
86      if (iothread->main_loop) {
87          g_main_loop_quit(iothread->main_loop);
88      }
89  }
90  
91  void iothread_stop(IOThread *iothread)
92  {
93      if (!iothread->ctx || iothread->stopping) {
94          return;
95      }
96      iothread->stopping = true;
97      aio_bh_schedule_oneshot(iothread->ctx, iothread_stop_bh, iothread);
98      qemu_thread_join(&iothread->thread);
99  }
100  
101  static void iothread_instance_init(Object *obj)
102  {
103      IOThread *iothread = IOTHREAD(obj);
104  
105      iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
106      iothread->thread_id = -1;
107      qemu_sem_init(&iothread->init_done_sem, 0);
108      /* By default, we don't run gcontext */
109      qatomic_set(&iothread->run_gcontext, 0);
110  }
111  
112  static void iothread_instance_finalize(Object *obj)
113  {
114      IOThread *iothread = IOTHREAD(obj);
115  
116      iothread_stop(iothread);
117  
118      /*
119       * Before glib2 2.33.10, there is a glib2 bug that GSource context
120       * pointer may not be cleared even if the context has already been
121       * destroyed (while it should).  Here let's free the AIO context
122       * earlier to bypass that glib bug.
123       *
124       * We can remove this comment after the minimum supported glib2
125       * version boosts to 2.33.10.  Before that, let's free the
126       * GSources first before destroying any GMainContext.
127       */
128      if (iothread->ctx) {
129          aio_context_unref(iothread->ctx);
130          iothread->ctx = NULL;
131      }
132      if (iothread->worker_context) {
133          g_main_context_unref(iothread->worker_context);
134          iothread->worker_context = NULL;
135          g_main_loop_unref(iothread->main_loop);
136          iothread->main_loop = NULL;
137      }
138      qemu_sem_destroy(&iothread->init_done_sem);
139  }
140  
141  static void iothread_init_gcontext(IOThread *iothread, const char *thread_name)
142  {
143      GSource *source;
144      g_autofree char *name = g_strdup_printf("%s aio-context", thread_name);
145  
146      iothread->worker_context = g_main_context_new();
147      source = aio_get_g_source(iothread_get_aio_context(iothread));
148      g_source_set_name(source, name);
149      g_source_attach(source, iothread->worker_context);
150      g_source_unref(source);
151      iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
152  }
153  
154  static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
155  {
156      ERRP_GUARD();
157      IOThread *iothread = IOTHREAD(base);
158  
159      if (!iothread->ctx) {
160          return;
161      }
162  
163      aio_context_set_poll_params(iothread->ctx,
164                                  iothread->poll_max_ns,
165                                  iothread->poll_grow,
166                                  iothread->poll_shrink,
167                                  errp);
168      if (*errp) {
169          return;
170      }
171  
172      aio_context_set_aio_params(iothread->ctx,
173                                 iothread->parent_obj.aio_max_batch);
174  
175      aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
176                                         base->thread_pool_max, errp);
177  }
178  
179  
180  static void iothread_init(EventLoopBase *base, Error **errp)
181  {
182      Error *local_error = NULL;
183      IOThread *iothread = IOTHREAD(base);
184      g_autofree char *thread_name = NULL;
185  
186      iothread->stopping = false;
187      iothread->running = true;
188      iothread->ctx = aio_context_new(errp);
189      if (!iothread->ctx) {
190          return;
191      }
192  
193      thread_name = g_strdup_printf("IO %s",
194                          object_get_canonical_path_component(OBJECT(base)));
195  
196      /*
197       * Init one GMainContext for the iothread unconditionally, even if
198       * it's not used
199       */
200      iothread_init_gcontext(iothread, thread_name);
201  
202      iothread_set_aio_context_params(base, &local_error);
203      if (local_error) {
204          error_propagate(errp, local_error);
205          aio_context_unref(iothread->ctx);
206          iothread->ctx = NULL;
207          return;
208      }
209  
210      /* This assumes we are called from a thread with useful CPU affinity for us
211       * to inherit.
212       */
213      qemu_thread_create(&iothread->thread, thread_name, iothread_run,
214                         iothread, QEMU_THREAD_JOINABLE);
215  
216      /* Wait for initialization to complete */
217      while (iothread->thread_id == -1) {
218          qemu_sem_wait(&iothread->init_done_sem);
219      }
220  }
221  
222  typedef struct {
223      const char *name;
224      ptrdiff_t offset; /* field's byte offset in IOThread struct */
225  } IOThreadParamInfo;
226  
227  static IOThreadParamInfo poll_max_ns_info = {
228      "poll-max-ns", offsetof(IOThread, poll_max_ns),
229  };
230  static IOThreadParamInfo poll_grow_info = {
231      "poll-grow", offsetof(IOThread, poll_grow),
232  };
233  static IOThreadParamInfo poll_shrink_info = {
234      "poll-shrink", offsetof(IOThread, poll_shrink),
235  };
236  
237  static void iothread_get_param(Object *obj, Visitor *v,
238          const char *name, IOThreadParamInfo *info, Error **errp)
239  {
240      IOThread *iothread = IOTHREAD(obj);
241      int64_t *field = (void *)iothread + info->offset;
242  
243      visit_type_int64(v, name, field, errp);
244  }
245  
246  static bool iothread_set_param(Object *obj, Visitor *v,
247          const char *name, IOThreadParamInfo *info, Error **errp)
248  {
249      IOThread *iothread = IOTHREAD(obj);
250      int64_t *field = (void *)iothread + info->offset;
251      int64_t value;
252  
253      if (!visit_type_int64(v, name, &value, errp)) {
254          return false;
255      }
256  
257      if (value < 0) {
258          error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
259                     info->name, INT64_MAX);
260          return false;
261      }
262  
263      *field = value;
264  
265      return true;
266  }
267  
268  static void iothread_get_poll_param(Object *obj, Visitor *v,
269          const char *name, void *opaque, Error **errp)
270  {
271      IOThreadParamInfo *info = opaque;
272  
273      iothread_get_param(obj, v, name, info, errp);
274  }
275  
276  static void iothread_set_poll_param(Object *obj, Visitor *v,
277          const char *name, void *opaque, Error **errp)
278  {
279      IOThread *iothread = IOTHREAD(obj);
280      IOThreadParamInfo *info = opaque;
281  
282      if (!iothread_set_param(obj, v, name, info, errp)) {
283          return;
284      }
285  
286      if (iothread->ctx) {
287          aio_context_set_poll_params(iothread->ctx,
288                                      iothread->poll_max_ns,
289                                      iothread->poll_grow,
290                                      iothread->poll_shrink,
291                                      errp);
292      }
293  }
294  
295  static void iothread_class_init(ObjectClass *klass, void *class_data)
296  {
297      EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(klass);
298  
299      bc->init = iothread_init;
300      bc->update_params = iothread_set_aio_context_params;
301  
302      object_class_property_add(klass, "poll-max-ns", "int",
303                                iothread_get_poll_param,
304                                iothread_set_poll_param,
305                                NULL, &poll_max_ns_info);
306      object_class_property_add(klass, "poll-grow", "int",
307                                iothread_get_poll_param,
308                                iothread_set_poll_param,
309                                NULL, &poll_grow_info);
310      object_class_property_add(klass, "poll-shrink", "int",
311                                iothread_get_poll_param,
312                                iothread_set_poll_param,
313                                NULL, &poll_shrink_info);
314  }
315  
316  static const TypeInfo iothread_info = {
317      .name = TYPE_IOTHREAD,
318      .parent = TYPE_EVENT_LOOP_BASE,
319      .class_init = iothread_class_init,
320      .instance_size = sizeof(IOThread),
321      .instance_init = iothread_instance_init,
322      .instance_finalize = iothread_instance_finalize,
323  };
324  
325  static void iothread_register_types(void)
326  {
327      type_register_static(&iothread_info);
328  }
329  
330  type_init(iothread_register_types)
331  
332  char *iothread_get_id(IOThread *iothread)
333  {
334      return g_strdup(object_get_canonical_path_component(OBJECT(iothread)));
335  }
336  
337  AioContext *iothread_get_aio_context(IOThread *iothread)
338  {
339      return iothread->ctx;
340  }
341  
342  static int query_one_iothread(Object *object, void *opaque)
343  {
344      IOThreadInfoList ***tail = opaque;
345      IOThreadInfo *info;
346      IOThread *iothread;
347  
348      iothread = (IOThread *)object_dynamic_cast(object, TYPE_IOTHREAD);
349      if (!iothread) {
350          return 0;
351      }
352  
353      info = g_new0(IOThreadInfo, 1);
354      info->id = iothread_get_id(iothread);
355      info->thread_id = iothread->thread_id;
356      info->poll_max_ns = iothread->poll_max_ns;
357      info->poll_grow = iothread->poll_grow;
358      info->poll_shrink = iothread->poll_shrink;
359      info->aio_max_batch = iothread->parent_obj.aio_max_batch;
360  
361      QAPI_LIST_APPEND(*tail, info);
362      return 0;
363  }
364  
365  IOThreadInfoList *qmp_query_iothreads(Error **errp)
366  {
367      IOThreadInfoList *head = NULL;
368      IOThreadInfoList **prev = &head;
369      Object *container = object_get_objects_root();
370  
371      object_child_foreach(container, query_one_iothread, &prev);
372      return head;
373  }
374  
375  GMainContext *iothread_get_g_main_context(IOThread *iothread)
376  {
377      qatomic_set(&iothread->run_gcontext, 1);
378      aio_notify(iothread->ctx);
379      return iothread->worker_context;
380  }
381  
382  IOThread *iothread_create(const char *id, Error **errp)
383  {
384      Object *obj;
385  
386      obj = object_new_with_props(TYPE_IOTHREAD,
387                                  object_get_internal_root(),
388                                  id, errp, NULL);
389  
390      return IOTHREAD(obj);
391  }
392  
393  void iothread_destroy(IOThread *iothread)
394  {
395      object_unparent(OBJECT(iothread));
396  }
397  
398  /* Lookup IOThread by its id.  Only finds user-created objects, not internal
399   * iothread_create() objects. */
400  IOThread *iothread_by_id(const char *id)
401  {
402      return IOTHREAD(object_resolve_path_type(id, TYPE_IOTHREAD, NULL));
403  }
404  
405  bool qemu_in_iothread(void)
406  {
407      return qemu_get_current_aio_context() == qemu_get_aio_context() ?
408                      false : true;
409  }
410