1 /* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, if a reply is expected, 16 * it installs a timer that is run after the packet's timeout has 17 * expired. 18 * - When a packet arrives, the data_ready handler walks the list of 19 * pending requests for that transport. If a matching XID is found, the 20 * caller is woken up, and the timer removed. 21 * - When no reply arrives within the timeout interval, the timer is 22 * fired by the kernel and runs xprt_timer(). It either adjusts the 23 * timeout values (minor timeout) or wakes up the caller with a status 24 * of -ETIMEDOUT. 25 * - When the caller receives a notification from RPC that a reply arrived, 26 * it should release the RPC slot, and process the reply. 27 * If the call timed out, it may choose to retry the operation by 28 * adjusting the initial timeout value, and simply calling rpc_call 29 * again. 30 * 31 * Support for async RPC is done through a set of RPC-specific scheduling 32 * primitives that `transparently' work for processes as well as async 33 * tasks that rely on callbacks. 34 * 35 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 36 * 37 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 38 */ 39 40 #include <linux/module.h> 41 42 #include <linux/types.h> 43 #include <linux/interrupt.h> 44 #include <linux/workqueue.h> 45 #include <linux/net.h> 46 #include <linux/ktime.h> 47 48 #include <linux/sunrpc/clnt.h> 49 #include <linux/sunrpc/metrics.h> 50 #include <linux/sunrpc/bc_xprt.h> 51 52 #include "sunrpc.h" 53 54 /* 55 * Local variables 56 */ 57 58 #ifdef RPC_DEBUG 59 # define RPCDBG_FACILITY RPCDBG_XPRT 60 #endif 61 62 /* 63 * Local functions 64 */ 65 static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 66 static void xprt_connect_status(struct rpc_task *task); 67 static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 68 69 static DEFINE_SPINLOCK(xprt_list_lock); 70 static LIST_HEAD(xprt_list); 71 72 /* 73 * The transport code maintains an estimate on the maximum number of out- 74 * standing RPC requests, using a smoothed version of the congestion 75 * avoidance implemented in 44BSD. This is basically the Van Jacobson 76 * congestion algorithm: If a retransmit occurs, the congestion window is 77 * halved; otherwise, it is incremented by 1/cwnd when 78 * 79 * - a reply is received and 80 * - a full number of requests are outstanding and 81 * - the congestion window hasn't been updated recently. 82 */ 83 #define RPC_CWNDSHIFT (8U) 84 #define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) 85 #define RPC_INITCWND RPC_CWNDSCALE 86 #define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) 87 88 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) 89 90 /** 91 * xprt_register_transport - register a transport implementation 92 * @transport: transport to register 93 * 94 * If a transport implementation is loaded as a kernel module, it can 95 * call this interface to make itself known to the RPC client. 96 * 97 * Returns: 98 * 0: transport successfully registered 99 * -EEXIST: transport already registered 100 * -EINVAL: transport module being unloaded 101 */ 102 int xprt_register_transport(struct xprt_class *transport) 103 { 104 struct xprt_class *t; 105 int result; 106 107 result = -EEXIST; 108 spin_lock(&xprt_list_lock); 109 list_for_each_entry(t, &xprt_list, list) { 110 /* don't register the same transport class twice */ 111 if (t->ident == transport->ident) 112 goto out; 113 } 114 115 list_add_tail(&transport->list, &xprt_list); 116 printk(KERN_INFO "RPC: Registered %s transport module.\n", 117 transport->name); 118 result = 0; 119 120 out: 121 spin_unlock(&xprt_list_lock); 122 return result; 123 } 124 EXPORT_SYMBOL_GPL(xprt_register_transport); 125 126 /** 127 * xprt_unregister_transport - unregister a transport implementation 128 * @transport: transport to unregister 129 * 130 * Returns: 131 * 0: transport successfully unregistered 132 * -ENOENT: transport never registered 133 */ 134 int xprt_unregister_transport(struct xprt_class *transport) 135 { 136 struct xprt_class *t; 137 int result; 138 139 result = 0; 140 spin_lock(&xprt_list_lock); 141 list_for_each_entry(t, &xprt_list, list) { 142 if (t == transport) { 143 printk(KERN_INFO 144 "RPC: Unregistered %s transport module.\n", 145 transport->name); 146 list_del_init(&transport->list); 147 goto out; 148 } 149 } 150 result = -ENOENT; 151 152 out: 153 spin_unlock(&xprt_list_lock); 154 return result; 155 } 156 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 157 158 /** 159 * xprt_load_transport - load a transport implementation 160 * @transport_name: transport to load 161 * 162 * Returns: 163 * 0: transport successfully loaded 164 * -ENOENT: transport module not available 165 */ 166 int xprt_load_transport(const char *transport_name) 167 { 168 struct xprt_class *t; 169 int result; 170 171 result = 0; 172 spin_lock(&xprt_list_lock); 173 list_for_each_entry(t, &xprt_list, list) { 174 if (strcmp(t->name, transport_name) == 0) { 175 spin_unlock(&xprt_list_lock); 176 goto out; 177 } 178 } 179 spin_unlock(&xprt_list_lock); 180 result = request_module("xprt%s", transport_name); 181 out: 182 return result; 183 } 184 EXPORT_SYMBOL_GPL(xprt_load_transport); 185 186 /** 187 * xprt_reserve_xprt - serialize write access to transports 188 * @task: task that is requesting access to the transport 189 * 190 * This prevents mixing the payload of separate requests, and prevents 191 * transport connects from colliding with writes. No congestion control 192 * is provided. 193 */ 194 int xprt_reserve_xprt(struct rpc_task *task) 195 { 196 struct rpc_rqst *req = task->tk_rqstp; 197 struct rpc_xprt *xprt = req->rq_xprt; 198 199 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 200 if (task == xprt->snd_task) 201 return 1; 202 if (task == NULL) 203 return 0; 204 goto out_sleep; 205 } 206 xprt->snd_task = task; 207 if (req) { 208 req->rq_bytes_sent = 0; 209 req->rq_ntrans++; 210 } 211 return 1; 212 213 out_sleep: 214 dprintk("RPC: %5u failed to lock transport %p\n", 215 task->tk_pid, xprt); 216 task->tk_timeout = 0; 217 task->tk_status = -EAGAIN; 218 if (req && req->rq_ntrans) 219 rpc_sleep_on(&xprt->resend, task, NULL); 220 else 221 rpc_sleep_on(&xprt->sending, task, NULL); 222 return 0; 223 } 224 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 225 226 static void xprt_clear_locked(struct rpc_xprt *xprt) 227 { 228 xprt->snd_task = NULL; 229 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) { 230 smp_mb__before_clear_bit(); 231 clear_bit(XPRT_LOCKED, &xprt->state); 232 smp_mb__after_clear_bit(); 233 } else 234 queue_work(rpciod_workqueue, &xprt->task_cleanup); 235 } 236 237 /* 238 * xprt_reserve_xprt_cong - serialize write access to transports 239 * @task: task that is requesting access to the transport 240 * 241 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 242 * integrated into the decision of whether a request is allowed to be 243 * woken up and given access to the transport. 244 */ 245 int xprt_reserve_xprt_cong(struct rpc_task *task) 246 { 247 struct rpc_xprt *xprt = task->tk_xprt; 248 struct rpc_rqst *req = task->tk_rqstp; 249 250 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 251 if (task == xprt->snd_task) 252 return 1; 253 goto out_sleep; 254 } 255 if (__xprt_get_cong(xprt, task)) { 256 xprt->snd_task = task; 257 if (req) { 258 req->rq_bytes_sent = 0; 259 req->rq_ntrans++; 260 } 261 return 1; 262 } 263 xprt_clear_locked(xprt); 264 out_sleep: 265 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 266 task->tk_timeout = 0; 267 task->tk_status = -EAGAIN; 268 if (req && req->rq_ntrans) 269 rpc_sleep_on(&xprt->resend, task, NULL); 270 else 271 rpc_sleep_on(&xprt->sending, task, NULL); 272 return 0; 273 } 274 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 275 276 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 277 { 278 int retval; 279 280 spin_lock_bh(&xprt->transport_lock); 281 retval = xprt->ops->reserve_xprt(task); 282 spin_unlock_bh(&xprt->transport_lock); 283 return retval; 284 } 285 286 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 287 { 288 struct rpc_task *task; 289 struct rpc_rqst *req; 290 291 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 292 return; 293 294 task = rpc_wake_up_next(&xprt->resend); 295 if (!task) { 296 task = rpc_wake_up_next(&xprt->sending); 297 if (!task) 298 goto out_unlock; 299 } 300 301 req = task->tk_rqstp; 302 xprt->snd_task = task; 303 if (req) { 304 req->rq_bytes_sent = 0; 305 req->rq_ntrans++; 306 } 307 return; 308 309 out_unlock: 310 xprt_clear_locked(xprt); 311 } 312 313 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 314 { 315 struct rpc_task *task; 316 317 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 318 return; 319 if (RPCXPRT_CONGESTED(xprt)) 320 goto out_unlock; 321 task = rpc_wake_up_next(&xprt->resend); 322 if (!task) { 323 task = rpc_wake_up_next(&xprt->sending); 324 if (!task) 325 goto out_unlock; 326 } 327 if (__xprt_get_cong(xprt, task)) { 328 struct rpc_rqst *req = task->tk_rqstp; 329 xprt->snd_task = task; 330 if (req) { 331 req->rq_bytes_sent = 0; 332 req->rq_ntrans++; 333 } 334 return; 335 } 336 out_unlock: 337 xprt_clear_locked(xprt); 338 } 339 340 /** 341 * xprt_release_xprt - allow other requests to use a transport 342 * @xprt: transport with other tasks potentially waiting 343 * @task: task that is releasing access to the transport 344 * 345 * Note that "task" can be NULL. No congestion control is provided. 346 */ 347 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 348 { 349 if (xprt->snd_task == task) { 350 xprt_clear_locked(xprt); 351 __xprt_lock_write_next(xprt); 352 } 353 } 354 EXPORT_SYMBOL_GPL(xprt_release_xprt); 355 356 /** 357 * xprt_release_xprt_cong - allow other requests to use a transport 358 * @xprt: transport with other tasks potentially waiting 359 * @task: task that is releasing access to the transport 360 * 361 * Note that "task" can be NULL. Another task is awoken to use the 362 * transport if the transport's congestion window allows it. 363 */ 364 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 365 { 366 if (xprt->snd_task == task) { 367 xprt_clear_locked(xprt); 368 __xprt_lock_write_next_cong(xprt); 369 } 370 } 371 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 372 373 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 374 { 375 spin_lock_bh(&xprt->transport_lock); 376 xprt->ops->release_xprt(xprt, task); 377 spin_unlock_bh(&xprt->transport_lock); 378 } 379 380 /* 381 * Van Jacobson congestion avoidance. Check if the congestion window 382 * overflowed. Put the task to sleep if this is the case. 383 */ 384 static int 385 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 386 { 387 struct rpc_rqst *req = task->tk_rqstp; 388 389 if (req->rq_cong) 390 return 1; 391 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 392 task->tk_pid, xprt->cong, xprt->cwnd); 393 if (RPCXPRT_CONGESTED(xprt)) 394 return 0; 395 req->rq_cong = 1; 396 xprt->cong += RPC_CWNDSCALE; 397 return 1; 398 } 399 400 /* 401 * Adjust the congestion window, and wake up the next task 402 * that has been sleeping due to congestion 403 */ 404 static void 405 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 406 { 407 if (!req->rq_cong) 408 return; 409 req->rq_cong = 0; 410 xprt->cong -= RPC_CWNDSCALE; 411 __xprt_lock_write_next_cong(xprt); 412 } 413 414 /** 415 * xprt_release_rqst_cong - housekeeping when request is complete 416 * @task: RPC request that recently completed 417 * 418 * Useful for transports that require congestion control. 419 */ 420 void xprt_release_rqst_cong(struct rpc_task *task) 421 { 422 __xprt_put_cong(task->tk_xprt, task->tk_rqstp); 423 } 424 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 425 426 /** 427 * xprt_adjust_cwnd - adjust transport congestion window 428 * @task: recently completed RPC request used to adjust window 429 * @result: result code of completed RPC request 430 * 431 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 432 */ 433 void xprt_adjust_cwnd(struct rpc_task *task, int result) 434 { 435 struct rpc_rqst *req = task->tk_rqstp; 436 struct rpc_xprt *xprt = task->tk_xprt; 437 unsigned long cwnd = xprt->cwnd; 438 439 if (result >= 0 && cwnd <= xprt->cong) { 440 /* The (cwnd >> 1) term makes sure 441 * the result gets rounded properly. */ 442 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 443 if (cwnd > RPC_MAXCWND(xprt)) 444 cwnd = RPC_MAXCWND(xprt); 445 __xprt_lock_write_next_cong(xprt); 446 } else if (result == -ETIMEDOUT) { 447 cwnd >>= 1; 448 if (cwnd < RPC_CWNDSCALE) 449 cwnd = RPC_CWNDSCALE; 450 } 451 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 452 xprt->cong, xprt->cwnd, cwnd); 453 xprt->cwnd = cwnd; 454 __xprt_put_cong(xprt, req); 455 } 456 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 457 458 /** 459 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 460 * @xprt: transport with waiting tasks 461 * @status: result code to plant in each task before waking it 462 * 463 */ 464 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 465 { 466 if (status < 0) 467 rpc_wake_up_status(&xprt->pending, status); 468 else 469 rpc_wake_up(&xprt->pending); 470 } 471 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 472 473 /** 474 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 475 * @task: task to be put to sleep 476 * @action: function pointer to be executed after wait 477 */ 478 void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) 479 { 480 struct rpc_rqst *req = task->tk_rqstp; 481 struct rpc_xprt *xprt = req->rq_xprt; 482 483 task->tk_timeout = req->rq_timeout; 484 rpc_sleep_on(&xprt->pending, task, action); 485 } 486 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 487 488 /** 489 * xprt_write_space - wake the task waiting for transport output buffer space 490 * @xprt: transport with waiting tasks 491 * 492 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 493 */ 494 void xprt_write_space(struct rpc_xprt *xprt) 495 { 496 if (unlikely(xprt->shutdown)) 497 return; 498 499 spin_lock_bh(&xprt->transport_lock); 500 if (xprt->snd_task) { 501 dprintk("RPC: write space: waking waiting task on " 502 "xprt %p\n", xprt); 503 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); 504 } 505 spin_unlock_bh(&xprt->transport_lock); 506 } 507 EXPORT_SYMBOL_GPL(xprt_write_space); 508 509 /** 510 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 511 * @task: task whose timeout is to be set 512 * 513 * Set a request's retransmit timeout based on the transport's 514 * default timeout parameters. Used by transports that don't adjust 515 * the retransmit timeout based on round-trip time estimation. 516 */ 517 void xprt_set_retrans_timeout_def(struct rpc_task *task) 518 { 519 task->tk_timeout = task->tk_rqstp->rq_timeout; 520 } 521 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); 522 523 /* 524 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 525 * @task: task whose timeout is to be set 526 * 527 * Set a request's retransmit timeout using the RTT estimator. 528 */ 529 void xprt_set_retrans_timeout_rtt(struct rpc_task *task) 530 { 531 int timer = task->tk_msg.rpc_proc->p_timer; 532 struct rpc_clnt *clnt = task->tk_client; 533 struct rpc_rtt *rtt = clnt->cl_rtt; 534 struct rpc_rqst *req = task->tk_rqstp; 535 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 536 537 task->tk_timeout = rpc_calc_rto(rtt, timer); 538 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 539 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 540 task->tk_timeout = max_timeout; 541 } 542 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt); 543 544 static void xprt_reset_majortimeo(struct rpc_rqst *req) 545 { 546 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 547 548 req->rq_majortimeo = req->rq_timeout; 549 if (to->to_exponential) 550 req->rq_majortimeo <<= to->to_retries; 551 else 552 req->rq_majortimeo += to->to_increment * to->to_retries; 553 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 554 req->rq_majortimeo = to->to_maxval; 555 req->rq_majortimeo += jiffies; 556 } 557 558 /** 559 * xprt_adjust_timeout - adjust timeout values for next retransmit 560 * @req: RPC request containing parameters to use for the adjustment 561 * 562 */ 563 int xprt_adjust_timeout(struct rpc_rqst *req) 564 { 565 struct rpc_xprt *xprt = req->rq_xprt; 566 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 567 int status = 0; 568 569 if (time_before(jiffies, req->rq_majortimeo)) { 570 if (to->to_exponential) 571 req->rq_timeout <<= 1; 572 else 573 req->rq_timeout += to->to_increment; 574 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 575 req->rq_timeout = to->to_maxval; 576 req->rq_retries++; 577 } else { 578 req->rq_timeout = to->to_initval; 579 req->rq_retries = 0; 580 xprt_reset_majortimeo(req); 581 /* Reset the RTT counters == "slow start" */ 582 spin_lock_bh(&xprt->transport_lock); 583 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 584 spin_unlock_bh(&xprt->transport_lock); 585 status = -ETIMEDOUT; 586 } 587 588 if (req->rq_timeout == 0) { 589 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 590 req->rq_timeout = 5 * HZ; 591 } 592 return status; 593 } 594 595 static void xprt_autoclose(struct work_struct *work) 596 { 597 struct rpc_xprt *xprt = 598 container_of(work, struct rpc_xprt, task_cleanup); 599 600 xprt->ops->close(xprt); 601 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 602 xprt_release_write(xprt, NULL); 603 } 604 605 /** 606 * xprt_disconnect_done - mark a transport as disconnected 607 * @xprt: transport to flag for disconnect 608 * 609 */ 610 void xprt_disconnect_done(struct rpc_xprt *xprt) 611 { 612 dprintk("RPC: disconnected transport %p\n", xprt); 613 spin_lock_bh(&xprt->transport_lock); 614 xprt_clear_connected(xprt); 615 xprt_wake_pending_tasks(xprt, -EAGAIN); 616 spin_unlock_bh(&xprt->transport_lock); 617 } 618 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 619 620 /** 621 * xprt_force_disconnect - force a transport to disconnect 622 * @xprt: transport to disconnect 623 * 624 */ 625 void xprt_force_disconnect(struct rpc_xprt *xprt) 626 { 627 /* Don't race with the test_bit() in xprt_clear_locked() */ 628 spin_lock_bh(&xprt->transport_lock); 629 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 630 /* Try to schedule an autoclose RPC call */ 631 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 632 queue_work(rpciod_workqueue, &xprt->task_cleanup); 633 xprt_wake_pending_tasks(xprt, -EAGAIN); 634 spin_unlock_bh(&xprt->transport_lock); 635 } 636 637 /** 638 * xprt_conditional_disconnect - force a transport to disconnect 639 * @xprt: transport to disconnect 640 * @cookie: 'connection cookie' 641 * 642 * This attempts to break the connection if and only if 'cookie' matches 643 * the current transport 'connection cookie'. It ensures that we don't 644 * try to break the connection more than once when we need to retransmit 645 * a batch of RPC requests. 646 * 647 */ 648 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 649 { 650 /* Don't race with the test_bit() in xprt_clear_locked() */ 651 spin_lock_bh(&xprt->transport_lock); 652 if (cookie != xprt->connect_cookie) 653 goto out; 654 if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) 655 goto out; 656 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 657 /* Try to schedule an autoclose RPC call */ 658 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 659 queue_work(rpciod_workqueue, &xprt->task_cleanup); 660 xprt_wake_pending_tasks(xprt, -EAGAIN); 661 out: 662 spin_unlock_bh(&xprt->transport_lock); 663 } 664 665 static void 666 xprt_init_autodisconnect(unsigned long data) 667 { 668 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 669 670 spin_lock(&xprt->transport_lock); 671 if (!list_empty(&xprt->recv) || xprt->shutdown) 672 goto out_abort; 673 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 674 goto out_abort; 675 spin_unlock(&xprt->transport_lock); 676 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 677 queue_work(rpciod_workqueue, &xprt->task_cleanup); 678 return; 679 out_abort: 680 spin_unlock(&xprt->transport_lock); 681 } 682 683 /** 684 * xprt_connect - schedule a transport connect operation 685 * @task: RPC task that is requesting the connect 686 * 687 */ 688 void xprt_connect(struct rpc_task *task) 689 { 690 struct rpc_xprt *xprt = task->tk_xprt; 691 692 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 693 xprt, (xprt_connected(xprt) ? "is" : "is not")); 694 695 if (!xprt_bound(xprt)) { 696 task->tk_status = -EAGAIN; 697 return; 698 } 699 if (!xprt_lock_write(xprt, task)) 700 return; 701 702 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 703 xprt->ops->close(xprt); 704 705 if (xprt_connected(xprt)) 706 xprt_release_write(xprt, task); 707 else { 708 if (task->tk_rqstp) 709 task->tk_rqstp->rq_bytes_sent = 0; 710 711 task->tk_timeout = task->tk_rqstp->rq_timeout; 712 rpc_sleep_on(&xprt->pending, task, xprt_connect_status); 713 714 if (test_bit(XPRT_CLOSING, &xprt->state)) 715 return; 716 if (xprt_test_and_set_connecting(xprt)) 717 return; 718 xprt->stat.connect_start = jiffies; 719 xprt->ops->connect(task); 720 } 721 } 722 723 static void xprt_connect_status(struct rpc_task *task) 724 { 725 struct rpc_xprt *xprt = task->tk_xprt; 726 727 if (task->tk_status == 0) { 728 xprt->stat.connect_count++; 729 xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; 730 dprintk("RPC: %5u xprt_connect_status: connection established\n", 731 task->tk_pid); 732 return; 733 } 734 735 switch (task->tk_status) { 736 case -EAGAIN: 737 dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); 738 break; 739 case -ETIMEDOUT: 740 dprintk("RPC: %5u xprt_connect_status: connect attempt timed " 741 "out\n", task->tk_pid); 742 break; 743 default: 744 dprintk("RPC: %5u xprt_connect_status: error %d connecting to " 745 "server %s\n", task->tk_pid, -task->tk_status, 746 task->tk_client->cl_server); 747 xprt_release_write(xprt, task); 748 task->tk_status = -EIO; 749 } 750 } 751 752 /** 753 * xprt_lookup_rqst - find an RPC request corresponding to an XID 754 * @xprt: transport on which the original request was transmitted 755 * @xid: RPC XID of incoming reply 756 * 757 */ 758 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 759 { 760 struct list_head *pos; 761 762 list_for_each(pos, &xprt->recv) { 763 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 764 if (entry->rq_xid == xid) 765 return entry; 766 } 767 768 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 769 ntohl(xid)); 770 xprt->stat.bad_xids++; 771 return NULL; 772 } 773 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 774 775 static void xprt_update_rtt(struct rpc_task *task) 776 { 777 struct rpc_rqst *req = task->tk_rqstp; 778 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 779 unsigned timer = task->tk_msg.rpc_proc->p_timer; 780 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 781 782 if (timer) { 783 if (req->rq_ntrans == 1) 784 rpc_update_rtt(rtt, timer, m); 785 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 786 } 787 } 788 789 /** 790 * xprt_complete_rqst - called when reply processing is complete 791 * @task: RPC request that recently completed 792 * @copied: actual number of bytes received from the transport 793 * 794 * Caller holds transport lock. 795 */ 796 void xprt_complete_rqst(struct rpc_task *task, int copied) 797 { 798 struct rpc_rqst *req = task->tk_rqstp; 799 struct rpc_xprt *xprt = req->rq_xprt; 800 801 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 802 task->tk_pid, ntohl(req->rq_xid), copied); 803 804 xprt->stat.recvs++; 805 req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); 806 if (xprt->ops->timer != NULL) 807 xprt_update_rtt(task); 808 809 list_del_init(&req->rq_list); 810 req->rq_private_buf.len = copied; 811 /* Ensure all writes are done before we update */ 812 /* req->rq_reply_bytes_recvd */ 813 smp_wmb(); 814 req->rq_reply_bytes_recvd = copied; 815 rpc_wake_up_queued_task(&xprt->pending, task); 816 } 817 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 818 819 static void xprt_timer(struct rpc_task *task) 820 { 821 struct rpc_rqst *req = task->tk_rqstp; 822 struct rpc_xprt *xprt = req->rq_xprt; 823 824 if (task->tk_status != -ETIMEDOUT) 825 return; 826 dprintk("RPC: %5u xprt_timer\n", task->tk_pid); 827 828 spin_lock_bh(&xprt->transport_lock); 829 if (!req->rq_reply_bytes_recvd) { 830 if (xprt->ops->timer) 831 xprt->ops->timer(task); 832 } else 833 task->tk_status = 0; 834 spin_unlock_bh(&xprt->transport_lock); 835 } 836 837 static inline int xprt_has_timer(struct rpc_xprt *xprt) 838 { 839 return xprt->idle_timeout != 0; 840 } 841 842 /** 843 * xprt_prepare_transmit - reserve the transport before sending a request 844 * @task: RPC task about to send a request 845 * 846 */ 847 int xprt_prepare_transmit(struct rpc_task *task) 848 { 849 struct rpc_rqst *req = task->tk_rqstp; 850 struct rpc_xprt *xprt = req->rq_xprt; 851 int err = 0; 852 853 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 854 855 spin_lock_bh(&xprt->transport_lock); 856 if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { 857 err = req->rq_reply_bytes_recvd; 858 goto out_unlock; 859 } 860 if (!xprt->ops->reserve_xprt(task)) 861 err = -EAGAIN; 862 out_unlock: 863 spin_unlock_bh(&xprt->transport_lock); 864 return err; 865 } 866 867 void xprt_end_transmit(struct rpc_task *task) 868 { 869 xprt_release_write(task->tk_rqstp->rq_xprt, task); 870 } 871 872 /** 873 * xprt_transmit - send an RPC request on a transport 874 * @task: controlling RPC task 875 * 876 * We have to copy the iovec because sendmsg fiddles with its contents. 877 */ 878 void xprt_transmit(struct rpc_task *task) 879 { 880 struct rpc_rqst *req = task->tk_rqstp; 881 struct rpc_xprt *xprt = req->rq_xprt; 882 int status; 883 884 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 885 886 if (!req->rq_reply_bytes_recvd) { 887 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { 888 /* 889 * Add to the list only if we're expecting a reply 890 */ 891 spin_lock_bh(&xprt->transport_lock); 892 /* Update the softirq receive buffer */ 893 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 894 sizeof(req->rq_private_buf)); 895 /* Add request to the receive list */ 896 list_add_tail(&req->rq_list, &xprt->recv); 897 spin_unlock_bh(&xprt->transport_lock); 898 xprt_reset_majortimeo(req); 899 /* Turn off autodisconnect */ 900 del_singleshot_timer_sync(&xprt->timer); 901 } 902 } else if (!req->rq_bytes_sent) 903 return; 904 905 req->rq_connect_cookie = xprt->connect_cookie; 906 req->rq_xtime = ktime_get(); 907 status = xprt->ops->send_request(task); 908 if (status != 0) { 909 task->tk_status = status; 910 return; 911 } 912 913 dprintk("RPC: %5u xmit complete\n", task->tk_pid); 914 spin_lock_bh(&xprt->transport_lock); 915 916 xprt->ops->set_retrans_timeout(task); 917 918 xprt->stat.sends++; 919 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 920 xprt->stat.bklog_u += xprt->backlog.qlen; 921 922 /* Don't race with disconnect */ 923 if (!xprt_connected(xprt)) 924 task->tk_status = -ENOTCONN; 925 else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { 926 /* 927 * Sleep on the pending queue since 928 * we're expecting a reply. 929 */ 930 rpc_sleep_on(&xprt->pending, task, xprt_timer); 931 } 932 spin_unlock_bh(&xprt->transport_lock); 933 } 934 935 static void xprt_alloc_slot(struct rpc_task *task) 936 { 937 struct rpc_xprt *xprt = task->tk_xprt; 938 939 task->tk_status = 0; 940 if (task->tk_rqstp) 941 return; 942 if (!list_empty(&xprt->free)) { 943 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 944 list_del_init(&req->rq_list); 945 task->tk_rqstp = req; 946 xprt_request_init(task, xprt); 947 return; 948 } 949 dprintk("RPC: waiting for request slot\n"); 950 task->tk_status = -EAGAIN; 951 task->tk_timeout = 0; 952 rpc_sleep_on(&xprt->backlog, task, NULL); 953 } 954 955 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 956 { 957 memset(req, 0, sizeof(*req)); /* mark unused */ 958 959 spin_lock(&xprt->reserve_lock); 960 list_add(&req->rq_list, &xprt->free); 961 rpc_wake_up_next(&xprt->backlog); 962 spin_unlock(&xprt->reserve_lock); 963 } 964 965 /** 966 * xprt_reserve - allocate an RPC request slot 967 * @task: RPC task requesting a slot allocation 968 * 969 * If no more slots are available, place the task on the transport's 970 * backlog queue. 971 */ 972 void xprt_reserve(struct rpc_task *task) 973 { 974 struct rpc_xprt *xprt = task->tk_xprt; 975 976 task->tk_status = -EIO; 977 spin_lock(&xprt->reserve_lock); 978 xprt_alloc_slot(task); 979 spin_unlock(&xprt->reserve_lock); 980 } 981 982 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) 983 { 984 return (__force __be32)xprt->xid++; 985 } 986 987 static inline void xprt_init_xid(struct rpc_xprt *xprt) 988 { 989 xprt->xid = net_random(); 990 } 991 992 static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 993 { 994 struct rpc_rqst *req = task->tk_rqstp; 995 996 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 997 req->rq_task = task; 998 req->rq_xprt = xprt; 999 req->rq_buffer = NULL; 1000 req->rq_xid = xprt_alloc_xid(xprt); 1001 req->rq_release_snd_buf = NULL; 1002 xprt_reset_majortimeo(req); 1003 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1004 req, ntohl(req->rq_xid)); 1005 } 1006 1007 /** 1008 * xprt_release - release an RPC request slot 1009 * @task: task which is finished with the slot 1010 * 1011 */ 1012 void xprt_release(struct rpc_task *task) 1013 { 1014 struct rpc_xprt *xprt; 1015 struct rpc_rqst *req; 1016 1017 if (!(req = task->tk_rqstp)) 1018 return; 1019 1020 xprt = req->rq_xprt; 1021 rpc_count_iostats(task); 1022 spin_lock_bh(&xprt->transport_lock); 1023 xprt->ops->release_xprt(xprt, task); 1024 if (xprt->ops->release_request) 1025 xprt->ops->release_request(task); 1026 if (!list_empty(&req->rq_list)) 1027 list_del(&req->rq_list); 1028 xprt->last_used = jiffies; 1029 if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) 1030 mod_timer(&xprt->timer, 1031 xprt->last_used + xprt->idle_timeout); 1032 spin_unlock_bh(&xprt->transport_lock); 1033 if (req->rq_buffer) 1034 xprt->ops->buf_free(req->rq_buffer); 1035 if (req->rq_cred != NULL) 1036 put_rpccred(req->rq_cred); 1037 task->tk_rqstp = NULL; 1038 if (req->rq_release_snd_buf) 1039 req->rq_release_snd_buf(req); 1040 1041 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1042 if (likely(!bc_prealloc(req))) 1043 xprt_free_slot(xprt, req); 1044 else 1045 xprt_free_bc_request(req); 1046 } 1047 1048 /** 1049 * xprt_create_transport - create an RPC transport 1050 * @args: rpc transport creation arguments 1051 * 1052 */ 1053 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1054 { 1055 struct rpc_xprt *xprt; 1056 struct rpc_rqst *req; 1057 struct xprt_class *t; 1058 1059 spin_lock(&xprt_list_lock); 1060 list_for_each_entry(t, &xprt_list, list) { 1061 if (t->ident == args->ident) { 1062 spin_unlock(&xprt_list_lock); 1063 goto found; 1064 } 1065 } 1066 spin_unlock(&xprt_list_lock); 1067 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident); 1068 return ERR_PTR(-EIO); 1069 1070 found: 1071 xprt = t->setup(args); 1072 if (IS_ERR(xprt)) { 1073 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1074 -PTR_ERR(xprt)); 1075 return xprt; 1076 } 1077 1078 kref_init(&xprt->kref); 1079 spin_lock_init(&xprt->transport_lock); 1080 spin_lock_init(&xprt->reserve_lock); 1081 1082 INIT_LIST_HEAD(&xprt->free); 1083 INIT_LIST_HEAD(&xprt->recv); 1084 #if defined(CONFIG_NFS_V4_1) 1085 spin_lock_init(&xprt->bc_pa_lock); 1086 INIT_LIST_HEAD(&xprt->bc_pa_list); 1087 #endif /* CONFIG_NFS_V4_1 */ 1088 1089 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1090 if (xprt_has_timer(xprt)) 1091 setup_timer(&xprt->timer, xprt_init_autodisconnect, 1092 (unsigned long)xprt); 1093 else 1094 init_timer(&xprt->timer); 1095 xprt->last_used = jiffies; 1096 xprt->cwnd = RPC_INITCWND; 1097 xprt->bind_index = 0; 1098 1099 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1100 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1101 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1102 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 1103 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1104 1105 /* initialize free list */ 1106 for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) 1107 list_add(&req->rq_list, &xprt->free); 1108 1109 xprt_init_xid(xprt); 1110 1111 dprintk("RPC: created transport %p with %u slots\n", xprt, 1112 xprt->max_reqs); 1113 return xprt; 1114 } 1115 1116 /** 1117 * xprt_destroy - destroy an RPC transport, killing off all requests. 1118 * @kref: kref for the transport to destroy 1119 * 1120 */ 1121 static void xprt_destroy(struct kref *kref) 1122 { 1123 struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref); 1124 1125 dprintk("RPC: destroying transport %p\n", xprt); 1126 xprt->shutdown = 1; 1127 del_timer_sync(&xprt->timer); 1128 1129 rpc_destroy_wait_queue(&xprt->binding); 1130 rpc_destroy_wait_queue(&xprt->pending); 1131 rpc_destroy_wait_queue(&xprt->sending); 1132 rpc_destroy_wait_queue(&xprt->resend); 1133 rpc_destroy_wait_queue(&xprt->backlog); 1134 cancel_work_sync(&xprt->task_cleanup); 1135 /* 1136 * Tear down transport state and free the rpc_xprt 1137 */ 1138 xprt->ops->destroy(xprt); 1139 } 1140 1141 /** 1142 * xprt_put - release a reference to an RPC transport. 1143 * @xprt: pointer to the transport 1144 * 1145 */ 1146 void xprt_put(struct rpc_xprt *xprt) 1147 { 1148 kref_put(&xprt->kref, xprt_destroy); 1149 } 1150 1151 /** 1152 * xprt_get - return a reference to an RPC transport. 1153 * @xprt: pointer to the transport 1154 * 1155 */ 1156 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1157 { 1158 kref_get(&xprt->kref); 1159 return xprt; 1160 } 1161