/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
static int rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static kmem_cache_t	*rpc_task_slabp __read_mostly;
static kmem_cache_t	*rpc_buffer_slabp __read_mostly;
static mempool_t	*rpc_task_mempool __read_mostly;
static mempool_t	*rpc_buffer_mempool __read_mostly;

static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
static void rpc_free(struct rpc_task *task);

static void rpc_async_schedule(void *);

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_MUTEX(rpciod_sema);
static unsigned int rpciod_users;
static struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	if (callback && RPC_IS_QUEUED(task)) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}
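
/*
 * Summary of the timer protocol used above (descriptive only): a caller
 * arms a timer indirectly by setting tk_timeout before it sleeps, and
 * __rpc_add_timer() then records the handler in tk_timeout_fn and starts
 * tk_timer.  __rpc_disable_timer() merely NULLs tk_timeout_fn under
 * queue->lock, which turns a concurrently firing rpc_run_timer() into a
 * no-op, while rpc_delete_timer() later reaps the timer_list itself once
 * the task is no longer queued.  This split is what lets wake-ups avoid
 * calling del_timer_sync() while holding spinlocks.
 */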

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
	}
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_cookie == task->tk_cookie) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON (RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->u.tk_wait.rpc_waitq = queue;
	rpc_set_queued(task);

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}
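
/*
 * Note on the priority queue layout (descriptive only): queue->tasks[] is
 * one list per priority level, and within a level all tasks sharing a
 * tk_cookie are chained off the first such task via u.tk_wait.links, so
 * the level list holds one entry per cookie.  Removal above therefore
 * promotes the next task with the same cookie into the slot vacated by
 * the departing group head before unlinking it.
 */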

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	queue = task->u.tk_wait.rpc_waitq;

	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
	queue->cookie = cookie;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = maxprio;
	rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	int do_ret;

	BUG_ON(task->tk_timeout_fn);
	do_ret = rpc_test_and_set_running(task);
	rpc_clear_queued(task);
	if (do_ret)
		return;
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
		status = queue_work(task->tk_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

/*
 * Place a newly initialized task on the workqueue.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
	/* Don't run a child twice! */
	if (RPC_IS_ACTIVATED(task))
		return;
	task->tk_active = 1;
	rpc_make_runnable(task);
}
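
/*
 * Wait queues are normally embedded in some longer-lived object and set up
 * once.  A minimal sketch (the name "backlog" is only a placeholder for
 * illustration):
 *
 *	static struct rpc_wait_queue backlog;
 *
 *	rpc_init_wait_queue(&backlog, "backlog");
 *
 * rpc_init_priority_wait_queue() is used instead when waiters should be
 * serviced according to tk_priority rather than in strict FIFO order.
 */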

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
			rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	if (!RPC_IS_ACTIVATED(task))
		task->tk_active = 1;

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&q->lock);
}

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_disable_timer(task);
	__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task))
			__rpc_do_wake_up_task(task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task)) {
			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

			spin_lock_bh(&queue->lock);
			__rpc_do_wake_up_task(task);
			spin_unlock_bh(&queue->lock);
		}
		rpc_finish_wakeup(task);
	}
}
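
/*
 * Sleeping and waking come in pairs: a tk_action routine queues the task
 * with rpc_sleep_on() and returns to __rpc_execute(), and a later event
 * wakes it so the state machine can continue.  A minimal sketch, with
 * "queue" and "rpc_foo_continue" as placeholder names:
 *
 *	task->tk_timeout = 5 * HZ;
 *	rpc_sleep_on(&queue, task, rpc_foo_continue, NULL);
 *
 * and later, from whatever event satisfies the wait:
 *
 *	rpc_wake_up_task(task);
 *
 * Passing a non-NULL timer action to rpc_sleep_on() overrides
 * __rpc_default_timer() for that sleep.
 */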

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock_bh(&queue->lock);

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;
	struct list_head *head;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;
	struct rpc_task *task;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}

/*
 * Run a task at a later time
 */
static void __rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}
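
/*
 * rpc_delay() gives a tk_action routine a simple way to back off and retry
 * after a transient failure.  A minimal sketch ("rpc_foo_retry" and the
 * delay value are placeholders chosen for illustration):
 *
 *	task->tk_action = rpc_foo_retry;
 *	rpc_delay(task, HZ >> 4);
 *	return;
 *
 * __rpc_atrun() clears tk_status when the delay expires, so the retry
 * starts with a clean slate.
 */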

/*
 * Helper that calls task->tk_exit if it exists and then returns
 * true if we should exit __rpc_execute.
 */
static inline int __rpc_do_exit(struct rpc_task *task)
{
	if (task->tk_exit != NULL) {
		lock_kernel();
		task->tk_exit(task);
		unlock_kernel();
		/* If tk_action is non-null, we should restart the call */
		if (task->tk_action != NULL) {
			if (!RPC_ASSASSINATED(task)) {
				/* Release RPC slot and buffer memory */
				xprt_release(task);
				rpc_free(task);
				return 0;
			}
			printk(KERN_ERR "RPC: dead task tried to walk away.\n");
		}
	}
	return 1;
}

static int rpc_wait_bit_interruptible(void *word)
{
	if (signal_pending(current))
		return -ERESTARTSYS;
	schedule();
	return 0;
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int __rpc_execute(struct rpc_task *task)
{
	int status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
			task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

	for (;;) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 * - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			lock_kernel();
			save_callback(task);
			unlock_kernel();
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (task->tk_action != NULL) {
				lock_kernel();
				task->tk_action(task);
				unlock_kernel();
			} else if (__rpc_do_exit(task))
				break;
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/* Careful! we may have raced... */
			if (RPC_IS_QUEUED(task))
				return 0;
			if (rpc_test_and_set_running(task))
				return 0;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
		/* Note: Caller should be using rpc_clnt_sigmask() */
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
				TASK_INTERRUPTIBLE);
		if (status == -ERESTARTSYS) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			dprintk("RPC: %4d got signal\n", task->tk_pid);
			task->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(task, -ERESTARTSYS);
			rpc_wake_up_task(task);
		}
		rpc_set_running(task);
		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
	}

	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
	status = task->tk_status;

	/* Release all resources associated with the task */
	rpc_release_task(task);
	return status;
}
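
/*
 * To summarize the loop above (descriptive only): each iteration reaps a
 * stale timer, runs any tk_callback left behind by a wake-up, then runs
 * the next tk_action step under the BKL.  If that step left the task
 * queued, an async task simply returns (rpc_make_runnable() will put it
 * back on rpciod's workqueue when it is woken), while a sync task sleeps
 * on the RPC_TASK_QUEUED bit in tk_runstate.  The loop ends when tk_action
 * is NULL and __rpc_do_exit() reports that tk_exit did not restart the
 * call.
 */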

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	BUG_ON(task->tk_active);

	task->tk_active = 1;
	rpc_set_running(task);
	return __rpc_execute(task);
}

static void rpc_async_schedule(void *arg)
{
	__rpc_execute((struct rpc_task *)arg);
}

/*
 * Allocate memory for RPC purposes.
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
 */
void *
rpc_malloc(struct rpc_task *task, size_t size)
{
	gfp_t gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		task->tk_buffer = kmalloc(size, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = size;
	} else {
		task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return task->tk_buffer;
}

static void
rpc_free(struct rpc_task *task)
{
	if (task->tk_buffer) {
		if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(task->tk_buffer, rpc_buffer_mempool);
		else
			kfree(task->tk_buffer);
		task->tk_buffer = NULL;
		task->tk_bufsize = 0;
	}
}

/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	task->tk_client = clnt;
	task->tk_flags = flags;
	task->tk_exit = callback;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = RPC_PRIORITY_NORMAL;
	task->tk_cookie = (unsigned long)current;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = rpciod_workqueue;

	if (clnt) {
		atomic_inc(&clnt->cl_users);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		if (!clnt->cl_intr)
			task->tk_flags |= RPC_TASK_NOINTR;
	}

#ifdef RPC_DEBUG
	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = rpc_task_id++;
#endif
	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add_tail(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
			current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}
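
/*
 * Lifecycle of a dynamically allocated task (descriptive only):
 * rpc_new_task() allocates from rpc_task_mempool, initializes the task
 * (rpc_init_task() takes a reference on the client) and marks it
 * RPC_TASK_DYNAMIC; rpc_execute(), or rpc_schedule_run() for child tasks,
 * drives it through __rpc_execute(); and rpc_release_task() finally drops
 * the request slot, credentials, buffer and client reference before
 * calling tk_release, which for dynamic tasks is rpc_default_free_task()
 * and returns the memory to rpc_task_mempool.
 */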

/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	struct rpc_task *task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, callback, flags);

	/* Replace tk_release */
	task->tk_release = rpc_default_free_task;

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}

void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	BUG_ON (RPC_IS_QUEUED(task));
	task->tk_active = 0;

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	rpc_free(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_release)
		task->tk_release(task);
}

/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold childq.lock
 */
static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
{
	struct rpc_task *task, *parent;
	struct list_head *le;

	parent = (struct rpc_task *) child->tk_calldata;
	task_for_each(task, le, &childq.tasks[0])
		if (task == parent)
			return parent;

	return NULL;
}

static void rpc_child_exit(struct rpc_task *child)
{
	struct rpc_task *parent;

	spin_lock_bh(&childq.lock);
	if ((parent = rpc_find_parent(child)) != NULL) {
		parent->tk_status = child->tk_status;
		__rpc_wake_up_task(parent);
	}
	spin_unlock_bh(&childq.lock);
}

/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
	struct rpc_task *task;

	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
	if (!task)
		goto fail;
	task->tk_exit = rpc_child_exit;
	task->tk_calldata = parent;
	return task;

fail:
	parent->tk_status = -ENOMEM;
	return NULL;
}

void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
	spin_lock_bh(&childq.lock);
	/* N.B. Is it possible for the child to have already finished? */
	__rpc_sleep_on(&childq, task, func, NULL);
	rpc_schedule_run(child);
	spin_unlock_bh(&childq.lock);
}
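
/*
 * The child mechanism above is used when a call must first complete a
 * helper RPC (for instance a portmapper lookup).  A minimal sketch, with
 * "child_setup" and "parent_continue" as placeholder names:
 *
 *	child = rpc_new_child(clnt, parent);
 *	if (child == NULL)
 *		return;			(parent->tk_status is already -ENOMEM)
 *	child_setup(child);
 *	rpc_run_child(parent, child, parent_continue);
 *
 * The parent then sleeps on childq until rpc_child_exit() copies the
 * child's tk_status back and wakes it.
 */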

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task *rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks) {
		if (!RPC_IS_ACTIVATED(rovr))
			continue;
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static void rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		flush_workqueue(rpciod_workqueue);
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	struct workqueue_struct *wq;
	int error = 0;

	down(&rpciod_sema);
	dprintk("rpciod_up: users %d\n", rpciod_users);
	rpciod_users++;
	if (rpciod_workqueue)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = -ENOMEM;
	wq = create_workqueue("rpciod");
	if (wq == NULL) {
		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	rpciod_workqueue = wq;
	error = 0;
out:
	up(&rpciod_sema);
	return error;
}

void
rpciod_down(void)
{
	down(&rpciod_sema);
	dprintk("rpciod_down sema %d\n", rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: no users??\n");

	if (!rpciod_workqueue) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}
	rpciod_killall();

	destroy_workqueue(rpciod_workqueue);
	rpciod_workqueue = NULL;
out:
	up(&rpciod_sema);
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	alltask_for_each(t, le, &all_tasks) {
		const char *rpc_waitq = "none";

		if (RPC_IS_QUEUED(t))
			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid,
			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
			t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			rpc_waitq,
			t->tk_action, t->tk_exit);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif
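
/*
 * The slab/mempool pair below is what keeps the scheduler usable under
 * memory pressure: rpc_task_mempool and rpc_buffer_mempool each reserve a
 * small number of objects (RPC_TASK_POOLSIZE and RPC_BUFFER_POOLSIZE), so
 * rpc_alloc_task() and small rpc_malloc() requests can still make progress
 * when ordinary GFP_NOFS allocations fail.  Buffers larger than
 * RPC_BUFFER_MAXSIZE always fall back to kmalloc().
 */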

void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
		printk(KERN_INFO "rpc_task: not all structures were freed\n");
	if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
		printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
}

int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					sizeof(struct rpc_task),
					0, SLAB_HWCACHE_ALIGN,
					NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					RPC_BUFFER_MAXSIZE,
					0, SLAB_HWCACHE_ALIGN,
					NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
					mempool_alloc_slab,
					mempool_free_slab,
					rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
					mempool_alloc_slab,
					mempool_free_slab,
					rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
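
/*
 * Illustrative setup/teardown ordering for the pieces in this file (the
 * actual calls are made from SUNRPC module code outside this file):
 *
 *	if (rpc_init_mempool() != 0)
 *		return -ENOMEM;
 *	...
 *	rpciod_up();			(first user of async RPC)
 *	...
 *	rpciod_down();			(last user; kills remaining tasks)
 *	rpc_destroy_mempool();
 */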