xref: /openbmc/qemu/util/aio-posix.c (revision c8c89a6a)
1 /*
2  * QEMU aio implementation
3  *
4  * Copyright IBM, Corp. 2008
5  *
6  * Authors:
7  *  Anthony Liguori   <aliguori@us.ibm.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2.  See
10  * the COPYING file in the top-level directory.
11  *
12  * Contributions after 2012-01-13 are licensed under the terms of the
13  * GNU GPL, version 2 or (at your option) any later version.
14  */
15 
16 #include "qemu/osdep.h"
17 #include "block/block.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/rcu.h"
20 #include "qemu/rcu_queue.h"
21 #include "qemu/sockets.h"
22 #include "qemu/cutils.h"
23 #include "trace.h"
24 #include "aio-posix.h"
25 
/*
 * revents value recorded for an AioHandler that userspace polling found
 * ready.  G_IO_IN and G_IO_OUT would be the wrong choice here because the
 * handler may not need to touch the file descriptor at all: for instance,
 * an EventNotifier whose state was observed by polling a memory location
 * should not pay for a slow read syscall.  A dedicated value lets the
 * dispatch code tell "ready by polling" apart from fd events.
 */
#define REVENTS_POLL_READY 0

/* How long a polled handler may stay inactive before polling it stops */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)
37 
38 bool aio_poll_disabled(AioContext *ctx)
39 {
40     return qatomic_read(&ctx->poll_disable_cnt);
41 }
42 
43 void aio_add_ready_handler(AioHandlerList *ready_list,
44                            AioHandler *node,
45                            int revents)
46 {
47     QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
48     node->pfd.revents = revents;
49     QLIST_INSERT_HEAD(ready_list, node, node_ready);
50 }
51 
52 static AioHandler *find_aio_handler(AioContext *ctx, int fd)
53 {
54     AioHandler *node;
55 
56     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
57         if (node->pfd.fd == fd) {
58             if (!QLIST_IS_INSERTED(node, node_deleted)) {
59                 return node;
60             }
61         }
62     }
63 
64     return NULL;
65 }
66 
67 static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
68 {
69     /* If the GSource is in the process of being destroyed then
70      * g_source_remove_poll() causes an assertion failure.  Skip
71      * removal in that case, because glib cleans up its state during
72      * destruction anyway.
73      */
74     if (!g_source_is_destroyed(&ctx->source)) {
75         g_source_remove_poll(&ctx->source, &node->pfd);
76     }
77 
78     node->pfd.revents = 0;
79 
80     /* If the fd monitor has already marked it deleted, leave it alone */
81     if (QLIST_IS_INSERTED(node, node_deleted)) {
82         return false;
83     }
84 
85     /* If a read is in progress, just mark the node as deleted */
86     if (qemu_lockcnt_count(&ctx->list_lock)) {
87         QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
88         return false;
89     }
90     /* Otherwise, delete it for real.  We can't just mark it as
91      * deleted because deleted nodes are only cleaned up while
92      * no one is walking the handlers list.
93      */
94     QLIST_SAFE_REMOVE(node, node_poll);
95     QLIST_REMOVE(node, node);
96     return true;
97 }
98 
99 void aio_set_fd_handler(AioContext *ctx,
100                         int fd,
101                         bool is_external,
102                         IOHandler *io_read,
103                         IOHandler *io_write,
104                         AioPollFn *io_poll,
105                         IOHandler *io_poll_ready,
106                         void *opaque)
107 {
108     AioHandler *node;
109     AioHandler *new_node = NULL;
110     bool is_new = false;
111     bool deleted = false;
112     int poll_disable_change;
113 
114     if (io_poll && !io_poll_ready) {
115         io_poll = NULL; /* polling only makes sense if there is a handler */
116     }
117 
118     qemu_lockcnt_lock(&ctx->list_lock);
119 
120     node = find_aio_handler(ctx, fd);
121 
122     /* Are we deleting the fd handler? */
123     if (!io_read && !io_write && !io_poll) {
124         if (node == NULL) {
125             qemu_lockcnt_unlock(&ctx->list_lock);
126             return;
127         }
128         /* Clean events in order to unregister fd from the ctx epoll. */
129         node->pfd.events = 0;
130 
131         poll_disable_change = -!node->io_poll;
132     } else {
133         poll_disable_change = !io_poll - (node && !node->io_poll);
134         if (node == NULL) {
135             is_new = true;
136         }
137         /* Alloc and insert if it's not already there */
138         new_node = g_new0(AioHandler, 1);
139 
140         /* Update handler with latest information */
141         new_node->io_read = io_read;
142         new_node->io_write = io_write;
143         new_node->io_poll = io_poll;
144         new_node->io_poll_ready = io_poll_ready;
145         new_node->opaque = opaque;
146         new_node->is_external = is_external;
147 
148         if (is_new) {
149             new_node->pfd.fd = fd;
150         } else {
151             new_node->pfd = node->pfd;
152         }
153         g_source_add_poll(&ctx->source, &new_node->pfd);
154 
155         new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
156         new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
157 
158         QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
159     }
160 
161     /* No need to order poll_disable_cnt writes against other updates;
162      * the counter is only used to avoid wasting time and latency on
163      * iterated polling when the system call will be ultimately necessary.
164      * Changing handlers is a rare event, and a little wasted polling until
165      * the aio_notify below is not an issue.
166      */
167     qatomic_set(&ctx->poll_disable_cnt,
168                qatomic_read(&ctx->poll_disable_cnt) + poll_disable_change);
169 
170     ctx->fdmon_ops->update(ctx, node, new_node);
171     if (node) {
172         deleted = aio_remove_fd_handler(ctx, node);
173     }
174     qemu_lockcnt_unlock(&ctx->list_lock);
175     aio_notify(ctx);
176 
177     if (deleted) {
178         g_free(node);
179     }
180 }
181 
182 void aio_set_fd_poll(AioContext *ctx, int fd,
183                      IOHandler *io_poll_begin,
184                      IOHandler *io_poll_end)
185 {
186     AioHandler *node = find_aio_handler(ctx, fd);
187 
188     if (!node) {
189         return;
190     }
191 
192     node->io_poll_begin = io_poll_begin;
193     node->io_poll_end = io_poll_end;
194 }
195 
196 void aio_set_event_notifier(AioContext *ctx,
197                             EventNotifier *notifier,
198                             bool is_external,
199                             EventNotifierHandler *io_read,
200                             AioPollFn *io_poll,
201                             EventNotifierHandler *io_poll_ready)
202 {
203     aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
204                        (IOHandler *)io_read, NULL, io_poll,
205                        (IOHandler *)io_poll_ready, notifier);
206 }
207 
208 void aio_set_event_notifier_poll(AioContext *ctx,
209                                  EventNotifier *notifier,
210                                  EventNotifierHandler *io_poll_begin,
211                                  EventNotifierHandler *io_poll_end)
212 {
213     aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
214                     (IOHandler *)io_poll_begin,
215                     (IOHandler *)io_poll_end);
216 }
217 
218 static bool poll_set_started(AioContext *ctx, AioHandlerList *ready_list,
219                              bool started)
220 {
221     AioHandler *node;
222     bool progress = false;
223 
224     if (started == ctx->poll_started) {
225         return false;
226     }
227 
228     ctx->poll_started = started;
229 
230     qemu_lockcnt_inc(&ctx->list_lock);
231     QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
232         IOHandler *fn;
233 
234         if (QLIST_IS_INSERTED(node, node_deleted)) {
235             continue;
236         }
237 
238         if (started) {
239             fn = node->io_poll_begin;
240         } else {
241             fn = node->io_poll_end;
242         }
243 
244         if (fn) {
245             fn(node->opaque);
246         }
247 
248         /* Poll one last time in case ->io_poll_end() raced with the event */
249         if (!started && node->io_poll(node->opaque)) {
250             aio_add_ready_handler(ready_list, node, REVENTS_POLL_READY);
251             progress = true;
252         }
253     }
254     qemu_lockcnt_dec(&ctx->list_lock);
255 
256     return progress;
257 }
258 
259 
260 bool aio_prepare(AioContext *ctx)
261 {
262     AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
263 
264     /* Poll mode cannot be used with glib's event loop, disable it. */
265     poll_set_started(ctx, &ready_list, false);
266     /* TODO what to do with this list? */
267 
268     return false;
269 }
270 
271 bool aio_pending(AioContext *ctx)
272 {
273     AioHandler *node;
274     bool result = false;
275 
276     /*
277      * We have to walk very carefully in case aio_set_fd_handler is
278      * called while we're walking.
279      */
280     qemu_lockcnt_inc(&ctx->list_lock);
281 
282     QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
283         int revents;
284 
285         revents = node->pfd.revents & node->pfd.events;
286         if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
287             aio_node_check(ctx, node->is_external)) {
288             result = true;
289             break;
290         }
291         if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
292             aio_node_check(ctx, node->is_external)) {
293             result = true;
294             break;
295         }
296     }
297     qemu_lockcnt_dec(&ctx->list_lock);
298 
299     return result;
300 }
301 
302 static void aio_free_deleted_handlers(AioContext *ctx)
303 {
304     AioHandler *node;
305 
306     if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
307         return;
308     }
309     if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
310         return; /* we are nested, let the parent do the freeing */
311     }
312 
313     while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
314         QLIST_REMOVE(node, node);
315         QLIST_REMOVE(node, node_deleted);
316         QLIST_SAFE_REMOVE(node, node_poll);
317         g_free(node);
318     }
319 
320     qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
321 }
322 
323 static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
324 {
325     bool progress = false;
326     int revents;
327 
328     revents = node->pfd.revents & node->pfd.events;
329     node->pfd.revents = 0;
330 
331     /*
332      * Start polling AioHandlers when they become ready because activity is
333      * likely to continue.  Note that starvation is theoretically possible when
334      * fdmon_supports_polling(), but only until the fd fires for the first
335      * time.
336      */
337     if (!QLIST_IS_INSERTED(node, node_deleted) &&
338         !QLIST_IS_INSERTED(node, node_poll) &&
339         node->io_poll) {
340         trace_poll_add(ctx, node, node->pfd.fd, revents);
341         if (ctx->poll_started && node->io_poll_begin) {
342             node->io_poll_begin(node->opaque);
343         }
344         QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
345     }
346     if (!QLIST_IS_INSERTED(node, node_deleted) &&
347         revents == 0 &&
348         aio_node_check(ctx, node->is_external) &&
349         node->io_poll_ready) {
350         node->io_poll_ready(node->opaque);
351 
352         /*
353          * Return early since revents was zero. aio_notify() does not count as
354          * progress.
355          */
356         return node->opaque != &ctx->notifier;
357     }
358 
359     if (!QLIST_IS_INSERTED(node, node_deleted) &&
360         (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
361         aio_node_check(ctx, node->is_external) &&
362         node->io_read) {
363         node->io_read(node->opaque);
364 
365         /* aio_notify() does not count as progress */
366         if (node->opaque != &ctx->notifier) {
367             progress = true;
368         }
369     }
370     if (!QLIST_IS_INSERTED(node, node_deleted) &&
371         (revents & (G_IO_OUT | G_IO_ERR)) &&
372         aio_node_check(ctx, node->is_external) &&
373         node->io_write) {
374         node->io_write(node->opaque);
375         progress = true;
376     }
377 
378     return progress;
379 }
380 
381 /*
382  * If we have a list of ready handlers then this is more efficient than
383  * scanning all handlers with aio_dispatch_handlers().
384  */
385 static bool aio_dispatch_ready_handlers(AioContext *ctx,
386                                         AioHandlerList *ready_list)
387 {
388     bool progress = false;
389     AioHandler *node;
390 
391     while ((node = QLIST_FIRST(ready_list))) {
392         QLIST_REMOVE(node, node_ready);
393         progress = aio_dispatch_handler(ctx, node) || progress;
394     }
395 
396     return progress;
397 }
398 
399 /* Slower than aio_dispatch_ready_handlers() but only used via glib */
400 static bool aio_dispatch_handlers(AioContext *ctx)
401 {
402     AioHandler *node, *tmp;
403     bool progress = false;
404 
405     QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
406         progress = aio_dispatch_handler(ctx, node) || progress;
407     }
408 
409     return progress;
410 }
411 
412 void aio_dispatch(AioContext *ctx)
413 {
414     qemu_lockcnt_inc(&ctx->list_lock);
415     aio_bh_poll(ctx);
416     aio_dispatch_handlers(ctx);
417     aio_free_deleted_handlers(ctx);
418     qemu_lockcnt_dec(&ctx->list_lock);
419 
420     timerlistgroup_run_timers(&ctx->tlg);
421 }
422 
423 static bool run_poll_handlers_once(AioContext *ctx,
424                                    AioHandlerList *ready_list,
425                                    int64_t now,
426                                    int64_t *timeout)
427 {
428     bool progress = false;
429     AioHandler *node;
430     AioHandler *tmp;
431 
432     QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
433         if (aio_node_check(ctx, node->is_external) &&
434             node->io_poll(node->opaque)) {
435             aio_add_ready_handler(ready_list, node, REVENTS_POLL_READY);
436 
437             node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
438 
439             /*
440              * Polling was successful, exit try_poll_mode immediately
441              * to adjust the next polling time.
442              */
443             *timeout = 0;
444             if (node->opaque != &ctx->notifier) {
445                 progress = true;
446             }
447         }
448 
449         /* Caller handles freeing deleted nodes.  Don't do it here. */
450     }
451 
452     return progress;
453 }
454 
455 static bool fdmon_supports_polling(AioContext *ctx)
456 {
457     return ctx->fdmon_ops->need_wait != aio_poll_disabled;
458 }
459 
460 static bool remove_idle_poll_handlers(AioContext *ctx,
461                                       AioHandlerList *ready_list,
462                                       int64_t now)
463 {
464     AioHandler *node;
465     AioHandler *tmp;
466     bool progress = false;
467 
468     /*
469      * File descriptor monitoring implementations without userspace polling
470      * support suffer from starvation when a subset of handlers is polled
471      * because fds will not be processed in a timely fashion.  Don't remove
472      * idle poll handlers.
473      */
474     if (!fdmon_supports_polling(ctx)) {
475         return false;
476     }
477 
478     QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
479         if (node->poll_idle_timeout == 0LL) {
480             node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
481         } else if (now >= node->poll_idle_timeout) {
482             trace_poll_remove(ctx, node, node->pfd.fd);
483             node->poll_idle_timeout = 0LL;
484             QLIST_SAFE_REMOVE(node, node_poll);
485             if (ctx->poll_started && node->io_poll_end) {
486                 node->io_poll_end(node->opaque);
487 
488                 /*
489                  * Final poll in case ->io_poll_end() races with an event.
490                  * Nevermind about re-adding the handler in the rare case where
491                  * this causes progress.
492                  */
493                 if (node->io_poll(node->opaque)) {
494                     aio_add_ready_handler(ready_list, node,
495                                           REVENTS_POLL_READY);
496                     progress = true;
497                 }
498             }
499         }
500     }
501 
502     return progress;
503 }
504 
505 /* run_poll_handlers:
506  * @ctx: the AioContext
507  * @ready_list: the list to place ready handlers on
508  * @max_ns: maximum time to poll for, in nanoseconds
509  *
510  * Polls for a given time.
511  *
512  * Note that the caller must have incremented ctx->list_lock.
513  *
514  * Returns: true if progress was made, false otherwise
515  */
516 static bool run_poll_handlers(AioContext *ctx, AioHandlerList *ready_list,
517                               int64_t max_ns, int64_t *timeout)
518 {
519     bool progress;
520     int64_t start_time, elapsed_time;
521 
522     assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
523 
524     trace_run_poll_handlers_begin(ctx, max_ns, *timeout);
525 
526     /*
527      * Optimization: ->io_poll() handlers often contain RCU read critical
528      * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
529      * -> rcu_read_lock() -> ... sequences with expensive memory
530      * synchronization primitives.  Make the entire polling loop an RCU
531      * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
532      * are cheap.
533      */
534     RCU_READ_LOCK_GUARD();
535 
536     start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
537     do {
538         progress = run_poll_handlers_once(ctx, ready_list,
539                                           start_time, timeout);
540         elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
541         max_ns = qemu_soonest_timeout(*timeout, max_ns);
542         assert(!(max_ns && progress));
543     } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));
544 
545     if (remove_idle_poll_handlers(ctx, ready_list,
546                                   start_time + elapsed_time)) {
547         *timeout = 0;
548         progress = true;
549     }
550 
551     /* If time has passed with no successful polling, adjust *timeout to
552      * keep the same ending time.
553      */
554     if (*timeout != -1) {
555         *timeout -= MIN(*timeout, elapsed_time);
556     }
557 
558     trace_run_poll_handlers_end(ctx, progress, *timeout);
559     return progress;
560 }
561 
562 /* try_poll_mode:
563  * @ctx: the AioContext
564  * @ready_list: list to add handlers that need to be run
565  * @timeout: timeout for blocking wait, computed by the caller and updated if
566  *    polling succeeds.
567  *
568  * Note that the caller must have incremented ctx->list_lock.
569  *
570  * Returns: true if progress was made, false otherwise
571  */
572 static bool try_poll_mode(AioContext *ctx, AioHandlerList *ready_list,
573                           int64_t *timeout)
574 {
575     int64_t max_ns;
576 
577     if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
578         return false;
579     }
580 
581     max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
582     if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
583         poll_set_started(ctx, ready_list, true);
584 
585         if (run_poll_handlers(ctx, ready_list, max_ns, timeout)) {
586             return true;
587         }
588     }
589 
590     if (poll_set_started(ctx, ready_list, false)) {
591         *timeout = 0;
592         return true;
593     }
594 
595     return false;
596 }
597 
598 bool aio_poll(AioContext *ctx, bool blocking)
599 {
600     AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
601     bool progress;
602     bool use_notify_me;
603     int64_t timeout;
604     int64_t start = 0;
605 
606     /*
607      * There cannot be two concurrent aio_poll calls for the same AioContext (or
608      * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
609      * We rely on this below to avoid slow locked accesses to ctx->notify_me.
610      *
611      * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
612      * is special in that it runs in the main thread, but that thread's context
613      * is qemu_aio_context.
614      */
615     assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
616                                       qemu_get_aio_context() : ctx));
617 
618     qemu_lockcnt_inc(&ctx->list_lock);
619 
620     if (ctx->poll_max_ns) {
621         start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
622     }
623 
624     timeout = blocking ? aio_compute_timeout(ctx) : 0;
625     progress = try_poll_mode(ctx, &ready_list, &timeout);
626     assert(!(timeout && progress));
627 
628     /*
629      * aio_notify can avoid the expensive event_notifier_set if
630      * everything (file descriptors, bottom halves, timers) will
631      * be re-evaluated before the next blocking poll().  This is
632      * already true when aio_poll is called with blocking == false;
633      * if blocking == true, it is only true after poll() returns,
634      * so disable the optimization now.
635      */
636     use_notify_me = timeout != 0;
637     if (use_notify_me) {
638         qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
639         /*
640          * Write ctx->notify_me before reading ctx->notified.  Pairs with
641          * smp_mb in aio_notify().
642          */
643         smp_mb();
644 
645         /* Don't block if aio_notify() was called */
646         if (qatomic_read(&ctx->notified)) {
647             timeout = 0;
648         }
649     }
650 
651     /* If polling is allowed, non-blocking aio_poll does not need the
652      * system call---a single round of run_poll_handlers_once suffices.
653      */
654     if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
655         ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
656     }
657 
658     if (use_notify_me) {
659         /* Finish the poll before clearing the flag.  */
660         qatomic_store_release(&ctx->notify_me,
661                              qatomic_read(&ctx->notify_me) - 2);
662     }
663 
664     aio_notify_accept(ctx);
665 
666     /* Adjust polling time */
667     if (ctx->poll_max_ns) {
668         int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
669 
670         if (block_ns <= ctx->poll_ns) {
671             /* This is the sweet spot, no adjustment needed */
672         } else if (block_ns > ctx->poll_max_ns) {
673             /* We'd have to poll for too long, poll less */
674             int64_t old = ctx->poll_ns;
675 
676             if (ctx->poll_shrink) {
677                 ctx->poll_ns /= ctx->poll_shrink;
678             } else {
679                 ctx->poll_ns = 0;
680             }
681 
682             trace_poll_shrink(ctx, old, ctx->poll_ns);
683         } else if (ctx->poll_ns < ctx->poll_max_ns &&
684                    block_ns < ctx->poll_max_ns) {
685             /* There is room to grow, poll longer */
686             int64_t old = ctx->poll_ns;
687             int64_t grow = ctx->poll_grow;
688 
689             if (grow == 0) {
690                 grow = 2;
691             }
692 
693             if (ctx->poll_ns) {
694                 ctx->poll_ns *= grow;
695             } else {
696                 ctx->poll_ns = 4000; /* start polling at 4 microseconds */
697             }
698 
699             if (ctx->poll_ns > ctx->poll_max_ns) {
700                 ctx->poll_ns = ctx->poll_max_ns;
701             }
702 
703             trace_poll_grow(ctx, old, ctx->poll_ns);
704         }
705     }
706 
707     progress |= aio_bh_poll(ctx);
708     progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
709 
710     aio_free_deleted_handlers(ctx);
711 
712     qemu_lockcnt_dec(&ctx->list_lock);
713 
714     progress |= timerlistgroup_run_timers(&ctx->tlg);
715 
716     return progress;
717 }
718 
719 void aio_context_setup(AioContext *ctx)
720 {
721     ctx->fdmon_ops = &fdmon_poll_ops;
722     ctx->epollfd = -1;
723 
724     /* Use the fastest fd monitoring implementation if available */
725     if (fdmon_io_uring_setup(ctx)) {
726         return;
727     }
728 
729     fdmon_epoll_setup(ctx);
730 }
731 
732 void aio_context_destroy(AioContext *ctx)
733 {
734     fdmon_io_uring_destroy(ctx);
735     fdmon_epoll_disable(ctx);
736     aio_free_deleted_handlers(ctx);
737 }
738 
739 void aio_context_use_g_source(AioContext *ctx)
740 {
741     /*
742      * Disable io_uring when the glib main loop is used because it doesn't
743      * support mixed glib/aio_poll() usage. It relies on aio_poll() being
744      * called regularly so that changes to the monitored file descriptors are
745      * submitted, otherwise a list of pending fd handlers builds up.
746      */
747     fdmon_io_uring_destroy(ctx);
748     aio_free_deleted_handlers(ctx);
749 }
750 
751 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
752                                  int64_t grow, int64_t shrink, Error **errp)
753 {
754     /* No thread synchronization here, it doesn't matter if an incorrect value
755      * is used once.
756      */
757     ctx->poll_max_ns = max_ns;
758     ctx->poll_ns = 0;
759     ctx->poll_grow = grow;
760     ctx->poll_shrink = shrink;
761 
762     aio_notify(ctx);
763 }
764 
765 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
766                                 Error **errp)
767 {
768     /*
769      * No thread synchronization here, it doesn't matter if an incorrect value
770      * is used once.
771      */
772     ctx->aio_max_batch = max_batch;
773 
774     aio_notify(ctx);
775 }
776