/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* I/O queue for batched submission. Protected by AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing. Only runs in the I/O thread. */
    QEMUBH *completion_bh;
} LuringState;

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue. The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Before Linux commit 9d93a3f5a0c ("io_uring: punt short reads to async
 * context") a buffered I/O request with the start of the file range in the
 * page cache could result in a short read. Applications need to resubmit the
 * remaining read request.
 *
 * This is a slow path but recent kernels never take it.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe: advance the file offset past the bytes already read */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules the completion BH so it can be called again from a
 * nested event loop. When there are no events left to complete, the BH is
 * canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them. Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but
             * it's known to happen sometimes with Linux SCSI. Submit again
             * and hope the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in
             * the future, then this workaround may need to be extended to
             * deal with genuine -EAGAIN results that should not be
             * resubmitted immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        /* Only read/write */
        } else {
            /* Short Read/Write */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}

static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * We can try to complete something right away if there are
         * still requests in flight.
         */
        luring_process_completions(s);
    }
    return ret;
}

static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    return io_uring_cq_ready(&s->ring);
}

static void qemu_luring_poll_ready(void *opaque)
{
    LuringState *s = opaque;

    luring_process_completions_and_submit(s);
}

static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the sqe embedded in @luringcb, adds it to the pending queue and
 * submits the queue unless submission is plugged or blocked.
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}

int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co = qemu_coroutine_self(),
        .ret = -EINPROGRESS,
        .qiov = qiov,
        .is_read = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false,
                       NULL, NULL, NULL, NULL, s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL,
                       qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}

LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}