1 /* 2 * QEMU aio implementation 3 * 4 * Copyright IBM, Corp. 2008 5 * 6 * Authors: 7 * Anthony Liguori <aliguori@us.ibm.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2. See 10 * the COPYING file in the top-level directory. 11 * 12 * Contributions after 2012-01-13 are licensed under the terms of the 13 * GNU GPL, version 2 or (at your option) any later version. 14 */ 15 16 #include "qemu/osdep.h" 17 #include "block/block.h" 18 #include "qemu/rcu.h" 19 #include "qemu/rcu_queue.h" 20 #include "qemu/sockets.h" 21 #include "qemu/cutils.h" 22 #include "trace.h" 23 #include "aio-posix.h" 24 25 /* Stop userspace polling on a handler if it isn't active for some time */ 26 #define POLL_IDLE_INTERVAL_NS (7 * NANOSECONDS_PER_SECOND) 27 28 bool aio_poll_disabled(AioContext *ctx) 29 { 30 return atomic_read(&ctx->poll_disable_cnt); 31 } 32 33 void aio_add_ready_handler(AioHandlerList *ready_list, 34 AioHandler *node, 35 int revents) 36 { 37 QLIST_SAFE_REMOVE(node, node_ready); /* remove from nested parent's list */ 38 node->pfd.revents = revents; 39 QLIST_INSERT_HEAD(ready_list, node, node_ready); 40 } 41 42 static AioHandler *find_aio_handler(AioContext *ctx, int fd) 43 { 44 AioHandler *node; 45 46 QLIST_FOREACH(node, &ctx->aio_handlers, node) { 47 if (node->pfd.fd == fd) { 48 if (!QLIST_IS_INSERTED(node, node_deleted)) { 49 return node; 50 } 51 } 52 } 53 54 return NULL; 55 } 56 57 static bool aio_remove_fd_handler(AioContext *ctx, AioHandler *node) 58 { 59 /* If the GSource is in the process of being destroyed then 60 * g_source_remove_poll() causes an assertion failure. Skip 61 * removal in that case, because glib cleans up its state during 62 * destruction anyway. 63 */ 64 if (!g_source_is_destroyed(&ctx->source)) { 65 g_source_remove_poll(&ctx->source, &node->pfd); 66 } 67 68 node->pfd.revents = 0; 69 70 /* If the fd monitor has already marked it deleted, leave it alone */ 71 if (QLIST_IS_INSERTED(node, node_deleted)) { 72 return false; 73 } 74 75 /* If a read is in progress, just mark the node as deleted */ 76 if (qemu_lockcnt_count(&ctx->list_lock)) { 77 QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted); 78 return false; 79 } 80 /* Otherwise, delete it for real. We can't just mark it as 81 * deleted because deleted nodes are only cleaned up while 82 * no one is walking the handlers list. 83 */ 84 QLIST_SAFE_REMOVE(node, node_poll); 85 QLIST_REMOVE(node, node); 86 return true; 87 } 88 89 void aio_set_fd_handler(AioContext *ctx, 90 int fd, 91 bool is_external, 92 IOHandler *io_read, 93 IOHandler *io_write, 94 AioPollFn *io_poll, 95 void *opaque) 96 { 97 AioHandler *node; 98 AioHandler *new_node = NULL; 99 bool is_new = false; 100 bool deleted = false; 101 int poll_disable_change; 102 103 qemu_lockcnt_lock(&ctx->list_lock); 104 105 node = find_aio_handler(ctx, fd); 106 107 /* Are we deleting the fd handler? */ 108 if (!io_read && !io_write && !io_poll) { 109 if (node == NULL) { 110 qemu_lockcnt_unlock(&ctx->list_lock); 111 return; 112 } 113 /* Clean events in order to unregister fd from the ctx epoll. */ 114 node->pfd.events = 0; 115 116 poll_disable_change = -!node->io_poll; 117 } else { 118 poll_disable_change = !io_poll - (node && !node->io_poll); 119 if (node == NULL) { 120 is_new = true; 121 } 122 /* Alloc and insert if it's not already there */ 123 new_node = g_new0(AioHandler, 1); 124 125 /* Update handler with latest information */ 126 new_node->io_read = io_read; 127 new_node->io_write = io_write; 128 new_node->io_poll = io_poll; 129 new_node->opaque = opaque; 130 new_node->is_external = is_external; 131 132 if (is_new) { 133 new_node->pfd.fd = fd; 134 } else { 135 new_node->pfd = node->pfd; 136 } 137 g_source_add_poll(&ctx->source, &new_node->pfd); 138 139 new_node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0); 140 new_node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0); 141 142 QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, new_node, node); 143 } 144 145 /* No need to order poll_disable_cnt writes against other updates; 146 * the counter is only used to avoid wasting time and latency on 147 * iterated polling when the system call will be ultimately necessary. 148 * Changing handlers is a rare event, and a little wasted polling until 149 * the aio_notify below is not an issue. 150 */ 151 atomic_set(&ctx->poll_disable_cnt, 152 atomic_read(&ctx->poll_disable_cnt) + poll_disable_change); 153 154 ctx->fdmon_ops->update(ctx, node, new_node); 155 if (node) { 156 deleted = aio_remove_fd_handler(ctx, node); 157 } 158 qemu_lockcnt_unlock(&ctx->list_lock); 159 aio_notify(ctx); 160 161 if (deleted) { 162 g_free(node); 163 } 164 } 165 166 void aio_set_fd_poll(AioContext *ctx, int fd, 167 IOHandler *io_poll_begin, 168 IOHandler *io_poll_end) 169 { 170 AioHandler *node = find_aio_handler(ctx, fd); 171 172 if (!node) { 173 return; 174 } 175 176 node->io_poll_begin = io_poll_begin; 177 node->io_poll_end = io_poll_end; 178 } 179 180 void aio_set_event_notifier(AioContext *ctx, 181 EventNotifier *notifier, 182 bool is_external, 183 EventNotifierHandler *io_read, 184 AioPollFn *io_poll) 185 { 186 aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external, 187 (IOHandler *)io_read, NULL, io_poll, notifier); 188 } 189 190 void aio_set_event_notifier_poll(AioContext *ctx, 191 EventNotifier *notifier, 192 EventNotifierHandler *io_poll_begin, 193 EventNotifierHandler *io_poll_end) 194 { 195 aio_set_fd_poll(ctx, event_notifier_get_fd(notifier), 196 (IOHandler *)io_poll_begin, 197 (IOHandler *)io_poll_end); 198 } 199 200 static bool poll_set_started(AioContext *ctx, bool started) 201 { 202 AioHandler *node; 203 bool progress = false; 204 205 if (started == ctx->poll_started) { 206 return false; 207 } 208 209 ctx->poll_started = started; 210 211 qemu_lockcnt_inc(&ctx->list_lock); 212 QLIST_FOREACH(node, &ctx->poll_aio_handlers, node_poll) { 213 IOHandler *fn; 214 215 if (QLIST_IS_INSERTED(node, node_deleted)) { 216 continue; 217 } 218 219 if (started) { 220 fn = node->io_poll_begin; 221 } else { 222 fn = node->io_poll_end; 223 } 224 225 if (fn) { 226 fn(node->opaque); 227 } 228 229 /* Poll one last time in case ->io_poll_end() raced with the event */ 230 if (!started) { 231 progress = node->io_poll(node->opaque) || progress; 232 } 233 } 234 qemu_lockcnt_dec(&ctx->list_lock); 235 236 return progress; 237 } 238 239 240 bool aio_prepare(AioContext *ctx) 241 { 242 /* Poll mode cannot be used with glib's event loop, disable it. */ 243 poll_set_started(ctx, false); 244 245 return false; 246 } 247 248 bool aio_pending(AioContext *ctx) 249 { 250 AioHandler *node; 251 bool result = false; 252 253 /* 254 * We have to walk very carefully in case aio_set_fd_handler is 255 * called while we're walking. 256 */ 257 qemu_lockcnt_inc(&ctx->list_lock); 258 259 QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { 260 int revents; 261 262 revents = node->pfd.revents & node->pfd.events; 263 if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read && 264 aio_node_check(ctx, node->is_external)) { 265 result = true; 266 break; 267 } 268 if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write && 269 aio_node_check(ctx, node->is_external)) { 270 result = true; 271 break; 272 } 273 } 274 qemu_lockcnt_dec(&ctx->list_lock); 275 276 return result; 277 } 278 279 static void aio_free_deleted_handlers(AioContext *ctx) 280 { 281 AioHandler *node; 282 283 if (QLIST_EMPTY_RCU(&ctx->deleted_aio_handlers)) { 284 return; 285 } 286 if (!qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { 287 return; /* we are nested, let the parent do the freeing */ 288 } 289 290 while ((node = QLIST_FIRST_RCU(&ctx->deleted_aio_handlers))) { 291 QLIST_REMOVE(node, node); 292 QLIST_REMOVE(node, node_deleted); 293 QLIST_SAFE_REMOVE(node, node_poll); 294 g_free(node); 295 } 296 297 qemu_lockcnt_inc_and_unlock(&ctx->list_lock); 298 } 299 300 static bool aio_dispatch_handler(AioContext *ctx, AioHandler *node) 301 { 302 bool progress = false; 303 int revents; 304 305 revents = node->pfd.revents & node->pfd.events; 306 node->pfd.revents = 0; 307 308 /* 309 * Start polling AioHandlers when they become ready because activity is 310 * likely to continue. Note that starvation is theoretically possible when 311 * fdmon_supports_polling(), but only until the fd fires for the first 312 * time. 313 */ 314 if (!QLIST_IS_INSERTED(node, node_deleted) && 315 !QLIST_IS_INSERTED(node, node_poll) && 316 node->io_poll) { 317 trace_poll_add(ctx, node, node->pfd.fd, revents); 318 if (ctx->poll_started && node->io_poll_begin) { 319 node->io_poll_begin(node->opaque); 320 } 321 QLIST_INSERT_HEAD(&ctx->poll_aio_handlers, node, node_poll); 322 } 323 324 if (!QLIST_IS_INSERTED(node, node_deleted) && 325 (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && 326 aio_node_check(ctx, node->is_external) && 327 node->io_read) { 328 node->io_read(node->opaque); 329 330 /* aio_notify() does not count as progress */ 331 if (node->opaque != &ctx->notifier) { 332 progress = true; 333 } 334 } 335 if (!QLIST_IS_INSERTED(node, node_deleted) && 336 (revents & (G_IO_OUT | G_IO_ERR)) && 337 aio_node_check(ctx, node->is_external) && 338 node->io_write) { 339 node->io_write(node->opaque); 340 progress = true; 341 } 342 343 return progress; 344 } 345 346 /* 347 * If we have a list of ready handlers then this is more efficient than 348 * scanning all handlers with aio_dispatch_handlers(). 349 */ 350 static bool aio_dispatch_ready_handlers(AioContext *ctx, 351 AioHandlerList *ready_list) 352 { 353 bool progress = false; 354 AioHandler *node; 355 356 while ((node = QLIST_FIRST(ready_list))) { 357 QLIST_REMOVE(node, node_ready); 358 progress = aio_dispatch_handler(ctx, node) || progress; 359 } 360 361 return progress; 362 } 363 364 /* Slower than aio_dispatch_ready_handlers() but only used via glib */ 365 static bool aio_dispatch_handlers(AioContext *ctx) 366 { 367 AioHandler *node, *tmp; 368 bool progress = false; 369 370 QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { 371 progress = aio_dispatch_handler(ctx, node) || progress; 372 } 373 374 return progress; 375 } 376 377 void aio_dispatch(AioContext *ctx) 378 { 379 qemu_lockcnt_inc(&ctx->list_lock); 380 aio_bh_poll(ctx); 381 aio_dispatch_handlers(ctx); 382 aio_free_deleted_handlers(ctx); 383 qemu_lockcnt_dec(&ctx->list_lock); 384 385 timerlistgroup_run_timers(&ctx->tlg); 386 } 387 388 static bool run_poll_handlers_once(AioContext *ctx, 389 int64_t now, 390 int64_t *timeout) 391 { 392 bool progress = false; 393 AioHandler *node; 394 AioHandler *tmp; 395 396 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { 397 if (aio_node_check(ctx, node->is_external) && 398 node->io_poll(node->opaque)) { 399 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; 400 401 /* 402 * Polling was successful, exit try_poll_mode immediately 403 * to adjust the next polling time. 404 */ 405 *timeout = 0; 406 if (node->opaque != &ctx->notifier) { 407 progress = true; 408 } 409 } 410 411 /* Caller handles freeing deleted nodes. Don't do it here. */ 412 } 413 414 return progress; 415 } 416 417 static bool fdmon_supports_polling(AioContext *ctx) 418 { 419 return ctx->fdmon_ops->need_wait != aio_poll_disabled; 420 } 421 422 static bool remove_idle_poll_handlers(AioContext *ctx, int64_t now) 423 { 424 AioHandler *node; 425 AioHandler *tmp; 426 bool progress = false; 427 428 /* 429 * File descriptor monitoring implementations without userspace polling 430 * support suffer from starvation when a subset of handlers is polled 431 * because fds will not be processed in a timely fashion. Don't remove 432 * idle poll handlers. 433 */ 434 if (!fdmon_supports_polling(ctx)) { 435 return false; 436 } 437 438 QLIST_FOREACH_SAFE(node, &ctx->poll_aio_handlers, node_poll, tmp) { 439 if (node->poll_idle_timeout == 0LL) { 440 node->poll_idle_timeout = now + POLL_IDLE_INTERVAL_NS; 441 } else if (now >= node->poll_idle_timeout) { 442 trace_poll_remove(ctx, node, node->pfd.fd); 443 node->poll_idle_timeout = 0LL; 444 QLIST_SAFE_REMOVE(node, node_poll); 445 if (ctx->poll_started && node->io_poll_end) { 446 node->io_poll_end(node->opaque); 447 448 /* 449 * Final poll in case ->io_poll_end() races with an event. 450 * Nevermind about re-adding the handler in the rare case where 451 * this causes progress. 452 */ 453 progress = node->io_poll(node->opaque) || progress; 454 } 455 } 456 } 457 458 return progress; 459 } 460 461 /* run_poll_handlers: 462 * @ctx: the AioContext 463 * @max_ns: maximum time to poll for, in nanoseconds 464 * 465 * Polls for a given time. 466 * 467 * Note that ctx->notify_me must be non-zero so this function can detect 468 * aio_notify(). 469 * 470 * Note that the caller must have incremented ctx->list_lock. 471 * 472 * Returns: true if progress was made, false otherwise 473 */ 474 static bool run_poll_handlers(AioContext *ctx, int64_t max_ns, int64_t *timeout) 475 { 476 bool progress; 477 int64_t start_time, elapsed_time; 478 479 assert(ctx->notify_me); 480 assert(qemu_lockcnt_count(&ctx->list_lock) > 0); 481 482 trace_run_poll_handlers_begin(ctx, max_ns, *timeout); 483 484 /* 485 * Optimization: ->io_poll() handlers often contain RCU read critical 486 * sections and we therefore see many rcu_read_lock() -> rcu_read_unlock() 487 * -> rcu_read_lock() -> ... sequences with expensive memory 488 * synchronization primitives. Make the entire polling loop an RCU 489 * critical section because nested rcu_read_lock()/rcu_read_unlock() calls 490 * are cheap. 491 */ 492 RCU_READ_LOCK_GUARD(); 493 494 start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); 495 do { 496 progress = run_poll_handlers_once(ctx, start_time, timeout); 497 elapsed_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time; 498 max_ns = qemu_soonest_timeout(*timeout, max_ns); 499 assert(!(max_ns && progress)); 500 } while (elapsed_time < max_ns && !ctx->fdmon_ops->need_wait(ctx)); 501 502 if (remove_idle_poll_handlers(ctx, start_time + elapsed_time)) { 503 *timeout = 0; 504 progress = true; 505 } 506 507 /* If time has passed with no successful polling, adjust *timeout to 508 * keep the same ending time. 509 */ 510 if (*timeout != -1) { 511 *timeout -= MIN(*timeout, elapsed_time); 512 } 513 514 trace_run_poll_handlers_end(ctx, progress, *timeout); 515 return progress; 516 } 517 518 /* try_poll_mode: 519 * @ctx: the AioContext 520 * @timeout: timeout for blocking wait, computed by the caller and updated if 521 * polling succeeds. 522 * 523 * ctx->notify_me must be non-zero so this function can detect aio_notify(). 524 * 525 * Note that the caller must have incremented ctx->list_lock. 526 * 527 * Returns: true if progress was made, false otherwise 528 */ 529 static bool try_poll_mode(AioContext *ctx, int64_t *timeout) 530 { 531 int64_t max_ns; 532 533 if (QLIST_EMPTY_RCU(&ctx->poll_aio_handlers)) { 534 return false; 535 } 536 537 max_ns = qemu_soonest_timeout(*timeout, ctx->poll_ns); 538 if (max_ns && !ctx->fdmon_ops->need_wait(ctx)) { 539 poll_set_started(ctx, true); 540 541 if (run_poll_handlers(ctx, max_ns, timeout)) { 542 return true; 543 } 544 } 545 546 if (poll_set_started(ctx, false)) { 547 *timeout = 0; 548 return true; 549 } 550 551 return false; 552 } 553 554 bool aio_poll(AioContext *ctx, bool blocking) 555 { 556 AioHandlerList ready_list = QLIST_HEAD_INITIALIZER(ready_list); 557 int ret = 0; 558 bool progress; 559 int64_t timeout; 560 int64_t start = 0; 561 562 /* 563 * There cannot be two concurrent aio_poll calls for the same AioContext (or 564 * an aio_poll concurrent with a GSource prepare/check/dispatch callback). 565 * We rely on this below to avoid slow locked accesses to ctx->notify_me. 566 */ 567 assert(in_aio_context_home_thread(ctx)); 568 569 /* aio_notify can avoid the expensive event_notifier_set if 570 * everything (file descriptors, bottom halves, timers) will 571 * be re-evaluated before the next blocking poll(). This is 572 * already true when aio_poll is called with blocking == false; 573 * if blocking == true, it is only true after poll() returns, 574 * so disable the optimization now. 575 */ 576 if (blocking) { 577 atomic_set(&ctx->notify_me, atomic_read(&ctx->notify_me) + 2); 578 /* 579 * Write ctx->notify_me before computing the timeout 580 * (reading bottom half flags, etc.). Pairs with 581 * smp_mb in aio_notify(). 582 */ 583 smp_mb(); 584 } 585 586 qemu_lockcnt_inc(&ctx->list_lock); 587 588 if (ctx->poll_max_ns) { 589 start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); 590 } 591 592 timeout = blocking ? aio_compute_timeout(ctx) : 0; 593 progress = try_poll_mode(ctx, &timeout); 594 assert(!(timeout && progress)); 595 596 /* If polling is allowed, non-blocking aio_poll does not need the 597 * system call---a single round of run_poll_handlers_once suffices. 598 */ 599 if (timeout || ctx->fdmon_ops->need_wait(ctx)) { 600 ret = ctx->fdmon_ops->wait(ctx, &ready_list, timeout); 601 } 602 603 if (blocking) { 604 /* Finish the poll before clearing the flag. */ 605 atomic_store_release(&ctx->notify_me, atomic_read(&ctx->notify_me) - 2); 606 aio_notify_accept(ctx); 607 } 608 609 /* Adjust polling time */ 610 if (ctx->poll_max_ns) { 611 int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start; 612 613 if (block_ns <= ctx->poll_ns) { 614 /* This is the sweet spot, no adjustment needed */ 615 } else if (block_ns > ctx->poll_max_ns) { 616 /* We'd have to poll for too long, poll less */ 617 int64_t old = ctx->poll_ns; 618 619 if (ctx->poll_shrink) { 620 ctx->poll_ns /= ctx->poll_shrink; 621 } else { 622 ctx->poll_ns = 0; 623 } 624 625 trace_poll_shrink(ctx, old, ctx->poll_ns); 626 } else if (ctx->poll_ns < ctx->poll_max_ns && 627 block_ns < ctx->poll_max_ns) { 628 /* There is room to grow, poll longer */ 629 int64_t old = ctx->poll_ns; 630 int64_t grow = ctx->poll_grow; 631 632 if (grow == 0) { 633 grow = 2; 634 } 635 636 if (ctx->poll_ns) { 637 ctx->poll_ns *= grow; 638 } else { 639 ctx->poll_ns = 4000; /* start polling at 4 microseconds */ 640 } 641 642 if (ctx->poll_ns > ctx->poll_max_ns) { 643 ctx->poll_ns = ctx->poll_max_ns; 644 } 645 646 trace_poll_grow(ctx, old, ctx->poll_ns); 647 } 648 } 649 650 progress |= aio_bh_poll(ctx); 651 652 if (ret > 0) { 653 progress |= aio_dispatch_ready_handlers(ctx, &ready_list); 654 } 655 656 aio_free_deleted_handlers(ctx); 657 658 qemu_lockcnt_dec(&ctx->list_lock); 659 660 progress |= timerlistgroup_run_timers(&ctx->tlg); 661 662 return progress; 663 } 664 665 void aio_context_setup(AioContext *ctx) 666 { 667 ctx->fdmon_ops = &fdmon_poll_ops; 668 ctx->epollfd = -1; 669 670 /* Use the fastest fd monitoring implementation if available */ 671 if (fdmon_io_uring_setup(ctx)) { 672 return; 673 } 674 675 fdmon_epoll_setup(ctx); 676 } 677 678 void aio_context_destroy(AioContext *ctx) 679 { 680 fdmon_io_uring_destroy(ctx); 681 fdmon_epoll_disable(ctx); 682 aio_free_deleted_handlers(ctx); 683 } 684 685 void aio_context_use_g_source(AioContext *ctx) 686 { 687 /* 688 * Disable io_uring when the glib main loop is used because it doesn't 689 * support mixed glib/aio_poll() usage. It relies on aio_poll() being 690 * called regularly so that changes to the monitored file descriptors are 691 * submitted, otherwise a list of pending fd handlers builds up. 692 */ 693 fdmon_io_uring_destroy(ctx); 694 aio_free_deleted_handlers(ctx); 695 } 696 697 void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns, 698 int64_t grow, int64_t shrink, Error **errp) 699 { 700 /* No thread synchronization here, it doesn't matter if an incorrect value 701 * is used once. 702 */ 703 ctx->poll_max_ns = max_ns; 704 ctx->poll_ns = 0; 705 ctx->poll_grow = grow; 706 ctx->poll_shrink = shrink; 707 708 aio_notify(ctx); 709 } 710