1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 59 /* 60 * Local variables 61 */ 62 63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 64 # define RPCDBG_FACILITY RPCDBG_XPRT 65 #endif 66 67 /* 68 * Local functions 69 */ 70 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 71 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 72 static void xprt_destroy(struct rpc_xprt *xprt); 73 74 static DEFINE_SPINLOCK(xprt_list_lock); 75 static LIST_HEAD(xprt_list); 76 77 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 78 { 79 unsigned long timeout = jiffies + req->rq_timeout; 80 81 if (time_before(timeout, req->rq_majortimeo)) 82 return timeout; 83 return req->rq_majortimeo; 84 } 85 86 /** 87 * xprt_register_transport - register a transport implementation 88 * @transport: transport to register 89 * 90 * If a transport implementation is loaded as a kernel module, it can 91 * call this interface to make itself known to the RPC client. 92 * 93 * Returns: 94 * 0: transport successfully registered 95 * -EEXIST: transport already registered 96 * -EINVAL: transport module being unloaded 97 */ 98 int xprt_register_transport(struct xprt_class *transport) 99 { 100 struct xprt_class *t; 101 int result; 102 103 result = -EEXIST; 104 spin_lock(&xprt_list_lock); 105 list_for_each_entry(t, &xprt_list, list) { 106 /* don't register the same transport class twice */ 107 if (t->ident == transport->ident) 108 goto out; 109 } 110 111 list_add_tail(&transport->list, &xprt_list); 112 printk(KERN_INFO "RPC: Registered %s transport module.\n", 113 transport->name); 114 result = 0; 115 116 out: 117 spin_unlock(&xprt_list_lock); 118 return result; 119 } 120 EXPORT_SYMBOL_GPL(xprt_register_transport); 121 122 /** 123 * xprt_unregister_transport - unregister a transport implementation 124 * @transport: transport to unregister 125 * 126 * Returns: 127 * 0: transport successfully unregistered 128 * -ENOENT: transport never registered 129 */ 130 int xprt_unregister_transport(struct xprt_class *transport) 131 { 132 struct xprt_class *t; 133 int result; 134 135 result = 0; 136 spin_lock(&xprt_list_lock); 137 list_for_each_entry(t, &xprt_list, list) { 138 if (t == transport) { 139 printk(KERN_INFO 140 "RPC: Unregistered %s transport module.\n", 141 transport->name); 142 list_del_init(&transport->list); 143 goto out; 144 } 145 } 146 result = -ENOENT; 147 148 out: 149 spin_unlock(&xprt_list_lock); 150 return result; 151 } 152 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 153 154 /** 155 * xprt_load_transport - load a transport implementation 156 * @transport_name: transport to load 157 * 158 * Returns: 159 * 0: transport successfully loaded 160 * -ENOENT: transport module not available 161 */ 162 int xprt_load_transport(const char *transport_name) 163 { 164 struct xprt_class *t; 165 int result; 166 167 result = 0; 168 spin_lock(&xprt_list_lock); 169 list_for_each_entry(t, &xprt_list, list) { 170 if (strcmp(t->name, transport_name) == 0) { 171 spin_unlock(&xprt_list_lock); 172 goto out; 173 } 174 } 175 spin_unlock(&xprt_list_lock); 176 result = request_module("xprt%s", transport_name); 177 out: 178 return result; 179 } 180 EXPORT_SYMBOL_GPL(xprt_load_transport); 181 182 static void xprt_clear_locked(struct rpc_xprt *xprt) 183 { 184 xprt->snd_task = NULL; 185 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 186 smp_mb__before_atomic(); 187 clear_bit(XPRT_LOCKED, &xprt->state); 188 smp_mb__after_atomic(); 189 } else 190 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 191 } 192 193 /** 194 * xprt_reserve_xprt - serialize write access to transports 195 * @task: task that is requesting access to the transport 196 * @xprt: pointer to the target transport 197 * 198 * This prevents mixing the payload of separate requests, and prevents 199 * transport connects from colliding with writes. No congestion control 200 * is provided. 201 */ 202 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 203 { 204 struct rpc_rqst *req = task->tk_rqstp; 205 206 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 207 if (task == xprt->snd_task) 208 goto out_locked; 209 goto out_sleep; 210 } 211 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 212 goto out_unlock; 213 xprt->snd_task = task; 214 215 out_locked: 216 trace_xprt_reserve_xprt(xprt, task); 217 return 1; 218 219 out_unlock: 220 xprt_clear_locked(xprt); 221 out_sleep: 222 task->tk_status = -EAGAIN; 223 if (RPC_IS_SOFT(task)) 224 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 225 xprt_request_timeout(req)); 226 else 227 rpc_sleep_on(&xprt->sending, task, NULL); 228 return 0; 229 } 230 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 231 232 static bool 233 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 234 { 235 return test_bit(XPRT_CWND_WAIT, &xprt->state); 236 } 237 238 static void 239 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 240 { 241 if (!list_empty(&xprt->xmit_queue)) { 242 /* Peek at head of queue to see if it can make progress */ 243 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 244 rq_xmit)->rq_cong) 245 return; 246 } 247 set_bit(XPRT_CWND_WAIT, &xprt->state); 248 } 249 250 static void 251 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 252 { 253 if (!RPCXPRT_CONGESTED(xprt)) 254 clear_bit(XPRT_CWND_WAIT, &xprt->state); 255 } 256 257 /* 258 * xprt_reserve_xprt_cong - serialize write access to transports 259 * @task: task that is requesting access to the transport 260 * 261 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 262 * integrated into the decision of whether a request is allowed to be 263 * woken up and given access to the transport. 264 * Note that the lock is only granted if we know there are free slots. 265 */ 266 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 267 { 268 struct rpc_rqst *req = task->tk_rqstp; 269 270 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 271 if (task == xprt->snd_task) 272 goto out_locked; 273 goto out_sleep; 274 } 275 if (req == NULL) { 276 xprt->snd_task = task; 277 goto out_locked; 278 } 279 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 280 goto out_unlock; 281 if (!xprt_need_congestion_window_wait(xprt)) { 282 xprt->snd_task = task; 283 goto out_locked; 284 } 285 out_unlock: 286 xprt_clear_locked(xprt); 287 out_sleep: 288 task->tk_status = -EAGAIN; 289 if (RPC_IS_SOFT(task)) 290 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 291 xprt_request_timeout(req)); 292 else 293 rpc_sleep_on(&xprt->sending, task, NULL); 294 return 0; 295 out_locked: 296 trace_xprt_reserve_cong(xprt, task); 297 return 1; 298 } 299 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 300 301 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 302 { 303 int retval; 304 305 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 306 return 1; 307 spin_lock(&xprt->transport_lock); 308 retval = xprt->ops->reserve_xprt(xprt, task); 309 spin_unlock(&xprt->transport_lock); 310 return retval; 311 } 312 313 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 314 { 315 struct rpc_xprt *xprt = data; 316 317 xprt->snd_task = task; 318 return true; 319 } 320 321 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 322 { 323 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 324 return; 325 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 326 goto out_unlock; 327 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 328 __xprt_lock_write_func, xprt)) 329 return; 330 out_unlock: 331 xprt_clear_locked(xprt); 332 } 333 334 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 335 { 336 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 337 return; 338 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 339 goto out_unlock; 340 if (xprt_need_congestion_window_wait(xprt)) 341 goto out_unlock; 342 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 343 __xprt_lock_write_func, xprt)) 344 return; 345 out_unlock: 346 xprt_clear_locked(xprt); 347 } 348 349 /** 350 * xprt_release_xprt - allow other requests to use a transport 351 * @xprt: transport with other tasks potentially waiting 352 * @task: task that is releasing access to the transport 353 * 354 * Note that "task" can be NULL. No congestion control is provided. 355 */ 356 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 357 { 358 if (xprt->snd_task == task) { 359 xprt_clear_locked(xprt); 360 __xprt_lock_write_next(xprt); 361 } 362 trace_xprt_release_xprt(xprt, task); 363 } 364 EXPORT_SYMBOL_GPL(xprt_release_xprt); 365 366 /** 367 * xprt_release_xprt_cong - allow other requests to use a transport 368 * @xprt: transport with other tasks potentially waiting 369 * @task: task that is releasing access to the transport 370 * 371 * Note that "task" can be NULL. Another task is awoken to use the 372 * transport if the transport's congestion window allows it. 373 */ 374 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 375 { 376 if (xprt->snd_task == task) { 377 xprt_clear_locked(xprt); 378 __xprt_lock_write_next_cong(xprt); 379 } 380 trace_xprt_release_cong(xprt, task); 381 } 382 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 383 384 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 385 { 386 if (xprt->snd_task != task) 387 return; 388 spin_lock(&xprt->transport_lock); 389 xprt->ops->release_xprt(xprt, task); 390 spin_unlock(&xprt->transport_lock); 391 } 392 393 /* 394 * Van Jacobson congestion avoidance. Check if the congestion window 395 * overflowed. Put the task to sleep if this is the case. 396 */ 397 static int 398 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 399 { 400 if (req->rq_cong) 401 return 1; 402 trace_xprt_get_cong(xprt, req->rq_task); 403 if (RPCXPRT_CONGESTED(xprt)) { 404 xprt_set_congestion_window_wait(xprt); 405 return 0; 406 } 407 req->rq_cong = 1; 408 xprt->cong += RPC_CWNDSCALE; 409 return 1; 410 } 411 412 /* 413 * Adjust the congestion window, and wake up the next task 414 * that has been sleeping due to congestion 415 */ 416 static void 417 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 418 { 419 if (!req->rq_cong) 420 return; 421 req->rq_cong = 0; 422 xprt->cong -= RPC_CWNDSCALE; 423 xprt_test_and_clear_congestion_window_wait(xprt); 424 trace_xprt_put_cong(xprt, req->rq_task); 425 __xprt_lock_write_next_cong(xprt); 426 } 427 428 /** 429 * xprt_request_get_cong - Request congestion control credits 430 * @xprt: pointer to transport 431 * @req: pointer to RPC request 432 * 433 * Useful for transports that require congestion control. 434 */ 435 bool 436 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 437 { 438 bool ret = false; 439 440 if (req->rq_cong) 441 return true; 442 spin_lock(&xprt->transport_lock); 443 ret = __xprt_get_cong(xprt, req) != 0; 444 spin_unlock(&xprt->transport_lock); 445 return ret; 446 } 447 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 448 449 /** 450 * xprt_release_rqst_cong - housekeeping when request is complete 451 * @task: RPC request that recently completed 452 * 453 * Useful for transports that require congestion control. 454 */ 455 void xprt_release_rqst_cong(struct rpc_task *task) 456 { 457 struct rpc_rqst *req = task->tk_rqstp; 458 459 __xprt_put_cong(req->rq_xprt, req); 460 } 461 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 462 463 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 464 { 465 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 466 __xprt_lock_write_next_cong(xprt); 467 } 468 469 /* 470 * Clear the congestion window wait flag and wake up the next 471 * entry on xprt->sending 472 */ 473 static void 474 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 475 { 476 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 477 spin_lock(&xprt->transport_lock); 478 __xprt_lock_write_next_cong(xprt); 479 spin_unlock(&xprt->transport_lock); 480 } 481 } 482 483 /** 484 * xprt_adjust_cwnd - adjust transport congestion window 485 * @xprt: pointer to xprt 486 * @task: recently completed RPC request used to adjust window 487 * @result: result code of completed RPC request 488 * 489 * The transport code maintains an estimate on the maximum number of out- 490 * standing RPC requests, using a smoothed version of the congestion 491 * avoidance implemented in 44BSD. This is basically the Van Jacobson 492 * congestion algorithm: If a retransmit occurs, the congestion window is 493 * halved; otherwise, it is incremented by 1/cwnd when 494 * 495 * - a reply is received and 496 * - a full number of requests are outstanding and 497 * - the congestion window hasn't been updated recently. 498 */ 499 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 500 { 501 struct rpc_rqst *req = task->tk_rqstp; 502 unsigned long cwnd = xprt->cwnd; 503 504 if (result >= 0 && cwnd <= xprt->cong) { 505 /* The (cwnd >> 1) term makes sure 506 * the result gets rounded properly. */ 507 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 508 if (cwnd > RPC_MAXCWND(xprt)) 509 cwnd = RPC_MAXCWND(xprt); 510 __xprt_lock_write_next_cong(xprt); 511 } else if (result == -ETIMEDOUT) { 512 cwnd >>= 1; 513 if (cwnd < RPC_CWNDSCALE) 514 cwnd = RPC_CWNDSCALE; 515 } 516 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 517 xprt->cong, xprt->cwnd, cwnd); 518 xprt->cwnd = cwnd; 519 __xprt_put_cong(xprt, req); 520 } 521 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 522 523 /** 524 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 525 * @xprt: transport with waiting tasks 526 * @status: result code to plant in each task before waking it 527 * 528 */ 529 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 530 { 531 if (status < 0) 532 rpc_wake_up_status(&xprt->pending, status); 533 else 534 rpc_wake_up(&xprt->pending); 535 } 536 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 537 538 /** 539 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 540 * @xprt: transport 541 * 542 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 543 * we don't in general want to force a socket disconnection due to 544 * an incomplete RPC call transmission. 545 */ 546 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 547 { 548 set_bit(XPRT_WRITE_SPACE, &xprt->state); 549 } 550 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 551 552 static bool 553 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 554 { 555 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 556 __xprt_lock_write_next(xprt); 557 dprintk("RPC: write space: waking waiting task on " 558 "xprt %p\n", xprt); 559 return true; 560 } 561 return false; 562 } 563 564 /** 565 * xprt_write_space - wake the task waiting for transport output buffer space 566 * @xprt: transport with waiting tasks 567 * 568 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 569 */ 570 bool xprt_write_space(struct rpc_xprt *xprt) 571 { 572 bool ret; 573 574 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 575 return false; 576 spin_lock(&xprt->transport_lock); 577 ret = xprt_clear_write_space_locked(xprt); 578 spin_unlock(&xprt->transport_lock); 579 return ret; 580 } 581 EXPORT_SYMBOL_GPL(xprt_write_space); 582 583 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 584 { 585 s64 delta = ktime_to_ns(ktime_get() - abstime); 586 return likely(delta >= 0) ? 587 jiffies - nsecs_to_jiffies(delta) : 588 jiffies + nsecs_to_jiffies(-delta); 589 } 590 591 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 592 { 593 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 594 unsigned long majortimeo = req->rq_timeout; 595 596 if (to->to_exponential) 597 majortimeo <<= to->to_retries; 598 else 599 majortimeo += to->to_increment * to->to_retries; 600 if (majortimeo > to->to_maxval || majortimeo == 0) 601 majortimeo = to->to_maxval; 602 return majortimeo; 603 } 604 605 static void xprt_reset_majortimeo(struct rpc_rqst *req) 606 { 607 req->rq_majortimeo += xprt_calc_majortimeo(req); 608 } 609 610 static void xprt_reset_minortimeo(struct rpc_rqst *req) 611 { 612 req->rq_minortimeo += req->rq_timeout; 613 } 614 615 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 616 { 617 unsigned long time_init; 618 struct rpc_xprt *xprt = req->rq_xprt; 619 620 if (likely(xprt && xprt_connected(xprt))) 621 time_init = jiffies; 622 else 623 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 624 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 625 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 626 req->rq_minortimeo = time_init + req->rq_timeout; 627 } 628 629 /** 630 * xprt_adjust_timeout - adjust timeout values for next retransmit 631 * @req: RPC request containing parameters to use for the adjustment 632 * 633 */ 634 int xprt_adjust_timeout(struct rpc_rqst *req) 635 { 636 struct rpc_xprt *xprt = req->rq_xprt; 637 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 638 int status = 0; 639 640 if (time_before(jiffies, req->rq_minortimeo)) 641 return status; 642 if (time_before(jiffies, req->rq_majortimeo)) { 643 if (to->to_exponential) 644 req->rq_timeout <<= 1; 645 else 646 req->rq_timeout += to->to_increment; 647 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 648 req->rq_timeout = to->to_maxval; 649 req->rq_retries++; 650 } else { 651 req->rq_timeout = to->to_initval; 652 req->rq_retries = 0; 653 xprt_reset_majortimeo(req); 654 /* Reset the RTT counters == "slow start" */ 655 spin_lock(&xprt->transport_lock); 656 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 657 spin_unlock(&xprt->transport_lock); 658 status = -ETIMEDOUT; 659 } 660 xprt_reset_minortimeo(req); 661 662 if (req->rq_timeout == 0) { 663 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 664 req->rq_timeout = 5 * HZ; 665 } 666 return status; 667 } 668 669 static void xprt_autoclose(struct work_struct *work) 670 { 671 struct rpc_xprt *xprt = 672 container_of(work, struct rpc_xprt, task_cleanup); 673 unsigned int pflags = memalloc_nofs_save(); 674 675 trace_xprt_disconnect_auto(xprt); 676 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 677 xprt->ops->close(xprt); 678 xprt_release_write(xprt, NULL); 679 wake_up_bit(&xprt->state, XPRT_LOCKED); 680 memalloc_nofs_restore(pflags); 681 } 682 683 /** 684 * xprt_disconnect_done - mark a transport as disconnected 685 * @xprt: transport to flag for disconnect 686 * 687 */ 688 void xprt_disconnect_done(struct rpc_xprt *xprt) 689 { 690 trace_xprt_disconnect_done(xprt); 691 spin_lock(&xprt->transport_lock); 692 xprt_clear_connected(xprt); 693 xprt_clear_write_space_locked(xprt); 694 xprt_clear_congestion_window_wait_locked(xprt); 695 xprt_wake_pending_tasks(xprt, -ENOTCONN); 696 spin_unlock(&xprt->transport_lock); 697 } 698 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 699 700 /** 701 * xprt_force_disconnect - force a transport to disconnect 702 * @xprt: transport to disconnect 703 * 704 */ 705 void xprt_force_disconnect(struct rpc_xprt *xprt) 706 { 707 trace_xprt_disconnect_force(xprt); 708 709 /* Don't race with the test_bit() in xprt_clear_locked() */ 710 spin_lock(&xprt->transport_lock); 711 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 712 /* Try to schedule an autoclose RPC call */ 713 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 714 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 715 else if (xprt->snd_task) 716 rpc_wake_up_queued_task_set_status(&xprt->pending, 717 xprt->snd_task, -ENOTCONN); 718 spin_unlock(&xprt->transport_lock); 719 } 720 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 721 722 static unsigned int 723 xprt_connect_cookie(struct rpc_xprt *xprt) 724 { 725 return READ_ONCE(xprt->connect_cookie); 726 } 727 728 static bool 729 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 730 { 731 struct rpc_rqst *req = task->tk_rqstp; 732 struct rpc_xprt *xprt = req->rq_xprt; 733 734 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 735 !xprt_connected(xprt); 736 } 737 738 /** 739 * xprt_conditional_disconnect - force a transport to disconnect 740 * @xprt: transport to disconnect 741 * @cookie: 'connection cookie' 742 * 743 * This attempts to break the connection if and only if 'cookie' matches 744 * the current transport 'connection cookie'. It ensures that we don't 745 * try to break the connection more than once when we need to retransmit 746 * a batch of RPC requests. 747 * 748 */ 749 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 750 { 751 /* Don't race with the test_bit() in xprt_clear_locked() */ 752 spin_lock(&xprt->transport_lock); 753 if (cookie != xprt->connect_cookie) 754 goto out; 755 if (test_bit(XPRT_CLOSING, &xprt->state)) 756 goto out; 757 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 758 /* Try to schedule an autoclose RPC call */ 759 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 760 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 761 xprt_wake_pending_tasks(xprt, -EAGAIN); 762 out: 763 spin_unlock(&xprt->transport_lock); 764 } 765 766 static bool 767 xprt_has_timer(const struct rpc_xprt *xprt) 768 { 769 return xprt->idle_timeout != 0; 770 } 771 772 static void 773 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 774 __must_hold(&xprt->transport_lock) 775 { 776 xprt->last_used = jiffies; 777 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 778 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 779 } 780 781 static void 782 xprt_init_autodisconnect(struct timer_list *t) 783 { 784 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 785 786 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 787 return; 788 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 789 xprt->last_used = jiffies; 790 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 791 return; 792 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 793 } 794 795 bool xprt_lock_connect(struct rpc_xprt *xprt, 796 struct rpc_task *task, 797 void *cookie) 798 { 799 bool ret = false; 800 801 spin_lock(&xprt->transport_lock); 802 if (!test_bit(XPRT_LOCKED, &xprt->state)) 803 goto out; 804 if (xprt->snd_task != task) 805 goto out; 806 xprt->snd_task = cookie; 807 ret = true; 808 out: 809 spin_unlock(&xprt->transport_lock); 810 return ret; 811 } 812 813 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 814 { 815 spin_lock(&xprt->transport_lock); 816 if (xprt->snd_task != cookie) 817 goto out; 818 if (!test_bit(XPRT_LOCKED, &xprt->state)) 819 goto out; 820 xprt->snd_task =NULL; 821 xprt->ops->release_xprt(xprt, NULL); 822 xprt_schedule_autodisconnect(xprt); 823 out: 824 spin_unlock(&xprt->transport_lock); 825 wake_up_bit(&xprt->state, XPRT_LOCKED); 826 } 827 828 /** 829 * xprt_connect - schedule a transport connect operation 830 * @task: RPC task that is requesting the connect 831 * 832 */ 833 void xprt_connect(struct rpc_task *task) 834 { 835 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 836 837 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 838 xprt, (xprt_connected(xprt) ? "is" : "is not")); 839 840 if (!xprt_bound(xprt)) { 841 task->tk_status = -EAGAIN; 842 return; 843 } 844 if (!xprt_lock_write(xprt, task)) 845 return; 846 847 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 848 trace_xprt_disconnect_cleanup(xprt); 849 xprt->ops->close(xprt); 850 } 851 852 if (!xprt_connected(xprt)) { 853 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 854 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 855 xprt_request_timeout(task->tk_rqstp)); 856 857 if (test_bit(XPRT_CLOSING, &xprt->state)) 858 return; 859 if (xprt_test_and_set_connecting(xprt)) 860 return; 861 /* Race breaker */ 862 if (!xprt_connected(xprt)) { 863 xprt->stat.connect_start = jiffies; 864 xprt->ops->connect(xprt, task); 865 } else { 866 xprt_clear_connecting(xprt); 867 task->tk_status = 0; 868 rpc_wake_up_queued_task(&xprt->pending, task); 869 } 870 } 871 xprt_release_write(xprt, task); 872 } 873 874 /** 875 * xprt_reconnect_delay - compute the wait before scheduling a connect 876 * @xprt: transport instance 877 * 878 */ 879 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) 880 { 881 unsigned long start, now = jiffies; 882 883 start = xprt->stat.connect_start + xprt->reestablish_timeout; 884 if (time_after(start, now)) 885 return start - now; 886 return 0; 887 } 888 EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 889 890 /** 891 * xprt_reconnect_backoff - compute the new re-establish timeout 892 * @xprt: transport instance 893 * @init_to: initial reestablish timeout 894 * 895 */ 896 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 897 { 898 xprt->reestablish_timeout <<= 1; 899 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 900 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 901 if (xprt->reestablish_timeout < init_to) 902 xprt->reestablish_timeout = init_to; 903 } 904 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 905 906 enum xprt_xid_rb_cmp { 907 XID_RB_EQUAL, 908 XID_RB_LEFT, 909 XID_RB_RIGHT, 910 }; 911 static enum xprt_xid_rb_cmp 912 xprt_xid_cmp(__be32 xid1, __be32 xid2) 913 { 914 if (xid1 == xid2) 915 return XID_RB_EQUAL; 916 if ((__force u32)xid1 < (__force u32)xid2) 917 return XID_RB_LEFT; 918 return XID_RB_RIGHT; 919 } 920 921 static struct rpc_rqst * 922 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 923 { 924 struct rb_node *n = xprt->recv_queue.rb_node; 925 struct rpc_rqst *req; 926 927 while (n != NULL) { 928 req = rb_entry(n, struct rpc_rqst, rq_recv); 929 switch (xprt_xid_cmp(xid, req->rq_xid)) { 930 case XID_RB_LEFT: 931 n = n->rb_left; 932 break; 933 case XID_RB_RIGHT: 934 n = n->rb_right; 935 break; 936 case XID_RB_EQUAL: 937 return req; 938 } 939 } 940 return NULL; 941 } 942 943 static void 944 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 945 { 946 struct rb_node **p = &xprt->recv_queue.rb_node; 947 struct rb_node *n = NULL; 948 struct rpc_rqst *req; 949 950 while (*p != NULL) { 951 n = *p; 952 req = rb_entry(n, struct rpc_rqst, rq_recv); 953 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 954 case XID_RB_LEFT: 955 p = &n->rb_left; 956 break; 957 case XID_RB_RIGHT: 958 p = &n->rb_right; 959 break; 960 case XID_RB_EQUAL: 961 WARN_ON_ONCE(new != req); 962 return; 963 } 964 } 965 rb_link_node(&new->rq_recv, n, p); 966 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 967 } 968 969 static void 970 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 971 { 972 rb_erase(&req->rq_recv, &xprt->recv_queue); 973 } 974 975 /** 976 * xprt_lookup_rqst - find an RPC request corresponding to an XID 977 * @xprt: transport on which the original request was transmitted 978 * @xid: RPC XID of incoming reply 979 * 980 * Caller holds xprt->queue_lock. 981 */ 982 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 983 { 984 struct rpc_rqst *entry; 985 986 entry = xprt_request_rb_find(xprt, xid); 987 if (entry != NULL) { 988 trace_xprt_lookup_rqst(xprt, xid, 0); 989 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 990 return entry; 991 } 992 993 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 994 ntohl(xid)); 995 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 996 xprt->stat.bad_xids++; 997 return NULL; 998 } 999 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1000 1001 static bool 1002 xprt_is_pinned_rqst(struct rpc_rqst *req) 1003 { 1004 return atomic_read(&req->rq_pin) != 0; 1005 } 1006 1007 /** 1008 * xprt_pin_rqst - Pin a request on the transport receive list 1009 * @req: Request to pin 1010 * 1011 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1012 * so should be holding xprt->queue_lock. 1013 */ 1014 void xprt_pin_rqst(struct rpc_rqst *req) 1015 { 1016 atomic_inc(&req->rq_pin); 1017 } 1018 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1019 1020 /** 1021 * xprt_unpin_rqst - Unpin a request on the transport receive list 1022 * @req: Request to pin 1023 * 1024 * Caller should be holding xprt->queue_lock. 1025 */ 1026 void xprt_unpin_rqst(struct rpc_rqst *req) 1027 { 1028 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1029 atomic_dec(&req->rq_pin); 1030 return; 1031 } 1032 if (atomic_dec_and_test(&req->rq_pin)) 1033 wake_up_var(&req->rq_pin); 1034 } 1035 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1036 1037 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1038 { 1039 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1040 } 1041 1042 static bool 1043 xprt_request_data_received(struct rpc_task *task) 1044 { 1045 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1046 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1047 } 1048 1049 static bool 1050 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1051 { 1052 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1053 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1054 } 1055 1056 /** 1057 * xprt_request_enqueue_receive - Add an request to the receive queue 1058 * @task: RPC task 1059 * 1060 */ 1061 void 1062 xprt_request_enqueue_receive(struct rpc_task *task) 1063 { 1064 struct rpc_rqst *req = task->tk_rqstp; 1065 struct rpc_xprt *xprt = req->rq_xprt; 1066 1067 if (!xprt_request_need_enqueue_receive(task, req)) 1068 return; 1069 1070 xprt_request_prepare(task->tk_rqstp); 1071 spin_lock(&xprt->queue_lock); 1072 1073 /* Update the softirq receive buffer */ 1074 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1075 sizeof(req->rq_private_buf)); 1076 1077 /* Add request to the receive list */ 1078 xprt_request_rb_insert(xprt, req); 1079 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1080 spin_unlock(&xprt->queue_lock); 1081 1082 /* Turn off autodisconnect */ 1083 del_singleshot_timer_sync(&xprt->timer); 1084 } 1085 1086 /** 1087 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1088 * @task: RPC task 1089 * 1090 * Caller must hold xprt->queue_lock. 1091 */ 1092 static void 1093 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1094 { 1095 struct rpc_rqst *req = task->tk_rqstp; 1096 1097 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1098 xprt_request_rb_remove(req->rq_xprt, req); 1099 } 1100 1101 /** 1102 * xprt_update_rtt - Update RPC RTT statistics 1103 * @task: RPC request that recently completed 1104 * 1105 * Caller holds xprt->queue_lock. 1106 */ 1107 void xprt_update_rtt(struct rpc_task *task) 1108 { 1109 struct rpc_rqst *req = task->tk_rqstp; 1110 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1111 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1112 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1113 1114 if (timer) { 1115 if (req->rq_ntrans == 1) 1116 rpc_update_rtt(rtt, timer, m); 1117 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1118 } 1119 } 1120 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1121 1122 /** 1123 * xprt_complete_rqst - called when reply processing is complete 1124 * @task: RPC request that recently completed 1125 * @copied: actual number of bytes received from the transport 1126 * 1127 * Caller holds xprt->queue_lock. 1128 */ 1129 void xprt_complete_rqst(struct rpc_task *task, int copied) 1130 { 1131 struct rpc_rqst *req = task->tk_rqstp; 1132 struct rpc_xprt *xprt = req->rq_xprt; 1133 1134 trace_xprt_complete_rqst(xprt, req->rq_xid, copied); 1135 1136 xprt->stat.recvs++; 1137 1138 req->rq_private_buf.len = copied; 1139 /* Ensure all writes are done before we update */ 1140 /* req->rq_reply_bytes_recvd */ 1141 smp_wmb(); 1142 req->rq_reply_bytes_recvd = copied; 1143 xprt_request_dequeue_receive_locked(task); 1144 rpc_wake_up_queued_task(&xprt->pending, task); 1145 } 1146 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1147 1148 static void xprt_timer(struct rpc_task *task) 1149 { 1150 struct rpc_rqst *req = task->tk_rqstp; 1151 struct rpc_xprt *xprt = req->rq_xprt; 1152 1153 if (task->tk_status != -ETIMEDOUT) 1154 return; 1155 1156 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1157 if (!req->rq_reply_bytes_recvd) { 1158 if (xprt->ops->timer) 1159 xprt->ops->timer(xprt, task); 1160 } else 1161 task->tk_status = 0; 1162 } 1163 1164 /** 1165 * xprt_wait_for_reply_request_def - wait for reply 1166 * @task: pointer to rpc_task 1167 * 1168 * Set a request's retransmit timeout based on the transport's 1169 * default timeout parameters. Used by transports that don't adjust 1170 * the retransmit timeout based on round-trip time estimation, 1171 * and put the task to sleep on the pending queue. 1172 */ 1173 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1174 { 1175 struct rpc_rqst *req = task->tk_rqstp; 1176 1177 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1178 xprt_request_timeout(req)); 1179 } 1180 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1181 1182 /** 1183 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1184 * @task: pointer to rpc_task 1185 * 1186 * Set a request's retransmit timeout using the RTT estimator, 1187 * and put the task to sleep on the pending queue. 1188 */ 1189 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1190 { 1191 int timer = task->tk_msg.rpc_proc->p_timer; 1192 struct rpc_clnt *clnt = task->tk_client; 1193 struct rpc_rtt *rtt = clnt->cl_rtt; 1194 struct rpc_rqst *req = task->tk_rqstp; 1195 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1196 unsigned long timeout; 1197 1198 timeout = rpc_calc_rto(rtt, timer); 1199 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1200 if (timeout > max_timeout || timeout == 0) 1201 timeout = max_timeout; 1202 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1203 jiffies + timeout); 1204 } 1205 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1206 1207 /** 1208 * xprt_request_wait_receive - wait for the reply to an RPC request 1209 * @task: RPC task about to send a request 1210 * 1211 */ 1212 void xprt_request_wait_receive(struct rpc_task *task) 1213 { 1214 struct rpc_rqst *req = task->tk_rqstp; 1215 struct rpc_xprt *xprt = req->rq_xprt; 1216 1217 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1218 return; 1219 /* 1220 * Sleep on the pending queue if we're expecting a reply. 1221 * The spinlock ensures atomicity between the test of 1222 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1223 */ 1224 spin_lock(&xprt->queue_lock); 1225 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1226 xprt->ops->wait_for_reply_request(task); 1227 /* 1228 * Send an extra queue wakeup call if the 1229 * connection was dropped in case the call to 1230 * rpc_sleep_on() raced. 1231 */ 1232 if (xprt_request_retransmit_after_disconnect(task)) 1233 rpc_wake_up_queued_task_set_status(&xprt->pending, 1234 task, -ENOTCONN); 1235 } 1236 spin_unlock(&xprt->queue_lock); 1237 } 1238 1239 static bool 1240 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1241 { 1242 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1243 } 1244 1245 /** 1246 * xprt_request_enqueue_transmit - queue a task for transmission 1247 * @task: pointer to rpc_task 1248 * 1249 * Add a task to the transmission queue. 1250 */ 1251 void 1252 xprt_request_enqueue_transmit(struct rpc_task *task) 1253 { 1254 struct rpc_rqst *pos, *req = task->tk_rqstp; 1255 struct rpc_xprt *xprt = req->rq_xprt; 1256 1257 if (xprt_request_need_enqueue_transmit(task, req)) { 1258 req->rq_bytes_sent = 0; 1259 spin_lock(&xprt->queue_lock); 1260 /* 1261 * Requests that carry congestion control credits are added 1262 * to the head of the list to avoid starvation issues. 1263 */ 1264 if (req->rq_cong) { 1265 xprt_clear_congestion_window_wait(xprt); 1266 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1267 if (pos->rq_cong) 1268 continue; 1269 /* Note: req is added _before_ pos */ 1270 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1271 INIT_LIST_HEAD(&req->rq_xmit2); 1272 trace_xprt_enq_xmit(task, 1); 1273 goto out; 1274 } 1275 } else if (RPC_IS_SWAPPER(task)) { 1276 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1277 if (pos->rq_cong || pos->rq_bytes_sent) 1278 continue; 1279 if (RPC_IS_SWAPPER(pos->rq_task)) 1280 continue; 1281 /* Note: req is added _before_ pos */ 1282 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1283 INIT_LIST_HEAD(&req->rq_xmit2); 1284 trace_xprt_enq_xmit(task, 2); 1285 goto out; 1286 } 1287 } else if (!req->rq_seqno) { 1288 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1289 if (pos->rq_task->tk_owner != task->tk_owner) 1290 continue; 1291 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1292 INIT_LIST_HEAD(&req->rq_xmit); 1293 trace_xprt_enq_xmit(task, 3); 1294 goto out; 1295 } 1296 } 1297 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1298 INIT_LIST_HEAD(&req->rq_xmit2); 1299 trace_xprt_enq_xmit(task, 4); 1300 out: 1301 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1302 spin_unlock(&xprt->queue_lock); 1303 } 1304 } 1305 1306 /** 1307 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1308 * @task: pointer to rpc_task 1309 * 1310 * Remove a task from the transmission queue 1311 * Caller must hold xprt->queue_lock 1312 */ 1313 static void 1314 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1315 { 1316 struct rpc_rqst *req = task->tk_rqstp; 1317 1318 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1319 return; 1320 if (!list_empty(&req->rq_xmit)) { 1321 list_del(&req->rq_xmit); 1322 if (!list_empty(&req->rq_xmit2)) { 1323 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1324 struct rpc_rqst, rq_xmit2); 1325 list_del(&req->rq_xmit2); 1326 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1327 } 1328 } else 1329 list_del(&req->rq_xmit2); 1330 } 1331 1332 /** 1333 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1334 * @task: pointer to rpc_task 1335 * 1336 * Remove a task from the transmission queue 1337 */ 1338 static void 1339 xprt_request_dequeue_transmit(struct rpc_task *task) 1340 { 1341 struct rpc_rqst *req = task->tk_rqstp; 1342 struct rpc_xprt *xprt = req->rq_xprt; 1343 1344 spin_lock(&xprt->queue_lock); 1345 xprt_request_dequeue_transmit_locked(task); 1346 spin_unlock(&xprt->queue_lock); 1347 } 1348 1349 /** 1350 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1351 * @task: pointer to rpc_task 1352 * 1353 * Remove a task from the transmit and receive queues, and ensure that 1354 * it is not pinned by the receive work item. 1355 */ 1356 void 1357 xprt_request_dequeue_xprt(struct rpc_task *task) 1358 { 1359 struct rpc_rqst *req = task->tk_rqstp; 1360 struct rpc_xprt *xprt = req->rq_xprt; 1361 1362 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1363 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1364 xprt_is_pinned_rqst(req)) { 1365 spin_lock(&xprt->queue_lock); 1366 xprt_request_dequeue_transmit_locked(task); 1367 xprt_request_dequeue_receive_locked(task); 1368 while (xprt_is_pinned_rqst(req)) { 1369 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1370 spin_unlock(&xprt->queue_lock); 1371 xprt_wait_on_pinned_rqst(req); 1372 spin_lock(&xprt->queue_lock); 1373 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1374 } 1375 spin_unlock(&xprt->queue_lock); 1376 } 1377 } 1378 1379 /** 1380 * xprt_request_prepare - prepare an encoded request for transport 1381 * @req: pointer to rpc_rqst 1382 * 1383 * Calls into the transport layer to do whatever is needed to prepare 1384 * the request for transmission or receive. 1385 */ 1386 void 1387 xprt_request_prepare(struct rpc_rqst *req) 1388 { 1389 struct rpc_xprt *xprt = req->rq_xprt; 1390 1391 if (xprt->ops->prepare_request) 1392 xprt->ops->prepare_request(req); 1393 } 1394 1395 /** 1396 * xprt_request_need_retransmit - Test if a task needs retransmission 1397 * @task: pointer to rpc_task 1398 * 1399 * Test for whether a connection breakage requires the task to retransmit 1400 */ 1401 bool 1402 xprt_request_need_retransmit(struct rpc_task *task) 1403 { 1404 return xprt_request_retransmit_after_disconnect(task); 1405 } 1406 1407 /** 1408 * xprt_prepare_transmit - reserve the transport before sending a request 1409 * @task: RPC task about to send a request 1410 * 1411 */ 1412 bool xprt_prepare_transmit(struct rpc_task *task) 1413 { 1414 struct rpc_rqst *req = task->tk_rqstp; 1415 struct rpc_xprt *xprt = req->rq_xprt; 1416 1417 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 1418 1419 if (!xprt_lock_write(xprt, task)) { 1420 /* Race breaker: someone may have transmitted us */ 1421 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1422 rpc_wake_up_queued_task_set_status(&xprt->sending, 1423 task, 0); 1424 return false; 1425 1426 } 1427 return true; 1428 } 1429 1430 void xprt_end_transmit(struct rpc_task *task) 1431 { 1432 xprt_release_write(task->tk_rqstp->rq_xprt, task); 1433 } 1434 1435 /** 1436 * xprt_request_transmit - send an RPC request on a transport 1437 * @req: pointer to request to transmit 1438 * @snd_task: RPC task that owns the transport lock 1439 * 1440 * This performs the transmission of a single request. 1441 * Note that if the request is not the same as snd_task, then it 1442 * does need to be pinned. 1443 * Returns '0' on success. 1444 */ 1445 static int 1446 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1447 { 1448 struct rpc_xprt *xprt = req->rq_xprt; 1449 struct rpc_task *task = req->rq_task; 1450 unsigned int connect_cookie; 1451 int is_retrans = RPC_WAS_SENT(task); 1452 int status; 1453 1454 if (!req->rq_bytes_sent) { 1455 if (xprt_request_data_received(task)) { 1456 status = 0; 1457 goto out_dequeue; 1458 } 1459 /* Verify that our message lies in the RPCSEC_GSS window */ 1460 if (rpcauth_xmit_need_reencode(task)) { 1461 status = -EBADMSG; 1462 goto out_dequeue; 1463 } 1464 if (RPC_SIGNALLED(task)) { 1465 status = -ERESTARTSYS; 1466 goto out_dequeue; 1467 } 1468 } 1469 1470 /* 1471 * Update req->rq_ntrans before transmitting to avoid races with 1472 * xprt_update_rtt(), which needs to know that it is recording a 1473 * reply to the first transmission. 1474 */ 1475 req->rq_ntrans++; 1476 1477 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1478 connect_cookie = xprt->connect_cookie; 1479 status = xprt->ops->send_request(req); 1480 if (status != 0) { 1481 req->rq_ntrans--; 1482 trace_xprt_transmit(req, status); 1483 return status; 1484 } 1485 1486 if (is_retrans) 1487 task->tk_client->cl_stats->rpcretrans++; 1488 1489 xprt_inject_disconnect(xprt); 1490 1491 task->tk_flags |= RPC_TASK_SENT; 1492 spin_lock(&xprt->transport_lock); 1493 1494 xprt->stat.sends++; 1495 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1496 xprt->stat.bklog_u += xprt->backlog.qlen; 1497 xprt->stat.sending_u += xprt->sending.qlen; 1498 xprt->stat.pending_u += xprt->pending.qlen; 1499 spin_unlock(&xprt->transport_lock); 1500 1501 req->rq_connect_cookie = connect_cookie; 1502 out_dequeue: 1503 trace_xprt_transmit(req, status); 1504 xprt_request_dequeue_transmit(task); 1505 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1506 return status; 1507 } 1508 1509 /** 1510 * xprt_transmit - send an RPC request on a transport 1511 * @task: controlling RPC task 1512 * 1513 * Attempts to drain the transmit queue. On exit, either the transport 1514 * signalled an error that needs to be handled before transmission can 1515 * resume, or @task finished transmitting, and detected that it already 1516 * received a reply. 1517 */ 1518 void 1519 xprt_transmit(struct rpc_task *task) 1520 { 1521 struct rpc_rqst *next, *req = task->tk_rqstp; 1522 struct rpc_xprt *xprt = req->rq_xprt; 1523 int status; 1524 1525 spin_lock(&xprt->queue_lock); 1526 while (!list_empty(&xprt->xmit_queue)) { 1527 next = list_first_entry(&xprt->xmit_queue, 1528 struct rpc_rqst, rq_xmit); 1529 xprt_pin_rqst(next); 1530 spin_unlock(&xprt->queue_lock); 1531 status = xprt_request_transmit(next, task); 1532 if (status == -EBADMSG && next != req) 1533 status = 0; 1534 cond_resched(); 1535 spin_lock(&xprt->queue_lock); 1536 xprt_unpin_rqst(next); 1537 if (status == 0) { 1538 if (!xprt_request_data_received(task) || 1539 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1540 continue; 1541 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1542 task->tk_status = status; 1543 break; 1544 } 1545 spin_unlock(&xprt->queue_lock); 1546 } 1547 1548 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1549 { 1550 set_bit(XPRT_CONGESTED, &xprt->state); 1551 rpc_sleep_on(&xprt->backlog, task, NULL); 1552 } 1553 1554 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 1555 { 1556 if (rpc_wake_up_next(&xprt->backlog) == NULL) 1557 clear_bit(XPRT_CONGESTED, &xprt->state); 1558 } 1559 1560 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1561 { 1562 bool ret = false; 1563 1564 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1565 goto out; 1566 spin_lock(&xprt->reserve_lock); 1567 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1568 rpc_sleep_on(&xprt->backlog, task, NULL); 1569 ret = true; 1570 } 1571 spin_unlock(&xprt->reserve_lock); 1572 out: 1573 return ret; 1574 } 1575 1576 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1577 { 1578 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1579 1580 if (xprt->num_reqs >= xprt->max_reqs) 1581 goto out; 1582 ++xprt->num_reqs; 1583 spin_unlock(&xprt->reserve_lock); 1584 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1585 spin_lock(&xprt->reserve_lock); 1586 if (req != NULL) 1587 goto out; 1588 --xprt->num_reqs; 1589 req = ERR_PTR(-ENOMEM); 1590 out: 1591 return req; 1592 } 1593 1594 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1595 { 1596 if (xprt->num_reqs > xprt->min_reqs) { 1597 --xprt->num_reqs; 1598 kfree(req); 1599 return true; 1600 } 1601 return false; 1602 } 1603 1604 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1605 { 1606 struct rpc_rqst *req; 1607 1608 spin_lock(&xprt->reserve_lock); 1609 if (!list_empty(&xprt->free)) { 1610 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1611 list_del(&req->rq_list); 1612 goto out_init_req; 1613 } 1614 req = xprt_dynamic_alloc_slot(xprt); 1615 if (!IS_ERR(req)) 1616 goto out_init_req; 1617 switch (PTR_ERR(req)) { 1618 case -ENOMEM: 1619 dprintk("RPC: dynamic allocation of request slot " 1620 "failed! Retrying\n"); 1621 task->tk_status = -ENOMEM; 1622 break; 1623 case -EAGAIN: 1624 xprt_add_backlog(xprt, task); 1625 dprintk("RPC: waiting for request slot\n"); 1626 fallthrough; 1627 default: 1628 task->tk_status = -EAGAIN; 1629 } 1630 spin_unlock(&xprt->reserve_lock); 1631 return; 1632 out_init_req: 1633 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1634 xprt->num_reqs); 1635 spin_unlock(&xprt->reserve_lock); 1636 1637 task->tk_status = 0; 1638 task->tk_rqstp = req; 1639 } 1640 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1641 1642 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1643 { 1644 spin_lock(&xprt->reserve_lock); 1645 if (!xprt_dynamic_free_slot(xprt, req)) { 1646 memset(req, 0, sizeof(*req)); /* mark unused */ 1647 list_add(&req->rq_list, &xprt->free); 1648 } 1649 xprt_wake_up_backlog(xprt); 1650 spin_unlock(&xprt->reserve_lock); 1651 } 1652 EXPORT_SYMBOL_GPL(xprt_free_slot); 1653 1654 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1655 { 1656 struct rpc_rqst *req; 1657 while (!list_empty(&xprt->free)) { 1658 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1659 list_del(&req->rq_list); 1660 kfree(req); 1661 } 1662 } 1663 1664 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1665 unsigned int num_prealloc, 1666 unsigned int max_alloc) 1667 { 1668 struct rpc_xprt *xprt; 1669 struct rpc_rqst *req; 1670 int i; 1671 1672 xprt = kzalloc(size, GFP_KERNEL); 1673 if (xprt == NULL) 1674 goto out; 1675 1676 xprt_init(xprt, net); 1677 1678 for (i = 0; i < num_prealloc; i++) { 1679 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1680 if (!req) 1681 goto out_free; 1682 list_add(&req->rq_list, &xprt->free); 1683 } 1684 if (max_alloc > num_prealloc) 1685 xprt->max_reqs = max_alloc; 1686 else 1687 xprt->max_reqs = num_prealloc; 1688 xprt->min_reqs = num_prealloc; 1689 xprt->num_reqs = num_prealloc; 1690 1691 return xprt; 1692 1693 out_free: 1694 xprt_free(xprt); 1695 out: 1696 return NULL; 1697 } 1698 EXPORT_SYMBOL_GPL(xprt_alloc); 1699 1700 void xprt_free(struct rpc_xprt *xprt) 1701 { 1702 put_net(xprt->xprt_net); 1703 xprt_free_all_slots(xprt); 1704 kfree_rcu(xprt, rcu); 1705 } 1706 EXPORT_SYMBOL_GPL(xprt_free); 1707 1708 static void 1709 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1710 { 1711 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1712 } 1713 1714 static __be32 1715 xprt_alloc_xid(struct rpc_xprt *xprt) 1716 { 1717 __be32 xid; 1718 1719 spin_lock(&xprt->reserve_lock); 1720 xid = (__force __be32)xprt->xid++; 1721 spin_unlock(&xprt->reserve_lock); 1722 return xid; 1723 } 1724 1725 static void 1726 xprt_init_xid(struct rpc_xprt *xprt) 1727 { 1728 xprt->xid = prandom_u32(); 1729 } 1730 1731 static void 1732 xprt_request_init(struct rpc_task *task) 1733 { 1734 struct rpc_xprt *xprt = task->tk_xprt; 1735 struct rpc_rqst *req = task->tk_rqstp; 1736 1737 req->rq_task = task; 1738 req->rq_xprt = xprt; 1739 req->rq_buffer = NULL; 1740 req->rq_xid = xprt_alloc_xid(xprt); 1741 xprt_init_connect_cookie(req, xprt); 1742 req->rq_snd_buf.len = 0; 1743 req->rq_snd_buf.buflen = 0; 1744 req->rq_rcv_buf.len = 0; 1745 req->rq_rcv_buf.buflen = 0; 1746 req->rq_snd_buf.bvec = NULL; 1747 req->rq_rcv_buf.bvec = NULL; 1748 req->rq_release_snd_buf = NULL; 1749 xprt_init_majortimeo(task, req); 1750 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1751 req, ntohl(req->rq_xid)); 1752 } 1753 1754 static void 1755 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1756 { 1757 xprt->ops->alloc_slot(xprt, task); 1758 if (task->tk_rqstp != NULL) 1759 xprt_request_init(task); 1760 } 1761 1762 /** 1763 * xprt_reserve - allocate an RPC request slot 1764 * @task: RPC task requesting a slot allocation 1765 * 1766 * If the transport is marked as being congested, or if no more 1767 * slots are available, place the task on the transport's 1768 * backlog queue. 1769 */ 1770 void xprt_reserve(struct rpc_task *task) 1771 { 1772 struct rpc_xprt *xprt = task->tk_xprt; 1773 1774 task->tk_status = 0; 1775 if (task->tk_rqstp != NULL) 1776 return; 1777 1778 task->tk_status = -EAGAIN; 1779 if (!xprt_throttle_congested(xprt, task)) 1780 xprt_do_reserve(xprt, task); 1781 } 1782 1783 /** 1784 * xprt_retry_reserve - allocate an RPC request slot 1785 * @task: RPC task requesting a slot allocation 1786 * 1787 * If no more slots are available, place the task on the transport's 1788 * backlog queue. 1789 * Note that the only difference with xprt_reserve is that we now 1790 * ignore the value of the XPRT_CONGESTED flag. 1791 */ 1792 void xprt_retry_reserve(struct rpc_task *task) 1793 { 1794 struct rpc_xprt *xprt = task->tk_xprt; 1795 1796 task->tk_status = 0; 1797 if (task->tk_rqstp != NULL) 1798 return; 1799 1800 task->tk_status = -EAGAIN; 1801 xprt_do_reserve(xprt, task); 1802 } 1803 1804 /** 1805 * xprt_release - release an RPC request slot 1806 * @task: task which is finished with the slot 1807 * 1808 */ 1809 void xprt_release(struct rpc_task *task) 1810 { 1811 struct rpc_xprt *xprt; 1812 struct rpc_rqst *req = task->tk_rqstp; 1813 1814 if (req == NULL) { 1815 if (task->tk_client) { 1816 xprt = task->tk_xprt; 1817 xprt_release_write(xprt, task); 1818 } 1819 return; 1820 } 1821 1822 xprt = req->rq_xprt; 1823 xprt_request_dequeue_xprt(task); 1824 spin_lock(&xprt->transport_lock); 1825 xprt->ops->release_xprt(xprt, task); 1826 if (xprt->ops->release_request) 1827 xprt->ops->release_request(task); 1828 xprt_schedule_autodisconnect(xprt); 1829 spin_unlock(&xprt->transport_lock); 1830 if (req->rq_buffer) 1831 xprt->ops->buf_free(task); 1832 xprt_inject_disconnect(xprt); 1833 xdr_free_bvec(&req->rq_rcv_buf); 1834 xdr_free_bvec(&req->rq_snd_buf); 1835 if (req->rq_cred != NULL) 1836 put_rpccred(req->rq_cred); 1837 task->tk_rqstp = NULL; 1838 if (req->rq_release_snd_buf) 1839 req->rq_release_snd_buf(req); 1840 1841 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1842 if (likely(!bc_prealloc(req))) 1843 xprt->ops->free_slot(xprt, req); 1844 else 1845 xprt_free_bc_request(req); 1846 } 1847 1848 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1849 void 1850 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1851 { 1852 struct xdr_buf *xbufp = &req->rq_snd_buf; 1853 1854 task->tk_rqstp = req; 1855 req->rq_task = task; 1856 xprt_init_connect_cookie(req, req->rq_xprt); 1857 /* 1858 * Set up the xdr_buf length. 1859 * This also indicates that the buffer is XDR encoded already. 1860 */ 1861 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1862 xbufp->tail[0].iov_len; 1863 } 1864 #endif 1865 1866 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1867 { 1868 kref_init(&xprt->kref); 1869 1870 spin_lock_init(&xprt->transport_lock); 1871 spin_lock_init(&xprt->reserve_lock); 1872 spin_lock_init(&xprt->queue_lock); 1873 1874 INIT_LIST_HEAD(&xprt->free); 1875 xprt->recv_queue = RB_ROOT; 1876 INIT_LIST_HEAD(&xprt->xmit_queue); 1877 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1878 spin_lock_init(&xprt->bc_pa_lock); 1879 INIT_LIST_HEAD(&xprt->bc_pa_list); 1880 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1881 INIT_LIST_HEAD(&xprt->xprt_switch); 1882 1883 xprt->last_used = jiffies; 1884 xprt->cwnd = RPC_INITCWND; 1885 xprt->bind_index = 0; 1886 1887 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1888 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1889 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1890 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1891 1892 xprt_init_xid(xprt); 1893 1894 xprt->xprt_net = get_net(net); 1895 } 1896 1897 /** 1898 * xprt_create_transport - create an RPC transport 1899 * @args: rpc transport creation arguments 1900 * 1901 */ 1902 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1903 { 1904 struct rpc_xprt *xprt; 1905 struct xprt_class *t; 1906 1907 spin_lock(&xprt_list_lock); 1908 list_for_each_entry(t, &xprt_list, list) { 1909 if (t->ident == args->ident) { 1910 spin_unlock(&xprt_list_lock); 1911 goto found; 1912 } 1913 } 1914 spin_unlock(&xprt_list_lock); 1915 dprintk("RPC: transport (%d) not supported\n", args->ident); 1916 return ERR_PTR(-EIO); 1917 1918 found: 1919 xprt = t->setup(args); 1920 if (IS_ERR(xprt)) 1921 goto out; 1922 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1923 xprt->idle_timeout = 0; 1924 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1925 if (xprt_has_timer(xprt)) 1926 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 1927 else 1928 timer_setup(&xprt->timer, NULL, 0); 1929 1930 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1931 xprt_destroy(xprt); 1932 return ERR_PTR(-EINVAL); 1933 } 1934 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1935 if (xprt->servername == NULL) { 1936 xprt_destroy(xprt); 1937 return ERR_PTR(-ENOMEM); 1938 } 1939 1940 rpc_xprt_debugfs_register(xprt); 1941 1942 trace_xprt_create(xprt); 1943 out: 1944 return xprt; 1945 } 1946 1947 static void xprt_destroy_cb(struct work_struct *work) 1948 { 1949 struct rpc_xprt *xprt = 1950 container_of(work, struct rpc_xprt, task_cleanup); 1951 1952 trace_xprt_destroy(xprt); 1953 1954 rpc_xprt_debugfs_unregister(xprt); 1955 rpc_destroy_wait_queue(&xprt->binding); 1956 rpc_destroy_wait_queue(&xprt->pending); 1957 rpc_destroy_wait_queue(&xprt->sending); 1958 rpc_destroy_wait_queue(&xprt->backlog); 1959 kfree(xprt->servername); 1960 /* 1961 * Destroy any existing back channel 1962 */ 1963 xprt_destroy_backchannel(xprt, UINT_MAX); 1964 1965 /* 1966 * Tear down transport state and free the rpc_xprt 1967 */ 1968 xprt->ops->destroy(xprt); 1969 } 1970 1971 /** 1972 * xprt_destroy - destroy an RPC transport, killing off all requests. 1973 * @xprt: transport to destroy 1974 * 1975 */ 1976 static void xprt_destroy(struct rpc_xprt *xprt) 1977 { 1978 /* 1979 * Exclude transport connect/disconnect handlers and autoclose 1980 */ 1981 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 1982 1983 del_timer_sync(&xprt->timer); 1984 1985 /* 1986 * Destroy sockets etc from the system workqueue so they can 1987 * safely flush receive work running on rpciod. 1988 */ 1989 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 1990 schedule_work(&xprt->task_cleanup); 1991 } 1992 1993 static void xprt_destroy_kref(struct kref *kref) 1994 { 1995 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 1996 } 1997 1998 /** 1999 * xprt_get - return a reference to an RPC transport. 2000 * @xprt: pointer to the transport 2001 * 2002 */ 2003 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2004 { 2005 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2006 return xprt; 2007 return NULL; 2008 } 2009 EXPORT_SYMBOL_GPL(xprt_get); 2010 2011 /** 2012 * xprt_put - release a reference to an RPC transport. 2013 * @xprt: pointer to the transport 2014 * 2015 */ 2016 void xprt_put(struct rpc_xprt *xprt) 2017 { 2018 if (xprt != NULL) 2019 kref_put(&xprt->kref, xprt_destroy_kref); 2020 } 2021 EXPORT_SYMBOL_GPL(xprt_put); 2022