1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * linux/net/sunrpc/xprt.c 4 * 5 * This is a generic RPC call interface supporting congestion avoidance, 6 * and asynchronous calls. 7 * 8 * The interface works like this: 9 * 10 * - When a process places a call, it allocates a request slot if 11 * one is available. Otherwise, it sleeps on the backlog queue 12 * (xprt_reserve). 13 * - Next, the caller puts together the RPC message, stuffs it into 14 * the request struct, and calls xprt_transmit(). 15 * - xprt_transmit sends the message and installs the caller on the 16 * transport's wait list. At the same time, if a reply is expected, 17 * it installs a timer that is run after the packet's timeout has 18 * expired. 19 * - When a packet arrives, the data_ready handler walks the list of 20 * pending requests for that transport. If a matching XID is found, the 21 * caller is woken up, and the timer removed. 22 * - When no reply arrives within the timeout interval, the timer is 23 * fired by the kernel and runs xprt_timer(). It either adjusts the 24 * timeout values (minor timeout) or wakes up the caller with a status 25 * of -ETIMEDOUT. 26 * - When the caller receives a notification from RPC that a reply arrived, 27 * it should release the RPC slot, and process the reply. 28 * If the call timed out, it may choose to retry the operation by 29 * adjusting the initial timeout value, and simply calling rpc_call 30 * again. 31 * 32 * Support for async RPC is done through a set of RPC-specific scheduling 33 * primitives that `transparently' work for processes as well as async 34 * tasks that rely on callbacks. 35 * 36 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 37 * 38 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 39 */ 40 41 #include <linux/module.h> 42 43 #include <linux/types.h> 44 #include <linux/interrupt.h> 45 #include <linux/workqueue.h> 46 #include <linux/net.h> 47 #include <linux/ktime.h> 48 49 #include <linux/sunrpc/clnt.h> 50 #include <linux/sunrpc/metrics.h> 51 #include <linux/sunrpc/bc_xprt.h> 52 #include <linux/rcupdate.h> 53 #include <linux/sched/mm.h> 54 55 #include <trace/events/sunrpc.h> 56 57 #include "sunrpc.h" 58 59 /* 60 * Local variables 61 */ 62 63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 64 # define RPCDBG_FACILITY RPCDBG_XPRT 65 #endif 66 67 /* 68 * Local functions 69 */ 70 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 71 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 72 static void xprt_destroy(struct rpc_xprt *xprt); 73 static void xprt_request_init(struct rpc_task *task); 74 75 static DEFINE_SPINLOCK(xprt_list_lock); 76 static LIST_HEAD(xprt_list); 77 78 static unsigned long xprt_request_timeout(const struct rpc_rqst *req) 79 { 80 unsigned long timeout = jiffies + req->rq_timeout; 81 82 if (time_before(timeout, req->rq_majortimeo)) 83 return timeout; 84 return req->rq_majortimeo; 85 } 86 87 /** 88 * xprt_register_transport - register a transport implementation 89 * @transport: transport to register 90 * 91 * If a transport implementation is loaded as a kernel module, it can 92 * call this interface to make itself known to the RPC client. 
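 *
 * A minimal usage sketch (the "example" transport, its ident value, setup
 * callback and netid list below are purely illustrative, not taken from an
 * in-tree transport):
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= 0x1234,		/* illustrative identifier */
 *		.setup	= example_setup_xprt,
 *		.netid	= { "example", "" },
 *	};
 *
 * The module would then call xprt_register_transport(&example_transport)
 * from its init routine and xprt_unregister_transport() on unload.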
93 * 94 * Returns: 95 * 0: transport successfully registered 96 * -EEXIST: transport already registered 97 * -EINVAL: transport module being unloaded 98 */ 99 int xprt_register_transport(struct xprt_class *transport) 100 { 101 struct xprt_class *t; 102 int result; 103 104 result = -EEXIST; 105 spin_lock(&xprt_list_lock); 106 list_for_each_entry(t, &xprt_list, list) { 107 /* don't register the same transport class twice */ 108 if (t->ident == transport->ident) 109 goto out; 110 } 111 112 list_add_tail(&transport->list, &xprt_list); 113 printk(KERN_INFO "RPC: Registered %s transport module.\n", 114 transport->name); 115 result = 0; 116 117 out: 118 spin_unlock(&xprt_list_lock); 119 return result; 120 } 121 EXPORT_SYMBOL_GPL(xprt_register_transport); 122 123 /** 124 * xprt_unregister_transport - unregister a transport implementation 125 * @transport: transport to unregister 126 * 127 * Returns: 128 * 0: transport successfully unregistered 129 * -ENOENT: transport never registered 130 */ 131 int xprt_unregister_transport(struct xprt_class *transport) 132 { 133 struct xprt_class *t; 134 int result; 135 136 result = 0; 137 spin_lock(&xprt_list_lock); 138 list_for_each_entry(t, &xprt_list, list) { 139 if (t == transport) { 140 printk(KERN_INFO 141 "RPC: Unregistered %s transport module.\n", 142 transport->name); 143 list_del_init(&transport->list); 144 goto out; 145 } 146 } 147 result = -ENOENT; 148 149 out: 150 spin_unlock(&xprt_list_lock); 151 return result; 152 } 153 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 154 155 static void 156 xprt_class_release(const struct xprt_class *t) 157 { 158 module_put(t->owner); 159 } 160 161 static const struct xprt_class * 162 xprt_class_find_by_netid_locked(const char *netid) 163 { 164 const struct xprt_class *t; 165 unsigned int i; 166 167 list_for_each_entry(t, &xprt_list, list) { 168 for (i = 0; t->netid[i][0] != '\0'; i++) { 169 if (strcmp(t->netid[i], netid) != 0) 170 continue; 171 if (!try_module_get(t->owner)) 172 continue; 173 return t; 174 } 175 } 176 return NULL; 177 } 178 179 static const struct xprt_class * 180 xprt_class_find_by_netid(const char *netid) 181 { 182 const struct xprt_class *t; 183 184 spin_lock(&xprt_list_lock); 185 t = xprt_class_find_by_netid_locked(netid); 186 if (!t) { 187 spin_unlock(&xprt_list_lock); 188 request_module("rpc%s", netid); 189 spin_lock(&xprt_list_lock); 190 t = xprt_class_find_by_netid_locked(netid); 191 } 192 spin_unlock(&xprt_list_lock); 193 return t; 194 } 195 196 /** 197 * xprt_load_transport - load a transport implementation 198 * @netid: transport to load 199 * 200 * Returns: 201 * 0: transport successfully loaded 202 * -ENOENT: transport module not available 203 */ 204 int xprt_load_transport(const char *netid) 205 { 206 const struct xprt_class *t; 207 208 t = xprt_class_find_by_netid(netid); 209 if (!t) 210 return -ENOENT; 211 xprt_class_release(t); 212 return 0; 213 } 214 EXPORT_SYMBOL_GPL(xprt_load_transport); 215 216 static void xprt_clear_locked(struct rpc_xprt *xprt) 217 { 218 xprt->snd_task = NULL; 219 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 220 smp_mb__before_atomic(); 221 clear_bit(XPRT_LOCKED, &xprt->state); 222 smp_mb__after_atomic(); 223 } else 224 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 225 } 226 227 /** 228 * xprt_reserve_xprt - serialize write access to transports 229 * @task: task that is requesting access to the transport 230 * @xprt: pointer to the target transport 231 * 232 * This prevents mixing the payload of separate requests, and prevents 233 * transport 
connects from colliding with writes. No congestion control 234 * is provided. 235 */ 236 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 237 { 238 struct rpc_rqst *req = task->tk_rqstp; 239 240 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 241 if (task == xprt->snd_task) 242 goto out_locked; 243 goto out_sleep; 244 } 245 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 246 goto out_unlock; 247 xprt->snd_task = task; 248 249 out_locked: 250 trace_xprt_reserve_xprt(xprt, task); 251 return 1; 252 253 out_unlock: 254 xprt_clear_locked(xprt); 255 out_sleep: 256 task->tk_status = -EAGAIN; 257 if (RPC_IS_SOFT(task)) 258 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 259 xprt_request_timeout(req)); 260 else 261 rpc_sleep_on(&xprt->sending, task, NULL); 262 return 0; 263 } 264 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 265 266 static bool 267 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 268 { 269 return test_bit(XPRT_CWND_WAIT, &xprt->state); 270 } 271 272 static void 273 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 274 { 275 if (!list_empty(&xprt->xmit_queue)) { 276 /* Peek at head of queue to see if it can make progress */ 277 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 278 rq_xmit)->rq_cong) 279 return; 280 } 281 set_bit(XPRT_CWND_WAIT, &xprt->state); 282 } 283 284 static void 285 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 286 { 287 if (!RPCXPRT_CONGESTED(xprt)) 288 clear_bit(XPRT_CWND_WAIT, &xprt->state); 289 } 290 291 /* 292 * xprt_reserve_xprt_cong - serialize write access to transports 293 * @task: task that is requesting access to the transport 294 * 295 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 296 * integrated into the decision of whether a request is allowed to be 297 * woken up and given access to the transport. 298 * Note that the lock is only granted if we know there are free slots. 
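 * Transports that need congestion control install this function as their
 * ->reserve_xprt() method; it is then invoked via xprt_lock_write().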
299 */ 300 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 301 { 302 struct rpc_rqst *req = task->tk_rqstp; 303 304 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 305 if (task == xprt->snd_task) 306 goto out_locked; 307 goto out_sleep; 308 } 309 if (req == NULL) { 310 xprt->snd_task = task; 311 goto out_locked; 312 } 313 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 314 goto out_unlock; 315 if (!xprt_need_congestion_window_wait(xprt)) { 316 xprt->snd_task = task; 317 goto out_locked; 318 } 319 out_unlock: 320 xprt_clear_locked(xprt); 321 out_sleep: 322 task->tk_status = -EAGAIN; 323 if (RPC_IS_SOFT(task)) 324 rpc_sleep_on_timeout(&xprt->sending, task, NULL, 325 xprt_request_timeout(req)); 326 else 327 rpc_sleep_on(&xprt->sending, task, NULL); 328 return 0; 329 out_locked: 330 trace_xprt_reserve_cong(xprt, task); 331 return 1; 332 } 333 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 334 335 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 336 { 337 int retval; 338 339 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 340 return 1; 341 spin_lock(&xprt->transport_lock); 342 retval = xprt->ops->reserve_xprt(xprt, task); 343 spin_unlock(&xprt->transport_lock); 344 return retval; 345 } 346 347 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 348 { 349 struct rpc_xprt *xprt = data; 350 351 xprt->snd_task = task; 352 return true; 353 } 354 355 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 356 { 357 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 358 return; 359 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 360 goto out_unlock; 361 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 362 __xprt_lock_write_func, xprt)) 363 return; 364 out_unlock: 365 xprt_clear_locked(xprt); 366 } 367 368 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 369 { 370 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 371 return; 372 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 373 goto out_unlock; 374 if (xprt_need_congestion_window_wait(xprt)) 375 goto out_unlock; 376 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 377 __xprt_lock_write_func, xprt)) 378 return; 379 out_unlock: 380 xprt_clear_locked(xprt); 381 } 382 383 /** 384 * xprt_release_xprt - allow other requests to use a transport 385 * @xprt: transport with other tasks potentially waiting 386 * @task: task that is releasing access to the transport 387 * 388 * Note that "task" can be NULL. No congestion control is provided. 389 */ 390 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 391 { 392 if (xprt->snd_task == task) { 393 xprt_clear_locked(xprt); 394 __xprt_lock_write_next(xprt); 395 } 396 trace_xprt_release_xprt(xprt, task); 397 } 398 EXPORT_SYMBOL_GPL(xprt_release_xprt); 399 400 /** 401 * xprt_release_xprt_cong - allow other requests to use a transport 402 * @xprt: transport with other tasks potentially waiting 403 * @task: task that is releasing access to the transport 404 * 405 * Note that "task" can be NULL. Another task is awoken to use the 406 * transport if the transport's congestion window allows it. 
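 * (The window "allows it" as long as xprt->cong, which grows by
 * RPC_CWNDSCALE for every request holding a congestion credit, has not
 * reached xprt->cwnd; see RPCXPRT_CONGESTED() and __xprt_get_cong().)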
407 */ 408 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 409 { 410 if (xprt->snd_task == task) { 411 xprt_clear_locked(xprt); 412 __xprt_lock_write_next_cong(xprt); 413 } 414 trace_xprt_release_cong(xprt, task); 415 } 416 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 417 418 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 419 { 420 if (xprt->snd_task != task) 421 return; 422 spin_lock(&xprt->transport_lock); 423 xprt->ops->release_xprt(xprt, task); 424 spin_unlock(&xprt->transport_lock); 425 } 426 427 /* 428 * Van Jacobson congestion avoidance. Check if the congestion window 429 * overflowed. Put the task to sleep if this is the case. 430 */ 431 static int 432 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 433 { 434 if (req->rq_cong) 435 return 1; 436 trace_xprt_get_cong(xprt, req->rq_task); 437 if (RPCXPRT_CONGESTED(xprt)) { 438 xprt_set_congestion_window_wait(xprt); 439 return 0; 440 } 441 req->rq_cong = 1; 442 xprt->cong += RPC_CWNDSCALE; 443 return 1; 444 } 445 446 /* 447 * Adjust the congestion window, and wake up the next task 448 * that has been sleeping due to congestion 449 */ 450 static void 451 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 452 { 453 if (!req->rq_cong) 454 return; 455 req->rq_cong = 0; 456 xprt->cong -= RPC_CWNDSCALE; 457 xprt_test_and_clear_congestion_window_wait(xprt); 458 trace_xprt_put_cong(xprt, req->rq_task); 459 __xprt_lock_write_next_cong(xprt); 460 } 461 462 /** 463 * xprt_request_get_cong - Request congestion control credits 464 * @xprt: pointer to transport 465 * @req: pointer to RPC request 466 * 467 * Useful for transports that require congestion control. 468 */ 469 bool 470 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 471 { 472 bool ret = false; 473 474 if (req->rq_cong) 475 return true; 476 spin_lock(&xprt->transport_lock); 477 ret = __xprt_get_cong(xprt, req) != 0; 478 spin_unlock(&xprt->transport_lock); 479 return ret; 480 } 481 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 482 483 /** 484 * xprt_release_rqst_cong - housekeeping when request is complete 485 * @task: RPC request that recently completed 486 * 487 * Useful for transports that require congestion control. 488 */ 489 void xprt_release_rqst_cong(struct rpc_task *task) 490 { 491 struct rpc_rqst *req = task->tk_rqstp; 492 493 __xprt_put_cong(req->rq_xprt, req); 494 } 495 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 496 497 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt) 498 { 499 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) 500 __xprt_lock_write_next_cong(xprt); 501 } 502 503 /* 504 * Clear the congestion window wait flag and wake up the next 505 * entry on xprt->sending 506 */ 507 static void 508 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 509 { 510 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 511 spin_lock(&xprt->transport_lock); 512 __xprt_lock_write_next_cong(xprt); 513 spin_unlock(&xprt->transport_lock); 514 } 515 } 516 517 /** 518 * xprt_adjust_cwnd - adjust transport congestion window 519 * @xprt: pointer to xprt 520 * @task: recently completed RPC request used to adjust window 521 * @result: result code of completed RPC request 522 * 523 * The transport code maintains an estimate on the maximum number of out- 524 * standing RPC requests, using a smoothed version of the congestion 525 * avoidance implemented in 44BSD. 
This is basically the Van Jacobson 526 * congestion algorithm: If a retransmit occurs, the congestion window is 527 * halved; otherwise, it is incremented by 1/cwnd when 528 * 529 * - a reply is received and 530 * - a full number of requests are outstanding and 531 * - the congestion window hasn't been updated recently. 532 */ 533 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 534 { 535 struct rpc_rqst *req = task->tk_rqstp; 536 unsigned long cwnd = xprt->cwnd; 537 538 if (result >= 0 && cwnd <= xprt->cong) { 539 /* The (cwnd >> 1) term makes sure 540 * the result gets rounded properly. */ 541 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 542 if (cwnd > RPC_MAXCWND(xprt)) 543 cwnd = RPC_MAXCWND(xprt); 544 __xprt_lock_write_next_cong(xprt); 545 } else if (result == -ETIMEDOUT) { 546 cwnd >>= 1; 547 if (cwnd < RPC_CWNDSCALE) 548 cwnd = RPC_CWNDSCALE; 549 } 550 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 551 xprt->cong, xprt->cwnd, cwnd); 552 xprt->cwnd = cwnd; 553 __xprt_put_cong(xprt, req); 554 } 555 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 556 557 /** 558 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 559 * @xprt: transport with waiting tasks 560 * @status: result code to plant in each task before waking it 561 * 562 */ 563 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 564 { 565 if (status < 0) 566 rpc_wake_up_status(&xprt->pending, status); 567 else 568 rpc_wake_up(&xprt->pending); 569 } 570 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 571 572 /** 573 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 574 * @xprt: transport 575 * 576 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 577 * we don't in general want to force a socket disconnection due to 578 * an incomplete RPC call transmission. 579 */ 580 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 581 { 582 set_bit(XPRT_WRITE_SPACE, &xprt->state); 583 } 584 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 585 586 static bool 587 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 588 { 589 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 590 __xprt_lock_write_next(xprt); 591 dprintk("RPC: write space: waking waiting task on " 592 "xprt %p\n", xprt); 593 return true; 594 } 595 return false; 596 } 597 598 /** 599 * xprt_write_space - wake the task waiting for transport output buffer space 600 * @xprt: transport with waiting tasks 601 * 602 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 603 */ 604 bool xprt_write_space(struct rpc_xprt *xprt) 605 { 606 bool ret; 607 608 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 609 return false; 610 spin_lock(&xprt->transport_lock); 611 ret = xprt_clear_write_space_locked(xprt); 612 spin_unlock(&xprt->transport_lock); 613 return ret; 614 } 615 EXPORT_SYMBOL_GPL(xprt_write_space); 616 617 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime) 618 { 619 s64 delta = ktime_to_ns(ktime_get() - abstime); 620 return likely(delta >= 0) ? 
621 jiffies - nsecs_to_jiffies(delta) : 622 jiffies + nsecs_to_jiffies(-delta); 623 } 624 625 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req) 626 { 627 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 628 unsigned long majortimeo = req->rq_timeout; 629 630 if (to->to_exponential) 631 majortimeo <<= to->to_retries; 632 else 633 majortimeo += to->to_increment * to->to_retries; 634 if (majortimeo > to->to_maxval || majortimeo == 0) 635 majortimeo = to->to_maxval; 636 return majortimeo; 637 } 638 639 static void xprt_reset_majortimeo(struct rpc_rqst *req) 640 { 641 req->rq_majortimeo += xprt_calc_majortimeo(req); 642 } 643 644 static void xprt_reset_minortimeo(struct rpc_rqst *req) 645 { 646 req->rq_minortimeo += req->rq_timeout; 647 } 648 649 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req) 650 { 651 unsigned long time_init; 652 struct rpc_xprt *xprt = req->rq_xprt; 653 654 if (likely(xprt && xprt_connected(xprt))) 655 time_init = jiffies; 656 else 657 time_init = xprt_abs_ktime_to_jiffies(task->tk_start); 658 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 659 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req); 660 req->rq_minortimeo = time_init + req->rq_timeout; 661 } 662 663 /** 664 * xprt_adjust_timeout - adjust timeout values for next retransmit 665 * @req: RPC request containing parameters to use for the adjustment 666 * 667 */ 668 int xprt_adjust_timeout(struct rpc_rqst *req) 669 { 670 struct rpc_xprt *xprt = req->rq_xprt; 671 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 672 int status = 0; 673 674 if (time_before(jiffies, req->rq_majortimeo)) { 675 if (time_before(jiffies, req->rq_minortimeo)) 676 return status; 677 if (to->to_exponential) 678 req->rq_timeout <<= 1; 679 else 680 req->rq_timeout += to->to_increment; 681 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 682 req->rq_timeout = to->to_maxval; 683 req->rq_retries++; 684 } else { 685 req->rq_timeout = to->to_initval; 686 req->rq_retries = 0; 687 xprt_reset_majortimeo(req); 688 /* Reset the RTT counters == "slow start" */ 689 spin_lock(&xprt->transport_lock); 690 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 691 spin_unlock(&xprt->transport_lock); 692 status = -ETIMEDOUT; 693 } 694 xprt_reset_minortimeo(req); 695 696 if (req->rq_timeout == 0) { 697 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 698 req->rq_timeout = 5 * HZ; 699 } 700 return status; 701 } 702 703 static void xprt_autoclose(struct work_struct *work) 704 { 705 struct rpc_xprt *xprt = 706 container_of(work, struct rpc_xprt, task_cleanup); 707 unsigned int pflags = memalloc_nofs_save(); 708 709 trace_xprt_disconnect_auto(xprt); 710 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 711 xprt->ops->close(xprt); 712 xprt_release_write(xprt, NULL); 713 wake_up_bit(&xprt->state, XPRT_LOCKED); 714 memalloc_nofs_restore(pflags); 715 } 716 717 /** 718 * xprt_disconnect_done - mark a transport as disconnected 719 * @xprt: transport to flag for disconnect 720 * 721 */ 722 void xprt_disconnect_done(struct rpc_xprt *xprt) 723 { 724 trace_xprt_disconnect_done(xprt); 725 spin_lock(&xprt->transport_lock); 726 xprt_clear_connected(xprt); 727 xprt_clear_write_space_locked(xprt); 728 xprt_clear_congestion_window_wait_locked(xprt); 729 xprt_wake_pending_tasks(xprt, -ENOTCONN); 730 spin_unlock(&xprt->transport_lock); 731 } 732 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 733 734 /** 735 * xprt_force_disconnect - force a transport to disconnect 736 * 
@xprt: transport to disconnect 737 * 738 */ 739 void xprt_force_disconnect(struct rpc_xprt *xprt) 740 { 741 trace_xprt_disconnect_force(xprt); 742 743 /* Don't race with the test_bit() in xprt_clear_locked() */ 744 spin_lock(&xprt->transport_lock); 745 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 746 /* Try to schedule an autoclose RPC call */ 747 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 748 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 749 else if (xprt->snd_task) 750 rpc_wake_up_queued_task_set_status(&xprt->pending, 751 xprt->snd_task, -ENOTCONN); 752 spin_unlock(&xprt->transport_lock); 753 } 754 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 755 756 static unsigned int 757 xprt_connect_cookie(struct rpc_xprt *xprt) 758 { 759 return READ_ONCE(xprt->connect_cookie); 760 } 761 762 static bool 763 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 764 { 765 struct rpc_rqst *req = task->tk_rqstp; 766 struct rpc_xprt *xprt = req->rq_xprt; 767 768 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 769 !xprt_connected(xprt); 770 } 771 772 /** 773 * xprt_conditional_disconnect - force a transport to disconnect 774 * @xprt: transport to disconnect 775 * @cookie: 'connection cookie' 776 * 777 * This attempts to break the connection if and only if 'cookie' matches 778 * the current transport 'connection cookie'. It ensures that we don't 779 * try to break the connection more than once when we need to retransmit 780 * a batch of RPC requests. 781 * 782 */ 783 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 784 { 785 /* Don't race with the test_bit() in xprt_clear_locked() */ 786 spin_lock(&xprt->transport_lock); 787 if (cookie != xprt->connect_cookie) 788 goto out; 789 if (test_bit(XPRT_CLOSING, &xprt->state)) 790 goto out; 791 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 792 /* Try to schedule an autoclose RPC call */ 793 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 794 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 795 xprt_wake_pending_tasks(xprt, -EAGAIN); 796 out: 797 spin_unlock(&xprt->transport_lock); 798 } 799 800 static bool 801 xprt_has_timer(const struct rpc_xprt *xprt) 802 { 803 return xprt->idle_timeout != 0; 804 } 805 806 static void 807 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 808 __must_hold(&xprt->transport_lock) 809 { 810 xprt->last_used = jiffies; 811 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 812 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 813 } 814 815 static void 816 xprt_init_autodisconnect(struct timer_list *t) 817 { 818 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 819 820 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 821 return; 822 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 823 xprt->last_used = jiffies; 824 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 825 return; 826 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 827 } 828 829 bool xprt_lock_connect(struct rpc_xprt *xprt, 830 struct rpc_task *task, 831 void *cookie) 832 { 833 bool ret = false; 834 835 spin_lock(&xprt->transport_lock); 836 if (!test_bit(XPRT_LOCKED, &xprt->state)) 837 goto out; 838 if (xprt->snd_task != task) 839 goto out; 840 xprt->snd_task = cookie; 841 ret = true; 842 out: 843 spin_unlock(&xprt->transport_lock); 844 return ret; 845 } 846 847 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 848 { 849 spin_lock(&xprt->transport_lock); 850 if (xprt->snd_task != cookie) 851 goto out; 852 if (!test_bit(XPRT_LOCKED, &xprt->state)) 853 goto out; 
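	/* We still hold the transport write lock on behalf of whoever called
	 * xprt_lock_connect() with this cookie: clear the owner and hand the
	 * transport to the next waiting sender.
	 */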
854 xprt->snd_task =NULL; 855 xprt->ops->release_xprt(xprt, NULL); 856 xprt_schedule_autodisconnect(xprt); 857 out: 858 spin_unlock(&xprt->transport_lock); 859 wake_up_bit(&xprt->state, XPRT_LOCKED); 860 } 861 862 /** 863 * xprt_connect - schedule a transport connect operation 864 * @task: RPC task that is requesting the connect 865 * 866 */ 867 void xprt_connect(struct rpc_task *task) 868 { 869 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 870 871 trace_xprt_connect(xprt); 872 873 if (!xprt_bound(xprt)) { 874 task->tk_status = -EAGAIN; 875 return; 876 } 877 if (!xprt_lock_write(xprt, task)) 878 return; 879 880 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 881 trace_xprt_disconnect_cleanup(xprt); 882 xprt->ops->close(xprt); 883 } 884 885 if (!xprt_connected(xprt)) { 886 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 887 rpc_sleep_on_timeout(&xprt->pending, task, NULL, 888 xprt_request_timeout(task->tk_rqstp)); 889 890 if (test_bit(XPRT_CLOSING, &xprt->state)) 891 return; 892 if (xprt_test_and_set_connecting(xprt)) 893 return; 894 /* Race breaker */ 895 if (!xprt_connected(xprt)) { 896 xprt->stat.connect_start = jiffies; 897 xprt->ops->connect(xprt, task); 898 } else { 899 xprt_clear_connecting(xprt); 900 task->tk_status = 0; 901 rpc_wake_up_queued_task(&xprt->pending, task); 902 } 903 } 904 xprt_release_write(xprt, task); 905 } 906 907 /** 908 * xprt_reconnect_delay - compute the wait before scheduling a connect 909 * @xprt: transport instance 910 * 911 */ 912 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) 913 { 914 unsigned long start, now = jiffies; 915 916 start = xprt->stat.connect_start + xprt->reestablish_timeout; 917 if (time_after(start, now)) 918 return start - now; 919 return 0; 920 } 921 EXPORT_SYMBOL_GPL(xprt_reconnect_delay); 922 923 /** 924 * xprt_reconnect_backoff - compute the new re-establish timeout 925 * @xprt: transport instance 926 * @init_to: initial reestablish timeout 927 * 928 */ 929 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) 930 { 931 xprt->reestablish_timeout <<= 1; 932 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) 933 xprt->reestablish_timeout = xprt->max_reconnect_timeout; 934 if (xprt->reestablish_timeout < init_to) 935 xprt->reestablish_timeout = init_to; 936 } 937 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); 938 939 enum xprt_xid_rb_cmp { 940 XID_RB_EQUAL, 941 XID_RB_LEFT, 942 XID_RB_RIGHT, 943 }; 944 static enum xprt_xid_rb_cmp 945 xprt_xid_cmp(__be32 xid1, __be32 xid2) 946 { 947 if (xid1 == xid2) 948 return XID_RB_EQUAL; 949 if ((__force u32)xid1 < (__force u32)xid2) 950 return XID_RB_LEFT; 951 return XID_RB_RIGHT; 952 } 953 954 static struct rpc_rqst * 955 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 956 { 957 struct rb_node *n = xprt->recv_queue.rb_node; 958 struct rpc_rqst *req; 959 960 while (n != NULL) { 961 req = rb_entry(n, struct rpc_rqst, rq_recv); 962 switch (xprt_xid_cmp(xid, req->rq_xid)) { 963 case XID_RB_LEFT: 964 n = n->rb_left; 965 break; 966 case XID_RB_RIGHT: 967 n = n->rb_right; 968 break; 969 case XID_RB_EQUAL: 970 return req; 971 } 972 } 973 return NULL; 974 } 975 976 static void 977 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 978 { 979 struct rb_node **p = &xprt->recv_queue.rb_node; 980 struct rb_node *n = NULL; 981 struct rpc_rqst *req; 982 983 while (*p != NULL) { 984 n = *p; 985 req = rb_entry(n, struct rpc_rqst, rq_recv); 986 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 987 case XID_RB_LEFT: 988 p = 
&n->rb_left; 989 break; 990 case XID_RB_RIGHT: 991 p = &n->rb_right; 992 break; 993 case XID_RB_EQUAL: 994 WARN_ON_ONCE(new != req); 995 return; 996 } 997 } 998 rb_link_node(&new->rq_recv, n, p); 999 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 1000 } 1001 1002 static void 1003 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 1004 { 1005 rb_erase(&req->rq_recv, &xprt->recv_queue); 1006 } 1007 1008 /** 1009 * xprt_lookup_rqst - find an RPC request corresponding to an XID 1010 * @xprt: transport on which the original request was transmitted 1011 * @xid: RPC XID of incoming reply 1012 * 1013 * Caller holds xprt->queue_lock. 1014 */ 1015 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 1016 { 1017 struct rpc_rqst *entry; 1018 1019 entry = xprt_request_rb_find(xprt, xid); 1020 if (entry != NULL) { 1021 trace_xprt_lookup_rqst(xprt, xid, 0); 1022 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 1023 return entry; 1024 } 1025 1026 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 1027 ntohl(xid)); 1028 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 1029 xprt->stat.bad_xids++; 1030 return NULL; 1031 } 1032 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 1033 1034 static bool 1035 xprt_is_pinned_rqst(struct rpc_rqst *req) 1036 { 1037 return atomic_read(&req->rq_pin) != 0; 1038 } 1039 1040 /** 1041 * xprt_pin_rqst - Pin a request on the transport receive list 1042 * @req: Request to pin 1043 * 1044 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 1045 * so should be holding xprt->queue_lock. 1046 */ 1047 void xprt_pin_rqst(struct rpc_rqst *req) 1048 { 1049 atomic_inc(&req->rq_pin); 1050 } 1051 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 1052 1053 /** 1054 * xprt_unpin_rqst - Unpin a request on the transport receive list 1055 * @req: Request to unpin 1056 * 1057 * Caller should be holding xprt->queue_lock.
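 *
 * A sketch of the usual receive-path pattern (locking shown for context;
 * "copied" stands for however many reply bytes the transport read):
 *
 *	spin_lock(&xprt->queue_lock);
 *	req = xprt_lookup_rqst(xprt, xid);
 *	if (!req) {
 *		spin_unlock(&xprt->queue_lock);
 *		return;
 *	}
 *	xprt_pin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 *
 *	... copy the reply into req->rq_private_buf without holding the lock ...
 *
 *	spin_lock(&xprt->queue_lock);
 *	xprt_complete_rqst(req->rq_task, copied);
 *	xprt_unpin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);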
1058 */ 1059 void xprt_unpin_rqst(struct rpc_rqst *req) 1060 { 1061 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 1062 atomic_dec(&req->rq_pin); 1063 return; 1064 } 1065 if (atomic_dec_and_test(&req->rq_pin)) 1066 wake_up_var(&req->rq_pin); 1067 } 1068 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 1069 1070 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 1071 { 1072 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 1073 } 1074 1075 static bool 1076 xprt_request_data_received(struct rpc_task *task) 1077 { 1078 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1079 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 1080 } 1081 1082 static bool 1083 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 1084 { 1085 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 1086 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 1087 } 1088 1089 /** 1090 * xprt_request_enqueue_receive - Add a request to the receive queue 1091 * @task: RPC task 1092 * 1093 */ 1094 void 1095 xprt_request_enqueue_receive(struct rpc_task *task) 1096 { 1097 struct rpc_rqst *req = task->tk_rqstp; 1098 struct rpc_xprt *xprt = req->rq_xprt; 1099 1100 if (!xprt_request_need_enqueue_receive(task, req)) 1101 return; 1102 1103 xprt_request_prepare(task->tk_rqstp); 1104 spin_lock(&xprt->queue_lock); 1105 1106 /* Update the softirq receive buffer */ 1107 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1108 sizeof(req->rq_private_buf)); 1109 1110 /* Add request to the receive list */ 1111 xprt_request_rb_insert(xprt, req); 1112 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1113 spin_unlock(&xprt->queue_lock); 1114 1115 /* Turn off autodisconnect */ 1116 del_singleshot_timer_sync(&xprt->timer); 1117 } 1118 1119 /** 1120 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1121 * @task: RPC task 1122 * 1123 * Caller must hold xprt->queue_lock. 1124 */ 1125 static void 1126 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1127 { 1128 struct rpc_rqst *req = task->tk_rqstp; 1129 1130 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1131 xprt_request_rb_remove(req->rq_xprt, req); 1132 } 1133 1134 /** 1135 * xprt_update_rtt - Update RPC RTT statistics 1136 * @task: RPC request that recently completed 1137 * 1138 * Caller holds xprt->queue_lock. 1139 */ 1140 void xprt_update_rtt(struct rpc_task *task) 1141 { 1142 struct rpc_rqst *req = task->tk_rqstp; 1143 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1144 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1145 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1146 1147 if (timer) { 1148 if (req->rq_ntrans == 1) 1149 rpc_update_rtt(rtt, timer, m); 1150 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1151 } 1152 } 1153 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1154 1155 /** 1156 * xprt_complete_rqst - called when reply processing is complete 1157 * @task: RPC request that recently completed 1158 * @copied: actual number of bytes received from the transport 1159 * 1160 * Caller holds xprt->queue_lock.
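 * On return the request has been removed from the receive queue and @task
 * has been woken up on xprt->pending.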
1161 */ 1162 void xprt_complete_rqst(struct rpc_task *task, int copied) 1163 { 1164 struct rpc_rqst *req = task->tk_rqstp; 1165 struct rpc_xprt *xprt = req->rq_xprt; 1166 1167 xprt->stat.recvs++; 1168 1169 req->rq_private_buf.len = copied; 1170 /* Ensure all writes are done before we update */ 1171 /* req->rq_reply_bytes_recvd */ 1172 smp_wmb(); 1173 req->rq_reply_bytes_recvd = copied; 1174 xprt_request_dequeue_receive_locked(task); 1175 rpc_wake_up_queued_task(&xprt->pending, task); 1176 } 1177 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1178 1179 static void xprt_timer(struct rpc_task *task) 1180 { 1181 struct rpc_rqst *req = task->tk_rqstp; 1182 struct rpc_xprt *xprt = req->rq_xprt; 1183 1184 if (task->tk_status != -ETIMEDOUT) 1185 return; 1186 1187 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1188 if (!req->rq_reply_bytes_recvd) { 1189 if (xprt->ops->timer) 1190 xprt->ops->timer(xprt, task); 1191 } else 1192 task->tk_status = 0; 1193 } 1194 1195 /** 1196 * xprt_wait_for_reply_request_def - wait for reply 1197 * @task: pointer to rpc_task 1198 * 1199 * Set a request's retransmit timeout based on the transport's 1200 * default timeout parameters. Used by transports that don't adjust 1201 * the retransmit timeout based on round-trip time estimation, 1202 * and put the task to sleep on the pending queue. 1203 */ 1204 void xprt_wait_for_reply_request_def(struct rpc_task *task) 1205 { 1206 struct rpc_rqst *req = task->tk_rqstp; 1207 1208 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1209 xprt_request_timeout(req)); 1210 } 1211 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def); 1212 1213 /** 1214 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator 1215 * @task: pointer to rpc_task 1216 * 1217 * Set a request's retransmit timeout using the RTT estimator, 1218 * and put the task to sleep on the pending queue. 1219 */ 1220 void xprt_wait_for_reply_request_rtt(struct rpc_task *task) 1221 { 1222 int timer = task->tk_msg.rpc_proc->p_timer; 1223 struct rpc_clnt *clnt = task->tk_client; 1224 struct rpc_rtt *rtt = clnt->cl_rtt; 1225 struct rpc_rqst *req = task->tk_rqstp; 1226 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 1227 unsigned long timeout; 1228 1229 timeout = rpc_calc_rto(rtt, timer); 1230 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 1231 if (timeout > max_timeout || timeout == 0) 1232 timeout = max_timeout; 1233 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer, 1234 jiffies + timeout); 1235 } 1236 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt); 1237 1238 /** 1239 * xprt_request_wait_receive - wait for the reply to an RPC request 1240 * @task: RPC task about to send a request 1241 * 1242 */ 1243 void xprt_request_wait_receive(struct rpc_task *task) 1244 { 1245 struct rpc_rqst *req = task->tk_rqstp; 1246 struct rpc_xprt *xprt = req->rq_xprt; 1247 1248 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1249 return; 1250 /* 1251 * Sleep on the pending queue if we're expecting a reply. 1252 * The spinlock ensures atomicity between the test of 1253 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1254 */ 1255 spin_lock(&xprt->queue_lock); 1256 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1257 xprt->ops->wait_for_reply_request(task); 1258 /* 1259 * Send an extra queue wakeup call if the 1260 * connection was dropped in case the call to 1261 * rpc_sleep_on() raced. 
1262 */ 1263 if (xprt_request_retransmit_after_disconnect(task)) 1264 rpc_wake_up_queued_task_set_status(&xprt->pending, 1265 task, -ENOTCONN); 1266 } 1267 spin_unlock(&xprt->queue_lock); 1268 } 1269 1270 static bool 1271 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1272 { 1273 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1274 } 1275 1276 /** 1277 * xprt_request_enqueue_transmit - queue a task for transmission 1278 * @task: pointer to rpc_task 1279 * 1280 * Add a task to the transmission queue. 1281 */ 1282 void 1283 xprt_request_enqueue_transmit(struct rpc_task *task) 1284 { 1285 struct rpc_rqst *pos, *req = task->tk_rqstp; 1286 struct rpc_xprt *xprt = req->rq_xprt; 1287 1288 if (xprt_request_need_enqueue_transmit(task, req)) { 1289 req->rq_bytes_sent = 0; 1290 spin_lock(&xprt->queue_lock); 1291 /* 1292 * Requests that carry congestion control credits are added 1293 * to the head of the list to avoid starvation issues. 1294 */ 1295 if (req->rq_cong) { 1296 xprt_clear_congestion_window_wait(xprt); 1297 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1298 if (pos->rq_cong) 1299 continue; 1300 /* Note: req is added _before_ pos */ 1301 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1302 INIT_LIST_HEAD(&req->rq_xmit2); 1303 goto out; 1304 } 1305 } else if (RPC_IS_SWAPPER(task)) { 1306 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1307 if (pos->rq_cong || pos->rq_bytes_sent) 1308 continue; 1309 if (RPC_IS_SWAPPER(pos->rq_task)) 1310 continue; 1311 /* Note: req is added _before_ pos */ 1312 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1313 INIT_LIST_HEAD(&req->rq_xmit2); 1314 goto out; 1315 } 1316 } else if (!req->rq_seqno) { 1317 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1318 if (pos->rq_task->tk_owner != task->tk_owner) 1319 continue; 1320 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1321 INIT_LIST_HEAD(&req->rq_xmit); 1322 goto out; 1323 } 1324 } 1325 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1326 INIT_LIST_HEAD(&req->rq_xmit2); 1327 out: 1328 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1329 spin_unlock(&xprt->queue_lock); 1330 } 1331 } 1332 1333 /** 1334 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1335 * @task: pointer to rpc_task 1336 * 1337 * Remove a task from the transmission queue 1338 * Caller must hold xprt->queue_lock 1339 */ 1340 static void 1341 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1342 { 1343 struct rpc_rqst *req = task->tk_rqstp; 1344 1345 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1346 return; 1347 if (!list_empty(&req->rq_xmit)) { 1348 list_del(&req->rq_xmit); 1349 if (!list_empty(&req->rq_xmit2)) { 1350 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1351 struct rpc_rqst, rq_xmit2); 1352 list_del(&req->rq_xmit2); 1353 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1354 } 1355 } else 1356 list_del(&req->rq_xmit2); 1357 } 1358 1359 /** 1360 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1361 * @task: pointer to rpc_task 1362 * 1363 * Remove a task from the transmission queue 1364 */ 1365 static void 1366 xprt_request_dequeue_transmit(struct rpc_task *task) 1367 { 1368 struct rpc_rqst *req = task->tk_rqstp; 1369 struct rpc_xprt *xprt = req->rq_xprt; 1370 1371 spin_lock(&xprt->queue_lock); 1372 xprt_request_dequeue_transmit_locked(task); 1373 spin_unlock(&xprt->queue_lock); 1374 } 1375 1376 /** 1377 * xprt_request_dequeue_xprt - remove a task from the 
transmit+receive queue 1378 * @task: pointer to rpc_task 1379 * 1380 * Remove a task from the transmit and receive queues, and ensure that 1381 * it is not pinned by the receive work item. 1382 */ 1383 void 1384 xprt_request_dequeue_xprt(struct rpc_task *task) 1385 { 1386 struct rpc_rqst *req = task->tk_rqstp; 1387 struct rpc_xprt *xprt = req->rq_xprt; 1388 1389 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1390 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1391 xprt_is_pinned_rqst(req)) { 1392 spin_lock(&xprt->queue_lock); 1393 xprt_request_dequeue_transmit_locked(task); 1394 xprt_request_dequeue_receive_locked(task); 1395 while (xprt_is_pinned_rqst(req)) { 1396 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1397 spin_unlock(&xprt->queue_lock); 1398 xprt_wait_on_pinned_rqst(req); 1399 spin_lock(&xprt->queue_lock); 1400 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1401 } 1402 spin_unlock(&xprt->queue_lock); 1403 } 1404 } 1405 1406 /** 1407 * xprt_request_prepare - prepare an encoded request for transport 1408 * @req: pointer to rpc_rqst 1409 * 1410 * Calls into the transport layer to do whatever is needed to prepare 1411 * the request for transmission or receive. 1412 */ 1413 void 1414 xprt_request_prepare(struct rpc_rqst *req) 1415 { 1416 struct rpc_xprt *xprt = req->rq_xprt; 1417 1418 if (xprt->ops->prepare_request) 1419 xprt->ops->prepare_request(req); 1420 } 1421 1422 /** 1423 * xprt_request_need_retransmit - Test if a task needs retransmission 1424 * @task: pointer to rpc_task 1425 * 1426 * Test for whether a connection breakage requires the task to retransmit 1427 */ 1428 bool 1429 xprt_request_need_retransmit(struct rpc_task *task) 1430 { 1431 return xprt_request_retransmit_after_disconnect(task); 1432 } 1433 1434 /** 1435 * xprt_prepare_transmit - reserve the transport before sending a request 1436 * @task: RPC task about to send a request 1437 * 1438 */ 1439 bool xprt_prepare_transmit(struct rpc_task *task) 1440 { 1441 struct rpc_rqst *req = task->tk_rqstp; 1442 struct rpc_xprt *xprt = req->rq_xprt; 1443 1444 if (!xprt_lock_write(xprt, task)) { 1445 /* Race breaker: someone may have transmitted us */ 1446 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1447 rpc_wake_up_queued_task_set_status(&xprt->sending, 1448 task, 0); 1449 return false; 1450 1451 } 1452 return true; 1453 } 1454 1455 void xprt_end_transmit(struct rpc_task *task) 1456 { 1457 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 1458 1459 xprt_inject_disconnect(xprt); 1460 xprt_release_write(xprt, task); 1461 } 1462 1463 /** 1464 * xprt_request_transmit - send an RPC request on a transport 1465 * @req: pointer to request to transmit 1466 * @snd_task: RPC task that owns the transport lock 1467 * 1468 * This performs the transmission of a single request. 1469 * Note that if the request is not the same as snd_task, then it 1470 * does need to be pinned. 1471 * Returns '0' on success. 
1472 */ 1473 static int 1474 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1475 { 1476 struct rpc_xprt *xprt = req->rq_xprt; 1477 struct rpc_task *task = req->rq_task; 1478 unsigned int connect_cookie; 1479 int is_retrans = RPC_WAS_SENT(task); 1480 int status; 1481 1482 if (!req->rq_bytes_sent) { 1483 if (xprt_request_data_received(task)) { 1484 status = 0; 1485 goto out_dequeue; 1486 } 1487 /* Verify that our message lies in the RPCSEC_GSS window */ 1488 if (rpcauth_xmit_need_reencode(task)) { 1489 status = -EBADMSG; 1490 goto out_dequeue; 1491 } 1492 if (RPC_SIGNALLED(task)) { 1493 status = -ERESTARTSYS; 1494 goto out_dequeue; 1495 } 1496 } 1497 1498 /* 1499 * Update req->rq_ntrans before transmitting to avoid races with 1500 * xprt_update_rtt(), which needs to know that it is recording a 1501 * reply to the first transmission. 1502 */ 1503 req->rq_ntrans++; 1504 1505 trace_rpc_xdr_sendto(task, &req->rq_snd_buf); 1506 connect_cookie = xprt->connect_cookie; 1507 status = xprt->ops->send_request(req); 1508 if (status != 0) { 1509 req->rq_ntrans--; 1510 trace_xprt_transmit(req, status); 1511 return status; 1512 } 1513 1514 if (is_retrans) 1515 task->tk_client->cl_stats->rpcretrans++; 1516 1517 xprt_inject_disconnect(xprt); 1518 1519 task->tk_flags |= RPC_TASK_SENT; 1520 spin_lock(&xprt->transport_lock); 1521 1522 xprt->stat.sends++; 1523 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1524 xprt->stat.bklog_u += xprt->backlog.qlen; 1525 xprt->stat.sending_u += xprt->sending.qlen; 1526 xprt->stat.pending_u += xprt->pending.qlen; 1527 spin_unlock(&xprt->transport_lock); 1528 1529 req->rq_connect_cookie = connect_cookie; 1530 out_dequeue: 1531 trace_xprt_transmit(req, status); 1532 xprt_request_dequeue_transmit(task); 1533 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1534 return status; 1535 } 1536 1537 /** 1538 * xprt_transmit - send an RPC request on a transport 1539 * @task: controlling RPC task 1540 * 1541 * Attempts to drain the transmit queue. On exit, either the transport 1542 * signalled an error that needs to be handled before transmission can 1543 * resume, or @task finished transmitting, and detected that it already 1544 * received a reply. 
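 * Note that the loop below bounds the number of requests drained in a
 * single call (via a simple counter) rather than always emptying the
 * queue.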
1545 */ 1546 void 1547 xprt_transmit(struct rpc_task *task) 1548 { 1549 struct rpc_rqst *next, *req = task->tk_rqstp; 1550 struct rpc_xprt *xprt = req->rq_xprt; 1551 int counter, status; 1552 1553 spin_lock(&xprt->queue_lock); 1554 counter = 0; 1555 while (!list_empty(&xprt->xmit_queue)) { 1556 if (++counter == 20) 1557 break; 1558 next = list_first_entry(&xprt->xmit_queue, 1559 struct rpc_rqst, rq_xmit); 1560 xprt_pin_rqst(next); 1561 spin_unlock(&xprt->queue_lock); 1562 status = xprt_request_transmit(next, task); 1563 if (status == -EBADMSG && next != req) 1564 status = 0; 1565 spin_lock(&xprt->queue_lock); 1566 xprt_unpin_rqst(next); 1567 if (status == 0) { 1568 if (!xprt_request_data_received(task) || 1569 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1570 continue; 1571 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1572 task->tk_status = status; 1573 break; 1574 } 1575 spin_unlock(&xprt->queue_lock); 1576 } 1577 1578 static void xprt_complete_request_init(struct rpc_task *task) 1579 { 1580 if (task->tk_rqstp) 1581 xprt_request_init(task); 1582 } 1583 1584 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1585 { 1586 set_bit(XPRT_CONGESTED, &xprt->state); 1587 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init); 1588 } 1589 EXPORT_SYMBOL_GPL(xprt_add_backlog); 1590 1591 static bool __xprt_set_rq(struct rpc_task *task, void *data) 1592 { 1593 struct rpc_rqst *req = data; 1594 1595 if (task->tk_rqstp == NULL) { 1596 memset(req, 0, sizeof(*req)); /* mark unused */ 1597 task->tk_rqstp = req; 1598 return true; 1599 } 1600 return false; 1601 } 1602 1603 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req) 1604 { 1605 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) { 1606 clear_bit(XPRT_CONGESTED, &xprt->state); 1607 return false; 1608 } 1609 return true; 1610 } 1611 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog); 1612 1613 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1614 { 1615 bool ret = false; 1616 1617 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1618 goto out; 1619 spin_lock(&xprt->reserve_lock); 1620 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1621 xprt_add_backlog(xprt, task); 1622 ret = true; 1623 } 1624 spin_unlock(&xprt->reserve_lock); 1625 out: 1626 return ret; 1627 } 1628 1629 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1630 { 1631 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1632 1633 if (xprt->num_reqs >= xprt->max_reqs) 1634 goto out; 1635 ++xprt->num_reqs; 1636 spin_unlock(&xprt->reserve_lock); 1637 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1638 spin_lock(&xprt->reserve_lock); 1639 if (req != NULL) 1640 goto out; 1641 --xprt->num_reqs; 1642 req = ERR_PTR(-ENOMEM); 1643 out: 1644 return req; 1645 } 1646 1647 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1648 { 1649 if (xprt->num_reqs > xprt->min_reqs) { 1650 --xprt->num_reqs; 1651 kfree(req); 1652 return true; 1653 } 1654 return false; 1655 } 1656 1657 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1658 { 1659 struct rpc_rqst *req; 1660 1661 spin_lock(&xprt->reserve_lock); 1662 if (!list_empty(&xprt->free)) { 1663 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1664 list_del(&req->rq_list); 1665 goto out_init_req; 1666 } 1667 req = xprt_dynamic_alloc_slot(xprt); 1668 if (!IS_ERR(req)) 1669 goto out_init_req; 1670 switch (PTR_ERR(req)) { 1671 case -ENOMEM: 1672 dprintk("RPC: dynamic allocation of request slot " 1673 
"failed! Retrying\n"); 1674 task->tk_status = -ENOMEM; 1675 break; 1676 case -EAGAIN: 1677 xprt_add_backlog(xprt, task); 1678 dprintk("RPC: waiting for request slot\n"); 1679 fallthrough; 1680 default: 1681 task->tk_status = -EAGAIN; 1682 } 1683 spin_unlock(&xprt->reserve_lock); 1684 return; 1685 out_init_req: 1686 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1687 xprt->num_reqs); 1688 spin_unlock(&xprt->reserve_lock); 1689 1690 task->tk_status = 0; 1691 task->tk_rqstp = req; 1692 } 1693 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1694 1695 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1696 { 1697 spin_lock(&xprt->reserve_lock); 1698 if (!xprt_wake_up_backlog(xprt, req) && 1699 !xprt_dynamic_free_slot(xprt, req)) { 1700 memset(req, 0, sizeof(*req)); /* mark unused */ 1701 list_add(&req->rq_list, &xprt->free); 1702 } 1703 spin_unlock(&xprt->reserve_lock); 1704 } 1705 EXPORT_SYMBOL_GPL(xprt_free_slot); 1706 1707 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1708 { 1709 struct rpc_rqst *req; 1710 while (!list_empty(&xprt->free)) { 1711 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1712 list_del(&req->rq_list); 1713 kfree(req); 1714 } 1715 } 1716 1717 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1718 unsigned int num_prealloc, 1719 unsigned int max_alloc) 1720 { 1721 struct rpc_xprt *xprt; 1722 struct rpc_rqst *req; 1723 int i; 1724 1725 xprt = kzalloc(size, GFP_KERNEL); 1726 if (xprt == NULL) 1727 goto out; 1728 1729 xprt_init(xprt, net); 1730 1731 for (i = 0; i < num_prealloc; i++) { 1732 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1733 if (!req) 1734 goto out_free; 1735 list_add(&req->rq_list, &xprt->free); 1736 } 1737 if (max_alloc > num_prealloc) 1738 xprt->max_reqs = max_alloc; 1739 else 1740 xprt->max_reqs = num_prealloc; 1741 xprt->min_reqs = num_prealloc; 1742 xprt->num_reqs = num_prealloc; 1743 1744 return xprt; 1745 1746 out_free: 1747 xprt_free(xprt); 1748 out: 1749 return NULL; 1750 } 1751 EXPORT_SYMBOL_GPL(xprt_alloc); 1752 1753 void xprt_free(struct rpc_xprt *xprt) 1754 { 1755 put_net(xprt->xprt_net); 1756 xprt_free_all_slots(xprt); 1757 kfree_rcu(xprt, rcu); 1758 } 1759 EXPORT_SYMBOL_GPL(xprt_free); 1760 1761 static void 1762 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1763 { 1764 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1765 } 1766 1767 static __be32 1768 xprt_alloc_xid(struct rpc_xprt *xprt) 1769 { 1770 __be32 xid; 1771 1772 spin_lock(&xprt->reserve_lock); 1773 xid = (__force __be32)xprt->xid++; 1774 spin_unlock(&xprt->reserve_lock); 1775 return xid; 1776 } 1777 1778 static void 1779 xprt_init_xid(struct rpc_xprt *xprt) 1780 { 1781 xprt->xid = prandom_u32(); 1782 } 1783 1784 static void 1785 xprt_request_init(struct rpc_task *task) 1786 { 1787 struct rpc_xprt *xprt = task->tk_xprt; 1788 struct rpc_rqst *req = task->tk_rqstp; 1789 1790 req->rq_task = task; 1791 req->rq_xprt = xprt; 1792 req->rq_buffer = NULL; 1793 req->rq_xid = xprt_alloc_xid(xprt); 1794 xprt_init_connect_cookie(req, xprt); 1795 req->rq_snd_buf.len = 0; 1796 req->rq_snd_buf.buflen = 0; 1797 req->rq_rcv_buf.len = 0; 1798 req->rq_rcv_buf.buflen = 0; 1799 req->rq_snd_buf.bvec = NULL; 1800 req->rq_rcv_buf.bvec = NULL; 1801 req->rq_release_snd_buf = NULL; 1802 xprt_init_majortimeo(task, req); 1803 1804 trace_xprt_reserve(req); 1805 } 1806 1807 static void 1808 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1809 { 1810 xprt->ops->alloc_slot(xprt, task); 1811 if (task->tk_rqstp != 
NULL) 1812 xprt_request_init(task); 1813 } 1814 1815 /** 1816 * xprt_reserve - allocate an RPC request slot 1817 * @task: RPC task requesting a slot allocation 1818 * 1819 * If the transport is marked as being congested, or if no more 1820 * slots are available, place the task on the transport's 1821 * backlog queue. 1822 */ 1823 void xprt_reserve(struct rpc_task *task) 1824 { 1825 struct rpc_xprt *xprt = task->tk_xprt; 1826 1827 task->tk_status = 0; 1828 if (task->tk_rqstp != NULL) 1829 return; 1830 1831 task->tk_status = -EAGAIN; 1832 if (!xprt_throttle_congested(xprt, task)) 1833 xprt_do_reserve(xprt, task); 1834 } 1835 1836 /** 1837 * xprt_retry_reserve - allocate an RPC request slot 1838 * @task: RPC task requesting a slot allocation 1839 * 1840 * If no more slots are available, place the task on the transport's 1841 * backlog queue. 1842 * Note that the only difference with xprt_reserve is that we now 1843 * ignore the value of the XPRT_CONGESTED flag. 1844 */ 1845 void xprt_retry_reserve(struct rpc_task *task) 1846 { 1847 struct rpc_xprt *xprt = task->tk_xprt; 1848 1849 task->tk_status = 0; 1850 if (task->tk_rqstp != NULL) 1851 return; 1852 1853 task->tk_status = -EAGAIN; 1854 xprt_do_reserve(xprt, task); 1855 } 1856 1857 /** 1858 * xprt_release - release an RPC request slot 1859 * @task: task which is finished with the slot 1860 * 1861 */ 1862 void xprt_release(struct rpc_task *task) 1863 { 1864 struct rpc_xprt *xprt; 1865 struct rpc_rqst *req = task->tk_rqstp; 1866 1867 if (req == NULL) { 1868 if (task->tk_client) { 1869 xprt = task->tk_xprt; 1870 xprt_release_write(xprt, task); 1871 } 1872 return; 1873 } 1874 1875 xprt = req->rq_xprt; 1876 xprt_request_dequeue_xprt(task); 1877 spin_lock(&xprt->transport_lock); 1878 xprt->ops->release_xprt(xprt, task); 1879 if (xprt->ops->release_request) 1880 xprt->ops->release_request(task); 1881 xprt_schedule_autodisconnect(xprt); 1882 spin_unlock(&xprt->transport_lock); 1883 if (req->rq_buffer) 1884 xprt->ops->buf_free(task); 1885 xdr_free_bvec(&req->rq_rcv_buf); 1886 xdr_free_bvec(&req->rq_snd_buf); 1887 if (req->rq_cred != NULL) 1888 put_rpccred(req->rq_cred); 1889 if (req->rq_release_snd_buf) 1890 req->rq_release_snd_buf(req); 1891 1892 task->tk_rqstp = NULL; 1893 if (likely(!bc_prealloc(req))) 1894 xprt->ops->free_slot(xprt, req); 1895 else 1896 xprt_free_bc_request(req); 1897 } 1898 1899 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1900 void 1901 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1902 { 1903 struct xdr_buf *xbufp = &req->rq_snd_buf; 1904 1905 task->tk_rqstp = req; 1906 req->rq_task = task; 1907 xprt_init_connect_cookie(req, req->rq_xprt); 1908 /* 1909 * Set up the xdr_buf length. 1910 * This also indicates that the buffer is XDR encoded already. 
1911 */ 1912 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1913 xbufp->tail[0].iov_len; 1914 } 1915 #endif 1916 1917 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1918 { 1919 kref_init(&xprt->kref); 1920 1921 spin_lock_init(&xprt->transport_lock); 1922 spin_lock_init(&xprt->reserve_lock); 1923 spin_lock_init(&xprt->queue_lock); 1924 1925 INIT_LIST_HEAD(&xprt->free); 1926 xprt->recv_queue = RB_ROOT; 1927 INIT_LIST_HEAD(&xprt->xmit_queue); 1928 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1929 spin_lock_init(&xprt->bc_pa_lock); 1930 INIT_LIST_HEAD(&xprt->bc_pa_list); 1931 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1932 INIT_LIST_HEAD(&xprt->xprt_switch); 1933 1934 xprt->last_used = jiffies; 1935 xprt->cwnd = RPC_INITCWND; 1936 xprt->bind_index = 0; 1937 1938 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1939 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1940 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1941 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1942 1943 xprt_init_xid(xprt); 1944 1945 xprt->xprt_net = get_net(net); 1946 } 1947 1948 /** 1949 * xprt_create_transport - create an RPC transport 1950 * @args: rpc transport creation arguments 1951 * 1952 */ 1953 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1954 { 1955 struct rpc_xprt *xprt; 1956 struct xprt_class *t; 1957 1958 spin_lock(&xprt_list_lock); 1959 list_for_each_entry(t, &xprt_list, list) { 1960 if (t->ident == args->ident) { 1961 spin_unlock(&xprt_list_lock); 1962 goto found; 1963 } 1964 } 1965 spin_unlock(&xprt_list_lock); 1966 dprintk("RPC: transport (%d) not supported\n", args->ident); 1967 return ERR_PTR(-EIO); 1968 1969 found: 1970 xprt = t->setup(args); 1971 if (IS_ERR(xprt)) 1972 goto out; 1973 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1974 xprt->idle_timeout = 0; 1975 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1976 if (xprt_has_timer(xprt)) 1977 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 1978 else 1979 timer_setup(&xprt->timer, NULL, 0); 1980 1981 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1982 xprt_destroy(xprt); 1983 return ERR_PTR(-EINVAL); 1984 } 1985 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1986 if (xprt->servername == NULL) { 1987 xprt_destroy(xprt); 1988 return ERR_PTR(-ENOMEM); 1989 } 1990 1991 rpc_xprt_debugfs_register(xprt); 1992 1993 trace_xprt_create(xprt); 1994 out: 1995 return xprt; 1996 } 1997 1998 static void xprt_destroy_cb(struct work_struct *work) 1999 { 2000 struct rpc_xprt *xprt = 2001 container_of(work, struct rpc_xprt, task_cleanup); 2002 2003 trace_xprt_destroy(xprt); 2004 2005 rpc_xprt_debugfs_unregister(xprt); 2006 rpc_destroy_wait_queue(&xprt->binding); 2007 rpc_destroy_wait_queue(&xprt->pending); 2008 rpc_destroy_wait_queue(&xprt->sending); 2009 rpc_destroy_wait_queue(&xprt->backlog); 2010 kfree(xprt->servername); 2011 /* 2012 * Destroy any existing back channel 2013 */ 2014 xprt_destroy_backchannel(xprt, UINT_MAX); 2015 2016 /* 2017 * Tear down transport state and free the rpc_xprt 2018 */ 2019 xprt->ops->destroy(xprt); 2020 } 2021 2022 /** 2023 * xprt_destroy - destroy an RPC transport, killing off all requests. 
2024 * @xprt: transport to destroy 2025 * 2026 */ 2027 static void xprt_destroy(struct rpc_xprt *xprt) 2028 { 2029 /* 2030 * Exclude transport connect/disconnect handlers and autoclose 2031 */ 2032 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 2033 2034 del_timer_sync(&xprt->timer); 2035 2036 /* 2037 * Destroy sockets etc from the system workqueue so they can 2038 * safely flush receive work running on rpciod. 2039 */ 2040 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 2041 schedule_work(&xprt->task_cleanup); 2042 } 2043 2044 static void xprt_destroy_kref(struct kref *kref) 2045 { 2046 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 2047 } 2048 2049 /** 2050 * xprt_get - return a reference to an RPC transport. 2051 * @xprt: pointer to the transport 2052 * 2053 */ 2054 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 2055 { 2056 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 2057 return xprt; 2058 return NULL; 2059 } 2060 EXPORT_SYMBOL_GPL(xprt_get); 2061 2062 /** 2063 * xprt_put - release a reference to an RPC transport. 2064 * @xprt: pointer to the transport 2065 * 2066 */ 2067 void xprt_put(struct rpc_xprt *xprt) 2068 { 2069 if (xprt != NULL) 2070 kref_put(&xprt->kref, xprt_destroy_kref); 2071 } 2072 EXPORT_SYMBOL_GPL(xprt_put); 2073