/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -  When a process places a call, it allocates a request slot if
 *     one is available. Otherwise, it sleeps on the backlog queue
 *     (xprt_reserve).
 *  -  Next, the caller puts together the RPC message, stuffs it into
 *     the request struct, and calls xprt_transmit().
 *  -  xprt_transmit sends the message and installs the caller on the
 *     transport's wait list. At the same time, if a reply is expected,
 *     it installs a timer that is run after the packet's timeout has
 *     expired.
 *  -  When a packet arrives, the data_ready handler walks the list of
 *     pending requests for that transport. If a matching XID is found, the
 *     caller is woken up, and the timer removed.
 *  -  When no reply arrives within the timeout interval, the timer is
 *     fired by the kernel and runs xprt_timer(). It either adjusts the
 *     timeout values (minor timeout) or wakes up the caller with a status
 *     of -ETIMEDOUT.
 *  -  When the caller receives a notification from RPC that a reply arrived,
 *     it should release the RPC slot, and process the reply.
 *     If the call timed out, it may choose to retry the operation by
 *     adjusting the initial timeout value, and simply calling rpc_call
 *     again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
34 * 35 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 36 * 37 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 38 */ 39 40 #include <linux/module.h> 41 42 #include <linux/types.h> 43 #include <linux/interrupt.h> 44 #include <linux/workqueue.h> 45 #include <linux/net.h> 46 #include <linux/ktime.h> 47 48 #include <linux/sunrpc/clnt.h> 49 #include <linux/sunrpc/metrics.h> 50 #include <linux/sunrpc/bc_xprt.h> 51 #include <linux/rcupdate.h> 52 #include <linux/sched/mm.h> 53 54 #include <trace/events/sunrpc.h> 55 56 #include "sunrpc.h" 57 58 /* 59 * Local variables 60 */ 61 62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 63 # define RPCDBG_FACILITY RPCDBG_XPRT 64 #endif 65 66 /* 67 * Local functions 68 */ 69 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 70 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 71 static void xprt_destroy(struct rpc_xprt *xprt); 72 73 static DEFINE_SPINLOCK(xprt_list_lock); 74 static LIST_HEAD(xprt_list); 75 76 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 77 { 78 unsigned long timeout = jiffies + req->rq_timeout; 79 80 if (time_before(timeout, req->rq_majortimeo)) 81 return timeout; 82 return req->rq_majortimeo; 83 } 84 85 /** 86 * xprt_register_transport - register a transport implementation 87 * @transport: transport to register 88 * 89 * If a transport implementation is loaded as a kernel module, it can 90 * call this interface to make itself known to the RPC client. 
91 * 92 * Returns: 93 * 0: transport successfully registered 94 * -EEXIST: transport already registered 95 * -EINVAL: transport module being unloaded 96 */ 97 int xprt_register_transport(struct xprt_class *transport) 98 { 99 struct xprt_class *t; 100 int result; 101 102 result = -EEXIST; 103 spin_lock(&xprt_list_lock); 104 list_for_each_entry(t, &xprt_list, list) { 105 /* don't register the same transport class twice */ 106 if (t->ident == transport->ident) 107 goto out; 108 } 109 110 list_add_tail(&transport->list, &xprt_list); 111 printk(KERN_INFO "RPC: Registered %s transport module.\n", 112 transport->name); 113 result = 0; 114 115 out: 116 spin_unlock(&xprt_list_lock); 117 return result; 118 } 119 EXPORT_SYMBOL_GPL(xprt_register_transport); 120 121 /** 122 * xprt_unregister_transport - unregister a transport implementation 123 * @transport: transport to unregister 124 * 125 * Returns: 126 * 0: transport successfully unregistered 127 * -ENOENT: transport never registered 128 */ 129 int xprt_unregister_transport(struct xprt_class *transport) 130 { 131 struct xprt_class *t; 132 int result; 133 134 result = 0; 135 spin_lock(&xprt_list_lock); 136 list_for_each_entry(t, &xprt_list, list) { 137 if (t == transport) { 138 printk(KERN_INFO 139 "RPC: Unregistered %s transport module.\n", 140 transport->name); 141 list_del_init(&transport->list); 142 goto out; 143 } 144 } 145 result = -ENOENT; 146 147 out: 148 spin_unlock(&xprt_list_lock); 149 return result; 150 } 151 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 152 153 /** 154 * xprt_load_transport - load a transport implementation 155 * @transport_name: transport to load 156 * 157 * Returns: 158 * 0: transport successfully loaded 159 * -ENOENT: transport module not available 160 */ 161 int xprt_load_transport(const char *transport_name) 162 { 163 struct xprt_class *t; 164 int result; 165 166 result = 0; 167 spin_lock(&xprt_list_lock); 168 list_for_each_entry(t, &xprt_list, list) { 169 if (strcmp(t->name, 
transport_name) == 0) { 170 spin_unlock(&xprt_list_lock); 171 goto out; 172 } 173 } 174 spin_unlock(&xprt_list_lock); 175 result = request_module("xprt%s", transport_name); 176 out: 177 return result; 178 } 179 EXPORT_SYMBOL_GPL(xprt_load_transport); 180 181 static void xprt_clear_locked(struct rpc_xprt *xprt) 182 { 183 xprt->snd_task = NULL; 184 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 185 smp_mb__before_atomic(); 186 clear_bit(XPRT_LOCKED, &xprt->state); 187 smp_mb__after_atomic(); 188 } else 189 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 190 } 191 192 /** 193 * xprt_reserve_xprt - serialize write access to transports 194 * @task: task that is requesting access to the transport 195 * @xprt: pointer to the target transport 196 * 197 * This prevents mixing the payload of separate requests, and prevents 198 * transport connects from colliding with writes. No congestion control 199 * is provided. 200 */ 201 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 202 { 203 struct rpc_rqst *req = task->tk_rqstp; 204 205 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 206 if (task == xprt->snd_task) 207 return 1; 208 goto out_sleep; 209 } 210 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 211 goto out_unlock; 212 xprt->snd_task = task; 213 214 return 1; 215 216 out_unlock: 217 xprt_clear_locked(xprt); 218 out_sleep: 219 dprintk("RPC: %5u failed to lock transport %p\n", 220 task->tk_pid, xprt); 221 task->tk_status = -EAGAIN; 222 if (RPC_IS_SOFT(task)) 223 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 224 xprt_request_timeout(req)); 225 else 226 rpc_sleep_on(&xprt->sending, task, NULL); 227 return 0; 228 } 229 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 230 231 static bool 232 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 233 { 234 return test_bit(XPRT_CWND_WAIT, &xprt->state); 235 } 236 237 static void 238 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 239 { 240 if (!list_empty(&xprt->xmit_queue)) { 241 /* Peek at head of 
queue to see if it can make progress */ 242 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 243 rq_xmit)->rq_cong) 244 return; 245 } 246 set_bit(XPRT_CWND_WAIT, &xprt->state); 247 } 248 249 static void 250 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 251 { 252 if (!RPCXPRT_CONGESTED(xprt)) 253 clear_bit(XPRT_CWND_WAIT, &xprt->state); 254 } 255 256 /* 257 * xprt_reserve_xprt_cong - serialize write access to transports 258 * @task: task that is requesting access to the transport 259 * 260 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 261 * integrated into the decision of whether a request is allowed to be 262 * woken up and given access to the transport. 263 * Note that the lock is only granted if we know there are free slots. 264 */ 265 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 266 { 267 struct rpc_rqst *req = task->tk_rqstp; 268 269 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 270 if (task == xprt->snd_task) 271 return 1; 272 goto out_sleep; 273 } 274 if (req == NULL) { 275 xprt->snd_task = task; 276 return 1; 277 } 278 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 279 goto out_unlock; 280 if (!xprt_need_congestion_window_wait(xprt)) { 281 xprt->snd_task = task; 282 return 1; 283 } 284 out_unlock: 285 xprt_clear_locked(xprt); 286 out_sleep: 287 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 288 task->tk_status = -EAGAIN; 289 if (RPC_IS_SOFT(task)) 290 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 291 xprt_request_timeout(req)); 292 else 293 rpc_sleep_on(&xprt->sending, task, NULL); 294 return 0; 295 } 296 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 297 298 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 299 { 300 int retval; 301 302 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 303 return 1; 304 spin_lock_bh(&xprt->transport_lock); 305 retval = xprt->ops->reserve_xprt(xprt, task); 306 
spin_unlock_bh(&xprt->transport_lock); 307 return retval; 308 } 309 310 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 311 { 312 struct rpc_xprt *xprt = data; 313 314 xprt->snd_task = task; 315 return true; 316 } 317 318 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 319 { 320 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 321 return; 322 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 323 goto out_unlock; 324 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 325 __xprt_lock_write_func, xprt)) 326 return; 327 out_unlock: 328 xprt_clear_locked(xprt); 329 } 330 331 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 332 { 333 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 334 return; 335 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 336 goto out_unlock; 337 if (xprt_need_congestion_window_wait(xprt)) 338 goto out_unlock; 339 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 340 __xprt_lock_write_func, xprt)) 341 return; 342 out_unlock: 343 xprt_clear_locked(xprt); 344 } 345 346 /** 347 * xprt_release_xprt - allow other requests to use a transport 348 * @xprt: transport with other tasks potentially waiting 349 * @task: task that is releasing access to the transport 350 * 351 * Note that "task" can be NULL. No congestion control is provided. 352 */ 353 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 354 { 355 if (xprt->snd_task == task) { 356 xprt_clear_locked(xprt); 357 __xprt_lock_write_next(xprt); 358 } 359 } 360 EXPORT_SYMBOL_GPL(xprt_release_xprt); 361 362 /** 363 * xprt_release_xprt_cong - allow other requests to use a transport 364 * @xprt: transport with other tasks potentially waiting 365 * @task: task that is releasing access to the transport 366 * 367 * Note that "task" can be NULL. Another task is awoken to use the 368 * transport if the transport's congestion window allows it. 
369 */ 370 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 371 { 372 if (xprt->snd_task == task) { 373 xprt_clear_locked(xprt); 374 __xprt_lock_write_next_cong(xprt); 375 } 376 } 377 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 378 379 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 380 { 381 if (xprt->snd_task != task) 382 return; 383 spin_lock_bh(&xprt->transport_lock); 384 xprt->ops->release_xprt(xprt, task); 385 spin_unlock_bh(&xprt->transport_lock); 386 } 387 388 /* 389 * Van Jacobson congestion avoidance. Check if the congestion window 390 * overflowed. Put the task to sleep if this is the case. 391 */ 392 static int 393 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 394 { 395 if (req->rq_cong) 396 return 1; 397 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 398 req->rq_task->tk_pid, xprt->cong, xprt->cwnd); 399 if (RPCXPRT_CONGESTED(xprt)) { 400 xprt_set_congestion_window_wait(xprt); 401 return 0; 402 } 403 req->rq_cong = 1; 404 xprt->cong += RPC_CWNDSCALE; 405 return 1; 406 } 407 408 /* 409 * Adjust the congestion window, and wake up the next task 410 * that has been sleeping due to congestion 411 */ 412 static void 413 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 414 { 415 if (!req->rq_cong) 416 return; 417 req->rq_cong = 0; 418 xprt->cong -= RPC_CWNDSCALE; 419 xprt_test_and_clear_congestion_window_wait(xprt); 420 __xprt_lock_write_next_cong(xprt); 421 } 422 423 /** 424 * xprt_request_get_cong - Request congestion control credits 425 * @xprt: pointer to transport 426 * @req: pointer to RPC request 427 * 428 * Useful for transports that require congestion control. 
429 */ 430 bool 431 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 432 { 433 bool ret = false; 434 435 if (req->rq_cong) 436 return true; 437 spin_lock_bh(&xprt->transport_lock); 438 ret = __xprt_get_cong(xprt, req) != 0; 439 spin_unlock_bh(&xprt->transport_lock); 440 return ret; 441 } 442 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 443 444 /** 445 * xprt_release_rqst_cong - housekeeping when request is complete 446 * @task: RPC request that recently completed 447 * 448 * Useful for transports that require congestion control. 449 */ 450 void xprt_release_rqst_cong(struct rpc_task *task) 451 { 452 struct rpc_rqst *req = task->tk_rqstp; 453 454 __xprt_put_cong(req->rq_xprt, req); 455 } 456 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 457 458 /* 459 * Clear the congestion window wait flag and wake up the next 460 * entry on xprt->sending 461 */ 462 static void 463 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 464 { 465 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 466 spin_lock_bh(&xprt->transport_lock); 467 __xprt_lock_write_next_cong(xprt); 468 spin_unlock_bh(&xprt->transport_lock); 469 } 470 } 471 472 /** 473 * xprt_adjust_cwnd - adjust transport congestion window 474 * @xprt: pointer to xprt 475 * @task: recently completed RPC request used to adjust window 476 * @result: result code of completed RPC request 477 * 478 * The transport code maintains an estimate on the maximum number of out- 479 * standing RPC requests, using a smoothed version of the congestion 480 * avoidance implemented in 44BSD. This is basically the Van Jacobson 481 * congestion algorithm: If a retransmit occurs, the congestion window is 482 * halved; otherwise, it is incremented by 1/cwnd when 483 * 484 * - a reply is received and 485 * - a full number of requests are outstanding and 486 * - the congestion window hasn't been updated recently. 
487 */ 488 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 489 { 490 struct rpc_rqst *req = task->tk_rqstp; 491 unsigned long cwnd = xprt->cwnd; 492 493 if (result >= 0 && cwnd <= xprt->cong) { 494 /* The (cwnd >> 1) term makes sure 495 * the result gets rounded properly. */ 496 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 497 if (cwnd > RPC_MAXCWND(xprt)) 498 cwnd = RPC_MAXCWND(xprt); 499 __xprt_lock_write_next_cong(xprt); 500 } else if (result == -ETIMEDOUT) { 501 cwnd >>= 1; 502 if (cwnd < RPC_CWNDSCALE) 503 cwnd = RPC_CWNDSCALE; 504 } 505 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 506 xprt->cong, xprt->cwnd, cwnd); 507 xprt->cwnd = cwnd; 508 __xprt_put_cong(xprt, req); 509 } 510 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 511 512 /** 513 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 514 * @xprt: transport with waiting tasks 515 * @status: result code to plant in each task before waking it 516 * 517 */ 518 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 519 { 520 if (status < 0) 521 rpc_wake_up_status(&xprt->pending, status); 522 else 523 rpc_wake_up(&xprt->pending); 524 } 525 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 526 527 /** 528 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 529 * @xprt: transport 530 * 531 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 532 * we don't in general want to force a socket disconnection due to 533 * an incomplete RPC call transmission. 
534 */ 535 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 536 { 537 set_bit(XPRT_WRITE_SPACE, &xprt->state); 538 } 539 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 540 541 static bool 542 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 543 { 544 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 545 __xprt_lock_write_next(xprt); 546 dprintk("RPC: write space: waking waiting task on " 547 "xprt %p\n", xprt); 548 return true; 549 } 550 return false; 551 } 552 553 /** 554 * xprt_write_space - wake the task waiting for transport output buffer space 555 * @xprt: transport with waiting tasks 556 * 557 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 558 */ 559 bool xprt_write_space(struct rpc_xprt *xprt) 560 { 561 bool ret; 562 563 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 564 return false; 565 spin_lock_bh(&xprt->transport_lock); 566 ret = xprt_clear_write_space_locked(xprt); 567 spin_unlock_bh(&xprt->transport_lock); 568 return ret; 569 } 570 EXPORT_SYMBOL_GPL(xprt_write_space); 571 572 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 573 { 574 s64 delta = ktime_to_ns(ktime_get() - abstime); 575 return likely(delta >= 0) ? 
576 jiffies - nsecs_to_jiffies(delta) : 577 jiffies + nsecs_to_jiffies(-delta); 578 } 579 580 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 581 { 582 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 583 unsigned long majortimeo = req->rq_timeout; 584 585 if (to->to_exponential) 586 majortimeo <<= to->to_retries; 587 else 588 majortimeo += to->to_increment * to->to_retries; 589 if (majortimeo > to->to_maxval || majortimeo == 0) 590 majortimeo = to->to_maxval; 591 return majortimeo; 592 } 593 594 static void xprt_reset_majortimeo(struct rpc_rqst *req) 595 { 596 req->rq_majortimeo += xprt_calc_majortimeo(req); 597 } 598 599 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 600 { 601 unsigned long time_init; 602 struct rpc_xprt *xprt = req->rq_xprt; 603 604 if (likely(xprt && xprt_connected(xprt))) 605 time_init = jiffies; 606 else 607 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 608 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 609 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 610 } 611 612 /** 613 * xprt_adjust_timeout - adjust timeout values for next retransmit 614 * @req: RPC request containing parameters to use for the adjustment 615 * 616 */ 617 int xprt_adjust_timeout(struct rpc_rqst *req) 618 { 619 struct rpc_xprt *xprt = req->rq_xprt; 620 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 621 int status = 0; 622 623 if (time_before(jiffies, req->rq_majortimeo)) { 624 if (to->to_exponential) 625 req->rq_timeout <<= 1; 626 else 627 req->rq_timeout += to->to_increment; 628 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 629 req->rq_timeout = to->to_maxval; 630 req->rq_retries++; 631 } else { 632 req->rq_timeout = to->to_initval; 633 req->rq_retries = 0; 634 xprt_reset_majortimeo(req); 635 /* Reset the RTT counters == "slow start" */ 636 spin_lock_bh(&xprt->transport_lock); 637 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 
638 spin_unlock_bh(&xprt->transport_lock); 639 status = -ETIMEDOUT; 640 } 641 642 if (req->rq_timeout == 0) { 643 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 644 req->rq_timeout = 5 * HZ; 645 } 646 return status; 647 } 648 649 static void xprt_autoclose(struct work_struct *work) 650 { 651 struct rpc_xprt *xprt = 652 container_of(work, struct rpc_xprt, task_cleanup); 653 unsigned int pflags = memalloc_nofs_save(); 654 655 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 656 xprt->ops->close(xprt); 657 xprt_release_write(xprt, NULL); 658 wake_up_bit(&xprt->state, XPRT_LOCKED); 659 memalloc_nofs_restore(pflags); 660 } 661 662 /** 663 * xprt_disconnect_done - mark a transport as disconnected 664 * @xprt: transport to flag for disconnect 665 * 666 */ 667 void xprt_disconnect_done(struct rpc_xprt *xprt) 668 { 669 dprintk("RPC: disconnected transport %p\n", xprt); 670 spin_lock_bh(&xprt->transport_lock); 671 xprt_clear_connected(xprt); 672 xprt_clear_write_space_locked(xprt); 673 xprt_wake_pending_tasks(xprt, -ENOTCONN); 674 spin_unlock_bh(&xprt->transport_lock); 675 } 676 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 677 678 /** 679 * xprt_force_disconnect - force a transport to disconnect 680 * @xprt: transport to disconnect 681 * 682 */ 683 void xprt_force_disconnect(struct rpc_xprt *xprt) 684 { 685 /* Don't race with the test_bit() in xprt_clear_locked() */ 686 spin_lock_bh(&xprt->transport_lock); 687 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 688 /* Try to schedule an autoclose RPC call */ 689 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 690 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 691 else if (xprt->snd_task) 692 rpc_wake_up_queued_task_set_status(&xprt->pending, 693 xprt->snd_task, -ENOTCONN); 694 spin_unlock_bh(&xprt->transport_lock); 695 } 696 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 697 698 static unsigned int 699 xprt_connect_cookie(struct rpc_xprt *xprt) 700 { 701 return READ_ONCE(xprt->connect_cookie); 702 } 703 704 static bool 705 
xprt_request_retransmit_after_disconnect(struct rpc_task *task) 706 { 707 struct rpc_rqst *req = task->tk_rqstp; 708 struct rpc_xprt *xprt = req->rq_xprt; 709 710 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 711 !xprt_connected(xprt); 712 } 713 714 /** 715 * xprt_conditional_disconnect - force a transport to disconnect 716 * @xprt: transport to disconnect 717 * @cookie: 'connection cookie' 718 * 719 * This attempts to break the connection if and only if 'cookie' matches 720 * the current transport 'connection cookie'. It ensures that we don't 721 * try to break the connection more than once when we need to retransmit 722 * a batch of RPC requests. 723 * 724 */ 725 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 726 { 727 /* Don't race with the test_bit() in xprt_clear_locked() */ 728 spin_lock_bh(&xprt->transport_lock); 729 if (cookie != xprt->connect_cookie) 730 goto out; 731 if (test_bit(XPRT_CLOSING, &xprt->state)) 732 goto out; 733 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 734 /* Try to schedule an autoclose RPC call */ 735 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 736 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 737 xprt_wake_pending_tasks(xprt, -EAGAIN); 738 out: 739 spin_unlock_bh(&xprt->transport_lock); 740 } 741 742 static bool 743 xprt_has_timer(const struct rpc_xprt *xprt) 744 { 745 return xprt->idle_timeout != 0; 746 } 747 748 static void 749 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 750 __must_hold(&xprt->transport_lock) 751 { 752 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 753 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 754 } 755 756 static void 757 xprt_init_autodisconnect(struct timer_list *t) 758 { 759 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 760 761 spin_lock(&xprt->transport_lock); 762 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 763 goto out_abort; 764 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 765 
xprt->last_used = jiffies; 766 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 767 goto out_abort; 768 spin_unlock(&xprt->transport_lock); 769 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 770 return; 771 out_abort: 772 spin_unlock(&xprt->transport_lock); 773 } 774 775 bool xprt_lock_connect(struct rpc_xprt *xprt, 776 struct rpc_task *task, 777 void *cookie) 778 { 779 bool ret = false; 780 781 spin_lock_bh(&xprt->transport_lock); 782 if (!test_bit(XPRT_LOCKED, &xprt->state)) 783 goto out; 784 if (xprt->snd_task != task) 785 goto out; 786 xprt->snd_task = cookie; 787 ret = true; 788 out: 789 spin_unlock_bh(&xprt->transport_lock); 790 return ret; 791 } 792 793 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 794 { 795 spin_lock_bh(&xprt->transport_lock); 796 if (xprt->snd_task != cookie) 797 goto out; 798 if (!test_bit(XPRT_LOCKED, &xprt->state)) 799 goto out; 800 xprt->snd_task =NULL; 801 xprt->ops->release_xprt(xprt, NULL); 802 xprt_schedule_autodisconnect(xprt); 803 out: 804 spin_unlock_bh(&xprt->transport_lock); 805 wake_up_bit(&xprt->state, XPRT_LOCKED); 806 } 807 808 /** 809 * xprt_connect - schedule a transport connect operation 810 * @task: RPC task that is requesting the connect 811 * 812 */ 813 void xprt_connect(struct rpc_task *task) 814 { 815 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 816 817 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 818 xprt, (xprt_connected(xprt) ? 
"is" : "is not")); 819 820 if (!xprt_bound(xprt)) { 821 task->tk_status = -EAGAIN; 822 return; 823 } 824 if (!xprt_lock_write(xprt, task)) 825 return; 826 827 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 828 xprt->ops->close(xprt); 829 830 if (!xprt_connected(xprt)) { 831 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 832 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 833 xprt_request_timeout(task->tk_rqstp)); 834 835 if (test_bit(XPRT_CLOSING, &xprt->state)) 836 return; 837 if (xprt_test_and_set_connecting(xprt)) 838 return; 839 /* Race breaker */ 840 if (!xprt_connected(xprt)) { 841 xprt->stat.connect_start = jiffies; 842 xprt->ops->connect(xprt, task); 843 } else { 844 xprt_clear_connecting(xprt); 845 task->tk_status = 0; 846 rpc_wake_up_queued_task(&xprt->pending, task); 847 } 848 } 849 xprt_release_write(xprt, task); 850 } 851 852 enum xprt_xid_rb_cmp { 853 XID_RB_EQUAL, 854 XID_RB_LEFT, 855 XID_RB_RIGHT, 856 }; 857 static enum xprt_xid_rb_cmp 858 xprt_xid_cmp(__be32 xid1, __be32 xid2) 859 { 860 if (xid1 == xid2) 861 return XID_RB_EQUAL; 862 if ((__force u32)xid1 < (__force u32)xid2) 863 return XID_RB_LEFT; 864 return XID_RB_RIGHT; 865 } 866 867 static struct rpc_rqst * 868 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 869 { 870 struct rb_node *n = xprt->recv_queue.rb_node; 871 struct rpc_rqst *req; 872 873 while (n != NULL) { 874 req = rb_entry(n, struct rpc_rqst, rq_recv); 875 switch (xprt_xid_cmp(xid, req->rq_xid)) { 876 case XID_RB_LEFT: 877 n = n->rb_left; 878 break; 879 case XID_RB_RIGHT: 880 n = n->rb_right; 881 break; 882 case XID_RB_EQUAL: 883 return req; 884 } 885 } 886 return NULL; 887 } 888 889 static void 890 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 891 { 892 struct rb_node **p = &xprt->recv_queue.rb_node; 893 struct rb_node *n = NULL; 894 struct rpc_rqst *req; 895 896 while (*p != NULL) { 897 n = *p; 898 req = rb_entry(n, struct rpc_rqst, rq_recv); 899 switch(xprt_xid_cmp(new->rq_xid, 
req->rq_xid)) { 900 case XID_RB_LEFT: 901 p = &n->rb_left; 902 break; 903 case XID_RB_RIGHT: 904 p = &n->rb_right; 905 break; 906 case XID_RB_EQUAL: 907 WARN_ON_ONCE(new != req); 908 return; 909 } 910 } 911 rb_link_node(&new->rq_recv, n, p); 912 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 913 } 914 915 static void 916 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 917 { 918 rb_erase(&req->rq_recv, &xprt->recv_queue); 919 } 920 921 /** 922 * xprt_lookup_rqst - find an RPC request corresponding to an XID 923 * @xprt: transport on which the original request was transmitted 924 * @xid: RPC XID of incoming reply 925 * 926 * Caller holds xprt->queue_lock. 927 */ 928 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 929 { 930 struct rpc_rqst *entry; 931 932 entry = xprt_request_rb_find(xprt, xid); 933 if (entry != NULL) { 934 trace_xprt_lookup_rqst(xprt, xid, 0); 935 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 936 return entry; 937 } 938 939 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 940 ntohl(xid)); 941 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 942 xprt->stat.bad_xids++; 943 return NULL; 944 } 945 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 946 947 static bool 948 xprt_is_pinned_rqst(struct rpc_rqst *req) 949 { 950 return atomic_read(&req->rq_pin) != 0; 951 } 952 953 /** 954 * xprt_pin_rqst - Pin a request on the transport receive list 955 * @req: Request to pin 956 * 957 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 958 * so should be holding xprt->queue_lock. 959 */ 960 void xprt_pin_rqst(struct rpc_rqst *req) 961 { 962 atomic_inc(&req->rq_pin); 963 } 964 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 965 966 /** 967 * xprt_unpin_rqst - Unpin a request on the transport receive list 968 * @req: Request to pin 969 * 970 * Caller should be holding xprt->queue_lock. 
971 */ 972 void xprt_unpin_rqst(struct rpc_rqst *req) 973 { 974 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 975 atomic_dec(&req->rq_pin); 976 return; 977 } 978 if (atomic_dec_and_test(&req->rq_pin)) 979 wake_up_var(&req->rq_pin); 980 } 981 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 982 983 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 984 { 985 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 986 } 987 988 static bool 989 xprt_request_data_received(struct rpc_task *task) 990 { 991 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 992 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 993 } 994 995 static bool 996 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 997 { 998 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 999 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1000 } 1001 1002 /** 1003 * xprt_request_enqueue_receive - Add an request to the receive queue 1004 * @task: RPC task 1005 * 1006 */ 1007 void 1008 xprt_request_enqueue_receive(struct rpc_task *task) 1009 { 1010 struct rpc_rqst *req = task->tk_rqstp; 1011 struct rpc_xprt *xprt = req->rq_xprt; 1012 1013 if (!xprt_request_need_enqueue_receive(task, req)) 1014 return; 1015 spin_lock(&xprt->queue_lock); 1016 1017 /* Update the softirq receive buffer */ 1018 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1019 sizeof(req->rq_private_buf)); 1020 1021 /* Add request to the receive list */ 1022 xprt_request_rb_insert(xprt, req); 1023 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1024 spin_unlock(&xprt->queue_lock); 1025 1026 /* Turn off autodisconnect */ 1027 del_singleshot_timer_sync(&xprt->timer); 1028 } 1029 1030 /** 1031 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1032 * @task: RPC task 1033 * 1034 * Caller must hold xprt->queue_lock. 
1035 */ 1036 static void 1037 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1038 { 1039 struct rpc_rqst *req = task->tk_rqstp; 1040 1041 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1042 xprt_request_rb_remove(req->rq_xprt, req); 1043 } 1044 1045 /** 1046 * xprt_update_rtt - Update RPC RTT statistics 1047 * @task: RPC request that recently completed 1048 * 1049 * Caller holds xprt->queue_lock. 1050 */ 1051 void xprt_update_rtt(struct rpc_task *task) 1052 { 1053 struct rpc_rqst *req = task->tk_rqstp; 1054 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1055 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1056 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1057 1058 if (timer) { 1059 if (req->rq_ntrans == 1) 1060 rpc_update_rtt(rtt, timer, m); 1061 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1062 } 1063 } 1064 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1065 1066 /** 1067 * xprt_complete_rqst - called when reply processing is complete 1068 * @task: RPC request that recently completed 1069 * @copied: actual number of bytes received from the transport 1070 * 1071 * Caller holds xprt->queue_lock. 
1072 */ 1073 void xprt_complete_rqst(struct rpc_task *task, int copied) 1074 { 1075 struct rpc_rqst *req = task->tk_rqstp; 1076 struct rpc_xprt *xprt = req->rq_xprt; 1077 1078 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 1079 task->tk_pid, ntohl(req->rq_xid), copied); 1080 trace_xprt_complete_rqst(xprt, req->rq_xid, copied); 1081 1082 xprt->stat.recvs++; 1083 1084 req->rq_private_buf.len = copied; 1085 /* Ensure all writes are done before we update */ 1086 /* req->rq_reply_bytes_recvd */ 1087 smp_wmb(); 1088 req->rq_reply_bytes_recvd = copied; 1089 xprt_request_dequeue_receive_locked(task); 1090 rpc_wake_up_queued_task(&xprt->pending, task); 1091 } 1092 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1093 1094 static void xprt_timer(struct rpc_task *task) 1095 { 1096 struct rpc_rqst *req = task->tk_rqstp; 1097 struct rpc_xprt *xprt = req->rq_xprt; 1098 1099 if (task->tk_status != -ETIMEDOUT) 1100 return; 1101 1102 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1103 if (!req->rq_reply_bytes_recvd) { 1104 if (xprt->ops->timer) 1105 xprt->ops->timer(xprt, task); 1106 } else 1107 task->tk_status = 0; 1108 } 1109 1110 /** 1111 * xprt_wait_for_reply_request_def - wait for reply 1112 * @task: pointer to rpc_task 1113 * 1114 * Set a request's retransmit timeout based on the transport's 1115 * default timeout parameters. Used by transports that don't adjust 1116 * the retransmit timeout based on round-trip time estimation, 1117 * and put the task to sleep on the pending queue. 
*/
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	/*
	 * xprt_request_timeout() yields the earlier of
	 * jiffies + req->rq_timeout and req->rq_majortimeo.
	 */
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			xprt_request_timeout(req));
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 *
 * The timeout is rpc_calc_rto() shifted left by
 * rpc_ntimeo() + req->rq_retries, capped at the client's to_maxval.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	/*
	 * A zero result is also replaced by the maximum
	 * (NOTE(review): presumably covers the shift discarding every
	 * set bit - confirm).
	 */
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 * Returns immediately unless RPC_TASK_NEED_RECV is set.  Otherwise the
 * flag is re-tested under xprt->queue_lock and, if still set, the
 * transport's ->wait_for_reply_request method is called.
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	/* Unlocked fast path: nothing to wait for. */
	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

/*
 * A task needs queueing only if RPC_TASK_NEED_XMIT is not already set.
 * @req is currently unused.
 */
static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 *
 * Does nothing if RPC_TASK_NEED_XMIT is already set.  Otherwise
 * rq_bytes_sent is reset, the request is inserted under
 * xprt->queue_lock, and RPC_TASK_NEED_XMIT is set.  The second argument
 * to trace_xprt_enq_xmit() (1-4) records which insertion path was taken.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			/* Path 1: insert ahead of the first entry without rq_cong. */
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				trace_xprt_enq_xmit(task, 1);
				goto out;
			}
		} else if (RPC_IS_SWAPPER(task)) {
			/*
			 * Path 2: a swapper task is inserted ahead of the
			 * first entry that has no rq_cong, has sent no
			 * bytes yet and is not itself a swapper task.
			 */
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong || pos->rq_bytes_sent)
					continue;
				if (RPC_IS_SWAPPER(pos->rq_task))
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				trace_xprt_enq_xmit(task, 2);
				goto out;
			}
		} else if (!req->rq_seqno) {
			/*
			 * Path 3: chain req onto the rq_xmit2 list of the
			 * first queued request with the same tk_owner.  Its
			 * own rq_xmit is left empty, i.e. req is not linked
			 * into xmit_queue directly.
			 */
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				trace_xprt_enq_xmit(task, 3);
				goto out;
			}
		}
		/* Path 4: no insertion point was found; append at the tail. */
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
		trace_xprt_enq_xmit(task, 4);
out:
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 *
 * Clears RPC_TASK_NEED_XMIT; does nothing if it was not set.
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		/* req is linked via rq_xmit: unlink it. */
		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			/*
			 * Other requests were chained behind req: the first
			 * of them is appended to the tail of xmit_queue.
			 */
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		/* req was only a member of an rq_xmit2 chain. */
		list_del(&req->rq_xmit2);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 *
 * Takes xprt->queue_lock around the locked variant.
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 *
 * The ->prepare_request method is optional; nothing happens when the
 * transport does not provide it.
 */
void
xprt_request_prepare(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt->ops->prepare_request)
		xprt->ops->prepare_request(req);
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 *
 * Returns the result of xprt_request_retransmit_after_disconnect().
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
	return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 * Returns true if xprt_lock_write() succeeded, false otherwise.
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

	if (!xprt_lock_write(xprt, task)) {
		/* Race breaker: someone may have transmitted us */
		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task_set_status(&xprt->sending,
					task, 0);
		return false;

	}
	return true;
}

/*
 * Counterpart of xprt_prepare_transmit(): calls xprt_release_write()
 * for @task on its request's transport.
 */
void xprt_end_transmit(struct rpc_task *task)
{
	xprt_release_write(task->tk_rqstp->rq_xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 *
 * If ->send_request returns non-zero, that status is returned with the
 * request left on the transmit queue and rq_ntrans rolled back.  On
 * every other path the request is dequeued and its task is woken from
 * the sending queue with the resulting status.
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	/* Pre-flight checks apply only when no bytes have gone out yet. */
	if (!req->rq_bytes_sent) {
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
		if (task->tk_ops->rpc_call_prepare_transmit) {
			task->tk_ops->rpc_call_prepare_transmit(task,
					task->tk_calldata);
			/* The callback reports failure through tk_status. */
			status = task->tk_status;
			if (status < 0)
				goto out_dequeue;
		}
		if (RPC_SIGNALLED(task)) {
			status = -ERESTARTSYS;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	/* Sample the cookie before sending; it is stored on success below. */
	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	if (status != 0) {
		/* Undo the count; the request stays queued for the caller. */
		req->rq_ntrans--;
		trace_xprt_transmit(req, status);
		return status;
	}

	if (is_retrans)
		task->tk_client->cl_stats->rpcretrans++;

	xprt_inject_disconnect(xprt);

	task->tk_flags |= RPC_TASK_SENT;
	/* Transport statistics are updated under transport_lock. */
	spin_lock_bh(&xprt->transport_lock);

	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock_bh(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	trace_xprt_transmit(req, status);
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
1439 */ 1440 void 1441 xprt_transmit(struct rpc_task *task) 1442 { 1443 struct rpc_rqst *next, *req = task->tk_rqstp; 1444 struct rpc_xprt *xprt = req->rq_xprt; 1445 int status; 1446 1447 spin_lock(&xprt->queue_lock); 1448 while (!list_empty(&xprt->xmit_queue)) { 1449 next = list_first_entry(&xprt->xmit_queue, 1450 struct rpc_rqst, rq_xmit); 1451 xprt_pin_rqst(next); 1452 spin_unlock(&xprt->queue_lock); 1453 status = xprt_request_transmit(next, task); 1454 if (status == -EBADMSG && next != req) 1455 status = 0; 1456 cond_resched(); 1457 spin_lock(&xprt->queue_lock); 1458 xprt_unpin_rqst(next); 1459 if (status == 0) { 1460 if (!xprt_request_data_received(task) || 1461 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1462 continue; 1463 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1464 task->tk_status = status; 1465 break; 1466 } 1467 spin_unlock(&xprt->queue_lock); 1468 } 1469 1470 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1471 { 1472 set_bit(XPRT_CONGESTED, &xprt->state); 1473 rpc_sleep_on(&xprt->backlog, task, NULL); 1474 } 1475 1476 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 1477 { 1478 if (rpc_wake_up_next(&xprt->backlog) == NULL) 1479 clear_bit(XPRT_CONGESTED, &xprt->state); 1480 } 1481 1482 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1483 { 1484 bool ret = false; 1485 1486 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1487 goto out; 1488 spin_lock(&xprt->reserve_lock); 1489 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1490 rpc_sleep_on(&xprt->backlog, task, NULL); 1491 ret = true; 1492 } 1493 spin_unlock(&xprt->reserve_lock); 1494 out: 1495 return ret; 1496 } 1497 1498 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1499 { 1500 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1501 1502 if (xprt->num_reqs >= xprt->max_reqs) 1503 goto out; 1504 ++xprt->num_reqs; 1505 spin_unlock(&xprt->reserve_lock); 1506 req = kzalloc(sizeof(struct rpc_rqst), 
GFP_NOFS); 1507 spin_lock(&xprt->reserve_lock); 1508 if (req != NULL) 1509 goto out; 1510 --xprt->num_reqs; 1511 req = ERR_PTR(-ENOMEM); 1512 out: 1513 return req; 1514 } 1515 1516 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1517 { 1518 if (xprt->num_reqs > xprt->min_reqs) { 1519 --xprt->num_reqs; 1520 kfree(req); 1521 return true; 1522 } 1523 return false; 1524 } 1525 1526 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1527 { 1528 struct rpc_rqst *req; 1529 1530 spin_lock(&xprt->reserve_lock); 1531 if (!list_empty(&xprt->free)) { 1532 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1533 list_del(&req->rq_list); 1534 goto out_init_req; 1535 } 1536 req = xprt_dynamic_alloc_slot(xprt); 1537 if (!IS_ERR(req)) 1538 goto out_init_req; 1539 switch (PTR_ERR(req)) { 1540 case -ENOMEM: 1541 dprintk("RPC: dynamic allocation of request slot " 1542 "failed! Retrying\n"); 1543 task->tk_status = -ENOMEM; 1544 break; 1545 case -EAGAIN: 1546 xprt_add_backlog(xprt, task); 1547 dprintk("RPC: waiting for request slot\n"); 1548 /* fall through */ 1549 default: 1550 task->tk_status = -EAGAIN; 1551 } 1552 spin_unlock(&xprt->reserve_lock); 1553 return; 1554 out_init_req: 1555 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1556 xprt->num_reqs); 1557 spin_unlock(&xprt->reserve_lock); 1558 1559 task->tk_status = 0; 1560 task->tk_rqstp = req; 1561 } 1562 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1563 1564 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1565 { 1566 spin_lock(&xprt->reserve_lock); 1567 if (!xprt_dynamic_free_slot(xprt, req)) { 1568 memset(req, 0, sizeof(*req)); /* mark unused */ 1569 list_add(&req->rq_list, &xprt->free); 1570 } 1571 xprt_wake_up_backlog(xprt); 1572 spin_unlock(&xprt->reserve_lock); 1573 } 1574 EXPORT_SYMBOL_GPL(xprt_free_slot); 1575 1576 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1577 { 1578 struct rpc_rqst *req; 1579 while (!list_empty(&xprt->free)) { 
1580 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1581 list_del(&req->rq_list); 1582 kfree(req); 1583 } 1584 } 1585 1586 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1587 unsigned int num_prealloc, 1588 unsigned int max_alloc) 1589 { 1590 struct rpc_xprt *xprt; 1591 struct rpc_rqst *req; 1592 int i; 1593 1594 xprt = kzalloc(size, GFP_KERNEL); 1595 if (xprt == NULL) 1596 goto out; 1597 1598 xprt_init(xprt, net); 1599 1600 for (i = 0; i < num_prealloc; i++) { 1601 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1602 if (!req) 1603 goto out_free; 1604 list_add(&req->rq_list, &xprt->free); 1605 } 1606 if (max_alloc > num_prealloc) 1607 xprt->max_reqs = max_alloc; 1608 else 1609 xprt->max_reqs = num_prealloc; 1610 xprt->min_reqs = num_prealloc; 1611 xprt->num_reqs = num_prealloc; 1612 1613 return xprt; 1614 1615 out_free: 1616 xprt_free(xprt); 1617 out: 1618 return NULL; 1619 } 1620 EXPORT_SYMBOL_GPL(xprt_alloc); 1621 1622 void xprt_free(struct rpc_xprt *xprt) 1623 { 1624 put_net(xprt->xprt_net); 1625 xprt_free_all_slots(xprt); 1626 kfree_rcu(xprt, rcu); 1627 } 1628 EXPORT_SYMBOL_GPL(xprt_free); 1629 1630 static void 1631 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1632 { 1633 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1634 } 1635 1636 static __be32 1637 xprt_alloc_xid(struct rpc_xprt *xprt) 1638 { 1639 __be32 xid; 1640 1641 spin_lock(&xprt->reserve_lock); 1642 xid = (__force __be32)xprt->xid++; 1643 spin_unlock(&xprt->reserve_lock); 1644 return xid; 1645 } 1646 1647 static void 1648 xprt_init_xid(struct rpc_xprt *xprt) 1649 { 1650 xprt->xid = prandom_u32(); 1651 } 1652 1653 static void 1654 xprt_request_init(struct rpc_task *task) 1655 { 1656 struct rpc_xprt *xprt = task->tk_xprt; 1657 struct rpc_rqst *req = task->tk_rqstp; 1658 1659 req->rq_task = task; 1660 req->rq_xprt = xprt; 1661 req->rq_buffer = NULL; 1662 req->rq_xid = xprt_alloc_xid(xprt); 1663 xprt_init_connect_cookie(req, xprt); 1664 
req->rq_snd_buf.len = 0; 1665 req->rq_snd_buf.buflen = 0; 1666 req->rq_rcv_buf.len = 0; 1667 req->rq_rcv_buf.buflen = 0; 1668 req->rq_snd_buf.bvec = NULL; 1669 req->rq_rcv_buf.bvec = NULL; 1670 req->rq_release_snd_buf = NULL; 1671 xprt_init_majortimeo(task, req); 1672 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1673 req, ntohl(req->rq_xid)); 1674 } 1675 1676 static void 1677 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1678 { 1679 xprt->ops->alloc_slot(xprt, task); 1680 if (task->tk_rqstp != NULL) 1681 xprt_request_init(task); 1682 } 1683 1684 /** 1685 * xprt_reserve - allocate an RPC request slot 1686 * @task: RPC task requesting a slot allocation 1687 * 1688 * If the transport is marked as being congested, or if no more 1689 * slots are available, place the task on the transport's 1690 * backlog queue. 1691 */ 1692 void xprt_reserve(struct rpc_task *task) 1693 { 1694 struct rpc_xprt *xprt = task->tk_xprt; 1695 1696 task->tk_status = 0; 1697 if (task->tk_rqstp != NULL) 1698 return; 1699 1700 task->tk_status = -EAGAIN; 1701 if (!xprt_throttle_congested(xprt, task)) 1702 xprt_do_reserve(xprt, task); 1703 } 1704 1705 /** 1706 * xprt_retry_reserve - allocate an RPC request slot 1707 * @task: RPC task requesting a slot allocation 1708 * 1709 * If no more slots are available, place the task on the transport's 1710 * backlog queue. 1711 * Note that the only difference with xprt_reserve is that we now 1712 * ignore the value of the XPRT_CONGESTED flag. 
1713 */ 1714 void xprt_retry_reserve(struct rpc_task *task) 1715 { 1716 struct rpc_xprt *xprt = task->tk_xprt; 1717 1718 task->tk_status = 0; 1719 if (task->tk_rqstp != NULL) 1720 return; 1721 1722 task->tk_status = -EAGAIN; 1723 xprt_do_reserve(xprt, task); 1724 } 1725 1726 static void 1727 xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) 1728 { 1729 struct rpc_xprt *xprt = req->rq_xprt; 1730 1731 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1732 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1733 xprt_is_pinned_rqst(req)) { 1734 spin_lock(&xprt->queue_lock); 1735 xprt_request_dequeue_transmit_locked(task); 1736 xprt_request_dequeue_receive_locked(task); 1737 while (xprt_is_pinned_rqst(req)) { 1738 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1739 spin_unlock(&xprt->queue_lock); 1740 xprt_wait_on_pinned_rqst(req); 1741 spin_lock(&xprt->queue_lock); 1742 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1743 } 1744 spin_unlock(&xprt->queue_lock); 1745 } 1746 } 1747 1748 /** 1749 * xprt_release - release an RPC request slot 1750 * @task: task which is finished with the slot 1751 * 1752 */ 1753 void xprt_release(struct rpc_task *task) 1754 { 1755 struct rpc_xprt *xprt; 1756 struct rpc_rqst *req = task->tk_rqstp; 1757 1758 if (req == NULL) { 1759 if (task->tk_client) { 1760 xprt = task->tk_xprt; 1761 xprt_release_write(xprt, task); 1762 } 1763 return; 1764 } 1765 1766 xprt = req->rq_xprt; 1767 if (task->tk_ops->rpc_count_stats != NULL) 1768 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1769 else if (task->tk_client) 1770 rpc_count_iostats(task, task->tk_client->cl_metrics); 1771 xprt_request_dequeue_all(task, req); 1772 spin_lock_bh(&xprt->transport_lock); 1773 xprt->ops->release_xprt(xprt, task); 1774 if (xprt->ops->release_request) 1775 xprt->ops->release_request(task); 1776 xprt->last_used = jiffies; 1777 xprt_schedule_autodisconnect(xprt); 1778 spin_unlock_bh(&xprt->transport_lock); 1779 if (req->rq_buffer) 
1780 xprt->ops->buf_free(task); 1781 xprt_inject_disconnect(xprt); 1782 xdr_free_bvec(&req->rq_rcv_buf); 1783 xdr_free_bvec(&req->rq_snd_buf); 1784 if (req->rq_cred != NULL) 1785 put_rpccred(req->rq_cred); 1786 task->tk_rqstp = NULL; 1787 if (req->rq_release_snd_buf) 1788 req->rq_release_snd_buf(req); 1789 1790 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1791 if (likely(!bc_prealloc(req))) 1792 xprt->ops->free_slot(xprt, req); 1793 else 1794 xprt_free_bc_request(req); 1795 } 1796 1797 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1798 void 1799 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1800 { 1801 struct xdr_buf *xbufp = &req->rq_snd_buf; 1802 1803 task->tk_rqstp = req; 1804 req->rq_task = task; 1805 xprt_init_connect_cookie(req, req->rq_xprt); 1806 /* 1807 * Set up the xdr_buf length. 1808 * This also indicates that the buffer is XDR encoded already. 1809 */ 1810 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1811 xbufp->tail[0].iov_len; 1812 } 1813 #endif 1814 1815 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1816 { 1817 kref_init(&xprt->kref); 1818 1819 spin_lock_init(&xprt->transport_lock); 1820 spin_lock_init(&xprt->reserve_lock); 1821 spin_lock_init(&xprt->queue_lock); 1822 1823 INIT_LIST_HEAD(&xprt->free); 1824 xprt->recv_queue = RB_ROOT; 1825 INIT_LIST_HEAD(&xprt->xmit_queue); 1826 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1827 spin_lock_init(&xprt->bc_pa_lock); 1828 INIT_LIST_HEAD(&xprt->bc_pa_list); 1829 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1830 INIT_LIST_HEAD(&xprt->xprt_switch); 1831 1832 xprt->last_used = jiffies; 1833 xprt->cwnd = RPC_INITCWND; 1834 xprt->bind_index = 0; 1835 1836 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1837 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1838 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1839 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1840 1841 xprt_init_xid(xprt); 1842 1843 xprt->xprt_net = get_net(net); 1844 } 1845 1846 
/** 1847 * xprt_create_transport - create an RPC transport 1848 * @args: rpc transport creation arguments 1849 * 1850 */ 1851 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1852 { 1853 struct rpc_xprt *xprt; 1854 struct xprt_class *t; 1855 1856 spin_lock(&xprt_list_lock); 1857 list_for_each_entry(t, &xprt_list, list) { 1858 if (t->ident == args->ident) { 1859 spin_unlock(&xprt_list_lock); 1860 goto found; 1861 } 1862 } 1863 spin_unlock(&xprt_list_lock); 1864 dprintk("RPC: transport (%d) not supported\n", args->ident); 1865 return ERR_PTR(-EIO); 1866 1867 found: 1868 xprt = t->setup(args); 1869 if (IS_ERR(xprt)) { 1870 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1871 -PTR_ERR(xprt)); 1872 goto out; 1873 } 1874 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1875 xprt->idle_timeout = 0; 1876 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1877 if (xprt_has_timer(xprt)) 1878 timer_setup(&xprt->timer, 1879 xprt_init_autodisconnect, 1880 TIMER_DEFERRABLE); 1881 else 1882 timer_setup(&xprt->timer, NULL, 0); 1883 1884 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1885 xprt_destroy(xprt); 1886 return ERR_PTR(-EINVAL); 1887 } 1888 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1889 if (xprt->servername == NULL) { 1890 xprt_destroy(xprt); 1891 return ERR_PTR(-ENOMEM); 1892 } 1893 1894 rpc_xprt_debugfs_register(xprt); 1895 1896 dprintk("RPC: created transport %p with %u slots\n", xprt, 1897 xprt->max_reqs); 1898 out: 1899 return xprt; 1900 } 1901 1902 static void xprt_destroy_cb(struct work_struct *work) 1903 { 1904 struct rpc_xprt *xprt = 1905 container_of(work, struct rpc_xprt, task_cleanup); 1906 1907 rpc_xprt_debugfs_unregister(xprt); 1908 rpc_destroy_wait_queue(&xprt->binding); 1909 rpc_destroy_wait_queue(&xprt->pending); 1910 rpc_destroy_wait_queue(&xprt->sending); 1911 rpc_destroy_wait_queue(&xprt->backlog); 1912 kfree(xprt->servername); 1913 /* 1914 * Tear down transport state and free the rpc_xprt 1915 */ 1916 
xprt->ops->destroy(xprt); 1917 } 1918 1919 /** 1920 * xprt_destroy - destroy an RPC transport, killing off all requests. 1921 * @xprt: transport to destroy 1922 * 1923 */ 1924 static void xprt_destroy(struct rpc_xprt *xprt) 1925 { 1926 dprintk("RPC: destroying transport %p\n", xprt); 1927 1928 /* 1929 * Exclude transport connect/disconnect handlers and autoclose 1930 */ 1931 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 1932 1933 del_timer_sync(&xprt->timer); 1934 1935 /* 1936 * Destroy sockets etc from the system workqueue so they can 1937 * safely flush receive work running on rpciod. 1938 */ 1939 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 1940 schedule_work(&xprt->task_cleanup); 1941 } 1942 1943 static void xprt_destroy_kref(struct kref *kref) 1944 { 1945 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 1946 } 1947 1948 /** 1949 * xprt_get - return a reference to an RPC transport. 1950 * @xprt: pointer to the transport 1951 * 1952 */ 1953 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1954 { 1955 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 1956 return xprt; 1957 return NULL; 1958 } 1959 EXPORT_SYMBOL_GPL(xprt_get); 1960 1961 /** 1962 * xprt_put - release a reference to an RPC transport. 1963 * @xprt: pointer to the transport 1964 * 1965 */ 1966 void xprt_put(struct rpc_xprt *xprt) 1967 { 1968 if (xprt != NULL) 1969 kref_put(&xprt->kref, xprt_destroy_kref); 1970 } 1971 EXPORT_SYMBOL_GPL(xprt_put); 1972