1 /* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, if a reply is expected, 16 * it installs a timer that is run after the packet's timeout has 17 * expired. 18 * - When a packet arrives, the data_ready handler walks the list of 19 * pending requests for that transport. If a matching XID is found, the 20 * caller is woken up, and the timer removed. 21 * - When no reply arrives within the timeout interval, the timer is 22 * fired by the kernel and runs xprt_timer(). It either adjusts the 23 * timeout values (minor timeout) or wakes up the caller with a status 24 * of -ETIMEDOUT. 25 * - When the caller receives a notification from RPC that a reply arrived, 26 * it should release the RPC slot, and process the reply. 27 * If the call timed out, it may choose to retry the operation by 28 * adjusting the initial timeout value, and simply calling rpc_call 29 * again. 30 * 31 * Support for async RPC is done through a set of RPC-specific scheduling 32 * primitives that `transparently' work for processes as well as async 33 * tasks that rely on callbacks. 34 * 35 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 36 * 37 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 38 */ 39 40 #include <linux/module.h> 41 42 #include <linux/types.h> 43 #include <linux/interrupt.h> 44 #include <linux/workqueue.h> 45 #include <linux/net.h> 46 #include <linux/ktime.h> 47 48 #include <linux/sunrpc/clnt.h> 49 #include <linux/sunrpc/metrics.h> 50 #include <linux/sunrpc/bc_xprt.h> 51 52 #include "sunrpc.h" 53 54 /* 55 * Local variables 56 */ 57 58 #ifdef RPC_DEBUG 59 # define RPCDBG_FACILITY RPCDBG_XPRT 60 #endif 61 62 /* 63 * Local functions 64 */ 65 static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 66 static void xprt_connect_status(struct rpc_task *task); 67 static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 68 69 static DEFINE_SPINLOCK(xprt_list_lock); 70 static LIST_HEAD(xprt_list); 71 72 /* 73 * The transport code maintains an estimate on the maximum number of out- 74 * standing RPC requests, using a smoothed version of the congestion 75 * avoidance implemented in 44BSD. This is basically the Van Jacobson 76 * congestion algorithm: If a retransmit occurs, the congestion window is 77 * halved; otherwise, it is incremented by 1/cwnd when 78 * 79 * - a reply is received and 80 * - a full number of requests are outstanding and 81 * - the congestion window hasn't been updated recently. 82 */ 83 #define RPC_CWNDSHIFT (8U) 84 #define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) 85 #define RPC_INITCWND RPC_CWNDSCALE 86 #define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) 87 88 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) 89 90 /** 91 * xprt_register_transport - register a transport implementation 92 * @transport: transport to register 93 * 94 * If a transport implementation is loaded as a kernel module, it can 95 * call this interface to make itself known to the RPC client. 96 * 97 * Returns: 98 * 0: transport successfully registered 99 * -EEXIST: transport already registered 100 * -EINVAL: transport module being unloaded 101 */ 102 int xprt_register_transport(struct xprt_class *transport) 103 { 104 struct xprt_class *t; 105 int result; 106 107 result = -EEXIST; 108 spin_lock(&xprt_list_lock); 109 list_for_each_entry(t, &xprt_list, list) { 110 /* don't register the same transport class twice */ 111 if (t->ident == transport->ident) 112 goto out; 113 } 114 115 list_add_tail(&transport->list, &xprt_list); 116 printk(KERN_INFO "RPC: Registered %s transport module.\n", 117 transport->name); 118 result = 0; 119 120 out: 121 spin_unlock(&xprt_list_lock); 122 return result; 123 } 124 EXPORT_SYMBOL_GPL(xprt_register_transport); 125 126 /** 127 * xprt_unregister_transport - unregister a transport implementation 128 * @transport: transport to unregister 129 * 130 * Returns: 131 * 0: transport successfully unregistered 132 * -ENOENT: transport never registered 133 */ 134 int xprt_unregister_transport(struct xprt_class *transport) 135 { 136 struct xprt_class *t; 137 int result; 138 139 result = 0; 140 spin_lock(&xprt_list_lock); 141 list_for_each_entry(t, &xprt_list, list) { 142 if (t == transport) { 143 printk(KERN_INFO 144 "RPC: Unregistered %s transport module.\n", 145 transport->name); 146 list_del_init(&transport->list); 147 goto out; 148 } 149 } 150 result = -ENOENT; 151 152 out: 153 spin_unlock(&xprt_list_lock); 154 return result; 155 } 156 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 157 158 /** 159 * xprt_load_transport - load a transport implementation 160 * @transport_name: transport to load 161 * 162 * Returns: 163 * 0: transport successfully loaded 164 * -ENOENT: transport module not available 165 */ 166 int xprt_load_transport(const char *transport_name) 167 { 168 struct xprt_class *t; 169 int result; 170 171 result = 0; 172 spin_lock(&xprt_list_lock); 173 list_for_each_entry(t, &xprt_list, list) { 174 if (strcmp(t->name, transport_name) == 0) { 175 spin_unlock(&xprt_list_lock); 176 goto out; 177 } 178 } 179 spin_unlock(&xprt_list_lock); 180 result = request_module("xprt%s", transport_name); 181 out: 182 return result; 183 } 184 EXPORT_SYMBOL_GPL(xprt_load_transport); 185 186 /** 187 * xprt_reserve_xprt - serialize write access to transports 188 * @task: task that is requesting access to the transport 189 * 190 * This prevents mixing the payload of separate requests, and prevents 191 * transport connects from colliding with writes. No congestion control 192 * is provided. 193 */ 194 int xprt_reserve_xprt(struct rpc_task *task) 195 { 196 struct rpc_rqst *req = task->tk_rqstp; 197 struct rpc_xprt *xprt = req->rq_xprt; 198 199 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 200 if (task == xprt->snd_task) 201 return 1; 202 if (task == NULL) 203 return 0; 204 goto out_sleep; 205 } 206 xprt->snd_task = task; 207 if (req) { 208 req->rq_bytes_sent = 0; 209 req->rq_ntrans++; 210 } 211 return 1; 212 213 out_sleep: 214 dprintk("RPC: %5u failed to lock transport %p\n", 215 task->tk_pid, xprt); 216 task->tk_timeout = 0; 217 task->tk_status = -EAGAIN; 218 if (req && req->rq_ntrans) 219 rpc_sleep_on(&xprt->resend, task, NULL); 220 else 221 rpc_sleep_on(&xprt->sending, task, NULL); 222 return 0; 223 } 224 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 225 226 static void xprt_clear_locked(struct rpc_xprt *xprt) 227 { 228 xprt->snd_task = NULL; 229 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) { 230 smp_mb__before_clear_bit(); 231 clear_bit(XPRT_LOCKED, &xprt->state); 232 smp_mb__after_clear_bit(); 233 } else 234 queue_work(rpciod_workqueue, &xprt->task_cleanup); 235 } 236 237 /* 238 * xprt_reserve_xprt_cong - serialize write access to transports 239 * @task: task that is requesting access to the transport 240 * 241 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 242 * integrated into the decision of whether a request is allowed to be 243 * woken up and given access to the transport. 244 */ 245 int xprt_reserve_xprt_cong(struct rpc_task *task) 246 { 247 struct rpc_xprt *xprt = task->tk_xprt; 248 struct rpc_rqst *req = task->tk_rqstp; 249 250 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 251 if (task == xprt->snd_task) 252 return 1; 253 goto out_sleep; 254 } 255 if (__xprt_get_cong(xprt, task)) { 256 xprt->snd_task = task; 257 if (req) { 258 req->rq_bytes_sent = 0; 259 req->rq_ntrans++; 260 } 261 return 1; 262 } 263 xprt_clear_locked(xprt); 264 out_sleep: 265 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 266 task->tk_timeout = 0; 267 task->tk_status = -EAGAIN; 268 if (req && req->rq_ntrans) 269 rpc_sleep_on(&xprt->resend, task, NULL); 270 else 271 rpc_sleep_on(&xprt->sending, task, NULL); 272 return 0; 273 } 274 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 275 276 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 277 { 278 int retval; 279 280 spin_lock_bh(&xprt->transport_lock); 281 retval = xprt->ops->reserve_xprt(task); 282 spin_unlock_bh(&xprt->transport_lock); 283 return retval; 284 } 285 286 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 287 { 288 struct rpc_task *task; 289 struct rpc_rqst *req; 290 291 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 292 return; 293 294 task = rpc_wake_up_next(&xprt->resend); 295 if (!task) { 296 task = rpc_wake_up_next(&xprt->sending); 297 if (!task) 298 goto out_unlock; 299 } 300 301 req = task->tk_rqstp; 302 xprt->snd_task = task; 303 if (req) { 304 req->rq_bytes_sent = 0; 305 req->rq_ntrans++; 306 } 307 return; 308 309 out_unlock: 310 xprt_clear_locked(xprt); 311 } 312 313 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 314 { 315 struct rpc_task *task; 316 317 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 318 return; 319 if (RPCXPRT_CONGESTED(xprt)) 320 goto out_unlock; 321 task = rpc_wake_up_next(&xprt->resend); 322 if (!task) { 323 task = rpc_wake_up_next(&xprt->sending); 324 if (!task) 325 goto out_unlock; 326 } 327 if (__xprt_get_cong(xprt, task)) { 328 struct rpc_rqst *req = task->tk_rqstp; 329 xprt->snd_task = task; 330 if (req) { 331 req->rq_bytes_sent = 0; 332 req->rq_ntrans++; 333 } 334 return; 335 } 336 out_unlock: 337 xprt_clear_locked(xprt); 338 } 339 340 /** 341 * xprt_release_xprt - allow other requests to use a transport 342 * @xprt: transport with other tasks potentially waiting 343 * @task: task that is releasing access to the transport 344 * 345 * Note that "task" can be NULL. No congestion control is provided. 346 */ 347 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 348 { 349 if (xprt->snd_task == task) { 350 xprt_clear_locked(xprt); 351 __xprt_lock_write_next(xprt); 352 } 353 } 354 EXPORT_SYMBOL_GPL(xprt_release_xprt); 355 356 /** 357 * xprt_release_xprt_cong - allow other requests to use a transport 358 * @xprt: transport with other tasks potentially waiting 359 * @task: task that is releasing access to the transport 360 * 361 * Note that "task" can be NULL. Another task is awoken to use the 362 * transport if the transport's congestion window allows it. 363 */ 364 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 365 { 366 if (xprt->snd_task == task) { 367 xprt_clear_locked(xprt); 368 __xprt_lock_write_next_cong(xprt); 369 } 370 } 371 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 372 373 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 374 { 375 spin_lock_bh(&xprt->transport_lock); 376 xprt->ops->release_xprt(xprt, task); 377 spin_unlock_bh(&xprt->transport_lock); 378 } 379 380 /* 381 * Van Jacobson congestion avoidance. Check if the congestion window 382 * overflowed. Put the task to sleep if this is the case. 383 */ 384 static int 385 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 386 { 387 struct rpc_rqst *req = task->tk_rqstp; 388 389 if (req->rq_cong) 390 return 1; 391 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 392 task->tk_pid, xprt->cong, xprt->cwnd); 393 if (RPCXPRT_CONGESTED(xprt)) 394 return 0; 395 req->rq_cong = 1; 396 xprt->cong += RPC_CWNDSCALE; 397 return 1; 398 } 399 400 /* 401 * Adjust the congestion window, and wake up the next task 402 * that has been sleeping due to congestion 403 */ 404 static void 405 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 406 { 407 if (!req->rq_cong) 408 return; 409 req->rq_cong = 0; 410 xprt->cong -= RPC_CWNDSCALE; 411 __xprt_lock_write_next_cong(xprt); 412 } 413 414 /** 415 * xprt_release_rqst_cong - housekeeping when request is complete 416 * @task: RPC request that recently completed 417 * 418 * Useful for transports that require congestion control. 419 */ 420 void xprt_release_rqst_cong(struct rpc_task *task) 421 { 422 __xprt_put_cong(task->tk_xprt, task->tk_rqstp); 423 } 424 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 425 426 /** 427 * xprt_adjust_cwnd - adjust transport congestion window 428 * @task: recently completed RPC request used to adjust window 429 * @result: result code of completed RPC request 430 * 431 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 432 */ 433 void xprt_adjust_cwnd(struct rpc_task *task, int result) 434 { 435 struct rpc_rqst *req = task->tk_rqstp; 436 struct rpc_xprt *xprt = task->tk_xprt; 437 unsigned long cwnd = xprt->cwnd; 438 439 if (result >= 0 && cwnd <= xprt->cong) { 440 /* The (cwnd >> 1) term makes sure 441 * the result gets rounded properly. */ 442 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 443 if (cwnd > RPC_MAXCWND(xprt)) 444 cwnd = RPC_MAXCWND(xprt); 445 __xprt_lock_write_next_cong(xprt); 446 } else if (result == -ETIMEDOUT) { 447 cwnd >>= 1; 448 if (cwnd < RPC_CWNDSCALE) 449 cwnd = RPC_CWNDSCALE; 450 } 451 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 452 xprt->cong, xprt->cwnd, cwnd); 453 xprt->cwnd = cwnd; 454 __xprt_put_cong(xprt, req); 455 } 456 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 457 458 /** 459 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 460 * @xprt: transport with waiting tasks 461 * @status: result code to plant in each task before waking it 462 * 463 */ 464 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 465 { 466 if (status < 0) 467 rpc_wake_up_status(&xprt->pending, status); 468 else 469 rpc_wake_up(&xprt->pending); 470 } 471 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 472 473 /** 474 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 475 * @task: task to be put to sleep 476 * @action: function pointer to be executed after wait 477 */ 478 void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) 479 { 480 struct rpc_rqst *req = task->tk_rqstp; 481 struct rpc_xprt *xprt = req->rq_xprt; 482 483 task->tk_timeout = req->rq_timeout; 484 rpc_sleep_on(&xprt->pending, task, action); 485 } 486 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 487 488 /** 489 * xprt_write_space - wake the task waiting for transport output buffer space 490 * @xprt: transport with waiting tasks 491 * 492 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 493 */ 494 void xprt_write_space(struct rpc_xprt *xprt) 495 { 496 if (unlikely(xprt->shutdown)) 497 return; 498 499 spin_lock_bh(&xprt->transport_lock); 500 if (xprt->snd_task) { 501 dprintk("RPC: write space: waking waiting task on " 502 "xprt %p\n", xprt); 503 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); 504 } 505 spin_unlock_bh(&xprt->transport_lock); 506 } 507 EXPORT_SYMBOL_GPL(xprt_write_space); 508 509 /** 510 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 511 * @task: task whose timeout is to be set 512 * 513 * Set a request's retransmit timeout based on the transport's 514 * default timeout parameters. Used by transports that don't adjust 515 * the retransmit timeout based on round-trip time estimation. 516 */ 517 void xprt_set_retrans_timeout_def(struct rpc_task *task) 518 { 519 task->tk_timeout = task->tk_rqstp->rq_timeout; 520 } 521 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); 522 523 /* 524 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 525 * @task: task whose timeout is to be set 526 * 527 * Set a request's retransmit timeout using the RTT estimator. 528 */ 529 void xprt_set_retrans_timeout_rtt(struct rpc_task *task) 530 { 531 int timer = task->tk_msg.rpc_proc->p_timer; 532 struct rpc_clnt *clnt = task->tk_client; 533 struct rpc_rtt *rtt = clnt->cl_rtt; 534 struct rpc_rqst *req = task->tk_rqstp; 535 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 536 537 task->tk_timeout = rpc_calc_rto(rtt, timer); 538 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 539 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 540 task->tk_timeout = max_timeout; 541 } 542 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt); 543 544 static void xprt_reset_majortimeo(struct rpc_rqst *req) 545 { 546 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 547 548 req->rq_majortimeo = req->rq_timeout; 549 if (to->to_exponential) 550 req->rq_majortimeo <<= to->to_retries; 551 else 552 req->rq_majortimeo += to->to_increment * to->to_retries; 553 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 554 req->rq_majortimeo = to->to_maxval; 555 req->rq_majortimeo += jiffies; 556 } 557 558 /** 559 * xprt_adjust_timeout - adjust timeout values for next retransmit 560 * @req: RPC request containing parameters to use for the adjustment 561 * 562 */ 563 int xprt_adjust_timeout(struct rpc_rqst *req) 564 { 565 struct rpc_xprt *xprt = req->rq_xprt; 566 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 567 int status = 0; 568 569 if (time_before(jiffies, req->rq_majortimeo)) { 570 if (to->to_exponential) 571 req->rq_timeout <<= 1; 572 else 573 req->rq_timeout += to->to_increment; 574 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 575 req->rq_timeout = to->to_maxval; 576 req->rq_retries++; 577 } else { 578 req->rq_timeout = to->to_initval; 579 req->rq_retries = 0; 580 xprt_reset_majortimeo(req); 581 /* Reset the RTT counters == "slow start" */ 582 spin_lock_bh(&xprt->transport_lock); 583 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 584 spin_unlock_bh(&xprt->transport_lock); 585 status = -ETIMEDOUT; 586 } 587 588 if (req->rq_timeout == 0) { 589 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 590 req->rq_timeout = 5 * HZ; 591 } 592 return status; 593 } 594 595 static void xprt_autoclose(struct work_struct *work) 596 { 597 struct rpc_xprt *xprt = 598 container_of(work, struct rpc_xprt, task_cleanup); 599 600 xprt->ops->close(xprt); 601 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 602 xprt_release_write(xprt, NULL); 603 } 604 605 /** 606 * xprt_disconnect_done - mark a transport as disconnected 607 * @xprt: transport to flag for disconnect 608 * 609 */ 610 void xprt_disconnect_done(struct rpc_xprt *xprt) 611 { 612 dprintk("RPC: disconnected transport %p\n", xprt); 613 spin_lock_bh(&xprt->transport_lock); 614 xprt_clear_connected(xprt); 615 xprt_wake_pending_tasks(xprt, -EAGAIN); 616 spin_unlock_bh(&xprt->transport_lock); 617 } 618 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 619 620 /** 621 * xprt_force_disconnect - force a transport to disconnect 622 * @xprt: transport to disconnect 623 * 624 */ 625 void xprt_force_disconnect(struct rpc_xprt *xprt) 626 { 627 /* Don't race with the test_bit() in xprt_clear_locked() */ 628 spin_lock_bh(&xprt->transport_lock); 629 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 630 /* Try to schedule an autoclose RPC call */ 631 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 632 queue_work(rpciod_workqueue, &xprt->task_cleanup); 633 xprt_wake_pending_tasks(xprt, -EAGAIN); 634 spin_unlock_bh(&xprt->transport_lock); 635 } 636 637 /** 638 * xprt_conditional_disconnect - force a transport to disconnect 639 * @xprt: transport to disconnect 640 * @cookie: 'connection cookie' 641 * 642 * This attempts to break the connection if and only if 'cookie' matches 643 * the current transport 'connection cookie'. It ensures that we don't 644 * try to break the connection more than once when we need to retransmit 645 * a batch of RPC requests. 646 * 647 */ 648 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 649 { 650 /* Don't race with the test_bit() in xprt_clear_locked() */ 651 spin_lock_bh(&xprt->transport_lock); 652 if (cookie != xprt->connect_cookie) 653 goto out; 654 if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) 655 goto out; 656 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 657 /* Try to schedule an autoclose RPC call */ 658 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 659 queue_work(rpciod_workqueue, &xprt->task_cleanup); 660 xprt_wake_pending_tasks(xprt, -EAGAIN); 661 out: 662 spin_unlock_bh(&xprt->transport_lock); 663 } 664 665 static void 666 xprt_init_autodisconnect(unsigned long data) 667 { 668 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 669 670 spin_lock(&xprt->transport_lock); 671 if (!list_empty(&xprt->recv) || xprt->shutdown) 672 goto out_abort; 673 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 674 goto out_abort; 675 spin_unlock(&xprt->transport_lock); 676 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 677 queue_work(rpciod_workqueue, &xprt->task_cleanup); 678 return; 679 out_abort: 680 spin_unlock(&xprt->transport_lock); 681 } 682 683 /** 684 * xprt_connect - schedule a transport connect operation 685 * @task: RPC task that is requesting the connect 686 * 687 */ 688 void xprt_connect(struct rpc_task *task) 689 { 690 struct rpc_xprt *xprt = task->tk_xprt; 691 692 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 693 xprt, (xprt_connected(xprt) ? "is" : "is not")); 694 695 if (!xprt_bound(xprt)) { 696 task->tk_status = -EAGAIN; 697 return; 698 } 699 if (!xprt_lock_write(xprt, task)) 700 return; 701 702 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 703 xprt->ops->close(xprt); 704 705 if (xprt_connected(xprt)) 706 xprt_release_write(xprt, task); 707 else { 708 if (task->tk_rqstp) 709 task->tk_rqstp->rq_bytes_sent = 0; 710 711 task->tk_timeout = task->tk_rqstp->rq_timeout; 712 rpc_sleep_on(&xprt->pending, task, xprt_connect_status); 713 714 if (test_bit(XPRT_CLOSING, &xprt->state)) 715 return; 716 if (xprt_test_and_set_connecting(xprt)) 717 return; 718 xprt->stat.connect_start = jiffies; 719 xprt->ops->connect(task); 720 } 721 } 722 723 static void xprt_connect_status(struct rpc_task *task) 724 { 725 struct rpc_xprt *xprt = task->tk_xprt; 726 727 if (task->tk_status == 0) { 728 xprt->stat.connect_count++; 729 xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; 730 dprintk("RPC: %5u xprt_connect_status: connection established\n", 731 task->tk_pid); 732 return; 733 } 734 735 switch (task->tk_status) { 736 case -EAGAIN: 737 dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); 738 break; 739 case -ETIMEDOUT: 740 dprintk("RPC: %5u xprt_connect_status: connect attempt timed " 741 "out\n", task->tk_pid); 742 break; 743 default: 744 dprintk("RPC: %5u xprt_connect_status: error %d connecting to " 745 "server %s\n", task->tk_pid, -task->tk_status, 746 task->tk_client->cl_server); 747 xprt_release_write(xprt, task); 748 task->tk_status = -EIO; 749 } 750 } 751 752 /** 753 * xprt_lookup_rqst - find an RPC request corresponding to an XID 754 * @xprt: transport on which the original request was transmitted 755 * @xid: RPC XID of incoming reply 756 * 757 */ 758 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 759 { 760 struct list_head *pos; 761 762 list_for_each(pos, &xprt->recv) { 763 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 764 if (entry->rq_xid == xid) 765 return entry; 766 } 767 768 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 769 ntohl(xid)); 770 xprt->stat.bad_xids++; 771 return NULL; 772 } 773 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 774 775 static void xprt_update_rtt(struct rpc_task *task) 776 { 777 struct rpc_rqst *req = task->tk_rqstp; 778 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 779 unsigned timer = task->tk_msg.rpc_proc->p_timer; 780 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 781 782 if (timer) { 783 if (req->rq_ntrans == 1) 784 rpc_update_rtt(rtt, timer, m); 785 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 786 } 787 } 788 789 /** 790 * xprt_complete_rqst - called when reply processing is complete 791 * @task: RPC request that recently completed 792 * @copied: actual number of bytes received from the transport 793 * 794 * Caller holds transport lock. 795 */ 796 void xprt_complete_rqst(struct rpc_task *task, int copied) 797 { 798 struct rpc_rqst *req = task->tk_rqstp; 799 struct rpc_xprt *xprt = req->rq_xprt; 800 801 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 802 task->tk_pid, ntohl(req->rq_xid), copied); 803 804 xprt->stat.recvs++; 805 req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); 806 if (xprt->ops->timer != NULL) 807 xprt_update_rtt(task); 808 809 list_del_init(&req->rq_list); 810 req->rq_private_buf.len = copied; 811 /* Ensure all writes are done before we update */ 812 /* req->rq_reply_bytes_recvd */ 813 smp_wmb(); 814 req->rq_reply_bytes_recvd = copied; 815 rpc_wake_up_queued_task(&xprt->pending, task); 816 } 817 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 818 819 static void xprt_timer(struct rpc_task *task) 820 { 821 struct rpc_rqst *req = task->tk_rqstp; 822 struct rpc_xprt *xprt = req->rq_xprt; 823 824 if (task->tk_status != -ETIMEDOUT) 825 return; 826 dprintk("RPC: %5u xprt_timer\n", task->tk_pid); 827 828 spin_lock_bh(&xprt->transport_lock); 829 if (!req->rq_reply_bytes_recvd) { 830 if (xprt->ops->timer) 831 xprt->ops->timer(task); 832 } else 833 task->tk_status = 0; 834 spin_unlock_bh(&xprt->transport_lock); 835 } 836 837 static inline int xprt_has_timer(struct rpc_xprt *xprt) 838 { 839 return xprt->idle_timeout != 0; 840 } 841 842 /** 843 * xprt_prepare_transmit - reserve the transport before sending a request 844 * @task: RPC task about to send a request 845 * 846 */ 847 int xprt_prepare_transmit(struct rpc_task *task) 848 { 849 struct rpc_rqst *req = task->tk_rqstp; 850 struct rpc_xprt *xprt = req->rq_xprt; 851 int err = 0; 852 853 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 854 855 spin_lock_bh(&xprt->transport_lock); 856 if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { 857 err = req->rq_reply_bytes_recvd; 858 goto out_unlock; 859 } 860 if (!xprt->ops->reserve_xprt(task)) 861 err = -EAGAIN; 862 out_unlock: 863 spin_unlock_bh(&xprt->transport_lock); 864 return err; 865 } 866 867 void xprt_end_transmit(struct rpc_task *task) 868 { 869 xprt_release_write(task->tk_rqstp->rq_xprt, task); 870 } 871 872 /** 873 * xprt_transmit - send an RPC request on a transport 874 * @task: controlling RPC task 875 * 876 * We have to copy the iovec because sendmsg fiddles with its contents. 877 */ 878 void xprt_transmit(struct rpc_task *task) 879 { 880 struct rpc_rqst *req = task->tk_rqstp; 881 struct rpc_xprt *xprt = req->rq_xprt; 882 int status; 883 884 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 885 886 if (!req->rq_reply_bytes_recvd) { 887 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { 888 /* 889 * Add to the list only if we're expecting a reply 890 */ 891 spin_lock_bh(&xprt->transport_lock); 892 /* Update the softirq receive buffer */ 893 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 894 sizeof(req->rq_private_buf)); 895 /* Add request to the receive list */ 896 list_add_tail(&req->rq_list, &xprt->recv); 897 spin_unlock_bh(&xprt->transport_lock); 898 xprt_reset_majortimeo(req); 899 /* Turn off autodisconnect */ 900 del_singleshot_timer_sync(&xprt->timer); 901 } 902 } else if (!req->rq_bytes_sent) 903 return; 904 905 req->rq_connect_cookie = xprt->connect_cookie; 906 req->rq_xtime = ktime_get(); 907 status = xprt->ops->send_request(task); 908 if (status != 0) { 909 task->tk_status = status; 910 return; 911 } 912 913 dprintk("RPC: %5u xmit complete\n", task->tk_pid); 914 spin_lock_bh(&xprt->transport_lock); 915 916 xprt->ops->set_retrans_timeout(task); 917 918 xprt->stat.sends++; 919 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 920 xprt->stat.bklog_u += xprt->backlog.qlen; 921 922 /* Don't race with disconnect */ 923 if (!xprt_connected(xprt)) 924 task->tk_status = -ENOTCONN; 925 else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { 926 /* 927 * Sleep on the pending queue since 928 * we're expecting a reply. 929 */ 930 rpc_sleep_on(&xprt->pending, task, xprt_timer); 931 } 932 spin_unlock_bh(&xprt->transport_lock); 933 } 934 935 static void xprt_alloc_slot(struct rpc_task *task) 936 { 937 struct rpc_xprt *xprt = task->tk_xprt; 938 939 task->tk_status = 0; 940 if (task->tk_rqstp) 941 return; 942 if (!list_empty(&xprt->free)) { 943 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 944 list_del_init(&req->rq_list); 945 task->tk_rqstp = req; 946 xprt_request_init(task, xprt); 947 return; 948 } 949 dprintk("RPC: waiting for request slot\n"); 950 task->tk_status = -EAGAIN; 951 task->tk_timeout = 0; 952 rpc_sleep_on(&xprt->backlog, task, NULL); 953 } 954 955 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 956 { 957 memset(req, 0, sizeof(*req)); /* mark unused */ 958 959 spin_lock(&xprt->reserve_lock); 960 list_add(&req->rq_list, &xprt->free); 961 rpc_wake_up_next(&xprt->backlog); 962 spin_unlock(&xprt->reserve_lock); 963 } 964 965 struct rpc_xprt *xprt_alloc(int size, int max_req) 966 { 967 struct rpc_xprt *xprt; 968 969 xprt = kzalloc(size, GFP_KERNEL); 970 if (xprt == NULL) 971 goto out; 972 973 xprt->max_reqs = max_req; 974 xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL); 975 if (xprt->slot == NULL) 976 goto out_free; 977 978 return xprt; 979 980 out_free: 981 kfree(xprt); 982 out: 983 return NULL; 984 } 985 EXPORT_SYMBOL_GPL(xprt_alloc); 986 987 void xprt_free(struct rpc_xprt *xprt) 988 { 989 kfree(xprt->slot); 990 kfree(xprt); 991 } 992 EXPORT_SYMBOL_GPL(xprt_free); 993 994 /** 995 * xprt_reserve - allocate an RPC request slot 996 * @task: RPC task requesting a slot allocation 997 * 998 * If no more slots are available, place the task on the transport's 999 * backlog queue. 1000 */ 1001 void xprt_reserve(struct rpc_task *task) 1002 { 1003 struct rpc_xprt *xprt = task->tk_xprt; 1004 1005 task->tk_status = -EIO; 1006 spin_lock(&xprt->reserve_lock); 1007 xprt_alloc_slot(task); 1008 spin_unlock(&xprt->reserve_lock); 1009 } 1010 1011 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) 1012 { 1013 return (__force __be32)xprt->xid++; 1014 } 1015 1016 static inline void xprt_init_xid(struct rpc_xprt *xprt) 1017 { 1018 xprt->xid = net_random(); 1019 } 1020 1021 static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 1022 { 1023 struct rpc_rqst *req = task->tk_rqstp; 1024 1025 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 1026 req->rq_task = task; 1027 req->rq_xprt = xprt; 1028 req->rq_buffer = NULL; 1029 req->rq_xid = xprt_alloc_xid(xprt); 1030 req->rq_release_snd_buf = NULL; 1031 xprt_reset_majortimeo(req); 1032 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1033 req, ntohl(req->rq_xid)); 1034 } 1035 1036 /** 1037 * xprt_release - release an RPC request slot 1038 * @task: task which is finished with the slot 1039 * 1040 */ 1041 void xprt_release(struct rpc_task *task) 1042 { 1043 struct rpc_xprt *xprt; 1044 struct rpc_rqst *req; 1045 1046 if (!(req = task->tk_rqstp)) 1047 return; 1048 1049 xprt = req->rq_xprt; 1050 rpc_count_iostats(task); 1051 spin_lock_bh(&xprt->transport_lock); 1052 xprt->ops->release_xprt(xprt, task); 1053 if (xprt->ops->release_request) 1054 xprt->ops->release_request(task); 1055 if (!list_empty(&req->rq_list)) 1056 list_del(&req->rq_list); 1057 xprt->last_used = jiffies; 1058 if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) 1059 mod_timer(&xprt->timer, 1060 xprt->last_used + xprt->idle_timeout); 1061 spin_unlock_bh(&xprt->transport_lock); 1062 if (req->rq_buffer) 1063 xprt->ops->buf_free(req->rq_buffer); 1064 if (req->rq_cred != NULL) 1065 put_rpccred(req->rq_cred); 1066 task->tk_rqstp = NULL; 1067 if (req->rq_release_snd_buf) 1068 req->rq_release_snd_buf(req); 1069 1070 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1071 if (likely(!bc_prealloc(req))) 1072 xprt_free_slot(xprt, req); 1073 else 1074 xprt_free_bc_request(req); 1075 } 1076 1077 /** 1078 * xprt_create_transport - create an RPC transport 1079 * @args: rpc transport creation arguments 1080 * 1081 */ 1082 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1083 { 1084 struct rpc_xprt *xprt; 1085 struct rpc_rqst *req; 1086 struct xprt_class *t; 1087 1088 spin_lock(&xprt_list_lock); 1089 list_for_each_entry(t, &xprt_list, list) { 1090 if (t->ident == args->ident) { 1091 spin_unlock(&xprt_list_lock); 1092 goto found; 1093 } 1094 } 1095 spin_unlock(&xprt_list_lock); 1096 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident); 1097 return ERR_PTR(-EIO); 1098 1099 found: 1100 xprt = t->setup(args); 1101 if (IS_ERR(xprt)) { 1102 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1103 -PTR_ERR(xprt)); 1104 return xprt; 1105 } 1106 1107 kref_init(&xprt->kref); 1108 spin_lock_init(&xprt->transport_lock); 1109 spin_lock_init(&xprt->reserve_lock); 1110 1111 INIT_LIST_HEAD(&xprt->free); 1112 INIT_LIST_HEAD(&xprt->recv); 1113 #if defined(CONFIG_NFS_V4_1) 1114 spin_lock_init(&xprt->bc_pa_lock); 1115 INIT_LIST_HEAD(&xprt->bc_pa_list); 1116 #endif /* CONFIG_NFS_V4_1 */ 1117 1118 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1119 if (xprt_has_timer(xprt)) 1120 setup_timer(&xprt->timer, xprt_init_autodisconnect, 1121 (unsigned long)xprt); 1122 else 1123 init_timer(&xprt->timer); 1124 xprt->last_used = jiffies; 1125 xprt->cwnd = RPC_INITCWND; 1126 xprt->bind_index = 0; 1127 1128 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1129 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1130 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1131 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 1132 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1133 1134 /* initialize free list */ 1135 for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) 1136 list_add(&req->rq_list, &xprt->free); 1137 1138 xprt_init_xid(xprt); 1139 1140 dprintk("RPC: created transport %p with %u slots\n", xprt, 1141 xprt->max_reqs); 1142 return xprt; 1143 } 1144 1145 /** 1146 * xprt_destroy - destroy an RPC transport, killing off all requests. 1147 * @kref: kref for the transport to destroy 1148 * 1149 */ 1150 static void xprt_destroy(struct kref *kref) 1151 { 1152 struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref); 1153 1154 dprintk("RPC: destroying transport %p\n", xprt); 1155 xprt->shutdown = 1; 1156 del_timer_sync(&xprt->timer); 1157 1158 rpc_destroy_wait_queue(&xprt->binding); 1159 rpc_destroy_wait_queue(&xprt->pending); 1160 rpc_destroy_wait_queue(&xprt->sending); 1161 rpc_destroy_wait_queue(&xprt->resend); 1162 rpc_destroy_wait_queue(&xprt->backlog); 1163 cancel_work_sync(&xprt->task_cleanup); 1164 /* 1165 * Tear down transport state and free the rpc_xprt 1166 */ 1167 xprt->ops->destroy(xprt); 1168 } 1169 1170 /** 1171 * xprt_put - release a reference to an RPC transport. 1172 * @xprt: pointer to the transport 1173 * 1174 */ 1175 void xprt_put(struct rpc_xprt *xprt) 1176 { 1177 kref_put(&xprt->kref, xprt_destroy); 1178 } 1179 1180 /** 1181 * xprt_get - return a reference to an RPC transport. 1182 * @xprt: pointer to the transport 1183 * 1184 */ 1185 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1186 { 1187 kref_get(&xprt->kref); 1188 return xprt; 1189 } 1190