1 /* SPDX-License-Identifier: GPL-2.0-or-later */ 2 /* 3 * Linux io_uring file descriptor monitoring 4 * 5 * The Linux io_uring API supports file descriptor monitoring with a few 6 * advantages over existing APIs like poll(2) and epoll(7): 7 * 8 * 1. Userspace polling of events is possible because the completion queue (cq 9 * ring) is shared between the kernel and userspace. This allows 10 * applications that rely on userspace polling to also monitor file 11 * descriptors in the same userspace polling loop. 12 * 13 * 2. Submission and completion is batched and done together in a single system 14 * call. This minimizes the number of system calls. 15 * 16 * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than 17 * poll(2). 18 * 19 * 4. Nanosecond timeouts are supported so it requires fewer syscalls than 20 * epoll(7). 21 * 22 * This code only monitors file descriptors and does not do asynchronous disk 23 * I/O. Implementing disk I/O efficiently has other requirements and should 24 * use a separate io_uring so it does not make sense to unify the code. 25 * 26 * File descriptor monitoring is implemented using the following operations: 27 * 28 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored. 29 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored. When 30 * the poll mask changes for a file descriptor it is first removed and then 31 * re-added with the new poll mask, so this operation is also used as part 32 * of modifying an existing monitored file descriptor. 33 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait 34 * for events. This operation self-cancels if another event completes 35 * before the timeout. 36 * 37 * io_uring calls the submission queue the "sq ring" and the completion queue 38 * the "cq ring". Ring entries are called "sqe" and "cqe", respectively. 39 * 40 * The code is structured so that sq/cq rings are only modified within 41 * fdmon_io_uring_wait(). Changes to AioHandlers are made by enqueuing them on 42 * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD 43 * and/or IORING_OP_POLL_REMOVE sqes for them. 44 */ 45 46 #include "qemu/osdep.h" 47 #include <poll.h> 48 #include "qemu/rcu_queue.h" 49 #include "aio-posix.h" 50 51 enum { 52 FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */ 53 54 /* AioHandler::flags */ 55 FDMON_IO_URING_PENDING = (1 << 0), 56 FDMON_IO_URING_ADD = (1 << 1), 57 FDMON_IO_URING_REMOVE = (1 << 2), 58 }; 59 60 static inline int poll_events_from_pfd(int pfd_events) 61 { 62 return (pfd_events & G_IO_IN ? POLLIN : 0) | 63 (pfd_events & G_IO_OUT ? POLLOUT : 0) | 64 (pfd_events & G_IO_HUP ? POLLHUP : 0) | 65 (pfd_events & G_IO_ERR ? POLLERR : 0); 66 } 67 68 static inline int pfd_events_from_poll(int poll_events) 69 { 70 return (poll_events & POLLIN ? G_IO_IN : 0) | 71 (poll_events & POLLOUT ? G_IO_OUT : 0) | 72 (poll_events & POLLHUP ? G_IO_HUP : 0) | 73 (poll_events & POLLERR ? G_IO_ERR : 0); 74 } 75 76 /* 77 * Returns an sqe for submitting a request. Only be called within 78 * fdmon_io_uring_wait(). 79 */ 80 static struct io_uring_sqe *get_sqe(AioContext *ctx) 81 { 82 struct io_uring *ring = &ctx->fdmon_io_uring; 83 struct io_uring_sqe *sqe = io_uring_get_sqe(ring); 84 int ret; 85 86 if (likely(sqe)) { 87 return sqe; 88 } 89 90 /* No free sqes left, submit pending sqes first */ 91 do { 92 ret = io_uring_submit(ring); 93 } while (ret == -EINTR); 94 95 assert(ret > 1); 96 sqe = io_uring_get_sqe(ring); 97 assert(sqe); 98 return sqe; 99 } 100 101 /* Atomically enqueue an AioHandler for sq ring submission */ 102 static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags) 103 { 104 unsigned old_flags; 105 106 old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags); 107 if (!(old_flags & FDMON_IO_URING_PENDING)) { 108 QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted); 109 } 110 } 111 112 /* Dequeue an AioHandler for sq ring submission. Called by fill_sq_ring(). */ 113 static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags) 114 { 115 AioHandler *node = QSLIST_FIRST(head); 116 117 if (!node) { 118 return NULL; 119 } 120 121 /* Doesn't need to be atomic since fill_sq_ring() moves the list */ 122 QSLIST_REMOVE_HEAD(head, node_submitted); 123 124 /* 125 * Don't clear FDMON_IO_URING_REMOVE. It's sticky so it can serve two 126 * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and 127 * telling process_cqe() to delete the AioHandler when its 128 * IORING_OP_POLL_ADD completes. 129 */ 130 *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING | 131 FDMON_IO_URING_ADD)); 132 return node; 133 } 134 135 static void fdmon_io_uring_update(AioContext *ctx, 136 AioHandler *old_node, 137 AioHandler *new_node) 138 { 139 if (new_node) { 140 enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD); 141 } 142 143 if (old_node) { 144 /* 145 * Deletion is tricky because IORING_OP_POLL_ADD and 146 * IORING_OP_POLL_REMOVE are async. We need to wait for the original 147 * IORING_OP_POLL_ADD to complete before this handler can be freed 148 * safely. 149 * 150 * It's possible that the file descriptor becomes ready and the 151 * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is 152 * submitted, too. 153 * 154 * Mark this handler deleted right now but don't place it on 155 * ctx->deleted_aio_handlers yet. Instead, manually fudge the list 156 * entry to make QLIST_IS_INSERTED() think this handler has been 157 * inserted and other code recognizes this AioHandler as deleted. 158 * 159 * Once the original IORING_OP_POLL_ADD completes we enqueue the 160 * handler on the real ctx->deleted_aio_handlers list to be freed. 161 */ 162 assert(!QLIST_IS_INSERTED(old_node, node_deleted)); 163 old_node->node_deleted.le_prev = &old_node->node_deleted.le_next; 164 165 enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE); 166 } 167 } 168 169 static void add_poll_add_sqe(AioContext *ctx, AioHandler *node) 170 { 171 struct io_uring_sqe *sqe = get_sqe(ctx); 172 int events = poll_events_from_pfd(node->pfd.events); 173 174 io_uring_prep_poll_add(sqe, node->pfd.fd, events); 175 io_uring_sqe_set_data(sqe, node); 176 } 177 178 static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node) 179 { 180 struct io_uring_sqe *sqe = get_sqe(ctx); 181 182 #ifdef LIBURING_HAVE_DATA64 183 io_uring_prep_poll_remove(sqe, (uintptr_t)node); 184 #else 185 io_uring_prep_poll_remove(sqe, node); 186 #endif 187 io_uring_sqe_set_data(sqe, NULL); 188 } 189 190 /* Add a timeout that self-cancels when another cqe becomes ready */ 191 static void add_timeout_sqe(AioContext *ctx, int64_t ns) 192 { 193 struct io_uring_sqe *sqe; 194 struct __kernel_timespec ts = { 195 .tv_sec = ns / NANOSECONDS_PER_SECOND, 196 .tv_nsec = ns % NANOSECONDS_PER_SECOND, 197 }; 198 199 sqe = get_sqe(ctx); 200 io_uring_prep_timeout(sqe, &ts, 1, 0); 201 io_uring_sqe_set_data(sqe, NULL); 202 } 203 204 /* Add sqes from ctx->submit_list for submission */ 205 static void fill_sq_ring(AioContext *ctx) 206 { 207 AioHandlerSList submit_list; 208 AioHandler *node; 209 unsigned flags; 210 211 QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list); 212 213 while ((node = dequeue(&submit_list, &flags))) { 214 /* Order matters, just in case both flags were set */ 215 if (flags & FDMON_IO_URING_ADD) { 216 add_poll_add_sqe(ctx, node); 217 } 218 if (flags & FDMON_IO_URING_REMOVE) { 219 add_poll_remove_sqe(ctx, node); 220 } 221 } 222 } 223 224 /* Returns true if a handler became ready */ 225 static bool process_cqe(AioContext *ctx, 226 AioHandlerList *ready_list, 227 struct io_uring_cqe *cqe) 228 { 229 AioHandler *node = io_uring_cqe_get_data(cqe); 230 unsigned flags; 231 232 /* poll_timeout and poll_remove have a zero user_data field */ 233 if (!node) { 234 return false; 235 } 236 237 /* 238 * Deletion can only happen when IORING_OP_POLL_ADD completes. If we race 239 * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE 240 * bit before IORING_OP_POLL_REMOVE is submitted. 241 */ 242 flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE); 243 if (flags & FDMON_IO_URING_REMOVE) { 244 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); 245 return false; 246 } 247 248 aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res)); 249 250 /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */ 251 add_poll_add_sqe(ctx, node); 252 return true; 253 } 254 255 static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list) 256 { 257 struct io_uring *ring = &ctx->fdmon_io_uring; 258 struct io_uring_cqe *cqe; 259 unsigned num_cqes = 0; 260 unsigned num_ready = 0; 261 unsigned head; 262 263 io_uring_for_each_cqe(ring, head, cqe) { 264 if (process_cqe(ctx, ready_list, cqe)) { 265 num_ready++; 266 } 267 268 num_cqes++; 269 } 270 271 io_uring_cq_advance(ring, num_cqes); 272 return num_ready; 273 } 274 275 static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list, 276 int64_t timeout) 277 { 278 unsigned wait_nr = 1; /* block until at least one cqe is ready */ 279 int ret; 280 281 if (timeout == 0) { 282 wait_nr = 0; /* non-blocking */ 283 } else if (timeout > 0) { 284 add_timeout_sqe(ctx, timeout); 285 } 286 287 fill_sq_ring(ctx); 288 289 do { 290 ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr); 291 } while (ret == -EINTR); 292 293 assert(ret >= 0); 294 295 return process_cq_ring(ctx, ready_list); 296 } 297 298 static bool fdmon_io_uring_need_wait(AioContext *ctx) 299 { 300 /* Have io_uring events completed? */ 301 if (io_uring_cq_ready(&ctx->fdmon_io_uring)) { 302 return true; 303 } 304 305 /* Are there pending sqes to submit? */ 306 if (io_uring_sq_ready(&ctx->fdmon_io_uring)) { 307 return true; 308 } 309 310 /* Do we need to process AioHandlers for io_uring changes? */ 311 if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) { 312 return true; 313 } 314 315 return false; 316 } 317 318 static const FDMonOps fdmon_io_uring_ops = { 319 .update = fdmon_io_uring_update, 320 .wait = fdmon_io_uring_wait, 321 .need_wait = fdmon_io_uring_need_wait, 322 }; 323 324 bool fdmon_io_uring_setup(AioContext *ctx) 325 { 326 int ret; 327 328 ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0); 329 if (ret != 0) { 330 return false; 331 } 332 333 QSLIST_INIT(&ctx->submit_list); 334 ctx->fdmon_ops = &fdmon_io_uring_ops; 335 return true; 336 } 337 338 void fdmon_io_uring_destroy(AioContext *ctx) 339 { 340 if (ctx->fdmon_ops == &fdmon_io_uring_ops) { 341 AioHandler *node; 342 343 io_uring_queue_exit(&ctx->fdmon_io_uring); 344 345 /* Move handlers due to be removed onto the deleted list */ 346 while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) { 347 unsigned flags = qatomic_fetch_and(&node->flags, 348 ~(FDMON_IO_URING_PENDING | 349 FDMON_IO_URING_ADD | 350 FDMON_IO_URING_REMOVE)); 351 352 if (flags & FDMON_IO_URING_REMOVE) { 353 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); 354 } 355 356 QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted); 357 } 358 359 ctx->fdmon_ops = &fdmon_poll_ops; 360 } 361 } 362