/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* I/O queue for batched submission. Protected by the AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing. Only runs in the I/O thread. */
    QEMUBH *completion_bh;
} LuringState;

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue. The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Short reads are rare but may occur. The remaining read request needs to be
 * resubmitted.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position */
    luringcb->total_read += nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe */
    luringcb->sqeq.off += nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks.
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * The function schedules a completion BH so that it can be called again from
 * a nested event loop. When there are no events left to complete, the BH is
 * canceled.
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them. Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            /*
             * Only writev/readv/fsync requests on regular files or host block
             * devices are submitted. Therefore -EAGAIN is not expected but it's
             * known to happen sometimes with Linux SCSI. Submit again and hope
             * the request completes successfully.
             *
             * For more information, see:
             * https://lore.kernel.org/io-uring/20210727165811.284510-3-axboe@kernel.dk/T/#u
             *
             * If the code is changed to submit other types of requests in the
             * future, then this workaround may need to be extended to deal with
             * genuine -EAGAIN results that should not be resubmitted
             * immediately.
             */
            if (ret == -EINTR || ret == -EAGAIN) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
            /* Only read/write */
        } else {
            /* Short Read/Write */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}
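
/**
 * ioq_submit:
 * @s: AIO state
 *
 * Copies requests queued in submit_queue into available sqes and submits
 * them with io_uring_submit(), retrying on -EINTR/-EAGAIN. Marks the queue
 * as blocked if some requests could not be submitted, and reaps any
 * completions that are already available. Returns the result of the last
 * io_uring_submit() call.
 */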
static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * Try to complete requests right away if there are still
         * requests in flight.
         */
        luring_process_completions(s);
    }
    return ret;
}

static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    return io_uring_cq_ready(&s->ring);
}

static void qemu_luring_poll_ready(void *opaque)
{
    LuringState *s = opaque;

    luring_process_completions_and_submit(s);
}

static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Preps the sqe embedded in @luringcb, appends it to the pending queue and
 * submits the queue unless it is blocked or plugged (and not yet full).
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}
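
/**
 * luring_co_submit:
 *
 * Submits an I/O request from coroutine context and waits for it to complete.
 * The calling coroutine yields until the request finishes, unless the request
 * already completed during submission. Returns 0 on success or a negative
 * errno value on failure.
 */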
int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co = qemu_coroutine_self(),
        .ret = -EINPROGRESS,
        .qiov = qiov,
        .is_read = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false,
                       NULL, NULL, NULL, NULL, s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL,
                       qemu_luring_poll_cb, qemu_luring_poll_ready, s);
}

LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        /* io_uring_queue_init() returns -errno rather than setting errno */
        error_setg_errno(errp, -rc, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;
}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}