/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Linux io_uring file descriptor monitoring
 *
 * The Linux io_uring API supports file descriptor monitoring with a few
 * advantages over existing APIs like poll(2) and epoll(7):
 *
 * 1. Userspace polling of events is possible because the completion queue (cq
 *    ring) is shared between the kernel and userspace. This allows
 *    applications that rely on userspace polling to also monitor file
 *    descriptors in the same userspace polling loop.
 *
 * 2. Submission and completion is batched and done together in a single system
 *    call. This minimizes the number of system calls.
 *
 * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
 *    poll(2).
 *
 * 4. Nanosecond timeouts are supported so it requires fewer syscalls than
 *    epoll(7).
 *
 * This code only monitors file descriptors and does not do asynchronous disk
 * I/O. Implementing disk I/O efficiently has other requirements and should
 * use a separate io_uring so it does not make sense to unify the code.
 *
 * File descriptor monitoring is implemented using the following operations:
 *
 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored. When
 *    the poll mask changes for a file descriptor it is first removed and then
 *    re-added with the new poll mask, so this operation is also used as part
 *    of modifying an existing monitored file descriptor.
 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
 *    for events. This operation self-cancels if another event completes
 *    before the timeout.
 *
 * io_uring calls the submission queue the "sq ring" and the completion queue
 * the "cq ring". Ring entries are called "sqe" and "cqe", respectively.
 *
 * The code is structured so that sq/cq rings are only modified within
 * fdmon_io_uring_wait(). Changes to AioHandlers are made by enqueuing them on
 * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
 * and/or IORING_OP_POLL_REMOVE sqes for them.
 */

#include "qemu/osdep.h"
#include <poll.h>
#include "qapi/error.h"
#include "qemu/defer-call.h"
#include "qemu/rcu_queue.h"
#include "aio-posix.h"
#include "trace.h"

enum {
    FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING = (1 << 0),
    FDMON_IO_URING_ADD = (1 << 1),
    FDMON_IO_URING_REMOVE = (1 << 2),
    FDMON_IO_URING_DELETE_AIO_HANDLER = (1 << 3),
};

static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}
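/*
 * Illustration of the helpers above: they translate between GLib and poll(2)
 * event masks in both directions, e.g.
 *
 *     poll_events_from_pfd(G_IO_IN | G_IO_OUT) == (POLLIN | POLLOUT)
 *     pfd_events_from_poll(POLLIN | POLLHUP)   == (G_IO_IN | G_IO_HUP)
 *
 * Only the IN/OUT/HUP/ERR bits are translated; any other bits are dropped.
 */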
/*
 * Returns an sqe for submitting a request. Only called from the AioContext
 * thread.
 */
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}

/* Atomically enqueue an AioHandler for sq ring submission */
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}

/* Dequeue an AioHandler for sq ring submission. Called by fill_sq_ring(). */
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE. It's sticky so it can serve two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                               FDMON_IO_URING_ADD));
    return node;
}

static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky because IORING_OP_POLL_ADD and
         * IORING_OP_POLL_REMOVE are async. We need to wait for the original
         * IORING_OP_POLL_ADD to complete before this handler can be freed
         * safely.
         *
         * It's possible that the file descriptor becomes ready and the
         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
         * submitted, too.
         *
         * Mark this handler deleted right now but don't place it on
         * ctx->deleted_aio_handlers yet. Instead, manually fudge the list
         * entry to make QLIST_IS_INSERTED() think this handler has been
         * inserted and other code recognizes this AioHandler as deleted.
         *
         * Once the original IORING_OP_POLL_ADD completes we enqueue the
         * handler on the real ctx->deleted_aio_handlers list to be freed.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}

static void fdmon_io_uring_add_sqe(AioContext *ctx,
        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
        void *opaque, CqeHandler *cqe_handler)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

    prep_sqe(sqe, opaque);
    io_uring_sqe_set_data(sqe, cqe_handler);

    trace_fdmon_io_uring_add_sqe(ctx, opaque, sqe->opcode, sqe->fd, sqe->off,
                                 cqe_handler);
}
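/*
 * Illustrative sketch of a CqeHandler-based submission, assuming this fdmon
 * implementation is active (the my_nop_* names are hypothetical and not part
 * of QEMU): a caller provides a prep callback that fills in the sqe and a
 * CqeHandler whose ->cb is invoked later by fdmon_io_uring_dispatch() with
 * the completed cqe:
 *
 *     static void my_nop_prep(struct io_uring_sqe *sqe, void *opaque)
 *     {
 *         io_uring_prep_nop(sqe);
 *     }
 *
 *     static void my_nop_done(CqeHandler *cqe_handler)
 *     {
 *         int res = cqe_handler->cqe.res; // completion status
 *     }
 *
 *     static CqeHandler my_nop_handler = { .cb = my_nop_done };
 *
 *     ctx->fdmon_ops->add_sqe(ctx, my_nop_prep, NULL, &my_nop_handler);
 *
 * This file only implements the add_sqe hook; callers reach it through the
 * FDMonOps table registered below.
 */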
static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
{
    /*
     * This is an empty function that is never called. It is used as a
     * function pointer to distinguish it from ordinary cqe handlers.
     */
}

static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    node->internal_cqe_handler.cb = fdmon_special_cqe_handler;
    io_uring_sqe_set_data(sqe, &node->internal_cqe_handler);
}

static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    CqeHandler *cqe_handler = &node->internal_cqe_handler;

#ifdef LIBURING_HAVE_DATA64
    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
#else
    io_uring_prep_poll_remove(sqe, cqe_handler);
#endif
    io_uring_sqe_set_data(sqe, NULL);
}

/* Add sqes from ctx->submit_list for submission */
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /* Order matters, just in case both flags were set */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_DELETE_AIO_HANDLER) {
            /*
             * process_cqe() sets this flag after ADD and REMOVE have been
             * cleared. They cannot be set again, so they must be clear.
             */
            assert(!(flags & FDMON_IO_URING_ADD));
            assert(!(flags & FDMON_IO_URING_REMOVE));

            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node,
                                  node_deleted);
        }
    }
}

static bool process_cqe_aio_handler(AioContext *ctx,
                                    AioHandlerList *ready_list,
                                    AioHandler *node,
                                    struct io_uring_cqe *cqe)
{
    unsigned flags;

    /*
     * Deletion can only happen when IORING_OP_POLL_ADD completes. If we race
     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
     * bit before IORING_OP_POLL_REMOVE is submitted.
     */
    flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        if (flags & FDMON_IO_URING_PENDING) {
            /* Still on ctx->submit_list, defer deletion until fill_sq_ring() */
            qatomic_or(&node->flags, FDMON_IO_URING_DELETE_AIO_HANDLER);
        } else {
            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node,
                                  node_deleted);
        }
        return false;
    }

    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}
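/*
 * Summary of the AioHandler deletion life cycle implemented above (flag names
 * abbreviated, illustrative only):
 *
 *     fdmon_io_uring_update(old_node): flags |= PENDING | REMOVE
 *                                      (handler pushed onto ctx->submit_list)
 *     fill_sq_ring() via dequeue():    flags &= ~(PENDING | ADD)
 *                                      (REMOVE stays set, POLL_REMOVE sqe added)
 *     process_cqe_aio_handler():       flags &= ~REMOVE
 *         if PENDING still set:        flags |= DELETE_AIO_HANDLER
 *                                      (fill_sq_ring() moves the handler to
 *                                      ctx->deleted_aio_handlers later)
 *         else:                        insert on ctx->deleted_aio_handlers
 */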
/* Returns true if a handler became ready */
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!cqe_handler) {
        return false;
    }

    /*
     * Special handling for AioHandler cqes. They need ready_list and have a
     * return value.
     */
    if (cqe_handler->cb == fdmon_special_cqe_handler) {
        AioHandler *node = container_of(cqe_handler, AioHandler,
                                        internal_cqe_handler);
        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
    }

    cqe_handler->cqe = *cqe;

    /* Handlers are invoked later by fdmon_io_uring_dispatch() */
    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
    return false;
}

static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

#ifdef HAVE_IO_URING_CQ_HAS_OVERFLOW
    /* If the CQ overflowed then fetch CQEs with a syscall */
    if (io_uring_cq_has_overflow(ring)) {
        io_uring_get_events(ring);
    }
#endif

    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

/* This is where SQEs are submitted in the glib event loop */
static void fdmon_io_uring_gsource_prepare(AioContext *ctx)
{
    fill_sq_ring(ctx);
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        while (io_uring_submit(&ctx->fdmon_io_uring) == -EINTR) {
            /* Keep trying if syscall was interrupted */
        }
    }
}

static bool fdmon_io_uring_gsource_check(AioContext *ctx)
{
    gpointer tag = ctx->io_uring_fd_tag;

    return g_source_query_unix_fd(&ctx->source, tag) & G_IO_IN;
}

/* Dispatch CQE handlers that are ready */
static bool fdmon_io_uring_dispatch(AioContext *ctx)
{
    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
    bool progress = false;

    /* Handlers may use defer_call() to coalesce frequent operations */
    defer_call_begin();

    while (!QSIMPLEQ_EMPTY(ready_list)) {
        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);

        QSIMPLEQ_REMOVE_HEAD(ready_list, next);

        trace_fdmon_io_uring_cqe_handler(ctx, cqe_handler,
                                         cqe_handler->cqe.res);
        cqe_handler->cb(cqe_handler);
        progress = true;
    }

    defer_call_end();

    return progress;
}

/* This is where CQEs are processed in the glib event loop */
static void fdmon_io_uring_gsource_dispatch(AioContext *ctx,
                                            AioHandlerList *ready_list)
{
    process_cq_ring(ctx, ready_list);
}
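/*
 * Worked example for the IORING_OP_TIMEOUT armed in fdmon_io_uring_wait()
 * below (illustrative): a 2500000ns (2.5ms) timeout is split into
 *
 *     ts.tv_sec  = 2500000 / NANOSECONDS_PER_SECOND = 0
 *     ts.tv_nsec = 2500000 % NANOSECONDS_PER_SECOND = 2500000
 *
 * and io_uring_prep_timeout(sqe, &ts, 1, 0) arms a timeout that completes
 * when either the timespec expires or 1 other completion event arrives, so it
 * effectively self-cancels as soon as a monitored fd becomes ready.
 */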
static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    struct __kernel_timespec ts;
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    int ret;

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        /* Add a timeout that self-cancels when another cqe becomes ready */
        struct io_uring_sqe *sqe;

        ts = (struct __kernel_timespec){
            .tv_sec = timeout / NANOSECONDS_PER_SECOND,
            .tv_nsec = timeout % NANOSECONDS_PER_SECOND,
        };

        sqe = get_sqe(ctx);
        io_uring_prep_timeout(sqe, &ts, 1, 0);
        io_uring_sqe_set_data(sqe, NULL);
    }

    fill_sq_ring(ctx);

    /*
     * Loop to handle signals in both cases:
     * 1. If no SQEs were submitted, then -EINTR is returned.
     * 2. If SQEs were submitted then the number of SQEs submitted is returned
     *    rather than -EINTR.
     */
    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR ||
             (ret >= 0 && wait_nr > io_uring_cq_ready(&ctx->fdmon_io_uring)));

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}

static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes to submit? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    return false;
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
    .dispatch = fdmon_io_uring_dispatch,
    .gsource_prepare = fdmon_io_uring_gsource_prepare,
    .gsource_check = fdmon_io_uring_gsource_check,
    .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
    .add_sqe = fdmon_io_uring_add_sqe,
};

bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
{
    int ret;

    ctx->io_uring_fd_tag = NULL;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        error_setg_errno(errp, -ret, "Failed to initialize io_uring");
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
            ctx->fdmon_io_uring.ring_fd, G_IO_IN);
    return true;
}

void fdmon_io_uring_destroy(AioContext *ctx)
{
    AioHandler *node;

    if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
        return;
    }

    io_uring_queue_exit(&ctx->fdmon_io_uring);

    /* Move handlers due to be removed onto the deleted list */
    while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
        unsigned flags = qatomic_fetch_and(&node->flags,
                ~(FDMON_IO_URING_PENDING |
                  FDMON_IO_URING_ADD |
                  FDMON_IO_URING_REMOVE |
                  FDMON_IO_URING_DELETE_AIO_HANDLER));

        if ((flags & FDMON_IO_URING_REMOVE) ||
            (flags & FDMON_IO_URING_DELETE_AIO_HANDLER)) {
            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
                                  node, node_deleted);
        }

        QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
    }

    g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
    ctx->io_uring_fd_tag = NULL;

    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));

    qemu_lockcnt_lock(&ctx->list_lock);
    fdmon_poll_downgrade(ctx);
    qemu_lockcnt_unlock(&ctx->list_lock);
}
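/*
 * Illustrative setup/teardown sketch (the surrounding flow is hypothetical;
 * in QEMU this is driven by the AioContext creation and destruction paths):
 *
 *     Error *local_err = NULL;
 *
 *     if (!fdmon_io_uring_setup(ctx, &local_err)) {
 *         // e.g. the kernel lacks io_uring support; report or discard the
 *         // error and keep the default fdmon implementation
 *         error_free(local_err);
 *     }
 *
 *     ...
 *
 *     fdmon_io_uring_destroy(ctx); // no-op if setup failed, since
 *                                  // ctx->fdmon_ops was never switched
 */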