1 /* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, if a reply is expected, 16 * it installs a timer that is run after the packet's timeout has 17 * expired. 18 * - When a packet arrives, the data_ready handler walks the list of 19 * pending requests for that transport. If a matching XID is found, the 20 * caller is woken up, and the timer removed. 21 * - When no reply arrives within the timeout interval, the timer is 22 * fired by the kernel and runs xprt_timer(). It either adjusts the 23 * timeout values (minor timeout) or wakes up the caller with a status 24 * of -ETIMEDOUT. 25 * - When the caller receives a notification from RPC that a reply arrived, 26 * it should release the RPC slot, and process the reply. 27 * If the call timed out, it may choose to retry the operation by 28 * adjusting the initial timeout value, and simply calling rpc_call 29 * again. 30 * 31 * Support for async RPC is done through a set of RPC-specific scheduling 32 * primitives that `transparently' work for processes as well as async 33 * tasks that rely on callbacks. 34 * 35 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 36 * 37 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 38 */ 39 40 #include <linux/module.h> 41 42 #include <linux/types.h> 43 #include <linux/interrupt.h> 44 #include <linux/workqueue.h> 45 #include <linux/net.h> 46 #include <linux/ktime.h> 47 48 #include <linux/sunrpc/clnt.h> 49 #include <linux/sunrpc/metrics.h> 50 #include <linux/sunrpc/bc_xprt.h> 51 52 #include "sunrpc.h" 53 54 /* 55 * Local variables 56 */ 57 58 #ifdef RPC_DEBUG 59 # define RPCDBG_FACILITY RPCDBG_XPRT 60 #endif 61 62 /* 63 * Local functions 64 */ 65 static void xprt_init(struct rpc_xprt *xprt, struct net *net); 66 static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 67 static void xprt_connect_status(struct rpc_task *task); 68 static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 69 static void xprt_destroy(struct rpc_xprt *xprt); 70 71 static DEFINE_SPINLOCK(xprt_list_lock); 72 static LIST_HEAD(xprt_list); 73 74 /* 75 * The transport code maintains an estimate on the maximum number of out- 76 * standing RPC requests, using a smoothed version of the congestion 77 * avoidance implemented in 44BSD. This is basically the Van Jacobson 78 * congestion algorithm: If a retransmit occurs, the congestion window is 79 * halved; otherwise, it is incremented by 1/cwnd when 80 * 81 * - a reply is received and 82 * - a full number of requests are outstanding and 83 * - the congestion window hasn't been updated recently. 84 */ 85 #define RPC_CWNDSHIFT (8U) 86 #define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT) 87 #define RPC_INITCWND RPC_CWNDSCALE 88 #define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT) 89 90 #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) 91 92 /** 93 * xprt_register_transport - register a transport implementation 94 * @transport: transport to register 95 * 96 * If a transport implementation is loaded as a kernel module, it can 97 * call this interface to make itself known to the RPC client. 98 * 99 * Returns: 100 * 0: transport successfully registered 101 * -EEXIST: transport already registered 102 * -EINVAL: transport module being unloaded 103 */ 104 int xprt_register_transport(struct xprt_class *transport) 105 { 106 struct xprt_class *t; 107 int result; 108 109 result = -EEXIST; 110 spin_lock(&xprt_list_lock); 111 list_for_each_entry(t, &xprt_list, list) { 112 /* don't register the same transport class twice */ 113 if (t->ident == transport->ident) 114 goto out; 115 } 116 117 list_add_tail(&transport->list, &xprt_list); 118 printk(KERN_INFO "RPC: Registered %s transport module.\n", 119 transport->name); 120 result = 0; 121 122 out: 123 spin_unlock(&xprt_list_lock); 124 return result; 125 } 126 EXPORT_SYMBOL_GPL(xprt_register_transport); 127 128 /** 129 * xprt_unregister_transport - unregister a transport implementation 130 * @transport: transport to unregister 131 * 132 * Returns: 133 * 0: transport successfully unregistered 134 * -ENOENT: transport never registered 135 */ 136 int xprt_unregister_transport(struct xprt_class *transport) 137 { 138 struct xprt_class *t; 139 int result; 140 141 result = 0; 142 spin_lock(&xprt_list_lock); 143 list_for_each_entry(t, &xprt_list, list) { 144 if (t == transport) { 145 printk(KERN_INFO 146 "RPC: Unregistered %s transport module.\n", 147 transport->name); 148 list_del_init(&transport->list); 149 goto out; 150 } 151 } 152 result = -ENOENT; 153 154 out: 155 spin_unlock(&xprt_list_lock); 156 return result; 157 } 158 EXPORT_SYMBOL_GPL(xprt_unregister_transport); 159 160 /** 161 * xprt_load_transport - load a transport implementation 162 * @transport_name: transport to load 163 * 164 * Returns: 165 * 0: transport successfully loaded 166 * -ENOENT: transport module not available 167 */ 168 int xprt_load_transport(const char *transport_name) 169 { 170 struct xprt_class *t; 171 int result; 172 173 result = 0; 174 spin_lock(&xprt_list_lock); 175 list_for_each_entry(t, &xprt_list, list) { 176 if (strcmp(t->name, transport_name) == 0) { 177 spin_unlock(&xprt_list_lock); 178 goto out; 179 } 180 } 181 spin_unlock(&xprt_list_lock); 182 result = request_module("xprt%s", transport_name); 183 out: 184 return result; 185 } 186 EXPORT_SYMBOL_GPL(xprt_load_transport); 187 188 /** 189 * xprt_reserve_xprt - serialize write access to transports 190 * @task: task that is requesting access to the transport 191 * @xprt: pointer to the target transport 192 * 193 * This prevents mixing the payload of separate requests, and prevents 194 * transport connects from colliding with writes. No congestion control 195 * is provided. 196 */ 197 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 198 { 199 struct rpc_rqst *req = task->tk_rqstp; 200 int priority; 201 202 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 203 if (task == xprt->snd_task) 204 return 1; 205 goto out_sleep; 206 } 207 xprt->snd_task = task; 208 if (req != NULL) 209 req->rq_ntrans++; 210 211 return 1; 212 213 out_sleep: 214 dprintk("RPC: %5u failed to lock transport %p\n", 215 task->tk_pid, xprt); 216 task->tk_timeout = 0; 217 task->tk_status = -EAGAIN; 218 if (req == NULL) 219 priority = RPC_PRIORITY_LOW; 220 else if (!req->rq_ntrans) 221 priority = RPC_PRIORITY_NORMAL; 222 else 223 priority = RPC_PRIORITY_HIGH; 224 rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); 225 return 0; 226 } 227 EXPORT_SYMBOL_GPL(xprt_reserve_xprt); 228 229 static void xprt_clear_locked(struct rpc_xprt *xprt) 230 { 231 xprt->snd_task = NULL; 232 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) { 233 smp_mb__before_clear_bit(); 234 clear_bit(XPRT_LOCKED, &xprt->state); 235 smp_mb__after_clear_bit(); 236 } else 237 queue_work(rpciod_workqueue, &xprt->task_cleanup); 238 } 239 240 /* 241 * xprt_reserve_xprt_cong - serialize write access to transports 242 * @task: task that is requesting access to the transport 243 * 244 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is 245 * integrated into the decision of whether a request is allowed to be 246 * woken up and given access to the transport. 247 */ 248 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 249 { 250 struct rpc_rqst *req = task->tk_rqstp; 251 int priority; 252 253 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 254 if (task == xprt->snd_task) 255 return 1; 256 goto out_sleep; 257 } 258 if (req == NULL) { 259 xprt->snd_task = task; 260 return 1; 261 } 262 if (__xprt_get_cong(xprt, task)) { 263 xprt->snd_task = task; 264 req->rq_ntrans++; 265 return 1; 266 } 267 xprt_clear_locked(xprt); 268 out_sleep: 269 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); 270 task->tk_timeout = 0; 271 task->tk_status = -EAGAIN; 272 if (req == NULL) 273 priority = RPC_PRIORITY_LOW; 274 else if (!req->rq_ntrans) 275 priority = RPC_PRIORITY_NORMAL; 276 else 277 priority = RPC_PRIORITY_HIGH; 278 rpc_sleep_on_priority(&xprt->sending, task, NULL, priority); 279 return 0; 280 } 281 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong); 282 283 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 284 { 285 int retval; 286 287 spin_lock_bh(&xprt->transport_lock); 288 retval = xprt->ops->reserve_xprt(xprt, task); 289 spin_unlock_bh(&xprt->transport_lock); 290 return retval; 291 } 292 293 static bool __xprt_lock_write_func(struct rpc_task *task, void *data) 294 { 295 struct rpc_xprt *xprt = data; 296 struct rpc_rqst *req; 297 298 req = task->tk_rqstp; 299 xprt->snd_task = task; 300 if (req) 301 req->rq_ntrans++; 302 return true; 303 } 304 305 static void __xprt_lock_write_next(struct rpc_xprt *xprt) 306 { 307 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 308 return; 309 310 if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) 311 return; 312 xprt_clear_locked(xprt); 313 } 314 315 static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data) 316 { 317 struct rpc_xprt *xprt = data; 318 struct rpc_rqst *req; 319 320 req = task->tk_rqstp; 321 if (req == NULL) { 322 xprt->snd_task = task; 323 return true; 324 } 325 if (__xprt_get_cong(xprt, task)) { 326 xprt->snd_task = task; 327 req->rq_ntrans++; 328 return true; 329 } 330 return false; 331 } 332 333 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) 334 { 335 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 336 return; 337 if (RPCXPRT_CONGESTED(xprt)) 338 goto out_unlock; 339 if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) 340 return; 341 out_unlock: 342 xprt_clear_locked(xprt); 343 } 344 345 /** 346 * xprt_release_xprt - allow other requests to use a transport 347 * @xprt: transport with other tasks potentially waiting 348 * @task: task that is releasing access to the transport 349 * 350 * Note that "task" can be NULL. No congestion control is provided. 351 */ 352 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 353 { 354 if (xprt->snd_task == task) { 355 if (task != NULL) { 356 struct rpc_rqst *req = task->tk_rqstp; 357 if (req != NULL) 358 req->rq_bytes_sent = 0; 359 } 360 xprt_clear_locked(xprt); 361 __xprt_lock_write_next(xprt); 362 } 363 } 364 EXPORT_SYMBOL_GPL(xprt_release_xprt); 365 366 /** 367 * xprt_release_xprt_cong - allow other requests to use a transport 368 * @xprt: transport with other tasks potentially waiting 369 * @task: task that is releasing access to the transport 370 * 371 * Note that "task" can be NULL. Another task is awoken to use the 372 * transport if the transport's congestion window allows it. 373 */ 374 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task) 375 { 376 if (xprt->snd_task == task) { 377 if (task != NULL) { 378 struct rpc_rqst *req = task->tk_rqstp; 379 if (req != NULL) 380 req->rq_bytes_sent = 0; 381 } 382 xprt_clear_locked(xprt); 383 __xprt_lock_write_next_cong(xprt); 384 } 385 } 386 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong); 387 388 static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 389 { 390 spin_lock_bh(&xprt->transport_lock); 391 xprt->ops->release_xprt(xprt, task); 392 spin_unlock_bh(&xprt->transport_lock); 393 } 394 395 /* 396 * Van Jacobson congestion avoidance. Check if the congestion window 397 * overflowed. Put the task to sleep if this is the case. 398 */ 399 static int 400 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 401 { 402 struct rpc_rqst *req = task->tk_rqstp; 403 404 if (req->rq_cong) 405 return 1; 406 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 407 task->tk_pid, xprt->cong, xprt->cwnd); 408 if (RPCXPRT_CONGESTED(xprt)) 409 return 0; 410 req->rq_cong = 1; 411 xprt->cong += RPC_CWNDSCALE; 412 return 1; 413 } 414 415 /* 416 * Adjust the congestion window, and wake up the next task 417 * that has been sleeping due to congestion 418 */ 419 static void 420 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 421 { 422 if (!req->rq_cong) 423 return; 424 req->rq_cong = 0; 425 xprt->cong -= RPC_CWNDSCALE; 426 __xprt_lock_write_next_cong(xprt); 427 } 428 429 /** 430 * xprt_release_rqst_cong - housekeeping when request is complete 431 * @task: RPC request that recently completed 432 * 433 * Useful for transports that require congestion control. 434 */ 435 void xprt_release_rqst_cong(struct rpc_task *task) 436 { 437 struct rpc_rqst *req = task->tk_rqstp; 438 439 __xprt_put_cong(req->rq_xprt, req); 440 } 441 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 442 443 /** 444 * xprt_adjust_cwnd - adjust transport congestion window 445 * @xprt: pointer to xprt 446 * @task: recently completed RPC request used to adjust window 447 * @result: result code of completed RPC request 448 * 449 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 450 */ 451 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result) 452 { 453 struct rpc_rqst *req = task->tk_rqstp; 454 unsigned long cwnd = xprt->cwnd; 455 456 if (result >= 0 && cwnd <= xprt->cong) { 457 /* The (cwnd >> 1) term makes sure 458 * the result gets rounded properly. */ 459 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 460 if (cwnd > RPC_MAXCWND(xprt)) 461 cwnd = RPC_MAXCWND(xprt); 462 __xprt_lock_write_next_cong(xprt); 463 } else if (result == -ETIMEDOUT) { 464 cwnd >>= 1; 465 if (cwnd < RPC_CWNDSCALE) 466 cwnd = RPC_CWNDSCALE; 467 } 468 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 469 xprt->cong, xprt->cwnd, cwnd); 470 xprt->cwnd = cwnd; 471 __xprt_put_cong(xprt, req); 472 } 473 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 474 475 /** 476 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 477 * @xprt: transport with waiting tasks 478 * @status: result code to plant in each task before waking it 479 * 480 */ 481 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 482 { 483 if (status < 0) 484 rpc_wake_up_status(&xprt->pending, status); 485 else 486 rpc_wake_up(&xprt->pending); 487 } 488 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 489 490 /** 491 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 492 * @task: task to be put to sleep 493 * @action: function pointer to be executed after wait 494 * 495 * Note that we only set the timer for the case of RPC_IS_SOFT(), since 496 * we don't in general want to force a socket disconnection due to 497 * an incomplete RPC call transmission. 498 */ 499 void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) 500 { 501 struct rpc_rqst *req = task->tk_rqstp; 502 struct rpc_xprt *xprt = req->rq_xprt; 503 504 task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0; 505 rpc_sleep_on(&xprt->pending, task, action); 506 } 507 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 508 509 /** 510 * xprt_write_space - wake the task waiting for transport output buffer space 511 * @xprt: transport with waiting tasks 512 * 513 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 514 */ 515 void xprt_write_space(struct rpc_xprt *xprt) 516 { 517 spin_lock_bh(&xprt->transport_lock); 518 if (xprt->snd_task) { 519 dprintk("RPC: write space: waking waiting task on " 520 "xprt %p\n", xprt); 521 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); 522 } 523 spin_unlock_bh(&xprt->transport_lock); 524 } 525 EXPORT_SYMBOL_GPL(xprt_write_space); 526 527 /** 528 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 529 * @task: task whose timeout is to be set 530 * 531 * Set a request's retransmit timeout based on the transport's 532 * default timeout parameters. Used by transports that don't adjust 533 * the retransmit timeout based on round-trip time estimation. 534 */ 535 void xprt_set_retrans_timeout_def(struct rpc_task *task) 536 { 537 task->tk_timeout = task->tk_rqstp->rq_timeout; 538 } 539 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); 540 541 /** 542 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 543 * @task: task whose timeout is to be set 544 * 545 * Set a request's retransmit timeout using the RTT estimator. 546 */ 547 void xprt_set_retrans_timeout_rtt(struct rpc_task *task) 548 { 549 int timer = task->tk_msg.rpc_proc->p_timer; 550 struct rpc_clnt *clnt = task->tk_client; 551 struct rpc_rtt *rtt = clnt->cl_rtt; 552 struct rpc_rqst *req = task->tk_rqstp; 553 unsigned long max_timeout = clnt->cl_timeout->to_maxval; 554 555 task->tk_timeout = rpc_calc_rto(rtt, timer); 556 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 557 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 558 task->tk_timeout = max_timeout; 559 } 560 EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt); 561 562 static void xprt_reset_majortimeo(struct rpc_rqst *req) 563 { 564 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 565 566 req->rq_majortimeo = req->rq_timeout; 567 if (to->to_exponential) 568 req->rq_majortimeo <<= to->to_retries; 569 else 570 req->rq_majortimeo += to->to_increment * to->to_retries; 571 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 572 req->rq_majortimeo = to->to_maxval; 573 req->rq_majortimeo += jiffies; 574 } 575 576 /** 577 * xprt_adjust_timeout - adjust timeout values for next retransmit 578 * @req: RPC request containing parameters to use for the adjustment 579 * 580 */ 581 int xprt_adjust_timeout(struct rpc_rqst *req) 582 { 583 struct rpc_xprt *xprt = req->rq_xprt; 584 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; 585 int status = 0; 586 587 if (time_before(jiffies, req->rq_majortimeo)) { 588 if (to->to_exponential) 589 req->rq_timeout <<= 1; 590 else 591 req->rq_timeout += to->to_increment; 592 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 593 req->rq_timeout = to->to_maxval; 594 req->rq_retries++; 595 } else { 596 req->rq_timeout = to->to_initval; 597 req->rq_retries = 0; 598 xprt_reset_majortimeo(req); 599 /* Reset the RTT counters == "slow start" */ 600 spin_lock_bh(&xprt->transport_lock); 601 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 602 spin_unlock_bh(&xprt->transport_lock); 603 status = -ETIMEDOUT; 604 } 605 606 if (req->rq_timeout == 0) { 607 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 608 req->rq_timeout = 5 * HZ; 609 } 610 return status; 611 } 612 613 static void xprt_autoclose(struct work_struct *work) 614 { 615 struct rpc_xprt *xprt = 616 container_of(work, struct rpc_xprt, task_cleanup); 617 618 xprt->ops->close(xprt); 619 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 620 xprt_release_write(xprt, NULL); 621 } 622 623 /** 624 * xprt_disconnect_done - mark a transport as disconnected 625 * @xprt: transport to flag for disconnect 626 * 627 */ 628 void xprt_disconnect_done(struct rpc_xprt *xprt) 629 { 630 dprintk("RPC: disconnected transport %p\n", xprt); 631 spin_lock_bh(&xprt->transport_lock); 632 xprt_clear_connected(xprt); 633 xprt_wake_pending_tasks(xprt, -EAGAIN); 634 spin_unlock_bh(&xprt->transport_lock); 635 } 636 EXPORT_SYMBOL_GPL(xprt_disconnect_done); 637 638 /** 639 * xprt_force_disconnect - force a transport to disconnect 640 * @xprt: transport to disconnect 641 * 642 */ 643 void xprt_force_disconnect(struct rpc_xprt *xprt) 644 { 645 /* Don't race with the test_bit() in xprt_clear_locked() */ 646 spin_lock_bh(&xprt->transport_lock); 647 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 648 /* Try to schedule an autoclose RPC call */ 649 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 650 queue_work(rpciod_workqueue, &xprt->task_cleanup); 651 xprt_wake_pending_tasks(xprt, -EAGAIN); 652 spin_unlock_bh(&xprt->transport_lock); 653 } 654 655 /** 656 * xprt_conditional_disconnect - force a transport to disconnect 657 * @xprt: transport to disconnect 658 * @cookie: 'connection cookie' 659 * 660 * This attempts to break the connection if and only if 'cookie' matches 661 * the current transport 'connection cookie'. It ensures that we don't 662 * try to break the connection more than once when we need to retransmit 663 * a batch of RPC requests. 664 * 665 */ 666 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 667 { 668 /* Don't race with the test_bit() in xprt_clear_locked() */ 669 spin_lock_bh(&xprt->transport_lock); 670 if (cookie != xprt->connect_cookie) 671 goto out; 672 if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) 673 goto out; 674 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 675 /* Try to schedule an autoclose RPC call */ 676 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 677 queue_work(rpciod_workqueue, &xprt->task_cleanup); 678 xprt_wake_pending_tasks(xprt, -EAGAIN); 679 out: 680 spin_unlock_bh(&xprt->transport_lock); 681 } 682 683 static void 684 xprt_init_autodisconnect(unsigned long data) 685 { 686 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 687 688 spin_lock(&xprt->transport_lock); 689 if (!list_empty(&xprt->recv)) 690 goto out_abort; 691 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 692 goto out_abort; 693 spin_unlock(&xprt->transport_lock); 694 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 695 queue_work(rpciod_workqueue, &xprt->task_cleanup); 696 return; 697 out_abort: 698 spin_unlock(&xprt->transport_lock); 699 } 700 701 /** 702 * xprt_connect - schedule a transport connect operation 703 * @task: RPC task that is requesting the connect 704 * 705 */ 706 void xprt_connect(struct rpc_task *task) 707 { 708 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 709 710 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 711 xprt, (xprt_connected(xprt) ? "is" : "is not")); 712 713 if (!xprt_bound(xprt)) { 714 task->tk_status = -EAGAIN; 715 return; 716 } 717 if (!xprt_lock_write(xprt, task)) 718 return; 719 720 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 721 xprt->ops->close(xprt); 722 723 if (xprt_connected(xprt)) 724 xprt_release_write(xprt, task); 725 else { 726 task->tk_rqstp->rq_bytes_sent = 0; 727 task->tk_timeout = task->tk_rqstp->rq_timeout; 728 rpc_sleep_on(&xprt->pending, task, xprt_connect_status); 729 730 if (test_bit(XPRT_CLOSING, &xprt->state)) 731 return; 732 if (xprt_test_and_set_connecting(xprt)) 733 return; 734 xprt->stat.connect_start = jiffies; 735 xprt->ops->connect(xprt, task); 736 } 737 } 738 739 static void xprt_connect_status(struct rpc_task *task) 740 { 741 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; 742 743 if (task->tk_status == 0) { 744 xprt->stat.connect_count++; 745 xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; 746 dprintk("RPC: %5u xprt_connect_status: connection established\n", 747 task->tk_pid); 748 return; 749 } 750 751 switch (task->tk_status) { 752 case -EAGAIN: 753 dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); 754 break; 755 case -ETIMEDOUT: 756 dprintk("RPC: %5u xprt_connect_status: connect attempt timed " 757 "out\n", task->tk_pid); 758 break; 759 default: 760 dprintk("RPC: %5u xprt_connect_status: error %d connecting to " 761 "server %s\n", task->tk_pid, -task->tk_status, 762 xprt->servername); 763 xprt_release_write(xprt, task); 764 task->tk_status = -EIO; 765 } 766 } 767 768 /** 769 * xprt_lookup_rqst - find an RPC request corresponding to an XID 770 * @xprt: transport on which the original request was transmitted 771 * @xid: RPC XID of incoming reply 772 * 773 */ 774 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 775 { 776 struct rpc_rqst *entry; 777 778 list_for_each_entry(entry, &xprt->recv, rq_list) 779 if (entry->rq_xid == xid) 780 return entry; 781 782 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n", 783 ntohl(xid)); 784 xprt->stat.bad_xids++; 785 return NULL; 786 } 787 EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 788 789 static void xprt_update_rtt(struct rpc_task *task) 790 { 791 struct rpc_rqst *req = task->tk_rqstp; 792 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 793 unsigned int timer = task->tk_msg.rpc_proc->p_timer; 794 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 795 796 if (timer) { 797 if (req->rq_ntrans == 1) 798 rpc_update_rtt(rtt, timer, m); 799 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 800 } 801 } 802 803 /** 804 * xprt_complete_rqst - called when reply processing is complete 805 * @task: RPC request that recently completed 806 * @copied: actual number of bytes received from the transport 807 * 808 * Caller holds transport lock. 809 */ 810 void xprt_complete_rqst(struct rpc_task *task, int copied) 811 { 812 struct rpc_rqst *req = task->tk_rqstp; 813 struct rpc_xprt *xprt = req->rq_xprt; 814 815 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 816 task->tk_pid, ntohl(req->rq_xid), copied); 817 818 xprt->stat.recvs++; 819 req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); 820 if (xprt->ops->timer != NULL) 821 xprt_update_rtt(task); 822 823 list_del_init(&req->rq_list); 824 req->rq_private_buf.len = copied; 825 /* Ensure all writes are done before we update */ 826 /* req->rq_reply_bytes_recvd */ 827 smp_wmb(); 828 req->rq_reply_bytes_recvd = copied; 829 rpc_wake_up_queued_task(&xprt->pending, task); 830 } 831 EXPORT_SYMBOL_GPL(xprt_complete_rqst); 832 833 static void xprt_timer(struct rpc_task *task) 834 { 835 struct rpc_rqst *req = task->tk_rqstp; 836 struct rpc_xprt *xprt = req->rq_xprt; 837 838 if (task->tk_status != -ETIMEDOUT) 839 return; 840 dprintk("RPC: %5u xprt_timer\n", task->tk_pid); 841 842 spin_lock_bh(&xprt->transport_lock); 843 if (!req->rq_reply_bytes_recvd) { 844 if (xprt->ops->timer) 845 xprt->ops->timer(xprt, task); 846 } else 847 task->tk_status = 0; 848 spin_unlock_bh(&xprt->transport_lock); 849 } 850 851 static inline int xprt_has_timer(struct rpc_xprt *xprt) 852 { 853 return xprt->idle_timeout != 0; 854 } 855 856 /** 857 * xprt_prepare_transmit - reserve the transport before sending a request 858 * @task: RPC task about to send a request 859 * 860 */ 861 bool xprt_prepare_transmit(struct rpc_task *task) 862 { 863 struct rpc_rqst *req = task->tk_rqstp; 864 struct rpc_xprt *xprt = req->rq_xprt; 865 bool ret = false; 866 867 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 868 869 spin_lock_bh(&xprt->transport_lock); 870 if (!req->rq_bytes_sent) { 871 if (req->rq_reply_bytes_recvd) { 872 task->tk_status = req->rq_reply_bytes_recvd; 873 goto out_unlock; 874 } 875 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) 876 && xprt_connected(xprt) 877 && req->rq_connect_cookie == xprt->connect_cookie) { 878 xprt->ops->set_retrans_timeout(task); 879 rpc_sleep_on(&xprt->pending, task, xprt_timer); 880 goto out_unlock; 881 } 882 } 883 if (!xprt->ops->reserve_xprt(xprt, task)) { 884 task->tk_status = -EAGAIN; 885 goto out_unlock; 886 } 887 ret = true; 888 out_unlock: 889 spin_unlock_bh(&xprt->transport_lock); 890 return ret; 891 } 892 893 void xprt_end_transmit(struct rpc_task *task) 894 { 895 xprt_release_write(task->tk_rqstp->rq_xprt, task); 896 } 897 898 /** 899 * xprt_transmit - send an RPC request on a transport 900 * @task: controlling RPC task 901 * 902 * We have to copy the iovec because sendmsg fiddles with its contents. 903 */ 904 void xprt_transmit(struct rpc_task *task) 905 { 906 struct rpc_rqst *req = task->tk_rqstp; 907 struct rpc_xprt *xprt = req->rq_xprt; 908 int status, numreqs; 909 910 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 911 912 if (!req->rq_reply_bytes_recvd) { 913 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { 914 /* 915 * Add to the list only if we're expecting a reply 916 */ 917 spin_lock_bh(&xprt->transport_lock); 918 /* Update the softirq receive buffer */ 919 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 920 sizeof(req->rq_private_buf)); 921 /* Add request to the receive list */ 922 list_add_tail(&req->rq_list, &xprt->recv); 923 spin_unlock_bh(&xprt->transport_lock); 924 xprt_reset_majortimeo(req); 925 /* Turn off autodisconnect */ 926 del_singleshot_timer_sync(&xprt->timer); 927 } 928 } else if (!req->rq_bytes_sent) 929 return; 930 931 req->rq_xtime = ktime_get(); 932 status = xprt->ops->send_request(task); 933 if (status != 0) { 934 task->tk_status = status; 935 return; 936 } 937 938 dprintk("RPC: %5u xmit complete\n", task->tk_pid); 939 task->tk_flags |= RPC_TASK_SENT; 940 spin_lock_bh(&xprt->transport_lock); 941 942 xprt->ops->set_retrans_timeout(task); 943 944 numreqs = atomic_read(&xprt->num_reqs); 945 if (numreqs > xprt->stat.max_slots) 946 xprt->stat.max_slots = numreqs; 947 xprt->stat.sends++; 948 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 949 xprt->stat.bklog_u += xprt->backlog.qlen; 950 xprt->stat.sending_u += xprt->sending.qlen; 951 xprt->stat.pending_u += xprt->pending.qlen; 952 953 /* Don't race with disconnect */ 954 if (!xprt_connected(xprt)) 955 task->tk_status = -ENOTCONN; 956 else { 957 /* 958 * Sleep on the pending queue since 959 * we're expecting a reply. 960 */ 961 if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) 962 rpc_sleep_on(&xprt->pending, task, xprt_timer); 963 req->rq_connect_cookie = xprt->connect_cookie; 964 } 965 spin_unlock_bh(&xprt->transport_lock); 966 } 967 968 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) 969 { 970 set_bit(XPRT_CONGESTED, &xprt->state); 971 rpc_sleep_on(&xprt->backlog, task, NULL); 972 } 973 974 static void xprt_wake_up_backlog(struct rpc_xprt *xprt) 975 { 976 if (rpc_wake_up_next(&xprt->backlog) == NULL) 977 clear_bit(XPRT_CONGESTED, &xprt->state); 978 } 979 980 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task) 981 { 982 bool ret = false; 983 984 if (!test_bit(XPRT_CONGESTED, &xprt->state)) 985 goto out; 986 spin_lock(&xprt->reserve_lock); 987 if (test_bit(XPRT_CONGESTED, &xprt->state)) { 988 rpc_sleep_on(&xprt->backlog, task, NULL); 989 ret = true; 990 } 991 spin_unlock(&xprt->reserve_lock); 992 out: 993 return ret; 994 } 995 996 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags) 997 { 998 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 999 1000 if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs)) 1001 goto out; 1002 req = kzalloc(sizeof(struct rpc_rqst), gfp_flags); 1003 if (req != NULL) 1004 goto out; 1005 atomic_dec(&xprt->num_reqs); 1006 req = ERR_PTR(-ENOMEM); 1007 out: 1008 return req; 1009 } 1010 1011 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1012 { 1013 if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) { 1014 kfree(req); 1015 return true; 1016 } 1017 return false; 1018 } 1019 1020 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1021 { 1022 struct rpc_rqst *req; 1023 1024 spin_lock(&xprt->reserve_lock); 1025 if (!list_empty(&xprt->free)) { 1026 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1027 list_del(&req->rq_list); 1028 goto out_init_req; 1029 } 1030 req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN); 1031 if (!IS_ERR(req)) 1032 goto out_init_req; 1033 switch (PTR_ERR(req)) { 1034 case -ENOMEM: 1035 dprintk("RPC: dynamic allocation of request slot " 1036 "failed! Retrying\n"); 1037 task->tk_status = -ENOMEM; 1038 break; 1039 case -EAGAIN: 1040 xprt_add_backlog(xprt, task); 1041 dprintk("RPC: waiting for request slot\n"); 1042 default: 1043 task->tk_status = -EAGAIN; 1044 } 1045 spin_unlock(&xprt->reserve_lock); 1046 return; 1047 out_init_req: 1048 task->tk_status = 0; 1049 task->tk_rqstp = req; 1050 xprt_request_init(task, xprt); 1051 spin_unlock(&xprt->reserve_lock); 1052 } 1053 EXPORT_SYMBOL_GPL(xprt_alloc_slot); 1054 1055 void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task) 1056 { 1057 /* Note: grabbing the xprt_lock_write() ensures that we throttle 1058 * new slot allocation if the transport is congested (i.e. when 1059 * reconnecting a stream transport or when out of socket write 1060 * buffer space). 1061 */ 1062 if (xprt_lock_write(xprt, task)) { 1063 xprt_alloc_slot(xprt, task); 1064 xprt_release_write(xprt, task); 1065 } 1066 } 1067 EXPORT_SYMBOL_GPL(xprt_lock_and_alloc_slot); 1068 1069 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 1070 { 1071 spin_lock(&xprt->reserve_lock); 1072 if (!xprt_dynamic_free_slot(xprt, req)) { 1073 memset(req, 0, sizeof(*req)); /* mark unused */ 1074 list_add(&req->rq_list, &xprt->free); 1075 } 1076 xprt_wake_up_backlog(xprt); 1077 spin_unlock(&xprt->reserve_lock); 1078 } 1079 1080 static void xprt_free_all_slots(struct rpc_xprt *xprt) 1081 { 1082 struct rpc_rqst *req; 1083 while (!list_empty(&xprt->free)) { 1084 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1085 list_del(&req->rq_list); 1086 kfree(req); 1087 } 1088 } 1089 1090 struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1091 unsigned int num_prealloc, 1092 unsigned int max_alloc) 1093 { 1094 struct rpc_xprt *xprt; 1095 struct rpc_rqst *req; 1096 int i; 1097 1098 xprt = kzalloc(size, GFP_KERNEL); 1099 if (xprt == NULL) 1100 goto out; 1101 1102 xprt_init(xprt, net); 1103 1104 for (i = 0; i < num_prealloc; i++) { 1105 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1106 if (!req) 1107 goto out_free; 1108 list_add(&req->rq_list, &xprt->free); 1109 } 1110 if (max_alloc > num_prealloc) 1111 xprt->max_reqs = max_alloc; 1112 else 1113 xprt->max_reqs = num_prealloc; 1114 xprt->min_reqs = num_prealloc; 1115 atomic_set(&xprt->num_reqs, num_prealloc); 1116 1117 return xprt; 1118 1119 out_free: 1120 xprt_free(xprt); 1121 out: 1122 return NULL; 1123 } 1124 EXPORT_SYMBOL_GPL(xprt_alloc); 1125 1126 void xprt_free(struct rpc_xprt *xprt) 1127 { 1128 put_net(xprt->xprt_net); 1129 xprt_free_all_slots(xprt); 1130 kfree(xprt); 1131 } 1132 EXPORT_SYMBOL_GPL(xprt_free); 1133 1134 /** 1135 * xprt_reserve - allocate an RPC request slot 1136 * @task: RPC task requesting a slot allocation 1137 * 1138 * If the transport is marked as being congested, or if no more 1139 * slots are available, place the task on the transport's 1140 * backlog queue. 1141 */ 1142 void xprt_reserve(struct rpc_task *task) 1143 { 1144 struct rpc_xprt *xprt; 1145 1146 task->tk_status = 0; 1147 if (task->tk_rqstp != NULL) 1148 return; 1149 1150 task->tk_timeout = 0; 1151 task->tk_status = -EAGAIN; 1152 rcu_read_lock(); 1153 xprt = rcu_dereference(task->tk_client->cl_xprt); 1154 if (!xprt_throttle_congested(xprt, task)) 1155 xprt->ops->alloc_slot(xprt, task); 1156 rcu_read_unlock(); 1157 } 1158 1159 /** 1160 * xprt_retry_reserve - allocate an RPC request slot 1161 * @task: RPC task requesting a slot allocation 1162 * 1163 * If no more slots are available, place the task on the transport's 1164 * backlog queue. 1165 * Note that the only difference with xprt_reserve is that we now 1166 * ignore the value of the XPRT_CONGESTED flag. 1167 */ 1168 void xprt_retry_reserve(struct rpc_task *task) 1169 { 1170 struct rpc_xprt *xprt; 1171 1172 task->tk_status = 0; 1173 if (task->tk_rqstp != NULL) 1174 return; 1175 1176 task->tk_timeout = 0; 1177 task->tk_status = -EAGAIN; 1178 rcu_read_lock(); 1179 xprt = rcu_dereference(task->tk_client->cl_xprt); 1180 xprt->ops->alloc_slot(xprt, task); 1181 rcu_read_unlock(); 1182 } 1183 1184 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) 1185 { 1186 return (__force __be32)xprt->xid++; 1187 } 1188 1189 static inline void xprt_init_xid(struct rpc_xprt *xprt) 1190 { 1191 xprt->xid = net_random(); 1192 } 1193 1194 static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 1195 { 1196 struct rpc_rqst *req = task->tk_rqstp; 1197 1198 INIT_LIST_HEAD(&req->rq_list); 1199 req->rq_timeout = task->tk_client->cl_timeout->to_initval; 1200 req->rq_task = task; 1201 req->rq_xprt = xprt; 1202 req->rq_buffer = NULL; 1203 req->rq_xid = xprt_alloc_xid(xprt); 1204 req->rq_connect_cookie = xprt->connect_cookie - 1; 1205 req->rq_bytes_sent = 0; 1206 req->rq_snd_buf.len = 0; 1207 req->rq_snd_buf.buflen = 0; 1208 req->rq_rcv_buf.len = 0; 1209 req->rq_rcv_buf.buflen = 0; 1210 req->rq_release_snd_buf = NULL; 1211 xprt_reset_majortimeo(req); 1212 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid, 1213 req, ntohl(req->rq_xid)); 1214 } 1215 1216 /** 1217 * xprt_release - release an RPC request slot 1218 * @task: task which is finished with the slot 1219 * 1220 */ 1221 void xprt_release(struct rpc_task *task) 1222 { 1223 struct rpc_xprt *xprt; 1224 struct rpc_rqst *req = task->tk_rqstp; 1225 1226 if (req == NULL) { 1227 if (task->tk_client) { 1228 rcu_read_lock(); 1229 xprt = rcu_dereference(task->tk_client->cl_xprt); 1230 if (xprt->snd_task == task) 1231 xprt_release_write(xprt, task); 1232 rcu_read_unlock(); 1233 } 1234 return; 1235 } 1236 1237 xprt = req->rq_xprt; 1238 if (task->tk_ops->rpc_count_stats != NULL) 1239 task->tk_ops->rpc_count_stats(task, task->tk_calldata); 1240 else if (task->tk_client) 1241 rpc_count_iostats(task, task->tk_client->cl_metrics); 1242 spin_lock_bh(&xprt->transport_lock); 1243 xprt->ops->release_xprt(xprt, task); 1244 if (xprt->ops->release_request) 1245 xprt->ops->release_request(task); 1246 if (!list_empty(&req->rq_list)) 1247 list_del(&req->rq_list); 1248 xprt->last_used = jiffies; 1249 if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) 1250 mod_timer(&xprt->timer, 1251 xprt->last_used + xprt->idle_timeout); 1252 spin_unlock_bh(&xprt->transport_lock); 1253 if (req->rq_buffer) 1254 xprt->ops->buf_free(req->rq_buffer); 1255 if (req->rq_cred != NULL) 1256 put_rpccred(req->rq_cred); 1257 task->tk_rqstp = NULL; 1258 if (req->rq_release_snd_buf) 1259 req->rq_release_snd_buf(req); 1260 1261 dprintk("RPC: %5u release request %p\n", task->tk_pid, req); 1262 if (likely(!bc_prealloc(req))) 1263 xprt_free_slot(xprt, req); 1264 else 1265 xprt_free_bc_request(req); 1266 } 1267 1268 static void xprt_init(struct rpc_xprt *xprt, struct net *net) 1269 { 1270 atomic_set(&xprt->count, 1); 1271 1272 spin_lock_init(&xprt->transport_lock); 1273 spin_lock_init(&xprt->reserve_lock); 1274 1275 INIT_LIST_HEAD(&xprt->free); 1276 INIT_LIST_HEAD(&xprt->recv); 1277 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1278 spin_lock_init(&xprt->bc_pa_lock); 1279 INIT_LIST_HEAD(&xprt->bc_pa_list); 1280 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1281 1282 xprt->last_used = jiffies; 1283 xprt->cwnd = RPC_INITCWND; 1284 xprt->bind_index = 0; 1285 1286 rpc_init_wait_queue(&xprt->binding, "xprt_binding"); 1287 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1288 rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending"); 1289 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1290 1291 xprt_init_xid(xprt); 1292 1293 xprt->xprt_net = get_net(net); 1294 } 1295 1296 /** 1297 * xprt_create_transport - create an RPC transport 1298 * @args: rpc transport creation arguments 1299 * 1300 */ 1301 struct rpc_xprt *xprt_create_transport(struct xprt_create *args) 1302 { 1303 struct rpc_xprt *xprt; 1304 struct xprt_class *t; 1305 1306 spin_lock(&xprt_list_lock); 1307 list_for_each_entry(t, &xprt_list, list) { 1308 if (t->ident == args->ident) { 1309 spin_unlock(&xprt_list_lock); 1310 goto found; 1311 } 1312 } 1313 spin_unlock(&xprt_list_lock); 1314 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident); 1315 return ERR_PTR(-EIO); 1316 1317 found: 1318 xprt = t->setup(args); 1319 if (IS_ERR(xprt)) { 1320 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1321 -PTR_ERR(xprt)); 1322 goto out; 1323 } 1324 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT) 1325 xprt->idle_timeout = 0; 1326 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1327 if (xprt_has_timer(xprt)) 1328 setup_timer(&xprt->timer, xprt_init_autodisconnect, 1329 (unsigned long)xprt); 1330 else 1331 init_timer(&xprt->timer); 1332 1333 if (strlen(args->servername) > RPC_MAXNETNAMELEN) { 1334 xprt_destroy(xprt); 1335 return ERR_PTR(-EINVAL); 1336 } 1337 xprt->servername = kstrdup(args->servername, GFP_KERNEL); 1338 if (xprt->servername == NULL) { 1339 xprt_destroy(xprt); 1340 return ERR_PTR(-ENOMEM); 1341 } 1342 1343 dprintk("RPC: created transport %p with %u slots\n", xprt, 1344 xprt->max_reqs); 1345 out: 1346 return xprt; 1347 } 1348 1349 /** 1350 * xprt_destroy - destroy an RPC transport, killing off all requests. 1351 * @xprt: transport to destroy 1352 * 1353 */ 1354 static void xprt_destroy(struct rpc_xprt *xprt) 1355 { 1356 dprintk("RPC: destroying transport %p\n", xprt); 1357 del_timer_sync(&xprt->timer); 1358 1359 rpc_destroy_wait_queue(&xprt->binding); 1360 rpc_destroy_wait_queue(&xprt->pending); 1361 rpc_destroy_wait_queue(&xprt->sending); 1362 rpc_destroy_wait_queue(&xprt->backlog); 1363 cancel_work_sync(&xprt->task_cleanup); 1364 kfree(xprt->servername); 1365 /* 1366 * Tear down transport state and free the rpc_xprt 1367 */ 1368 xprt->ops->destroy(xprt); 1369 } 1370 1371 /** 1372 * xprt_put - release a reference to an RPC transport. 1373 * @xprt: pointer to the transport 1374 * 1375 */ 1376 void xprt_put(struct rpc_xprt *xprt) 1377 { 1378 if (atomic_dec_and_test(&xprt->count)) 1379 xprt_destroy(xprt); 1380 } 1381 1382 /** 1383 * xprt_get - return a reference to an RPC transport. 1384 * @xprt: pointer to the transport 1385 * 1386 */ 1387 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1388 { 1389 if (atomic_inc_not_zero(&xprt->count)) 1390 return xprt; 1391 return NULL; 1392 } 1393