// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
	IO_WORKER_F_BOUND	= 3,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	int create_index;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	struct io_wq_work *next_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq,
					     bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
				io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;

	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	worker->next_work = NULL;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			/*
			 * Make sure cancellation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->next_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(wq, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn = io_wq_work_match_all,
				.cancel_all = true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		set_bit(IO_WORKER_F_BOUND, &worker->flags);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned long work_flags = work->flags;
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_item,
		.data = work,
		.cancel_all = false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		work->flags |= IO_WQ_WORK_CANCEL;
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
	    __io_wq_worker_cancel(worker, match, worker->next_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn = cancel,
		.data = data,
		.cancel_all = cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpumask_copy(wq->cpu_mask, cpu_possible_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err;

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_all,
		.cancel_all = true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	rcu_read_lock();
	if (mask)
		cpumask_copy(tctx->io_wq->cpu_mask, mask);
	else
		cpumask_copy(tctx->io_wq->cpu_mask, cpu_possible_mask);
	rcu_read_unlock();

	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);