/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/rcu.h"
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#include "aio-posix.h"

/* Stop userspace polling on a handler if it isn't active for some time */
#define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND)

bool aio_poll_disabled(AioContext *ctx)
{
    return atomic_read(&ctx->poll_disable_cnt);
}

void aio_add_ready_handler(AioHandlerList *ready_list,
                           AioHandler *node,
                           int revents)
{
    QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */
    node->pfd.revents = revents;
    QLIST_INSERT_HEAD(ready_list, node, node_ready);
}

static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd) {
            if (!QLIST_IS_INSERTED(node, node_deleted)) {
                return node;
            }
        }
    }

    return NULL;
}

static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /* If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    node->pfd.revents = 0;

    /* If the fd monitor has already marked it deleted, leave it alone */
    if (QLIST_IS_INSERTED(node, node_deleted)) {
        return false;
    }

    /* If a read is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }
    /* Otherwise, delete it for real.  We can't just mark it as
     * deleted because deleted nodes are only cleaned up while
     * no one is walking the handlers list.
     */
    QLIST_SAFE_REMOVE(node, node_poll);
    QLIST_REMOVE(node, node);
    return true;
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        void *opaque)
{
    AioHandler *node;
    AioHandler *new_node = NULL;
    bool is_new = false;
    bool deleted = false;
    int poll_disable_change;

    qemu_lockcnt_lock(&ctx->list_lock);

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write && !io_poll) {
        if (node == NULL) {
            qemu_lockcnt_unlock(&ctx->list_lock);
            return;
        }
        /* Clean events in order to unregister fd from the ctx epoll. */
        node->pfd.events = 0;

        poll_disable_change = -!node->io_poll;
    } else {
        poll_disable_change = !io_poll - (node && !node->io_poll);
        if (node == NULL) {
            is_new = true;
        }
        /* Alloc and insert if it's not already there */
        new_node = g_new0(AioHandler, 1);

        /* Update handler with latest information */
        new_node->io_read = io_read;
        new_node->io_write = io_write;
        new_node->io_poll = io_poll;
        new_node->opaque = opaque;
        new_node->is_external = is_external;

        if (is_new) {
            new_node->pfd.fd = fd;
        } else {
            new_node->pfd = node->pfd;
        }
        g_source_add_poll(&ctx->source, &new_node->pfd);

        new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node);
    }

    /* No need to order poll_disable_cnt writes against other updates;
     * the counter is only used to avoid wasting time and latency on
     * iterated polling when the system call will be ultimately necessary.
     * Changing handlers is a rare event, and a little wasted polling until
     * the aio_notify below is not an issue.
     */
    atomic_set(&ctx->poll_disable_cnt,
               atomic_read(&ctx->poll_disable_cnt) + poll_disable_change);

    ctx->fdmon_ops->update(ctx, node, new_node);
    if (node) {
        deleted = aio_remove_fd_handler(ctx, node);
    }
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);

    if (deleted) {
        g_free(node);
    }
}
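
/*
 * Illustrative usage sketch (the names my_read_cb and my_opaque are
 * placeholders, not part of QEMU): a caller registers a read handler for a
 * file descriptor and later removes it again by passing NULL for all three
 * callbacks, which takes the deletion path above:
 *
 *   aio_set_fd_handler(ctx, fd, true, my_read_cb, NULL, NULL, my_opaque);
 *   ...
 *   aio_set_fd_handler(ctx, fd, true, NULL, NULL, NULL, NULL);
 */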

void aio_set_fd_poll(AioContext *ctx, int fd,
                     IOHandler *io_poll_begin,
                     IOHandler *io_poll_end)
{
    AioHandler *node = find_aio_handler(ctx, fd);

    if (!node) {
        return;
    }

    node->io_poll_begin = io_poll_begin;
    node->io_poll_end = io_poll_end;
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            bool is_external,
                            EventNotifierHandler *io_read,
                            AioPollFn *io_poll)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
                       (IOHandler *)io_read, NULL, io_poll, notifier);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
                    (IOHandler *)io_poll_begin,
                    (IOHandler *)io_poll_end);
}
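
/*
 * Illustrative note on the poll hooks (the caller description is an
 * assumption about typical usage and the callback names are placeholders):
 * device code such as virtio queue handling usually pairs the two helpers
 * above, so that io_poll_begin can suppress guest notifications while the
 * event loop busy-polls and io_poll_end re-enables them before blocking:
 *
 *   aio_set_event_notifier(ctx, notifier, true, my_handle_notify, my_poll);
 *   aio_set_event_notifier_poll(ctx, notifier, my_poll_begin, my_poll_end);
 */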

static bool poll_set_started(AioContext *ctx, bool started)
{
    AioHandler *node;
    bool progress = false;

    if (started == ctx->poll_started) {
        return false;
    }

    ctx->poll_started = started;

    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) {
        IOHandler *fn;

        if (QLIST_IS_INSERTED(node, node_deleted)) {
            continue;
        }

        if (started) {
            fn = node->io_poll_begin;
        } else {
            fn = node->io_poll_end;
        }

        if (fn) {
            fn(node->opaque);
        }

        /* Poll one last time in case ->io_poll_end() raced with the event */
        if (!started) {
            progress = node->io_poll(node->opaque) || progress;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return progress;
}


bool aio_prepare(AioContext *ctx)
{
    /* Poll mode cannot be used with glib's event loop, disable it. */
    poll_set_started(ctx, false);

    return false;
}

bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write &&
            aio_node_check(ctx, node->is_external)) {
            result = true;
            break;
        }
    }
    qemu_lockcnt_dec(&ctx->list_lock);

    return result;
}

static void aio_free_deleted_handlers(AioContext *ctx)
{
    AioHandler *node;

    if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) {
        return;
    }
    if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
        return; /* we are nested, let the parent do the freeing */
    }

    while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) {
        QLIST_REMOVE(node, node);
        QLIST_REMOVE(node, node_deleted);
        QLIST_SAFE_REMOVE(node, node_poll);
        g_free(node);
    }

    qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}

static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node)
{
    bool progress = false;
    int revents;

    revents = node->pfd.revents & node->pfd.events;
    node->pfd.revents = 0;

    /*
     * Start polling AioHandlers when they become ready because activity is
     * likely to continue.  Note that starvation is theoretically possible when
     * fdmon_supports_polling(), but only until the fd fires for the first
     * time.
     */
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        !QLIST_IS_INSERTED(node, node_poll) &&
        node->io_poll) {
        trace_poll_add(ctx, node, node->pfd.fd, revents);
        if (ctx->poll_started && node->io_poll_begin) {
            node->io_poll_begin(node->opaque);
        }
        QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll);
    }

    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_read) {
        node->io_read(node->opaque);

        /* aio_notify() does not count as progress */
        if (node->opaque != &ctx->notifier) {
            progress = true;
        }
    }
    if (!QLIST_IS_INSERTED(node, node_deleted) &&
        (revents & (G_IO_OUT | G_IO_ERR)) &&
        aio_node_check(ctx, node->is_external) &&
        node->io_write) {
        node->io_write(node->opaque);
        progress = true;
    }

    return progress;
}

/*
 * If we have a list of ready handlers then this is more efficient than
 * scanning all handlers with aio_dispatch_handlers().
 */
static bool aio_dispatch_ready_handlers(AioContext *ctx,
                                        AioHandlerList *ready_list)
{
    bool progress = false;
    AioHandler *node;

    while ((node = QLIST_FIRST(ready_list))) {
        QLIST_REMOVE(node, node_ready);
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

/* Slower than aio_dispatch_ready_handlers() but only used via glib */
static bool aio_dispatch_handlers(AioContext *ctx)
{
    AioHandler *node, *tmp;
    bool progress = false;

    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        progress = aio_dispatch_handler(ctx, node) || progress;
    }

    return progress;
}

void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx);
    aio_free_deleted_handlers(ctx);
    qemu_lockcnt_dec(&ctx->list_lock);

    timerlistgroup_run_timers(&ctx->tlg);
}

static bool run_poll_handlers_once(AioContext *ctx,
                                   int64_t now,
                                   int64_t *timeout)
{
    bool progress = false;
    AioHandler *node;
    AioHandler *tmp;

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (aio_node_check(ctx, node->is_external) &&
            node->io_poll(node->opaque)) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;

            /*
             * Polling was successful, exit try_poll_mode immediately
             * to adjust the next polling time.
             */
            *timeout = 0;
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }

        /* Caller handles freeing deleted nodes.  Don't do it here. */
    }

    return progress;
}

static bool fdmon_supports_polling(AioContext *ctx)
{
    return ctx->fdmon_ops->need_wait != aio_poll_disabled;
}

static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now)
{
    AioHandler *node;
    AioHandler *tmp;
    bool progress = false;

    /*
     * File descriptor monitoring implementations without userspace polling
     * support suffer from starvation when a subset of handlers is polled
     * because fds will not be processed in a timely fashion.  Don't remove
     * idle poll handlers.
     */
    if (!fdmon_supports_polling(ctx)) {
        return false;
    }

    QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) {
        if (node->poll_idle_timeout == 0LL) {
            node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS;
        } else if (now >= node->poll_idle_timeout) {
            trace_poll_remove(ctx, node, node->pfd.fd);
            node->poll_idle_timeout = 0LL;
            QLIST_SAFE_REMOVE(node, node_poll);
            if (ctx->poll_started && node->io_poll_end) {
                node->io_poll_end(node->opaque);

                /*
                 * Final poll in case ->io_poll_end() races with an event.
                 * Nevermind about re-adding the handler in the rare case where
                 * this causes progress.
                 */
                progress = node->io_poll(node->opaque) || progress;
            }
        }
    }

    return progress;
}

/* run_poll_handlers:
 * @ctx: the AioContext
 * @max_ns: maximum time to poll for, in nanoseconds
 *
 * Polls for a given time.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout)
{
    bool progress;
    int64_t start_time, elapsed_time;

    assert(qemu_lockcnt_count(&ctx->list_lock) > 0);

    trace_run_poll_handlers_begin(ctx, max_ns, *timeout);

    /*
     * Optimization: ->io_poll() handlers often contain RCU read critical
     * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock()
     * -> rcu_read_lock() -> ... sequences with expensive memory
     * synchronization primitives.  Make the entire polling loop an RCU
     * critical section because nested rcu_read_lock()/rcu_read_unlock() calls
     * are cheap.
     */
    RCU_READ_LOCK_GUARD();

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    do {
        progress = run_poll_handlers_once(ctx, start_time, timeout);
        elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;
        max_ns = qemu_soonest_timeout(*timeout, max_ns);
        assert(!(max_ns && progress));
    } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx));

    if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) {
        *timeout = 0;
        progress = true;
    }

    /* If time has passed with no successful polling, adjust *timeout to
     * keep the same ending time.
     */
    if (*timeout != -1) {
        *timeout -= MIN(*timeout, elapsed_time);
    }

    trace_run_poll_handlers_end(ctx, progress, *timeout);
    return progress;
}

/* try_poll_mode:
 * @ctx: the AioContext
 * @timeout: timeout for blocking wait, computed by the caller and updated if
 *           polling succeeds.
 *
 * Note that the caller must have incremented ctx->list_lock.
 *
 * Returns: true if progress was made, false otherwise
 */
static bool try_poll_mode(AioContext *ctx, int64_t *timeout)
{
    int64_t max_ns;

    if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) {
        return false;
    }

    max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns);
    if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) {
        poll_set_started(ctx, true);

        if (run_poll_handlers(ctx, max_ns, timeout)) {
            return true;
        }
    }

    if (poll_set_started(ctx, false)) {
        *timeout = 0;
        return true;
    }

    return false;
}
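
/*
 * Informational overview, summarised from the code below: aio_poll() runs one
 * iteration of the event loop for @ctx.  It first tries userspace polling via
 * try_poll_mode(), then raises ctx->notify_me and calls into the fd
 * monitoring backend when it still has to block, and finally dispatches
 * bottom halves, ready fd handlers and expired timers.  As the assertion
 * below documents, it must only run in the AioContext's home thread.
 */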

bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list);
    int ret = 0;
    bool progress;
    bool use_notify_me;
    int64_t timeout;
    int64_t start = 0;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     */
    assert(in_aio_context_home_thread(ctx));

    qemu_lockcnt_inc(&ctx->list_lock);

    if (ctx->poll_max_ns) {
        start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;
    progress = try_poll_mode(ctx, &timeout);
    assert(!(timeout && progress));

    /*
     * aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    use_notify_me = timeout != 0;
    if (use_notify_me) {
        atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before reading ctx->notified.  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();

        /* Don't block if aio_notify() was called */
        if (atomic_read(&ctx->notified)) {
            timeout = 0;
        }
    }

    /* If polling is allowed, non-blocking aio_poll does not need the
     * system call---a single round of run_poll_handlers_once suffices.
     */
    if (timeout || ctx->fdmon_ops->need_wait(ctx)) {
        ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout);
    }

    if (use_notify_me) {
        /* Finish the poll before clearing the flag. */
        atomic_store_release(&ctx->notify_me,
                             atomic_read(&ctx->notify_me) - 2);
    }

    aio_notify_accept(ctx);
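
    /*
     * Worked example of the adaptation below (32000 ns is merely an
     * illustrative poll_max_ns value): starting from poll_ns == 0, an
     * iteration that blocks for longer than the current poll_ns but for
     * less than poll_max_ns sets poll_ns to 4000 ns; further such
     * iterations double it (default grow factor 2) through 8000 and 16000
     * until it is capped at poll_max_ns.  An iteration that blocks for
     * longer than poll_max_ns divides poll_ns by poll_shrink, or resets it
     * to 0 when poll_shrink is 0.
     */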
    /* Adjust polling time */
    if (ctx->poll_max_ns) {
        int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;

        if (block_ns <= ctx->poll_ns) {
            /* This is the sweet spot, no adjustment needed */
        } else if (block_ns > ctx->poll_max_ns) {
            /* We'd have to poll for too long, poll less */
            int64_t old = ctx->poll_ns;

            if (ctx->poll_shrink) {
                ctx->poll_ns /= ctx->poll_shrink;
            } else {
                ctx->poll_ns = 0;
            }

            trace_poll_shrink(ctx, old, ctx->poll_ns);
        } else if (ctx->poll_ns < ctx->poll_max_ns &&
                   block_ns < ctx->poll_max_ns) {
            /* There is room to grow, poll longer */
            int64_t old = ctx->poll_ns;
            int64_t grow = ctx->poll_grow;

            if (grow == 0) {
                grow = 2;
            }

            if (ctx->poll_ns) {
                ctx->poll_ns *= grow;
            } else {
                ctx->poll_ns = 4000; /* start polling at 4 microseconds */
            }

            if (ctx->poll_ns > ctx->poll_max_ns) {
                ctx->poll_ns = ctx->poll_max_ns;
            }

            trace_poll_grow(ctx, old, ctx->poll_ns);
        }
    }

    progress |= aio_bh_poll(ctx);

    if (ret > 0) {
        progress |= aio_dispatch_ready_handlers(ctx, &ready_list);
    }

    aio_free_deleted_handlers(ctx);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

void aio_context_setup(AioContext *ctx)
{
    ctx->fdmon_ops = &fdmon_poll_ops;
    ctx->epollfd = -1;

    /* Use the fastest fd monitoring implementation if available */
    if (fdmon_io_uring_setup(ctx)) {
        return;
    }

    fdmon_epoll_setup(ctx);
}

void aio_context_destroy(AioContext *ctx)
{
    fdmon_io_uring_destroy(ctx);
    fdmon_epoll_disable(ctx);
    aio_free_deleted_handlers(ctx);
}

void aio_context_use_g_source(AioContext *ctx)
{
    /*
     * Disable io_uring when the glib main loop is used because it doesn't
     * support mixed glib/aio_poll() usage.  It relies on aio_poll() being
     * called regularly so that changes to the monitored file descriptors are
     * submitted, otherwise a list of pending fd handlers builds up.
     */
    fdmon_io_uring_destroy(ctx);
    aio_free_deleted_handlers(ctx);
}
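
/*
 * Illustrative example for the function below (the numbers and the
 * local_err variable are placeholders, and the IOThread description is an
 * assumption about a typical caller, not asserted by this file): an
 * IOThread configured with poll-max-ns=32000, poll-grow=0 and poll-shrink=0
 * would end up making a call like
 *
 *   aio_context_set_poll_params(ctx, 32000, 0, 0, &local_err);
 *
 * which lets aio_poll() busy-wait for up to 32 microseconds before blocking,
 * using the default grow factor of 2 and the "reset to zero" shrink
 * behaviour described above.
 */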

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    /* No thread synchronization here, it doesn't matter if an incorrect value
     * is used once.
     */
    ctx->poll_max_ns = max_ns;
    ctx->poll_ns = 0;
    ctx->poll_grow = grow;
    ctx->poll_shrink = shrink;

    aio_notify(ctx);
}