/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Linux io_uring file descriptor monitoring
 *
 * The Linux io_uring API supports file descriptor monitoring with a few
 * advantages over existing APIs like poll(2) and epoll(7):
 *
 * 1. Userspace polling of events is possible because the completion queue (cq
 *    ring) is shared between the kernel and userspace.  This allows
 *    applications that rely on userspace polling to also monitor file
 *    descriptors in the same userspace polling loop.
 *
 * 2. Submission and completion are batched and done together in a single
 *    system call, minimizing the number of syscalls.
 *
 * 3. File descriptor monitoring is O(1) like epoll(7), so it scales better
 *    than poll(2).
 *
 * 4. Nanosecond timeouts are supported, so fewer syscalls are needed than with
 *    epoll(7), whose epoll_wait(2) timeout only has millisecond resolution.
 *
 * This code only monitors file descriptors and does not perform asynchronous
 * disk I/O.  Implementing disk I/O efficiently has other requirements and
 * should use a separate io_uring context, so it does not make sense to unify
 * the code.
 *
 * File descriptor monitoring is implemented using the following operations:
 *
 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored.  When
 *    the poll mask changes for a file descriptor, it is first removed and then
 *    re-added with the new poll mask, so this operation is also used as part
 *    of modifying an existing monitored file descriptor.
 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
 *    for events.  This operation self-cancels if another event completes
 *    before the timeout.
 *
 * io_uring calls the submission queue the "sq ring" and the completion queue
 * the "cq ring".  Ring entries are called "sqe" and "cqe", respectively.
 *
 * The code is structured so that the sq/cq rings are only modified from the
 * AioContext thread: in fdmon_io_uring_wait() and in the GSource
 * prepare/dispatch callbacks.  Changes to AioHandlers are made by enqueuing
 * them on ctx->submit_list so that fill_sq_ring() can submit
 * IORING_OP_POLL_ADD and/or IORING_OP_POLL_REMOVE sqes for them.
 */

#include "qemu/osdep.h"
#include <poll.h>
#include "qapi/error.h"
#include "qemu/defer-call.h"
#include "qemu/rcu_queue.h"
#include "aio-posix.h"
#include "trace.h"

enum {
    FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING            = (1 << 0),
    FDMON_IO_URING_ADD                = (1 << 1),
    FDMON_IO_URING_REMOVE             = (1 << 2),
    FDMON_IO_URING_DELETE_AIO_HANDLER = (1 << 3),
};

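/* Translate GLib poll flags (G_IO_*) into poll(2) event bits (POLL*) */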
static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

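/* Translate poll(2) event bits (POLL*) back into GLib poll flags (G_IO_*) */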
static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}

/*
 * Returns an sqe for submitting a request. Only called from the AioContext
 * thread.
 */
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}

/* Atomically enqueue an AioHandler for sq ring submission */
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}

/* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it can serve two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                               FDMON_IO_URING_ADD));
    return node;
}

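/*
 * The FDMonOps->update callback: queue io_uring poll changes for an
 * AioHandler.  @new_node is being added and @old_node is being removed;
 * either may be NULL.  The corresponding sqes are filled in later by
 * fill_sq_ring().
 */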
static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky because IORING_OP_POLL_ADD and
         * IORING_OP_POLL_REMOVE are async.  We need to wait for the original
         * IORING_OP_POLL_ADD to complete before this handler can be freed
         * safely.
         *
         * It's possible that the file descriptor becomes ready and the
         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
         * submitted, too.
         *
         * Mark this handler deleted right now but don't place it on
         * ctx->deleted_aio_handlers yet.  Instead, manually fudge the list
         * entry to make QLIST_IS_INSERTED() think this handler has been
         * inserted so that other code recognizes this AioHandler as deleted.
         *
         * Once the original IORING_OP_POLL_ADD completes we enqueue the
         * handler on the real ctx->deleted_aio_handlers list to be freed.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}

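/*
 * The FDMonOps->add_sqe callback: add a caller-defined sqe.  prep_sqe() fills
 * in the sqe and cqe_handler->cb() is invoked by fdmon_io_uring_dispatch()
 * once the request completes.
 */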
static void fdmon_io_uring_add_sqe(AioContext *ctx,
        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
        void *opaque, CqeHandler *cqe_handler)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

    prep_sqe(sqe, opaque);
    io_uring_sqe_set_data(sqe, cqe_handler);

    trace_fdmon_io_uring_add_sqe(ctx, opaque, sqe->opcode, sqe->fd, sqe->off,
                                 cqe_handler);
}

static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
{
    /*
     * This is an empty function that is never called.  Its address is only
     * used as a sentinel value to distinguish AioHandler cqes from ordinary
     * cqe handlers.
     */
}

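/* Queue a one-shot IORING_OP_POLL_ADD sqe for this handler's fd */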
static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    node->internal_cqe_handler.cb = fdmon_special_cqe_handler;
    io_uring_sqe_set_data(sqe, &node->internal_cqe_handler);
}

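/* Queue an IORING_OP_POLL_REMOVE sqe to cancel the in-flight POLL_ADD */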
static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    CqeHandler *cqe_handler = &node->internal_cqe_handler;

#ifdef LIBURING_HAVE_DATA64
    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
#else
    io_uring_prep_poll_remove(sqe, cqe_handler);
#endif
    io_uring_sqe_set_data(sqe, NULL);
}

/* Add sqes from ctx->submit_list for submission */
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /* Order matters, just in case both flags were set */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_DELETE_AIO_HANDLER) {
            /*
             * process_cqe() sets this flag after ADD and REMOVE have been
             * cleared. They cannot be set again, so they must be clear.
             */
            assert(!(flags & FDMON_IO_URING_ADD));
            assert(!(flags & FDMON_IO_URING_REMOVE));

            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        }
    }
}

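/*
 * Handle a cqe for an AioHandler's IORING_OP_POLL_ADD.  Returns true if the
 * handler was added to @ready_list.
 */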
static bool process_cqe_aio_handler(AioContext *ctx,
                                    AioHandlerList *ready_list,
                                    AioHandler *node,
                                    struct io_uring_cqe *cqe)
{
    unsigned flags;

    /*
     * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
     * bit before IORING_OP_POLL_REMOVE is submitted.
     */
    flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        if (flags & FDMON_IO_URING_PENDING) {
            /* Still on ctx->submit_list, defer deletion until fill_sq_ring() */
            qatomic_or(&node->flags, FDMON_IO_URING_DELETE_AIO_HANDLER);
        } else {
            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        }
        return false;
    }

    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}

/* Returns true if a handler became ready */
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!cqe_handler) {
        return false;
    }

    /*
     * Special handling for AioHandler cqes. They need ready_list and have a
     * return value.
     */
    if (cqe_handler->cb == fdmon_special_cqe_handler) {
        AioHandler *node = container_of(cqe_handler, AioHandler,
                                        internal_cqe_handler);
        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
    }

    cqe_handler->cqe = *cqe;

    /* Handlers are invoked later by fdmon_io_uring_dispatch() */
    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
    return false;
}

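/*
 * Process all available cqes.  Returns the number of AioHandlers placed on
 * @ready_list; other cqe handlers are queued for fdmon_io_uring_dispatch().
 */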
static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

#ifdef HAVE_IO_URING_CQ_HAS_OVERFLOW
    /* If the CQ overflowed then fetch CQEs with a syscall */
    if (io_uring_cq_has_overflow(ring)) {
        io_uring_get_events(ring);
    }
#endif

    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

/* This is where SQEs are submitted in the glib event loop */
static void fdmon_io_uring_gsource_prepare(AioContext *ctx)
{
    fill_sq_ring(ctx);
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        while (io_uring_submit(&ctx->fdmon_io_uring) == -EINTR) {
            /* Keep trying if syscall was interrupted */
        }
    }
}

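/* Returns true if the io_uring ring fd is readable, i.e. cqes may be ready */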
static bool fdmon_io_uring_gsource_check(AioContext *ctx)
{
    gpointer tag = ctx->io_uring_fd_tag;
    return g_source_query_unix_fd(&ctx->source, tag) & G_IO_IN;
}

/* Dispatch CQE handlers that are ready */
static bool fdmon_io_uring_dispatch(AioContext *ctx)
{
    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
    bool progress = false;

    /* Handlers may use defer_call() to coalesce frequent operations */
    defer_call_begin();

    while (!QSIMPLEQ_EMPTY(ready_list)) {
        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);

        QSIMPLEQ_REMOVE_HEAD(ready_list, next);

        trace_fdmon_io_uring_cqe_handler(ctx, cqe_handler,
                                         cqe_handler->cqe.res);
        cqe_handler->cb(cqe_handler);
        progress = true;
    }

    defer_call_end();

    return progress;
}

/* This is where CQEs are processed in the glib event loop */
static void fdmon_io_uring_gsource_dispatch(AioContext *ctx,
                                            AioHandlerList *ready_list)
{
    process_cq_ring(ctx, ready_list);
}

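/*
 * Submit pending sqes and wait for cqes.  @timeout is in nanoseconds; 0 means
 * non-blocking and a negative value means wait indefinitely.  Returns the
 * number of AioHandlers placed on @ready_list.
 */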
static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    struct __kernel_timespec ts;
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    int ret;

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        /* Add a timeout that self-cancels when another cqe becomes ready */
        struct io_uring_sqe *sqe;

        ts = (struct __kernel_timespec){
            .tv_sec = timeout / NANOSECONDS_PER_SECOND,
            .tv_nsec = timeout % NANOSECONDS_PER_SECOND,
        };

        sqe = get_sqe(ctx);
        io_uring_prep_timeout(sqe, &ts, 1, 0);
        io_uring_sqe_set_data(sqe, NULL);
    }

    fill_sq_ring(ctx);

    /*
     * Loop to handle signals in both cases:
     * 1. If no SQEs were submitted, then -EINTR is returned.
     * 2. If SQEs were submitted, the number of SQEs submitted is returned
     *    rather than -EINTR, so also retry while fewer than wait_nr cqes are
     *    ready.
     */
    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR ||
             (ret >= 0 && wait_nr > io_uring_cq_ready(&ctx->fdmon_io_uring)));

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}

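/*
 * Is there pending io_uring work: ready cqes, unsubmitted sqes, or queued
 * AioHandler changes?
 */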
static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes to submit? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    return false;
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
    .dispatch = fdmon_io_uring_dispatch,
    .gsource_prepare = fdmon_io_uring_gsource_prepare,
    .gsource_check = fdmon_io_uring_gsource_check,
    .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
    .add_sqe = fdmon_io_uring_add_sqe,
};

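/*
 * Switch @ctx to io_uring-based fd monitoring.  On failure, @errp is set and
 * ctx->fdmon_ops is left unchanged.
 */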
bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
{
    int ret;

    ctx->io_uring_fd_tag = NULL;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        error_setg_errno(errp, -ret, "Failed to initialize io_uring");
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
            ctx->fdmon_io_uring.ring_fd, G_IO_IN);
    return true;
}

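/*
 * Tear down io_uring fd monitoring and downgrade @ctx back to poll(2) via
 * fdmon_poll_downgrade().
 */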
void fdmon_io_uring_destroy(AioContext *ctx)
{
    AioHandler *node;

    if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
        return;
    }

    io_uring_queue_exit(&ctx->fdmon_io_uring);

    /* Move handlers due to be removed onto the deleted list */
    while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
        unsigned flags = qatomic_fetch_and(&node->flags,
                ~(FDMON_IO_URING_PENDING |
                  FDMON_IO_URING_ADD |
                  FDMON_IO_URING_REMOVE |
                  FDMON_IO_URING_DELETE_AIO_HANDLER));

        if ((flags & FDMON_IO_URING_REMOVE) ||
            (flags & FDMON_IO_URING_DELETE_AIO_HANDLER)) {
            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
                                  node, node_deleted);
        }

        QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
    }

    g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
    ctx->io_uring_fd_tag = NULL;

    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));

    qemu_lockcnt_lock(&ctx->list_lock);
    fdmon_poll_downgrade(ctx);
    qemu_lockcnt_unlock(&ctx->list_lock);
}
511