1 /* 2 * Linux native AIO support. 3 * 4 * Copyright (C) 2009 IBM, Corp. 5 * Copyright (C) 2009 Red Hat, Inc. 6 * 7 * This work is licensed under the terms of the GNU GPL, version 2 or later. 8 * See the COPYING file in the top-level directory. 9 */ 10 #include "qemu/osdep.h" 11 #include "qemu-common.h" 12 #include "block/aio.h" 13 #include "qemu/queue.h" 14 #include "block/block.h" 15 #include "block/raw-aio.h" 16 #include "qemu/event_notifier.h" 17 #include "qemu/coroutine.h" 18 #include "qapi/error.h" 19 20 #include <libaio.h> 21 22 /* 23 * Queue size (per-device). 24 * 25 * XXX: eventually we need to communicate this to the guest and/or make it 26 * tunable by the guest. If we get more outstanding requests at a time 27 * than this we will get EAGAIN from io_submit which is communicated to 28 * the guest as an I/O error. 29 */ 30 #define MAX_EVENTS 128 31 32 struct qemu_laiocb { 33 BlockAIOCB common; 34 Coroutine *co; 35 LinuxAioState *ctx; 36 struct iocb iocb; 37 ssize_t ret; 38 size_t nbytes; 39 QEMUIOVector *qiov; 40 bool is_read; 41 QSIMPLEQ_ENTRY(qemu_laiocb) next; 42 }; 43 44 typedef struct { 45 int plugged; 46 unsigned int in_queue; 47 unsigned int in_flight; 48 bool blocked; 49 QSIMPLEQ_HEAD(, qemu_laiocb) pending; 50 } LaioQueue; 51 52 struct LinuxAioState { 53 AioContext *aio_context; 54 55 io_context_t ctx; 56 EventNotifier e; 57 58 /* io queue for submit at batch. Protected by AioContext lock. */ 59 LaioQueue io_q; 60 61 /* I/O completion processing. Only runs in I/O thread. */ 62 QEMUBH *completion_bh; 63 int event_idx; 64 int event_max; 65 }; 66 67 static void ioq_submit(LinuxAioState *s); 68 69 static inline ssize_t io_event_ret(struct io_event *ev) 70 { 71 return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res); 72 } 73 74 /* 75 * Completes an AIO request (calls the callback and frees the ACB). 76 */ 77 static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) 78 { 79 int ret; 80 81 ret = laiocb->ret; 82 if (ret != -ECANCELED) { 83 if (ret == laiocb->nbytes) { 84 ret = 0; 85 } else if (ret >= 0) { 86 /* Short reads mean EOF, pad with zeros. */ 87 if (laiocb->is_read) { 88 qemu_iovec_memset(laiocb->qiov, ret, 0, 89 laiocb->qiov->size - ret); 90 } else { 91 ret = -ENOSPC; 92 } 93 } 94 } 95 96 laiocb->ret = ret; 97 if (laiocb->co) { 98 /* If the coroutine is already entered it must be in ioq_submit() and 99 * will notice laio->ret has been filled in when it eventually runs 100 * later. Coroutines cannot be entered recursively so avoid doing 101 * that! 102 */ 103 if (!qemu_coroutine_entered(laiocb->co)) { 104 aio_co_wake(laiocb->co); 105 } 106 } else { 107 laiocb->common.cb(laiocb->common.opaque, ret); 108 qemu_aio_unref(laiocb); 109 } 110 } 111 112 /** 113 * aio_ring buffer which is shared between userspace and kernel. 114 * 115 * This copied from linux/fs/aio.c, common header does not exist 116 * but AIO exists for ages so we assume ABI is stable. 117 */ 118 struct aio_ring { 119 unsigned id; /* kernel internal index number */ 120 unsigned nr; /* number of io_events */ 121 unsigned head; /* Written to by userland or by kernel. */ 122 unsigned tail; 123 124 unsigned magic; 125 unsigned compat_features; 126 unsigned incompat_features; 127 unsigned header_length; /* size of aio_ring */ 128 129 struct io_event io_events[0]; 130 }; 131 132 /** 133 * io_getevents_peek: 134 * @ctx: AIO context 135 * @events: pointer on events array, output value 136 137 * Returns the number of completed events and sets a pointer 138 * on events array. This function does not update the internal 139 * ring buffer, only reads head and tail. When @events has been 140 * processed io_getevents_commit() must be called. 141 */ 142 static inline unsigned int io_getevents_peek(io_context_t ctx, 143 struct io_event **events) 144 { 145 struct aio_ring *ring = (struct aio_ring *)ctx; 146 unsigned int head = ring->head, tail = ring->tail; 147 unsigned int nr; 148 149 nr = tail >= head ? tail - head : ring->nr - head; 150 *events = ring->io_events + head; 151 /* To avoid speculative loads of s->events[i] before observing tail. 152 Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */ 153 smp_rmb(); 154 155 return nr; 156 } 157 158 /** 159 * io_getevents_commit: 160 * @ctx: AIO context 161 * @nr: the number of events on which head should be advanced 162 * 163 * Advances head of a ring buffer. 164 */ 165 static inline void io_getevents_commit(io_context_t ctx, unsigned int nr) 166 { 167 struct aio_ring *ring = (struct aio_ring *)ctx; 168 169 if (nr) { 170 ring->head = (ring->head + nr) % ring->nr; 171 } 172 } 173 174 /** 175 * io_getevents_advance_and_peek: 176 * @ctx: AIO context 177 * @events: pointer on events array, output value 178 * @nr: the number of events on which head should be advanced 179 * 180 * Advances head of a ring buffer and returns number of elements left. 181 */ 182 static inline unsigned int 183 io_getevents_advance_and_peek(io_context_t ctx, 184 struct io_event **events, 185 unsigned int nr) 186 { 187 io_getevents_commit(ctx, nr); 188 return io_getevents_peek(ctx, events); 189 } 190 191 /** 192 * qemu_laio_process_completions: 193 * @s: AIO state 194 * 195 * Fetches completed I/O requests and invokes their callbacks. 196 * 197 * The function is somewhat tricky because it supports nested event loops, for 198 * example when a request callback invokes aio_poll(). In order to do this, 199 * indices are kept in LinuxAioState. Function schedules BH completion so it 200 * can be called again in a nested event loop. When there are no events left 201 * to complete the BH is being canceled. 202 */ 203 static void qemu_laio_process_completions(LinuxAioState *s) 204 { 205 struct io_event *events; 206 207 /* Reschedule so nested event loops see currently pending completions */ 208 qemu_bh_schedule(s->completion_bh); 209 210 while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events, 211 s->event_idx))) { 212 for (s->event_idx = 0; s->event_idx < s->event_max; ) { 213 struct iocb *iocb = events[s->event_idx].obj; 214 struct qemu_laiocb *laiocb = 215 container_of(iocb, struct qemu_laiocb, iocb); 216 217 laiocb->ret = io_event_ret(&events[s->event_idx]); 218 219 /* Change counters one-by-one because we can be nested. */ 220 s->io_q.in_flight--; 221 s->event_idx++; 222 qemu_laio_process_completion(laiocb); 223 } 224 } 225 226 qemu_bh_cancel(s->completion_bh); 227 228 /* If we are nested we have to notify the level above that we are done 229 * by setting event_max to zero, upper level will then jump out of it's 230 * own `for` loop. If we are the last all counters droped to zero. */ 231 s->event_max = 0; 232 s->event_idx = 0; 233 } 234 235 static void qemu_laio_process_completions_and_submit(LinuxAioState *s) 236 { 237 qemu_laio_process_completions(s); 238 239 aio_context_acquire(s->aio_context); 240 if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { 241 ioq_submit(s); 242 } 243 aio_context_release(s->aio_context); 244 } 245 246 static void qemu_laio_completion_bh(void *opaque) 247 { 248 LinuxAioState *s = opaque; 249 250 qemu_laio_process_completions_and_submit(s); 251 } 252 253 static void qemu_laio_completion_cb(EventNotifier *e) 254 { 255 LinuxAioState *s = container_of(e, LinuxAioState, e); 256 257 if (event_notifier_test_and_clear(&s->e)) { 258 qemu_laio_process_completions_and_submit(s); 259 } 260 } 261 262 static bool qemu_laio_poll_cb(void *opaque) 263 { 264 EventNotifier *e = opaque; 265 LinuxAioState *s = container_of(e, LinuxAioState, e); 266 struct io_event *events; 267 268 if (!io_getevents_peek(s->ctx, &events)) { 269 return false; 270 } 271 272 qemu_laio_process_completions_and_submit(s); 273 return true; 274 } 275 276 static void laio_cancel(BlockAIOCB *blockacb) 277 { 278 struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb; 279 struct io_event event; 280 int ret; 281 282 if (laiocb->ret != -EINPROGRESS) { 283 return; 284 } 285 ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event); 286 laiocb->ret = -ECANCELED; 287 if (ret != 0) { 288 /* iocb is not cancelled, cb will be called by the event loop later */ 289 return; 290 } 291 292 laiocb->common.cb(laiocb->common.opaque, laiocb->ret); 293 } 294 295 static const AIOCBInfo laio_aiocb_info = { 296 .aiocb_size = sizeof(struct qemu_laiocb), 297 .cancel_async = laio_cancel, 298 }; 299 300 static void ioq_init(LaioQueue *io_q) 301 { 302 QSIMPLEQ_INIT(&io_q->pending); 303 io_q->plugged = 0; 304 io_q->in_queue = 0; 305 io_q->in_flight = 0; 306 io_q->blocked = false; 307 } 308 309 static void ioq_submit(LinuxAioState *s) 310 { 311 int ret, len; 312 struct qemu_laiocb *aiocb; 313 struct iocb *iocbs[MAX_EVENTS]; 314 QSIMPLEQ_HEAD(, qemu_laiocb) completed; 315 316 do { 317 if (s->io_q.in_flight >= MAX_EVENTS) { 318 break; 319 } 320 len = 0; 321 QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) { 322 iocbs[len++] = &aiocb->iocb; 323 if (s->io_q.in_flight + len >= MAX_EVENTS) { 324 break; 325 } 326 } 327 328 ret = io_submit(s->ctx, len, iocbs); 329 if (ret == -EAGAIN) { 330 break; 331 } 332 if (ret < 0) { 333 /* Fail the first request, retry the rest */ 334 aiocb = QSIMPLEQ_FIRST(&s->io_q.pending); 335 QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next); 336 s->io_q.in_queue--; 337 aiocb->ret = ret; 338 qemu_laio_process_completion(aiocb); 339 continue; 340 } 341 342 s->io_q.in_flight += ret; 343 s->io_q.in_queue -= ret; 344 aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb); 345 QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed); 346 } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending)); 347 s->io_q.blocked = (s->io_q.in_queue > 0); 348 349 if (s->io_q.in_flight) { 350 /* We can try to complete something just right away if there are 351 * still requests in-flight. */ 352 qemu_laio_process_completions(s); 353 /* 354 * Even we have completed everything (in_flight == 0), the queue can 355 * have still pended requests (in_queue > 0). We do not attempt to 356 * repeat submission to avoid IO hang. The reason is simple: s->e is 357 * still set and completion callback will be called shortly and all 358 * pended requests will be submitted from there. 359 */ 360 } 361 } 362 363 void laio_io_plug(BlockDriverState *bs, LinuxAioState *s) 364 { 365 s->io_q.plugged++; 366 } 367 368 void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s) 369 { 370 assert(s->io_q.plugged); 371 if (--s->io_q.plugged == 0 && 372 !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { 373 ioq_submit(s); 374 } 375 } 376 377 static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset, 378 int type) 379 { 380 LinuxAioState *s = laiocb->ctx; 381 struct iocb *iocbs = &laiocb->iocb; 382 QEMUIOVector *qiov = laiocb->qiov; 383 384 switch (type) { 385 case QEMU_AIO_WRITE: 386 io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset); 387 break; 388 case QEMU_AIO_READ: 389 io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset); 390 break; 391 /* Currently Linux kernel does not support other operations */ 392 default: 393 fprintf(stderr, "%s: invalid AIO request type 0x%x.\n", 394 __func__, type); 395 return -EIO; 396 } 397 io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e)); 398 399 QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); 400 s->io_q.in_queue++; 401 if (!s->io_q.blocked && 402 (!s->io_q.plugged || 403 s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) { 404 ioq_submit(s); 405 } 406 407 return 0; 408 } 409 410 int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd, 411 uint64_t offset, QEMUIOVector *qiov, int type) 412 { 413 int ret; 414 struct qemu_laiocb laiocb = { 415 .co = qemu_coroutine_self(), 416 .nbytes = qiov->size, 417 .ctx = s, 418 .ret = -EINPROGRESS, 419 .is_read = (type == QEMU_AIO_READ), 420 .qiov = qiov, 421 }; 422 423 ret = laio_do_submit(fd, &laiocb, offset, type); 424 if (ret < 0) { 425 return ret; 426 } 427 428 if (laiocb.ret == -EINPROGRESS) { 429 qemu_coroutine_yield(); 430 } 431 return laiocb.ret; 432 } 433 434 BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd, 435 int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, 436 BlockCompletionFunc *cb, void *opaque, int type) 437 { 438 struct qemu_laiocb *laiocb; 439 off_t offset = sector_num * BDRV_SECTOR_SIZE; 440 int ret; 441 442 laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque); 443 laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE; 444 laiocb->ctx = s; 445 laiocb->ret = -EINPROGRESS; 446 laiocb->is_read = (type == QEMU_AIO_READ); 447 laiocb->qiov = qiov; 448 449 ret = laio_do_submit(fd, laiocb, offset, type); 450 if (ret < 0) { 451 qemu_aio_unref(laiocb); 452 return NULL; 453 } 454 455 return &laiocb->common; 456 } 457 458 void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) 459 { 460 aio_set_event_notifier(old_context, &s->e, false, NULL, NULL); 461 qemu_bh_delete(s->completion_bh); 462 s->aio_context = NULL; 463 } 464 465 void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context) 466 { 467 s->aio_context = new_context; 468 s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); 469 aio_set_event_notifier(new_context, &s->e, false, 470 qemu_laio_completion_cb, 471 qemu_laio_poll_cb); 472 } 473 474 LinuxAioState *laio_init(Error **errp) 475 { 476 int rc; 477 LinuxAioState *s; 478 479 s = g_malloc0(sizeof(*s)); 480 rc = event_notifier_init(&s->e, false); 481 if (rc < 0) { 482 error_setg_errno(errp, -rc, "failed to to initialize event notifier"); 483 goto out_free_state; 484 } 485 486 rc = io_setup(MAX_EVENTS, &s->ctx); 487 if (rc < 0) { 488 error_setg_errno(errp, -rc, "failed to create linux AIO context"); 489 goto out_close_efd; 490 } 491 492 ioq_init(&s->io_q); 493 494 return s; 495 496 out_close_efd: 497 event_notifier_cleanup(&s->e); 498 out_free_state: 499 g_free(s); 500 return NULL; 501 } 502 503 void laio_cleanup(LinuxAioState *s) 504 { 505 event_notifier_cleanup(&s->e); 506 507 if (io_destroy(s->ctx) != 0) { 508 fprintf(stderr, "%s: destroy AIO context %p failed\n", 509 __func__, &s->ctx); 510 } 511 g_free(s); 512 } 513