/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, if a reply is expected,
 *	it installs a timer that is run after the packet's timeout has
 *	expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32	xprt_alloc_xid(struct rpc_xprt *xprt);
static void	xprt_connect_status(struct rpc_task *task);
static void	xprt_destroy(struct rpc_xprt *xprt);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);
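/*
 * Example (illustrative sketch, not part of this file): a transport
 * module typically declares a struct xprt_class and registers it from
 * its module init hook, unregistering it again on exit. The names
 * "example_transport", "example_xprt_setup" and XPRT_TRANSPORT_EXAMPLE
 * below are hypothetical; only the xprt_class fields and the
 * register/unregister calls are real interfaces.
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= XPRT_TRANSPORT_EXAMPLE,
 *		.setup	= example_xprt_setup,
 *	};
 *
 *	static int __init example_transport_init(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 *
 *	static void __exit example_transport_exit(void)
 *	{
 *		xprt_unregister_transport(&example_transport);
 *	}
 *
 *	module_init(example_transport_init);
 *	module_exit(example_transport_exit);
 */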
/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * Returns:
 * 0:		transport successfully loaded
 * -ENOENT:	transport module not available
 */
int xprt_load_transport(const char *transport_name)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (strcmp(t->name, transport_name) == 0) {
			spin_unlock(&xprt_list_lock);
			goto out;
		}
	}
	spin_unlock(&xprt_list_lock);
	result = request_module("xprt%s", transport_name);
out:
	return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
		smp_mb__before_atomic();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_atomic();
	} else
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	xprt->snd_task = task;

	return 1;

out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
			task->tk_pid, xprt);
	task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
	task->tk_status = -EAGAIN;
	rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
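/*
 * Illustrative sketch (not part of this file): transports do not call
 * the reserve/release helpers directly; they plug them into their
 * rpc_xprt_ops. The field names are real, the ops table itself is a
 * hypothetical example. A stream-oriented transport that needs no
 * congestion control might use:
 *
 *	static const struct rpc_xprt_ops example_ops = {
 *		.reserve_xprt	= xprt_reserve_xprt,
 *		.release_xprt	= xprt_release_xprt,
 *		.alloc_slot	= xprt_alloc_slot,
 *		.free_slot	= xprt_free_slot,
 *		...
 *	};
 *
 * while a datagram transport would pick the _cong variants below so
 * that the Van Jacobson congestion window is honoured.
 */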
static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
	return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->xmit_queue)) {
		/* Peek at head of queue to see if it can make progress */
		if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
					rq_xmit)->rq_cong)
			return;
	}
	set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (!RPCXPRT_CONGESTED(xprt))
		clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (req == NULL) {
		xprt->snd_task = task;
		return 1;
	}
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (!xprt_need_congestion_window_wait(xprt)) {
		xprt->snd_task = task;
		return 1;
	}
out_unlock:
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
	task->tk_status = -EAGAIN;
	rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
		return 1;
	spin_lock_bh(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
	struct rpc_xprt *xprt = data;

	xprt->snd_task = task;
	return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
		goto out_unlock;
	if (xprt_need_congestion_window_wait(xprt))
		goto out_unlock;
	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
				__xprt_lock_write_func, xprt))
		return;
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task != task)
		return;
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (req->rq_cong)
		return 1;
	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
			req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt)) {
		xprt_set_congestion_window_wait(xprt);
		return 0;
	}
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	xprt_test_and_clear_congestion_window_wait(xprt);
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	bool ret = false;

	if (req->rq_cong)
		return true;
	spin_lock_bh(&xprt->transport_lock);
	ret = __xprt_get_cong(xprt, req) != 0;
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	__xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
		spin_lock_bh(&xprt->transport_lock);
		__xprt_lock_write_next_cong(xprt);
		spin_unlock_bh(&xprt->transport_lock);
	}
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
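/*
 * Worked example (illustrative, using the formula above): the window
 * is kept in units of RPC_CWNDSCALE per request slot. If the current
 * window allows four requests (cwnd == 4 * RPC_CWNDSCALE) and the
 * transport is fully loaded (cong >= cwnd), then each successful reply
 * grows the window by
 *
 *	(RPC_CWNDSCALE * RPC_CWNDSCALE + cwnd / 2) / cwnd
 *		~= RPC_CWNDSCALE / 4
 *
 * so roughly one full window of replies (four of them) is needed
 * before a fifth concurrent request is admitted, while a single
 * -ETIMEDOUT halves the window immediately, never dropping it below
 * one request's worth.
 */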
/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
	set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
		__xprt_lock_write_next(xprt);
		dprintk("RPC: write space: waking waiting task on "
				"xprt %p\n", xprt);
		return true;
	}
	return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
	bool ret;

	if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
		return false;
	spin_lock_bh(&xprt->transport_lock);
	ret = xprt_clear_write_space_locked(xprt);
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);
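/*
 * Illustrative sketch (not part of this file): a transport pairs these
 * two helpers around a full output buffer. The callback names and the
 * error code below are hypothetical; the calls themselves follow the
 * pattern documented above.
 *
 *	static int example_send_request(struct rpc_rqst *req)
 *	{
 *		...
 *		if (output_buffer_is_full) {
 *			xprt_wait_for_buffer_space(req->rq_xprt);
 *			return -ENOBUFS;
 *		}
 *		...
 *	}
 *
 *	static void example_write_space_callback(struct rpc_xprt *xprt)
 *	{
 *		xprt_write_space(xprt);	wakes the next writer only if
 *					XPRT_WRITE_SPACE is still set
 *	}
 */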
/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters. Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
	task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;

	task->tk_timeout = rpc_calc_rto(rtt, timer);
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
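/*
 * Worked example (illustrative): with an exponential rpc_timeout of
 * to_initval = T and to_retries = 3, xprt_reset_majortimeo() arms a
 * major timeout roughly T << 3 = 8T in the future. Each minor timeout
 * doubles rq_timeout (T, 2T, 4T, capped at to_maxval) and bumps
 * rq_retries; once the major timeout passes, the timeout is reset to
 * to_initval, the RTT estimator is re-initialised ("slow start"), and
 * -ETIMEDOUT is returned so the caller can decide whether to report a
 * major timeout or keep retrying.
 */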
static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt->ops->close(xprt);
	xprt_release_write(xprt, NULL);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_clear_write_space_locked(xprt);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);
/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
	return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
		!xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
	spin_unlock_bh(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
	return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
	__must_hold(&xprt->transport_lock)
{
	if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
	struct rpc_xprt *xprt = from_timer(xprt, t, timer);

	spin_lock(&xprt->transport_lock);
	if (!RB_EMPTY_ROOT(&xprt->recv_queue))
		goto out_abort;
	/* Reset xprt->last_used to avoid connect/autodisconnect cycling */
	xprt->last_used = jiffies;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		goto out_abort;
	spin_unlock(&xprt->transport_lock);
	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->transport_lock);
}

bool xprt_lock_connect(struct rpc_xprt *xprt,
		struct rpc_task *task,
		void *cookie)
{
	bool ret = false;

	spin_lock_bh(&xprt->transport_lock);
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	if (xprt->snd_task != task)
		goto out;
	xprt->snd_task = cookie;
	ret = true;
out:
	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task != cookie)
		goto out;
	if (!test_bit(XPRT_LOCKED, &xprt->state))
		goto out;
	xprt->snd_task = NULL;
	xprt->ops->release_xprt(xprt, NULL);
	xprt_schedule_autodisconnect(xprt);
out:
	spin_unlock_bh(&xprt->transport_lock);
	wake_up_bit(&xprt->state, XPRT_LOCKED);
}
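/*
 * Illustrative sketch (not part of this file): a transport that
 * detects a fatal connection error in its own receive or state-change
 * callback would typically request an autoclose rather than tearing
 * the connection down inline. The callback and its surrounding
 * structure below are hypothetical:
 *
 *	static void example_error_report(struct example_conn *conn)
 *	{
 *		struct rpc_xprt *xprt = conn->xprt;
 *
 *		xprt_force_disconnect(xprt);
 *	}
 *
 * This schedules xprt_autoclose() via the XPRT_CLOSE_WAIT/XPRT_LOCKED
 * handshake and wakes pending tasks with -EAGAIN.
 * xprt_conditional_disconnect() is the variant used on the retransmit
 * path, where only the first request of a batch should actually break
 * the connection.
 */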
/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (!xprt_bound(xprt)) {
		task->tk_status = -EAGAIN;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;

	if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
		xprt->ops->close(xprt);

	if (!xprt_connected(xprt)) {
		task->tk_timeout = task->tk_rqstp->rq_timeout;
		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);

		if (test_bit(XPRT_CLOSING, &xprt->state))
			return;
		if (xprt_test_and_set_connecting(xprt))
			return;
		/* Race breaker */
		if (!xprt_connected(xprt)) {
			xprt->stat.connect_start = jiffies;
			xprt->ops->connect(xprt, task);
		} else {
			xprt_clear_connecting(xprt);
			task->tk_status = 0;
			rpc_wake_up_queued_task(&xprt->pending, task);
		}
	}
	xprt_release_write(xprt, task);
}

static void xprt_connect_status(struct rpc_task *task)
{
	switch (task->tk_status) {
	case 0:
		dprintk("RPC: %5u xprt_connect_status: connection established\n",
				task->tk_pid);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EAGAIN:
		dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
				"out\n", task->tk_pid);
		break;
	default:
		dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
				"server %s\n", task->tk_pid, -task->tk_status,
				task->tk_rqstp->rq_xprt->servername);
		task->tk_status = -EIO;
	}
}

enum xprt_xid_rb_cmp {
	XID_RB_EQUAL,
	XID_RB_LEFT,
	XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
	if (xid1 == xid2)
		return XID_RB_EQUAL;
	if ((__force u32)xid1 < (__force u32)xid2)
		return XID_RB_LEFT;
	return XID_RB_RIGHT;
}

static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
	struct rb_node *n = xprt->recv_queue.rb_node;
	struct rpc_rqst *req;

	while (n != NULL) {
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(xid, req->rq_xid)) {
		case XID_RB_LEFT:
			n = n->rb_left;
			break;
		case XID_RB_RIGHT:
			n = n->rb_right;
			break;
		case XID_RB_EQUAL:
			return req;
		}
	}
	return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
	struct rb_node **p = &xprt->recv_queue.rb_node;
	struct rb_node *n = NULL;
	struct rpc_rqst *req;

	while (*p != NULL) {
		n = *p;
		req = rb_entry(n, struct rpc_rqst, rq_recv);
		switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
		case XID_RB_LEFT:
			p = &n->rb_left;
			break;
		case XID_RB_RIGHT:
			p = &n->rb_right;
			break;
		case XID_RB_EQUAL:
			WARN_ON_ONCE(new != req);
			return;
		}
	}
	rb_link_node(&new->rq_recv, n, p);
	rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	rb_erase(&req->rq_recv, &xprt->recv_queue);
}
/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *entry;

	entry = xprt_request_rb_find(xprt, xid);
	if (entry != NULL) {
		trace_xprt_lookup_rqst(xprt, xid, 0);
		entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
		return entry;
	}

	dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
	return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding the xprt receive lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
	atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding the xprt receive lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
	if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
		atomic_dec(&req->rq_pin);
		return;
	}
	if (atomic_dec_and_test(&req->rq_pin))
		wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
	wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
	return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
		READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!xprt_request_need_enqueue_receive(task, req))
		return;
	spin_lock(&xprt->queue_lock);

	/* Update the softirq receive buffer */
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));

	/* Add request to the receive list */
	xprt_request_rb_insert(xprt, req);
	set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
	spin_unlock(&xprt->queue_lock);

	xprt_reset_majortimeo(req);
	/* Turn off autodisconnect */
	del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned int timer = task->tk_msg.rpc_proc->p_timer;
	long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer, m);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
			task->tk_pid, ntohl(req->rq_xid), copied);
	trace_xprt_complete_rqst(xprt, req->rq_xid, copied);

	xprt->stat.recvs++;

	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update */
	/* req->rq_reply_bytes_recvd */
	smp_wmb();
	req->rq_reply_bytes_recvd = copied;
	xprt_request_dequeue_receive_locked(task);
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);
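/*
 * Illustrative sketch (not part of this file): a transport's reply
 * handler ties the helpers above together roughly as follows. The
 * example_copy_reply() step is hypothetical; the locking and the
 * pin/unpin pattern are the contract documented in the kernel-doc
 * comments above.
 *
 *	spin_lock(&xprt->queue_lock);
 *	req = xprt_lookup_rqst(xprt, xid);
 *	if (!req) {
 *		spin_unlock(&xprt->queue_lock);
 *		return;
 *	}
 *	xprt_pin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 *
 *	copied = example_copy_reply(req, data);	may drop locks or sleep
 *
 *	spin_lock(&xprt->queue_lock);
 *	if (copied >= 0)
 *		xprt_complete_rqst(req->rq_task, copied);
 *	xprt_unpin_rqst(req);
 *	spin_unlock(&xprt->queue_lock);
 */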
static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;

	trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
	if (!req->rq_reply_bytes_recvd) {
		if (xprt->ops->timer)
			xprt->ops->timer(xprt, task);
	} else
		task->tk_status = 0;
}

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
		return;
	/*
	 * Sleep on the pending queue if we're expecting a reply.
	 * The spinlock ensures atomicity between the test of
	 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
	 */
	spin_lock(&xprt->queue_lock);
	if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
		xprt->ops->set_retrans_timeout(task);
		rpc_sleep_on(&xprt->pending, task, xprt_timer);
		/*
		 * Send an extra queue wakeup call if the
		 * connection was dropped in case the call to
		 * rpc_sleep_on() raced.
1154 */ 1155 if (xprt_request_retransmit_after_disconnect(task)) 1156 rpc_wake_up_queued_task_set_status(&xprt->pending, 1157 task, -ENOTCONN); 1158 } 1159 spin_unlock(&xprt->queue_lock); 1160 } 1161 1162 static bool 1163 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) 1164 { 1165 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1166 } 1167 1168 /** 1169 * xprt_request_enqueue_transmit - queue a task for transmission 1170 * @task: pointer to rpc_task 1171 * 1172 * Add a task to the transmission queue. 1173 */ 1174 void 1175 xprt_request_enqueue_transmit(struct rpc_task *task) 1176 { 1177 struct rpc_rqst *pos, *req = task->tk_rqstp; 1178 struct rpc_xprt *xprt = req->rq_xprt; 1179 1180 if (xprt_request_need_enqueue_transmit(task, req)) { 1181 spin_lock(&xprt->queue_lock); 1182 /* 1183 * Requests that carry congestion control credits are added 1184 * to the head of the list to avoid starvation issues. 1185 */ 1186 if (req->rq_cong) { 1187 xprt_clear_congestion_window_wait(xprt); 1188 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1189 if (pos->rq_cong) 1190 continue; 1191 /* Note: req is added _before_ pos */ 1192 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1193 INIT_LIST_HEAD(&req->rq_xmit2); 1194 goto out; 1195 } 1196 } else if (RPC_IS_SWAPPER(task)) { 1197 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1198 if (pos->rq_cong || pos->rq_bytes_sent) 1199 continue; 1200 if (RPC_IS_SWAPPER(pos->rq_task)) 1201 continue; 1202 /* Note: req is added _before_ pos */ 1203 list_add_tail(&req->rq_xmit, &pos->rq_xmit); 1204 INIT_LIST_HEAD(&req->rq_xmit2); 1205 goto out; 1206 } 1207 } else { 1208 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) { 1209 if (pos->rq_task->tk_owner != task->tk_owner) 1210 continue; 1211 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2); 1212 INIT_LIST_HEAD(&req->rq_xmit); 1213 goto out; 1214 } 1215 } 1216 list_add_tail(&req->rq_xmit, &xprt->xmit_queue); 1217 INIT_LIST_HEAD(&req->rq_xmit2); 1218 out: 1219 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); 1220 spin_unlock(&xprt->queue_lock); 1221 } 1222 } 1223 1224 /** 1225 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue 1226 * @task: pointer to rpc_task 1227 * 1228 * Remove a task from the transmission queue 1229 * Caller must hold xprt->queue_lock 1230 */ 1231 static void 1232 xprt_request_dequeue_transmit_locked(struct rpc_task *task) 1233 { 1234 struct rpc_rqst *req = task->tk_rqstp; 1235 1236 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1237 return; 1238 if (!list_empty(&req->rq_xmit)) { 1239 list_del(&req->rq_xmit); 1240 if (!list_empty(&req->rq_xmit2)) { 1241 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2, 1242 struct rpc_rqst, rq_xmit2); 1243 list_del(&req->rq_xmit2); 1244 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue); 1245 } 1246 } else 1247 list_del(&req->rq_xmit2); 1248 } 1249 1250 /** 1251 * xprt_request_dequeue_transmit - remove a task from the transmission queue 1252 * @task: pointer to rpc_task 1253 * 1254 * Remove a task from the transmission queue 1255 */ 1256 static void 1257 xprt_request_dequeue_transmit(struct rpc_task *task) 1258 { 1259 struct rpc_rqst *req = task->tk_rqstp; 1260 struct rpc_xprt *xprt = req->rq_xprt; 1261 1262 spin_lock(&xprt->queue_lock); 1263 xprt_request_dequeue_transmit_locked(task); 1264 spin_unlock(&xprt->queue_lock); 1265 } 1266 1267 /** 1268 * xprt_request_prepare - prepare an encoded request for transport 1269 * @req: pointer to rpc_rqst 1270 * 1271 
* Calls into the transport layer to do whatever is needed to prepare 1272 * the request for transmission or receive. 1273 */ 1274 void 1275 xprt_request_prepare(struct rpc_rqst *req) 1276 { 1277 struct rpc_xprt *xprt = req->rq_xprt; 1278 1279 if (xprt->ops->prepare_request) 1280 xprt->ops->prepare_request(req); 1281 } 1282 1283 /** 1284 * xprt_request_need_retransmit - Test if a task needs retransmission 1285 * @task: pointer to rpc_task 1286 * 1287 * Test for whether a connection breakage requires the task to retransmit 1288 */ 1289 bool 1290 xprt_request_need_retransmit(struct rpc_task *task) 1291 { 1292 return xprt_request_retransmit_after_disconnect(task); 1293 } 1294 1295 /** 1296 * xprt_prepare_transmit - reserve the transport before sending a request 1297 * @task: RPC task about to send a request 1298 * 1299 */ 1300 bool xprt_prepare_transmit(struct rpc_task *task) 1301 { 1302 struct rpc_rqst *req = task->tk_rqstp; 1303 struct rpc_xprt *xprt = req->rq_xprt; 1304 1305 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 1306 1307 if (!xprt_lock_write(xprt, task)) { 1308 /* Race breaker: someone may have transmitted us */ 1309 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1310 rpc_wake_up_queued_task_set_status(&xprt->sending, 1311 task, 0); 1312 return false; 1313 1314 } 1315 return true; 1316 } 1317 1318 void xprt_end_transmit(struct rpc_task *task) 1319 { 1320 xprt_release_write(task->tk_rqstp->rq_xprt, task); 1321 } 1322 1323 /** 1324 * xprt_request_transmit - send an RPC request on a transport 1325 * @req: pointer to request to transmit 1326 * @snd_task: RPC task that owns the transport lock 1327 * 1328 * This performs the transmission of a single request. 1329 * Note that if the request is not the same as snd_task, then it 1330 * does need to be pinned. 1331 * Returns '0' on success. 1332 */ 1333 static int 1334 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) 1335 { 1336 struct rpc_xprt *xprt = req->rq_xprt; 1337 struct rpc_task *task = req->rq_task; 1338 unsigned int connect_cookie; 1339 int is_retrans = RPC_WAS_SENT(task); 1340 int status; 1341 1342 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 1343 1344 if (!req->rq_bytes_sent) { 1345 if (xprt_request_data_received(task)) { 1346 status = 0; 1347 goto out_dequeue; 1348 } 1349 /* Verify that our message lies in the RPCSEC_GSS window */ 1350 if (rpcauth_xmit_need_reencode(task)) { 1351 status = -EBADMSG; 1352 goto out_dequeue; 1353 } 1354 } 1355 1356 /* 1357 * Update req->rq_ntrans before transmitting to avoid races with 1358 * xprt_update_rtt(), which needs to know that it is recording a 1359 * reply to the first transmission. 
1360 */ 1361 req->rq_ntrans++; 1362 1363 connect_cookie = xprt->connect_cookie; 1364 status = xprt->ops->send_request(req); 1365 trace_xprt_transmit(xprt, req->rq_xid, status); 1366 if (status != 0) { 1367 req->rq_ntrans--; 1368 return status; 1369 } 1370 1371 if (is_retrans) 1372 task->tk_client->cl_stats->rpcretrans++; 1373 1374 xprt_inject_disconnect(xprt); 1375 1376 dprintk("RPC: %5u xmit complete\n", task->tk_pid); 1377 task->tk_flags |= RPC_TASK_SENT; 1378 spin_lock_bh(&xprt->transport_lock); 1379 1380 xprt->stat.sends++; 1381 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 1382 xprt->stat.bklog_u += xprt->backlog.qlen; 1383 xprt->stat.sending_u += xprt->sending.qlen; 1384 xprt->stat.pending_u += xprt->pending.qlen; 1385 spin_unlock_bh(&xprt->transport_lock); 1386 1387 req->rq_connect_cookie = connect_cookie; 1388 out_dequeue: 1389 xprt_request_dequeue_transmit(task); 1390 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); 1391 return status; 1392 } 1393 1394 /** 1395 * xprt_transmit - send an RPC request on a transport 1396 * @task: controlling RPC task 1397 * 1398 * Attempts to drain the transmit queue. On exit, either the transport 1399 * signalled an error that needs to be handled before transmission can 1400 * resume, or @task finished transmitting, and detected that it already 1401 * received a reply. 1402 */ 1403 void 1404 xprt_transmit(struct rpc_task *task) 1405 { 1406 struct rpc_rqst *next, *req = task->tk_rqstp; 1407 struct rpc_xprt *xprt = req->rq_xprt; 1408 int status; 1409 1410 spin_lock(&xprt->queue_lock); 1411 while (!list_empty(&xprt->xmit_queue)) { 1412 next = list_first_entry(&xprt->xmit_queue, 1413 struct rpc_rqst, rq_xmit); 1414 xprt_pin_rqst(next); 1415 spin_unlock(&xprt->queue_lock); 1416 status = xprt_request_transmit(next, task); 1417 if (status == -EBADMSG && next != req) 1418 status = 0; 1419 cond_resched(); 1420 spin_lock(&xprt->queue_lock); 1421 xprt_unpin_rqst(next); 1422 if (status == 0) { 1423 if (!xprt_request_data_received(task) || 1424 test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1425 continue; 1426 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) 1427 task->tk_status = status; 1428 break; 1429 } 1430 spin_unlock(&xprt->queue_lock); 1431 } 1432 1433 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 1434 { 1435 set_bit(XPRT_CONGESTED, &xprt->state); 1436 rpc_sleep_on(&xprt->backlog, task, NULL); 1437 } 1438 1439 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 1440 { 1441 if (rpc_wake_up_next(&xprt->backlog) == NULL) 1442 clear_bit(XPRT_CONGESTED, &xprt->state); 1443 } 1444 1445 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 1446 { 1447 bool ret = false; 1448 1449 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 1450 goto out; 1451 spin_lock(&xprt->reserve_lock); 1452 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 1453 rpc_sleep_on(&xprt->backlog, task, NULL); 1454 ret = true; 1455 } 1456 spin_unlock(&xprt->reserve_lock); 1457 out: 1458 return ret; 1459 } 1460 1461 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt) 1462 { 1463 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1464 1465 if (xprt->num_reqs >= xprt->max_reqs) 1466 goto out; 1467 ++xprt->num_reqs; 1468 spin_unlock(&xprt->reserve_lock); 1469 req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS); 1470 spin_lock(&xprt->reserve_lock); 1471 if (req != NULL) 1472 goto out; 1473 --xprt->num_reqs; 1474 req = ERR_PTR(-ENOMEM); 1475 out: 1476 return req; 1477 } 1478 1479 static bool 
xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1480 { 1481 if (xprt->num_reqs > xprt->min_reqs) { 1482 --xprt->num_reqs; 1483 kfree(req); 1484 return true; 1485 } 1486 return false; 1487 } 1488 1489 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1490 { 1491 struct rpc_rqst *req; 1492 1493 spin_lock(&xprt->reserve_lock); 1494 if (!list_empty(&xprt->free)) { 1495 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1496 list_del(&req->rq_list); 1497 goto out_init_req; 1498 } 1499 req = xprt_dynamic_alloc_slot(xprt); 1500 if (!IS_ERR(req)) 1501 goto out_init_req; 1502 switch (PTR_ERR(req)) { 1503 case -ENOMEM: 1504 dprintk("RPC: dynamic allocation of request slot " 1505 "failed! Retrying\n"); 1506 task->tk_status = -ENOMEM; 1507 break; 1508 case -EAGAIN: 1509 xprt_add_backlog(xprt, task); 1510 dprintk("RPC: waiting for request slot\n"); 1511 /* fall through */ 1512 default: 1513 task->tk_status = -EAGAIN; 1514 } 1515 spin_unlock(&xprt->reserve_lock); 1516 return; 1517 out_init_req: 1518 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots, 1519 xprt->num_reqs); 1520 spin_unlock(&xprt->reserve_lock); 1521 1522 task->tk_status = 0; 1523 task->tk_rqstp = req; 1524 } 1525 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1526 1527 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1528 { 1529 spin_lock(&xprt->reserve_lock); 1530 if (!xprt_dynamic_free_slot(xprt, req)) { 1531 memset(req, 0, sizeof(*req)); /* mark unused */ 1532 list_add(&req->rq_list, &xprt->free); 1533 } 1534 xprt_wake_up_backlog(xprt); 1535 spin_unlock(&xprt->reserve_lock); 1536 } 1537 EXPORT_SYMBOL_GPL(xprt_free_slot); 1538 1539 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1540 { 1541 struct rpc_rqst *req; 1542 while (!list_empty(&xprt->free)) { 1543 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1544 list_del(&req->rq_list); 1545 kfree(req); 1546 } 1547 } 1548 1549 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1550 unsigned int num_prealloc, 1551 unsigned int max_alloc) 1552 { 1553 struct rpc_xprt *xprt; 1554 struct rpc_rqst *req; 1555 int i; 1556 1557 xprt = kzalloc(size, GFP_KERNEL); 1558 if (xprt == NULL) 1559 goto out; 1560 1561 xprt_init(xprt, net); 1562 1563 for (i = 0; i < num_prealloc; i++) { 1564 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1565 if (!req) 1566 goto out_free; 1567 list_add(&req->rq_list, &xprt->free); 1568 } 1569 if (max_alloc > num_prealloc) 1570 xprt->max_reqs = max_alloc; 1571 else 1572 xprt->max_reqs = num_prealloc; 1573 xprt->min_reqs = num_prealloc; 1574 xprt->num_reqs = num_prealloc; 1575 1576 return xprt; 1577 1578 out_free: 1579 xprt_free(xprt); 1580 out: 1581 return NULL; 1582 } 1583 EXPORT_SYMBOL_GPL(xprt_alloc); 1584 1585 void xprt_free(struct rpc_xprt *xprt) 1586 { 1587 put_net(xprt->xprt_net); 1588 xprt_free_all_slots(xprt); 1589 kfree_rcu(xprt, rcu); 1590 } 1591 EXPORT_SYMBOL_GPL(xprt_free); 1592 1593 static void 1594 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt) 1595 { 1596 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1; 1597 } 1598 1599 static __be32 1600 xprt_alloc_xid(struct rpc_xprt *xprt) 1601 { 1602 __be32 xid; 1603 1604 spin_lock(&xprt->reserve_lock); 1605 xid = (__force __be32)xprt->xid++; 1606 spin_unlock(&xprt->reserve_lock); 1607 return xid; 1608 } 1609 1610 static void 1611 xprt_init_xid(struct rpc_xprt *xprt) 1612 { 1613 xprt->xid = prandom_u32(); 1614 } 1615 1616 static void 1617 xprt_request_init(struct rpc_task *task) 1618 { 1619 struct 
rpc_xprt *xprt = task->tk_xprt; 1620 struct rpc_rqst *req = task->tk_rqstp; 1621 1622 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 1623 req->rq_task = task; 1624 req->rq_xprt = xprt; 1625 req->rq_buffer = NULL; 1626 req->rq_xid = xprt_alloc_xid(xprt); 1627 xprt_init_connect_cookie(req, xprt); 1628 req->rq_bytes_sent = 0; 1629 req->rq_snd_buf.len = 0; 1630 req->rq_snd_buf.buflen = 0; 1631 req->rq_rcv_buf.len = 0; 1632 req->rq_rcv_buf.buflen = 0; 1633 req->rq_snd_buf.bvec = NULL; 1634 req->rq_rcv_buf.bvec = NULL; 1635 req->rq_release_snd_buf = NULL; 1636 xprt_reset_majortimeo(req); 1637 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1638 req, ntohl(req->rq_xid)); 1639 } 1640 1641 static void 1642 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task) 1643 { 1644 xprt->ops->alloc_slot(xprt, task); 1645 if (task->tk_rqstp != NULL) 1646 xprt_request_init(task); 1647 } 1648 1649 /** 1650 * xprt_reserve - allocate an RPC request slot 1651 * @task: RPC task requesting a slot allocation 1652 * 1653 * If the transport is marked as being congested, or if no more 1654 * slots are available, place the task on the transport's 1655 * backlog queue. 1656 */ 1657 void xprt_reserve(struct rpc_task *task) 1658 { 1659 struct rpc_xprt *xprt = task->tk_xprt; 1660 1661 task->tk_status = 0; 1662 if (task->tk_rqstp != NULL) 1663 return; 1664 1665 task->tk_timeout = 0; 1666 task->tk_status = -EAGAIN; 1667 if (!xprt_throttle_congested(xprt, task)) 1668 xprt_do_reserve(xprt, task); 1669 } 1670 1671 /** 1672 * xprt_retry_reserve - allocate an RPC request slot 1673 * @task: RPC task requesting a slot allocation 1674 * 1675 * If no more slots are available, place the task on the transport's 1676 * backlog queue. 1677 * Note that the only difference with xprt_reserve is that we now 1678 * ignore the value of the XPRT_CONGESTED flag. 
1679 */ 1680 void xprt_retry_reserve(struct rpc_task *task) 1681 { 1682 struct rpc_xprt *xprt = task->tk_xprt; 1683 1684 task->tk_status = 0; 1685 if (task->tk_rqstp != NULL) 1686 return; 1687 1688 task->tk_timeout = 0; 1689 task->tk_status = -EAGAIN; 1690 xprt_do_reserve(xprt, task); 1691 } 1692 1693 static void 1694 xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) 1695 { 1696 struct rpc_xprt *xprt = req->rq_xprt; 1697 1698 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || 1699 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || 1700 xprt_is_pinned_rqst(req)) { 1701 spin_lock(&xprt->queue_lock); 1702 xprt_request_dequeue_transmit_locked(task); 1703 xprt_request_dequeue_receive_locked(task); 1704 while (xprt_is_pinned_rqst(req)) { 1705 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1706 spin_unlock(&xprt->queue_lock); 1707 xprt_wait_on_pinned_rqst(req); 1708 spin_lock(&xprt->queue_lock); 1709 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); 1710 } 1711 spin_unlock(&xprt->queue_lock); 1712 } 1713 } 1714 1715 /** 1716 * xprt_release - release an RPC request slot 1717 * @task: task which is finished with the slot 1718 * 1719 */ 1720 void xprt_release(struct rpc_task *task) 1721 { 1722 struct rpc_xprt *xprt; 1723 struct rpc_rqst *req = task->tk_rqstp; 1724 1725 if (req == NULL) { 1726 if (task->tk_client) { 1727 xprt = task->tk_xprt; 1728 xprt_release_write(xprt, task); 1729 } 1730 return; 1731 } 1732 1733 xprt = req->rq_xprt; 1734 if (task->tk_ops->rpc_count_stats != NULL) 1735 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1736 else if (task->tk_client) 1737 rpc_count_iostats(task, task->tk_client->cl_metrics); 1738 xprt_request_dequeue_all(task, req); 1739 spin_lock_bh(&xprt->transport_lock); 1740 xprt->ops->release_xprt(xprt, task); 1741 if (xprt->ops->release_request) 1742 xprt->ops->release_request(task); 1743 xprt->last_used = jiffies; 1744 xprt_schedule_autodisconnect(xprt); 1745 spin_unlock_bh(&xprt->transport_lock); 1746 if (req->rq_buffer) 1747 xprt->ops->buf_free(task); 1748 xprt_inject_disconnect(xprt); 1749 xdr_free_bvec(&req->rq_rcv_buf); 1750 if (req->rq_cred != NULL) 1751 put_rpccred(req->rq_cred); 1752 task->tk_rqstp = NULL; 1753 if (req->rq_release_snd_buf) 1754 req->rq_release_snd_buf(req); 1755 1756 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1757 if (likely(!bc_prealloc(req))) 1758 xprt->ops->free_slot(xprt, req); 1759 else 1760 xprt_free_bc_request(req); 1761 } 1762 1763 #ifdef CONFIG_SUNRPC_BACKCHANNEL 1764 void 1765 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task) 1766 { 1767 struct xdr_buf *xbufp = &req->rq_snd_buf; 1768 1769 task->tk_rqstp = req; 1770 req->rq_task = task; 1771 xprt_init_connect_cookie(req, req->rq_xprt); 1772 /* 1773 * Set up the xdr_buf length. 1774 * This also indicates that the buffer is XDR encoded already. 
1775 */ 1776 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + 1777 xbufp->tail[0].iov_len; 1778 req->rq_bytes_sent = 0; 1779 } 1780 #endif 1781 1782 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1783 { 1784 kref_init(&xprt->kref); 1785 1786 spin_lock_init(&xprt->transport_lock); 1787 spin_lock_init(&xprt->reserve_lock); 1788 spin_lock_init(&xprt->queue_lock); 1789 1790 INIT_LIST_HEAD(&xprt->free); 1791 xprt->recv_queue = RB_ROOT; 1792 INIT_LIST_HEAD(&xprt->xmit_queue); 1793 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1794 spin_lock_init(&xprt->bc_pa_lock); 1795 INIT_LIST_HEAD(&xprt->bc_pa_list); 1796 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1797 INIT_LIST_HEAD(&xprt->xprt_switch); 1798 1799 xprt->last_used = jiffies; 1800 xprt->cwnd = RPC_INITCWND; 1801 xprt->bind_index = 0; 1802 1803 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1804 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1805 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1806 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1807 1808 xprt_init_xid(xprt); 1809 1810 xprt->xprt_net = get_net(net); 1811 } 1812 1813 /** 1814 * xprt_create_transport - create an RPC transport 1815 * @args: rpc transport creation arguments 1816 * 1817 */ 1818 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1819 { 1820 struct rpc_xprt *xprt; 1821 struct xprt_class *t; 1822 1823 spin_lock(&xprt_list_lock); 1824 list_for_each_entry(t, &xprt_list, list) { 1825 if (t->ident == args->ident) { 1826 spin_unlock(&xprt_list_lock); 1827 goto found; 1828 } 1829 } 1830 spin_unlock(&xprt_list_lock); 1831 dprintk("RPC: transport (%d) not supported\n", args->ident); 1832 return ERR_PTR(-EIO); 1833 1834 found: 1835 xprt = t->setup(args); 1836 if (IS_ERR(xprt)) { 1837 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1838 -PTR_ERR(xprt)); 1839 goto out; 1840 } 1841 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1842 xprt->idle_timeout = 0; 1843 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1844 if (xprt_has_timer(xprt)) 1845 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0); 1846 else 1847 timer_setup(&xprt->timer, NULL, 0); 1848 1849 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1850 xprt_destroy(xprt); 1851 return ERR_PTR(-EINVAL); 1852 } 1853 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1854 if (xprt->servername == NULL) { 1855 xprt_destroy(xprt); 1856 return ERR_PTR(-ENOMEM); 1857 } 1858 1859 rpc_xprt_debugfs_register(xprt); 1860 1861 dprintk("RPC: created transport %p with %u slots\n", xprt, 1862 xprt->max_reqs); 1863 out: 1864 return xprt; 1865 } 1866 1867 static void xprt_destroy_cb(struct work_struct *work) 1868 { 1869 struct rpc_xprt *xprt = 1870 container_of(work, struct rpc_xprt, task_cleanup); 1871 1872 rpc_xprt_debugfs_unregister(xprt); 1873 rpc_destroy_wait_queue(&xprt->binding); 1874 rpc_destroy_wait_queue(&xprt->pending); 1875 rpc_destroy_wait_queue(&xprt->sending); 1876 rpc_destroy_wait_queue(&xprt->backlog); 1877 kfree(xprt->servername); 1878 /* 1879 * Tear down transport state and free the rpc_xprt 1880 */ 1881 xprt->ops->destroy(xprt); 1882 } 1883 1884 /** 1885 * xprt_destroy - destroy an RPC transport, killing off all requests. 
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);

	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
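/*
 * Illustrative sketch (not part of this file): code that stashes an
 * rpc_xprt pointer outside the RPC client, for instance in a
 * hypothetical monitoring structure, must hold its own reference with
 * xprt_get()/xprt_put():
 *
 *	struct example_monitor {
 *		struct rpc_xprt *xprt;
 *	};
 *
 *	static int example_monitor_attach(struct example_monitor *mon,
 *					  struct rpc_xprt *xprt)
 *	{
 *		mon->xprt = xprt_get(xprt);
 *		return mon->xprt ? 0 : -ENXIO;
 *	}
 *
 *	static void example_monitor_detach(struct example_monitor *mon)
 *	{
 *		xprt_put(mon->xprt);
 *		mon->xprt = NULL;
 *	}
 *
 * xprt_get() may return NULL if the transport is already being
 * destroyed, so its return value must be checked.
 */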