// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  - When a process places a call, it allocates a request slot if
 *    one is available. Otherwise, it sleeps on the backlog queue
 *    (xprt_reserve).
 *  - Next, the caller puts together the RPC message, stuffs it into
 *    the request struct, and calls xprt_transmit().
 *  - xprt_transmit sends the message and installs the caller on the
 *    transport's wait list. At the same time, if a reply is expected,
 *    it installs a timer that is run after the packet's timeout has
 *    expired.
 *  - When a packet arrives, the data_ready handler walks the list of
 *    pending requests for that transport. If a matching XID is found, the
 *    caller is woken up, and the timer removed.
 *  - When no reply arrives within the timeout interval, the timer is
 *    fired by the kernel and runs xprt_timer(). It either adjusts the
 *    timeout values (minor timeout) or wakes up the caller with a status
 *    of -ETIMEDOUT.
 *  - When the caller receives a notification from RPC that a reply arrived,
 *    it should release the RPC slot, and process the reply.
 *    If the call timed out, it may choose to retry the operation by
 *    adjusting the initial timeout value, and simply calling rpc_call
 *    again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"
#include "sysfs.h"
#include "fail.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
static void xprt_destroy(struct rpc_xprt *xprt);
static void xprt_request_init(struct rpc_task *task);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
	unsigned long timeout = jiffies + req->rq_timeout;

	if (time_before(timeout, req->rq_majortimeo))
		return timeout;
	return req->rq_majortimeo;
}

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
95 * 96 * Returns: 97 * 0: transport successfully registered 98 * -EEXIST: transport already registered 99 * -EINVAL: transport module being unloaded 100 */ 101 int xprt_register_transport(struct xprt_class *transport) 102 { 103 struct xprt_class *t; 104 int result; 105 106 result = -EEXIST; 107 spin_lock(&xprt_list_lock); 108 list_for_each_entry(t, &xprt_list, list) { 109 /* don't register the same transport class twice */ 110 if (t->ident == transport->ident) 111 goto out; 112 } 113 114 list_add_tail(&transport->list, &xprt_list); 115 printk(KERN_INFO "RPC: Registered %s transport module.\n", 116 transport->name); 117 result = 0; 118 119 out: 120 spin_unlock(&xprt_list_lock); 121 return result; 122 } 123 EXPORT_SYMBOL_GPL(xprt_register_transport); 124 125 /** 126 * xprt_unregister_transport - unregister a transport implementation 127 * @transport: transport to unregister 128 * 129 * Returns: 130 * 0: transport successfully unregistered 131 * -ENOENT: transport never registered 132 */ 133 int xprt_unregister_transport(struct xprt_class *transport) 134 { 135 struct xprt_class *t; 136 int result; 137 138 result = 0; 139 spin_lock(&xprt_list_lock); 140 list_for_each_entry(t, &xprt_list, list) { 141 if (t == transport) { 142 printk(KERN_INFO 143 "RPC: Unregistered %s transport module.\n", 144 transport->name); 145 list_del_init(&transport->list); 146 goto out; 147 } 148 } 149 result = -ENOENT; 150 151 out: 152 spin_unlock(&xprt_list_lock); 153 return result; 154 } 155 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 156 157 static void 158 xprt_class_release(const struct xprt_class *t) 159 { 160 module_put(t->owner); 161 } 162 163 static const struct xprt_class * 164 xprt_class_find_by_ident_locked(int ident) 165 { 166 const struct xprt_class *t; 167 168 list_for_each_entry(t, &xprt_list, list) { 169 if (t->ident != ident) 170 continue; 171 if (!try_module_get(t->owner)) 172 continue; 173 return t; 174 } 175 return NULL; 176 } 177 178 static const struct xprt_class * 179 xprt_class_find_by_ident(int ident) 180 { 181 const struct xprt_class *t; 182 183 spin_lock(&xprt_list_lock); 184 t = xprt_class_find_by_ident_locked(ident); 185 spin_unlock(&xprt_list_lock); 186 return t; 187 } 188 189 static const struct xprt_class * 190 xprt_class_find_by_netid_locked(const char *netid) 191 { 192 const struct xprt_class *t; 193 unsigned int i; 194 195 list_for_each_entry(t, &xprt_list, list) { 196 for (i = 0; t->netid[i][0] != '\0'; i++) { 197 if (strcmp(t->netid[i], netid) != 0) 198 continue; 199 if (!try_module_get(t->owner)) 200 continue; 201 return t; 202 } 203 } 204 return NULL; 205 } 206 207 static const struct xprt_class * 208 xprt_class_find_by_netid(const char *netid) 209 { 210 const struct xprt_class *t; 211 212 spin_lock(&xprt_list_lock); 213 t = xprt_class_find_by_netid_locked(netid); 214 if (!t) { 215 spin_unlock(&xprt_list_lock); 216 request_module("rpc%s", netid); 217 spin_lock(&xprt_list_lock); 218 t = xprt_class_find_by_netid_locked(netid); 219 } 220 spin_unlock(&xprt_list_lock); 221 return t; 222 } 223 224 /** 225 * xprt_find_transport_ident - convert a netid into a transport identifier 226 * @netid: transport to load 227 * 228 * Returns: 229 * > 0: transport identifier 230 * -ENOENT: transport module not available 231 */ 232 int xprt_find_transport_ident(const char *netid) 233 { 234 const struct xprt_class *t; 235 int ret; 236 237 t = xprt_class_find_by_netid(netid); 238 if (!t) 239 return -ENOENT; 240 ret = t->ident; 241 xprt_class_release(t); 242 return ret; 243 } 244 
static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

out_locked:
	trace_xprt_reserve_xprt(xprt, task);
	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
					rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		goto out_locked;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		goto out_locked;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
out_locked:
	trace_xprt_reserve_cong(xprt, task);
	return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
	trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
	trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (req->rq_cong)
		return 1;
	trace_xprt_get_cong(xprt, req->rq_task);
	if (RPCXPRT_CONGESTED(xprt)) {
		xprt_set_congestion_window_wait(xprt);
		return 0;
	}
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	xprt_test_and_clear_congestion_window_wait(xprt);
	trace_xprt_put_cong(xprt, req->rq_task);
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	bool ret = false;

	if (req->rq_cong)
		return true;
	spin_lock(&xprt->transport_lock);
	ret = __xprt_get_cong(xprt, req) != 0;
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
		__xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	s64 delta = ktime_to_ns(ktime_get() - abstime);
	return likely(delta >= 0) ?
		jiffies - nsecs_to_jiffies(delta) :
		jiffies + nsecs_to_jiffies(-delta);
}

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	unsigned long majortimeo = req->rq_timeout;

	if (to->to_exponential)
		majortimeo <<= to->to_retries;
	else
		majortimeo += to->to_increment * to->to_retries;
	if (majortimeo > to->to_maxval || majortimeo == 0)
		majortimeo = to->to_maxval;
	return majortimeo;
}

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	req->rq_majortimeo += xprt_calc_majortimeo(req);
}

static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
	req->rq_minortimeo += req->rq_timeout;
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
	unsigned long time_init;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (likely(xprt && xprt_connected(xprt)))
		time_init = jiffies;
	else
		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
	req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (time_before(jiffies, req->rq_minortimeo))
			return status;
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}
	xprt_reset_minortimeo(req);

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);
	unsigned int pflags = memalloc_nofs_save();

	trace_xprt_disconnect_auto(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
	memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_done(xprt);
	spin_lock(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_clear_congestion_window_wait_locked(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_force(xprt);

	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	else if (xprt->snd_task)
		rpc_wake_up_queued_task_set_status(&xprt->pending,
				xprt->snd_task, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	spin_unlock(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	xprt->last_used = jiffies;
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		return;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
static void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
	if (!fail_sunrpc.ignore_client_disconnect &&
	    should_fail(&fail_sunrpc.attr, 1))
		xprt->ops->inject_disconnect(xprt);
}
#else
static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
{
}
#endif

bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock(&xprt->transport_lock);
	return ret;
}

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	trace_xprt_connect(xprt);

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		trace_xprt_disconnect_cleanup(xprt);
		xprt->ops->close(xprt);
	}

	if (!xprt_connected(xprt)) {
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
				xprt_request_timeout(task->tk_rqstp));

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
	xprt->reestablish_timeout <<= 1;
	if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
		xprt->reestablish_timeout = xprt->max_reconnect_timeout;
	if (xprt->reestablish_timeout < init_to)
		xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

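/*
 * Insert @new into the transport's receive queue, ordered by XID.
 * Caller must hold xprt->queue_lock.
 */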
static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
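 * The final unpin wakes up any caller waiting in xprt_wait_on_pinned_rqst().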
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_request_need_enqueue_receive(task, req))
		return;

	xprt_request_prepare(task->tk_rqstp);
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	/* Turn off autodisconnect */
	del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	xprt->stat.recvs++;

	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	xprt_request_dequeue_receive_locked(task);
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;

	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
}

/**
 * xprt_wait_for_reply_request_def - wait for reply
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters, and put the task to sleep on the
 * pending queue. Used by transports that don't adjust the
 * retransmit timeout based on round-trip time estimation.
 */
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			xprt_request_timeout(req));
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;
	unsigned long timeout;

	timeout = rpc_calc_rto(rtt, timer);
	timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (timeout > max_timeout || timeout == 0)
		timeout = max_timeout;
	rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
			jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->wait_for_reply_request(task);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
		 */
		if (xprt_request_retransmit_after_disconnect(task))
			rpc_wake_up_queued_task_set_status(&xprt->pending,
					task, -ENOTCONN);
	}
	spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *pos, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt_request_need_enqueue_transmit(task, req)) {
		req->rq_bytes_sent = 0;
		spin_lock(&xprt->queue_lock);
		/*
		 * Requests that carry congestion control credits are added
		 * to the head of the list to avoid starvation issues.
		 */
		if (req->rq_cong) {
			xprt_clear_congestion_window_wait(xprt);
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong)
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (RPC_IS_SWAPPER(task)) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_cong || pos->rq_bytes_sent)
					continue;
				if (RPC_IS_SWAPPER(pos->rq_task))
					continue;
				/* Note: req is added _before_ pos */
				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
				INIT_LIST_HEAD(&req->rq_xmit2);
				goto out;
			}
		} else if (!req->rq_seqno) {
			list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
				if (pos->rq_task->tk_owner != task->tk_owner)
					continue;
				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
				INIT_LIST_HEAD(&req->rq_xmit);
				goto out;
			}
		}
		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
		INIT_LIST_HEAD(&req->rq_xmit2);
out:
		atomic_long_inc(&xprt->xmit_queuelen);
		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
		return;
	if (!list_empty(&req->rq_xmit)) {
		list_del(&req->rq_xmit);
		if (!list_empty(&req->rq_xmit2)) {
			struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
					struct rpc_rqst, rq_xmit2);
			list_del(&req->rq_xmit2);
			list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
		}
	} else
		list_del(&req->rq_xmit2);
	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->queue_lock);
	xprt_request_dequeue_transmit_locked(task);
	spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmit and receive queues, and ensure that
 * it is not pinned by the receive work item.
 */
void
xprt_request_dequeue_xprt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		while (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 */
void
xprt_request_prepare(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (xprt->ops->prepare_request)
		xprt->ops->prepare_request(req);
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
	return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_lock_write(xprt, task)) {
		/* Race breaker: someone may have transmitted us */
		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			rpc_wake_up_queued_task_set_status(&xprt->sending,
					task, 0);
		return false;
	}
	return true;
}

void xprt_end_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	xprt_inject_disconnect(xprt);
	xprt_release_write(xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
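 * A negative errno is returned if the request could not be sent.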
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_task *task = req->rq_task;
	unsigned int connect_cookie;
	int is_retrans = RPC_WAS_SENT(task);
	int status;

	if (!req->rq_bytes_sent) {
		if (xprt_request_data_received(task)) {
			status = 0;
			goto out_dequeue;
		}
		/* Verify that our message lies in the RPCSEC_GSS window */
		if (rpcauth_xmit_need_reencode(task)) {
			status = -EBADMSG;
			goto out_dequeue;
		}
		if (RPC_SIGNALLED(task)) {
			status = -ERESTARTSYS;
			goto out_dequeue;
		}
	}

	/*
	 * Update req->rq_ntrans before transmitting to avoid races with
	 * xprt_update_rtt(), which needs to know that it is recording a
	 * reply to the first transmission.
	 */
	req->rq_ntrans++;

	trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
	connect_cookie = xprt->connect_cookie;
	status = xprt->ops->send_request(req);
	if (status != 0) {
		req->rq_ntrans--;
		trace_xprt_transmit(req, status);
		return status;
	}

	if (is_retrans) {
		task->tk_client->cl_stats->rpcretrans++;
		trace_xprt_retransmit(req);
	}

	xprt_inject_disconnect(xprt);

	task->tk_flags |= RPC_TASK_SENT;
	spin_lock(&xprt->transport_lock);

	xprt->stat.sends++;
	xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
	xprt->stat.bklog_u += xprt->backlog.qlen;
	xprt->stat.sending_u += xprt->sending.qlen;
	xprt->stat.pending_u += xprt->pending.qlen;
	spin_unlock(&xprt->transport_lock);

	req->rq_connect_cookie = connect_cookie;
out_dequeue:
	trace_xprt_transmit(req, status);
	xprt_request_dequeue_transmit(task);
	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
	return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *next, *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int counter, status;

	spin_lock(&xprt->queue_lock);
	counter = 0;
	while (!list_empty(&xprt->xmit_queue)) {
		if (++counter == 20)
			break;
		next = list_first_entry(&xprt->xmit_queue,
				struct rpc_rqst, rq_xmit);
		xprt_pin_rqst(next);
		spin_unlock(&xprt->queue_lock);
		status = xprt_request_transmit(next, task);
		if (status == -EBADMSG && next != req)
			status = 0;
		spin_lock(&xprt->queue_lock);
		xprt_unpin_rqst(next);
		if (status == 0) {
			if (!xprt_request_data_received(task) ||
			    test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
				continue;
		} else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
			task->tk_status = status;
		break;
	}
	spin_unlock(&xprt->queue_lock);
}

static void xprt_complete_request_init(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_request_init(task);
}

void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
{
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
}
EXPORT_SYMBOL_GPL(xprt_add_backlog);

static bool __xprt_set_rq(struct rpc_task *task, void *data)
{
	struct rpc_rqst *req = data;

	if (task->tk_rqstp == NULL) {
		memset(req, 0, sizeof(*req));	/* mark unused */
		task->tk_rqstp = req;
		return true;
	}
	return false;
}

bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
		clear_bit(XPRT_CONGESTED, &xprt->state);
		return false;
	}
	return true;
}
EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);

static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
{
	bool ret = false;

	if (!test_bit(XPRT_CONGESTED, &xprt->state))
		goto out;
	spin_lock(&xprt->reserve_lock);
	if (test_bit(XPRT_CONGESTED, &xprt->state)) {
		xprt_add_backlog(xprt, task);
		ret = true;
	}
	spin_unlock(&xprt->reserve_lock);
out:
	return ret;
}

static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = ERR_PTR(-EAGAIN);

	if (xprt->num_reqs >= xprt->max_reqs)
		goto out;
	++xprt->num_reqs;
	spin_unlock(&xprt->reserve_lock);
	req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
	spin_lock(&xprt->reserve_lock);
	if (req != NULL)
		goto out;
	--xprt->num_reqs;
	req = ERR_PTR(-ENOMEM);
out:
	return req;
}

static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (xprt->num_reqs > xprt->min_reqs) {
		--xprt->num_reqs;
		kfree(req);
		return true;
	}
	return false;
}

void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->reserve_lock);
	if (!list_empty(&xprt->free)) {
		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del(&req->rq_list);
		goto out_init_req;
	}
	req = xprt_dynamic_alloc_slot(xprt);
	if (!IS_ERR(req))
		goto out_init_req;
	switch (PTR_ERR(req)) {
	case -ENOMEM:
		dprintk("RPC: dynamic allocation of request slot "
"failed! Retrying\n"); 1721 task->tk_status = -ENOMEM; 1722 break; 1723 case -EAGAIN: 1724 xprt_add_backlog(xprt, task); 1725 dprintk("RPC: waiting for request slot\n"); 1726 fallthrough; 1727 default: 1728 task->tk_status = -EAGAIN; 1729 } 1730 spin_unlock(&xprt->reserve_lock); 1731 return; 1732 out_init_req: 1733 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1734 xprt->num_reqs); 1735 spin_unlock(&xprt->reserve_lock); 1736 1737 task->tk_status = 0; 1738 task->tk_rqstp = req; 1739 } 1740 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1741 1742 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1743 { 1744 spin_lock(&xprt->reserve_lock); 1745 if (!xprt_wake_up_backlog(xprt, req) && 1746 !xprt_dynamic_free_slot(xprt, req)) { 1747 memset(req, 0, sizeof(*req)); /* mark unused */ 1748 list_add(&req->rq_list, &xprt->free); 1749 } 1750 spin_unlock(&xprt->reserve_lock); 1751 } 1752 EXPORT_SYMBOL_GPL(xprt_free_slot); 1753 1754 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1755 { 1756 struct rpc_rqst *req; 1757 while (!list_empty(&xprt->free)) { 1758 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1759 list_del(&req->rq_list); 1760 kfree(req); 1761 } 1762 } 1763 1764 static DEFINE_IDA(rpc_xprt_ids); 1765 1766 void xprt_cleanup_ids(void) 1767 { 1768 ida_destroy(&rpc_xprt_ids); 1769 } 1770 1771 static int xprt_alloc_id(struct rpc_xprt *xprt) 1772 { 1773 int id; 1774 1775 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL); 1776 if (id < 0) 1777 return id; 1778 1779 xprt->id = id; 1780 return 0; 1781 } 1782 1783 static void xprt_free_id(struct rpc_xprt *xprt) 1784 { 1785 ida_simple_remove(&rpc_xprt_ids, xprt->id); 1786 } 1787 1788 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1789 unsigned int num_prealloc, 1790 unsigned int max_alloc) 1791 { 1792 struct rpc_xprt *xprt; 1793 struct rpc_rqst *req; 1794 int i; 1795 1796 xprt = kzalloc(size, GFP_KERNEL); 1797 if (xprt == NULL) 1798 goto out; 1799 1800 xprt_alloc_id(xprt); 1801 xprt_init(xprt, net); 1802 1803 for (i = 0; i < num_prealloc; i++) { 1804 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1805 if (!req) 1806 goto out_free; 1807 list_add(&req->rq_list, &xprt->free); 1808 } 1809 if (max_alloc > num_prealloc) 1810 xprt->max_reqs = max_alloc; 1811 else 1812 xprt->max_reqs = num_prealloc; 1813 xprt->min_reqs = num_prealloc; 1814 xprt->num_reqs = num_prealloc; 1815 1816 return xprt; 1817 1818 out_free: 1819 xprt_free(xprt); 1820 out: 1821 return NULL; 1822 } 1823 EXPORT_SYMBOL_GPL(xprt_alloc); 1824 1825 void xprt_free(struct rpc_xprt *xprt) 1826 { 1827 put_net(xprt->xprt_net); 1828 xprt_free_all_slots(xprt); 1829 xprt_free_id(xprt); 1830 rpc_sysfs_xprt_destroy(xprt); 1831 kfree_rcu(xprt, rcu); 1832 } 1833 EXPORT_SYMBOL_GPL(xprt_free); 1834 1835 static void 1836 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1837 { 1838 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1839 } 1840 1841 static __be32 1842 xprt_alloc_xid(struct rpc_xprt *xprt) 1843 { 1844 __be32 xid; 1845 1846 spin_lock(&xprt->reserve_lock); 1847 xid = (__force __be32)xprt->xid++; 1848 spin_unlock(&xprt->reserve_lock); 1849 return xid; 1850 } 1851 1852 static void 1853 xprt_init_xid(struct rpc_xprt *xprt) 1854 { 1855 xprt->xid = prandom_u32(); 1856 } 1857 1858 static void 1859 xprt_request_init(struct rpc_task *task) 1860 { 1861 struct rpc_xprt *xprt = task->tk_xprt; 1862 struct rpc_rqst *req = task->tk_rqstp; 1863 1864 req->rq_task = task; 1865 req->rq_xprt = xprt; 1866 req->rq_buffer = NULL; 1867 
	req->rq_xid = xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_snd_buf.bvec = NULL;
	req->rq_rcv_buf.bvec = NULL;
	req->rq_release_snd_buf = NULL;
	xprt_init_majortimeo(task, req);

	trace_xprt_reserve(req);
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	xprt_request_dequeue_xprt(task);
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt_schedule_autodisconnect(xprt);
	spin_unlock(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xdr_free_bvec(&req->rq_rcv_buf);
	xdr_free_bvec(&req->rq_snd_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	task->tk_rqstp = NULL;
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
}
#endif

static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	const struct xprt_class *t;

	t = xprt_class_find_by_ident(args->ident);
	if (!t) {
		dprintk("RPC: transport (%d) not supported\n", args->ident);
		return ERR_PTR(-EIO);
	}

	xprt = t->setup(args);
	xprt_class_release(t);

	if (IS_ERR(xprt))
		goto out;
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	trace_xprt_create(xprt);
out:
	return xprt;
}

static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	trace_xprt_destroy(xprt);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Destroy any existing back channel
	 */
	xprt_destroy_backchannel(xprt, UINT_MAX);

	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);