// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	struct io_wq_work *next_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}
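/*
 * The helpers that follow map a wq/work item/worker onto one of the two
 * accounting classes in wq->acct[]: IO_WQ_ACCT_BOUND for bounded work
 * (limited by the 'bounded' argument of io_wq_create()) and
 * IO_WQ_ACCT_UNBOUND for work flagged IO_WQ_WORK_UNBOUND (capped by
 * RLIMIT_NPROC at wq creation time). Illustrative mapping, lifted from
 * io_work_get_acct() below:
 *
 *	acct = io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
 */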
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}
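/*
 * Usage sketch (illustrative, mirrors the worker loop further down in this
 * file): the conditional locking of io_acct_run_queue() pairs with
 * io_worker_handle_work(), which is documented to drop acct->lock before
 * returning:
 *
 *	while (io_acct_run_queue(acct))
 *		io_worker_handle_work(acct, worker);
 */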
/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}
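/*
 * Worker creation requests can originate from a worker that is about to go
 * to sleep with work still pending (io_wq_dec_running()) or from the create
 * retry path (io_workqueue_create()); they are deferred to the io_uring
 * task via task_work. Simplified sketch of the protocol implemented by
 * io_queue_worker_create() below (the real function also handles wq exit
 * and the worker/wq refcounts); bit 0 of worker->create_state acts as a
 * per-worker "creation pending" lock:
 *
 *	if (!test_and_set_bit_lock(0, &worker->create_state)) {
 *		init_task_work(&worker->create_work, create_worker_cb);
 *		task_work_add(wq->task, &worker->create_work, TWA_SIGNAL);
 *	}
 */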
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}
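/*
 * Hashed work serialization, in brief (illustrative summary of the code in
 * this file, see io_wq_hash_work() further down): the hash bucket lives in
 * the upper bits of work->flags, so
 *
 *	work->flags |= IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT);
 *	...
 *	hash = work->flags >> IO_WQ_HASH_SHIFT;
 *
 * Only one item per bucket runs at a time, gated by wq->hash->map, and
 * wq->hash_tail[] remembers the last queued item of each bucket so that
 * same-hash items stay contiguous in the pending list.
 */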
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	worker->next_work = NULL;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		raw_spin_unlock(&acct->lock);
		if (work) {
			__io_worker_busy(wq, worker);

			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->next_work = work;
			raw_spin_unlock(&worker->lock);
		} else {
			break;
		}
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}
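/*
 * io_wq_worker() below is the worker thread's main loop. Roughly (an
 * informal summary of the code that follows, not additional semantics):
 * drain the pending list while io_acct_run_queue() reports runnable work,
 * then idle on the free list for up to WORKER_IDLE_TIMEOUT. A worker whose
 * idle sleep times out exits if it is not the last worker for its
 * accounting class, or if its CPU no longer matches wq->cpu_mask.
 */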
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}
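/*
 * Note on worker creation failures (summary of the helpers below, not new
 * behaviour): create_io_thread() errors that look transient (-EAGAIN and
 * the -ERESTART* family) are retried from a regular workqueue via
 * io_workqueue_create(); anything else, or a fatal signal on the creating
 * task, is treated as final and the half-constructed worker is torn down.
 */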
static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}
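/*
 * Cancelled work is not silently dropped: io_run_cancel() below tags the
 * item with IO_WQ_WORK_CANCEL and still pushes it through wq->do_work(),
 * so the submitter sees a completion (typically an errored one; the exact
 * error is up to the do_work callback, not io-wq). free_work() may hand
 * back a linked item, which is cancelled the same way.
 */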
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	struct io_cb_cancel_data match;
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		match.fn = io_wq_work_match_item;
		match.data = work;
		match.cancel_all = false;

		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
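/*
 * Typical caller pattern (illustrative; in-tree the io_uring core does the
 * equivalent when preparing async work, the actual call sites are outside
 * this file, and 'req' stands for a hypothetical request embedding a
 * struct io_wq_work):
 *
 *	io_wq_hash_work(&req->work, file_inode(req->file));
 *	io_wq_enqueue(wq, &req->work);
 *
 * Work hashed on the same inode is then serialized by the scheme described
 * above io_get_next_work().
 */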
static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		work->flags |= IO_WQ_WORK_CANCEL;
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
	    __io_wq_worker_cancel(worker, match, worker->next_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}
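/*
 * Usage sketch for the public cancel entry point below (the match callback
 * and 'target' here are hypothetical, not part of io-wq):
 *
 *	static bool match_one(struct io_wq_work *work, void *data)
 *	{
 *		return work == data;
 *	}
 *
 *	ret = io_wq_cancel_cb(wq, match_one, target, false);
 *
 * IO_WQ_CANCEL_OK means the item was still pending and was cancelled
 * without a completion being posted; IO_WQ_CANCEL_RUNNING means a running
 * worker was signalled and the completion will arrive normally.
 */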
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpumask_copy(wq->cpu_mask, cpu_possible_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	free_cpumask_var(wq->cpu_mask);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}
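/*
 * Setup sketch (illustrative; the field names follow their use in
 * io_wq_create() above, while the callbacks, the 'hash' object and the
 * worker count are hypothetical placeholders for what the io_uring core
 * normally supplies):
 *
 *	struct io_wq_data data = {
 *		.hash		= hash,
 *		.task		= current,
 *		.free_work	= my_free_work,
 *		.do_work	= my_do_work,
 *	};
 *	struct io_wq *wq = io_wq_create(bounded_concurrency, &data);
 *
 * The wq must later be shut down with io_wq_exit_start() followed by
 * io_wq_put_and_exit(), as the WARN_ON_ONCE() in the latter implies.
 */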
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	rcu_read_lock();
	if (mask)
		cpumask_copy(tctx->io_wq->cpu_mask, mask);
	else
		cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask);
	rcu_read_unlock();

	return 0;
}
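/*
 * io_wq_max_workers() below backs the per-ring worker limit controls
 * reached via io_uring_register() (IORING_REGISTER_IOWQ_MAX_WORKERS in the
 * io_uring uapi; the register plumbing is outside this file). new_count[]
 * is indexed by IO_WQ_ACCT_BOUND and IO_WQ_ACCT_UNBOUND; a zero entry
 * leaves that class unchanged, and on return the array holds the previous
 * limits. Illustrative use, capping bounded workers at 4 and leaving
 * unbounded alone:
 *
 *	int counts[IO_WQ_ACCT_NR] = { 4, 0 };
 *	io_wq_max_workers(wq, counts);
 */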
/*
 * Set the max number of workers per class (bounded/unbounded), returns the
 * old values. If a new_count entry is 0, that class keeps its old limit and
 * only the old value is returned.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);