1 /* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, if a reply is expected, 16 * it installs a timer that is run after the packet's timeout has 17 * expired. 18 * - When a packet arrives, the data_ready handler walks the list of 19 * pending requests for that transport. If a matching XID is found, the 20 * caller is woken up, and the timer removed. 21 * - When no reply arrives within the timeout interval, the timer is 22 * fired by the kernel and runs xprt_timer(). It either adjusts the 23 * timeout values (minor timeout) or wakes up the caller with a status 24 * of -ETIMEDOUT. 25 * - When the caller receives a notification from RPC that a reply arrived, 26 * it should release the RPC slot, and process the reply. 27 * If the call timed out, it may choose to retry the operation by 28 * adjusting the initial timeout value, and simply calling rpc_call 29 * again. 30 * 31 * Support for async RPC is done through a set of RPC-specific scheduling 32 * primitives that `transparently' work for processes as well as async 33 * tasks that rely on callbacks. 34 * 35 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 36 * 37 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 38 */ 39 40 #include <linux/module.h> 41 42 #include <linux/types.h> 43 #include <linux/interrupt.h> 44 #include <linux/workqueue.h> 45 #include <linux/net.h> 46 #include <linux/ktime.h> 47 48 #include <linux/sunrpc/clnt.h> 49 #include <linux/sunrpc/metrics.h> 50 #include <linux/sunrpc/bc_xprt.h> 51 #include <linux/rcupdate.h> 52 #include <linux/sched/mm.h> 53 54 #include <trace/events/sunrpc.h> 55 56 #include "sunrpc.h" 57 58 /* 59 * Local variables 60 */ 61 62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 63 # define RPCDBG_FACILITY RPCDBG_XPRT 64 #endif 65 66 /* 67 * Local functions 68 */ 69 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 70 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt); 71 static void xprt_destroy(struct rpc_xprt *xprt); 72 73 static DEFINE_SPINLOCK(xprt_list_lock); 74 static LIST_HEAD(xprt_list); 75 76 /** 77 * xprt_register_transport - register a transport implementation 78 * @transport: transport to register 79 * 80 * If a transport implementation is loaded as a kernel module, it can 81 * call this interface to make itself known to the RPC client. 82 * 83 * Returns: 84 * 0: transport successfully registered 85 * -EEXIST: transport already registered 86 * -EINVAL: transport module being unloaded 87 */ 88 int xprt_register_transport(struct xprt_class *transport) 89 { 90 struct xprt_class *t; 91 int result; 92 93 result = -EEXIST; 94 spin_lock(&xprt_list_lock); 95 list_for_each_entry(t, &xprt_list, list) { 96 /* don't register the same transport class twice */ 97 if (t->ident == transport->ident) 98 goto out; 99 } 100 101 list_add_tail(&transport->list, &xprt_list); 102 printk(KERN_INFO "RPC: Registered %s transport module.\n", 103 transport->name); 104 result = 0; 105 106 out: 107 spin_unlock(&xprt_list_lock); 108 return result; 109 } 110 EXPORT_SYMBOL_GPL(xprt_register_transport); 111 112 /** 113 * xprt_unregister_transport - unregister a transport implementation 114 * @transport: transport to unregister 115 * 116 * Returns: 117 * 0: transport successfully unregistered 118 * -ENOENT: transport never registered 119 */ 120 int xprt_unregister_transport(struct xprt_class *transport) 121 { 122 struct xprt_class *t; 123 int result; 124 125 result = 0; 126 spin_lock(&xprt_list_lock); 127 list_for_each_entry(t, &xprt_list, list) { 128 if (t == transport) { 129 printk(KERN_INFO 130 "RPC: Unregistered %s transport module.\n", 131 transport->name); 132 list_del_init(&transport->list); 133 goto out; 134 } 135 } 136 result = -ENOENT; 137 138 out: 139 spin_unlock(&xprt_list_lock); 140 return result; 141 } 142 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 143 144 /** 145 * xprt_load_transport - load a transport implementation 146 * @transport_name: transport to load 147 * 148 * Returns: 149 * 0: transport successfully loaded 150 * -ENOENT: transport module not available 151 */ 152 int xprt_load_transport(const char *transport_name) 153 { 154 struct xprt_class *t; 155 int result; 156 157 result = 0; 158 spin_lock(&xprt_list_lock); 159 list_for_each_entry(t, &xprt_list, list) { 160 if (strcmp(t->name, transport_name) == 0) { 161 spin_unlock(&xprt_list_lock); 162 goto out; 163 } 164 } 165 spin_unlock(&xprt_list_lock); 166 result = request_module("xprt%s", transport_name); 167 out: 168 return result; 169 } 170 EXPORT_SYMBOL_GPL(xprt_load_transport); 171 172 static void xprt_clear_locked(struct rpc_xprt *xprt) 173 { 174 xprt->snd_task = NULL; 175 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 176 smp_mb__before_atomic(); 177 clear_bit(XPRT_LOCKED, &xprt->state); 178 smp_mb__after_atomic(); 179 } else 180 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 181 } 182 183 /** 184 * xprt_reserve_xprt - serialize write access to transports 185 * @task: task that is requesting access to the transport 186 * @xprt: pointer to the target transport 187 * 188 * This prevents mixing the payload of separate requests, and prevents 189 * transport connects from colliding with writes. No congestion control 190 * is provided. 191 */ 192 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 193 { 194 struct rpc_rqst *req = task->tk_rqstp; 195 196 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 197 if (task == xprt->snd_task) 198 return 1; 199 goto out_sleep; 200 } 201 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 202 goto out_unlock; 203 xprt->snd_task = task; 204 205 return 1; 206 207 out_unlock: 208 xprt_clear_locked(xprt); 209 out_sleep: 210 dprintk("RPC: %5u failed to lock transport %p\n", 211 task->tk_pid, xprt); 212 task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; 213 task->tk_status = -EAGAIN; 214 rpc_sleep_on(&xprt->sending, task, NULL); 215 return 0; 216 } 217 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 218 219 static bool 220 xprt_need_congestion_window_wait(struct rpc_xprt *xprt) 221 { 222 return test_bit(XPRT_CWND_WAIT, &xprt->state); 223 } 224 225 static void 226 xprt_set_congestion_window_wait(struct rpc_xprt *xprt) 227 { 228 if (!list_empty(&xprt->xmit_queue)) { 229 /* Peek at head of queue to see if it can make progress */ 230 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst, 231 rq_xmit)->rq_cong) 232 return; 233 } 234 set_bit(XPRT_CWND_WAIT, &xprt->state); 235 } 236 237 static void 238 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt) 239 { 240 if (!RPCXPRT_CONGESTED(xprt)) 241 clear_bit(XPRT_CWND_WAIT, &xprt->state); 242 } 243 244 /* 245 * xprt_reserve_xprt_cong - serialize write access to transports 246 * @task: task that is requesting access to the transport 247 * 248 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 249 * integrated into the decision of whether a request is allowed to be 250 * woken up and given access to the transport. 251 * Note that the lock is only granted if we know there are free slots. 252 */ 253 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 254 { 255 struct rpc_rqst *req = task->tk_rqstp; 256 257 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 258 if (task == xprt->snd_task) 259 return 1; 260 goto out_sleep; 261 } 262 if (req == NULL) { 263 xprt->snd_task = task; 264 return 1; 265 } 266 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 267 goto out_unlock; 268 if (!xprt_need_congestion_window_wait(xprt)) { 269 xprt->snd_task = task; 270 return 1; 271 } 272 out_unlock: 273 xprt_clear_locked(xprt); 274 out_sleep: 275 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 276 task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; 277 task->tk_status = -EAGAIN; 278 rpc_sleep_on(&xprt->sending, task, NULL); 279 return 0; 280 } 281 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 282 283 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 284 { 285 int retval; 286 287 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) 288 return 1; 289 spin_lock_bh(&xprt->transport_lock); 290 retval = xprt->ops->reserve_xprt(xprt, task); 291 spin_unlock_bh(&xprt->transport_lock); 292 return retval; 293 } 294 295 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 296 { 297 struct rpc_xprt *xprt = data; 298 299 xprt->snd_task = task; 300 return true; 301 } 302 303 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 304 { 305 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 306 return; 307 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 308 goto out_unlock; 309 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 310 __xprt_lock_write_func, xprt)) 311 return; 312 out_unlock: 313 xprt_clear_locked(xprt); 314 } 315 316 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 317 { 318 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 319 return; 320 if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) 321 goto out_unlock; 322 if (xprt_need_congestion_window_wait(xprt)) 323 goto out_unlock; 324 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, 325 __xprt_lock_write_func, xprt)) 326 return; 327 out_unlock: 328 xprt_clear_locked(xprt); 329 } 330 331 /** 332 * xprt_release_xprt - allow other requests to use a transport 333 * @xprt: transport with other tasks potentially waiting 334 * @task: task that is releasing access to the transport 335 * 336 * Note that "task" can be NULL. No congestion control is provided. 337 */ 338 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 339 { 340 if (xprt->snd_task == task) { 341 xprt_clear_locked(xprt); 342 __xprt_lock_write_next(xprt); 343 } 344 } 345 EXPORT_SYMBOL_GPL(xprt_release_xprt); 346 347 /** 348 * xprt_release_xprt_cong - allow other requests to use a transport 349 * @xprt: transport with other tasks potentially waiting 350 * @task: task that is releasing access to the transport 351 * 352 * Note that "task" can be NULL. Another task is awoken to use the 353 * transport if the transport's congestion window allows it. 354 */ 355 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 356 { 357 if (xprt->snd_task == task) { 358 xprt_clear_locked(xprt); 359 __xprt_lock_write_next_cong(xprt); 360 } 361 } 362 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 363 364 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 365 { 366 if (xprt->snd_task != task) 367 return; 368 spin_lock_bh(&xprt->transport_lock); 369 xprt->ops->release_xprt(xprt, task); 370 spin_unlock_bh(&xprt->transport_lock); 371 } 372 373 /* 374 * Van Jacobson congestion avoidance. Check if the congestion window 375 * overflowed. Put the task to sleep if this is the case. 376 */ 377 static int 378 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 379 { 380 if (req->rq_cong) 381 return 1; 382 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 383 req->rq_task->tk_pid, xprt->cong, xprt->cwnd); 384 if (RPCXPRT_CONGESTED(xprt)) { 385 xprt_set_congestion_window_wait(xprt); 386 return 0; 387 } 388 req->rq_cong = 1; 389 xprt->cong += RPC_CWNDSCALE; 390 return 1; 391 } 392 393 /* 394 * Adjust the congestion window, and wake up the next task 395 * that has been sleeping due to congestion 396 */ 397 static void 398 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 399 { 400 if (!req->rq_cong) 401 return; 402 req->rq_cong = 0; 403 xprt->cong -= RPC_CWNDSCALE; 404 xprt_test_and_clear_congestion_window_wait(xprt); 405 __xprt_lock_write_next_cong(xprt); 406 } 407 408 /** 409 * xprt_request_get_cong - Request congestion control credits 410 * @xprt: pointer to transport 411 * @req: pointer to RPC request 412 * 413 * Useful for transports that require congestion control. 414 */ 415 bool 416 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 417 { 418 bool ret = false; 419 420 if (req->rq_cong) 421 return true; 422 spin_lock_bh(&xprt->transport_lock); 423 ret = __xprt_get_cong(xprt, req) != 0; 424 spin_unlock_bh(&xprt->transport_lock); 425 return ret; 426 } 427 EXPORT_SYMBOL_GPL(xprt_request_get_cong); 428 429 /** 430 * xprt_release_rqst_cong - housekeeping when request is complete 431 * @task: RPC request that recently completed 432 * 433 * Useful for transports that require congestion control. 434 */ 435 void xprt_release_rqst_cong(struct rpc_task *task) 436 { 437 struct rpc_rqst *req = task->tk_rqstp; 438 439 __xprt_put_cong(req->rq_xprt, req); 440 } 441 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 442 443 /* 444 * Clear the congestion window wait flag and wake up the next 445 * entry on xprt->sending 446 */ 447 static void 448 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) 449 { 450 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { 451 spin_lock_bh(&xprt->transport_lock); 452 __xprt_lock_write_next_cong(xprt); 453 spin_unlock_bh(&xprt->transport_lock); 454 } 455 } 456 457 /** 458 * xprt_adjust_cwnd - adjust transport congestion window 459 * @xprt: pointer to xprt 460 * @task: recently completed RPC request used to adjust window 461 * @result: result code of completed RPC request 462 * 463 * The transport code maintains an estimate on the maximum number of out- 464 * standing RPC requests, using a smoothed version of the congestion 465 * avoidance implemented in 44BSD. This is basically the Van Jacobson 466 * congestion algorithm: If a retransmit occurs, the congestion window is 467 * halved; otherwise, it is incremented by 1/cwnd when 468 * 469 * - a reply is received and 470 * - a full number of requests are outstanding and 471 * - the congestion window hasn't been updated recently. 472 */ 473 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 474 { 475 struct rpc_rqst *req = task->tk_rqstp; 476 unsigned long cwnd = xprt->cwnd; 477 478 if (result >= 0 && cwnd <= xprt->cong) { 479 /* The (cwnd >> 1) term makes sure 480 * the result gets rounded properly. */ 481 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 482 if (cwnd > RPC_MAXCWND(xprt)) 483 cwnd = RPC_MAXCWND(xprt); 484 __xprt_lock_write_next_cong(xprt); 485 } else if (result == -ETIMEDOUT) { 486 cwnd >>= 1; 487 if (cwnd < RPC_CWNDSCALE) 488 cwnd = RPC_CWNDSCALE; 489 } 490 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 491 xprt->cong, xprt->cwnd, cwnd); 492 xprt->cwnd = cwnd; 493 __xprt_put_cong(xprt, req); 494 } 495 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 496 497 /** 498 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 499 * @xprt: transport with waiting tasks 500 * @status: result code to plant in each task before waking it 501 * 502 */ 503 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 504 { 505 if (status < 0) 506 rpc_wake_up_status(&xprt->pending, status); 507 else 508 rpc_wake_up(&xprt->pending); 509 } 510 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 511 512 /** 513 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 514 * @xprt: transport 515 * 516 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 517 * we don't in general want to force a socket disconnection due to 518 * an incomplete RPC call transmission. 519 */ 520 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt) 521 { 522 set_bit(XPRT_WRITE_SPACE, &xprt->state); 523 } 524 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 525 526 static bool 527 xprt_clear_write_space_locked(struct rpc_xprt *xprt) 528 { 529 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) { 530 __xprt_lock_write_next(xprt); 531 dprintk("RPC: write space: waking waiting task on " 532 "xprt %p\n", xprt); 533 return true; 534 } 535 return false; 536 } 537 538 /** 539 * xprt_write_space - wake the task waiting for transport output buffer space 540 * @xprt: transport with waiting tasks 541 * 542 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 543 */ 544 bool xprt_write_space(struct rpc_xprt *xprt) 545 { 546 bool ret; 547 548 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) 549 return false; 550 spin_lock_bh(&xprt->transport_lock); 551 ret = xprt_clear_write_space_locked(xprt); 552 spin_unlock_bh(&xprt->transport_lock); 553 return ret; 554 } 555 EXPORT_SYMBOL_GPL(xprt_write_space); 556 557 /** 558 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 559 * @task: task whose timeout is to be set 560 * 561 * Set a request's retransmit timeout based on the transport's 562 * default timeout parameters. Used by transports that don't adjust 563 * the retransmit timeout based on round-trip time estimation. 564 */ 565 void xprt_set_retrans_timeout_def(struct rpc_task *task) 566 { 567 task->tk_timeout = task->tk_rqstp->rq_timeout; 568 } 569 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); 570 571 /** 572 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 573 * @task: task whose timeout is to be set 574 * 575 * Set a request's retransmit timeout using the RTT estimator. 576 */ 577 void xprt_set_retrans_timeout_rtt(struct rpc_task *task) 578 { 579 int timer = task->tk_msg.rpc_proc->p_timer; 580 struct rpc_clnt *clnt = task->tk_client; 581 struct rpc_rtt *rtt = clnt->cl_rtt; 582 struct rpc_rqst *req = task->tk_rqstp; 583 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 584 585 task->tk_timeout = rpc_calc_rto(rtt, timer); 586 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 587 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 588 task->tk_timeout = max_timeout; 589 } 590 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt); 591 592 static void xprt_reset_majortimeo(struct rpc_rqst *req) 593 { 594 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 595 596 req->rq_majortimeo = req->rq_timeout; 597 if (to->to_exponential) 598 req->rq_majortimeo <<= to->to_retries; 599 else 600 req->rq_majortimeo += to->to_increment * to->to_retries; 601 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 602 req->rq_majortimeo = to->to_maxval; 603 req->rq_majortimeo += jiffies; 604 } 605 606 /** 607 * xprt_adjust_timeout - adjust timeout values for next retransmit 608 * @req: RPC request containing parameters to use for the adjustment 609 * 610 */ 611 int xprt_adjust_timeout(struct rpc_rqst *req) 612 { 613 struct rpc_xprt *xprt = req->rq_xprt; 614 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 615 int status = 0; 616 617 if (time_before(jiffies, req->rq_majortimeo)) { 618 if (to->to_exponential) 619 req->rq_timeout <<= 1; 620 else 621 req->rq_timeout += to->to_increment; 622 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 623 req->rq_timeout = to->to_maxval; 624 req->rq_retries++; 625 } else { 626 req->rq_timeout = to->to_initval; 627 req->rq_retries = 0; 628 xprt_reset_majortimeo(req); 629 /* Reset the RTT counters == "slow start" */ 630 spin_lock_bh(&xprt->transport_lock); 631 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 632 spin_unlock_bh(&xprt->transport_lock); 633 status = -ETIMEDOUT; 634 } 635 636 if (req->rq_timeout == 0) { 637 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 638 req->rq_timeout = 5 * HZ; 639 } 640 return status; 641 } 642 643 static void xprt_autoclose(struct work_struct *work) 644 { 645 struct rpc_xprt *xprt = 646 container_of(work, struct rpc_xprt, task_cleanup); 647 unsigned int pflags = memalloc_nofs_save(); 648 649 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 650 xprt->ops->close(xprt); 651 xprt_release_write(xprt, NULL); 652 wake_up_bit(&xprt->state, XPRT_LOCKED); 653 memalloc_nofs_restore(pflags); 654 } 655 656 /** 657 * xprt_disconnect_done - mark a transport as disconnected 658 * @xprt: transport to flag for disconnect 659 * 660 */ 661 void xprt_disconnect_done(struct rpc_xprt *xprt) 662 { 663 dprintk("RPC: disconnected transport %p\n", xprt); 664 spin_lock_bh(&xprt->transport_lock); 665 xprt_clear_connected(xprt); 666 xprt_clear_write_space_locked(xprt); 667 xprt_wake_pending_tasks(xprt, -ENOTCONN); 668 spin_unlock_bh(&xprt->transport_lock); 669 } 670 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 671 672 /** 673 * xprt_force_disconnect - force a transport to disconnect 674 * @xprt: transport to disconnect 675 * 676 */ 677 void xprt_force_disconnect(struct rpc_xprt *xprt) 678 { 679 /* Don't race with the test_bit() in xprt_clear_locked() */ 680 spin_lock_bh(&xprt->transport_lock); 681 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 682 /* Try to schedule an autoclose RPC call */ 683 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 684 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 685 else if (xprt->snd_task) 686 rpc_wake_up_queued_task_set_status(&xprt->pending, 687 xprt->snd_task, -ENOTCONN); 688 spin_unlock_bh(&xprt->transport_lock); 689 } 690 EXPORT_SYMBOL_GPL(xprt_force_disconnect); 691 692 static unsigned int 693 xprt_connect_cookie(struct rpc_xprt *xprt) 694 { 695 return READ_ONCE(xprt->connect_cookie); 696 } 697 698 static bool 699 xprt_request_retransmit_after_disconnect(struct rpc_task *task) 700 { 701 struct rpc_rqst *req = task->tk_rqstp; 702 struct rpc_xprt *xprt = req->rq_xprt; 703 704 return req->rq_connect_cookie != xprt_connect_cookie(xprt) || 705 !xprt_connected(xprt); 706 } 707 708 /** 709 * xprt_conditional_disconnect - force a transport to disconnect 710 * @xprt: transport to disconnect 711 * @cookie: 'connection cookie' 712 * 713 * This attempts to break the connection if and only if 'cookie' matches 714 * the current transport 'connection cookie'. It ensures that we don't 715 * try to break the connection more than once when we need to retransmit 716 * a batch of RPC requests. 717 * 718 */ 719 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 720 { 721 /* Don't race with the test_bit() in xprt_clear_locked() */ 722 spin_lock_bh(&xprt->transport_lock); 723 if (cookie != xprt->connect_cookie) 724 goto out; 725 if (test_bit(XPRT_CLOSING, &xprt->state)) 726 goto out; 727 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 728 /* Try to schedule an autoclose RPC call */ 729 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 730 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 731 xprt_wake_pending_tasks(xprt, -EAGAIN); 732 out: 733 spin_unlock_bh(&xprt->transport_lock); 734 } 735 736 static bool 737 xprt_has_timer(const struct rpc_xprt *xprt) 738 { 739 return xprt->idle_timeout != 0; 740 } 741 742 static void 743 xprt_schedule_autodisconnect(struct rpc_xprt *xprt) 744 __must_hold(&xprt->transport_lock) 745 { 746 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt)) 747 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); 748 } 749 750 static void 751 xprt_init_autodisconnect(struct timer_list *t) 752 { 753 struct rpc_xprt *xprt = from_timer(xprt, t, timer); 754 755 spin_lock(&xprt->transport_lock); 756 if (!RB_EMPTY_ROOT(&xprt->recv_queue)) 757 goto out_abort; 758 /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ 759 xprt->last_used = jiffies; 760 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 761 goto out_abort; 762 spin_unlock(&xprt->transport_lock); 763 queue_work(xprtiod_workqueue, &xprt->task_cleanup); 764 return; 765 out_abort: 766 spin_unlock(&xprt->transport_lock); 767 } 768 769 bool xprt_lock_connect(struct rpc_xprt *xprt, 770 struct rpc_task *task, 771 void *cookie) 772 { 773 bool ret = false; 774 775 spin_lock_bh(&xprt->transport_lock); 776 if (!test_bit(XPRT_LOCKED, &xprt->state)) 777 goto out; 778 if (xprt->snd_task != task) 779 goto out; 780 xprt->snd_task = cookie; 781 ret = true; 782 out: 783 spin_unlock_bh(&xprt->transport_lock); 784 return ret; 785 } 786 787 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) 788 { 789 spin_lock_bh(&xprt->transport_lock); 790 if (xprt->snd_task != cookie) 791 goto out; 792 if (!test_bit(XPRT_LOCKED, &xprt->state)) 793 goto out; 794 xprt->snd_task =NULL; 795 xprt->ops->release_xprt(xprt, NULL); 796 xprt_schedule_autodisconnect(xprt); 797 out: 798 spin_unlock_bh(&xprt->transport_lock); 799 wake_up_bit(&xprt->state, XPRT_LOCKED); 800 } 801 802 /** 803 * xprt_connect - schedule a transport connect operation 804 * @task: RPC task that is requesting the connect 805 * 806 */ 807 void xprt_connect(struct rpc_task *task) 808 { 809 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 810 811 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 812 xprt, (xprt_connected(xprt) ? "is" : "is not")); 813 814 if (!xprt_bound(xprt)) { 815 task->tk_status = -EAGAIN; 816 return; 817 } 818 if (!xprt_lock_write(xprt, task)) 819 return; 820 821 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 822 xprt->ops->close(xprt); 823 824 if (!xprt_connected(xprt)) { 825 task->tk_timeout = task->tk_rqstp->rq_timeout; 826 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie; 827 rpc_sleep_on(&xprt->pending, task, NULL); 828 829 if (test_bit(XPRT_CLOSING, &xprt->state)) 830 return; 831 if (xprt_test_and_set_connecting(xprt)) 832 return; 833 /* Race breaker */ 834 if (!xprt_connected(xprt)) { 835 xprt->stat.connect_start = jiffies; 836 xprt->ops->connect(xprt, task); 837 } else { 838 xprt_clear_connecting(xprt); 839 task->tk_status = 0; 840 rpc_wake_up_queued_task(&xprt->pending, task); 841 } 842 } 843 xprt_release_write(xprt, task); 844 } 845 846 enum xprt_xid_rb_cmp { 847 XID_RB_EQUAL, 848 XID_RB_LEFT, 849 XID_RB_RIGHT, 850 }; 851 static enum xprt_xid_rb_cmp 852 xprt_xid_cmp(__be32 xid1, __be32 xid2) 853 { 854 if (xid1 == xid2) 855 return XID_RB_EQUAL; 856 if ((__force u32)xid1 < (__force u32)xid2) 857 return XID_RB_LEFT; 858 return XID_RB_RIGHT; 859 } 860 861 static struct rpc_rqst * 862 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid) 863 { 864 struct rb_node *n = xprt->recv_queue.rb_node; 865 struct rpc_rqst *req; 866 867 while (n != NULL) { 868 req = rb_entry(n, struct rpc_rqst, rq_recv); 869 switch (xprt_xid_cmp(xid, req->rq_xid)) { 870 case XID_RB_LEFT: 871 n = n->rb_left; 872 break; 873 case XID_RB_RIGHT: 874 n = n->rb_right; 875 break; 876 case XID_RB_EQUAL: 877 return req; 878 } 879 } 880 return NULL; 881 } 882 883 static void 884 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new) 885 { 886 struct rb_node **p = &xprt->recv_queue.rb_node; 887 struct rb_node *n = NULL; 888 struct rpc_rqst *req; 889 890 while (*p != NULL) { 891 n = *p; 892 req = rb_entry(n, struct rpc_rqst, rq_recv); 893 switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) { 894 case XID_RB_LEFT: 895 p = &n->rb_left; 896 break; 897 case XID_RB_RIGHT: 898 p = &n->rb_right; 899 break; 900 case XID_RB_EQUAL: 901 WARN_ON_ONCE(new != req); 902 return; 903 } 904 } 905 rb_link_node(&new->rq_recv, n, p); 906 rb_insert_color(&new->rq_recv, &xprt->recv_queue); 907 } 908 909 static void 910 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req) 911 { 912 rb_erase(&req->rq_recv, &xprt->recv_queue); 913 } 914 915 /** 916 * xprt_lookup_rqst - find an RPC request corresponding to an XID 917 * @xprt: transport on which the original request was transmitted 918 * @xid: RPC XID of incoming reply 919 * 920 * Caller holds xprt->queue_lock. 921 */ 922 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 923 { 924 struct rpc_rqst *entry; 925 926 entry = xprt_request_rb_find(xprt, xid); 927 if (entry != NULL) { 928 trace_xprt_lookup_rqst(xprt, xid, 0); 929 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime); 930 return entry; 931 } 932 933 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 934 ntohl(xid)); 935 trace_xprt_lookup_rqst(xprt, xid, -ENOENT); 936 xprt->stat.bad_xids++; 937 return NULL; 938 } 939 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 940 941 static bool 942 xprt_is_pinned_rqst(struct rpc_rqst *req) 943 { 944 return atomic_read(&req->rq_pin) != 0; 945 } 946 947 /** 948 * xprt_pin_rqst - Pin a request on the transport receive list 949 * @req: Request to pin 950 * 951 * Caller must ensure this is atomic with the call to xprt_lookup_rqst() 952 * so should be holding the xprt receive lock. 953 */ 954 void xprt_pin_rqst(struct rpc_rqst *req) 955 { 956 atomic_inc(&req->rq_pin); 957 } 958 EXPORT_SYMBOL_GPL(xprt_pin_rqst); 959 960 /** 961 * xprt_unpin_rqst - Unpin a request on the transport receive list 962 * @req: Request to pin 963 * 964 * Caller should be holding the xprt receive lock. 965 */ 966 void xprt_unpin_rqst(struct rpc_rqst *req) 967 { 968 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) { 969 atomic_dec(&req->rq_pin); 970 return; 971 } 972 if (atomic_dec_and_test(&req->rq_pin)) 973 wake_up_var(&req->rq_pin); 974 } 975 EXPORT_SYMBOL_GPL(xprt_unpin_rqst); 976 977 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req) 978 { 979 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req)); 980 } 981 982 static bool 983 xprt_request_data_received(struct rpc_task *task) 984 { 985 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 986 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0; 987 } 988 989 static bool 990 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req) 991 { 992 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) && 993 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0; 994 } 995 996 /** 997 * xprt_request_enqueue_receive - Add an request to the receive queue 998 * @task: RPC task 999 * 1000 */ 1001 void 1002 xprt_request_enqueue_receive(struct rpc_task *task) 1003 { 1004 struct rpc_rqst *req = task->tk_rqstp; 1005 struct rpc_xprt *xprt = req->rq_xprt; 1006 1007 if (!xprt_request_need_enqueue_receive(task, req)) 1008 return; 1009 spin_lock(&xprt->queue_lock); 1010 1011 /* Update the softirq receive buffer */ 1012 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1013 sizeof(req->rq_private_buf)); 1014 1015 /* Add request to the receive list */ 1016 xprt_request_rb_insert(xprt, req); 1017 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate); 1018 spin_unlock(&xprt->queue_lock); 1019 1020 xprt_reset_majortimeo(req); 1021 /* Turn off autodisconnect */ 1022 del_singleshot_timer_sync(&xprt->timer); 1023 } 1024 1025 /** 1026 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue 1027 * @task: RPC task 1028 * 1029 * Caller must hold xprt->queue_lock. 1030 */ 1031 static void 1032 xprt_request_dequeue_receive_locked(struct rpc_task *task) 1033 { 1034 struct rpc_rqst *req = task->tk_rqstp; 1035 1036 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1037 xprt_request_rb_remove(req->rq_xprt, req); 1038 } 1039 1040 /** 1041 * xprt_update_rtt - Update RPC RTT statistics 1042 * @task: RPC request that recently completed 1043 * 1044 * Caller holds xprt->queue_lock. 1045 */ 1046 void xprt_update_rtt(struct rpc_task *task) 1047 { 1048 struct rpc_rqst *req = task->tk_rqstp; 1049 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 1050 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 1051 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 1052 1053 if (timer) { 1054 if (req->rq_ntrans == 1) 1055 rpc_update_rtt(rtt, timer, m); 1056 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 1057 } 1058 } 1059 EXPORT_SYMBOL_GPL(xprt_update_rtt); 1060 1061 /** 1062 * xprt_complete_rqst - called when reply processing is complete 1063 * @task: RPC request that recently completed 1064 * @copied: actual number of bytes received from the transport 1065 * 1066 * Caller holds xprt->queue_lock. 1067 */ 1068 void xprt_complete_rqst(struct rpc_task *task, int copied) 1069 { 1070 struct rpc_rqst *req = task->tk_rqstp; 1071 struct rpc_xprt *xprt = req->rq_xprt; 1072 1073 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 1074 task->tk_pid, ntohl(req->rq_xid), copied); 1075 trace_xprt_complete_rqst(xprt, req->rq_xid, copied); 1076 1077 xprt->stat.recvs++; 1078 1079 req->rq_private_buf.len = copied; 1080 /* Ensure all writes are done before we update */ 1081 /* req->rq_reply_bytes_recvd */ 1082 smp_wmb(); 1083 req->rq_reply_bytes_recvd = copied; 1084 xprt_request_dequeue_receive_locked(task); 1085 rpc_wake_up_queued_task(&xprt->pending, task); 1086 } 1087 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 1088 1089 static void xprt_timer(struct rpc_task *task) 1090 { 1091 struct rpc_rqst *req = task->tk_rqstp; 1092 struct rpc_xprt *xprt = req->rq_xprt; 1093 1094 if (task->tk_status != -ETIMEDOUT) 1095 return; 1096 1097 trace_xprt_timer(xprt, req->rq_xid, task->tk_status); 1098 if (!req->rq_reply_bytes_recvd) { 1099 if (xprt->ops->timer) 1100 xprt->ops->timer(xprt, task); 1101 } else 1102 task->tk_status = 0; 1103 } 1104 1105 /** 1106 * xprt_request_wait_receive - wait for the reply to an RPC request 1107 * @task: RPC task about to send a request 1108 * 1109 */ 1110 void xprt_request_wait_receive(struct rpc_task *task) 1111 { 1112 struct rpc_rqst *req = task->tk_rqstp; 1113 struct rpc_xprt *xprt = req->rq_xprt; 1114 1115 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) 1116 return; 1117 /* 1118 * Sleep on the pending queue if we're expecting a reply. 1119 * The spinlock ensures atomicity between the test of 1120 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on(). 1121 */ 1122 spin_lock(&xprt->queue_lock); 1123 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) { 1124 xprt->ops->set_retrans_timeout(task); 1125 rpc_sleep_on(&xprt->pending, task, xprt_timer); 1126 /* 1127 * Send an extra queue wakeup call if the 1128 * connection was dropped in case the call to 1129 * rpc_sleep_on() raced. 1130 */ 1131 if (xprt_request_retransmit_after_disconnect(task)) 1132 rpc_wake_up_queued_task_set_status(&xprt->pending, 1133 task, -ENOTCONN); 1134 } 1135 spin_unlock(&xprt->queue_lock); 1136 } 1137 1138 static bool 1139 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1140 { 1141 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1142 } 1143 1144 /** 1145 * xprt_request_enqueue_transmit - queue a task for transmission 1146 * @task: pointer to rpc_task 1147 * 1148 * Add a task to the transmission queue. 1149 */ 1150 void 1151 xprt_request_enqueue_transmit(struct rpc_task *task) 1152 { 1153 struct rpc_rqst *pos, *req = task->tk_rqstp; 1154 struct rpc_xprt *xprt = req->rq_xprt; 1155 1156 if (xprt_request_need_enqueue_transmit(task, req)) { 1157 req->rq_bytes_sent = 0; 1158 spin_lock(&xprt->queue_lock); 1159 /* 1160 * Requests that carry congestion control credits are added 1161 * to the head of the list to avoid starvation issues. 1162 */ 1163 if (req->rq_cong) { 1164 xprt_clear_congestion_window_wait(xprt); 1165 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1166 if (pos->rq_cong) 1167 continue; 1168 /* Note: req is added _before_ pos */ 1169 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1170 INIT_LIST_HEAD(&req->rq_xmit2); 1171 trace_xprt_enq_xmit(task, 1); 1172 goto out; 1173 } 1174 } else if (RPC_IS_SWAPPER(task)) { 1175 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1176 if (pos->rq_cong || pos->rq_bytes_sent) 1177 continue; 1178 if (RPC_IS_SWAPPER(pos->rq_task)) 1179 continue; 1180 /* Note: req is added _before_ pos */ 1181 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1182 INIT_LIST_HEAD(&req->rq_xmit2); 1183 trace_xprt_enq_xmit(task, 2); 1184 goto out; 1185 } 1186 } else if (!req->rq_seqno) { 1187 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1188 if (pos->rq_task->tk_owner != task->tk_owner) 1189 continue; 1190 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1191 INIT_LIST_HEAD(&req->rq_xmit); 1192 trace_xprt_enq_xmit(task, 3); 1193 goto out; 1194 } 1195 } 1196 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1197 INIT_LIST_HEAD(&req->rq_xmit2); 1198 trace_xprt_enq_xmit(task, 4); 1199 out: 1200 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1201 spin_unlock(&xprt->queue_lock); 1202 } 1203 } 1204 1205 /** 1206 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1207 * @task: pointer to rpc_task 1208 * 1209 * Remove a task from the transmission queue 1210 * Caller must hold xprt->queue_lock 1211 */ 1212 static void 1213 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1214 { 1215 struct rpc_rqst *req = task->tk_rqstp; 1216 1217 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1218 return; 1219 if (!list_empty(&req->rq_xmit)) { 1220 list_del(&req->rq_xmit); 1221 if (!list_empty(&req->rq_xmit2)) { 1222 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1223 struct rpc_rqst, rq_xmit2); 1224 list_del(&req->rq_xmit2); 1225 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1226 } 1227 } else 1228 list_del(&req->rq_xmit2); 1229 } 1230 1231 /** 1232 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1233 * @task: pointer to rpc_task 1234 * 1235 * Remove a task from the transmission queue 1236 */ 1237 static void 1238 xprt_request_dequeue_transmit(struct rpc_task *task) 1239 { 1240 struct rpc_rqst *req = task->tk_rqstp; 1241 struct rpc_xprt *xprt = req->rq_xprt; 1242 1243 spin_lock(&xprt->queue_lock); 1244 xprt_request_dequeue_transmit_locked(task); 1245 spin_unlock(&xprt->queue_lock); 1246 } 1247 1248 /** 1249 * xprt_request_prepare - prepare an encoded request for transport 1250 * @req: pointer to rpc_rqst 1251 * 1252 * Calls into the transport layer to do whatever is needed to prepare 1253 * the request for transmission or receive. 1254 */ 1255 void 1256 xprt_request_prepare(struct rpc_rqst *req) 1257 { 1258 struct rpc_xprt *xprt = req->rq_xprt; 1259 1260 if (xprt->ops->prepare_request) 1261 xprt->ops->prepare_request(req); 1262 } 1263 1264 /** 1265 * xprt_request_need_retransmit - Test if a task needs retransmission 1266 * @task: pointer to rpc_task 1267 * 1268 * Test for whether a connection breakage requires the task to retransmit 1269 */ 1270 bool 1271 xprt_request_need_retransmit(struct rpc_task *task) 1272 { 1273 return xprt_request_retransmit_after_disconnect(task); 1274 } 1275 1276 /** 1277 * xprt_prepare_transmit - reserve the transport before sending a request 1278 * @task: RPC task about to send a request 1279 * 1280 */ 1281 bool xprt_prepare_transmit(struct rpc_task *task) 1282 { 1283 struct rpc_rqst *req = task->tk_rqstp; 1284 struct rpc_xprt *xprt = req->rq_xprt; 1285 1286 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 1287 1288 if (!xprt_lock_write(xprt, task)) { 1289 /* Race breaker: someone may have transmitted us */ 1290 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1291 rpc_wake_up_queued_task_set_status(&xprt->sending, 1292 task, 0); 1293 return false; 1294 1295 } 1296 return true; 1297 } 1298 1299 void xprt_end_transmit(struct rpc_task *task) 1300 { 1301 xprt_release_write(task->tk_rqstp->rq_xprt, task); 1302 } 1303 1304 /** 1305 * xprt_request_transmit - send an RPC request on a transport 1306 * @req: pointer to request to transmit 1307 * @snd_task: RPC task that owns the transport lock 1308 * 1309 * This performs the transmission of a single request. 1310 * Note that if the request is not the same as snd_task, then it 1311 * does need to be pinned. 1312 * Returns '0' on success. 1313 */ 1314 static int 1315 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1316 { 1317 struct rpc_xprt *xprt = req->rq_xprt; 1318 struct rpc_task *task = req->rq_task; 1319 unsigned int connect_cookie; 1320 int is_retrans = RPC_WAS_SENT(task); 1321 int status; 1322 1323 if (!req->rq_bytes_sent) { 1324 if (xprt_request_data_received(task)) { 1325 status = 0; 1326 goto out_dequeue; 1327 } 1328 /* Verify that our message lies in the RPCSEC_GSS window */ 1329 if (rpcauth_xmit_need_reencode(task)) { 1330 status = -EBADMSG; 1331 goto out_dequeue; 1332 } 1333 if (task->tk_ops->rpc_call_prepare_transmit) { 1334 task->tk_ops->rpc_call_prepare_transmit(task, 1335 task->tk_calldata); 1336 status = task->tk_status; 1337 if (status < 0) 1338 goto out_dequeue; 1339 } 1340 } 1341 1342 /* 1343 * Update req->rq_ntrans before transmitting to avoid races with 1344 * xprt_update_rtt(), which needs to know that it is recording a 1345 * reply to the first transmission. 1346 */ 1347 req->rq_ntrans++; 1348 1349 connect_cookie = xprt->connect_cookie; 1350 status = xprt->ops->send_request(req); 1351 if (status != 0) { 1352 req->rq_ntrans--; 1353 trace_xprt_transmit(req, status); 1354 return status; 1355 } 1356 1357 if (is_retrans) 1358 task->tk_client->cl_stats->rpcretrans++; 1359 1360 xprt_inject_disconnect(xprt); 1361 1362 task->tk_flags |= RPC_TASK_SENT; 1363 spin_lock_bh(&xprt->transport_lock); 1364 1365 xprt->stat.sends++; 1366 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1367 xprt->stat.bklog_u += xprt->backlog.qlen; 1368 xprt->stat.sending_u += xprt->sending.qlen; 1369 xprt->stat.pending_u += xprt->pending.qlen; 1370 spin_unlock_bh(&xprt->transport_lock); 1371 1372 req->rq_connect_cookie = connect_cookie; 1373 out_dequeue: 1374 trace_xprt_transmit(req, status); 1375 xprt_request_dequeue_transmit(task); 1376 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1377 return status; 1378 } 1379 1380 /** 1381 * xprt_transmit - send an RPC request on a transport 1382 * @task: controlling RPC task 1383 * 1384 * Attempts to drain the transmit queue. On exit, either the transport 1385 * signalled an error that needs to be handled before transmission can 1386 * resume, or @task finished transmitting, and detected that it already 1387 * received a reply. 1388 */ 1389 void 1390 xprt_transmit(struct rpc_task *task) 1391 { 1392 struct rpc_rqst *next, *req = task->tk_rqstp; 1393 struct rpc_xprt *xprt = req->rq_xprt; 1394 int status; 1395 1396 spin_lock(&xprt->queue_lock); 1397 while (!list_empty(&xprt->xmit_queue)) { 1398 next = list_first_entry(&xprt->xmit_queue, 1399 struct rpc_rqst, rq_xmit); 1400 xprt_pin_rqst(next); 1401 spin_unlock(&xprt->queue_lock); 1402 status = xprt_request_transmit(next, task); 1403 if (status == -EBADMSG && next != req) 1404 status = 0; 1405 cond_resched(); 1406 spin_lock(&xprt->queue_lock); 1407 xprt_unpin_rqst(next); 1408 if (status == 0) { 1409 if (!xprt_request_data_received(task) || 1410 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1411 continue; 1412 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1413 task->tk_status = status; 1414 break; 1415 } 1416 spin_unlock(&xprt->queue_lock); 1417 } 1418 1419 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1420 { 1421 set_bit(XPRT_CONGESTED, &xprt->state); 1422 rpc_sleep_on(&xprt->backlog, task, NULL); 1423 } 1424 1425 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 1426 { 1427 if (rpc_wake_up_next(&xprt->backlog) == NULL) 1428 clear_bit(XPRT_CONGESTED, &xprt->state); 1429 } 1430 1431 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1432 { 1433 bool ret = false; 1434 1435 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1436 goto out; 1437 spin_lock(&xprt->reserve_lock); 1438 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1439 rpc_sleep_on(&xprt->backlog, task, NULL); 1440 ret = true; 1441 } 1442 spin_unlock(&xprt->reserve_lock); 1443 out: 1444 return ret; 1445 } 1446 1447 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1448 { 1449 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1450 1451 if (xprt->num_reqs >= xprt->max_reqs) 1452 goto out; 1453 ++xprt->num_reqs; 1454 spin_unlock(&xprt->reserve_lock); 1455 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1456 spin_lock(&xprt->reserve_lock); 1457 if (req != NULL) 1458 goto out; 1459 --xprt->num_reqs; 1460 req = ERR_PTR(-ENOMEM); 1461 out: 1462 return req; 1463 } 1464 1465 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1466 { 1467 if (xprt->num_reqs > xprt->min_reqs) { 1468 --xprt->num_reqs; 1469 kfree(req); 1470 return true; 1471 } 1472 return false; 1473 } 1474 1475 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1476 { 1477 struct rpc_rqst *req; 1478 1479 spin_lock(&xprt->reserve_lock); 1480 if (!list_empty(&xprt->free)) { 1481 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1482 list_del(&req->rq_list); 1483 goto out_init_req; 1484 } 1485 req = xprt_dynamic_alloc_slot(xprt); 1486 if (!IS_ERR(req)) 1487 goto out_init_req; 1488 switch (PTR_ERR(req)) { 1489 case -ENOMEM: 1490 dprintk("RPC: dynamic allocation of request slot " 1491 "failed! Retrying\n"); 1492 task->tk_status = -ENOMEM; 1493 break; 1494 case -EAGAIN: 1495 xprt_add_backlog(xprt, task); 1496 dprintk("RPC: waiting for request slot\n"); 1497 /* fall through */ 1498 default: 1499 task->tk_status = -EAGAIN; 1500 } 1501 spin_unlock(&xprt->reserve_lock); 1502 return; 1503 out_init_req: 1504 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1505 xprt->num_reqs); 1506 spin_unlock(&xprt->reserve_lock); 1507 1508 task->tk_status = 0; 1509 task->tk_rqstp = req; 1510 } 1511 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1512 1513 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1514 { 1515 spin_lock(&xprt->reserve_lock); 1516 if (!xprt_dynamic_free_slot(xprt, req)) { 1517 memset(req, 0, sizeof(*req)); /* mark unused */ 1518 list_add(&req->rq_list, &xprt->free); 1519 } 1520 xprt_wake_up_backlog(xprt); 1521 spin_unlock(&xprt->reserve_lock); 1522 } 1523 EXPORT_SYMBOL_GPL(xprt_free_slot); 1524 1525 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1526 { 1527 struct rpc_rqst *req; 1528 while (!list_empty(&xprt->free)) { 1529 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1530 list_del(&req->rq_list); 1531 kfree(req); 1532 } 1533 } 1534 1535 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1536 unsigned int num_prealloc, 1537 unsigned int max_alloc) 1538 { 1539 struct rpc_xprt *xprt; 1540 struct rpc_rqst *req; 1541 int i; 1542 1543 xprt = kzalloc(size, GFP_KERNEL); 1544 if (xprt == NULL) 1545 goto out; 1546 1547 xprt_init(xprt, net); 1548 1549 for (i = 0; i < num_prealloc; i++) { 1550 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1551 if (!req) 1552 goto out_free; 1553 list_add(&req->rq_list, &xprt->free); 1554 } 1555 if (max_alloc > num_prealloc) 1556 xprt->max_reqs = max_alloc; 1557 else 1558 xprt->max_reqs = num_prealloc; 1559 xprt->min_reqs = num_prealloc; 1560 xprt->num_reqs = num_prealloc; 1561 1562 return xprt; 1563 1564 out_free: 1565 xprt_free(xprt); 1566 out: 1567 return NULL; 1568 } 1569 EXPORT_SYMBOL_GPL(xprt_alloc); 1570 1571 void xprt_free(struct rpc_xprt *xprt) 1572 { 1573 put_net(xprt->xprt_net); 1574 xprt_free_all_slots(xprt); 1575 kfree_rcu(xprt, rcu); 1576 } 1577 EXPORT_SYMBOL_GPL(xprt_free); 1578 1579 static void 1580 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1581 { 1582 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1583 } 1584 1585 static __be32 1586 xprt_alloc_xid(struct rpc_xprt *xprt) 1587 { 1588 __be32 xid; 1589 1590 spin_lock(&xprt->reserve_lock); 1591 xid = (__force __be32)xprt->xid++; 1592 spin_unlock(&xprt->reserve_lock); 1593 return xid; 1594 } 1595 1596 static void 1597 xprt_init_xid(struct rpc_xprt *xprt) 1598 { 1599 xprt->xid = prandom_u32(); 1600 } 1601 1602 static void 1603 xprt_request_init(struct rpc_task *task) 1604 { 1605 struct rpc_xprt *xprt = task->tk_xprt; 1606 struct rpc_rqst *req = task->tk_rqstp; 1607 1608 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 1609 req->rq_task = task; 1610 req->rq_xprt = xprt; 1611 req->rq_buffer = NULL; 1612 req->rq_xid = xprt_alloc_xid(xprt); 1613 xprt_init_connect_cookie(req, xprt); 1614 req->rq_snd_buf.len = 0; 1615 req->rq_snd_buf.buflen = 0; 1616 req->rq_rcv_buf.len = 0; 1617 req->rq_rcv_buf.buflen = 0; 1618 req->rq_snd_buf.bvec = NULL; 1619 req->rq_rcv_buf.bvec = NULL; 1620 req->rq_release_snd_buf = NULL; 1621 xprt_reset_majortimeo(req); 1622 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1623 req, ntohl(req->rq_xid)); 1624 } 1625 1626 static void 1627 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1628 { 1629 xprt->ops->alloc_slot(xprt, task); 1630 if (task->tk_rqstp != NULL) 1631 xprt_request_init(task); 1632 } 1633 1634 /** 1635 * xprt_reserve - allocate an RPC request slot 1636 * @task: RPC task requesting a slot allocation 1637 * 1638 * If the transport is marked as being congested, or if no more 1639 * slots are available, place the task on the transport's 1640 * backlog queue. 1641 */ 1642 void xprt_reserve(struct rpc_task *task) 1643 { 1644 struct rpc_xprt *xprt = task->tk_xprt; 1645 1646 task->tk_status = 0; 1647 if (task->tk_rqstp != NULL) 1648 return; 1649 1650 task->tk_timeout = 0; 1651 task->tk_status = -EAGAIN; 1652 if (!xprt_throttle_congested(xprt, task)) 1653 xprt_do_reserve(xprt, task); 1654 } 1655 1656 /** 1657 * xprt_retry_reserve - allocate an RPC request slot 1658 * @task: RPC task requesting a slot allocation 1659 * 1660 * If no more slots are available, place the task on the transport's 1661 * backlog queue. 1662 * Note that the only difference with xprt_reserve is that we now 1663 * ignore the value of the XPRT_CONGESTED flag. 1664 */ 1665 void xprt_retry_reserve(struct rpc_task *task) 1666 { 1667 struct rpc_xprt *xprt = task->tk_xprt; 1668 1669 task->tk_status = 0; 1670 if (task->tk_rqstp != NULL) 1671 return; 1672 1673 task->tk_timeout = 0; 1674 task->tk_status = -EAGAIN; 1675 xprt_do_reserve(xprt, task); 1676 } 1677 1678 static void 1679 xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) 1680 { 1681 struct rpc_xprt *xprt = req->rq_xprt; 1682 1683 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1684 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1685 xprt_is_pinned_rqst(req)) { 1686 spin_lock(&xprt->queue_lock); 1687 xprt_request_dequeue_transmit_locked(task); 1688 xprt_request_dequeue_receive_locked(task); 1689 while (xprt_is_pinned_rqst(req)) { 1690 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1691 spin_unlock(&xprt->queue_lock); 1692 xprt_wait_on_pinned_rqst(req); 1693 spin_lock(&xprt->queue_lock); 1694 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1695 } 1696 spin_unlock(&xprt->queue_lock); 1697 } 1698 } 1699 1700 /** 1701 * xprt_release - release an RPC request slot 1702 * @task: task which is finished with the slot 1703 * 1704 */ 1705 void xprt_release(struct rpc_task *task) 1706 { 1707 struct rpc_xprt *xprt; 1708 struct rpc_rqst *req = task->tk_rqstp; 1709 1710 if (req == NULL) { 1711 if (task->tk_client) { 1712 xprt = task->tk_xprt; 1713 xprt_release_write(xprt, task); 1714 } 1715 return; 1716 } 1717 1718 xprt = req->rq_xprt; 1719 if (task->tk_ops->rpc_count_stats != NULL) 1720 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1721 else if (task->tk_client) 1722 rpc_count_iostats(task, task->tk_client->cl_metrics); 1723 xprt_request_dequeue_all(task, req); 1724 spin_lock_bh(&xprt->transport_lock); 1725 xprt->ops->release_xprt(xprt, task); 1726 if (xprt->ops->release_request) 1727 xprt->ops->release_request(task); 1728 xprt->last_used = jiffies; 1729 xprt_schedule_autodisconnect(xprt); 1730 spin_unlock_bh(&xprt->transport_lock); 1731 if (req->rq_buffer) 1732 xprt->ops->buf_free(task); 1733 xprt_inject_disconnect(xprt); 1734 xdr_free_bvec(&req->rq_rcv_buf); 1735 xdr_free_bvec(&req->rq_snd_buf); 1736 if (req->rq_cred != NULL) 1737 put_rpccred(req->rq_cred); 1738 task->tk_rqstp = NULL; 1739 if (req->rq_release_snd_buf) 1740 req->rq_release_snd_buf(req); 1741 1742 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1743 if (likely(!bc_prealloc(req))) 1744 xprt->ops->free_slot(xprt, req); 1745 else 1746 xprt_free_bc_request(req); 1747 } 1748 1749 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1750 void 1751 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1752 { 1753 struct xdr_buf *xbufp = &req->rq_snd_buf; 1754 1755 task->tk_rqstp = req; 1756 req->rq_task = task; 1757 xprt_init_connect_cookie(req, req->rq_xprt); 1758 /* 1759 * Set up the xdr_buf length. 1760 * This also indicates that the buffer is XDR encoded already. 1761 */ 1762 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1763 xbufp->tail[0].iov_len; 1764 } 1765 #endif 1766 1767 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1768 { 1769 kref_init(&xprt->kref); 1770 1771 spin_lock_init(&xprt->transport_lock); 1772 spin_lock_init(&xprt->reserve_lock); 1773 spin_lock_init(&xprt->queue_lock); 1774 1775 INIT_LIST_HEAD(&xprt->free); 1776 xprt->recv_queue = RB_ROOT; 1777 INIT_LIST_HEAD(&xprt->xmit_queue); 1778 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1779 spin_lock_init(&xprt->bc_pa_lock); 1780 INIT_LIST_HEAD(&xprt->bc_pa_list); 1781 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1782 INIT_LIST_HEAD(&xprt->xprt_switch); 1783 1784 xprt->last_used = jiffies; 1785 xprt->cwnd = RPC_INITCWND; 1786 xprt->bind_index = 0; 1787 1788 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1789 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1790 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1791 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1792 1793 xprt_init_xid(xprt); 1794 1795 xprt->xprt_net = get_net(net); 1796 } 1797 1798 /** 1799 * xprt_create_transport - create an RPC transport 1800 * @args: rpc transport creation arguments 1801 * 1802 */ 1803 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1804 { 1805 struct rpc_xprt *xprt; 1806 struct xprt_class *t; 1807 1808 spin_lock(&xprt_list_lock); 1809 list_for_each_entry(t, &xprt_list, list) { 1810 if (t->ident == args->ident) { 1811 spin_unlock(&xprt_list_lock); 1812 goto found; 1813 } 1814 } 1815 spin_unlock(&xprt_list_lock); 1816 dprintk("RPC: transport (%d) not supported\n", args->ident); 1817 return ERR_PTR(-EIO); 1818 1819 found: 1820 xprt = t->setup(args); 1821 if (IS_ERR(xprt)) { 1822 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1823 -PTR_ERR(xprt)); 1824 goto out; 1825 } 1826 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1827 xprt->idle_timeout = 0; 1828 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1829 if (xprt_has_timer(xprt)) 1830 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 1831 else 1832 timer_setup(&xprt->timer, NULL, 0); 1833 1834 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1835 xprt_destroy(xprt); 1836 return ERR_PTR(-EINVAL); 1837 } 1838 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1839 if (xprt->servername == NULL) { 1840 xprt_destroy(xprt); 1841 return ERR_PTR(-ENOMEM); 1842 } 1843 1844 rpc_xprt_debugfs_register(xprt); 1845 1846 dprintk("RPC: created transport %p with %u slots\n", xprt, 1847 xprt->max_reqs); 1848 out: 1849 return xprt; 1850 } 1851 1852 static void xprt_destroy_cb(struct work_struct *work) 1853 { 1854 struct rpc_xprt *xprt = 1855 container_of(work, struct rpc_xprt, task_cleanup); 1856 1857 rpc_xprt_debugfs_unregister(xprt); 1858 rpc_destroy_wait_queue(&xprt->binding); 1859 rpc_destroy_wait_queue(&xprt->pending); 1860 rpc_destroy_wait_queue(&xprt->sending); 1861 rpc_destroy_wait_queue(&xprt->backlog); 1862 kfree(xprt->servername); 1863 /* 1864 * Tear down transport state and free the rpc_xprt 1865 */ 1866 xprt->ops->destroy(xprt); 1867 } 1868 1869 /** 1870 * xprt_destroy - destroy an RPC transport, killing off all requests. 1871 * @xprt: transport to destroy 1872 * 1873 */ 1874 static void xprt_destroy(struct rpc_xprt *xprt) 1875 { 1876 dprintk("RPC: destroying transport %p\n", xprt); 1877 1878 /* 1879 * Exclude transport connect/disconnect handlers and autoclose 1880 */ 1881 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE); 1882 1883 del_timer_sync(&xprt->timer); 1884 1885 /* 1886 * Destroy sockets etc from the system workqueue so they can 1887 * safely flush receive work running on rpciod. 1888 */ 1889 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb); 1890 schedule_work(&xprt->task_cleanup); 1891 } 1892 1893 static void xprt_destroy_kref(struct kref *kref) 1894 { 1895 xprt_destroy(container_of(kref, struct rpc_xprt, kref)); 1896 } 1897 1898 /** 1899 * xprt_get - return a reference to an RPC transport. 1900 * @xprt: pointer to the transport 1901 * 1902 */ 1903 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1904 { 1905 if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) 1906 return xprt; 1907 return NULL; 1908 } 1909 EXPORT_SYMBOL_GPL(xprt_get); 1910 1911 /** 1912 * xprt_put - release a reference to an RPC transport. 1913 * @xprt: pointer to the transport 1914 * 1915 */ 1916 void xprt_put(struct rpc_xprt *xprt) 1917 { 1918 if (xprt != NULL) 1919 kref_put(&xprt->kref, xprt_destroy_kref); 1920 } 1921 EXPORT_SYMBOL_GPL(xprt_put); 1922