// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	struct io_wq_work *next_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	worker->flags = 0;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wq_acct *acct)
{
	bool ret = false;

	raw_spin_lock(&acct->lock);
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		ret = true;
	raw_spin_unlock(&acct->lock);

	return ret;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}
/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't set up with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}
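/*
 * Illustrative sketch (a simplified reading of the helpers above, not an
 * additional contract) of the worker-creation path taken when the last
 * running worker for an accounting class blocks while work is still queued:
 *
 *	io_wq_worker_sleeping(tsk)
 *	  -> io_wq_dec_running(worker)
 *	       acct->nr_running hits 0 && io_acct_run_queue(acct)
 *	         -> io_queue_worker_create(worker, acct, create_worker_cb)
 *	              task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)
 *	                 ... later, run from the io_uring task's task_work ...
 *	                 create_worker_cb()
 *	                   -> create_io_worker(wq, worker->create_index)
 *
 * Ownership of create_work/create_index is serialized by bit 0 of
 * worker->create_state, taken with test_and_set_bit_lock() above.
 */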
/*
 * Worker will start processing some work. Remove it from the free list,
 * if it's currently on there.
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	worker->next_work = NULL;
	raw_spin_unlock(&worker->lock);
}
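/*
 * Rough sketch of how hashed work sits in the pending list, based on
 * io_wq_insert_work() below and io_get_next_work() above (illustration
 * only). Items sharing a hash are kept adjacent, with wq->hash_tail[hash]
 * pointing at the last one, so a single wq_list_cut() can claim the whole
 * run once test_and_set_bit(hash, &wq->hash->map) succeeds:
 *
 *	acct->work_list:  W0 -> A1 -> A2 -> A3 -> W4 -> B1 -> B2
 *	                        ^           ^           ^     ^
 *	                     first A   hash_tail[A]  first B  hash_tail[B]
 *
 * If hash A is already running on another worker, the scan jumps from A1
 * straight to hash_tail[A] and continues with W4, recording A as the
 * stall hash to wait on if nothing else is runnable.
 */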
static void io_worker_handle_work(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress; any work completion or insertion will
		 * clear the stalled flag.
		 */
		raw_spin_lock(&acct->lock);
		work = io_get_next_work(acct, worker);
		raw_spin_unlock(&acct->lock);
		if (work) {
			__io_worker_busy(wq, worker);

			/*
			 * Make sure cancellation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->next_work = work;
			raw_spin_unlock(&worker->lock);
		} else {
			break;
		}
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
		while (io_acct_run_queue(acct))
			io_worker_handle_work(worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		io_worker_handle_work(worker);

	io_worker_exit(worker);
	return 0;
}
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wq_inc_running(worker);
}

/*
 * Called when a worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}
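/*
 * Thread-creation failure handling, summarized (an illustrative reading of
 * io_workqueue_create()/create_worker_cont() above and create_io_worker()
 * below, not additional behaviour):
 *
 *	create_io_worker()
 *	  create_io_thread() fails
 *	    -EAGAIN / -ERESTART*  -> INIT_WORK + schedule_work(io_workqueue_create)
 *	                             -> io_queue_worker_create(create_worker_cont)
 *	                                -> retry create_io_thread() from task_work
 *	    any other error       -> drop the accounting refs and free the worker
 */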
static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate all workers on the wq and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	struct io_cb_cancel_data match;
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	raw_spin_lock(&wq->lock);
	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wq->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		match.fn		= io_wq_work_match_item,
		match.data		= work,
		match.cancel_all	= false,

		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}
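/*
 * Hypothetical caller-side sketch (not code from this file): a submitter
 * that wants writes to the same inode serialized would hash the work on
 * the inode pointer before queueing it. "req", "req->work" and "req->file"
 * stand in for whatever request structure the caller actually has:
 *
 *	struct io_wq_work *work = &req->work;
 *
 *	io_wq_hash_work(work, file_inode(req->file));
 *	io_wq_enqueue(wq, work);
 *
 * Two items hashed on the same inode then never run in parallel, while
 * items with different hash values still fan out across workers.
 */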
static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		work->flags |= IO_WQ_WORK_CANCEL;
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
	    __io_wq_worker_cancel(worker, match, worker->next_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we've attempted to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpumask_copy(wq->cpu_mask, cpu_possible_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	free_cpumask_var(wq->cpu_mask);
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}
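/*
 * Shutdown ordering, as implied by io_wq_exit_start() just above and
 * io_wq_put_and_exit() further below (a descriptive summary, not an
 * additional API contract):
 *
 *	io_wq_exit_start(wq);		// set IO_WQ_BIT_EXIT, no new work accepted
 *	io_wq_put_and_exit(wq);		// cancel queued create task_work,
 *					// wake workers, wait for worker_done,
 *					// then cancel remaining work and free wq
 */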
static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has the worker
		 * allocated and hence needs to free it.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	rcu_read_lock();
	if (mask)
		cpumask_copy(wq->cpu_mask, mask);
	else
		cpumask_copy(wq->cpu_mask, cpu_possible_mask);
	rcu_read_unlock();

	return 0;
}
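/*
 * Example of the io_wq_max_workers() calling convention described below
 * (a hypothetical caller-side snippet; the array indices follow the
 * IO_WQ_BOUND/IO_WQ_UNBOUND <-> IO_WQ_ACCT_* mapping asserted by the
 * BUILD_BUG_ON()s in that function):
 *
 *	int counts[IO_WQ_ACCT_NR] = { 16, 0 };	// cap bounded workers at 16,
 *						// leave unbounded untouched
 *
 *	io_wq_max_workers(wq, counts);
 *	// counts[] now holds the previous limits for both classes
 */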
/*
 * Set the max number of workers for each accounting class (bound/unbound);
 * the previous values are returned through new_count. A zero entry in
 * new_count leaves that class unchanged and just reports its old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);