1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 
35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 #include "sysfs.h" 59 60 /* 61 * Local variables 62 */ 63 64 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 65 # define RPCDBG_FACILITY RPCDBG_XPRT 66 #endif 67 68 /* 69 * Local functions 70 */ 71 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 72 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 73 static void xprt_destroy(struct rpc_xprt *xprt); 74 static void xprt_request_init(struct rpc_task *task); 75 76 static DEFINE_SPINLOCK(xprt_list_lock); 77 static LIST_HEAD(xprt_list); 78 79 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 80 { 81 unsigned long timeout = jiffies + req->rq_timeout; 82 83 if (time_before(timeout, req->rq_majortimeo)) 84 return timeout; 85 return req->rq_majortimeo; 86 } 87 88 /** 89 * xprt_register_transport - register a transport implementation 90 * @transport: transport to register 91 * 92 * If a transport implementation is loaded as a kernel module, it can 93 * call this interface to make itself known to the RPC client. 
94 * 95 * Returns: 96 * 0: transport successfully registered 97 * -EEXIST: transport already registered 98 * -EINVAL: transport module being unloaded 99 */ 100 int xprt_register_transport(struct xprt_class *transport) 101 { 102 struct xprt_class *t; 103 int result; 104 105 result = -EEXIST; 106 spin_lock(&xprt_list_lock); 107 list_for_each_entry(t, &xprt_list, list) { 108 /* don't register the same transport class twice */ 109 if (t->ident == transport->ident) 110 goto out; 111 } 112 113 list_add_tail(&transport->list, &xprt_list); 114 printk(KERN_INFO "RPC: Registered %s transport module.\n", 115 transport->name); 116 result = 0; 117 118 out: 119 spin_unlock(&xprt_list_lock); 120 return result; 121 } 122 EXPORT_SYMBOL_GPL(xprt_register_transport); 123 124 /** 125 * xprt_unregister_transport - unregister a transport implementation 126 * @transport: transport to unregister 127 * 128 * Returns: 129 * 0: transport successfully unregistered 130 * -ENOENT: transport never registered 131 */ 132 int xprt_unregister_transport(struct xprt_class *transport) 133 { 134 struct xprt_class *t; 135 int result; 136 137 result = 0; 138 spin_lock(&xprt_list_lock); 139 list_for_each_entry(t, &xprt_list, list) { 140 if (t == transport) { 141 printk(KERN_INFO 142 "RPC: Unregistered %s transport module.\n", 143 transport->name); 144 list_del_init(&transport->list); 145 goto out; 146 } 147 } 148 result = -ENOENT; 149 150 out: 151 spin_unlock(&xprt_list_lock); 152 return result; 153 } 154 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 155 156 static void 157 xprt_class_release(const struct xprt_class *t) 158 { 159 module_put(t->owner); 160 } 161 162 static const struct xprt_class * 163 xprt_class_find_by_ident_locked(int ident) 164 { 165 const struct xprt_class *t; 166 167 list_for_each_entry(t, &xprt_list, list) { 168 if (t->ident != ident) 169 continue; 170 if (!try_module_get(t->owner)) 171 continue; 172 return t; 173 } 174 return NULL; 175 } 176 177 static const struct xprt_class * 178 
xprt_class_find_by_ident(int ident) 179 { 180 const struct xprt_class *t; 181 182 spin_lock(&xprt_list_lock); 183 t = xprt_class_find_by_ident_locked(ident); 184 spin_unlock(&xprt_list_lock); 185 return t; 186 } 187 188 static const struct xprt_class * 189 xprt_class_find_by_netid_locked(const char *netid) 190 { 191 const struct xprt_class *t; 192 unsigned int i; 193 194 list_for_each_entry(t, &xprt_list, list) { 195 for (i = 0; t->netid[i][0] != '\0'; i++) { 196 if (strcmp(t->netid[i], netid) != 0) 197 continue; 198 if (!try_module_get(t->owner)) 199 continue; 200 return t; 201 } 202 } 203 return NULL; 204 } 205 206 static const struct xprt_class * 207 xprt_class_find_by_netid(const char *netid) 208 { 209 const struct xprt_class *t; 210 211 spin_lock(&xprt_list_lock); 212 t = xprt_class_find_by_netid_locked(netid); 213 if (!t) { 214 spin_unlock(&xprt_list_lock); 215 request_module("rpc%s", netid); 216 spin_lock(&xprt_list_lock); 217 t = xprt_class_find_by_netid_locked(netid); 218 } 219 spin_unlock(&xprt_list_lock); 220 return t; 221 } 222 223 /** 224 * xprt_find_transport_ident - convert a netid into a transport identifier 225 * @netid: transport to load 226 * 227 * Returns: 228 * > 0: transport identifier 229 * -ENOENT: transport module not available 230 */ 231 int xprt_find_transport_ident(const char *netid) 232 { 233 const struct xprt_class *t; 234 int ret; 235 236 t = xprt_class_find_by_netid(netid); 237 if (!t) 238 return -ENOENT; 239 ret = t->ident; 240 xprt_class_release(t); 241 return ret; 242 } 243 EXPORT_SYMBOL_GPL(xprt_find_transport_ident); 244 245 static void xprt_clear_locked(struct rpc_xprt *xprt) 246 { 247 xprt->snd_task = NULL; 248 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 249 smp_mb__before_atomic(); 250 clear_bit(XPRT_LOCKED, &xprt->state); 251 smp_mb__after_atomic(); 252 } else 253 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 254 } 255 256 /** 257 * xprt_reserve_xprt - serialize write access to transports 258 * @task: task that is 
requesting access to the transport 259 * @xprt: pointer to the target transport 260 * 261 * This prevents mixing the payload of separate requests, and prevents 262 * transport connects from colliding with writes. No congestion control 263 * is provided. 264 */ 265 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 266 { 267 struct rpc_rqst *req = task->tk_rqstp; 268 269 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 270 if (task == xprt->snd_task) 271 goto out_locked; 272 goto out_sleep; 273 } 274 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 275 goto out_unlock; 276 xprt->snd_task = task; 277 278 out_locked: 279 trace_xprt_reserve_xprt(xprt, task); 280 return 1; 281 282 out_unlock: 283 xprt_clear_locked(xprt); 284 out_sleep: 285 task->tk_status = -EAGAIN; 286 if (RPC_IS_SOFT(task)) 287 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 288 xprt_request_timeout(req)); 289 else 290 rpc_sleep_on(&xprt->sending, task, NULL); 291 return 0; 292 } 293 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 294 295 static bool 296 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 297 { 298 return test_bit(XPRT_CWND_WAIT, &xprt->state); 299 } 300 301 static void 302 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 303 { 304 if (!list_empty(&xprt->xmit_queue)) { 305 /* Peek at head of queue to see if it can make progress */ 306 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 307 rq_xmit)->rq_cong) 308 return; 309 } 310 set_bit(XPRT_CWND_WAIT, &xprt->state); 311 } 312 313 static void 314 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 315 { 316 if (!RPCXPRT_CONGESTED(xprt)) 317 clear_bit(XPRT_CWND_WAIT, &xprt->state); 318 } 319 320 /* 321 * xprt_reserve_xprt_cong - serialize write access to transports 322 * @task: task that is requesting access to the transport 323 * 324 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 325 * integrated into the decision of whether a request is allowed to be 326 * woken up and given 
access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 *
 * @xprt: pointer to the target transport
 *
 * Returns 1 if @task holds XPRT_LOCKED on return. Returns 0 after
 * putting @task to sleep on xprt->sending with tk_status set to -EAGAIN.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		/* Already locked: succeed only for the current owner */
		if (task == xprt->snd_task)
			goto out_locked;
		goto out_sleep;
	}
	/* No request attached: nothing to charge against the window */
	if (req == NULL) {
		xprt->snd_task = task;
		goto out_locked;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		goto out_locked;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	task->tk_status = -EAGAIN;
	/*
	 * NOTE(review): req can be NULL here (lock already held by another
	 * task and tk_rqstp == NULL); xprt_request_timeout(req) would then
	 * dereference NULL for soft tasks. Confirm no caller reaches this
	 * path with a soft task and no request.
	 */
	if (RPC_IS_SOFT(task))
		rpc_sleep_on_timeout(&xprt->sending, task, NULL,
				xprt_request_timeout(req));
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
out_locked:
	trace_xprt_reserve_cong(xprt, task);
	return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

/*
 * Acquire the transport write lock for @task through the transport's
 * ->reserve_xprt method, under transport_lock. The unlocked check is a
 * fast path for a task that already owns the lock.
 */
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
	return retval;
}

/*
 * rpc_wake_up_first_on_wq() callback: hand write-lock ownership to the
 * task being woken. @data is the rpc_xprt.
 */
static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

/*
 * Try to pass XPRT_LOCKED to the first task sleeping on xprt->sending.
 * Does nothing if the lock is already held. If we are waiting for write
 * space, or no task was woken, the lock is released again.
 */
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/*
 * Congestion-aware variant of __xprt_lock_write_next(): additionally
 * refuses to hand out the lock while XPRT_CWND_WAIT is set.
 */
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 * Does nothing unless @task is the current lock owner (xprt->snd_task).
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
	trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 * Does nothing unless @task is the current lock owner (xprt->snd_task).
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
	trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

/*
 * Release the write lock if @task owns it, via the transport's
 * ->release_xprt method under transport_lock. @task may be NULL
 * (e.g. from xprt_autoclose()).
 */
void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. If it did, set XPRT_CWND_WAIT and return 0 so the request
 * receives no congestion credit.
459 */ 460 static int 461 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 462 { 463 if (req->rq_cong) 464 return 1; 465 trace_xprt_get_cong(xprt, req->rq_task); 466 if (RPCXPRT_CONGESTED(xprt)) { 467 xprt_set_congestion_window_wait(xprt); 468 return 0; 469 } 470 req->rq_cong = 1; 471 xprt->cong += RPC_CWNDSCALE; 472 return 1; 473 } 474 475 /* 476 * Adjust the congestion window, and wake up the next task 477 * that has been sleeping due to congestion 478 */ 479 static void 480 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 481 { 482 if (!req->rq_cong) 483 return; 484 req->rq_cong = 0; 485 xprt->cong -= RPC_CWNDSCALE; 486 xprt_test_and_clear_congestion_window_wait(xprt); 487 trace_xprt_put_cong(xprt, req->rq_task); 488 __xprt_lock_write_next_cong(xprt); 489 } 490 491 /** 492 * xprt_request_get_cong - Request congestion control credits 493 * @xprt: pointer to transport 494 * @req: pointer to RPC request 495 * 496 * Useful for transports that require congestion control. 497 */ 498 bool 499 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 500 { 501 bool ret = false; 502 503 if (req->rq_cong) 504 return true; 505 spin_lock(&xprt->transport_lock); 506 ret = __xprt_get_cong(xprt, req) != 0; 507 spin_unlock(&xprt->transport_lock); 508 return ret; 509 } 510 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 511 512 /** 513 * xprt_release_rqst_cong - housekeeping when request is complete 514 * @task: RPC request that recently completed 515 * 516 * Useful for transports that require congestion control. 
*/
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	/* Give the request's congestion credit (if any) back to its transport */
	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/*
 * Same as xprt_clear_congestion_window_wait(), for callers that already
 * hold xprt->transport_lock.
 */
static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
		__xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending. transport_lock is taken only if the
 * flag was actually set.
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 *
 * cwnd and cong are fixed-point values: one request slot is
 * RPC_CWNDSCALE units. The request's congestion credit is returned
 * at the end.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* Additive increase: SCALE^2 / cwnd units, i.e. 1/cwnd of
		 * a request slot.
		 * The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		/* Multiplicative decrease, floored at one request slot */
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 * A negative @status is planted via rpc_wake_up_status(); otherwise the
 * tasks are woken with plain rpc_wake_up().
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Only sets XPRT_WRITE_SPACE; no timer is armed here. While the bit is
 * set, the reserve/lock helpers in this file refuse to hand out
 * XPRT_LOCKED. xprt_write_space() clears it and wakes the next sender.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/*
 * Caller holds xprt->transport_lock. If XPRT_WRITE_SPACE was set, clear
 * it, try to hand the write lock to the next sender, and return true.
 */
static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
*/
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	/* Lockless fast path: no sender is waiting for buffer space.
	 * Returns true only if XPRT_WRITE_SPACE was set and is now cleared. */
	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/*
 * Convert an absolute ktime_get() timestamp to the corresponding jiffies
 * value, by offsetting the current jiffies by the elapsed (or remaining)
 * time.
 */
static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
	s64 delta = ktime_to_ns(ktime_get() - abstime);
	return likely(delta >= 0) ?
		jiffies - nsecs_to_jiffies(delta) :
		jiffies + nsecs_to_jiffies(-delta);
}

/*
 * Length of the major timeout interval: rq_timeout grown over
 * to_retries retransmits (exponential: shifted left; linear: plus
 * to_increment per retry), capped at to_maxval. A zero result also
 * maps to to_maxval.
 *
 * NOTE(review): the left shift is undefined if to_retries >= the width
 * of unsigned long; presumably callers keep to_retries small — confirm.
 */
static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	unsigned long majortimeo = req->rq_timeout;

	if (to->to_exponential)
		majortimeo <<= to->to_retries;
	else
		majortimeo += to->to_increment * to->to_retries;
	if (majortimeo > to->to_maxval || majortimeo == 0)
		majortimeo = to->to_maxval;
	return majortimeo;
}

/* Push the major timeout deadline forward by one major interval. */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	req->rq_majortimeo += xprt_calc_majortimeo(req);
}

/* Push the minor timeout deadline forward by the current rq_timeout. */
static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
	req->rq_minortimeo += req->rq_timeout;
}

/*
 * Initialise rq_timeout, rq_majortimeo and rq_minortimeo for a new
 * request. If the transport is not connected, the deadlines are
 * measured from the task's start time (tk_start) rather than from now.
 */
static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
	unsigned long time_init;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (likely(xprt && xprt_connected(xprt)))
		time_init = jiffies;
	else
		time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
	req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 * Returns 0 if the major timeout has not expired (rq_timeout is backed
 * off only once the minor deadline has passed). Returns -ETIMEDOUT when
 * the major timeout has expired; the request's timeouts and the client's
 * RTT estimator are then reset to their initial values.
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		/* Minor deadline not reached yet: nothing to adjust */
		if (time_before(jiffies, req->rq_minortimeo))
			return status;
		/* Minor timeout: back off the retransmit interval */
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		/* Major timeout: restart from the initial interval */
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}
	xprt_reset_minortimeo(req);

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

/*
 * Work item (xprt->task_cleanup): close the transport on behalf of
 * whoever queued it while holding XPRT_LOCKED with snd_task == NULL.
 * Clears XPRT_CLOSE_WAIT, calls ->close, releases the write lock and
 * wakes waiters on the XPRT_LOCKED bit. Runs inside a
 * memalloc_nofs_save()/restore() scope.
 */
static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);
	unsigned int pflags = memalloc_nofs_save();

	trace_xprt_disconnect_auto(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
	memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 * Under transport_lock: clears the connected state, releases any
 * write-space and congestion-window waits, and wakes all pending tasks
 * with -ENOTCONN.
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_done(xprt);
	spin_lock(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_clear_congestion_window_wait_locked(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 *
xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 * Sets XPRT_CLOSE_WAIT. If the write lock is free, it is taken and the
 * autoclose work is queued. Otherwise, if the lock is held by a real
 * task (not a connect cookie), that task is woken on xprt->pending
 * with -ENOTCONN.
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	trace_xprt_disconnect_force(xprt);

	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	/* snd_task may hold an opaque cookie (see xprt_lock_connect());
	 * only treat it as an rpc_task when XPRT_SND_IS_COOKIE is clear */
	else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
		rpc_wake_up_queued_task_set_status(&xprt->pending,
				xprt->snd_task, -ENOTCONN);
	spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

/* Lockless read of xprt->connect_cookie. */
static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

/*
 * True if the transport is disconnected, or its connect_cookie no
 * longer matches the value recorded in the request (rq_connect_cookie).
 */
static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
*
 * Does nothing if XPRT_CLOSING is already set. Otherwise XPRT_CLOSE_WAIT
 * is set, the autoclose work is queued if the write lock was free, and
 * all tasks on xprt->pending are woken with -EAGAIN.
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	spin_unlock(&xprt->transport_lock);
}

/* Only transports with a non-zero idle_timeout use the autodisconnect timer. */
static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

/*
 * Record the last-use time. If no replies are outstanding (recv_queue
 * is empty), (re)arm the idle timer for idle_timeout jiffies from now.
 */
static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	xprt->last_used = jiffies;
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

/*
 * Idle timer callback. If replies are still outstanding, do nothing.
 * Otherwise refresh last_used and, if the write lock can be taken,
 * queue the task_cleanup work (presumably xprt_autoclose(), which then
 * releases the lock).
 */
static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		return;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/*
 * Transfer write-lock ownership from @task to the opaque @cookie for
 * the duration of a connect attempt. Sets XPRT_SND_IS_COOKIE so other
 * code does not treat snd_task as an rpc_task. Returns false (and
 * changes nothing) unless @task currently owns the lock.
 */
bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock(&xprt->transport_lock);
	return ret;
}

/*
 * Release a write lock held by @cookie (see xprt_lock_connect()) and
 * re-arm the autodisconnect timer. Waiters on the XPRT_LOCKED bit are
 * woken even if @cookie did not own the lock.
 */
void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 * Sets tk_status to -EAGAIN if the transport is not bound yet. Otherwise
 * takes the write lock, performs any pending close, and if still
 * disconnected puts @task to sleep on xprt->pending and starts
 * ->connect unless a connect is already in progress or the transport
 * is closing.
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	trace_xprt_connect(xprt);

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	/* On failure the ->reserve_xprt method has queued @task (the
	 * helpers in this file sleep it on xprt->sending) */
	if (!xprt_lock_write(xprt, task))
		return;

	/* Perform a close that was requested while the lock was busy */
	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		trace_xprt_disconnect_cleanup(xprt);
		xprt->ops->close(xprt);
	}

	if (!xprt_connected(xprt)) {
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on_timeout(&xprt->pending, task, NULL,
				xprt_request_timeout(task->tk_rqstp));

		/* NOTE(review): these early returns leave @task as snd_task;
		 * presumably the lock is dropped later on the task's wake-up
		 * or release path — confirm. */
		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	/* No-op if ->connect handed the lock to a cookie (snd_task != task) */
	xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 * Returns the number of jiffies until
 * stat.connect_start + reestablish_timeout, or 0 if that time has passed.
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
	unsigned long start, now = jiffies;

	start = xprt->stat.connect_start + xprt->reestablish_timeout;
	if (time_after(start, now))
		return start - now;
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 953 954 /** 955 * xprt_reconnect_backoff - compute the new re-establish timeout 956 * @xprt: transport instance 957 * @init_to: initial reestablish timeout 958 * 959 */ 960 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 961 { 962 xprt->reestablish_timeout <<= 1; 963 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 964 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 965 if (xprt->reestablish_timeout < init_to) 966 xprt->reestablish_timeout = init_to; 967 } 968 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 969 970 enum xprt_xid_rb_cmp { 971 XID_RB_EQUAL, 972 XID_RB_LEFT, 973 XID_RB_RIGHT, 974 }; 975 static enum xprt_xid_rb_cmp 976 xprt_xid_cmp(__be32 xid1, __be32 xid2) 977 { 978 if (xid1 == xid2) 979 return XID_RB_EQUAL; 980 if ((__force u32)xid1 < (__force u32)xid2) 981 return XID_RB_LEFT; 982 return XID_RB_RIGHT; 983 } 984 985 static struct rpc_rqst * 986 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 987 { 988 struct rb_node *n = xprt->recv_queue.rb_node; 989 struct rpc_rqst *req; 990 991 while (n != NULL) { 992 req = rb_entry(n, struct rpc_rqst, rq_recv); 993 switch (xprt_xid_cmp(xid, req->rq_xid)) { 994 case XID_RB_LEFT: 995 n = n->rb_left; 996 break; 997 case XID_RB_RIGHT: 998 n = n->rb_right; 999 break; 1000 case XID_RB_EQUAL: 1001 return req; 1002 } 1003 } 1004 return NULL; 1005 } 1006 1007 static void 1008 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 1009 { 1010 struct rb_node **p = &xprt->recv_queue.rb_node; 1011 struct rb_node *n = NULL; 1012 struct rpc_rqst *req; 1013 1014 while (*p != NULL) { 1015 n = *p; 1016 req = rb_entry(n, struct rpc_rqst, rq_recv); 1017 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 1018 case XID_RB_LEFT: 1019 p = &n->rb_left; 1020 break; 1021 case XID_RB_RIGHT: 1022 p = &n->rb_right; 1023 break; 1024 case XID_RB_EQUAL: 1025 WARN_ON_ONCE(new != req); 1026 return; 1027 } 1028 } 1029 rb_link_node(&new->rq_recv, n, 
p); 1030 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 1031 } 1032 1033 static void 1034 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 1035 { 1036 rb_erase(&req->rq_recv, &xprt->recv_queue); 1037 } 1038 1039 /** 1040 * xprt_lookup_rqst - find an RPC request corresponding to an XID 1041 * @xprt: transport on which the original request was transmitted 1042 * @xid: RPC XID of incoming reply 1043 * 1044 * Caller holds xprt->queue_lock. 1045 */ 1046 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 1047 { 1048 struct rpc_rqst *entry; 1049 1050 entry = xprt_request_rb_find(xprt, xid); 1051 if (entry != NULL) { 1052 trace_xprt_lookup_rqst(xprt, xid, 0); 1053 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 1054 return entry; 1055 } 1056 1057 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 1058 ntohl(xid)); 1059 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 1060 xprt->stat.bad_xids++; 1061 return NULL; 1062 } 1063 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1064 1065 static bool 1066 xprt_is_pinned_rqst(struct rpc_rqst *req) 1067 { 1068 return atomic_read(&req->rq_pin) != 0; 1069 } 1070 1071 /** 1072 * xprt_pin_rqst - Pin a request on the transport receive list 1073 * @req: Request to pin 1074 * 1075 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1076 * so should be holding xprt->queue_lock. 1077 */ 1078 void xprt_pin_rqst(struct rpc_rqst *req) 1079 { 1080 atomic_inc(&req->rq_pin); 1081 } 1082 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1083 1084 /** 1085 * xprt_unpin_rqst - Unpin a request on the transport receive list 1086 * @req: Request to pin 1087 * 1088 * Caller should be holding xprt->queue_lock. 
1089 */ 1090 void xprt_unpin_rqst(struct rpc_rqst *req) 1091 { 1092 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1093 atomic_dec(&req->rq_pin); 1094 return; 1095 } 1096 if (atomic_dec_and_test(&req->rq_pin)) 1097 wake_up_var(&req->rq_pin); 1098 } 1099 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1100 1101 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1102 { 1103 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1104 } 1105 1106 static bool 1107 xprt_request_data_received(struct rpc_task *task) 1108 { 1109 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1110 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1111 } 1112 1113 static bool 1114 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1115 { 1116 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1117 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1118 } 1119 1120 /** 1121 * xprt_request_enqueue_receive - Add an request to the receive queue 1122 * @task: RPC task 1123 * 1124 */ 1125 void 1126 xprt_request_enqueue_receive(struct rpc_task *task) 1127 { 1128 struct rpc_rqst *req = task->tk_rqstp; 1129 struct rpc_xprt *xprt = req->rq_xprt; 1130 1131 if (!xprt_request_need_enqueue_receive(task, req)) 1132 return; 1133 1134 xprt_request_prepare(task->tk_rqstp); 1135 spin_lock(&xprt->queue_lock); 1136 1137 /* Update the softirq receive buffer */ 1138 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1139 sizeof(req->rq_private_buf)); 1140 1141 /* Add request to the receive list */ 1142 xprt_request_rb_insert(xprt, req); 1143 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1144 spin_unlock(&xprt->queue_lock); 1145 1146 /* Turn off autodisconnect */ 1147 del_singleshot_timer_sync(&xprt->timer); 1148 } 1149 1150 /** 1151 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1152 * @task: RPC task 1153 * 1154 * Caller must hold xprt->queue_lock. 
1155 */ 1156 static void 1157 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1158 { 1159 struct rpc_rqst *req = task->tk_rqstp; 1160 1161 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1162 xprt_request_rb_remove(req->rq_xprt, req); 1163 } 1164 1165 /** 1166 * xprt_update_rtt - Update RPC RTT statistics 1167 * @task: RPC request that recently completed 1168 * 1169 * Caller holds xprt->queue_lock. 1170 */ 1171 void xprt_update_rtt(struct rpc_task *task) 1172 { 1173 struct rpc_rqst *req = task->tk_rqstp; 1174 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1175 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1176 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1177 1178 if (timer) { 1179 if (req->rq_ntrans == 1) 1180 rpc_update_rtt(rtt, timer, m); 1181 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1182 } 1183 } 1184 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1185 1186 /** 1187 * xprt_complete_rqst - called when reply processing is complete 1188 * @task: RPC request that recently completed 1189 * @copied: actual number of bytes received from the transport 1190 * 1191 * Caller holds xprt->queue_lock. 
1192 */ 1193 void xprt_complete_rqst(struct rpc_task *task, int copied) 1194 { 1195 struct rpc_rqst *req = task->tk_rqstp; 1196 struct rpc_xprt *xprt = req->rq_xprt; 1197 1198 xprt->stat.recvs++; 1199 1200 req->rq_private_buf.len = copied; 1201 /* Ensure all writes are done before we update */ 1202 /* req->rq_reply_bytes_recvd */ 1203 smp_wmb(); 1204 req->rq_reply_bytes_recvd = copied; 1205 xprt_request_dequeue_receive_locked(task); 1206 rpc_wake_up_queued_task(&xprt->pending, task); 1207 } 1208 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1209 1210 static void xprt_timer(struct rpc_task *task) 1211 { 1212 struct rpc_rqst *req = task->tk_rqstp; 1213 struct rpc_xprt *xprt = req->rq_xprt; 1214 1215 if (task->tk_status != -ETIMEDOUT) 1216 return; 1217 1218 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1219 if (!req->rq_reply_bytes_recvd) { 1220 if (xprt->ops->timer) 1221 xprt->ops->timer(xprt, task); 1222 } else 1223 task->tk_status = 0; 1224 } 1225 1226 /** 1227 * xprt_wait_for_reply_request_def - wait for reply 1228 * @task: pointer to rpc_task 1229 * 1230 * Set a request's retransmit timeout based on the transport's 1231 * default timeout parameters. Used by transports that don't adjust 1232 * the retransmit timeout based on round-trip time estimation, 1233 * and put the task to sleep on the pending queue. 1234 */ 1235 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1236 { 1237 struct rpc_rqst *req = task->tk_rqstp; 1238 1239 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1240 xprt_request_timeout(req)); 1241 } 1242 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1243 1244 /** 1245 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1246 * @task: pointer to rpc_task 1247 * 1248 * Set a request's retransmit timeout using the RTT estimator, 1249 * and put the task to sleep on the pending queue. 
1250 */ 1251 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1252 { 1253 int timer = task->tk_msg.rpc_proc->p_timer; 1254 struct rpc_clnt *clnt = task->tk_client; 1255 struct rpc_rtt *rtt = clnt->cl_rtt; 1256 struct rpc_rqst *req = task->tk_rqstp; 1257 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1258 unsigned long timeout; 1259 1260 timeout = rpc_calc_rto(rtt, timer); 1261 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1262 if (timeout > max_timeout || timeout == 0) 1263 timeout = max_timeout; 1264 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1265 jiffies + timeout); 1266 } 1267 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1268 1269 /** 1270 * xprt_request_wait_receive - wait for the reply to an RPC request 1271 * @task: RPC task about to send a request 1272 * 1273 */ 1274 void xprt_request_wait_receive(struct rpc_task *task) 1275 { 1276 struct rpc_rqst *req = task->tk_rqstp; 1277 struct rpc_xprt *xprt = req->rq_xprt; 1278 1279 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1280 return; 1281 /* 1282 * Sleep on the pending queue if we're expecting a reply. 1283 * The spinlock ensures atomicity between the test of 1284 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1285 */ 1286 spin_lock(&xprt->queue_lock); 1287 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1288 xprt->ops->wait_for_reply_request(task); 1289 /* 1290 * Send an extra queue wakeup call if the 1291 * connection was dropped in case the call to 1292 * rpc_sleep_on() raced. 
1293 */ 1294 if (xprt_request_retransmit_after_disconnect(task)) 1295 rpc_wake_up_queued_task_set_status(&xprt->pending, 1296 task, -ENOTCONN); 1297 } 1298 spin_unlock(&xprt->queue_lock); 1299 } 1300 1301 static bool 1302 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1303 { 1304 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1305 } 1306 1307 /** 1308 * xprt_request_enqueue_transmit - queue a task for transmission 1309 * @task: pointer to rpc_task 1310 * 1311 * Add a task to the transmission queue. 1312 */ 1313 void 1314 xprt_request_enqueue_transmit(struct rpc_task *task) 1315 { 1316 struct rpc_rqst *pos, *req = task->tk_rqstp; 1317 struct rpc_xprt *xprt = req->rq_xprt; 1318 1319 if (xprt_request_need_enqueue_transmit(task, req)) { 1320 req->rq_bytes_sent = 0; 1321 spin_lock(&xprt->queue_lock); 1322 /* 1323 * Requests that carry congestion control credits are added 1324 * to the head of the list to avoid starvation issues. 1325 */ 1326 if (req->rq_cong) { 1327 xprt_clear_congestion_window_wait(xprt); 1328 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1329 if (pos->rq_cong) 1330 continue; 1331 /* Note: req is added _before_ pos */ 1332 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1333 INIT_LIST_HEAD(&req->rq_xmit2); 1334 goto out; 1335 } 1336 } else if (RPC_IS_SWAPPER(task)) { 1337 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1338 if (pos->rq_cong || pos->rq_bytes_sent) 1339 continue; 1340 if (RPC_IS_SWAPPER(pos->rq_task)) 1341 continue; 1342 /* Note: req is added _before_ pos */ 1343 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1344 INIT_LIST_HEAD(&req->rq_xmit2); 1345 goto out; 1346 } 1347 } else if (!req->rq_seqno) { 1348 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1349 if (pos->rq_task->tk_owner != task->tk_owner) 1350 continue; 1351 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1352 INIT_LIST_HEAD(&req->rq_xmit); 1353 goto out; 1354 } 1355 } 1356 list_add_tail(&req->rq_xmit, 
&xprt->xmit_queue); 1357 INIT_LIST_HEAD(&req->rq_xmit2); 1358 out: 1359 atomic_long_inc(&xprt->xmit_queuelen); 1360 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1361 spin_unlock(&xprt->queue_lock); 1362 } 1363 } 1364 1365 /** 1366 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1367 * @task: pointer to rpc_task 1368 * 1369 * Remove a task from the transmission queue 1370 * Caller must hold xprt->queue_lock 1371 */ 1372 static void 1373 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1374 { 1375 struct rpc_rqst *req = task->tk_rqstp; 1376 1377 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1378 return; 1379 if (!list_empty(&req->rq_xmit)) { 1380 list_del(&req->rq_xmit); 1381 if (!list_empty(&req->rq_xmit2)) { 1382 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1383 struct rpc_rqst, rq_xmit2); 1384 list_del(&req->rq_xmit2); 1385 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1386 } 1387 } else 1388 list_del(&req->rq_xmit2); 1389 atomic_long_dec(&req->rq_xprt->xmit_queuelen); 1390 } 1391 1392 /** 1393 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1394 * @task: pointer to rpc_task 1395 * 1396 * Remove a task from the transmission queue 1397 */ 1398 static void 1399 xprt_request_dequeue_transmit(struct rpc_task *task) 1400 { 1401 struct rpc_rqst *req = task->tk_rqstp; 1402 struct rpc_xprt *xprt = req->rq_xprt; 1403 1404 spin_lock(&xprt->queue_lock); 1405 xprt_request_dequeue_transmit_locked(task); 1406 spin_unlock(&xprt->queue_lock); 1407 } 1408 1409 /** 1410 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue 1411 * @task: pointer to rpc_task 1412 * 1413 * Remove a task from the transmit and receive queues, and ensure that 1414 * it is not pinned by the receive work item. 
1415 */ 1416 void 1417 xprt_request_dequeue_xprt(struct rpc_task *task) 1418 { 1419 struct rpc_rqst *req = task->tk_rqstp; 1420 struct rpc_xprt *xprt = req->rq_xprt; 1421 1422 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1423 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1424 xprt_is_pinned_rqst(req)) { 1425 spin_lock(&xprt->queue_lock); 1426 xprt_request_dequeue_transmit_locked(task); 1427 xprt_request_dequeue_receive_locked(task); 1428 while (xprt_is_pinned_rqst(req)) { 1429 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1430 spin_unlock(&xprt->queue_lock); 1431 xprt_wait_on_pinned_rqst(req); 1432 spin_lock(&xprt->queue_lock); 1433 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1434 } 1435 spin_unlock(&xprt->queue_lock); 1436 } 1437 } 1438 1439 /** 1440 * xprt_request_prepare - prepare an encoded request for transport 1441 * @req: pointer to rpc_rqst 1442 * 1443 * Calls into the transport layer to do whatever is needed to prepare 1444 * the request for transmission or receive. 
1445 */ 1446 void 1447 xprt_request_prepare(struct rpc_rqst *req) 1448 { 1449 struct rpc_xprt *xprt = req->rq_xprt; 1450 1451 if (xprt->ops->prepare_request) 1452 xprt->ops->prepare_request(req); 1453 } 1454 1455 /** 1456 * xprt_request_need_retransmit - Test if a task needs retransmission 1457 * @task: pointer to rpc_task 1458 * 1459 * Test for whether a connection breakage requires the task to retransmit 1460 */ 1461 bool 1462 xprt_request_need_retransmit(struct rpc_task *task) 1463 { 1464 return xprt_request_retransmit_after_disconnect(task); 1465 } 1466 1467 /** 1468 * xprt_prepare_transmit - reserve the transport before sending a request 1469 * @task: RPC task about to send a request 1470 * 1471 */ 1472 bool xprt_prepare_transmit(struct rpc_task *task) 1473 { 1474 struct rpc_rqst *req = task->tk_rqstp; 1475 struct rpc_xprt *xprt = req->rq_xprt; 1476 1477 if (!xprt_lock_write(xprt, task)) { 1478 /* Race breaker: someone may have transmitted us */ 1479 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1480 rpc_wake_up_queued_task_set_status(&xprt->sending, 1481 task, 0); 1482 return false; 1483 1484 } 1485 return true; 1486 } 1487 1488 void xprt_end_transmit(struct rpc_task *task) 1489 { 1490 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1491 1492 xprt_inject_disconnect(xprt); 1493 xprt_release_write(xprt, task); 1494 } 1495 1496 /** 1497 * xprt_request_transmit - send an RPC request on a transport 1498 * @req: pointer to request to transmit 1499 * @snd_task: RPC task that owns the transport lock 1500 * 1501 * This performs the transmission of a single request. 1502 * Note that if the request is not the same as snd_task, then it 1503 * does need to be pinned. 1504 * Returns '0' on success. 
1505 */ 1506 static int 1507 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1508 { 1509 struct rpc_xprt *xprt = req->rq_xprt; 1510 struct rpc_task *task = req->rq_task; 1511 unsigned int connect_cookie; 1512 int is_retrans = RPC_WAS_SENT(task); 1513 int status; 1514 1515 if (!req->rq_bytes_sent) { 1516 if (xprt_request_data_received(task)) { 1517 status = 0; 1518 goto out_dequeue; 1519 } 1520 /* Verify that our message lies in the RPCSEC_GSS window */ 1521 if (rpcauth_xmit_need_reencode(task)) { 1522 status = -EBADMSG; 1523 goto out_dequeue; 1524 } 1525 if (RPC_SIGNALLED(task)) { 1526 status = -ERESTARTSYS; 1527 goto out_dequeue; 1528 } 1529 } 1530 1531 /* 1532 * Update req->rq_ntrans before transmitting to avoid races with 1533 * xprt_update_rtt(), which needs to know that it is recording a 1534 * reply to the first transmission. 1535 */ 1536 req->rq_ntrans++; 1537 1538 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1539 connect_cookie = xprt->connect_cookie; 1540 status = xprt->ops->send_request(req); 1541 if (status != 0) { 1542 req->rq_ntrans--; 1543 trace_xprt_transmit(req, status); 1544 return status; 1545 } 1546 1547 if (is_retrans) { 1548 task->tk_client->cl_stats->rpcretrans++; 1549 trace_xprt_retransmit(req); 1550 } 1551 1552 xprt_inject_disconnect(xprt); 1553 1554 task->tk_flags |= RPC_TASK_SENT; 1555 spin_lock(&xprt->transport_lock); 1556 1557 xprt->stat.sends++; 1558 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1559 xprt->stat.bklog_u += xprt->backlog.qlen; 1560 xprt->stat.sending_u += xprt->sending.qlen; 1561 xprt->stat.pending_u += xprt->pending.qlen; 1562 spin_unlock(&xprt->transport_lock); 1563 1564 req->rq_connect_cookie = connect_cookie; 1565 out_dequeue: 1566 trace_xprt_transmit(req, status); 1567 xprt_request_dequeue_transmit(task); 1568 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1569 return status; 1570 } 1571 1572 /** 1573 * xprt_transmit - send an RPC request on a transport 1574 * 
@task: controlling RPC task 1575 * 1576 * Attempts to drain the transmit queue. On exit, either the transport 1577 * signalled an error that needs to be handled before transmission can 1578 * resume, or @task finished transmitting, and detected that it already 1579 * received a reply. 1580 */ 1581 void 1582 xprt_transmit(struct rpc_task *task) 1583 { 1584 struct rpc_rqst *next, *req = task->tk_rqstp; 1585 struct rpc_xprt *xprt = req->rq_xprt; 1586 int counter, status; 1587 1588 spin_lock(&xprt->queue_lock); 1589 counter = 0; 1590 while (!list_empty(&xprt->xmit_queue)) { 1591 if (++counter == 20) 1592 break; 1593 next = list_first_entry(&xprt->xmit_queue, 1594 struct rpc_rqst, rq_xmit); 1595 xprt_pin_rqst(next); 1596 spin_unlock(&xprt->queue_lock); 1597 status = xprt_request_transmit(next, task); 1598 if (status == -EBADMSG && next != req) 1599 status = 0; 1600 spin_lock(&xprt->queue_lock); 1601 xprt_unpin_rqst(next); 1602 if (status == 0) { 1603 if (!xprt_request_data_received(task) || 1604 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1605 continue; 1606 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1607 task->tk_status = status; 1608 break; 1609 } 1610 spin_unlock(&xprt->queue_lock); 1611 } 1612 1613 static void xprt_complete_request_init(struct rpc_task *task) 1614 { 1615 if (task->tk_rqstp) 1616 xprt_request_init(task); 1617 } 1618 1619 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1620 { 1621 set_bit(XPRT_CONGESTED, &xprt->state); 1622 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1623 } 1624 EXPORT_SYMBOL_GPL(xprt_add_backlog); 1625 1626 static bool __xprt_set_rq(struct rpc_task *task, void *data) 1627 { 1628 struct rpc_rqst *req = data; 1629 1630 if (task->tk_rqstp == NULL) { 1631 memset(req, 0, sizeof(*req)); /* mark unused */ 1632 task->tk_rqstp = req; 1633 return true; 1634 } 1635 return false; 1636 } 1637 1638 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1639 { 1640 if 
(rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1641 clear_bit(XPRT_CONGESTED, &xprt->state); 1642 return false; 1643 } 1644 return true; 1645 } 1646 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1647 1648 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1649 { 1650 bool ret = false; 1651 1652 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1653 goto out; 1654 spin_lock(&xprt->reserve_lock); 1655 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1656 xprt_add_backlog(xprt, task); 1657 ret = true; 1658 } 1659 spin_unlock(&xprt->reserve_lock); 1660 out: 1661 return ret; 1662 } 1663 1664 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1665 { 1666 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1667 1668 if (xprt->num_reqs >= xprt->max_reqs) 1669 goto out; 1670 ++xprt->num_reqs; 1671 spin_unlock(&xprt->reserve_lock); 1672 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1673 spin_lock(&xprt->reserve_lock); 1674 if (req != NULL) 1675 goto out; 1676 --xprt->num_reqs; 1677 req = ERR_PTR(-ENOMEM); 1678 out: 1679 return req; 1680 } 1681 1682 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1683 { 1684 if (xprt->num_reqs > xprt->min_reqs) { 1685 --xprt->num_reqs; 1686 kfree(req); 1687 return true; 1688 } 1689 return false; 1690 } 1691 1692 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1693 { 1694 struct rpc_rqst *req; 1695 1696 spin_lock(&xprt->reserve_lock); 1697 if (!list_empty(&xprt->free)) { 1698 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1699 list_del(&req->rq_list); 1700 goto out_init_req; 1701 } 1702 req = xprt_dynamic_alloc_slot(xprt); 1703 if (!IS_ERR(req)) 1704 goto out_init_req; 1705 switch (PTR_ERR(req)) { 1706 case -ENOMEM: 1707 dprintk("RPC: dynamic allocation of request slot " 1708 "failed! 
Retrying\n"); 1709 task->tk_status = -ENOMEM; 1710 break; 1711 case -EAGAIN: 1712 xprt_add_backlog(xprt, task); 1713 dprintk("RPC: waiting for request slot\n"); 1714 fallthrough; 1715 default: 1716 task->tk_status = -EAGAIN; 1717 } 1718 spin_unlock(&xprt->reserve_lock); 1719 return; 1720 out_init_req: 1721 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1722 xprt->num_reqs); 1723 spin_unlock(&xprt->reserve_lock); 1724 1725 task->tk_status = 0; 1726 task->tk_rqstp = req; 1727 } 1728 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1729 1730 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1731 { 1732 spin_lock(&xprt->reserve_lock); 1733 if (!xprt_wake_up_backlog(xprt, req) && 1734 !xprt_dynamic_free_slot(xprt, req)) { 1735 memset(req, 0, sizeof(*req)); /* mark unused */ 1736 list_add(&req->rq_list, &xprt->free); 1737 } 1738 spin_unlock(&xprt->reserve_lock); 1739 } 1740 EXPORT_SYMBOL_GPL(xprt_free_slot); 1741 1742 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1743 { 1744 struct rpc_rqst *req; 1745 while (!list_empty(&xprt->free)) { 1746 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1747 list_del(&req->rq_list); 1748 kfree(req); 1749 } 1750 } 1751 1752 static DEFINE_IDA(rpc_xprt_ids); 1753 1754 void xprt_cleanup_ids(void) 1755 { 1756 ida_destroy(&rpc_xprt_ids); 1757 } 1758 1759 static int xprt_alloc_id(struct rpc_xprt *xprt) 1760 { 1761 int id; 1762 1763 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL); 1764 if (id < 0) 1765 return id; 1766 1767 xprt->id = id; 1768 return 0; 1769 } 1770 1771 static void xprt_free_id(struct rpc_xprt *xprt) 1772 { 1773 ida_simple_remove(&rpc_xprt_ids, xprt->id); 1774 } 1775 1776 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1777 unsigned int num_prealloc, 1778 unsigned int max_alloc) 1779 { 1780 struct rpc_xprt *xprt; 1781 struct rpc_rqst *req; 1782 int i; 1783 1784 xprt = kzalloc(size, GFP_KERNEL); 1785 if (xprt == NULL) 1786 goto out; 1787 1788 xprt_alloc_id(xprt); 1789 
xprt_init(xprt, net); 1790 1791 for (i = 0; i < num_prealloc; i++) { 1792 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1793 if (!req) 1794 goto out_free; 1795 list_add(&req->rq_list, &xprt->free); 1796 } 1797 if (max_alloc > num_prealloc) 1798 xprt->max_reqs = max_alloc; 1799 else 1800 xprt->max_reqs = num_prealloc; 1801 xprt->min_reqs = num_prealloc; 1802 xprt->num_reqs = num_prealloc; 1803 1804 return xprt; 1805 1806 out_free: 1807 xprt_free(xprt); 1808 out: 1809 return NULL; 1810 } 1811 EXPORT_SYMBOL_GPL(xprt_alloc); 1812 1813 void xprt_free(struct rpc_xprt *xprt) 1814 { 1815 put_net(xprt->xprt_net); 1816 xprt_free_all_slots(xprt); 1817 xprt_free_id(xprt); 1818 rpc_sysfs_xprt_destroy(xprt); 1819 kfree_rcu(xprt, rcu); 1820 } 1821 EXPORT_SYMBOL_GPL(xprt_free); 1822 1823 static void 1824 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1825 { 1826 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1827 } 1828 1829 static __be32 1830 xprt_alloc_xid(struct rpc_xprt *xprt) 1831 { 1832 __be32 xid; 1833 1834 spin_lock(&xprt->reserve_lock); 1835 xid = (__force __be32)xprt->xid++; 1836 spin_unlock(&xprt->reserve_lock); 1837 return xid; 1838 } 1839 1840 static void 1841 xprt_init_xid(struct rpc_xprt *xprt) 1842 { 1843 xprt->xid = prandom_u32(); 1844 } 1845 1846 static void 1847 xprt_request_init(struct rpc_task *task) 1848 { 1849 struct rpc_xprt *xprt = task->tk_xprt; 1850 struct rpc_rqst *req = task->tk_rqstp; 1851 1852 req->rq_task = task; 1853 req->rq_xprt = xprt; 1854 req->rq_buffer = NULL; 1855 req->rq_xid = xprt_alloc_xid(xprt); 1856 xprt_init_connect_cookie(req, xprt); 1857 req->rq_snd_buf.len = 0; 1858 req->rq_snd_buf.buflen = 0; 1859 req->rq_rcv_buf.len = 0; 1860 req->rq_rcv_buf.buflen = 0; 1861 req->rq_snd_buf.bvec = NULL; 1862 req->rq_rcv_buf.bvec = NULL; 1863 req->rq_release_snd_buf = NULL; 1864 xprt_init_majortimeo(task, req); 1865 1866 trace_xprt_reserve(req); 1867 } 1868 1869 static void 1870 xprt_do_reserve(struct rpc_xprt 
*xprt, struct rpc_task *task) 1871 { 1872 xprt->ops->alloc_slot(xprt, task); 1873 if (task->tk_rqstp != NULL) 1874 xprt_request_init(task); 1875 } 1876 1877 /** 1878 * xprt_reserve - allocate an RPC request slot 1879 * @task: RPC task requesting a slot allocation 1880 * 1881 * If the transport is marked as being congested, or if no more 1882 * slots are available, place the task on the transport's 1883 * backlog queue. 1884 */ 1885 void xprt_reserve(struct rpc_task *task) 1886 { 1887 struct rpc_xprt *xprt = task->tk_xprt; 1888 1889 task->tk_status = 0; 1890 if (task->tk_rqstp != NULL) 1891 return; 1892 1893 task->tk_status = -EAGAIN; 1894 if (!xprt_throttle_congested(xprt, task)) 1895 xprt_do_reserve(xprt, task); 1896 } 1897 1898 /** 1899 * xprt_retry_reserve - allocate an RPC request slot 1900 * @task: RPC task requesting a slot allocation 1901 * 1902 * If no more slots are available, place the task on the transport's 1903 * backlog queue. 1904 * Note that the only difference with xprt_reserve is that we now 1905 * ignore the value of the XPRT_CONGESTED flag. 
1906 */ 1907 void xprt_retry_reserve(struct rpc_task *task) 1908 { 1909 struct rpc_xprt *xprt = task->tk_xprt; 1910 1911 task->tk_status = 0; 1912 if (task->tk_rqstp != NULL) 1913 return; 1914 1915 task->tk_status = -EAGAIN; 1916 xprt_do_reserve(xprt, task); 1917 } 1918 1919 /** 1920 * xprt_release - release an RPC request slot 1921 * @task: task which is finished with the slot 1922 * 1923 */ 1924 void xprt_release(struct rpc_task *task) 1925 { 1926 struct rpc_xprt *xprt; 1927 struct rpc_rqst *req = task->tk_rqstp; 1928 1929 if (req == NULL) { 1930 if (task->tk_client) { 1931 xprt = task->tk_xprt; 1932 xprt_release_write(xprt, task); 1933 } 1934 return; 1935 } 1936 1937 xprt = req->rq_xprt; 1938 xprt_request_dequeue_xprt(task); 1939 spin_lock(&xprt->transport_lock); 1940 xprt->ops->release_xprt(xprt, task); 1941 if (xprt->ops->release_request) 1942 xprt->ops->release_request(task); 1943 xprt_schedule_autodisconnect(xprt); 1944 spin_unlock(&xprt->transport_lock); 1945 if (req->rq_buffer) 1946 xprt->ops->buf_free(task); 1947 xdr_free_bvec(&req->rq_rcv_buf); 1948 xdr_free_bvec(&req->rq_snd_buf); 1949 if (req->rq_cred != NULL) 1950 put_rpccred(req->rq_cred); 1951 if (req->rq_release_snd_buf) 1952 req->rq_release_snd_buf(req); 1953 1954 task->tk_rqstp = NULL; 1955 if (likely(!bc_prealloc(req))) 1956 xprt->ops->free_slot(xprt, req); 1957 else 1958 xprt_free_bc_request(req); 1959 } 1960 1961 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1962 void 1963 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1964 { 1965 struct xdr_buf *xbufp = &req->rq_snd_buf; 1966 1967 task->tk_rqstp = req; 1968 req->rq_task = task; 1969 xprt_init_connect_cookie(req, req->rq_xprt); 1970 /* 1971 * Set up the xdr_buf length. 1972 * This also indicates that the buffer is XDR encoded already. 
1973 */ 1974 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1975 xbufp->tail[0].iov_len; 1976 } 1977 #endif 1978 1979 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1980 { 1981 kref_init(&xprt->kref); 1982 1983 spin_lock_init(&xprt->transport_lock); 1984 spin_lock_init(&xprt->reserve_lock); 1985 spin_lock_init(&xprt->queue_lock); 1986 1987 INIT_LIST_HEAD(&xprt->free); 1988 xprt->recv_queue = RB_ROOT; 1989 INIT_LIST_HEAD(&xprt->xmit_queue); 1990 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1991 spin_lock_init(&xprt->bc_pa_lock); 1992 INIT_LIST_HEAD(&xprt->bc_pa_list); 1993 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1994 INIT_LIST_HEAD(&xprt->xprt_switch); 1995 1996 xprt->last_used = jiffies; 1997 xprt->cwnd = RPC_INITCWND; 1998 xprt->bind_index = 0; 1999 2000 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 2001 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 2002 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 2003 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 2004 2005 xprt_init_xid(xprt); 2006 2007 xprt->xprt_net = get_net(net); 2008 } 2009 2010 /** 2011 * xprt_create_transport - create an RPC transport 2012 * @args: rpc transport creation arguments 2013 * 2014 */ 2015 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 2016 { 2017 struct rpc_xprt *xprt; 2018 const struct xprt_class *t; 2019 2020 t = xprt_class_find_by_ident(args->ident); 2021 if (!t) { 2022 dprintk("RPC: transport (%d) not supported\n", args->ident); 2023 return ERR_PTR(-EIO); 2024 } 2025 2026 xprt = t->setup(args); 2027 xprt_class_release(t); 2028 2029 if (IS_ERR(xprt)) 2030 goto out; 2031 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 2032 xprt->idle_timeout = 0; 2033 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 2034 if (xprt_has_timer(xprt)) 2035 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 2036 else 2037 timer_setup(&xprt->timer, NULL, 0); 2038 2039 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 2040 
xprt_destroy(xprt); 2041 return ERR_PTR(-EINVAL); 2042 } 2043 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 2044 if (xprt->servername == NULL) { 2045 xprt_destroy(xprt); 2046 return ERR_PTR(-ENOMEM); 2047 } 2048 2049 rpc_xprt_debugfs_register(xprt); 2050 2051 trace_xprt_create(xprt); 2052 out: 2053 return xprt; 2054 } 2055 2056 static void xprt_destroy_cb(struct work_struct *work) 2057 { 2058 struct rpc_xprt *xprt = 2059 container_of(work, struct rpc_xprt, task_cleanup); 2060 2061 trace_xprt_destroy(xprt); 2062 2063 rpc_xprt_debugfs_unregister(xprt); 2064 rpc_destroy_wait_queue(&xprt->binding); 2065 rpc_destroy_wait_queue(&xprt->pending); 2066 rpc_destroy_wait_queue(&xprt->sending); 2067 rpc_destroy_wait_queue(&xprt->backlog); 2068 kfree(xprt->servername); 2069 /* 2070 * Destroy any existing back channel 2071 */ 2072 xprt_destroy_backchannel(xprt, UINT_MAX); 2073 2074 /* 2075 * Tear down transport state and free the rpc_xprt 2076 */ 2077 xprt->ops->destroy(xprt); 2078 } 2079 2080 /** 2081 * xprt_destroy - destroy an RPC transport, killing off all requests. 2082 * @xprt: transport to destroy 2083 * 2084 */ 2085 static void xprt_destroy(struct rpc_xprt *xprt) 2086 { 2087 /* 2088 * Exclude transport connect/disconnect handlers and autoclose 2089 */ 2090 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 2091 2092 del_timer_sync(&xprt->timer); 2093 2094 /* 2095 * Destroy sockets etc from the system workqueue so they can 2096 * safely flush receive work running on rpciod. 2097 */ 2098 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 2099 schedule_work(&xprt->task_cleanup); 2100 } 2101 2102 static void xprt_destroy_kref(struct kref *kref) 2103 { 2104 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 2105 } 2106 2107 /** 2108 * xprt_get - return a reference to an RPC transport. 
2109 * @xprt: pointer to the transport 2110 * 2111 */ 2112 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2113 { 2114 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2115 return xprt; 2116 return NULL; 2117 } 2118 EXPORT_SYMBOL_GPL(xprt_get); 2119 2120 /** 2121 * xprt_put - release a reference to an RPC transport. 2122 * @xprt: pointer to the transport 2123 * 2124 */ 2125 void xprt_put(struct rpc_xprt *xprt) 2126 { 2127 if (xprt != NULL) 2128 kref_put(&xprt->kref, xprt_destroy_kref); 2129 } 2130 EXPORT_SYMBOL_GPL(xprt_put); 2131