// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"
#include "sysfs.h"
#include "fail.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_destroy(struct rpc_xprt *xprt);
static void	xprt_request_init(struct rpc_task *task);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
	unsigned long timeout = jiffies + req->rq_timeout;

	if (time_before(timeout, req->rq_majortimeo))
		return timeout;
	return req->rq_majortimeo;
}
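
/*
 * Illustrative note (not part of the original file): a transport module
 * typically declares an xprt_class and registers it from its module init
 * hook, roughly as below. The names example_transport and example_setup
 * are placeholders, and the .ident value is only a stand-in; real
 * transports use the identifiers and netids defined for them in
 * include/linux/sunrpc/xprt.h.
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= XPRT_TRANSPORT_TCP,	// placeholder ident
 *		.setup	= example_setup,
 *		.netid	= { "example", "" },
 *	};
 *
 *	static int __init init_example(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 */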

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

static void
xprt_class_release(const struct xprt_class *t)
{
	module_put(t->owner);
}

static const struct xprt_class *
xprt_class_find_by_ident_locked(int ident)
{
	const struct xprt_class *t;

	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident != ident)
			continue;
		if (!try_module_get(t->owner))
			continue;
		return t;
	}
	return NULL;
}

static const struct xprt_class *
xprt_class_find_by_ident(int ident)
{
	const struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	t = xprt_class_find_by_ident_locked(ident);
	spin_unlock(&xprt_list_lock);
	return t;
}

static const struct xprt_class *
xprt_class_find_by_netid_locked(const char *netid)
{
	const struct xprt_class *t;
	unsigned int i;

	list_for_each_entry(t, &xprt_list, list) {
		for (i = 0; t->netid[i][0] != '\0'; i++) {
			if (strcmp(t->netid[i], netid) != 0)
				continue;
			if (!try_module_get(t->owner))
				continue;
			return t;
		}
	}
	return NULL;
}

static const struct xprt_class *
xprt_class_find_by_netid(const char *netid)
{
	const struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	t = xprt_class_find_by_netid_locked(netid);
	if (!t) {
		spin_unlock(&xprt_list_lock);
		request_module("rpc%s", netid);
		spin_lock(&xprt_list_lock);
		t = xprt_class_find_by_netid_locked(netid);
	}
	spin_unlock(&xprt_list_lock);
	return t;
}

/**
 * xprt_find_transport_ident - convert a netid into a transport identifier
 * @netid: transport to load
 *
 * Returns:
 * > 0:		transport identifier
 * -ENOENT:	transport module not available
 */
int xprt_find_transport_ident(const char *netid)
{
	const struct xprt_class *t;
	int ret;

	t = xprt_class_find_by_netid(netid);
	if (!t)
		return -ENOENT;
	ret = t->ident;
	xprt_class_release(t);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_find_transport_ident);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

out_locked:
	trace_xprt_reserve_xprt(xprt, task);
	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
					rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		goto out_locked;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		goto out_locked;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
out_locked:
	trace_xprt_reserve_cong(xprt, task);
	return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
	trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
	trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (req->rq_cong)
		return 1;
	trace_xprt_get_cong(xprt, req->rq_task);
	if (RPCXPRT_CONGESTED(xprt)) {
		xprt_set_congestion_window_wait(xprt);
		return 0;
	}
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	xprt_test_and_clear_congestion_window_wait(xprt);
	trace_xprt_put_cong(xprt, req->rq_task);
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	bool ret = false;

	if (req->rq_cong)
		return true;
	spin_lock(&xprt->transport_lock);
	ret = __xprt_get_cong(xprt, req) != 0;
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
		__xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock(&xprt->transport_lock);
	}
}
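
/*
 * Illustrative note (not part of the original file): congestion credits are
 * accounted in units of RPC_CWNDSCALE. Each in-flight request that holds a
 * credit adds RPC_CWNDSCALE to xprt->cong, and RPCXPRT_CONGESTED() compares
 * xprt->cong against xprt->cwnd. For example, a window of
 * cwnd == 4 * RPC_CWNDSCALE admits four credit-holding requests; a fifth
 * call to __xprt_get_cong() sets XPRT_CWND_WAIT, and the sender waits until
 * __xprt_put_cong() releases a credit and wakes the next waiter.
 */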

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	s64 delta = ktime_to_ns(ktime_get() - abstime);
	return likely(delta >= 0) ?
		jiffies - nsecs_to_jiffies(delta) :
		jiffies + nsecs_to_jiffies(-delta);
}
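
/*
 * Illustrative note (not part of the original file): rq_timeout is the
 * current per-transmission ("minor") timeout, while rq_majortimeo bounds the
 * whole series of retransmissions. For example, with a non-exponential
 * timeout of to_initval = 10s, to_increment = 10s and to_retries = 3,
 * xprt_calc_majortimeo() below yields 10 + 10 * 3 = 40 seconds (capped at
 * to_maxval), and xprt_adjust_timeout() bumps rq_timeout by to_increment on
 * each minor timeout until that major timeout expires.
 */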

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	unsigned long majortimeo = req->rq_timeout;

	if (to->to_exponential)
		majortimeo <<= to->to_retries;
	else
		majortimeo += to->to_increment * to->to_retries;
	if (majortimeo > to->to_maxval || majortimeo == 0)
		majortimeo = to->to_maxval;
	return majortimeo;
}

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	req->rq_majortimeo += xprt_calc_majortimeo(req);
}

static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
	req->rq_minortimeo += req->rq_timeout;
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
	unsigned long time_init;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (likely(xprt && xprt_connected(xprt)))
		time_init = jiffies;
	else
		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
	req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (time_before(jiffies, req->rq_minortimeo))
			return status;
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}
	xprt_reset_minortimeo(req);

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);
	unsigned int pflags = memalloc_nofs_save();

	trace_xprt_disconnect_auto(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
	memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_done(xprt);
	spin_lock(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_clear_congestion_window_wait_locked(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
 * @xprt: transport to disconnect
 */
static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
		rpc_wake_up_queued_task_set_status(&xprt->pending,
				xprt->snd_task, -ENOTCONN);
}

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_force(xprt);

	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	xprt_schedule_autoclose_locked(xprt);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	xprt_schedule_autoclose_locked(xprt);
out:
	spin_unlock(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	xprt->last_used = jiffies;
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		return;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
static void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
	if (!fail_sunrpc.ignore_client_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		xprt->ops->inject_disconnect(xprt);
}
#else
static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
}
#endif

bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_lock_connect);

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}
EXPORT_SYMBOL_GPL(xprt_unlock_connect);

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	trace_xprt_connect(xprt);

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		trace_xprt_disconnect_cleanup(xprt);
		xprt->ops->close(xprt);
	}

	if (!xprt_connected(xprt)) {
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
				xprt_request_timeout(task->tk_rqstp));

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
	if (xprt->reestablish_timeout < init_to)
		xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);
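
/*
 * Illustrative sketch (not part of the original file): a transport's receive
 * path typically combines the lookup and pin helpers so the request cannot
 * be released while reply data is being copied in. The locals req, xid,
 * xprt and copied below are assumed to be provided by the caller:
 *
 *	spin_lock(&xprt->queue_lock);
 *	req = xprt_lookup_rqst(xprt, xid);
 *	if (!req) {
 *		spin_unlock(&xprt->queue_lock);
 *		return;
 *	}
 *	xprt_pin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 *
 *	... copy the reply into req->rq_private_buf without the lock held ...
 *
 *	spin_lock(&xprt->queue_lock);
 *	xprt_complete_rqst(req->rq_task, copied);
 *	xprt_unpin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 */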

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_request_need_enqueue_receive(task, req))
		return;

	xprt_request_prepare(task->tk_rqstp);
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	/* Turn off autodisconnect */
	del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	xprt->stat.recvs++;

	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	xprt_request_dequeue_receive_locked(task);
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;

	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
}

/**
 * xprt_wait_for_reply_request_def - wait for reply
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			xprt_request_timeout(req));
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (RPC_IS_SWAPPER(task)) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong || pos->rq_bytes_sent)
					continue;
				if (RPC_IS_SWAPPER(pos->rq_task))
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (!req->rq_seqno) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				goto out;
			}
		}
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
out:
		atomic_long_inc(&xprt->xmit_queuelen);
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}
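
/*
 * Illustrative note (not part of the original file): xmit_queue is therefore
 * effectively a list of transmission groups. Requests from the same tk_owner
 * that carry no security sequence number (rq_seqno == 0) are chained onto an
 * existing group's rq_xmit2 list rather than queued separately, so a single
 * holder of the transport write lock can drain a whole group in
 * xprt_transmit() below; when a group head is dequeued, the next rq_xmit2
 * entry is promoted back onto the main queue.
 */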

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		list_del(&req->rq_xmit2);
	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmit and receive queues, and ensure that
 * it is not pinned by the receive work item.
 */
void
xprt_request_dequeue_xprt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		while (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 */
void
xprt_request_prepare(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt->ops->prepare_request)
		xprt->ops->prepare_request(req);
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
	return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_lock_write(xprt, task)) {
		/* Race breaker: someone may have transmitted us */
		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task_set_status(&xprt->sending,
					task, 0);
		return false;

	}
	return true;
}

void xprt_end_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	xprt_inject_disconnect(xprt);
	xprt_release_write(xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	if (!req->rq_bytes_sent) {
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
		if (RPC_SIGNALLED(task)) {
			status = -ERESTARTSYS;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	if (status != 0) {
		req->rq_ntrans--;
		trace_xprt_transmit(req, status);
		return status;
	}

	if (is_retrans) {
		task->tk_client->cl_stats->rpcretrans++;
		trace_xprt_retransmit(req);
	}

	xprt_inject_disconnect(xprt);

	task->tk_flags |= RPC_TASK_SENT;
	spin_lock(&xprt->transport_lock);

	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	trace_xprt_transmit(req, status);
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *next, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int counter, status;

	spin_lock(&xprt->queue_lock);
	counter = 0;
	while (!list_empty(&xprt->xmit_queue)) {
		if (++counter == 20)
			break;
		next = list_first_entry(&xprt->xmit_queue,
				struct rpc_rqst, rq_xmit);
		xprt_pin_rqst(next);
		spin_unlock(&xprt->queue_lock);
		status = xprt_request_transmit(next, task);
		if (status == -EBADMSG && next != req)
			status = 0;
		spin_lock(&xprt->queue_lock);
		xprt_unpin_rqst(next);
		if (status == 0) {
			if (!xprt_request_data_received(task) ||
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				continue;
		} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			task->tk_status = status;
		break;
	}
	spin_unlock(&xprt->queue_lock);
}

static void xprt_complete_request_init(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_request_init(task);
}

void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
	struct rpc_rqst *req = data;

	if (task->tk_rqstp == NULL) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		task->tk_rqstp = req;
		return true;
	}
	return false;
}

bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
		clear_bit(XPRT_CONGESTED, &xprt->state);
		return false;
	}
	return true;
}
EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		xprt_add_backlog(xprt, task);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (xprt->num_reqs >= xprt->max_reqs)
		goto out;
	++xprt->num_reqs;
	spin_unlock(&xprt->reserve_lock);
	req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
	spin_lock(&xprt->reserve_lock);
	if (req != NULL)
		goto out;
	--xprt->num_reqs;
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (xprt->num_reqs > xprt->min_reqs) {
		--xprt->num_reqs;
		kfree(req);
		return true;
	}
	return false;
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC: dynamic allocation of request slot "
				"failed! Retrying\n");
		task->tk_status = -ENOMEM;
		break;
	case -EAGAIN:
		xprt_add_backlog(xprt, task);
		dprintk("RPC: waiting for request slot\n");
		fallthrough;
	default:
		task->tk_status = -EAGAIN;
	}
	spin_unlock(&xprt->reserve_lock);
	return;
out_init_req:
	xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
				     xprt->num_reqs);
	spin_unlock(&xprt->reserve_lock);

	task->tk_status = 0;
	task->tk_rqstp = req;
}
EXPORT_SYMBOL_GPL(xprt_alloc_slot);

void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	spin_lock(&xprt->reserve_lock);
	if (!xprt_wake_up_backlog(xprt, req) &&
	    !xprt_dynamic_free_slot(xprt, req)) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		list_add(&req->rq_list, &xprt->free);
	}
	spin_unlock(&xprt->reserve_lock);
}
EXPORT_SYMBOL_GPL(xprt_free_slot);

static void xprt_free_all_slots(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;
	while (!list_empty(&xprt->free)) {
		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		kfree(req);
	}
}

static DEFINE_IDA(rpc_xprt_ids);

void xprt_cleanup_ids(void)
{
	ida_destroy(&rpc_xprt_ids);
}

static int xprt_alloc_id(struct rpc_xprt *xprt)
{
	int id;

	id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL);
	if (id < 0)
		return id;

	xprt->id = id;
	return 0;
}

static void xprt_free_id(struct rpc_xprt *xprt)
{
	ida_simple_remove(&rpc_xprt_ids, xprt->id);
}

struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	xprt_alloc_id(xprt);
	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	if (max_alloc > num_prealloc)
		xprt->max_reqs = max_alloc;
	else
		xprt->max_reqs = num_prealloc;
	xprt->min_reqs = num_prealloc;
	xprt->num_reqs = num_prealloc;

	return xprt;

out_free:
	xprt_free(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);

void xprt_free(struct rpc_xprt *xprt)
{
	put_net(xprt->xprt_net);
	xprt_free_all_slots(xprt);
	xprt_free_id(xprt);
	rpc_sysfs_xprt_destroy(xprt);
	kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);

static void
xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
}

static __be32
xprt_alloc_xid(struct rpc_xprt *xprt)
{
	__be32 xid;

	spin_lock(&xprt->reserve_lock);
	xid = (__force __be32)xprt->xid++;
	spin_unlock(&xprt->reserve_lock);
	return xid;
}

static void
xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = prandom_u32();
}

static void
xprt_request_init(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_buffer = NULL;
	req->rq_xid = xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_snd_buf.bvec = NULL;
	req->rq_rcv_buf.bvec = NULL;
	req->rq_release_snd_buf = NULL;
	xprt_init_majortimeo(task, req);

	trace_xprt_reserve(req);
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	xprt_request_dequeue_xprt(task);
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt_schedule_autodisconnect(xprt);
	spin_unlock(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xdr_free_bvec(&req->rq_rcv_buf);
	xdr_free_bvec(&req->rq_snd_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	task->tk_rqstp = NULL;
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}
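
/*
 * Illustrative note (not part of the original file): callers such as
 * rpc_create() fill in a struct xprt_create before calling
 * xprt_create_transport() below, roughly along these lines (field names per
 * include/linux/sunrpc/xprt.h; the values shown are placeholders):
 *
 *	struct xprt_create xprtargs = {
 *		.ident		= xprt_find_transport_ident("tcp"),
 *		.net		= current->nsproxy->net_ns,
 *		.dstaddr	= (struct sockaddr *)&server_addr,
 *		.addrlen	= sizeof(server_addr),
 *		.servername	= "server.example.org",
 *	};
 *	xprt = xprt_create_transport(&xprtargs);
 *
 * xprt_create_transport() resolves .ident to a registered xprt_class and
 * hands the arguments to that class's ->setup() method.
 */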

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	const struct xprt_class *t;

	t = xprt_class_find_by_ident(args->ident);
	if (!t) {
		dprintk("RPC: transport (%d) not supported\n", args->ident);
		return ERR_PTR(-EIO);
	}

	xprt = t->setup(args);
	xprt_class_release(t);

	if (IS_ERR(xprt))
		goto out;
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	trace_xprt_create(xprt);
out:
	return xprt;
}

static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	trace_xprt_destroy(xprt);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Destroy any existing back channel
	 */
	xprt_destroy_backchannel(xprt, UINT_MAX);

	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);