/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif

struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    AioPollFn *io_poll;
    IOHandler *io_poll_begin;
    IOHandler *io_poll_end;
    int deleted;
    void *opaque;
    bool is_external;
    QLIST_ENTRY(AioHandler) node;
};

#ifdef CONFIG_EPOLL_CREATE1

/* The fd number threshold to switch to epoll */
#define EPOLL_ENABLE_THRESHOLD 64

static void aio_epoll_disable(AioContext *ctx)
{
    ctx->epoll_available = false;
    if (!ctx->epoll_enabled) {
        return;
    }
    ctx->epoll_enabled = false;
    close(ctx->epollfd);
}

static inline int epoll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? EPOLLIN : 0) |
           (pfd_events & G_IO_OUT ? EPOLLOUT : 0) |
           (pfd_events & G_IO_HUP ? EPOLLHUP : 0) |
           (pfd_events & G_IO_ERR ? EPOLLERR : 0);
}

static bool aio_epoll_try_enable(AioContext *ctx)
{
    AioHandler *node;
    struct epoll_event event;

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int r;
        if (node->deleted || !node->pfd.events) {
            continue;
        }
        event.events = epoll_events_from_pfd(node->pfd.events);
        event.data.ptr = node;
        r = epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, node->pfd.fd, &event);
        if (r) {
            return false;
        }
    }
    ctx->epoll_enabled = true;
    return true;
}

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
    struct epoll_event event;
    int r;
    int ctl;

    if (!ctx->epoll_enabled) {
        return;
    }
    if (!node->pfd.events) {
        ctl = EPOLL_CTL_DEL;
    } else {
        event.data.ptr = node;
        event.events = epoll_events_from_pfd(node->pfd.events);
        ctl = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    }

    r = epoll_ctl(ctx->epollfd, ctl, node->pfd.fd, &event);
    if (r) {
        aio_epoll_disable(ctx);
    }
}

static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    AioHandler *node;
    int i, ret = 0;
    struct epoll_event events[128];

    assert(npfd == 1);
    assert(pfds[0].fd == ctx->epollfd);
    if (timeout > 0) {
        ret = qemu_poll_ns(pfds, npfd, timeout);
    }
    if (timeout <= 0 || ret > 0) {
        ret = epoll_wait(ctx->epollfd, events,
                         sizeof(events) / sizeof(events[0]),
                         timeout);
        if (ret <= 0) {
            goto out;
        }
        for (i = 0; i < ret; i++) {
            int ev = events[i].events;
            node = events[i].data.ptr;
            node->pfd.revents = (ev & EPOLLIN ? G_IO_IN : 0) |
                (ev & EPOLLOUT ? G_IO_OUT : 0) |
                (ev & EPOLLHUP ? G_IO_HUP : 0) |
                (ev & EPOLLERR ? G_IO_ERR : 0);
        }
    }
out:
    return ret;
}
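/*
 * Note on the two-level wait in aio_epoll() above: epoll_wait() only has
 * millisecond timeout resolution, so for a positive timeout the epollfd
 * itself is first watched with qemu_poll_ns(), which keeps nanosecond
 * resolution.  Once ppoll reports the epollfd readable, epoll_wait()
 * returns immediately regardless of the timeout argument it receives, so
 * the harvesting call does not block a second time; with timeout <= 0
 * epoll_wait() is entered directly with the same non-blocking (0) or
 * block-forever (-1) semantics.
 */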
static bool aio_epoll_enabled(AioContext *ctx)
{
    /* Fall back to ppoll when external clients are disabled. */
    return !aio_external_disabled(ctx) && ctx->epoll_enabled;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    if (!ctx->epoll_available) {
        return false;
    }
    if (aio_epoll_enabled(ctx)) {
        return true;
    }
    if (npfd >= EPOLL_ENABLE_THRESHOLD) {
        if (aio_epoll_try_enable(ctx)) {
            return true;
        } else {
            aio_epoll_disable(ctx);
        }
    }
    return false;
}

#else

static void aio_epoll_update(AioContext *ctx, AioHandler *node, bool is_new)
{
}

static int aio_epoll(AioContext *ctx, GPollFD *pfds,
                     unsigned npfd, int64_t timeout)
{
    assert(false);
}

static bool aio_epoll_enabled(AioContext *ctx)
{
    return false;
}

static bool aio_epoll_check_poll(AioContext *ctx, GPollFD *pfds,
                                 unsigned npfd, int64_t timeout)
{
    return false;
}

#endif

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd)
            if (!node->deleted)
                return node;
    }

    return NULL;
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    bool is_new = false;
    bool deleted = false;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }

        g_source_remove_poll(&ctx->source, &node->pfd);

        /* If the lock is held, just mark the node as deleted */
        if (qemu_lockcnt_count(&ctx->list_lock)) {
            node->deleted = 1;
            node->pfd.revents = 0;
        } else {
            /* Otherwise, delete it for real.  We can't just mark it as
             * deleted because deleted nodes are only cleaned up while
             * no one is walking the handlers list.
             */
            QLIST_REMOVE(node, node);
            deleted = true;
        }

        if (!node->io_poll) {
            ctx->poll_disable_cnt--;
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
            is_new = true;

            ctx->poll_disable_cnt += !io_poll;
        } else {
            ctx->poll_disable_cnt += !io_poll - !node->io_poll;
        }

        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->io_poll = io_poll;
        node->opaque = opaque;
        node->is_external = is_external;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_epoll_update(ctx, node, is_new);
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}
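/*
 * Usage sketch for aio_set_fd_handler() (illustrative only; "my_ctx",
 * "my_fd" and "my_read_cb" are hypothetical names):
 *
 *     static void my_read_cb(void *opaque)
 *     {
 *         ... read from my_fd; opaque is the last argument below ...
 *     }
 *
 *     aio_set_fd_handler(my_ctx, my_fd, true, my_read_cb, NULL, NULL, NULL);
 *
 * Passing NULL for io_read, io_write and io_poll removes the handler:
 *
 *     aio_set_fd_handler(my_ctx, my_fd, true, NULL, NULL, NULL, NULL);
 */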
void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}
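/*
 * Illustrative sketch ("my_notifier_cb" is a hypothetical name): an
 * EventNotifier wraps an eventfd or pipe, so hooking one up reduces to
 * aio_set_fd_handler() on event_notifier_get_fd(), as done above.
 *
 *     static void my_notifier_cb(EventNotifier *n)
 *     {
 *         event_notifier_test_and_clear(n);
 *         ... handle the event ...
 *     }
 *
 *     EventNotifier n;
 *     event_notifier_init(&n, 0);
 *     aio_set_event_notifier(ctx, &n, false, my_notifier_cb, NULL);
 *
 * Another thread then calls event_notifier_set(&n) to wake up aio_poll().
 */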
static void poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;

    if (started == ctx->poll_started) {
        return;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        IOHandler *fn;

        if (node->deleted) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);
}


bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, false);

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            aio_node_check(ctx, node->is_external) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return progress;
}

/*
 * Note that dispatch_fds == false has the side-effect of postponing the
 * freeing of deleted handlers.
 */
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{
    bool progress;

    /*
     * If there are callbacks left that have been queued, we need to call them.
     * Do not call select in this case, because it is possible that the caller
     * does not need a complete flush (as is the case for aio_poll loops).
     */
    progress = aio_bh_poll(ctx);

    if (dispatch_fds) {
        progress |= aio_dispatch_handlers(ctx);
    }

    /* Run our timers */
    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}
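/*
 * A note on the deferred-deletion pattern used by the walkers above:
 * list_lock is a QemuLockCnt, i.e. a counter plus a mutex.  Readers
 * increment the counter around a walk; aio_set_fd_handler() unlinks a
 * node immediately only when the counter is zero and otherwise just sets
 * node->deleted.  A walker that encounters a deleted node frees it only
 * if it can atomically trade its reference for the lock:
 *
 *     if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
 *         QLIST_REMOVE(node, node);
 *         g_free(node);
 *         qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
 *     }
 *
 * which succeeds only when this walker holds the last reference, so no
 * other thread can still be looking at the node.
 */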
/* These thread-local variables are used only in a small part of aio_poll
 * around the call to the poll() system call.  In particular they are not
 * used while aio_poll is performing callbacks, which makes it much easier
 * to think about reentrancy!
 *
 * Stack-allocated arrays would be perfect but they have size limitations;
 * heap allocation is expensive enough that we want to reuse arrays across
 * calls to aio_poll().  And because poll() has to be called without holding
 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 * has none of the disadvantages of these three options.
 */
static __thread GPollFD *pollfds;
static __thread AioHandler **nodes;
static __thread unsigned npfd, nalloc;
static __thread Notifier pollfds_cleanup_notifier;

static void pollfds_cleanup(Notifier *n, void *unused)
{
    g_assert(npfd == 0);
    g_free(pollfds);
    g_free(nodes);
    nalloc = 0;
}

static void add_pollfd(AioHandler *node)
{
    if (npfd == nalloc) {
        if (nalloc == 0) {
            pollfds_cleanup_notifier.notify = pollfds_cleanup;
            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
            nalloc = 8;
        } else {
            g_assert(nalloc <= INT_MAX);
            nalloc *= 2;
        }
        pollfds = g_renew(GPollFD, pollfds, nalloc);
        nodes = g_renew(AioHandler *, nodes, nalloc);
    }
    nodes[npfd] = node;
    pollfds[npfd] = (GPollFD) {
        .fd = node->pfd.fd,
        .events = node->pfd.events,
    };
    npfd++;
}

static bool run_poll_handlers_once(AioContext *ctx)
{
    bool progress = false;
    AioHandler *node;

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_poll &&
            aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            progress = true;
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that ctx->notify_me must be non-zero so this function can detect
 * aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
{
    bool progress;
    int64_t end_time;

    assert(ctx->notify_me);
    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);
    assert(ctx->poll_disable_cnt == 0);

    trace_run_poll_handlers_begin(ctx, max_ns);

    end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;

    do {
        progress = run_poll_handlers_once(ctx);
    } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);

    trace_run_poll_handlers_end(ctx, progress);

    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @blocking: busy polling is only attempted when blocking is true
 *
 * ctx->notify_me must be non-zero so this function can detect aio_notify().
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, bool blocking)
{
    if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) {
        /* See qemu_soonest_timeout() uint64_t hack */
        int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
                             (uint64_t)ctx->poll_ns);

        if (max_ns) {
            poll_set_started(ctx, true);

            if (run_poll_handlers(ctx, max_ns)) {
                return true;
            }
        }
    }

    poll_set_started(ctx, false);

    /* Even if we don't run busy polling, try polling once in case it can make
     * progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
     */
    return run_poll_handlers_once(ctx);
}
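/*
 * Example of the max_ns computation in try_poll_mode() above: with
 * ctx->poll_ns == 32768 and the nearest timer due in 10 microseconds,
 * aio_compute_timeout() returns 10000 and busy polling is capped at
 * MIN(10000, 32768) == 10000 ns, so a ready timer is never delayed by
 * polling.  An infinite timeout is returned as -1, which the uint64_t
 * casts turn into UINT64_MAX so that ctx->poll_ns wins the MIN().
 */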
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int i;
    int ret = 0;
    bool progress;
    int64_t timeout;
    int64_t start = 0;

    aio_context_acquire(ctx);
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    if (try_poll_mode(ctx, blocking)) {
        progress = true;
    } else {
        assert(npfd == 0);

        /* fill pollfds */

        if (!aio_epoll_enabled(ctx)) {
            QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
                if (!node->deleted && node->pfd.events
                    && aio_node_check(ctx, node->is_external)) {
                    add_pollfd(node);
                }
            }
        }

        timeout = blocking ? aio_compute_timeout(ctx) : 0;

        /* wait until next event */
        if (timeout) {
            aio_context_release(ctx);
        }
        if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
            AioHandler epoll_handler;

            epoll_handler.pfd.fd = ctx->epollfd;
            epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
            npfd = 0;
            add_pollfd(&epoll_handler);
            ret = aio_epoll(ctx, pollfds, npfd, timeout);
        } else {
            ret = qemu_poll_ns(pollfds, npfd, timeout);
        }
        if (timeout) {
            aio_context_acquire(ctx);
        }
    }

    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
    }

    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    aio_notify_accept(ctx);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            nodes[i]->pfd.revents = pollfds[i].revents;
        }
    }

    npfd = 0;
    qemu_lockcnt_dec(&ctx->list_lock);

    /* Run dispatch even if there were no readable fds to run timers */
    if (aio_dispatch(ctx, ret > 0)) {
        progress = true;
    }

    aio_context_release(ctx);

    return progress;
}
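/*
 * Worked example of the self-tuning above, assuming poll_max_ns == 32768
 * and the default grow factor of 2 with poll_shrink == 0: polling starts
 * disabled (poll_ns == 0).  The first blocking call that completes in
 * under poll_max_ns bumps poll_ns to 4000, and subsequent fast calls
 * double it through 8000, 16000 and 32000 before it is clamped to 32768.
 * One call that blocks for longer than poll_max_ns resets poll_ns to 0
 * (a non-zero poll_shrink would divide it instead of disabling polling).
 */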
void aio_context_setup(AioContext *ctx)
{
    /* TODO remove this in final patch submission */
    if (getenv("QEMU_AIO_POLL_MAX_NS")) {
        fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has "
                "been replaced with -object iothread,poll-max-ns=NUM\n");
        exit(1);
    }

#ifdef CONFIG_EPOLL_CREATE1
    assert(!ctx->epollfd);
    ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (ctx->epollfd == -1) {
        fprintf(stderr, "Failed to create epoll instance: %s\n",
                strerror(errno));
        ctx->epoll_available = false;
    } else {
        ctx->epoll_available = true;
    }
#endif
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}
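/*
 * These parameters normally arrive from an iothread object's properties;
 * an illustrative command line (property names assumed from the error
 * message above and the iothread object) might look like:
 *
 *     -object iothread,id=iothread0,poll-max-ns=32768,poll-grow=2,poll-shrink=2
 *
 * Note that poll_ns, the current busy-poll window, is reset to 0 here so
 * the adaptive algorithm in aio_poll() starts from scratch with the new
 * bounds.
 */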