1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 
35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 #include "sysfs.h" 59 #include "fail.h" 60 61 /* 62 * Local variables 63 */ 64 65 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 66 # define RPCDBG_FACILITY RPCDBG_XPRT 67 #endif 68 69 /* 70 * Local functions 71 */ 72 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 73 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 74 static void xprt_destroy(struct rpc_xprt *xprt); 75 static void xprt_request_init(struct rpc_task *task); 76 77 static DEFINE_SPINLOCK(xprt_list_lock); 78 static LIST_HEAD(xprt_list); 79 80 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 81 { 82 unsigned long timeout = jiffies + req->rq_timeout; 83 84 if (time_before(timeout, req->rq_majortimeo)) 85 return timeout; 86 return req->rq_majortimeo; 87 } 88 89 /** 90 * xprt_register_transport - register a transport implementation 91 * @transport: transport to register 92 * 93 * If a transport implementation is loaded as a kernel module, it can 94 * call this interface to make itself known to the RPC client. 
95 * 96 * Returns: 97 * 0: transport successfully registered 98 * -EEXIST: transport already registered 99 * -EINVAL: transport module being unloaded 100 */ 101 int xprt_register_transport(struct xprt_class *transport) 102 { 103 struct xprt_class *t; 104 int result; 105 106 result = -EEXIST; 107 spin_lock(&xprt_list_lock); 108 list_for_each_entry(t, &xprt_list, list) { 109 /* don't register the same transport class twice */ 110 if (t->ident == transport->ident) 111 goto out; 112 } 113 114 list_add_tail(&transport->list, &xprt_list); 115 printk(KERN_INFO "RPC: Registered %s transport module.\n", 116 transport->name); 117 result = 0; 118 119 out: 120 spin_unlock(&xprt_list_lock); 121 return result; 122 } 123 EXPORT_SYMBOL_GPL(xprt_register_transport); 124 125 /** 126 * xprt_unregister_transport - unregister a transport implementation 127 * @transport: transport to unregister 128 * 129 * Returns: 130 * 0: transport successfully unregistered 131 * -ENOENT: transport never registered 132 */ 133 int xprt_unregister_transport(struct xprt_class *transport) 134 { 135 struct xprt_class *t; 136 int result; 137 138 result = 0; 139 spin_lock(&xprt_list_lock); 140 list_for_each_entry(t, &xprt_list, list) { 141 if (t == transport) { 142 printk(KERN_INFO 143 "RPC: Unregistered %s transport module.\n", 144 transport->name); 145 list_del_init(&transport->list); 146 goto out; 147 } 148 } 149 result = -ENOENT; 150 151 out: 152 spin_unlock(&xprt_list_lock); 153 return result; 154 } 155 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 156 157 static void 158 xprt_class_release(const struct xprt_class *t) 159 { 160 module_put(t->owner); 161 } 162 163 static const struct xprt_class * 164 xprt_class_find_by_ident_locked(int ident) 165 { 166 const struct xprt_class *t; 167 168 list_for_each_entry(t, &xprt_list, list) { 169 if (t->ident != ident) 170 continue; 171 if (!try_module_get(t->owner)) 172 continue; 173 return t; 174 } 175 return NULL; 176 } 177 178 static const struct xprt_class * 179 
xprt_class_find_by_ident(int ident) 180 { 181 const struct xprt_class *t; 182 183 spin_lock(&xprt_list_lock); 184 t = xprt_class_find_by_ident_locked(ident); 185 spin_unlock(&xprt_list_lock); 186 return t; 187 } 188 189 static const struct xprt_class * 190 xprt_class_find_by_netid_locked(const char *netid) 191 { 192 const struct xprt_class *t; 193 unsigned int i; 194 195 list_for_each_entry(t, &xprt_list, list) { 196 for (i = 0; t->netid[i][0] != '\0'; i++) { 197 if (strcmp(t->netid[i], netid) != 0) 198 continue; 199 if (!try_module_get(t->owner)) 200 continue; 201 return t; 202 } 203 } 204 return NULL; 205 } 206 207 static const struct xprt_class * 208 xprt_class_find_by_netid(const char *netid) 209 { 210 const struct xprt_class *t; 211 212 spin_lock(&xprt_list_lock); 213 t = xprt_class_find_by_netid_locked(netid); 214 if (!t) { 215 spin_unlock(&xprt_list_lock); 216 request_module("rpc%s", netid); 217 spin_lock(&xprt_list_lock); 218 t = xprt_class_find_by_netid_locked(netid); 219 } 220 spin_unlock(&xprt_list_lock); 221 return t; 222 } 223 224 /** 225 * xprt_find_transport_ident - convert a netid into a transport identifier 226 * @netid: transport to load 227 * 228 * Returns: 229 * > 0: transport identifier 230 * -ENOENT: transport module not available 231 */ 232 int xprt_find_transport_ident(const char *netid) 233 { 234 const struct xprt_class *t; 235 int ret; 236 237 t = xprt_class_find_by_netid(netid); 238 if (!t) 239 return -ENOENT; 240 ret = t->ident; 241 xprt_class_release(t); 242 return ret; 243 } 244 EXPORT_SYMBOL_GPL(xprt_find_transport_ident); 245 246 static void xprt_clear_locked(struct rpc_xprt *xprt) 247 { 248 xprt->snd_task = NULL; 249 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 250 smp_mb__before_atomic(); 251 clear_bit(XPRT_LOCKED, &xprt->state); 252 smp_mb__after_atomic(); 253 } else 254 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 255 } 256 257 /** 258 * xprt_reserve_xprt - serialize write access to transports 259 * @task: task that is 
 *	requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 *
 * Returns 1 if @task now owns (or already owned) the transport write
 * lock. Returns 0 after setting task->tk_status to -EAGAIN and putting
 * @task to sleep on xprt->sending.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		/* Already locked: fine if we are the holder, else wait */
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	/* We just took the lock, but give it back while XPRT_WRITE_SPACE is set */
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

out_locked:
	trace_xprt_reserve_xprt(xprt, task);
	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	/* Only soft tasks sleep with a timeout */
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

/* Report whether the XPRT_CWND_WAIT flag is currently set */
static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * Set XPRT_CWND_WAIT, unless the request at the head of xmit_queue
 * already has rq_cong set.
 */
static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
					rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

/* Clear XPRT_CWND_WAIT once RPCXPRT_CONGESTED() no longer holds */
static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given
 * access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 *
 * Returns 1 if @task now owns (or already owned) the transport write
 * lock. Returns 0 after setting task->tk_status to -EAGAIN and putting
 * @task to sleep on xprt->sending.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		/* Already locked: fine if we are the holder, else wait */
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	/* A task without a request skips the write-space and cwnd checks */
	if (req == NULL) {
		xprt->snd_task = task;
		goto out_locked;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		goto out_locked;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	/* Only soft tasks sleep with a timeout */
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
out_locked:
	trace_xprt_reserve_cong(xprt, task);
	return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

/*
 * Take the transport write lock for @task. Returns 1 immediately when
 * XPRT_LOCKED is set and @task is already snd_task; otherwise returns
 * whatever ops->reserve_xprt() returns, called under transport_lock.
 */
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
	return retval;
}

/*
 * Callback handed to rpc_wake_up_first_on_wq(): records @task as the
 * transport's snd_task and always returns true.
 */
static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

/*
 * Try to take XPRT_LOCKED and hand it to the first task waiting on
 * xprt->sending. The lock is given back through xprt_clear_locked() if
 * XPRT_WRITE_SPACE is set or no task was woken.
 */
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/*
 * Same as __xprt_lock_write_next(), but also gives the lock back when
 * XPRT_CWND_WAIT is set.
 */
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	/* Only the current holder may release the lock */
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
	trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	/* Only the current holder may release the lock */
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
	trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

/*
 * Release the transport write lock through ops->release_xprt(), under
 * transport_lock. Does nothing when @task is not the current snd_task
 * (checked before the lock is taken).
 */
void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
460 */ 461 static int 462 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 463 { 464 if (req->rq_cong) 465 return 1; 466 trace_xprt_get_cong(xprt, req->rq_task); 467 if (RPCXPRT_CONGESTED(xprt)) { 468 xprt_set_congestion_window_wait(xprt); 469 return 0; 470 } 471 req->rq_cong = 1; 472 xprt->cong += RPC_CWNDSCALE; 473 return 1; 474 } 475 476 /* 477 * Adjust the congestion window, and wake up the next task 478 * that has been sleeping due to congestion 479 */ 480 static void 481 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 482 { 483 if (!req->rq_cong) 484 return; 485 req->rq_cong = 0; 486 xprt->cong -= RPC_CWNDSCALE; 487 xprt_test_and_clear_congestion_window_wait(xprt); 488 trace_xprt_put_cong(xprt, req->rq_task); 489 __xprt_lock_write_next_cong(xprt); 490 } 491 492 /** 493 * xprt_request_get_cong - Request congestion control credits 494 * @xprt: pointer to transport 495 * @req: pointer to RPC request 496 * 497 * Useful for transports that require congestion control. 498 */ 499 bool 500 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 501 { 502 bool ret = false; 503 504 if (req->rq_cong) 505 return true; 506 spin_lock(&xprt->transport_lock); 507 ret = __xprt_get_cong(xprt, req) != 0; 508 spin_unlock(&xprt->transport_lock); 509 return ret; 510 } 511 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 512 513 /** 514 * xprt_release_rqst_cong - housekeeping when request is complete 515 * @task: RPC request that recently completed 516 * 517 * Useful for transports that require congestion control. 
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/*
 * If XPRT_CWND_WAIT was set, clear it and try to pass the write lock to
 * the next sender. Takes no lock itself.
 * NOTE(review): the "_locked" suffix suggests the caller holds
 * transport_lock - confirm against callers.
 */
static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
		__xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	-	a reply is received and
 *	-	a full number of requests are outstanding and
 *	-	the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		/* Growth is capped at RPC_MAXCWND(xprt) */
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		/* Halve on timeout, but never drop below RPC_CWNDSCALE */
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	/* Finally give back the credit held by the completed request */
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 * A non-negative @status is not planted: the tasks are simply woken.
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Sets XPRT_WRITE_SPACE in xprt->state; it does not sleep or arm a timer
 * itself.
 *
 * NOTE(review): this comment used to read "Note that we only set the
 * timer for the case of RPC_IS_SOFT(), since we don't in general want to
 * force a socket disconnection due to an incomplete RPC call
 * transmission." Nothing in this function sets a timer; that remark
 * appears to describe the sleep in xprt_reserve_xprt() and
 * xprt_reserve_xprt_cong() - confirm.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/*
 * If XPRT_WRITE_SPACE was set, clear it and try to pass the write lock
 * to the next sender. Returns true if the flag was set, false otherwise.
 */
static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	/* Unlocked peek: nothing to do unless XPRT_WRITE_SPACE is set */
	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/*
 * Translate an absolute ktime into a jiffies value by offsetting the
 * current jiffies by the distance between ktime_get() and @abstime.
 */
static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	s64 delta = ktime_to_ns(ktime_get() - abstime);
	/* delta >= 0 means @abstime is not in the future */
	return likely(delta >= 0) ?
		jiffies - nsecs_to_jiffies(delta) :
		jiffies + nsecs_to_jiffies(-delta);
}

/*
 * Compute the major timeout interval for @req from its current
 * rq_timeout and the client's rpc_timeout parameters. The result is
 * replaced by to_maxval when it exceeds to_maxval or is zero.
 */
static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	unsigned long majortimeo = req->rq_timeout;

	if (to->to_exponential)
		majortimeo <<= to->to_retries;
	else
		majortimeo += to->to_increment * to->to_retries;
	if (majortimeo > to->to_maxval || majortimeo == 0)
		majortimeo = to->to_maxval;
	return majortimeo;
}

/* Push rq_majortimeo forward by one major timeout interval */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	req->rq_majortimeo += xprt_calc_majortimeo(req);
}

/* Push rq_minortimeo forward by the current rq_timeout */
static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
	req->rq_minortimeo += req->rq_timeout;
}

/*
 * Initialise rq_timeout, rq_majortimeo and rq_minortimeo for @req. The
 * deadlines count from jiffies when the transport is connected, and from
 * task->tk_start otherwise.
 */
static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
	unsigned long time_init;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (likely(xprt && xprt_connected(xprt)))
		time_init = jiffies;
	else
		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
	req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 * Returns 0 while the major timeout has not expired, and -ETIMEDOUT
 * once it has (after resetting the request's timeout state).
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		/* Minor timeout not reached yet: leave everything as is */
		if (time_before(jiffies, req->rq_minortimeo))
			return status;
		/* Minor timeout: back off, capped at to_maxval when set */
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		/* Major timeout: start over from the initial values */
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}
	xprt_reset_minortimeo(req);

	/* Guard against a zero timeout by falling back to 5 seconds */
	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

/*
 * Work handler for xprt->task_cleanup: clears XPRT_CLOSE_WAIT, closes
 * the transport through ops->close(), releases the write lock held with
 * a NULL snd_task and wakes waiters on the XPRT_LOCKED bit. Runs inside
 * a memalloc_nofs_save()/memalloc_nofs_restore() section.
 */
static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);
	unsigned int pflags = memalloc_nofs_save();

	trace_xprt_disconnect_auto(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
	memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 * Under transport_lock: clears the connected state, clears the
 * write-space and congestion-window waits, and wakes every task on
 * xprt->pending with -ENOTCONN.
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_done(xprt);
	spin_lock(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_clear_congestion_window_wait_locked(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
 * @xprt: transport to disconnect
 *
 * Sets XPRT_CLOSE_WAIT. If XPRT_LOCKED could be taken here, the
 * task_cleanup work is queued. Otherwise, if there is a snd_task and
 * XPRT_SND_IS_COOKIE is clear, that task is woken from xprt->pending
 * with -ENOTCONN.
 */
static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	/* With XPRT_SND_IS_COOKIE set, snd_task holds a cookie, not a task */
	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
		rpc_wake_up_queued_task_set_status(&xprt->pending,
						   xprt->snd_task, -ENOTCONN);
}

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_force(xprt);

	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	xprt_schedule_autoclose_locked(xprt);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

/* Read the transport's connect cookie with READ_ONCE() */
static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

/*
 * True when the request's saved connect cookie differs from the
 * transport's current one, or the transport is not connected.
 */
static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	/* Stale cookie: leave the transport alone */
	if (cookie != xprt->connect_cookie)
		goto out;
	/* Nothing to schedule while XPRT_CLOSING is set */
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	xprt_schedule_autoclose_locked(xprt);
out:
	spin_unlock(&xprt->transport_lock);
}

/* The idle timer is in use only when idle_timeout is non-zero */
static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

/*
 * Record "now" as last_used and, when recv_queue is empty and an idle
 * timeout is configured, (re)arm xprt->timer to expire idle_timeout
 * jiffies from now.
 */
static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	xprt->last_used = jiffies;
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

/*
 * Callback for xprt->timer: when recv_queue is empty and XPRT_LOCKED
 * can be taken, queue the task_cleanup work.
 */
static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		return;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
/*
 * Call ops->inject_disconnect() when fail_sunrpc does not ignore client
 * disconnects and should_fail() says so.
 */
static void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
	if (!fail_sunrpc.ignore_client_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		xprt->ops->inject_disconnect(xprt);
}
#else
/* No-op when CONFIG_FAIL_SUNRPC is not enabled */
static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
}
#endif

/*
 * Under transport_lock: if XPRT_LOCKED is set and @task is the current
 * snd_task, set XPRT_SND_IS_COOKIE, store @cookie in snd_task and return
 * true. Otherwise return false and change nothing.
 */
bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_lock_connect);

/*
 * Under transport_lock: if snd_task equals @cookie and XPRT_LOCKED is
 * set, clear snd_task and XPRT_SND_IS_COOKIE, call ops->release_xprt()
 * with a NULL task and schedule the idle autodisconnect. Waiters on the
 * XPRT_LOCKED bit are woken in every case, after the lock is dropped.
 */
void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task =NULL;
	clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}
EXPORT_SYMBOL_GPL(xprt_unlock_connect);

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 * Sets task->tk_status to -EAGAIN and returns if the transport is not
 * bound. Returns without connecting if the write lock cannot be taken.
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	trace_xprt_connect(xprt);

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	/* A close was pending: perform it now that we hold the write lock */
	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		trace_xprt_disconnect_cleanup(xprt);
		xprt->ops->close(xprt);
	}

	if (!xprt_connected(xprt)) {
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
				xprt_request_timeout(task->tk_rqstp));

		/* These early returns keep the write lock held */
		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 * Returns the number of jiffies left until connect_start +
 * reestablish_timeout, or 0 if that time has already passed.
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 * Doubles reestablish_timeout, caps it at max_reconnect_timeout, then
 * raises it to @init_to if it is still smaller than that.
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
	if (xprt->reestablish_timeout < init_to)
		xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);

/* Result of comparing two XIDs: equal, or which subtree to descend into */
enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
/*
 * Order two XIDs by their raw 32-bit value. The __be32 values are not
 * byte-swapped before the comparison.
 */
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

/*
 * Search xprt->recv_queue for the request whose rq_xid equals @xid.
 * Returns the request, or NULL if none matches.
 */
static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p
= &n->rb_left; 1040 break; 1041 case XID_RB_RIGHT: 1042 p = &n->rb_right; 1043 break; 1044 case XID_RB_EQUAL: 1045 WARN_ON_ONCE(new != req); 1046 return; 1047 } 1048 } 1049 rb_link_node(&new->rq_recv, n, p); 1050 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 1051 } 1052 1053 static void 1054 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 1055 { 1056 rb_erase(&req->rq_recv, &xprt->recv_queue); 1057 } 1058 1059 /** 1060 * xprt_lookup_rqst - find an RPC request corresponding to an XID 1061 * @xprt: transport on which the original request was transmitted 1062 * @xid: RPC XID of incoming reply 1063 * 1064 * Caller holds xprt->queue_lock. 1065 */ 1066 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 1067 { 1068 struct rpc_rqst *entry; 1069 1070 entry = xprt_request_rb_find(xprt, xid); 1071 if (entry != NULL) { 1072 trace_xprt_lookup_rqst(xprt, xid, 0); 1073 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 1074 return entry; 1075 } 1076 1077 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 1078 ntohl(xid)); 1079 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 1080 xprt->stat.bad_xids++; 1081 return NULL; 1082 } 1083 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1084 1085 static bool 1086 xprt_is_pinned_rqst(struct rpc_rqst *req) 1087 { 1088 return atomic_read(&req->rq_pin) != 0; 1089 } 1090 1091 /** 1092 * xprt_pin_rqst - Pin a request on the transport receive list 1093 * @req: Request to pin 1094 * 1095 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1096 * so should be holding xprt->queue_lock. 1097 */ 1098 void xprt_pin_rqst(struct rpc_rqst *req) 1099 { 1100 atomic_inc(&req->rq_pin); 1101 } 1102 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1103 1104 /** 1105 * xprt_unpin_rqst - Unpin a request on the transport receive list 1106 * @req: Request to pin 1107 * 1108 * Caller should be holding xprt->queue_lock. 
1109 */ 1110 void xprt_unpin_rqst(struct rpc_rqst *req) 1111 { 1112 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1113 atomic_dec(&req->rq_pin); 1114 return; 1115 } 1116 if (atomic_dec_and_test(&req->rq_pin)) 1117 wake_up_var(&req->rq_pin); 1118 } 1119 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1120 1121 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1122 { 1123 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1124 } 1125 1126 static bool 1127 xprt_request_data_received(struct rpc_task *task) 1128 { 1129 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1130 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1131 } 1132 1133 static bool 1134 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1135 { 1136 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1137 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1138 } 1139 1140 /** 1141 * xprt_request_enqueue_receive - Add an request to the receive queue 1142 * @task: RPC task 1143 * 1144 */ 1145 void 1146 xprt_request_enqueue_receive(struct rpc_task *task) 1147 { 1148 struct rpc_rqst *req = task->tk_rqstp; 1149 struct rpc_xprt *xprt = req->rq_xprt; 1150 1151 if (!xprt_request_need_enqueue_receive(task, req)) 1152 return; 1153 1154 xprt_request_prepare(task->tk_rqstp); 1155 spin_lock(&xprt->queue_lock); 1156 1157 /* Update the softirq receive buffer */ 1158 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1159 sizeof(req->rq_private_buf)); 1160 1161 /* Add request to the receive list */ 1162 xprt_request_rb_insert(xprt, req); 1163 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1164 spin_unlock(&xprt->queue_lock); 1165 1166 /* Turn off autodisconnect */ 1167 del_singleshot_timer_sync(&xprt->timer); 1168 } 1169 1170 /** 1171 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1172 * @task: RPC task 1173 * 1174 * Caller must hold xprt->queue_lock. 
1175 */ 1176 static void 1177 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1178 { 1179 struct rpc_rqst *req = task->tk_rqstp; 1180 1181 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1182 xprt_request_rb_remove(req->rq_xprt, req); 1183 } 1184 1185 /** 1186 * xprt_update_rtt - Update RPC RTT statistics 1187 * @task: RPC request that recently completed 1188 * 1189 * Caller holds xprt->queue_lock. 1190 */ 1191 void xprt_update_rtt(struct rpc_task *task) 1192 { 1193 struct rpc_rqst *req = task->tk_rqstp; 1194 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1195 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1196 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1197 1198 if (timer) { 1199 if (req->rq_ntrans == 1) 1200 rpc_update_rtt(rtt, timer, m); 1201 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1202 } 1203 } 1204 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1205 1206 /** 1207 * xprt_complete_rqst - called when reply processing is complete 1208 * @task: RPC request that recently completed 1209 * @copied: actual number of bytes received from the transport 1210 * 1211 * Caller holds xprt->queue_lock. 
1212 */ 1213 void xprt_complete_rqst(struct rpc_task *task, int copied) 1214 { 1215 struct rpc_rqst *req = task->tk_rqstp; 1216 struct rpc_xprt *xprt = req->rq_xprt; 1217 1218 xprt->stat.recvs++; 1219 1220 req->rq_private_buf.len = copied; 1221 /* Ensure all writes are done before we update */ 1222 /* req->rq_reply_bytes_recvd */ 1223 smp_wmb(); 1224 req->rq_reply_bytes_recvd = copied; 1225 xprt_request_dequeue_receive_locked(task); 1226 rpc_wake_up_queued_task(&xprt->pending, task); 1227 } 1228 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1229 1230 static void xprt_timer(struct rpc_task *task) 1231 { 1232 struct rpc_rqst *req = task->tk_rqstp; 1233 struct rpc_xprt *xprt = req->rq_xprt; 1234 1235 if (task->tk_status != -ETIMEDOUT) 1236 return; 1237 1238 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1239 if (!req->rq_reply_bytes_recvd) { 1240 if (xprt->ops->timer) 1241 xprt->ops->timer(xprt, task); 1242 } else 1243 task->tk_status = 0; 1244 } 1245 1246 /** 1247 * xprt_wait_for_reply_request_def - wait for reply 1248 * @task: pointer to rpc_task 1249 * 1250 * Set a request's retransmit timeout based on the transport's 1251 * default timeout parameters. Used by transports that don't adjust 1252 * the retransmit timeout based on round-trip time estimation, 1253 * and put the task to sleep on the pending queue. 1254 */ 1255 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1256 { 1257 struct rpc_rqst *req = task->tk_rqstp; 1258 1259 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1260 xprt_request_timeout(req)); 1261 } 1262 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1263 1264 /** 1265 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1266 * @task: pointer to rpc_task 1267 * 1268 * Set a request's retransmit timeout using the RTT estimator, 1269 * and put the task to sleep on the pending queue. 
*/
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	/* Scale the estimate up by the timeout count plus the retry count */
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	/* Fall back to the client maximum if the result is zero or too big */
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 * Returns immediately unless @task is flagged RPC_TASK_NEED_RECV. The
 * flag is tested again under xprt->queue_lock before the transport's
 * ->wait_for_reply_request() method is called.
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	/* Unlocked fast path */
	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

/*
 * True when @task is not yet flagged RPC_TASK_NEED_XMIT.
 * @req is not consulted.
 */
static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 *
 * Does nothing if @task is already flagged RPC_TASK_NEED_XMIT. The
 * insertion point depends on the request:
 *  - rq_cong set: before the first queued request without rq_cong;
 *  - swapper task: before the first queued request that has neither
 *    rq_cong nor rq_bytes_sent set and is not itself a swapper;
 *  - rq_seqno zero: chained via rq_xmit2 behind the first queued
 *    request with the same tk_owner;
 *  - otherwise, or when no such position exists: tail of xmit_queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (RPC_IS_SWAPPER(task)) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong || pos->rq_bytes_sent)
					continue;
				if (RPC_IS_SWAPPER(pos->rq_task))
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (!req->rq_seqno) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				/*
				 * Same owner: chain req on pos's rq_xmit2 list
				 * and leave its rq_xmit entry empty.
				 */
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				goto out;
			}
		}
		/* No special position found: append to the main queue */
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
out:
		atomic_long_inc(&xprt->xmit_queuelen);
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 *
 * Does nothing if RPC_TASK_NEED_XMIT was not set.
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		/* req sits on the main queue */
		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			/*
			 * Other requests are chained behind req on rq_xmit2:
			 * detach req from that chain and put the next one on
			 * the tail of the main queue.
			 */
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		/* req is only linked through an rq_xmit2 chain */
		list_del(&req->rq_xmit2);
	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 *
 * Takes xprt->queue_lock around the locked variant.
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmit and receive queues, and ensure that
 * it is not pinned by the receive work item.
*/
void
xprt_request_dequeue_xprt(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	/* Skip the lock entirely when the request is neither queued nor pinned */
	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		while (xprt_is_pinned_rqst(req)) {
			/*
			 * RPC_TASK_MSG_PIN_WAIT makes xprt_unpin_rqst() issue
			 * a wakeup when the pin count reaches zero. The lock
			 * is dropped for the duration of the wait.
			 */
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
1465 */ 1466 void 1467 xprt_request_prepare(struct rpc_rqst *req) 1468 { 1469 struct rpc_xprt *xprt = req->rq_xprt; 1470 1471 if (xprt->ops->prepare_request) 1472 xprt->ops->prepare_request(req); 1473 } 1474 1475 /** 1476 * xprt_request_need_retransmit - Test if a task needs retransmission 1477 * @task: pointer to rpc_task 1478 * 1479 * Test for whether a connection breakage requires the task to retransmit 1480 */ 1481 bool 1482 xprt_request_need_retransmit(struct rpc_task *task) 1483 { 1484 return xprt_request_retransmit_after_disconnect(task); 1485 } 1486 1487 /** 1488 * xprt_prepare_transmit - reserve the transport before sending a request 1489 * @task: RPC task about to send a request 1490 * 1491 */ 1492 bool xprt_prepare_transmit(struct rpc_task *task) 1493 { 1494 struct rpc_rqst *req = task->tk_rqstp; 1495 struct rpc_xprt *xprt = req->rq_xprt; 1496 1497 if (!xprt_lock_write(xprt, task)) { 1498 /* Race breaker: someone may have transmitted us */ 1499 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1500 rpc_wake_up_queued_task_set_status(&xprt->sending, 1501 task, 0); 1502 return false; 1503 1504 } 1505 return true; 1506 } 1507 1508 void xprt_end_transmit(struct rpc_task *task) 1509 { 1510 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1511 1512 xprt_inject_disconnect(xprt); 1513 xprt_release_write(xprt, task); 1514 } 1515 1516 /** 1517 * xprt_request_transmit - send an RPC request on a transport 1518 * @req: pointer to request to transmit 1519 * @snd_task: RPC task that owns the transport lock 1520 * 1521 * This performs the transmission of a single request. 1522 * Note that if the request is not the same as snd_task, then it 1523 * does need to be pinned. 1524 * Returns '0' on success. 
*/
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	/* NOTE(review): @snd_task is not referenced anywhere in this body */
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	/* These checks apply only before any byte of the request is sent */
	if (!req->rq_bytes_sent) {
		/* A reply has already been recorded: nothing to send */
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
		if (RPC_SIGNALLED(task)) {
			status = -ERESTARTSYS;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
	/* Sample the cookie before sending; it is stored only on success */
	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	if (status != 0) {
		/*
		 * Undo the count and return with the request still queued
		 * for transmission (no dequeue, no wakeup).
		 */
		req->rq_ntrans--;
		trace_xprt_transmit(req, status);
		return status;
	}

	if (is_retrans) {
		task->tk_client->cl_stats->rpcretrans++;
		trace_xprt_retransmit(req);
	}

	xprt_inject_disconnect(xprt);

	task->tk_flags |= RPC_TASK_SENT;
	spin_lock(&xprt->transport_lock);

	/* Transport statistics are updated under transport_lock */
	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	trace_xprt_transmit(req, status);
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 *
@task: controlling RPC task 1595 * 1596 * Attempts to drain the transmit queue. On exit, either the transport 1597 * signalled an error that needs to be handled before transmission can 1598 * resume, or @task finished transmitting, and detected that it already 1599 * received a reply. 1600 */ 1601 void 1602 xprt_transmit(struct rpc_task *task) 1603 { 1604 struct rpc_rqst *next, *req = task->tk_rqstp; 1605 struct rpc_xprt *xprt = req->rq_xprt; 1606 int status; 1607 1608 spin_lock(&xprt->queue_lock); 1609 for (;;) { 1610 next = list_first_entry_or_null(&xprt->xmit_queue, 1611 struct rpc_rqst, rq_xmit); 1612 if (!next) 1613 break; 1614 xprt_pin_rqst(next); 1615 spin_unlock(&xprt->queue_lock); 1616 status = xprt_request_transmit(next, task); 1617 if (status == -EBADMSG && next != req) 1618 status = 0; 1619 spin_lock(&xprt->queue_lock); 1620 xprt_unpin_rqst(next); 1621 if (status < 0) { 1622 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1623 task->tk_status = status; 1624 break; 1625 } 1626 /* Was @task transmitted, and has it received a reply? 
*/ 1627 if (xprt_request_data_received(task) && 1628 !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1629 break; 1630 cond_resched_lock(&xprt->queue_lock); 1631 } 1632 spin_unlock(&xprt->queue_lock); 1633 } 1634 1635 static void xprt_complete_request_init(struct rpc_task *task) 1636 { 1637 if (task->tk_rqstp) 1638 xprt_request_init(task); 1639 } 1640 1641 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1642 { 1643 set_bit(XPRT_CONGESTED, &xprt->state); 1644 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1645 } 1646 EXPORT_SYMBOL_GPL(xprt_add_backlog); 1647 1648 static bool __xprt_set_rq(struct rpc_task *task, void *data) 1649 { 1650 struct rpc_rqst *req = data; 1651 1652 if (task->tk_rqstp == NULL) { 1653 memset(req, 0, sizeof(*req)); /* mark unused */ 1654 task->tk_rqstp = req; 1655 return true; 1656 } 1657 return false; 1658 } 1659 1660 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1661 { 1662 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1663 clear_bit(XPRT_CONGESTED, &xprt->state); 1664 return false; 1665 } 1666 return true; 1667 } 1668 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1669 1670 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1671 { 1672 bool ret = false; 1673 1674 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1675 goto out; 1676 spin_lock(&xprt->reserve_lock); 1677 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1678 xprt_add_backlog(xprt, task); 1679 ret = true; 1680 } 1681 spin_unlock(&xprt->reserve_lock); 1682 out: 1683 return ret; 1684 } 1685 1686 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1687 { 1688 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1689 1690 if (xprt->num_reqs >= xprt->max_reqs) 1691 goto out; 1692 ++xprt->num_reqs; 1693 spin_unlock(&xprt->reserve_lock); 1694 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1695 spin_lock(&xprt->reserve_lock); 1696 if (req != NULL) 1697 goto out; 1698 
--xprt->num_reqs; 1699 req = ERR_PTR(-ENOMEM); 1700 out: 1701 return req; 1702 } 1703 1704 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1705 { 1706 if (xprt->num_reqs > xprt->min_reqs) { 1707 --xprt->num_reqs; 1708 kfree(req); 1709 return true; 1710 } 1711 return false; 1712 } 1713 1714 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1715 { 1716 struct rpc_rqst *req; 1717 1718 spin_lock(&xprt->reserve_lock); 1719 if (!list_empty(&xprt->free)) { 1720 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1721 list_del(&req->rq_list); 1722 goto out_init_req; 1723 } 1724 req = xprt_dynamic_alloc_slot(xprt); 1725 if (!IS_ERR(req)) 1726 goto out_init_req; 1727 switch (PTR_ERR(req)) { 1728 case -ENOMEM: 1729 dprintk("RPC: dynamic allocation of request slot " 1730 "failed! Retrying\n"); 1731 task->tk_status = -ENOMEM; 1732 break; 1733 case -EAGAIN: 1734 xprt_add_backlog(xprt, task); 1735 dprintk("RPC: waiting for request slot\n"); 1736 fallthrough; 1737 default: 1738 task->tk_status = -EAGAIN; 1739 } 1740 spin_unlock(&xprt->reserve_lock); 1741 return; 1742 out_init_req: 1743 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1744 xprt->num_reqs); 1745 spin_unlock(&xprt->reserve_lock); 1746 1747 task->tk_status = 0; 1748 task->tk_rqstp = req; 1749 } 1750 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1751 1752 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1753 { 1754 spin_lock(&xprt->reserve_lock); 1755 if (!xprt_wake_up_backlog(xprt, req) && 1756 !xprt_dynamic_free_slot(xprt, req)) { 1757 memset(req, 0, sizeof(*req)); /* mark unused */ 1758 list_add(&req->rq_list, &xprt->free); 1759 } 1760 spin_unlock(&xprt->reserve_lock); 1761 } 1762 EXPORT_SYMBOL_GPL(xprt_free_slot); 1763 1764 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1765 { 1766 struct rpc_rqst *req; 1767 while (!list_empty(&xprt->free)) { 1768 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1769 
list_del(&req->rq_list); 1770 kfree(req); 1771 } 1772 } 1773 1774 static DEFINE_IDA(rpc_xprt_ids); 1775 1776 void xprt_cleanup_ids(void) 1777 { 1778 ida_destroy(&rpc_xprt_ids); 1779 } 1780 1781 static int xprt_alloc_id(struct rpc_xprt *xprt) 1782 { 1783 int id; 1784 1785 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL); 1786 if (id < 0) 1787 return id; 1788 1789 xprt->id = id; 1790 return 0; 1791 } 1792 1793 static void xprt_free_id(struct rpc_xprt *xprt) 1794 { 1795 ida_simple_remove(&rpc_xprt_ids, xprt->id); 1796 } 1797 1798 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1799 unsigned int num_prealloc, 1800 unsigned int max_alloc) 1801 { 1802 struct rpc_xprt *xprt; 1803 struct rpc_rqst *req; 1804 int i; 1805 1806 xprt = kzalloc(size, GFP_KERNEL); 1807 if (xprt == NULL) 1808 goto out; 1809 1810 xprt_alloc_id(xprt); 1811 xprt_init(xprt, net); 1812 1813 for (i = 0; i < num_prealloc; i++) { 1814 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1815 if (!req) 1816 goto out_free; 1817 list_add(&req->rq_list, &xprt->free); 1818 } 1819 if (max_alloc > num_prealloc) 1820 xprt->max_reqs = max_alloc; 1821 else 1822 xprt->max_reqs = num_prealloc; 1823 xprt->min_reqs = num_prealloc; 1824 xprt->num_reqs = num_prealloc; 1825 1826 return xprt; 1827 1828 out_free: 1829 xprt_free(xprt); 1830 out: 1831 return NULL; 1832 } 1833 EXPORT_SYMBOL_GPL(xprt_alloc); 1834 1835 void xprt_free(struct rpc_xprt *xprt) 1836 { 1837 put_net(xprt->xprt_net); 1838 xprt_free_all_slots(xprt); 1839 xprt_free_id(xprt); 1840 rpc_sysfs_xprt_destroy(xprt); 1841 kfree_rcu(xprt, rcu); 1842 } 1843 EXPORT_SYMBOL_GPL(xprt_free); 1844 1845 static void 1846 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1847 { 1848 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1849 } 1850 1851 static __be32 1852 xprt_alloc_xid(struct rpc_xprt *xprt) 1853 { 1854 __be32 xid; 1855 1856 spin_lock(&xprt->reserve_lock); 1857 xid = (__force __be32)xprt->xid++; 1858 
spin_unlock(&xprt->reserve_lock); 1859 return xid; 1860 } 1861 1862 static void 1863 xprt_init_xid(struct rpc_xprt *xprt) 1864 { 1865 xprt->xid = prandom_u32(); 1866 } 1867 1868 static void 1869 xprt_request_init(struct rpc_task *task) 1870 { 1871 struct rpc_xprt *xprt = task->tk_xprt; 1872 struct rpc_rqst *req = task->tk_rqstp; 1873 1874 req->rq_task = task; 1875 req->rq_xprt = xprt; 1876 req->rq_buffer = NULL; 1877 req->rq_xid = xprt_alloc_xid(xprt); 1878 xprt_init_connect_cookie(req, xprt); 1879 req->rq_snd_buf.len = 0; 1880 req->rq_snd_buf.buflen = 0; 1881 req->rq_rcv_buf.len = 0; 1882 req->rq_rcv_buf.buflen = 0; 1883 req->rq_snd_buf.bvec = NULL; 1884 req->rq_rcv_buf.bvec = NULL; 1885 req->rq_release_snd_buf = NULL; 1886 xprt_init_majortimeo(task, req); 1887 1888 trace_xprt_reserve(req); 1889 } 1890 1891 static void 1892 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1893 { 1894 xprt->ops->alloc_slot(xprt, task); 1895 if (task->tk_rqstp != NULL) 1896 xprt_request_init(task); 1897 } 1898 1899 /** 1900 * xprt_reserve - allocate an RPC request slot 1901 * @task: RPC task requesting a slot allocation 1902 * 1903 * If the transport is marked as being congested, or if no more 1904 * slots are available, place the task on the transport's 1905 * backlog queue. 1906 */ 1907 void xprt_reserve(struct rpc_task *task) 1908 { 1909 struct rpc_xprt *xprt = task->tk_xprt; 1910 1911 task->tk_status = 0; 1912 if (task->tk_rqstp != NULL) 1913 return; 1914 1915 task->tk_status = -EAGAIN; 1916 if (!xprt_throttle_congested(xprt, task)) 1917 xprt_do_reserve(xprt, task); 1918 } 1919 1920 /** 1921 * xprt_retry_reserve - allocate an RPC request slot 1922 * @task: RPC task requesting a slot allocation 1923 * 1924 * If no more slots are available, place the task on the transport's 1925 * backlog queue. 1926 * Note that the only difference with xprt_reserve is that we now 1927 * ignore the value of the XPRT_CONGESTED flag. 
*/
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	/* The task already owns a request: nothing to do */
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 * If @task owns no request, only the transport write lock is released
 * (and only when the task has a client). Otherwise the request is
 * taken off the transport queues, its buffers and credential are
 * released, and the slot is handed back through ->free_slot() or, for a
 * preallocated backchannel request, xprt_free_bc_request().
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	xprt_request_dequeue_xprt(task);
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	/* ->release_request() is optional */
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt_schedule_autodisconnect(xprt);
	spin_unlock(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xdr_free_bvec(&req->rq_rcv_buf);
	xdr_free_bvec(&req->rq_snd_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	/* Detach the request from the task before giving the slot back */
	task->tk_rqstp = NULL;
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
/*
 * Bind the backchannel request @req to @task, initialise its connect
 * cookie and compute the length of its send buffer.
 */
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
}
#endif

/*
 * Initialise the generic part of a freshly allocated transport:
 * reference count, locks, queues, wait queues and the initial XID. A
 * reference on @net is taken and stored in xprt->xprt_net.
 */
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 * Returns the new transport, or an ERR_PTR: -EIO if no transport class
 * matches args->ident, -EINVAL if args->servername is longer than
 * RPC_MAXNETNAMELEN, -ENOMEM if the server name cannot be duplicated,
 * or whatever error the class's ->setup() method returned.
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt	*xprt;
	const struct xprt_class *t;

	t = xprt_class_find_by_ident(args->ident);
	if (!t) {
		dprintk("RPC: transport (%d) not supported\n", args->ident);
		return ERR_PTR(-EIO);
	}

	xprt = t->setup(args);
	xprt_class_release(t);

	if (IS_ERR(xprt))
		goto out;
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	/* The timer gets a callback only if xprt_has_timer() says so */
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	trace_xprt_create(xprt);
out:
	return xprt;
}

/*
 * Work item queued by xprt_destroy(): releases the generic transport
 * state and then calls the transport's ->destroy() method.
 */
static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	trace_xprt_destroy(xprt);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Destroy any existing back channel
	 */
	xprt_destroy_backchannel(xprt, UINT_MAX);

	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 * Takes XPRT_LOCKED, stops the transport timer, and queues
 * xprt_destroy_cb() on the system workqueue to do the rest.
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

/* kref release callback used by xprt_put(): destroy the owning transport */
static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
2131 * @xprt: pointer to the transport 2132 * 2133 */ 2134 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2135 { 2136 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2137 return xprt; 2138 return NULL; 2139 } 2140 EXPORT_SYMBOL_GPL(xprt_get); 2141 2142 /** 2143 * xprt_put - release a reference to an RPC transport. 2144 * @xprt: pointer to the transport 2145 * 2146 */ 2147 void xprt_put(struct rpc_xprt *xprt) 2148 { 2149 if (xprt != NULL) 2150 kref_put(&xprt->kref, xprt_destroy_kref); 2151 } 2152 EXPORT_SYMBOL_GPL(xprt_put); 2153