/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
#define RPC_TASK_MAGIC_ID	0xf00baa
static int			rpc_task_id;
#endif

/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static kmem_cache_t	*rpc_task_slabp;
static kmem_cache_t	*rpc_buffer_slabp;
static mempool_t	*rpc_task_mempool;
static mempool_t	*rpc_buffer_mempool;

static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);
static void			rpc_free(struct rpc_task *task);

static void rpc_async_schedule(void *);

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_MUTEX(rpciod_sema);
static unsigned int		rpciod_users;
static struct workqueue_struct *rpciod_workqueue;

/*
 * Spinlock for other critical sections of code.
 */
static DEFINE_SPINLOCK(rpc_sched_lock);

/*
 * Disable the timer for a given RPC task. Should be called with
 * queue->lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	if (callback && RPC_IS_QUEUED(task)) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
	smp_mb__before_clear_bit();
	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	smp_mb__after_clear_bit();
}

/*
 * Set up a timer for the current task.
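 * The timeout in tk_timeout is expressed in jiffies (so, for example, a
 * five second timeout would be 5 * HZ); when it expires the timer fires
 * in rpc_run_timer() above, which invokes the handler installed here as
 * tk_timeout_fn.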
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding queue->lock.
 */
static void
rpc_delete_timer(struct rpc_task *task)
{
	if (RPC_IS_QUEUED(task))
		return;
	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
		del_singleshot_timer_sync(&task->tk_timer);
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
	}
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	INIT_LIST_HEAD(&task->u.tk_wait.links);
	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_cookie == task->tk_cookie) {
			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
			return;
		}
	}
	list_add_tail(&task->u.tk_wait.list, q);
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	BUG_ON (RPC_IS_QUEUED(task));

	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->u.tk_wait.rpc_waitq = queue;
	rpc_set_queued(task);

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	queue = task->u.tk_wait.rpc_waitq;

	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));
}

static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
	queue->cookie = cookie;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_cookie(queue, 0);
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = maxprio;
	rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static void rpc_make_runnable(struct rpc_task *task)
{
	int do_ret;

	BUG_ON(task->tk_timeout_fn);
	do_ret = rpc_test_and_set_running(task);
	rpc_clear_queued(task);
	if (do_ret)
		return;
	if (RPC_IS_ASYNC(task)) {
		int status;

		INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
		status = queue_work(task->tk_workqueue, &task->u.tk_work);
		if (status < 0) {
			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
			task->tk_status = status;
			return;
		}
	} else
		wake_up(&task->u.tk_wait.waitq);
}

/*
 * Place a newly initialized task on the workqueue.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
	/* Don't run a child twice! */
	if (RPC_IS_ACTIVATED(task))
		return;
	task->tk_active = 1;
	rpc_make_runnable(task);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
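 *
 * As a rough illustration only (the queue and action names below are
 * made up for this comment, not taken from the code), a caller that
 * needs to wait for some event typically does
 *
 *	task->tk_timeout = 0;
 *	rpc_sleep_on(&some_queue, task, some_resume_action, NULL);
 *
 * and the code path that later completes the event calls
 *
 *	rpc_wake_up_task(task);
 *
 * after which the scheduler loop in __rpc_execute() runs
 * some_resume_action() as the saved tk_callback.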
 */
static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
				rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	if (!RPC_IS_ACTIVATED(task))
		task->tk_active = 1;

	__rpc_add_wait_queue(q, task);

	BUG_ON(task->tk_callback != NULL);
	task->tk_callback = action;
	__rpc_add_timer(task, timer);
}

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&q->lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&q->lock);
}

/**
 * __rpc_do_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold queue->lock, and have cleared the task queued flag.
 */
static void __rpc_do_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	__rpc_disable_timer(task);
	__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Wake up the specified task
 */
static void __rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task))
			__rpc_do_wake_up_task(task);
		rpc_finish_wakeup(task);
	}
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void rpc_wake_up_task(struct rpc_task *task)
{
	if (rpc_start_wakeup(task)) {
		if (RPC_IS_QUEUED(task)) {
			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;

			spin_lock_bh(&queue->lock);
			__rpc_do_wake_up_task(task);
			spin_unlock_bh(&queue->lock);
		}
		rpc_finish_wakeup(task);
	}
}

/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->u.tk_wait.list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&queue->lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock_bh(&queue->lock);

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;

	struct list_head *head;
	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;
	struct rpc_task *task;

	spin_lock_bh(&queue->lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&queue->lock);
}

/*
 * Run a task at a later time
 */
static void	__rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int __rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
				task->tk_pid, task->tk_flags);

	BUG_ON(RPC_IS_QUEUED(task));

restarted:
	while (1) {
		/*
		 * Garbage collection of pending timers...
		 */
		rpc_delete_timer(task);

		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 *   - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			lock_kernel();
			save_callback(task);
			unlock_kernel();
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (!RPC_IS_QUEUED(task)) {
			if (!task->tk_action)
				break;
			lock_kernel();
			task->tk_action(task);
			unlock_kernel();
		}

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task))
			continue;
		rpc_clear_running(task);
		if (RPC_IS_ASYNC(task)) {
			/* Careful! we may have raced... */
			if (RPC_IS_QUEUED(task))
				return 0;
			if (rpc_test_and_set_running(task))
				return 0;
			continue;
		}

		/* sync task: sleep here */
		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
		if (RPC_TASK_UNINTERRUPTIBLE(task)) {
			__wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
		} else {
			__wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			if (status == -ERESTARTSYS) {
				dprintk("RPC: %4d got signal\n", task->tk_pid);
				task->tk_flags |= RPC_TASK_KILLED;
				rpc_exit(task, -ERESTARTSYS);
				rpc_wake_up_task(task);
			}
		}
		rpc_set_running(task);
		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
	}

	if (task->tk_exit) {
		lock_kernel();
		task->tk_exit(task);
		unlock_kernel();
		/* If tk_action is non-null, the user wants us to restart */
		if (task->tk_action) {
			if (!RPC_ASSASSINATED(task)) {
				/* Release RPC slot and buffer memory */
				if (task->tk_rqstp)
					xprt_release(task);
				rpc_free(task);
				goto restarted;
			}
			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
		}
	}

	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
	status = task->tk_status;

	/* Release all resources associated with the task */
	rpc_release_task(task);
	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	BUG_ON(task->tk_active);

	task->tk_active = 1;
	rpc_set_running(task);
	return __rpc_execute(task);
}

static void rpc_async_schedule(void *arg)
{
	__rpc_execute((struct rpc_task *)arg);
}

/*
 * Allocate memory for RPC purposes.
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
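 *
 * Requests of up to RPC_BUFFER_MAXSIZE bytes are satisfied from
 * rpc_buffer_mempool (and tk_bufsize is rounded up to that size);
 * anything larger falls back to a plain kmalloc(), so only the small,
 * common case is guaranteed to make progress under memory pressure.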
 */
void *
rpc_malloc(struct rpc_task *task, size_t size)
{
	int	gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		task->tk_buffer = kmalloc(size, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = size;
	} else {
		task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return task->tk_buffer;
}

static void
rpc_free(struct rpc_task *task)
{
	if (task->tk_buffer) {
		if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(task->tk_buffer, rpc_buffer_mempool);
		else
			kfree(task->tk_buffer);
		task->tk_buffer = NULL;
		task->tk_bufsize = 0;
	}
}

/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_exit   = callback;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;

	task->tk_priority = RPC_PRIORITY_NORMAL;
	task->tk_cookie = (unsigned long)current;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = rpciod_workqueue;
	if (!RPC_IS_ASYNC(task))
		init_waitqueue_head(&task->u.tk_wait.waitq);

	if (clnt) {
		atomic_inc(&clnt->cl_users);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		if (!clnt->cl_intr)
			task->tk_flags |= RPC_TASK_NOINTR;
	}

#ifdef RPC_DEBUG
	task->tk_magic = RPC_TASK_MAGIC_ID;
	task->tk_pid = rpc_task_id++;
#endif
	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add_tail(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
				current->pid);
}

static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}

/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	struct rpc_task	*task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, callback, flags);

	/* Replace tk_release */
	task->tk_release = rpc_default_free_task;

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}

void rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
#endif

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	BUG_ON (RPC_IS_QUEUED(task));
	task->tk_active = 0;

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	rpc_free(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_release)
		task->tk_release(task);
}

/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold childq.lock
 */
static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
{
	struct rpc_task	*task, *parent;
	struct list_head *le;

	parent = (struct rpc_task *) child->tk_calldata;
	task_for_each(task, le, &childq.tasks[0])
		if (task == parent)
			return parent;

	return NULL;
}

static void rpc_child_exit(struct rpc_task *child)
{
	struct rpc_task	*parent;

	spin_lock_bh(&childq.lock);
	if ((parent = rpc_find_parent(child)) != NULL) {
		parent->tk_status = child->tk_status;
		__rpc_wake_up_task(parent);
	}
	spin_unlock_bh(&childq.lock);
}

/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
	struct rpc_task	*task;

	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
	if (!task)
		goto fail;
	task->tk_exit = rpc_child_exit;
	task->tk_calldata = parent;
	return task;

fail:
	parent->tk_status = -ENOMEM;
	return NULL;
}

void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
	spin_lock_bh(&childq.lock);
	/* N.B. Is it possible for the child to have already finished? */
	__rpc_sleep_on(&childq, task, func, NULL);
	rpc_schedule_run(child);
	spin_unlock_bh(&childq.lock);
}

/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks) {
		if (!RPC_IS_ACTIVATED(rovr))
			continue;
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static void rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		flush_workqueue(rpciod_workqueue);
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	struct workqueue_struct *wq;
	int error = 0;

	down(&rpciod_sema);
	dprintk("rpciod_up: users %d\n", rpciod_users);
	rpciod_users++;
	if (rpciod_workqueue)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = -ENOMEM;
	wq = create_workqueue("rpciod");
	if (wq == NULL) {
		printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	rpciod_workqueue = wq;
	error = 0;
out:
	up(&rpciod_sema);
	return error;
}

void
rpciod_down(void)
{
	down(&rpciod_sema);
	dprintk("rpciod_down sema %d\n", rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: no users??\n");

	if (!rpciod_workqueue) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}
	rpciod_killall();

	destroy_workqueue(rpciod_workqueue);
	rpciod_workqueue = NULL;
out:
	up(&rpciod_sema);
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	alltask_for_each(t, le, &all_tasks) {
		const char *rpc_waitq = "none";

		if (RPC_IS_QUEUED(t))
			rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);

		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid,
			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
			t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			rpc_waitq,
			t->tk_action, t->tk_exit);
	}
	spin_unlock(&rpc_sched_lock);
}
#endif

void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
		printk(KERN_INFO "rpc_task: not all structures were freed\n");
	if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
		printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
}

int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
					    mempool_alloc_slab,
					    mempool_free_slab,
					    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
					    mempool_alloc_slab,
					    mempool_free_slab,
					    rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
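
/*
 * Illustrative sketch only, not part of the original file: a minimal
 * asynchronous user of this scheduler (my_exit and my_first_action are
 * placeholder names, and a valid rpc_clnt is assumed to exist) looks
 * roughly like
 *
 *	static void my_exit(struct rpc_task *task)
 *	{
 *		... inspect task->tk_status ...
 *	}
 *
 *	task = rpc_new_task(clnt, my_exit, RPC_TASK_ASYNC);
 *	if (task != NULL) {
 *		task->tk_action = my_first_action;
 *		rpc_execute(task);
 *	}
 *
 * rpc_execute() drives the task through __rpc_execute(); my_exit() runs
 * as tk_exit once tk_action reaches NULL, and rpc_release_task() then
 * frees the dynamically allocated task via tk_release.
 */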