1 /* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, if a reply is expected, 16 * it installs a timer that is run after the packet's timeout has 17 * expired. 18 * - When a packet arrives, the data_ready handler walks the list of 19 * pending requests for that transport. If a matching XID is found, the 20 * caller is woken up, and the timer removed. 21 * - When no reply arrives within the timeout interval, the timer is 22 * fired by the kernel and runs xprt_timer(). It either adjusts the 23 * timeout values (minor timeout) or wakes up the caller with a status 24 * of -ETIMEDOUT. 25 * - When the caller receives a notification from RPC that a reply arrived, 26 * it should release the RPC slot, and process the reply. 27 * If the call timed out, it may choose to retry the operation by 28 * adjusting the initial timeout value, and simply calling rpc_call 29 * again. 30 * 31 * Support for async RPC is done through a set of RPC-specific scheduling 32 * primitives that `transparently' work for processes as well as async 33 * tasks that rely on callbacks. 34 * 35 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 36 * 37 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 38 */ 39 40 #include <linux/module.h> 41 42 #include <linux/types.h> 43 #include <linux/interrupt.h> 44 #include <linux/workqueue.h> 45 #include <linux/net.h> 46 #include <linux/ktime.h> 47 48 #include <linux/sunrpc/clnt.h> 49 #include <linux/sunrpc/metrics.h> 50 #include <linux/sunrpc/bc_xprt.h> 51 52 #include "sunrpc.h" 53 54 /* 55 * Local variables 56 */ 57 58 #ifdef RPC_DEBUG 59 # define RPCDBG_FACILITY RPCDBG_XPRT 60 #endif 61 62 /* 63 * Local functions 64 */ 65 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 66 static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 67 static void xprt_connect_status(struct rpc_task *task); 68 static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 69 static void xprt_destroy(struct rpc_xprt *xprt); 70 71 static DEFINE_SPINLOCK(xprt_list_lock); 72 static LIST_HEAD(xprt_list); 73 74 /* 75 * The transport code maintains an estimate on the maximum number of out- 76 * standing RPC requests, using a smoothed version of the congestion 77 * avoidance implemented in 44BSD. This is basically the Van Jacobson 78 * congestion algorithm: If a retransmit occurs, the congestion window is 79 * halved; otherwise, it is incremented by 1/cwnd when 80 * 81 * - a reply is received and 82 * - a full number of requests are outstanding and 83 * - the congestion window hasn't been updated recently. 84 */ 85 #define RPC_CWNDSHIFT (8U) 86 #define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) 87 #define RPC_INITCWND RPC_CWNDSCALE 88 #define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) 89 90 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) 91 92 /** 93 * xprt_register_transport - register a transport implementation 94 * @transport: transport to register 95 * 96 * If a transport implementation is loaded as a kernel module, it can 97 * call this interface to make itself known to the RPC client. 98 * 99 * Returns: 100 * 0: transport successfully registered 101 * -EEXIST: transport already registered 102 * -EINVAL: transport module being unloaded 103 */ 104 int xprt_register_transport(struct xprt_class *transport) 105 { 106 struct xprt_class *t; 107 int result; 108 109 result = -EEXIST; 110 spin_lock(&xprt_list_lock); 111 list_for_each_entry(t, &xprt_list, list) { 112 /* don't register the same transport class twice */ 113 if (t->ident == transport->ident) 114 goto out; 115 } 116 117 list_add_tail(&transport->list, &xprt_list); 118 printk(KERN_INFO "RPC: Registered %s transport module.\n", 119 transport->name); 120 result = 0; 121 122 out: 123 spin_unlock(&xprt_list_lock); 124 return result; 125 } 126 EXPORT_SYMBOL_GPL(xprt_register_transport); 127 128 /** 129 * xprt_unregister_transport - unregister a transport implementation 130 * @transport: transport to unregister 131 * 132 * Returns: 133 * 0: transport successfully unregistered 134 * -ENOENT: transport never registered 135 */ 136 int xprt_unregister_transport(struct xprt_class *transport) 137 { 138 struct xprt_class *t; 139 int result; 140 141 result = 0; 142 spin_lock(&xprt_list_lock); 143 list_for_each_entry(t, &xprt_list, list) { 144 if (t == transport) { 145 printk(KERN_INFO 146 "RPC: Unregistered %s transport module.\n", 147 transport->name); 148 list_del_init(&transport->list); 149 goto out; 150 } 151 } 152 result = -ENOENT; 153 154 out: 155 spin_unlock(&xprt_list_lock); 156 return result; 157 } 158 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 159 160 /** 161 * xprt_load_transport - load a transport implementation 162 * @transport_name: transport to load 163 * 164 * Returns: 165 * 0: transport successfully loaded 166 * -ENOENT: transport module not available 167 */ 168 int xprt_load_transport(const char *transport_name) 169 { 170 struct xprt_class *t; 171 int result; 172 173 result = 0; 174 spin_lock(&xprt_list_lock); 175 list_for_each_entry(t, &xprt_list, list) { 176 if (strcmp(t->name, transport_name) == 0) { 177 spin_unlock(&xprt_list_lock); 178 goto out; 179 } 180 } 181 spin_unlock(&xprt_list_lock); 182 result = request_module("xprt%s", transport_name); 183 out: 184 return result; 185 } 186 EXPORT_SYMBOL_GPL(xprt_load_transport); 187 188 /** 189 * xprt_reserve_xprt - serialize write access to transports 190 * @task: task that is requesting access to the transport 191 * @xprt: pointer to the target transport 192 * 193 * This prevents mixing the payload of separate requests, and prevents 194 * transport connects from colliding with writes. No congestion control 195 * is provided. 196 */ 197 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 198 { 199 struct rpc_rqst *req = task->tk_rqstp; 200 int priority; 201 202 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 203 if (task == xprt->snd_task) 204 return 1; 205 goto out_sleep; 206 } 207 xprt->snd_task = task; 208 if (req != NULL) 209 req->rq_ntrans++; 210 211 return 1; 212 213 out_sleep: 214 dprintk("RPC: %5u failed to lock transport %p\n", 215 task->tk_pid, xprt); 216 task->tk_timeout = 0; 217 task->tk_status = -EAGAIN; 218 if (req == NULL) 219 priority = RPC_PRIORITY_LOW; 220 else if (!req->rq_ntrans) 221 priority = RPC_PRIORITY_NORMAL; 222 else 223 priority = RPC_PRIORITY_HIGH; 224 rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); 225 return 0; 226 } 227 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 228 229 static void xprt_clear_locked(struct rpc_xprt *xprt) 230 { 231 xprt->snd_task = NULL; 232 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 233 smp_mb__before_clear_bit(); 234 clear_bit(XPRT_LOCKED, &xprt->state); 235 smp_mb__after_clear_bit(); 236 } else 237 queue_work(rpciod_workqueue, &xprt->task_cleanup); 238 } 239 240 /* 241 * xprt_reserve_xprt_cong - serialize write access to transports 242 * @task: task that is requesting access to the transport 243 * 244 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 245 * integrated into the decision of whether a request is allowed to be 246 * woken up and given access to the transport. 247 */ 248 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 249 { 250 struct rpc_rqst *req = task->tk_rqstp; 251 int priority; 252 253 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 254 if (task == xprt->snd_task) 255 return 1; 256 goto out_sleep; 257 } 258 if (req == NULL) { 259 xprt->snd_task = task; 260 return 1; 261 } 262 if (__xprt_get_cong(xprt, task)) { 263 xprt->snd_task = task; 264 req->rq_ntrans++; 265 return 1; 266 } 267 xprt_clear_locked(xprt); 268 out_sleep: 269 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 270 task->tk_timeout = 0; 271 task->tk_status = -EAGAIN; 272 if (req == NULL) 273 priority = RPC_PRIORITY_LOW; 274 else if (!req->rq_ntrans) 275 priority = RPC_PRIORITY_NORMAL; 276 else 277 priority = RPC_PRIORITY_HIGH; 278 rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); 279 return 0; 280 } 281 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 282 283 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 284 { 285 int retval; 286 287 spin_lock_bh(&xprt->transport_lock); 288 retval = xprt->ops->reserve_xprt(xprt, task); 289 spin_unlock_bh(&xprt->transport_lock); 290 return retval; 291 } 292 293 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 294 { 295 struct rpc_xprt *xprt = data; 296 struct rpc_rqst *req; 297 298 req = task->tk_rqstp; 299 xprt->snd_task = task; 300 if (req) 301 req->rq_ntrans++; 302 return true; 303 } 304 305 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 306 { 307 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 308 return; 309 310 if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) 311 return; 312 xprt_clear_locked(xprt); 313 } 314 315 static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) 316 { 317 struct rpc_xprt *xprt = data; 318 struct rpc_rqst *req; 319 320 req = task->tk_rqstp; 321 if (req == NULL) { 322 xprt->snd_task = task; 323 return true; 324 } 325 if (__xprt_get_cong(xprt, task)) { 326 xprt->snd_task = task; 327 req->rq_ntrans++; 328 return true; 329 } 330 return false; 331 } 332 333 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 334 { 335 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 336 return; 337 if (RPCXPRT_CONGESTED(xprt)) 338 goto out_unlock; 339 if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) 340 return; 341 out_unlock: 342 xprt_clear_locked(xprt); 343 } 344 345 /** 346 * xprt_release_xprt - allow other requests to use a transport 347 * @xprt: transport with other tasks potentially waiting 348 * @task: task that is releasing access to the transport 349 * 350 * Note that "task" can be NULL. No congestion control is provided. 351 */ 352 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 353 { 354 if (xprt->snd_task == task) { 355 if (task != NULL) { 356 struct rpc_rqst *req = task->tk_rqstp; 357 if (req != NULL) 358 req->rq_bytes_sent = 0; 359 } 360 xprt_clear_locked(xprt); 361 __xprt_lock_write_next(xprt); 362 } 363 } 364 EXPORT_SYMBOL_GPL(xprt_release_xprt); 365 366 /** 367 * xprt_release_xprt_cong - allow other requests to use a transport 368 * @xprt: transport with other tasks potentially waiting 369 * @task: task that is releasing access to the transport 370 * 371 * Note that "task" can be NULL. Another task is awoken to use the 372 * transport if the transport's congestion window allows it. 373 */ 374 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 375 { 376 if (xprt->snd_task == task) { 377 if (task != NULL) { 378 struct rpc_rqst *req = task->tk_rqstp; 379 if (req != NULL) 380 req->rq_bytes_sent = 0; 381 } 382 xprt_clear_locked(xprt); 383 __xprt_lock_write_next_cong(xprt); 384 } 385 } 386 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 387 388 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 389 { 390 spin_lock_bh(&xprt->transport_lock); 391 xprt->ops->release_xprt(xprt, task); 392 spin_unlock_bh(&xprt->transport_lock); 393 } 394 395 /* 396 * Van Jacobson congestion avoidance. Check if the congestion window 397 * overflowed. Put the task to sleep if this is the case. 398 */ 399 static int 400 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 401 { 402 struct rpc_rqst *req = task->tk_rqstp; 403 404 if (req->rq_cong) 405 return 1; 406 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 407 task->tk_pid, xprt->cong, xprt->cwnd); 408 if (RPCXPRT_CONGESTED(xprt)) 409 return 0; 410 req->rq_cong = 1; 411 xprt->cong += RPC_CWNDSCALE; 412 return 1; 413 } 414 415 /* 416 * Adjust the congestion window, and wake up the next task 417 * that has been sleeping due to congestion 418 */ 419 static void 420 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 421 { 422 if (!req->rq_cong) 423 return; 424 req->rq_cong = 0; 425 xprt->cong -= RPC_CWNDSCALE; 426 __xprt_lock_write_next_cong(xprt); 427 } 428 429 /** 430 * xprt_release_rqst_cong - housekeeping when request is complete 431 * @task: RPC request that recently completed 432 * 433 * Useful for transports that require congestion control. 434 */ 435 void xprt_release_rqst_cong(struct rpc_task *task) 436 { 437 struct rpc_rqst *req = task->tk_rqstp; 438 439 __xprt_put_cong(req->rq_xprt, req); 440 } 441 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 442 443 /** 444 * xprt_adjust_cwnd - adjust transport congestion window 445 * @xprt: pointer to xprt 446 * @task: recently completed RPC request used to adjust window 447 * @result: result code of completed RPC request 448 * 449 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 450 */ 451 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 452 { 453 struct rpc_rqst *req = task->tk_rqstp; 454 unsigned long cwnd = xprt->cwnd; 455 456 if (result >= 0 && cwnd <= xprt->cong) { 457 /* The (cwnd >> 1) term makes sure 458 * the result gets rounded properly. */ 459 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 460 if (cwnd > RPC_MAXCWND(xprt)) 461 cwnd = RPC_MAXCWND(xprt); 462 __xprt_lock_write_next_cong(xprt); 463 } else if (result == -ETIMEDOUT) { 464 cwnd >>= 1; 465 if (cwnd < RPC_CWNDSCALE) 466 cwnd = RPC_CWNDSCALE; 467 } 468 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 469 xprt->cong, xprt->cwnd, cwnd); 470 xprt->cwnd = cwnd; 471 __xprt_put_cong(xprt, req); 472 } 473 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 474 475 /** 476 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 477 * @xprt: transport with waiting tasks 478 * @status: result code to plant in each task before waking it 479 * 480 */ 481 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 482 { 483 if (status < 0) 484 rpc_wake_up_status(&xprt->pending, status); 485 else 486 rpc_wake_up(&xprt->pending); 487 } 488 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 489 490 /** 491 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 492 * @task: task to be put to sleep 493 * @action: function pointer to be executed after wait 494 * 495 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 496 * we don't in general want to force a socket disconnection due to 497 * an incomplete RPC call transmission. 498 */ 499 void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) 500 { 501 struct rpc_rqst *req = task->tk_rqstp; 502 struct rpc_xprt *xprt = req->rq_xprt; 503 504 task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; 505 rpc_sleep_on(&xprt->pending, task, action); 506 } 507 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 508 509 /** 510 * xprt_write_space - wake the task waiting for transport output buffer space 511 * @xprt: transport with waiting tasks 512 * 513 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 514 */ 515 void xprt_write_space(struct rpc_xprt *xprt) 516 { 517 spin_lock_bh(&xprt->transport_lock); 518 if (xprt->snd_task) { 519 dprintk("RPC: write space: waking waiting task on " 520 "xprt %p\n", xprt); 521 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); 522 } 523 spin_unlock_bh(&xprt->transport_lock); 524 } 525 EXPORT_SYMBOL_GPL(xprt_write_space); 526 527 /** 528 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 529 * @task: task whose timeout is to be set 530 * 531 * Set a request's retransmit timeout based on the transport's 532 * default timeout parameters. Used by transports that don't adjust 533 * the retransmit timeout based on round-trip time estimation. 534 */ 535 void xprt_set_retrans_timeout_def(struct rpc_task *task) 536 { 537 task->tk_timeout = task->tk_rqstp->rq_timeout; 538 } 539 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); 540 541 /** 542 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 543 * @task: task whose timeout is to be set 544 * 545 * Set a request's retransmit timeout using the RTT estimator. 546 */ 547 void xprt_set_retrans_timeout_rtt(struct rpc_task *task) 548 { 549 int timer = task->tk_msg.rpc_proc->p_timer; 550 struct rpc_clnt *clnt = task->tk_client; 551 struct rpc_rtt *rtt = clnt->cl_rtt; 552 struct rpc_rqst *req = task->tk_rqstp; 553 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 554 555 task->tk_timeout = rpc_calc_rto(rtt, timer); 556 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 557 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 558 task->tk_timeout = max_timeout; 559 } 560 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt); 561 562 static void xprt_reset_majortimeo(struct rpc_rqst *req) 563 { 564 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 565 566 req->rq_majortimeo = req->rq_timeout; 567 if (to->to_exponential) 568 req->rq_majortimeo <<= to->to_retries; 569 else 570 req->rq_majortimeo += to->to_increment * to->to_retries; 571 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 572 req->rq_majortimeo = to->to_maxval; 573 req->rq_majortimeo += jiffies; 574 } 575 576 /** 577 * xprt_adjust_timeout - adjust timeout values for next retransmit 578 * @req: RPC request containing parameters to use for the adjustment 579 * 580 */ 581 int xprt_adjust_timeout(struct rpc_rqst *req) 582 { 583 struct rpc_xprt *xprt = req->rq_xprt; 584 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 585 int status = 0; 586 587 if (time_before(jiffies, req->rq_majortimeo)) { 588 if (to->to_exponential) 589 req->rq_timeout <<= 1; 590 else 591 req->rq_timeout += to->to_increment; 592 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 593 req->rq_timeout = to->to_maxval; 594 req->rq_retries++; 595 } else { 596 req->rq_timeout = to->to_initval; 597 req->rq_retries = 0; 598 xprt_reset_majortimeo(req); 599 /* Reset the RTT counters == "slow start" */ 600 spin_lock_bh(&xprt->transport_lock); 601 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 602 spin_unlock_bh(&xprt->transport_lock); 603 status = -ETIMEDOUT; 604 } 605 606 if (req->rq_timeout == 0) { 607 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 608 req->rq_timeout = 5 * HZ; 609 } 610 return status; 611 } 612 613 static void xprt_autoclose(struct work_struct *work) 614 { 615 struct rpc_xprt *xprt = 616 container_of(work, struct rpc_xprt, task_cleanup); 617 618 xprt->ops->close(xprt); 619 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 620 xprt_release_write(xprt, NULL); 621 } 622 623 /** 624 * xprt_disconnect_done - mark a transport as disconnected 625 * @xprt: transport to flag for disconnect 626 * 627 */ 628 void xprt_disconnect_done(struct rpc_xprt *xprt) 629 { 630 dprintk("RPC: disconnected transport %p\n", xprt); 631 spin_lock_bh(&xprt->transport_lock); 632 xprt_clear_connected(xprt); 633 xprt_wake_pending_tasks(xprt, -EAGAIN); 634 spin_unlock_bh(&xprt->transport_lock); 635 } 636 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 637 638 /** 639 * xprt_force_disconnect - force a transport to disconnect 640 * @xprt: transport to disconnect 641 * 642 */ 643 void xprt_force_disconnect(struct rpc_xprt *xprt) 644 { 645 /* Don't race with the test_bit() in xprt_clear_locked() */ 646 spin_lock_bh(&xprt->transport_lock); 647 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 648 /* Try to schedule an autoclose RPC call */ 649 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 650 queue_work(rpciod_workqueue, &xprt->task_cleanup); 651 xprt_wake_pending_tasks(xprt, -EAGAIN); 652 spin_unlock_bh(&xprt->transport_lock); 653 } 654 655 /** 656 * xprt_conditional_disconnect - force a transport to disconnect 657 * @xprt: transport to disconnect 658 * @cookie: 'connection cookie' 659 * 660 * This attempts to break the connection if and only if 'cookie' matches 661 * the current transport 'connection cookie'. It ensures that we don't 662 * try to break the connection more than once when we need to retransmit 663 * a batch of RPC requests. 664 * 665 */ 666 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 667 { 668 /* Don't race with the test_bit() in xprt_clear_locked() */ 669 spin_lock_bh(&xprt->transport_lock); 670 if (cookie != xprt->connect_cookie) 671 goto out; 672 if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) 673 goto out; 674 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 675 /* Try to schedule an autoclose RPC call */ 676 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 677 queue_work(rpciod_workqueue, &xprt->task_cleanup); 678 xprt_wake_pending_tasks(xprt, -EAGAIN); 679 out: 680 spin_unlock_bh(&xprt->transport_lock); 681 } 682 683 static void 684 xprt_init_autodisconnect(unsigned long data) 685 { 686 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 687 688 spin_lock(&xprt->transport_lock); 689 if (!list_empty(&xprt->recv)) 690 goto out_abort; 691 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 692 goto out_abort; 693 spin_unlock(&xprt->transport_lock); 694 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 695 queue_work(rpciod_workqueue, &xprt->task_cleanup); 696 return; 697 out_abort: 698 spin_unlock(&xprt->transport_lock); 699 } 700 701 /** 702 * xprt_connect - schedule a transport connect operation 703 * @task: RPC task that is requesting the connect 704 * 705 */ 706 void xprt_connect(struct rpc_task *task) 707 { 708 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 709 710 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 711 xprt, (xprt_connected(xprt) ? "is" : "is not")); 712 713 if (!xprt_bound(xprt)) { 714 task->tk_status = -EAGAIN; 715 return; 716 } 717 if (!xprt_lock_write(xprt, task)) 718 return; 719 720 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 721 xprt->ops->close(xprt); 722 723 if (xprt_connected(xprt)) 724 xprt_release_write(xprt, task); 725 else { 726 task->tk_rqstp->rq_bytes_sent = 0; 727 task->tk_timeout = task->tk_rqstp->rq_timeout; 728 rpc_sleep_on(&xprt->pending, task, xprt_connect_status); 729 730 if (test_bit(XPRT_CLOSING, &xprt->state)) 731 return; 732 if (xprt_test_and_set_connecting(xprt)) 733 return; 734 xprt->stat.connect_start = jiffies; 735 xprt->ops->connect(xprt, task); 736 } 737 } 738 739 static void xprt_connect_status(struct rpc_task *task) 740 { 741 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 742 743 if (task->tk_status == 0) { 744 xprt->stat.connect_count++; 745 xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; 746 dprintk("RPC: %5u xprt_connect_status: connection established\n", 747 task->tk_pid); 748 return; 749 } 750 751 switch (task->tk_status) { 752 case -ECONNREFUSED: 753 case -ECONNRESET: 754 case -ECONNABORTED: 755 case -ENETUNREACH: 756 case -EHOSTUNREACH: 757 case -EAGAIN: 758 dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); 759 break; 760 case -ETIMEDOUT: 761 dprintk("RPC: %5u xprt_connect_status: connect attempt timed " 762 "out\n", task->tk_pid); 763 break; 764 default: 765 dprintk("RPC: %5u xprt_connect_status: error %d connecting to " 766 "server %s\n", task->tk_pid, -task->tk_status, 767 xprt->servername); 768 xprt_release_write(xprt, task); 769 task->tk_status = -EIO; 770 } 771 } 772 773 /** 774 * xprt_lookup_rqst - find an RPC request corresponding to an XID 775 * @xprt: transport on which the original request was transmitted 776 * @xid: RPC XID of incoming reply 777 * 778 */ 779 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 780 { 781 struct rpc_rqst *entry; 782 783 list_for_each_entry(entry, &xprt->recv, rq_list) 784 if (entry->rq_xid == xid) 785 return entry; 786 787 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 788 ntohl(xid)); 789 xprt->stat.bad_xids++; 790 return NULL; 791 } 792 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 793 794 static void xprt_update_rtt(struct rpc_task *task) 795 { 796 struct rpc_rqst *req = task->tk_rqstp; 797 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 798 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 799 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 800 801 if (timer) { 802 if (req->rq_ntrans == 1) 803 rpc_update_rtt(rtt, timer, m); 804 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 805 } 806 } 807 808 /** 809 * xprt_complete_rqst - called when reply processing is complete 810 * @task: RPC request that recently completed 811 * @copied: actual number of bytes received from the transport 812 * 813 * Caller holds transport lock. 814 */ 815 void xprt_complete_rqst(struct rpc_task *task, int copied) 816 { 817 struct rpc_rqst *req = task->tk_rqstp; 818 struct rpc_xprt *xprt = req->rq_xprt; 819 820 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 821 task->tk_pid, ntohl(req->rq_xid), copied); 822 823 xprt->stat.recvs++; 824 req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); 825 if (xprt->ops->timer != NULL) 826 xprt_update_rtt(task); 827 828 list_del_init(&req->rq_list); 829 req->rq_private_buf.len = copied; 830 /* Ensure all writes are done before we update */ 831 /* req->rq_reply_bytes_recvd */ 832 smp_wmb(); 833 req->rq_reply_bytes_recvd = copied; 834 rpc_wake_up_queued_task(&xprt->pending, task); 835 } 836 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 837 838 static void xprt_timer(struct rpc_task *task) 839 { 840 struct rpc_rqst *req = task->tk_rqstp; 841 struct rpc_xprt *xprt = req->rq_xprt; 842 843 if (task->tk_status != -ETIMEDOUT) 844 return; 845 dprintk("RPC: %5u xprt_timer\n", task->tk_pid); 846 847 spin_lock_bh(&xprt->transport_lock); 848 if (!req->rq_reply_bytes_recvd) { 849 if (xprt->ops->timer) 850 xprt->ops->timer(xprt, task); 851 } else 852 task->tk_status = 0; 853 spin_unlock_bh(&xprt->transport_lock); 854 } 855 856 static inline int xprt_has_timer(struct rpc_xprt *xprt) 857 { 858 return xprt->idle_timeout != 0; 859 } 860 861 /** 862 * xprt_prepare_transmit - reserve the transport before sending a request 863 * @task: RPC task about to send a request 864 * 865 */ 866 bool xprt_prepare_transmit(struct rpc_task *task) 867 { 868 struct rpc_rqst *req = task->tk_rqstp; 869 struct rpc_xprt *xprt = req->rq_xprt; 870 bool ret = false; 871 872 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 873 874 spin_lock_bh(&xprt->transport_lock); 875 if (!req->rq_bytes_sent) { 876 if (req->rq_reply_bytes_recvd) { 877 task->tk_status = req->rq_reply_bytes_recvd; 878 goto out_unlock; 879 } 880 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) 881 && xprt_connected(xprt) 882 && req->rq_connect_cookie == xprt->connect_cookie) { 883 xprt->ops->set_retrans_timeout(task); 884 rpc_sleep_on(&xprt->pending, task, xprt_timer); 885 goto out_unlock; 886 } 887 } 888 if (!xprt->ops->reserve_xprt(xprt, task)) { 889 task->tk_status = -EAGAIN; 890 goto out_unlock; 891 } 892 ret = true; 893 out_unlock: 894 spin_unlock_bh(&xprt->transport_lock); 895 return ret; 896 } 897 898 void xprt_end_transmit(struct rpc_task *task) 899 { 900 xprt_release_write(task->tk_rqstp->rq_xprt, task); 901 } 902 903 /** 904 * xprt_transmit - send an RPC request on a transport 905 * @task: controlling RPC task 906 * 907 * We have to copy the iovec because sendmsg fiddles with its contents. 908 */ 909 void xprt_transmit(struct rpc_task *task) 910 { 911 struct rpc_rqst *req = task->tk_rqstp; 912 struct rpc_xprt *xprt = req->rq_xprt; 913 int status, numreqs; 914 915 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 916 917 if (!req->rq_reply_bytes_recvd) { 918 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { 919 /* 920 * Add to the list only if we're expecting a reply 921 */ 922 spin_lock_bh(&xprt->transport_lock); 923 /* Update the softirq receive buffer */ 924 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 925 sizeof(req->rq_private_buf)); 926 /* Add request to the receive list */ 927 list_add_tail(&req->rq_list, &xprt->recv); 928 spin_unlock_bh(&xprt->transport_lock); 929 xprt_reset_majortimeo(req); 930 /* Turn off autodisconnect */ 931 del_singleshot_timer_sync(&xprt->timer); 932 } 933 } else if (!req->rq_bytes_sent) 934 return; 935 936 req->rq_xtime = ktime_get(); 937 status = xprt->ops->send_request(task); 938 if (status != 0) { 939 task->tk_status = status; 940 return; 941 } 942 943 dprintk("RPC: %5u xmit complete\n", task->tk_pid); 944 task->tk_flags |= RPC_TASK_SENT; 945 spin_lock_bh(&xprt->transport_lock); 946 947 xprt->ops->set_retrans_timeout(task); 948 949 numreqs = atomic_read(&xprt->num_reqs); 950 if (numreqs > xprt->stat.max_slots) 951 xprt->stat.max_slots = numreqs; 952 xprt->stat.sends++; 953 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 954 xprt->stat.bklog_u += xprt->backlog.qlen; 955 xprt->stat.sending_u += xprt->sending.qlen; 956 xprt->stat.pending_u += xprt->pending.qlen; 957 958 /* Don't race with disconnect */ 959 if (!xprt_connected(xprt)) 960 task->tk_status = -ENOTCONN; 961 else { 962 /* 963 * Sleep on the pending queue since 964 * we're expecting a reply. 965 */ 966 if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) 967 rpc_sleep_on(&xprt->pending, task, xprt_timer); 968 req->rq_connect_cookie = xprt->connect_cookie; 969 } 970 spin_unlock_bh(&xprt->transport_lock); 971 } 972 973 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 974 { 975 set_bit(XPRT_CONGESTED, &xprt->state); 976 rpc_sleep_on(&xprt->backlog, task, NULL); 977 } 978 979 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 980 { 981 if (rpc_wake_up_next(&xprt->backlog) == NULL) 982 clear_bit(XPRT_CONGESTED, &xprt->state); 983 } 984 985 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 986 { 987 bool ret = false; 988 989 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 990 goto out; 991 spin_lock(&xprt->reserve_lock); 992 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 993 rpc_sleep_on(&xprt->backlog, task, NULL); 994 ret = true; 995 } 996 spin_unlock(&xprt->reserve_lock); 997 out: 998 return ret; 999 } 1000 1001 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags) 1002 { 1003 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 1004 1005 if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs)) 1006 goto out; 1007 req = kzalloc(sizeof(struct rpc_rqst), gfp_flags); 1008 if (req != NULL) 1009 goto out; 1010 atomic_dec(&xprt->num_reqs); 1011 req = ERR_PTR(-ENOMEM); 1012 out: 1013 return req; 1014 } 1015 1016 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1017 { 1018 if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) { 1019 kfree(req); 1020 return true; 1021 } 1022 return false; 1023 } 1024 1025 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1026 { 1027 struct rpc_rqst *req; 1028 1029 spin_lock(&xprt->reserve_lock); 1030 if (!list_empty(&xprt->free)) { 1031 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1032 list_del(&req->rq_list); 1033 goto out_init_req; 1034 } 1035 req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN); 1036 if (!IS_ERR(req)) 1037 goto out_init_req; 1038 switch (PTR_ERR(req)) { 1039 case -ENOMEM: 1040 dprintk("RPC: dynamic allocation of request slot " 1041 "failed! Retrying\n"); 1042 task->tk_status = -ENOMEM; 1043 break; 1044 case -EAGAIN: 1045 xprt_add_backlog(xprt, task); 1046 dprintk("RPC: waiting for request slot\n"); 1047 default: 1048 task->tk_status = -EAGAIN; 1049 } 1050 spin_unlock(&xprt->reserve_lock); 1051 return; 1052 out_init_req: 1053 task->tk_status = 0; 1054 task->tk_rqstp = req; 1055 xprt_request_init(task, xprt); 1056 spin_unlock(&xprt->reserve_lock); 1057 } 1058 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1059 1060 void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1061 { 1062 /* Note: grabbing the xprt_lock_write() ensures that we throttle 1063 * new slot allocation if the transport is congested (i.e. when 1064 * reconnecting a stream transport or when out of socket write 1065 * buffer space). 1066 */ 1067 if (xprt_lock_write(xprt, task)) { 1068 xprt_alloc_slot(xprt, task); 1069 xprt_release_write(xprt, task); 1070 } 1071 } 1072 EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); 1073 1074 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1075 { 1076 spin_lock(&xprt->reserve_lock); 1077 if (!xprt_dynamic_free_slot(xprt, req)) { 1078 memset(req, 0, sizeof(*req)); /* mark unused */ 1079 list_add(&req->rq_list, &xprt->free); 1080 } 1081 xprt_wake_up_backlog(xprt); 1082 spin_unlock(&xprt->reserve_lock); 1083 } 1084 1085 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1086 { 1087 struct rpc_rqst *req; 1088 while (!list_empty(&xprt->free)) { 1089 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1090 list_del(&req->rq_list); 1091 kfree(req); 1092 } 1093 } 1094 1095 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1096 unsigned int num_prealloc, 1097 unsigned int max_alloc) 1098 { 1099 struct rpc_xprt *xprt; 1100 struct rpc_rqst *req; 1101 int i; 1102 1103 xprt = kzalloc(size, GFP_KERNEL); 1104 if (xprt == NULL) 1105 goto out; 1106 1107 xprt_init(xprt, net); 1108 1109 for (i = 0; i < num_prealloc; i++) { 1110 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1111 if (!req) 1112 goto out_free; 1113 list_add(&req->rq_list, &xprt->free); 1114 } 1115 if (max_alloc > num_prealloc) 1116 xprt->max_reqs = max_alloc; 1117 else 1118 xprt->max_reqs = num_prealloc; 1119 xprt->min_reqs = num_prealloc; 1120 atomic_set(&xprt->num_reqs, num_prealloc); 1121 1122 return xprt; 1123 1124 out_free: 1125 xprt_free(xprt); 1126 out: 1127 return NULL; 1128 } 1129 EXPORT_SYMBOL_GPL(xprt_alloc); 1130 1131 void xprt_free(struct rpc_xprt *xprt) 1132 { 1133 put_net(xprt->xprt_net); 1134 xprt_free_all_slots(xprt); 1135 kfree(xprt); 1136 } 1137 EXPORT_SYMBOL_GPL(xprt_free); 1138 1139 /** 1140 * xprt_reserve - allocate an RPC request slot 1141 * @task: RPC task requesting a slot allocation 1142 * 1143 * If the transport is marked as being congested, or if no more 1144 * slots are available, place the task on the transport's 1145 * backlog queue. 1146 */ 1147 void xprt_reserve(struct rpc_task *task) 1148 { 1149 struct rpc_xprt *xprt; 1150 1151 task->tk_status = 0; 1152 if (task->tk_rqstp != NULL) 1153 return; 1154 1155 task->tk_timeout = 0; 1156 task->tk_status = -EAGAIN; 1157 rcu_read_lock(); 1158 xprt = rcu_dereference(task->tk_client->cl_xprt); 1159 if (!xprt_throttle_congested(xprt, task)) 1160 xprt->ops->alloc_slot(xprt, task); 1161 rcu_read_unlock(); 1162 } 1163 1164 /** 1165 * xprt_retry_reserve - allocate an RPC request slot 1166 * @task: RPC task requesting a slot allocation 1167 * 1168 * If no more slots are available, place the task on the transport's 1169 * backlog queue. 1170 * Note that the only difference with xprt_reserve is that we now 1171 * ignore the value of the XPRT_CONGESTED flag. 1172 */ 1173 void xprt_retry_reserve(struct rpc_task *task) 1174 { 1175 struct rpc_xprt *xprt; 1176 1177 task->tk_status = 0; 1178 if (task->tk_rqstp != NULL) 1179 return; 1180 1181 task->tk_timeout = 0; 1182 task->tk_status = -EAGAIN; 1183 rcu_read_lock(); 1184 xprt = rcu_dereference(task->tk_client->cl_xprt); 1185 xprt->ops->alloc_slot(xprt, task); 1186 rcu_read_unlock(); 1187 } 1188 1189 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) 1190 { 1191 return (__force __be32)xprt->xid++; 1192 } 1193 1194 static inline void xprt_init_xid(struct rpc_xprt *xprt) 1195 { 1196 xprt->xid = prandom_u32(); 1197 } 1198 1199 static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 1200 { 1201 struct rpc_rqst *req = task->tk_rqstp; 1202 1203 INIT_LIST_HEAD(&req->rq_list); 1204 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 1205 req->rq_task = task; 1206 req->rq_xprt = xprt; 1207 req->rq_buffer = NULL; 1208 req->rq_xid = xprt_alloc_xid(xprt); 1209 req->rq_connect_cookie = xprt->connect_cookie - 1; 1210 req->rq_bytes_sent = 0; 1211 req->rq_snd_buf.len = 0; 1212 req->rq_snd_buf.buflen = 0; 1213 req->rq_rcv_buf.len = 0; 1214 req->rq_rcv_buf.buflen = 0; 1215 req->rq_release_snd_buf = NULL; 1216 xprt_reset_majortimeo(req); 1217 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1218 req, ntohl(req->rq_xid)); 1219 } 1220 1221 /** 1222 * xprt_release - release an RPC request slot 1223 * @task: task which is finished with the slot 1224 * 1225 */ 1226 void xprt_release(struct rpc_task *task) 1227 { 1228 struct rpc_xprt *xprt; 1229 struct rpc_rqst *req = task->tk_rqstp; 1230 1231 if (req == NULL) { 1232 if (task->tk_client) { 1233 rcu_read_lock(); 1234 xprt = rcu_dereference(task->tk_client->cl_xprt); 1235 if (xprt->snd_task == task) 1236 xprt_release_write(xprt, task); 1237 rcu_read_unlock(); 1238 } 1239 return; 1240 } 1241 1242 xprt = req->rq_xprt; 1243 if (task->tk_ops->rpc_count_stats != NULL) 1244 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1245 else if (task->tk_client) 1246 rpc_count_iostats(task, task->tk_client->cl_metrics); 1247 spin_lock_bh(&xprt->transport_lock); 1248 xprt->ops->release_xprt(xprt, task); 1249 if (xprt->ops->release_request) 1250 xprt->ops->release_request(task); 1251 if (!list_empty(&req->rq_list)) 1252 list_del(&req->rq_list); 1253 xprt->last_used = jiffies; 1254 if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) 1255 mod_timer(&xprt->timer, 1256 xprt->last_used + xprt->idle_timeout); 1257 spin_unlock_bh(&xprt->transport_lock); 1258 if (req->rq_buffer) 1259 xprt->ops->buf_free(req->rq_buffer); 1260 if (req->rq_cred != NULL) 1261 put_rpccred(req->rq_cred); 1262 task->tk_rqstp = NULL; 1263 if (req->rq_release_snd_buf) 1264 req->rq_release_snd_buf(req); 1265 1266 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1267 if (likely(!bc_prealloc(req))) 1268 xprt_free_slot(xprt, req); 1269 else 1270 xprt_free_bc_request(req); 1271 } 1272 1273 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1274 { 1275 atomic_set(&xprt->count, 1); 1276 1277 spin_lock_init(&xprt->transport_lock); 1278 spin_lock_init(&xprt->reserve_lock); 1279 1280 INIT_LIST_HEAD(&xprt->free); 1281 INIT_LIST_HEAD(&xprt->recv); 1282 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1283 spin_lock_init(&xprt->bc_pa_lock); 1284 INIT_LIST_HEAD(&xprt->bc_pa_list); 1285 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1286 1287 xprt->last_used = jiffies; 1288 xprt->cwnd = RPC_INITCWND; 1289 xprt->bind_index = 0; 1290 1291 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1292 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1293 rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending"); 1294 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1295 1296 xprt_init_xid(xprt); 1297 1298 xprt->xprt_net = get_net(net); 1299 } 1300 1301 /** 1302 * xprt_create_transport - create an RPC transport 1303 * @args: rpc transport creation arguments 1304 * 1305 */ 1306 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1307 { 1308 struct rpc_xprt *xprt; 1309 struct xprt_class *t; 1310 1311 spin_lock(&xprt_list_lock); 1312 list_for_each_entry(t, &xprt_list, list) { 1313 if (t->ident == args->ident) { 1314 spin_unlock(&xprt_list_lock); 1315 goto found; 1316 } 1317 } 1318 spin_unlock(&xprt_list_lock); 1319 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident); 1320 return ERR_PTR(-EIO); 1321 1322 found: 1323 xprt = t->setup(args); 1324 if (IS_ERR(xprt)) { 1325 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1326 -PTR_ERR(xprt)); 1327 goto out; 1328 } 1329 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1330 xprt->idle_timeout = 0; 1331 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1332 if (xprt_has_timer(xprt)) 1333 setup_timer(&xprt->timer, xprt_init_autodisconnect, 1334 (unsigned long)xprt); 1335 else 1336 init_timer(&xprt->timer); 1337 1338 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1339 xprt_destroy(xprt); 1340 return ERR_PTR(-EINVAL); 1341 } 1342 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1343 if (xprt->servername == NULL) { 1344 xprt_destroy(xprt); 1345 return ERR_PTR(-ENOMEM); 1346 } 1347 1348 dprintk("RPC: created transport %p with %u slots\n", xprt, 1349 xprt->max_reqs); 1350 out: 1351 return xprt; 1352 } 1353 1354 /** 1355 * xprt_destroy - destroy an RPC transport, killing off all requests. 1356 * @xprt: transport to destroy 1357 * 1358 */ 1359 static void xprt_destroy(struct rpc_xprt *xprt) 1360 { 1361 dprintk("RPC: destroying transport %p\n", xprt); 1362 del_timer_sync(&xprt->timer); 1363 1364 rpc_destroy_wait_queue(&xprt->binding); 1365 rpc_destroy_wait_queue(&xprt->pending); 1366 rpc_destroy_wait_queue(&xprt->sending); 1367 rpc_destroy_wait_queue(&xprt->backlog); 1368 cancel_work_sync(&xprt->task_cleanup); 1369 kfree(xprt->servername); 1370 /* 1371 * Tear down transport state and free the rpc_xprt 1372 */ 1373 xprt->ops->destroy(xprt); 1374 } 1375 1376 /** 1377 * xprt_put - release a reference to an RPC transport. 1378 * @xprt: pointer to the transport 1379 * 1380 */ 1381 void xprt_put(struct rpc_xprt *xprt) 1382 { 1383 if (atomic_dec_and_test(&xprt->count)) 1384 xprt_destroy(xprt); 1385 } 1386 1387 /** 1388 * xprt_get - return a reference to an RPC transport. 1389 * @xprt: pointer to the transport 1390 * 1391 */ 1392 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1393 { 1394 if (atomic_inc_not_zero(&xprt->count)) 1395 return xprt; 1396 return NULL; 1397 } 1398