/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/freezer.h>

#include <linux/sunrpc/clnt.h>

#include "sunrpc.h"

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#endif

#define CREATE_TRACE_POINTS
#include <trace/events/sunrpc.h>

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static struct kmem_cache	*rpc_task_slabp __read_mostly;
static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(unsigned long ptr);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 */
struct workqueue_struct *rpciod_workqueue;

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_timeout == 0)
		return;
	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		del_timer(&queue->timer_list.timer);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	queue->timer_list.expires = expires;
	mod_timer(&queue->timer_list.timer, expires);
}

/*
 * Set up a timer for the current task.
 */
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %5u setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	task->u.tk_wait.expires = jiffies + task->tk_timeout;
	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
}

static void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
{
	queue->owner = pid;
	queue->nr = RPC_BATCH_COUNT;
}

static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_owner(queue, 0);
}
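/*
 * Editor's sketch (not part of the original file): the timer helpers
 * above coalesce all per-task timeouts on a queue into a single kernel
 * timer. A caller arms a timeout simply by setting tk_timeout before
 * going to sleep; "some_queue" below is a hypothetical queue:
 *
 *	task->tk_timeout = 5 * HZ;
 *	rpc_sleep_on(&some_queue, task, NULL);
 *
 * If nothing wakes the task within five seconds, __rpc_queue_timer_fn()
 * fires, sets tk_status to -ETIMEDOUT and wakes the task.
 */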
/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	if (unlikely(queue_priority > queue->maxpriority))
		queue_priority = queue->maxpriority;
	if (queue_priority > queue->priority)
		rpc_set_waitqueue_priority(queue, queue_priority);
	q = &queue->tasks[queue_priority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task, queue_priority);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	rpc_set_queued(task);

	dprintk("RPC: %5u added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
}
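/*
 * Editor's sketch (not part of the original file): the priority queue
 * is two-level. Each element of queue->tasks[] is a FIFO of "queue
 * heads"; a task that shares an owner (tk_owner) with an already queued
 * head is chained off that head's u.tk_wait.links instead, so one owner
 * cannot monopolize a priority level:
 *
 *	tasks[prio]: headA -> headB -> headC
 *	               |
 *	             links: taskA2 -> taskA3   (same tk_owner as headA)
 *
 * When a head is removed, __rpc_remove_wait_queue_priority() promotes
 * the first linked task to take its place as the new head.
 */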
/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	list_del(&task->u.tk_wait.list);
	queue->qlen--;
	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
	INIT_LIST_HEAD(&queue->timer_list.list);
	rpc_assign_waitqueue_name(queue, qname);
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	del_timer_sync(&queue->timer_list.timer);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

static int rpc_wait_bit_killable(void *word)
{
	if (fatal_signal_pending(current))
		return -ERESTARTSYS;
	freezable_schedule();
	return 0;
}

#ifdef RPC_DEBUG
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	static atomic_t rpc_pid;

	task->tk_pid = atomic_inc_return(&rpc_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	trace_rpc_task_begin(task->tk_client, task, NULL);

	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
}

/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task->tk_client, task, NULL);

	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}
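/*
 * Editor's sketch (not part of the original file): a wait queue user
 * initializes the queue once and must tear it down with
 * rpc_destroy_wait_queue() so the queue timer cannot fire after the
 * structure is freed. The pattern below mirrors the transport code
 * (cf. net/sunrpc/xprt.c); the field name is illustrative:
 *
 *	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
 *	...
 *	rpc_destroy_wait_queue(&xprt->binding);
 */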
/*
 * Allow callers to wait for completion of an RPC call
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	rpc_clear_queued(task);
	if (rpc_test_and_set_running(task))
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(rpciod_workqueue, &task->u.tk_work);
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		rpc_action action,
		unsigned char queue_priority)
{
	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
			task->tk_pid, rpc_qname(q), jiffies);

	trace_rpc_task_sleep(task->tk_client, task, q);

	__rpc_add_wait_queue(q, task, queue_priority);

	WARN_ON_ONCE(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(q, task);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action)
{
	/* We shouldn't ever put an inactive task to sleep */
	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return;
	}

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);

void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
		rpc_action action, int priority)
{
	/* We shouldn't ever put an inactive task to sleep */
	WARN_ON_ONCE(!RPC_IS_ACTIVATED(task));
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return;
	}

	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
	spin_unlock_bh(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
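/*
 * Editor's sketch (not part of the original file): rpc_sleep_on() pairs
 * with rpc_wake_up_queued_task() called from another context. A state
 * function names its successor state and then parks the task; the
 * identifiers below are hypothetical:
 *
 *	static void demo_wait_for_slot(struct rpc_task *task)
 *	{
 *		task->tk_action = demo_have_slot;
 *		rpc_sleep_on(&demo_backlog, task, NULL);
 *	}
 *
 * and, when a slot frees up elsewhere:
 *
 *	rpc_wake_up_queued_task(&demo_backlog, task);
 */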
/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
			task->tk_pid, jiffies);

	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	trace_rpc_task_wakeup(task->tk_client, task, queue);

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
		__rpc_do_wake_up_task(queue, task);
}

/*
 * Tests whether rpc queue is empty
 */
int rpc_queue_empty(struct rpc_wait_queue *queue)
{
	int res;

	spin_lock_bh(&queue->lock);
	res = queue->qlen;
	spin_unlock_bh(&queue->lock);
	return res == 0;
}
EXPORT_SYMBOL_GPL(rpc_queue_empty);

/*
 * Wake up a task on a specific queue
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	spin_lock_bh(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->owner == task->tk_owner) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		goto new_owner;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_owner:
	rpc_set_waitqueue_owner(queue, task->tk_owner);
out:
	return task;
}

static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
{
	if (RPC_IS_PRIORITY(queue))
		return __rpc_find_next_queued_priority(queue);
	if (!list_empty(&queue->tasks[0]))
		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
	return NULL;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	struct rpc_task *task = NULL;

	dprintk("RPC: wake_up_first(%p \"%s\")\n",
			queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	task = __rpc_find_next_queued(queue);
	if (task != NULL) {
		if (func(task, data))
			rpc_wake_up_task_queue_locked(queue, task);
		else
			task = NULL;
	}
	spin_unlock_bh(&queue->lock);

	return task;
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}
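/*
 * Editor's sketch (not part of the original file): rpc_wake_up_first()
 * applies a caller-supplied filter under queue->lock before waking. A
 * hypothetical predicate that only wakes tasks of a given owner:
 *
 *	static bool demo_wake_if_owner(struct rpc_task *task, void *data)
 *	{
 *		return task->tk_owner == *(pid_t *)data;
 *	}
 *
 *	woken = rpc_wake_up_first(&demo_queue, demo_wake_if_owner, &pid);
 *
 * Returning false from the predicate leaves the task queued and makes
 * rpc_wake_up_first() return NULL.
 */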
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			struct rpc_task *task;
			task = list_first_entry(head,
					struct rpc_task,
					u.tk_wait.list);
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

static void __rpc_queue_timer_fn(unsigned long ptr)
{
	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->u.tk_wait.expires;
		if (time_after_eq(now, timeo)) {
			dprintk("RPC: %5u timeout\n", task->tk_pid);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
}

/*
 * Run a task at a later time
 */
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
}
EXPORT_SYMBOL_GPL(rpc_delay);

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

static void
rpc_init_task_statistics(struct rpc_task *task)
{
	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_rebind_retry = 2;

	/* starting timestamp */
	task->tk_start = ktime_get();
}

static void
rpc_reset_task_statistics(struct rpc_task *task)
{
	task->tk_timeouts = 0;
	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);

	rpc_init_task_statistics(task);
}
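/*
 * Editor's sketch (not part of the original file): rpc_delay() is the
 * usual back-off primitive for state functions; it parks the task on
 * the file-local delay_queue with tk_timeout as the expiry. A retry
 * path might look like this (the state name is hypothetical):
 *
 *	task->tk_action = demo_retry_bind;
 *	rpc_delay(task, 3 * HZ);
 *
 * The task resumes in roughly three seconds; __rpc_atrun() clears
 * tk_status on wakeup so the retry starts clean.
 */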
/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */
void rpc_exit_task(struct rpc_task *task)
{
	task->tk_action = NULL;
	if (task->tk_ops->rpc_call_done != NULL) {
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			WARN_ON(RPC_ASSASSINATED(task));
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
			rpc_reset_task_statistics(task);
		}
	}
}

void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	if (RPC_IS_QUEUED(task))
		rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;

	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
			task->tk_pid, task->tk_flags);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Execute any pending callback first.
		 */
		do_action = task->tk_callback;
		task->tk_callback = NULL;
		if (do_action == NULL) {
			/*
			 * Perform the next FSM step.
			 * tk_action may be NULL if the task has been killed.
			 * In particular, note that rpc_killall_tasks may
			 * do this at any time, so beware when dereferencing.
			 */
			do_action = task->tk_action;
			if (do_action == NULL)
				break;
		}
		trace_rpc_task_run_action(task->tk_client, task, task->tk_action);
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer can still safely be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock_bh(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock_bh(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock_bh(&queue->lock);
		if (task_is_async)
			return;

		/* sync task: sleep here */
		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %5u got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
		}
		rpc_set_running(task);
		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
			task->tk_status);
	/* Release all resources associated with the task */
	rpc_release_task(task);
}
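/*
 * Editor's note (not part of the original file): in the loop above,
 * tk_callback is a one-shot continuation that runs, and is cleared,
 * before the persistent tk_action state. A wakeup action registered
 * via rpc_sleep_on() therefore runs exactly once; e.g. (names
 * hypothetical):
 *
 *	rpc_sleep_on(&demo_queue, task, demo_timeout_handler);
 *
 * wakes up into demo_timeout_handler() first, then continues with
 * whatever tk_action was current when the task went to sleep.
 */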
/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 * released. In particular note that rpc_release_task() will have
 * been called, so your task memory may have been freed.
 */
void rpc_execute(struct rpc_task *task)
{
	rpc_set_active(task);
	rpc_make_runnable(task);
	if (!RPC_IS_ASYNC(task))
		__rpc_execute(task);
}

static void rpc_async_schedule(struct work_struct *work)
{
	current->flags |= PF_FSTRANS;
	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
	current->flags &= ~PF_FSTRANS;
}

/**
 * rpc_malloc - allocate an RPC buffer
 * @task: RPC task that will use this buffer
 * @size: requested byte size
 *
 * To prevent rpciod from hanging, this allocator never sleeps,
 * returning NULL if the request cannot be serviced immediately.
 * The caller can arrange to sleep in a way that is safe for rpciod.
 *
 * Most requests are 'small' (under 2KiB) and can be serviced from a
 * mempool, ensuring that NFS reads and writes can always proceed,
 * and that there is good locality of reference for these buffers.
 *
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we avoid using GFP_KERNEL.
 */
void *rpc_malloc(struct rpc_task *task, size_t size)
{
	struct rpc_buffer *buf;
	gfp_t gfp = GFP_NOWAIT;

	if (RPC_IS_SWAPPER(task))
		gfp |= __GFP_MEMALLOC;

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE)
		buf = mempool_alloc(rpc_buffer_mempool, gfp);
	else
		buf = kmalloc(size, gfp);

	if (!buf)
		return NULL;

	buf->len = size;
	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
			task->tk_pid, size, buf);
	return &buf->data;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

/**
 * rpc_free - free buffer allocated via rpc_malloc
 * @buffer: buffer to free
 *
 */
void rpc_free(void *buffer)
{
	size_t size;
	struct rpc_buffer *buf;

	if (!buffer)
		return;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	dprintk("RPC: freeing buffer of size %zu at %p\n",
			size, buf);

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

/*
 * Creation and deletion of RPC task structures
 */
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	rpc_init_task_statistics(task);

	dprintk("RPC: new task initialized, procpid %u\n",
			task_pid_nr(current));
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
}
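/*
 * Editor's sketch (not part of the original file): rpc_malloc() and
 * rpc_free() typically back the transport buf_alloc/buf_free callbacks.
 * Because rpc_malloc() never sleeps, callers must treat NULL as "retry
 * later" rather than a hard failure:
 *
 *	void *buf = rpc_malloc(task, 1024);
 *	if (buf == NULL)
 *		return;		(caller arranges a safe retry)
 *	...
 *	rpc_free(buf);
 *
 * The returned pointer is &buf->data, which is why rpc_free() recovers
 * the enclosing struct rpc_buffer with container_of().
 */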
/*
 * Create a new task for the specified client.
 */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task *task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL) {
			rpc_release_calldata(setup_data->callback_ops,
					setup_data->callback_data);
			return ERR_PTR(-ENOMEM);
		}
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	dprintk("RPC: allocated task %p\n", task);
	return task;
}

/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that we free up the rpc_task _after_ rpc_release_calldata()
 * in order to work around a workqueue dependency issue.
 *
 * Tejun Heo states:
 * "Workqueue currently considers two work items to be the same if they're
 * on the same address and won't execute them concurrently - ie. it
 * makes a work item which is queued again while being executed wait
 * for the previous execution to complete.
 *
 * If a work function frees the work item, and then waits for an event
 * which should be performed by another work item and *that* work item
 * recycles the freed work item, it can create a false dependency loop.
 * There really is no reliable way to detect this short of verifying
 * every memory free."
 *
 */
static void rpc_free_task(struct rpc_task *task)
{
	unsigned short tk_flags = task->tk_flags;

	rpc_release_calldata(task->tk_ops, task->tk_calldata);

	if (tk_flags & RPC_TASK_DYNAMIC) {
		dprintk("RPC: %5u freeing task\n", task->tk_pid);
		mempool_free(task, rpc_task_mempool);
	}
}

static void rpc_async_release(struct work_struct *work)
{
	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		put_rpccred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);
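/*
 * Editor's sketch (not part of the original file): tk_count starts at 1
 * in rpc_init_task(), and callers that keep hold of a task (e.g. via
 * rpc_run_task() in clnt.c) own a reference that they release with
 * rpc_put_task():
 *
 *	task = rpc_run_task(&task_setup_data);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	status = task->tk_status;
 *	rpc_put_task(task);
 */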
static void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %5u release task\n", task->tk_pid);

	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

/*
 * Start up the rpciod workqueue.
 */
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod workqueue and wait for it to start.
	 */
	dprintk("RPC: creating workqueue rpciod\n");
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1);
	rpciod_workqueue = wq;
	return rpciod_workqueue != NULL;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;
	dprintk("RPC: destroying workqueue rpciod\n");

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp)
		kmem_cache_destroy(rpc_task_slabp);
	if (rpc_buffer_slabp)
		kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					sizeof(struct rpc_task),
					0, SLAB_HWCACHE_ALIGN,
					NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					RPC_BUFFER_MAXSIZE,
					0, SLAB_HWCACHE_ALIGN,
					NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
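/*
 * Editor's note (not part of the original file): rpc_init_mempool() and
 * rpc_destroy_mempool() are expected to run from the sunrpc module
 * init/exit path (cf. init_sunrpc() in sunrpc_syms.c), so rpciod is up
 * before the first task is allocated:
 *
 *	err = rpc_init_mempool();
 *	if (err)
 *		goto out;
 */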