xref: /openbmc/qemu/util/async.c (revision c2b38b27)
/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

struct QEMUBH {
    AioContext *ctx;        /* context this BH belongs to */
    QEMUBHFunc *cb;         /* callback to invoke */
    void *opaque;           /* argument passed to cb */
    QEMUBH *next;           /* singly-linked list, head is ctx->first_bh */
    bool scheduled;         /* cb will run on the next aio_bh_poll */
    bool idle;              /* idle BHs may be delayed up to 10 ms and do not
                             * count as event-loop progress */
    bool deleted;           /* unlink and free during a later aio_bh_poll */
};

void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    bh->scheduled = 1;
    /* Mark it deleted up front: the BH runs exactly once and is then
     * reclaimed by aio_bh_poll without any qemu_bh_delete() call.
     */
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}
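
/* Usage sketch (illustrative only; "my_cb" and "data" are hypothetical names,
 * not part of this file):
 *
 *     static void my_cb(void *data)
 *     {
 *         ... runs once, in ctx's home thread ...
 *     }
 *
 *     aio_bh_schedule_oneshot(ctx, my_cb, data);
 *
 * No QEMUBH handle is returned; the bottom half frees itself after the
 * callback has run, so the caller has nothing to delete.
 */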

QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}
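
/* Typical lifecycle of a reusable bottom half (illustrative sketch; the
 * callback name is hypothetical):
 *
 *     QEMUBH *bh = aio_bh_new(ctx, my_cb, data);
 *     qemu_bh_schedule(bh);    (cb runs once per schedule, in ctx's thread)
 *     ...
 *     qemu_bh_delete(bh);      (memory is reclaimed later by aio_bh_poll)
 */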

void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* aio_bh_poll must not be called concurrently from multiple threads */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    qemu_lockcnt_inc(&ctx->list_lock);

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_bh_call(bh);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        qemu_lockcnt_dec(&ctx->list_lock);
        return ret;
    }

    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_unlock(&ctx->list_lock);
    }
    return ret;
}

void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in the aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}


/* This function is asynchronous: if the callback has already been picked up
 * by a concurrent aio_bh_poll, it may still run.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    bh->scheduled = 0;
}

/* This function is asynchronous: the bottom half is only unlinked and freed
 * later, by aio_bh_poll or by the AioContext finalizer.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}

int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}
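
/* Worked example (illustrative): with one idle bottom half scheduled and the
 * nearest timer roughly 3 ms away, the loop above leaves timeout = 10000000
 * (10 ms in ns) while timerlistgroup_deadline_ns() returns about 3000000, so
 * qemu_soonest_timeout() picks ~3 ms.  With nothing scheduled and no armed
 * timer the result is -1, i.e. block indefinitely.  aio_ctx_prepare() below
 * converts the nanosecond value to milliseconds for the glib poll.
 */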

static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx, true);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}
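
/* Example of hooking an AioContext into a glib main loop (illustrative
 * sketch; whether the default GMainContext is the right one is up to the
 * caller):
 *
 *     GSource *src = aio_get_g_source(ctx);        (takes a reference)
 *     g_source_attach(src, g_main_context_default());
 *     g_source_unref(src);                         (main context keeps its own)
 *
 * After attaching, the prepare/check/dispatch callbacks above are driven by
 * the glib main loop rather than by aio_poll().
 */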

ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}
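
/* Usage sketch (illustrative; assumes the thread_pool_submit_aio() signature
 * from block/thread-pool.h at this revision, and "worker_fn"/"done_cb" are
 * hypothetical):
 *
 *     ThreadPool *pool = aio_get_thread_pool(ctx);
 *     thread_pool_submit_aio(pool, worker_fn, arg, done_cb, opaque);
 *
 * worker_fn runs in a pool thread; done_cb runs back in ctx, delivered via
 * the bottom-half machinery above.
 */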

#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init();
        laio_attach_aio_context(ctx->linux_aio, ctx);
    }
    return ctx->linux_aio;
}
#endif

void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);
    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           (EventNotifierHandler *)
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}
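
/* Caller-side sketch (illustrative) of creating a context and handling the
 * Error out-parameter:
 *
 *     Error *local_err = NULL;
 *     AioContext *ctx = aio_context_new(&local_err);
 *     if (!ctx) {
 *         error_report_err(local_err);
 *         return -1;
 *     }
 *
 * The returned context carries one reference from g_source_new(); drop it
 * with aio_context_unref() when no longer needed.
 */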

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}
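
/* Typical locking pattern (illustrative sketch): take the context lock around
 * code that touches state belonging to another thread's AioContext.  The lock
 * is a recursive mutex, so nested acquire/release pairs from the same thread
 * are allowed.
 *
 *     aio_context_acquire(ctx);
 *     ... operate on objects tied to ctx ...
 *     aio_context_release(ctx);
 */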