// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"
#include "sysfs.h"
#include "fail.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_destroy(struct rpc_xprt *xprt);
static void	xprt_request_init(struct rpc_task *task);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
	unsigned long timeout = jiffies + req->rq_timeout;

	if (time_before(timeout, req->rq_majortimeo))
		return timeout;
	return req->rq_majortimeo;
}

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
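 *
 * As a rough illustration (the "example" names and ident below are
 * hypothetical, not an existing transport), a transport module's init
 * routine might look like:
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= XPRT_TRANSPORT_EXAMPLE,
 *		.setup	= example_xprt_setup,
 *		.netid	= { "example", "" },
 *	};
 *
 *	static int __init example_xprt_init(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 *
 * with a matching xprt_unregister_transport() call in the module's exit
 * routine.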
95 * 96 * Returns: 97 * 0: transport successfully registered 98 * -EEXIST: transport already registered 99 * -EINVAL: transport module being unloaded 100 */ 101 int xprt_register_transport(struct xprt_class *transport) 102 { 103 struct xprt_class *t; 104 int result; 105 106 result = -EEXIST; 107 spin_lock(&xprt_list_lock); 108 list_for_each_entry(t, &xprt_list, list) { 109 /* don't register the same transport class twice */ 110 if (t->ident == transport->ident) 111 goto out; 112 } 113 114 list_add_tail(&transport->list, &xprt_list); 115 printk(KERN_INFO "RPC: Registered %s transport module.\n", 116 transport->name); 117 result = 0; 118 119 out: 120 spin_unlock(&xprt_list_lock); 121 return result; 122 } 123 EXPORT_SYMBOL_GPL(xprt_register_transport); 124 125 /** 126 * xprt_unregister_transport - unregister a transport implementation 127 * @transport: transport to unregister 128 * 129 * Returns: 130 * 0: transport successfully unregistered 131 * -ENOENT: transport never registered 132 */ 133 int xprt_unregister_transport(struct xprt_class *transport) 134 { 135 struct xprt_class *t; 136 int result; 137 138 result = 0; 139 spin_lock(&xprt_list_lock); 140 list_for_each_entry(t, &xprt_list, list) { 141 if (t == transport) { 142 printk(KERN_INFO 143 "RPC: Unregistered %s transport module.\n", 144 transport->name); 145 list_del_init(&transport->list); 146 goto out; 147 } 148 } 149 result = -ENOENT; 150 151 out: 152 spin_unlock(&xprt_list_lock); 153 return result; 154 } 155 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 156 157 static void 158 xprt_class_release(const struct xprt_class *t) 159 { 160 module_put(t->owner); 161 } 162 163 static const struct xprt_class * 164 xprt_class_find_by_ident_locked(int ident) 165 { 166 const struct xprt_class *t; 167 168 list_for_each_entry(t, &xprt_list, list) { 169 if (t->ident != ident) 170 continue; 171 if (!try_module_get(t->owner)) 172 continue; 173 return t; 174 } 175 return NULL; 176 } 177 178 static const struct xprt_class * 179 xprt_class_find_by_ident(int ident) 180 { 181 const struct xprt_class *t; 182 183 spin_lock(&xprt_list_lock); 184 t = xprt_class_find_by_ident_locked(ident); 185 spin_unlock(&xprt_list_lock); 186 return t; 187 } 188 189 static const struct xprt_class * 190 xprt_class_find_by_netid_locked(const char *netid) 191 { 192 const struct xprt_class *t; 193 unsigned int i; 194 195 list_for_each_entry(t, &xprt_list, list) { 196 for (i = 0; t->netid[i][0] != '\0'; i++) { 197 if (strcmp(t->netid[i], netid) != 0) 198 continue; 199 if (!try_module_get(t->owner)) 200 continue; 201 return t; 202 } 203 } 204 return NULL; 205 } 206 207 static const struct xprt_class * 208 xprt_class_find_by_netid(const char *netid) 209 { 210 const struct xprt_class *t; 211 212 spin_lock(&xprt_list_lock); 213 t = xprt_class_find_by_netid_locked(netid); 214 if (!t) { 215 spin_unlock(&xprt_list_lock); 216 request_module("rpc%s", netid); 217 spin_lock(&xprt_list_lock); 218 t = xprt_class_find_by_netid_locked(netid); 219 } 220 spin_unlock(&xprt_list_lock); 221 return t; 222 } 223 224 /** 225 * xprt_find_transport_ident - convert a netid into a transport identifier 226 * @netid: transport to load 227 * 228 * Returns: 229 * > 0: transport identifier 230 * -ENOENT: transport module not available 231 */ 232 int xprt_find_transport_ident(const char *netid) 233 { 234 const struct xprt_class *t; 235 int ret; 236 237 t = xprt_class_find_by_netid(netid); 238 if (!t) 239 return -ENOENT; 240 ret = t->ident; 241 xprt_class_release(t); 242 return ret; 243 } 244 
EXPORT_SYMBOL_GPL(xprt_find_transport_ident); 245 246 static void xprt_clear_locked(struct rpc_xprt *xprt) 247 { 248 xprt->snd_task = NULL; 249 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) 250 clear_bit_unlock(XPRT_LOCKED, &xprt->state); 251 else 252 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 253 } 254 255 /** 256 * xprt_reserve_xprt - serialize write access to transports 257 * @task: task that is requesting access to the transport 258 * @xprt: pointer to the target transport 259 * 260 * This prevents mixing the payload of separate requests, and prevents 261 * transport connects from colliding with writes. No congestion control 262 * is provided. 263 */ 264 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 265 { 266 struct rpc_rqst *req = task->tk_rqstp; 267 268 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 269 if (task == xprt->snd_task) 270 goto out_locked; 271 goto out_sleep; 272 } 273 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 274 goto out_unlock; 275 xprt->snd_task = task; 276 277 out_locked: 278 trace_xprt_reserve_xprt(xprt, task); 279 return 1; 280 281 out_unlock: 282 xprt_clear_locked(xprt); 283 out_sleep: 284 task->tk_status = -EAGAIN; 285 if (RPC_IS_SOFT(task)) 286 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 287 xprt_request_timeout(req)); 288 else 289 rpc_sleep_on(&xprt->sending, task, NULL); 290 return 0; 291 } 292 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 293 294 static bool 295 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 296 { 297 return test_bit(XPRT_CWND_WAIT, &xprt->state); 298 } 299 300 static void 301 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 302 { 303 if (!list_empty(&xprt->xmit_queue)) { 304 /* Peek at head of queue to see if it can make progress */ 305 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 306 rq_xmit)->rq_cong) 307 return; 308 } 309 set_bit(XPRT_CWND_WAIT, &xprt->state); 310 } 311 312 static void 313 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 314 { 315 if (!RPCXPRT_CONGESTED(xprt)) 316 clear_bit(XPRT_CWND_WAIT, &xprt->state); 317 } 318 319 /* 320 * xprt_reserve_xprt_cong - serialize write access to transports 321 * @task: task that is requesting access to the transport 322 * 323 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 324 * integrated into the decision of whether a request is allowed to be 325 * woken up and given access to the transport. 326 * Note that the lock is only granted if we know there are free slots. 
327 */ 328 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 329 { 330 struct rpc_rqst *req = task->tk_rqstp; 331 332 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 333 if (task == xprt->snd_task) 334 goto out_locked; 335 goto out_sleep; 336 } 337 if (req == NULL) { 338 xprt->snd_task = task; 339 goto out_locked; 340 } 341 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 342 goto out_unlock; 343 if (!xprt_need_congestion_window_wait(xprt)) { 344 xprt->snd_task = task; 345 goto out_locked; 346 } 347 out_unlock: 348 xprt_clear_locked(xprt); 349 out_sleep: 350 task->tk_status = -EAGAIN; 351 if (RPC_IS_SOFT(task)) 352 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 353 xprt_request_timeout(req)); 354 else 355 rpc_sleep_on(&xprt->sending, task, NULL); 356 return 0; 357 out_locked: 358 trace_xprt_reserve_cong(xprt, task); 359 return 1; 360 } 361 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 362 363 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 364 { 365 int retval; 366 367 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 368 return 1; 369 spin_lock(&xprt->transport_lock); 370 retval = xprt->ops->reserve_xprt(xprt, task); 371 spin_unlock(&xprt->transport_lock); 372 return retval; 373 } 374 375 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 376 { 377 struct rpc_xprt *xprt = data; 378 379 xprt->snd_task = task; 380 return true; 381 } 382 383 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 384 { 385 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 386 return; 387 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 388 goto out_unlock; 389 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 390 __xprt_lock_write_func, xprt)) 391 return; 392 out_unlock: 393 xprt_clear_locked(xprt); 394 } 395 396 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 397 { 398 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 399 return; 400 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 401 goto out_unlock; 402 if (xprt_need_congestion_window_wait(xprt)) 403 goto out_unlock; 404 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 405 __xprt_lock_write_func, xprt)) 406 return; 407 out_unlock: 408 xprt_clear_locked(xprt); 409 } 410 411 /** 412 * xprt_release_xprt - allow other requests to use a transport 413 * @xprt: transport with other tasks potentially waiting 414 * @task: task that is releasing access to the transport 415 * 416 * Note that "task" can be NULL. No congestion control is provided. 417 */ 418 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 419 { 420 if (xprt->snd_task == task) { 421 xprt_clear_locked(xprt); 422 __xprt_lock_write_next(xprt); 423 } 424 trace_xprt_release_xprt(xprt, task); 425 } 426 EXPORT_SYMBOL_GPL(xprt_release_xprt); 427 428 /** 429 * xprt_release_xprt_cong - allow other requests to use a transport 430 * @xprt: transport with other tasks potentially waiting 431 * @task: task that is releasing access to the transport 432 * 433 * Note that "task" can be NULL. Another task is awoken to use the 434 * transport if the transport's congestion window allows it. 
435 */ 436 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 437 { 438 if (xprt->snd_task == task) { 439 xprt_clear_locked(xprt); 440 __xprt_lock_write_next_cong(xprt); 441 } 442 trace_xprt_release_cong(xprt, task); 443 } 444 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 445 446 void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 447 { 448 if (xprt->snd_task != task) 449 return; 450 spin_lock(&xprt->transport_lock); 451 xprt->ops->release_xprt(xprt, task); 452 spin_unlock(&xprt->transport_lock); 453 } 454 455 /* 456 * Van Jacobson congestion avoidance. Check if the congestion window 457 * overflowed. Put the task to sleep if this is the case. 458 */ 459 static int 460 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 461 { 462 if (req->rq_cong) 463 return 1; 464 trace_xprt_get_cong(xprt, req->rq_task); 465 if (RPCXPRT_CONGESTED(xprt)) { 466 xprt_set_congestion_window_wait(xprt); 467 return 0; 468 } 469 req->rq_cong = 1; 470 xprt->cong += RPC_CWNDSCALE; 471 return 1; 472 } 473 474 /* 475 * Adjust the congestion window, and wake up the next task 476 * that has been sleeping due to congestion 477 */ 478 static void 479 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 480 { 481 if (!req->rq_cong) 482 return; 483 req->rq_cong = 0; 484 xprt->cong -= RPC_CWNDSCALE; 485 xprt_test_and_clear_congestion_window_wait(xprt); 486 trace_xprt_put_cong(xprt, req->rq_task); 487 __xprt_lock_write_next_cong(xprt); 488 } 489 490 /** 491 * xprt_request_get_cong - Request congestion control credits 492 * @xprt: pointer to transport 493 * @req: pointer to RPC request 494 * 495 * Useful for transports that require congestion control. 496 */ 497 bool 498 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 499 { 500 bool ret = false; 501 502 if (req->rq_cong) 503 return true; 504 spin_lock(&xprt->transport_lock); 505 ret = __xprt_get_cong(xprt, req) != 0; 506 spin_unlock(&xprt->transport_lock); 507 return ret; 508 } 509 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 510 511 /** 512 * xprt_release_rqst_cong - housekeeping when request is complete 513 * @task: RPC request that recently completed 514 * 515 * Useful for transports that require congestion control. 516 */ 517 void xprt_release_rqst_cong(struct rpc_task *task) 518 { 519 struct rpc_rqst *req = task->tk_rqstp; 520 521 __xprt_put_cong(req->rq_xprt, req); 522 } 523 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 524 525 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 526 { 527 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 528 __xprt_lock_write_next_cong(xprt); 529 } 530 531 /* 532 * Clear the congestion window wait flag and wake up the next 533 * entry on xprt->sending 534 */ 535 static void 536 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 537 { 538 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 539 spin_lock(&xprt->transport_lock); 540 __xprt_lock_write_next_cong(xprt); 541 spin_unlock(&xprt->transport_lock); 542 } 543 } 544 545 /** 546 * xprt_adjust_cwnd - adjust transport congestion window 547 * @xprt: pointer to xprt 548 * @task: recently completed RPC request used to adjust window 549 * @result: result code of completed RPC request 550 * 551 * The transport code maintains an estimate on the maximum number of out- 552 * standing RPC requests, using a smoothed version of the congestion 553 * avoidance implemented in 44BSD. 
This is basically the Van Jacobson 554 * congestion algorithm: If a retransmit occurs, the congestion window is 555 * halved; otherwise, it is incremented by 1/cwnd when 556 * 557 * - a reply is received and 558 * - a full number of requests are outstanding and 559 * - the congestion window hasn't been updated recently. 560 */ 561 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 562 { 563 struct rpc_rqst *req = task->tk_rqstp; 564 unsigned long cwnd = xprt->cwnd; 565 566 if (result >= 0 && cwnd <= xprt->cong) { 567 /* The (cwnd >> 1) term makes sure 568 * the result gets rounded properly. */ 569 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 570 if (cwnd > RPC_MAXCWND(xprt)) 571 cwnd = RPC_MAXCWND(xprt); 572 __xprt_lock_write_next_cong(xprt); 573 } else if (result == -ETIMEDOUT) { 574 cwnd >>= 1; 575 if (cwnd < RPC_CWNDSCALE) 576 cwnd = RPC_CWNDSCALE; 577 } 578 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 579 xprt->cong, xprt->cwnd, cwnd); 580 xprt->cwnd = cwnd; 581 __xprt_put_cong(xprt, req); 582 } 583 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 584 585 /** 586 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 587 * @xprt: transport with waiting tasks 588 * @status: result code to plant in each task before waking it 589 * 590 */ 591 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 592 { 593 if (status < 0) 594 rpc_wake_up_status(&xprt->pending, status); 595 else 596 rpc_wake_up(&xprt->pending); 597 } 598 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 599 600 /** 601 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 602 * @xprt: transport 603 * 604 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 605 * we don't in general want to force a socket disconnection due to 606 * an incomplete RPC call transmission. 607 */ 608 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 609 { 610 set_bit(XPRT_WRITE_SPACE, &xprt->state); 611 } 612 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 613 614 static bool 615 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 616 { 617 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 618 __xprt_lock_write_next(xprt); 619 dprintk("RPC: write space: waking waiting task on " 620 "xprt %p\n", xprt); 621 return true; 622 } 623 return false; 624 } 625 626 /** 627 * xprt_write_space - wake the task waiting for transport output buffer space 628 * @xprt: transport with waiting tasks 629 * 630 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 631 */ 632 bool xprt_write_space(struct rpc_xprt *xprt) 633 { 634 bool ret; 635 636 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 637 return false; 638 spin_lock(&xprt->transport_lock); 639 ret = xprt_clear_write_space_locked(xprt); 640 spin_unlock(&xprt->transport_lock); 641 return ret; 642 } 643 EXPORT_SYMBOL_GPL(xprt_write_space); 644 645 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 646 { 647 s64 delta = ktime_to_ns(ktime_get() - abstime); 648 return likely(delta >= 0) ? 
649 jiffies - nsecs_to_jiffies(delta) : 650 jiffies + nsecs_to_jiffies(-delta); 651 } 652 653 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 654 { 655 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 656 unsigned long majortimeo = req->rq_timeout; 657 658 if (to->to_exponential) 659 majortimeo <<= to->to_retries; 660 else 661 majortimeo += to->to_increment * to->to_retries; 662 if (majortimeo > to->to_maxval || majortimeo == 0) 663 majortimeo = to->to_maxval; 664 return majortimeo; 665 } 666 667 static void xprt_reset_majortimeo(struct rpc_rqst *req) 668 { 669 req->rq_majortimeo += xprt_calc_majortimeo(req); 670 } 671 672 static void xprt_reset_minortimeo(struct rpc_rqst *req) 673 { 674 req->rq_minortimeo += req->rq_timeout; 675 } 676 677 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 678 { 679 unsigned long time_init; 680 struct rpc_xprt *xprt = req->rq_xprt; 681 682 if (likely(xprt && xprt_connected(xprt))) 683 time_init = jiffies; 684 else 685 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 686 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 687 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 688 req->rq_minortimeo = time_init + req->rq_timeout; 689 } 690 691 /** 692 * xprt_adjust_timeout - adjust timeout values for next retransmit 693 * @req: RPC request containing parameters to use for the adjustment 694 * 695 */ 696 int xprt_adjust_timeout(struct rpc_rqst *req) 697 { 698 struct rpc_xprt *xprt = req->rq_xprt; 699 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 700 int status = 0; 701 702 if (time_before(jiffies, req->rq_majortimeo)) { 703 if (time_before(jiffies, req->rq_minortimeo)) 704 return status; 705 if (to->to_exponential) 706 req->rq_timeout <<= 1; 707 else 708 req->rq_timeout += to->to_increment; 709 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 710 req->rq_timeout = to->to_maxval; 711 req->rq_retries++; 712 } else { 713 req->rq_timeout = to->to_initval; 714 req->rq_retries = 0; 715 xprt_reset_majortimeo(req); 716 /* Reset the RTT counters == "slow start" */ 717 spin_lock(&xprt->transport_lock); 718 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 719 spin_unlock(&xprt->transport_lock); 720 status = -ETIMEDOUT; 721 } 722 xprt_reset_minortimeo(req); 723 724 if (req->rq_timeout == 0) { 725 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 726 req->rq_timeout = 5 * HZ; 727 } 728 return status; 729 } 730 731 static void xprt_autoclose(struct work_struct *work) 732 { 733 struct rpc_xprt *xprt = 734 container_of(work, struct rpc_xprt, task_cleanup); 735 unsigned int pflags = memalloc_nofs_save(); 736 737 trace_xprt_disconnect_auto(xprt); 738 xprt->connect_cookie++; 739 smp_mb__before_atomic(); 740 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 741 xprt->ops->close(xprt); 742 xprt_release_write(xprt, NULL); 743 wake_up_bit(&xprt->state, XPRT_LOCKED); 744 memalloc_nofs_restore(pflags); 745 } 746 747 /** 748 * xprt_disconnect_done - mark a transport as disconnected 749 * @xprt: transport to flag for disconnect 750 * 751 */ 752 void xprt_disconnect_done(struct rpc_xprt *xprt) 753 { 754 trace_xprt_disconnect_done(xprt); 755 spin_lock(&xprt->transport_lock); 756 xprt_clear_connected(xprt); 757 xprt_clear_write_space_locked(xprt); 758 xprt_clear_congestion_window_wait_locked(xprt); 759 xprt_wake_pending_tasks(xprt, -ENOTCONN); 760 spin_unlock(&xprt->transport_lock); 761 } 762 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 763 764 /** 765 * 
xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
 * @xprt: transport to disconnect
 */
static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
		return;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
		rpc_wake_up_queued_task_set_status(&xprt->pending,
				xprt->snd_task, -ENOTCONN);
}

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_force(xprt);

	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	xprt_schedule_autoclose_locked(xprt);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
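 *
 * Callers normally pass the cookie that was saved in req->rq_connect_cookie
 * when the request was last transmitted, for example (illustrative only):
 *
 *	xprt_conditional_disconnect(req->rq_xprt, req->rq_connect_cookie);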
820 * 821 */ 822 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 823 { 824 /* Don't race with the test_bit() in xprt_clear_locked() */ 825 spin_lock(&xprt->transport_lock); 826 if (cookie != xprt->connect_cookie) 827 goto out; 828 if (test_bit(XPRT_CLOSING, &xprt->state)) 829 goto out; 830 xprt_schedule_autoclose_locked(xprt); 831 out: 832 spin_unlock(&xprt->transport_lock); 833 } 834 835 static bool 836 xprt_has_timer(const struct rpc_xprt *xprt) 837 { 838 return xprt->idle_timeout != 0; 839 } 840 841 static void 842 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 843 __must_hold(&xprt->transport_lock) 844 { 845 xprt->last_used = jiffies; 846 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 847 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 848 } 849 850 static void 851 xprt_init_autodisconnect(struct timer_list *t) 852 { 853 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 854 855 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 856 return; 857 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 858 xprt->last_used = jiffies; 859 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 860 return; 861 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 862 } 863 864 #if IS_ENABLED(CONFIG_FAIL_SUNRPC) 865 static void xprt_inject_disconnect(struct rpc_xprt *xprt) 866 { 867 if (!fail_sunrpc.ignore_client_disconnect && 868 should_fail(&fail_sunrpc.attr, 1)) 869 xprt->ops->inject_disconnect(xprt); 870 } 871 #else 872 static inline void xprt_inject_disconnect(struct rpc_xprt *xprt) 873 { 874 } 875 #endif 876 877 bool xprt_lock_connect(struct rpc_xprt *xprt, 878 struct rpc_task *task, 879 void *cookie) 880 { 881 bool ret = false; 882 883 spin_lock(&xprt->transport_lock); 884 if (!test_bit(XPRT_LOCKED, &xprt->state)) 885 goto out; 886 if (xprt->snd_task != task) 887 goto out; 888 set_bit(XPRT_SND_IS_COOKIE, &xprt->state); 889 xprt->snd_task = cookie; 890 ret = true; 891 out: 892 spin_unlock(&xprt->transport_lock); 893 return ret; 894 } 895 EXPORT_SYMBOL_GPL(xprt_lock_connect); 896 897 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 898 { 899 spin_lock(&xprt->transport_lock); 900 if (xprt->snd_task != cookie) 901 goto out; 902 if (!test_bit(XPRT_LOCKED, &xprt->state)) 903 goto out; 904 xprt->snd_task =NULL; 905 clear_bit(XPRT_SND_IS_COOKIE, &xprt->state); 906 xprt->ops->release_xprt(xprt, NULL); 907 xprt_schedule_autodisconnect(xprt); 908 out: 909 spin_unlock(&xprt->transport_lock); 910 wake_up_bit(&xprt->state, XPRT_LOCKED); 911 } 912 EXPORT_SYMBOL_GPL(xprt_unlock_connect); 913 914 /** 915 * xprt_connect - schedule a transport connect operation 916 * @task: RPC task that is requesting the connect 917 * 918 */ 919 void xprt_connect(struct rpc_task *task) 920 { 921 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 922 923 trace_xprt_connect(xprt); 924 925 if (!xprt_bound(xprt)) { 926 task->tk_status = -EAGAIN; 927 return; 928 } 929 if (!xprt_lock_write(xprt, task)) 930 return; 931 932 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 933 trace_xprt_disconnect_cleanup(xprt); 934 xprt->ops->close(xprt); 935 } 936 937 if (!xprt_connected(xprt)) { 938 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 939 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 940 xprt_request_timeout(task->tk_rqstp)); 941 942 if (test_bit(XPRT_CLOSING, &xprt->state)) 943 return; 944 if (xprt_test_and_set_connecting(xprt)) 945 return; 946 /* Race breaker */ 947 if (!xprt_connected(xprt)) { 948 xprt->stat.connect_start = jiffies; 949 
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
	if (xprt->reestablish_timeout < init_to)
		xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
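 *
 * If the returned request is to be used after xprt->queue_lock is
 * dropped, the caller should pin it first with xprt_pin_rqst() (see
 * below) while still holding the lock, and later drop the pin with
 * xprt_unpin_rqst().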
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_request_need_enqueue_receive(task, req))
		return;

	xprt_request_prepare(task->tk_rqstp);
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	/* Turn off autodisconnect */
	del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
1176 */ 1177 static void 1178 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1179 { 1180 struct rpc_rqst *req = task->tk_rqstp; 1181 1182 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1183 xprt_request_rb_remove(req->rq_xprt, req); 1184 } 1185 1186 /** 1187 * xprt_update_rtt - Update RPC RTT statistics 1188 * @task: RPC request that recently completed 1189 * 1190 * Caller holds xprt->queue_lock. 1191 */ 1192 void xprt_update_rtt(struct rpc_task *task) 1193 { 1194 struct rpc_rqst *req = task->tk_rqstp; 1195 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1196 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1197 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1198 1199 if (timer) { 1200 if (req->rq_ntrans == 1) 1201 rpc_update_rtt(rtt, timer, m); 1202 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1203 } 1204 } 1205 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1206 1207 /** 1208 * xprt_complete_rqst - called when reply processing is complete 1209 * @task: RPC request that recently completed 1210 * @copied: actual number of bytes received from the transport 1211 * 1212 * Caller holds xprt->queue_lock. 1213 */ 1214 void xprt_complete_rqst(struct rpc_task *task, int copied) 1215 { 1216 struct rpc_rqst *req = task->tk_rqstp; 1217 struct rpc_xprt *xprt = req->rq_xprt; 1218 1219 xprt->stat.recvs++; 1220 1221 req->rq_private_buf.len = copied; 1222 /* Ensure all writes are done before we update */ 1223 /* req->rq_reply_bytes_recvd */ 1224 smp_wmb(); 1225 req->rq_reply_bytes_recvd = copied; 1226 xprt_request_dequeue_receive_locked(task); 1227 rpc_wake_up_queued_task(&xprt->pending, task); 1228 } 1229 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1230 1231 static void xprt_timer(struct rpc_task *task) 1232 { 1233 struct rpc_rqst *req = task->tk_rqstp; 1234 struct rpc_xprt *xprt = req->rq_xprt; 1235 1236 if (task->tk_status != -ETIMEDOUT) 1237 return; 1238 1239 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1240 if (!req->rq_reply_bytes_recvd) { 1241 if (xprt->ops->timer) 1242 xprt->ops->timer(xprt, task); 1243 } else 1244 task->tk_status = 0; 1245 } 1246 1247 /** 1248 * xprt_wait_for_reply_request_def - wait for reply 1249 * @task: pointer to rpc_task 1250 * 1251 * Set a request's retransmit timeout based on the transport's 1252 * default timeout parameters. Used by transports that don't adjust 1253 * the retransmit timeout based on round-trip time estimation, 1254 * and put the task to sleep on the pending queue. 1255 */ 1256 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1257 { 1258 struct rpc_rqst *req = task->tk_rqstp; 1259 1260 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1261 xprt_request_timeout(req)); 1262 } 1263 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1264 1265 /** 1266 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1267 * @task: pointer to rpc_task 1268 * 1269 * Set a request's retransmit timeout using the RTT estimator, 1270 * and put the task to sleep on the pending queue. 
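 *
 * The timeout used here is roughly
 *
 *	rpc_calc_rto(rtt, timer) << (rpc_ntimeo(rtt, timer) + rq_retries)
 *
 * clamped to the client's to_maxval, so each retransmission of a slow
 * procedure backs off exponentially from the estimated RTO.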
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
1346 */ 1347 if (req->rq_cong) { 1348 xprt_clear_congestion_window_wait(xprt); 1349 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1350 if (pos->rq_cong) 1351 continue; 1352 /* Note: req is added _before_ pos */ 1353 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1354 INIT_LIST_HEAD(&req->rq_xmit2); 1355 goto out; 1356 } 1357 } else if (RPC_IS_SWAPPER(task)) { 1358 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1359 if (pos->rq_cong || pos->rq_bytes_sent) 1360 continue; 1361 if (RPC_IS_SWAPPER(pos->rq_task)) 1362 continue; 1363 /* Note: req is added _before_ pos */ 1364 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1365 INIT_LIST_HEAD(&req->rq_xmit2); 1366 goto out; 1367 } 1368 } else if (!req->rq_seqno) { 1369 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1370 if (pos->rq_task->tk_owner != task->tk_owner) 1371 continue; 1372 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1373 INIT_LIST_HEAD(&req->rq_xmit); 1374 goto out; 1375 } 1376 } 1377 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1378 INIT_LIST_HEAD(&req->rq_xmit2); 1379 out: 1380 atomic_long_inc(&xprt->xmit_queuelen); 1381 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1382 spin_unlock(&xprt->queue_lock); 1383 } 1384 } 1385 1386 /** 1387 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1388 * @task: pointer to rpc_task 1389 * 1390 * Remove a task from the transmission queue 1391 * Caller must hold xprt->queue_lock 1392 */ 1393 static void 1394 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1395 { 1396 struct rpc_rqst *req = task->tk_rqstp; 1397 1398 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1399 return; 1400 if (!list_empty(&req->rq_xmit)) { 1401 list_del(&req->rq_xmit); 1402 if (!list_empty(&req->rq_xmit2)) { 1403 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1404 struct rpc_rqst, rq_xmit2); 1405 list_del(&req->rq_xmit2); 1406 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1407 } 1408 } else 1409 list_del(&req->rq_xmit2); 1410 atomic_long_dec(&req->rq_xprt->xmit_queuelen); 1411 } 1412 1413 /** 1414 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1415 * @task: pointer to rpc_task 1416 * 1417 * Remove a task from the transmission queue 1418 */ 1419 static void 1420 xprt_request_dequeue_transmit(struct rpc_task *task) 1421 { 1422 struct rpc_rqst *req = task->tk_rqstp; 1423 struct rpc_xprt *xprt = req->rq_xprt; 1424 1425 spin_lock(&xprt->queue_lock); 1426 xprt_request_dequeue_transmit_locked(task); 1427 spin_unlock(&xprt->queue_lock); 1428 } 1429 1430 /** 1431 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1432 * @task: pointer to rpc_task 1433 * 1434 * Remove a task from the transmit and receive queues, and ensure that 1435 * it is not pinned by the receive work item. 
1436 */ 1437 void 1438 xprt_request_dequeue_xprt(struct rpc_task *task) 1439 { 1440 struct rpc_rqst *req = task->tk_rqstp; 1441 struct rpc_xprt *xprt = req->rq_xprt; 1442 1443 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1444 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1445 xprt_is_pinned_rqst(req)) { 1446 spin_lock(&xprt->queue_lock); 1447 xprt_request_dequeue_transmit_locked(task); 1448 xprt_request_dequeue_receive_locked(task); 1449 while (xprt_is_pinned_rqst(req)) { 1450 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1451 spin_unlock(&xprt->queue_lock); 1452 xprt_wait_on_pinned_rqst(req); 1453 spin_lock(&xprt->queue_lock); 1454 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1455 } 1456 spin_unlock(&xprt->queue_lock); 1457 } 1458 } 1459 1460 /** 1461 * xprt_request_prepare - prepare an encoded request for transport 1462 * @req: pointer to rpc_rqst 1463 * 1464 * Calls into the transport layer to do whatever is needed to prepare 1465 * the request for transmission or receive. 1466 */ 1467 void 1468 xprt_request_prepare(struct rpc_rqst *req) 1469 { 1470 struct rpc_xprt *xprt = req->rq_xprt; 1471 1472 if (xprt->ops->prepare_request) 1473 xprt->ops->prepare_request(req); 1474 } 1475 1476 /** 1477 * xprt_request_need_retransmit - Test if a task needs retransmission 1478 * @task: pointer to rpc_task 1479 * 1480 * Test for whether a connection breakage requires the task to retransmit 1481 */ 1482 bool 1483 xprt_request_need_retransmit(struct rpc_task *task) 1484 { 1485 return xprt_request_retransmit_after_disconnect(task); 1486 } 1487 1488 /** 1489 * xprt_prepare_transmit - reserve the transport before sending a request 1490 * @task: RPC task about to send a request 1491 * 1492 */ 1493 bool xprt_prepare_transmit(struct rpc_task *task) 1494 { 1495 struct rpc_rqst *req = task->tk_rqstp; 1496 struct rpc_xprt *xprt = req->rq_xprt; 1497 1498 if (!xprt_lock_write(xprt, task)) { 1499 /* Race breaker: someone may have transmitted us */ 1500 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1501 rpc_wake_up_queued_task_set_status(&xprt->sending, 1502 task, 0); 1503 return false; 1504 1505 } 1506 return true; 1507 } 1508 1509 void xprt_end_transmit(struct rpc_task *task) 1510 { 1511 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1512 1513 xprt_inject_disconnect(xprt); 1514 xprt_release_write(xprt, task); 1515 } 1516 1517 /** 1518 * xprt_request_transmit - send an RPC request on a transport 1519 * @req: pointer to request to transmit 1520 * @snd_task: RPC task that owns the transport lock 1521 * 1522 * This performs the transmission of a single request. 1523 * Note that if the request is not the same as snd_task, then it 1524 * does need to be pinned. 1525 * Returns '0' on success. 
1526 */ 1527 static int 1528 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1529 { 1530 struct rpc_xprt *xprt = req->rq_xprt; 1531 struct rpc_task *task = req->rq_task; 1532 unsigned int connect_cookie; 1533 int is_retrans = RPC_WAS_SENT(task); 1534 int status; 1535 1536 if (!req->rq_bytes_sent) { 1537 if (xprt_request_data_received(task)) { 1538 status = 0; 1539 goto out_dequeue; 1540 } 1541 /* Verify that our message lies in the RPCSEC_GSS window */ 1542 if (rpcauth_xmit_need_reencode(task)) { 1543 status = -EBADMSG; 1544 goto out_dequeue; 1545 } 1546 if (RPC_SIGNALLED(task)) { 1547 status = -ERESTARTSYS; 1548 goto out_dequeue; 1549 } 1550 } 1551 1552 /* 1553 * Update req->rq_ntrans before transmitting to avoid races with 1554 * xprt_update_rtt(), which needs to know that it is recording a 1555 * reply to the first transmission. 1556 */ 1557 req->rq_ntrans++; 1558 1559 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1560 connect_cookie = xprt->connect_cookie; 1561 status = xprt->ops->send_request(req); 1562 if (status != 0) { 1563 req->rq_ntrans--; 1564 trace_xprt_transmit(req, status); 1565 return status; 1566 } 1567 1568 if (is_retrans) { 1569 task->tk_client->cl_stats->rpcretrans++; 1570 trace_xprt_retransmit(req); 1571 } 1572 1573 xprt_inject_disconnect(xprt); 1574 1575 task->tk_flags |= RPC_TASK_SENT; 1576 spin_lock(&xprt->transport_lock); 1577 1578 xprt->stat.sends++; 1579 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1580 xprt->stat.bklog_u += xprt->backlog.qlen; 1581 xprt->stat.sending_u += xprt->sending.qlen; 1582 xprt->stat.pending_u += xprt->pending.qlen; 1583 spin_unlock(&xprt->transport_lock); 1584 1585 req->rq_connect_cookie = connect_cookie; 1586 out_dequeue: 1587 trace_xprt_transmit(req, status); 1588 xprt_request_dequeue_transmit(task); 1589 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1590 return status; 1591 } 1592 1593 /** 1594 * xprt_transmit - send an RPC request on a transport 1595 * @task: controlling RPC task 1596 * 1597 * Attempts to drain the transmit queue. On exit, either the transport 1598 * signalled an error that needs to be handled before transmission can 1599 * resume, or @task finished transmitting, and detected that it already 1600 * received a reply. 1601 */ 1602 void 1603 xprt_transmit(struct rpc_task *task) 1604 { 1605 struct rpc_rqst *next, *req = task->tk_rqstp; 1606 struct rpc_xprt *xprt = req->rq_xprt; 1607 int status; 1608 1609 spin_lock(&xprt->queue_lock); 1610 for (;;) { 1611 next = list_first_entry_or_null(&xprt->xmit_queue, 1612 struct rpc_rqst, rq_xmit); 1613 if (!next) 1614 break; 1615 xprt_pin_rqst(next); 1616 spin_unlock(&xprt->queue_lock); 1617 status = xprt_request_transmit(next, task); 1618 if (status == -EBADMSG && next != req) 1619 status = 0; 1620 spin_lock(&xprt->queue_lock); 1621 xprt_unpin_rqst(next); 1622 if (status < 0) { 1623 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1624 task->tk_status = status; 1625 break; 1626 } 1627 /* Was @task transmitted, and has it received a reply? 
*/ 1628 if (xprt_request_data_received(task) && 1629 !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1630 break; 1631 cond_resched_lock(&xprt->queue_lock); 1632 } 1633 spin_unlock(&xprt->queue_lock); 1634 } 1635 1636 static void xprt_complete_request_init(struct rpc_task *task) 1637 { 1638 if (task->tk_rqstp) 1639 xprt_request_init(task); 1640 } 1641 1642 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1643 { 1644 set_bit(XPRT_CONGESTED, &xprt->state); 1645 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1646 } 1647 EXPORT_SYMBOL_GPL(xprt_add_backlog); 1648 1649 static bool __xprt_set_rq(struct rpc_task *task, void *data) 1650 { 1651 struct rpc_rqst *req = data; 1652 1653 if (task->tk_rqstp == NULL) { 1654 memset(req, 0, sizeof(*req)); /* mark unused */ 1655 task->tk_rqstp = req; 1656 return true; 1657 } 1658 return false; 1659 } 1660 1661 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1662 { 1663 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1664 clear_bit(XPRT_CONGESTED, &xprt->state); 1665 return false; 1666 } 1667 return true; 1668 } 1669 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1670 1671 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1672 { 1673 bool ret = false; 1674 1675 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1676 goto out; 1677 spin_lock(&xprt->reserve_lock); 1678 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1679 xprt_add_backlog(xprt, task); 1680 ret = true; 1681 } 1682 spin_unlock(&xprt->reserve_lock); 1683 out: 1684 return ret; 1685 } 1686 1687 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1688 { 1689 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1690 1691 if (xprt->num_reqs >= xprt->max_reqs) 1692 goto out; 1693 ++xprt->num_reqs; 1694 spin_unlock(&xprt->reserve_lock); 1695 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1696 spin_lock(&xprt->reserve_lock); 1697 if (req != NULL) 1698 goto out; 1699 --xprt->num_reqs; 1700 req = ERR_PTR(-ENOMEM); 1701 out: 1702 return req; 1703 } 1704 1705 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1706 { 1707 if (xprt->num_reqs > xprt->min_reqs) { 1708 --xprt->num_reqs; 1709 kfree(req); 1710 return true; 1711 } 1712 return false; 1713 } 1714 1715 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1716 { 1717 struct rpc_rqst *req; 1718 1719 spin_lock(&xprt->reserve_lock); 1720 if (!list_empty(&xprt->free)) { 1721 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1722 list_del(&req->rq_list); 1723 goto out_init_req; 1724 } 1725 req = xprt_dynamic_alloc_slot(xprt); 1726 if (!IS_ERR(req)) 1727 goto out_init_req; 1728 switch (PTR_ERR(req)) { 1729 case -ENOMEM: 1730 dprintk("RPC: dynamic allocation of request slot " 1731 "failed! 
Retrying\n"); 1732 task->tk_status = -ENOMEM; 1733 break; 1734 case -EAGAIN: 1735 xprt_add_backlog(xprt, task); 1736 dprintk("RPC: waiting for request slot\n"); 1737 fallthrough; 1738 default: 1739 task->tk_status = -EAGAIN; 1740 } 1741 spin_unlock(&xprt->reserve_lock); 1742 return; 1743 out_init_req: 1744 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1745 xprt->num_reqs); 1746 spin_unlock(&xprt->reserve_lock); 1747 1748 task->tk_status = 0; 1749 task->tk_rqstp = req; 1750 } 1751 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1752 1753 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1754 { 1755 spin_lock(&xprt->reserve_lock); 1756 if (!xprt_wake_up_backlog(xprt, req) && 1757 !xprt_dynamic_free_slot(xprt, req)) { 1758 memset(req, 0, sizeof(*req)); /* mark unused */ 1759 list_add(&req->rq_list, &xprt->free); 1760 } 1761 spin_unlock(&xprt->reserve_lock); 1762 } 1763 EXPORT_SYMBOL_GPL(xprt_free_slot); 1764 1765 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1766 { 1767 struct rpc_rqst *req; 1768 while (!list_empty(&xprt->free)) { 1769 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1770 list_del(&req->rq_list); 1771 kfree(req); 1772 } 1773 } 1774 1775 static DEFINE_IDA(rpc_xprt_ids); 1776 1777 void xprt_cleanup_ids(void) 1778 { 1779 ida_destroy(&rpc_xprt_ids); 1780 } 1781 1782 static int xprt_alloc_id(struct rpc_xprt *xprt) 1783 { 1784 int id; 1785 1786 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL); 1787 if (id < 0) 1788 return id; 1789 1790 xprt->id = id; 1791 return 0; 1792 } 1793 1794 static void xprt_free_id(struct rpc_xprt *xprt) 1795 { 1796 ida_simple_remove(&rpc_xprt_ids, xprt->id); 1797 } 1798 1799 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1800 unsigned int num_prealloc, 1801 unsigned int max_alloc) 1802 { 1803 struct rpc_xprt *xprt; 1804 struct rpc_rqst *req; 1805 int i; 1806 1807 xprt = kzalloc(size, GFP_KERNEL); 1808 if (xprt == NULL) 1809 goto out; 1810 1811 xprt_alloc_id(xprt); 1812 xprt_init(xprt, net); 1813 1814 for (i = 0; i < num_prealloc; i++) { 1815 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1816 if (!req) 1817 goto out_free; 1818 list_add(&req->rq_list, &xprt->free); 1819 } 1820 if (max_alloc > num_prealloc) 1821 xprt->max_reqs = max_alloc; 1822 else 1823 xprt->max_reqs = num_prealloc; 1824 xprt->min_reqs = num_prealloc; 1825 xprt->num_reqs = num_prealloc; 1826 1827 return xprt; 1828 1829 out_free: 1830 xprt_free(xprt); 1831 out: 1832 return NULL; 1833 } 1834 EXPORT_SYMBOL_GPL(xprt_alloc); 1835 1836 void xprt_free(struct rpc_xprt *xprt) 1837 { 1838 put_net(xprt->xprt_net); 1839 xprt_free_all_slots(xprt); 1840 xprt_free_id(xprt); 1841 rpc_sysfs_xprt_destroy(xprt); 1842 kfree_rcu(xprt, rcu); 1843 } 1844 EXPORT_SYMBOL_GPL(xprt_free); 1845 1846 static void 1847 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1848 { 1849 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1850 } 1851 1852 static __be32 1853 xprt_alloc_xid(struct rpc_xprt *xprt) 1854 { 1855 __be32 xid; 1856 1857 spin_lock(&xprt->reserve_lock); 1858 xid = (__force __be32)xprt->xid++; 1859 spin_unlock(&xprt->reserve_lock); 1860 return xid; 1861 } 1862 1863 static void 1864 xprt_init_xid(struct rpc_xprt *xprt) 1865 { 1866 xprt->xid = prandom_u32(); 1867 } 1868 1869 static void 1870 xprt_request_init(struct rpc_task *task) 1871 { 1872 struct rpc_xprt *xprt = task->tk_xprt; 1873 struct rpc_rqst *req = task->tk_rqstp; 1874 1875 req->rq_task = task; 1876 req->rq_xprt = xprt; 1877 req->rq_buffer = NULL; 1878 
req->rq_xid = xprt_alloc_xid(xprt); 1879 xprt_init_connect_cookie(req, xprt); 1880 req->rq_snd_buf.len = 0; 1881 req->rq_snd_buf.buflen = 0; 1882 req->rq_rcv_buf.len = 0; 1883 req->rq_rcv_buf.buflen = 0; 1884 req->rq_snd_buf.bvec = NULL; 1885 req->rq_rcv_buf.bvec = NULL; 1886 req->rq_release_snd_buf = NULL; 1887 xprt_init_majortimeo(task, req); 1888 1889 trace_xprt_reserve(req); 1890 } 1891 1892 static void 1893 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1894 { 1895 xprt->ops->alloc_slot(xprt, task); 1896 if (task->tk_rqstp != NULL) 1897 xprt_request_init(task); 1898 } 1899 1900 /** 1901 * xprt_reserve - allocate an RPC request slot 1902 * @task: RPC task requesting a slot allocation 1903 * 1904 * If the transport is marked as being congested, or if no more 1905 * slots are available, place the task on the transport's 1906 * backlog queue. 1907 */ 1908 void xprt_reserve(struct rpc_task *task) 1909 { 1910 struct rpc_xprt *xprt = task->tk_xprt; 1911 1912 task->tk_status = 0; 1913 if (task->tk_rqstp != NULL) 1914 return; 1915 1916 task->tk_status = -EAGAIN; 1917 if (!xprt_throttle_congested(xprt, task)) 1918 xprt_do_reserve(xprt, task); 1919 } 1920 1921 /** 1922 * xprt_retry_reserve - allocate an RPC request slot 1923 * @task: RPC task requesting a slot allocation 1924 * 1925 * If no more slots are available, place the task on the transport's 1926 * backlog queue. 1927 * Note that the only difference with xprt_reserve is that we now 1928 * ignore the value of the XPRT_CONGESTED flag. 1929 */ 1930 void xprt_retry_reserve(struct rpc_task *task) 1931 { 1932 struct rpc_xprt *xprt = task->tk_xprt; 1933 1934 task->tk_status = 0; 1935 if (task->tk_rqstp != NULL) 1936 return; 1937 1938 task->tk_status = -EAGAIN; 1939 xprt_do_reserve(xprt, task); 1940 } 1941 1942 /** 1943 * xprt_release - release an RPC request slot 1944 * @task: task which is finished with the slot 1945 * 1946 */ 1947 void xprt_release(struct rpc_task *task) 1948 { 1949 struct rpc_xprt *xprt; 1950 struct rpc_rqst *req = task->tk_rqstp; 1951 1952 if (req == NULL) { 1953 if (task->tk_client) { 1954 xprt = task->tk_xprt; 1955 xprt_release_write(xprt, task); 1956 } 1957 return; 1958 } 1959 1960 xprt = req->rq_xprt; 1961 xprt_request_dequeue_xprt(task); 1962 spin_lock(&xprt->transport_lock); 1963 xprt->ops->release_xprt(xprt, task); 1964 if (xprt->ops->release_request) 1965 xprt->ops->release_request(task); 1966 xprt_schedule_autodisconnect(xprt); 1967 spin_unlock(&xprt->transport_lock); 1968 if (req->rq_buffer) 1969 xprt->ops->buf_free(task); 1970 xdr_free_bvec(&req->rq_rcv_buf); 1971 xdr_free_bvec(&req->rq_snd_buf); 1972 if (req->rq_cred != NULL) 1973 put_rpccred(req->rq_cred); 1974 if (req->rq_release_snd_buf) 1975 req->rq_release_snd_buf(req); 1976 1977 task->tk_rqstp = NULL; 1978 if (likely(!bc_prealloc(req))) 1979 xprt->ops->free_slot(xprt, req); 1980 else 1981 xprt_free_bc_request(req); 1982 } 1983 1984 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1985 void 1986 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1987 { 1988 struct xdr_buf *xbufp = &req->rq_snd_buf; 1989 1990 task->tk_rqstp = req; 1991 req->rq_task = task; 1992 xprt_init_connect_cookie(req, req->rq_xprt); 1993 /* 1994 * Set up the xdr_buf length. 1995 * This also indicates that the buffer is XDR encoded already. 
1996 */ 1997 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1998 xbufp->tail[0].iov_len; 1999 } 2000 #endif 2001 2002 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 2003 { 2004 kref_init(&xprt->kref); 2005 2006 spin_lock_init(&xprt->transport_lock); 2007 spin_lock_init(&xprt->reserve_lock); 2008 spin_lock_init(&xprt->queue_lock); 2009 2010 INIT_LIST_HEAD(&xprt->free); 2011 xprt->recv_queue = RB_ROOT; 2012 INIT_LIST_HEAD(&xprt->xmit_queue); 2013 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 2014 spin_lock_init(&xprt->bc_pa_lock); 2015 INIT_LIST_HEAD(&xprt->bc_pa_list); 2016 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 2017 INIT_LIST_HEAD(&xprt->xprt_switch); 2018 2019 xprt->last_used = jiffies; 2020 xprt->cwnd = RPC_INITCWND; 2021 xprt->bind_index = 0; 2022 2023 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 2024 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 2025 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 2026 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 2027 2028 xprt_init_xid(xprt); 2029 2030 xprt->xprt_net = get_net(net); 2031 } 2032 2033 /** 2034 * xprt_create_transport - create an RPC transport 2035 * @args: rpc transport creation arguments 2036 * 2037 */ 2038 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 2039 { 2040 struct rpc_xprt *xprt; 2041 const struct xprt_class *t; 2042 2043 t = xprt_class_find_by_ident(args->ident); 2044 if (!t) { 2045 dprintk("RPC: transport (%d) not supported\n", args->ident); 2046 return ERR_PTR(-EIO); 2047 } 2048 2049 xprt = t->setup(args); 2050 xprt_class_release(t); 2051 2052 if (IS_ERR(xprt)) 2053 goto out; 2054 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 2055 xprt->idle_timeout = 0; 2056 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 2057 if (xprt_has_timer(xprt)) 2058 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 2059 else 2060 timer_setup(&xprt->timer, NULL, 0); 2061 2062 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 2063 xprt_destroy(xprt); 2064 return ERR_PTR(-EINVAL); 2065 } 2066 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 2067 if (xprt->servername == NULL) { 2068 xprt_destroy(xprt); 2069 return ERR_PTR(-ENOMEM); 2070 } 2071 2072 rpc_xprt_debugfs_register(xprt); 2073 2074 trace_xprt_create(xprt); 2075 out: 2076 return xprt; 2077 } 2078 2079 static void xprt_destroy_cb(struct work_struct *work) 2080 { 2081 struct rpc_xprt *xprt = 2082 container_of(work, struct rpc_xprt, task_cleanup); 2083 2084 trace_xprt_destroy(xprt); 2085 2086 rpc_xprt_debugfs_unregister(xprt); 2087 rpc_destroy_wait_queue(&xprt->binding); 2088 rpc_destroy_wait_queue(&xprt->pending); 2089 rpc_destroy_wait_queue(&xprt->sending); 2090 rpc_destroy_wait_queue(&xprt->backlog); 2091 kfree(xprt->servername); 2092 /* 2093 * Destroy any existing back channel 2094 */ 2095 xprt_destroy_backchannel(xprt, UINT_MAX); 2096 2097 /* 2098 * Tear down transport state and free the rpc_xprt 2099 */ 2100 xprt->ops->destroy(xprt); 2101 } 2102 2103 /** 2104 * xprt_destroy - destroy an RPC transport, killing off all requests. 2105 * @xprt: transport to destroy 2106 * 2107 */ 2108 static void xprt_destroy(struct rpc_xprt *xprt) 2109 { 2110 /* 2111 * Exclude transport connect/disconnect handlers and autoclose 2112 */ 2113 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 2114 2115 del_timer_sync(&xprt->timer); 2116 2117 /* 2118 * Destroy sockets etc from the system workqueue so they can 2119 * safely flush receive work running on rpciod. 
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
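
/*
 * For orientation, a rough outline of how the entry points above are
 * driven on behalf of an ordinary RPC call (the actual call sites live
 * in the RPC client state machine in net/sunrpc/clnt.c and sched.c;
 * this is a simplified sketch, not literal kernel code):
 *
 *	xprt_reserve(task);                    reserve an rpc_rqst slot
 *	xprt_request_enqueue_receive(task);    if a reply is expected
 *	xprt_request_enqueue_transmit(task);   queue the encoded request
 *	if (xprt_prepare_transmit(task)) {
 *		xprt_transmit(task);           drain the transmit queue
 *		xprt_end_transmit(task);
 *	}
 *	xprt_request_wait_receive(task);       sleep until a matching XID arrives
 *	xprt_release(task);                    free the slot, wake the backlog
 */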