// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  - When a process places a call, it allocates a request slot if
 *    one is available. Otherwise, it sleeps on the backlog queue
 *    (xprt_reserve).
 *  - Next, the caller puts together the RPC message, stuffs it into
 *    the request struct, and calls xprt_transmit().
 *  - xprt_transmit sends the message and installs the caller on the
 *    transport's wait list. At the same time, if a reply is expected,
 *    it installs a timer that is run after the packet's timeout has
 *    expired.
 *  - When a packet arrives, the data_ready handler walks the list of
 *    pending requests for that transport. If a matching XID is found, the
 *    caller is woken up, and the timer removed.
 *  - When no reply arrives within the timeout interval, the timer is
 *    fired by the kernel and runs xprt_timer(). It either adjusts the
 *    timeout values (minor timeout) or wakes up the caller with a status
 *    of -ETIMEDOUT.
 *  - When the caller receives a notification from RPC that a reply arrived,
 *    it should release the RPC slot, and process the reply.
 *    If the call timed out, it may choose to retry the operation by
 *    adjusting the initial timeout value, and simply calling rpc_call
 *    again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_destroy(struct rpc_xprt *xprt);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/* Absolute time (in jiffies) at which @req should next time out: the
 * per-retransmit timeout, capped by the request's major timeout. */
static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
	unsigned long timeout = jiffies + req->rq_timeout;

	if (time_before(timeout, req->rq_majortimeo))
		return timeout;
	return req->rq_majortimeo;
}

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
			       "RPC: Unregistered %s transport module.\n",
			       transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:		transport successfully loaded
 * -ENOENT:	transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (strcmp(t->name, transport_name) == 0) {
			spin_unlock(&xprt_list_lock);
			goto out;
		}
	}
	spin_unlock(&xprt_list_lock);
	result = request_module("xprt%s", transport_name);
out:
	return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);

/* Drop the transport send lock. If a close was requested while we held
 * the lock, hand off to the autoclose worker instead of clearing it. */
static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
		task->tk_pid, xprt);
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				     xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
				     rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		return 1;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		return 1;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				     xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				    __xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				    __xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (req->rq_cong)
		return 1;
	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
		req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt)) {
		xprt_set_congestion_window_wait(xprt);
		return 0;
	}
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	xprt_test_and_clear_congestion_window_wait(xprt);
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	bool ret = false;

	if (req->rq_cong)
		return true;
	spin_lock(&xprt->transport_lock);
	ret = __xprt_get_cong(xprt, req) != 0;
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
		xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
			"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	s64 delta = ktime_to_ns(ktime_get() - abstime);
	return likely(delta >= 0) ?
		jiffies - nsecs_to_jiffies(delta) :
		jiffies + nsecs_to_jiffies(-delta);
}

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	unsigned long majortimeo = req->rq_timeout;

	if (to->to_exponential)
		majortimeo <<= to->to_retries;
	else
		majortimeo += to->to_increment * to->to_retries;
	if (majortimeo > to->to_maxval || majortimeo == 0)
		majortimeo = to->to_maxval;
	return majortimeo;
}

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	req->rq_majortimeo += xprt_calc_majortimeo(req);
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
	unsigned long time_init;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (likely(xprt && xprt_connected(xprt)))
		time_init = jiffies;
	else
		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);
	unsigned int pflags = memalloc_nofs_save();

	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
	memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	else if (xprt->snd_task)
		rpc_wake_up_queued_task_set_status(&xprt->pending,
						   xprt->snd_task, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	spin_unlock(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	xprt->last_used = jiffies;
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		return;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

bool xprt_lock_connect(struct rpc_xprt *xprt,
		       struct rpc_task *task,
		       void *cookie)
{
	bool ret = false;

	spin_lock(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock(&xprt->transport_lock);
	return ret;
}

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
		xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
		xprt->ops->close(xprt);

	if (!xprt_connected(xprt)) {
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
				     xprt_request_timeout(task->tk_rqstp));

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
	if (xprt->reestablish_timeout < init_to)
		xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
		ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_request_need_enqueue_receive(task, req))
		return;

	xprt_request_prepare(task->tk_rqstp);
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
	       sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	/* Turn off autodisconnect */
	del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
		task->tk_pid, ntohl(req->rq_xid), copied);
	trace_xprt_complete_rqst(xprt, req->rq_xid, copied);

	xprt->stat.recvs++;

	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	xprt_request_dequeue_receive_locked(task);
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;

	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
}

/**
 * xprt_wait_for_reply_request_def - wait for reply
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			xprt_request_timeout(req));
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				trace_xprt_enq_xmit(task, 1);
				goto out;
			}
		} else if (RPC_IS_SWAPPER(task)) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong || pos->rq_bytes_sent)
					continue;
				if (RPC_IS_SWAPPER(pos->rq_task))
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				trace_xprt_enq_xmit(task, 2);
				goto out;
			}
		} else if (!req->rq_seqno) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				trace_xprt_enq_xmit(task, 3);
				goto out;
			}
		}
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
		trace_xprt_enq_xmit(task, 4);
out:
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		list_del(&req->rq_xmit2);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 */
void
xprt_request_prepare(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt->ops->prepare_request)
		xprt->ops->prepare_request(req);
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
	return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

	if (!xprt_lock_write(xprt, task)) {
		/* Race breaker: someone may have transmitted us */
		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task_set_status(&xprt->sending,
					task, 0);
		return false;

	}
	return true;
}

void xprt_end_transmit(struct rpc_task *task)
{
	xprt_release_write(task->tk_rqstp->rq_xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	if (!req->rq_bytes_sent) {
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
		if (task->tk_ops->rpc_call_prepare_transmit) {
			task->tk_ops->rpc_call_prepare_transmit(task,
					task->tk_calldata);
			status = task->tk_status;
			if (status < 0)
				goto out_dequeue;
		}
		if (RPC_SIGNALLED(task)) {
			status = -ERESTARTSYS;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	if (status != 0) {
		req->rq_ntrans--;
		trace_xprt_transmit(req, status);
		return status;
	}

	if (is_retrans)
		task->tk_client->cl_stats->rpcretrans++;

	xprt_inject_disconnect(xprt);

	task->tk_flags |= RPC_TASK_SENT;
	spin_lock(&xprt->transport_lock);

	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	trace_xprt_transmit(req, status);
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *next, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;

	spin_lock(&xprt->queue_lock);
	while (!list_empty(&xprt->xmit_queue)) {
		next = list_first_entry(&xprt->xmit_queue,
				struct rpc_rqst, rq_xmit);
		xprt_pin_rqst(next);
		spin_unlock(&xprt->queue_lock);
		status = xprt_request_transmit(next, task);
		if (status == -EBADMSG && next != req)
			status = 0;
		cond_resched();
		spin_lock(&xprt->queue_lock);
		xprt_unpin_rqst(next);
		if (status == 0) {
			if (!xprt_request_data_received(task) ||
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				continue;
		} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			task->tk_status = status;
		break;
	}
	spin_unlock(&xprt->queue_lock);
}

static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, NULL);
}

static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
{
	if (rpc_wake_up_next(&xprt->backlog) == NULL)
		clear_bit(XPRT_CONGESTED, &xprt->state);
}

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		rpc_sleep_on(&xprt->backlog, task, NULL);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (xprt->num_reqs >= xprt->max_reqs)
		goto out;
	++xprt->num_reqs;
	spin_unlock(&xprt->reserve_lock);
	req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
	spin_lock(&xprt->reserve_lock);
	if (req != NULL)
		goto out;
	--xprt->num_reqs;
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (xprt->num_reqs > xprt->min_reqs) {
		--xprt->num_reqs;
		kfree(req);
		return true;
	}
	return false;
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC: dynamic allocation of request slot "
				"failed! Retrying\n");
		task->tk_status = -ENOMEM;
		break;
	case -EAGAIN:
		xprt_add_backlog(xprt, task);
		dprintk("RPC: waiting for request slot\n");
		/* fall through */
	default:
		task->tk_status = -EAGAIN;
	}
	spin_unlock(&xprt->reserve_lock);
	return;
out_init_req:
	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
				     xprt->num_reqs);
	spin_unlock(&xprt->reserve_lock);

	task->tk_status = 0;
	task->tk_rqstp = req;
}
EXPORT_SYMBOL_GPL(xprt_alloc_slot);

void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	spin_lock(&xprt->reserve_lock);
	if (!xprt_dynamic_free_slot(xprt, req)) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		list_add(&req->rq_list, &xprt->free);
	}
	xprt_wake_up_backlog(xprt);
	spin_unlock(&xprt->reserve_lock);
}
EXPORT_SYMBOL_GPL(xprt_free_slot);

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;

	while (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		kfree(req);
	}
}

struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	if (max_alloc > num_prealloc)
		xprt->max_reqs = max_alloc;
	else
		xprt->max_reqs = num_prealloc;
	xprt->min_reqs = num_prealloc;
	xprt->num_reqs = num_prealloc;

	return xprt;

out_free:
	xprt_free(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);

void xprt_free(struct rpc_xprt *xprt)
{
	put_net(xprt->xprt_net);
	xprt_free_all_slots(xprt);
	kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);

static void
xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
}

static __be32
xprt_alloc_xid(struct rpc_xprt *xprt)
{
	__be32 xid;

	spin_lock(&xprt->reserve_lock);
	xid = (__force __be32)xprt->xid++;
	spin_unlock(&xprt->reserve_lock);
	return xid;
}

static void
xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = prandom_u32();
}

static void
xprt_request_init(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_buffer = NULL;
	req->rq_xid = xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_snd_buf.bvec = NULL;
	req->rq_rcv_buf.bvec = NULL;
	req->rq_release_snd_buf = NULL;
	xprt_init_majortimeo(task, req);
	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
		req, ntohl(req->rq_xid));
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

static void
xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		while (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	xprt_request_dequeue_all(task, req);
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt_schedule_autodisconnect(xprt);
	spin_unlock(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xprt_inject_disconnect(xprt);
	xdr_free_bvec(&req->rq_rcv_buf);
	xdr_free_bvec(&req->rq_snd_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	task->tk_rqstp = NULL;
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident == args->ident) {
			spin_unlock(&xprt_list_lock);
			goto found;
		}
	}
	spin_unlock(&xprt_list_lock);
	dprintk("RPC: transport (%d) not supported\n", args->ident);
	return ERR_PTR(-EIO);

found:
	xprt = t->setup(args);
	if (IS_ERR(xprt)) {
		dprintk("RPC: xprt_create_transport: failed, %ld\n",
			-PTR_ERR(xprt));
		goto out;
	}
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	dprintk("RPC: created transport %p with %u slots\n", xprt,
		xprt->max_reqs);
out:
	return xprt;
}

static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);

	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);