/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthreads.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued multiple times. During normal transport processing this bit
 *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	providing that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can be set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held, which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct svc_xprt_class *xcl;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		int slen;

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags))
		svcauth_unix_info_release(xprt);
	put_net(xprt->xpt_net);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);
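
/*
 * Illustrative sketch (not part of this file): a transport provider
 * typically registers its class from module init and removes it on exit.
 * The names below are hypothetical:
 *
 *	static struct svc_xprt_class my_xprt_class = {
 *		.xcl_name	 = "myxprt",
 *		.xcl_owner	 = THIS_MODULE,
 *		.xcl_ops	 = &my_xprt_ops,
 *		.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
 *	};
 *
 *	static int __init my_xprt_init(void)
 *	{
 *		return svc_reg_xprt_class(&my_xprt_class);
 *	}
 *
 *	static void __exit my_xprt_exit(void)
 *	{
 *		svc_unreg_xprt_class(&my_xprt_class);
 *	}
 */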

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	INIT_LIST_HEAD(&xprt->xpt_users);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
	xprt->xpt_net = get_net(&init_net);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
					 struct svc_serv *serv,
					 struct net *net,
					 const int family,
					 const unsigned short port,
					 int flags)
{
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	struct sockaddr *sap;
	size_t len;

	switch (family) {
	case PF_INET:
		sap = (struct sockaddr *)&sin;
		len = sizeof(sin);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		sap = (struct sockaddr *)&sin6;
		len = sizeof(sin6);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	default:
		return ERR_PTR(-EAFNOSUPPORT);
	}

	return xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
		    struct net *net, const int family,
		    const unsigned short port, int flags)
{
	struct svc_xprt_class *xcl;

	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;
		unsigned short newport;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = __svc_xpo_create(xcl, serv, net, family, port, flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		newport = svc_xprt_local_port(newxprt);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return newport;
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);

	/* This errno is exposed to user space.  Provide a reasonable
	 * perror msg for a bad transport. */
	return -EPROTONOSUPPORT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
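
/*
 * Illustrative sketch (not part of this file): an RPC service such as
 * nfsd or lockd creates its listeners by class name, e.g. a TCP listener
 * on port 2049 for IPv4 ("nfsd_serv" is a hypothetical svc_serv pointer):
 *
 *	int err;
 *
 *	err = svc_create_xprt(nfsd_serv, "tcp", &init_net, PF_INET,
 *			      2049, SVC_SOCK_DEFAULTS);
 *	if (err < 0)
 *		return err;
 *
 * On success the positive return value is the bound port number; passing
 * port 0 lets the transport pick an ephemeral port and report it back.
 */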

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv	*serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst	*rqstp;
	int cpu;

	if (!(xprt->xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	pool->sp_stats.packets++;

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}
	BUG_ON(xprt->xpt_pool != NULL);
	xprt->xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &xprt->xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, transport %p not enqueued\n",
			xprt);
		xprt->xpt_pool = NULL;
		clear_bit(XPT_BUSY, &xprt->xpt_flags);
		goto out_unlock;
	}

 process:
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		pool->sp_stats.threads_woken++;
		BUG_ON(xprt->xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		pool->sp_stats.sockets_queued++;
		BUG_ON(xprt->xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt	*xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	xprt->xpt_pool = NULL;
	/* As soon as we clear busy, the xprt could be closed and
	 * 'put', so we need a reference to call svc_xprt_enqueue with:
	 */
	svc_xprt_get(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);
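
/*
 * Illustrative sketch (not part of this file): a transport provider's
 * data-ready callback typically flags the transport and enqueues it;
 * a server thread later acknowledges the work with svc_xprt_received()
 * once the read attempt has been made.  "my_data_ready" and "my_xprt"
 * are hypothetical:
 *
 *	static void my_data_ready(struct my_xprt *my_xprt)
 *	{
 *		set_bit(XPT_DATA, &my_xprt->xprt.xpt_flags);
 *		svc_xprt_enqueue(&my_xprt->xprt);
 *	}
 *
 * svc_recv() below follows the same rule: after calling xpo_recvfrom()
 * or xpo_accept() it calls svc_xprt_received() so the transport can be
 * enqueued again.
 */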

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL_GPL(svc_reserve);
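
/*
 * Illustrative example (not part of this file): a service that knows its
 * reply will be small can shrink its reservation early.  If rq_reserved
 * is sv_max_mesg (say 1 MB) and the reply needs at most 512 bytes beyond
 * what is already in head[0], then
 *
 *	svc_reserve(rqstp, 512);
 *
 * lowers xpt_reserved by (rq_reserved - head[0].iov_len - 512), which may
 * let xpo_has_wspace() admit further requests on the same transport.
 */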

static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt	*xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	kfree(rqstp->rq_deferred);
	rqstp->rq_deferred = NULL;

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data.
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst	*rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
				(serv->sv_nrthreads+3) * 20;

	if (serv->sv_tmpcnt > limit) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing %s\n",
				       serv->sv_name, serv->sv_maxconn ?
				       "the max number of connections." :
				       "the number of threads.");
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt		*xprt = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_pool		*pool = rqstp->rq_pool;
	int			len, i;
	int			pages;
	struct xdr_buf		*arg;
	DECLARE_WAITQUEUE(wait, current);
	long			time_left;

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	/* Normally we will wait up to 5 seconds for any required
	 * cache information to be provided.
	 */
	rqstp->rq_chandle.thread_wait = 5*HZ;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);

		/* As there is a shortage of threads and this request
		 * had to be queued, don't allow the thread to wait so
		 * long for cache updates.
		 */
		rqstp->rq_chandle.thread_wait = 1*HZ;
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, then it
		 * will be woken up quickly during the schedule_timeout.
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		time_left = schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		if (!time_left)
			pool->sp_stats.threads_timedout++;

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
		svc_xprt_received(xprt);
	} else {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred) {
			svc_xprt_received(xprt);
			len = svc_deferred_recv(rqstp);
		} else {
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
			svc_xprt_received(xprt);
		}
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_xprt_release(rqstp);
		return -EAGAIN;
	}
	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}
EXPORT_SYMBOL_GPL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);
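
/*
 * Illustrative sketch (not part of this file): a service thread's main
 * loop, as used by callers such as nfsd or lockd, alternates svc_recv()
 * and svc_process(); the latter sends the reply via svc_send().
 * "my_rqstp" is a hypothetical svc_rqst pointer:
 *
 *	for (;;) {
 *		long timeout = 60 * 60 * HZ;
 *		int err = svc_recv(my_rqstp, timeout);
 *
 *		if (err == -EAGAIN)
 *			continue;
 *		if (err == -EINTR)
 *			break;
 *		svc_process(my_rqstp);
 *	}
 */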

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt	*xprt;
	int		len;
	struct xdr_buf	*xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	rpc_wake_up(&xprt->xpt_bc_pending);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1 ||
		    test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

static void call_xpt_users(struct svc_xprt *xprt)
{
	struct svc_xpt_user *u;

	spin_lock(&xprt->xpt_lock);
	while (!list_empty(&xprt->xpt_users)) {
		u = list_first_entry(&xprt->xpt_users, struct svc_xpt_user, list);
		list_del(&u->list);
		u->callback(u);
	}
	spin_unlock(&xprt->xpt_lock);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv	*serv = xprt->xpt_server;
	struct svc_deferred_req *dr;

	/* Only do this once */
	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
		BUG();

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
		serv->sv_tmpcnt--;
	spin_unlock_bh(&serv->sv_lock);

	while ((dr = svc_deferred_dequeue(xprt)) != NULL)
		kfree(dr);

	call_xpt_users(xprt);
	svc_xprt_put(xprt);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_delete_xprt(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * so just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	spin_lock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		spin_unlock(&xprt->xpt_lock);
		dprintk("revisit canceled\n");
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
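
/*
 * Deferral flow (illustrative summary, not part of this file): when a
 * cache lookup cannot complete, net/sunrpc/cache.c calls the request's
 * rq_chandle.defer() method, i.e. svc_defer() below, which snapshots the
 * request.  Once the cache entry is filled in, the saved handle's
 * ->revisit() method, svc_revisit() above, moves the snapshot onto
 * xpt_deferred and re-enqueues the transport; svc_recv() then replays it
 * through svc_deferred_dequeue() and svc_deferred_recv().
 */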

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	}
	spin_unlock(&xprt->xpt_lock);
	return dr;
}

/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
			       const sa_family_t af, const unsigned short port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (serv == NULL || xcl_name == NULL)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port != 0 && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);

static int svc_one_xprt_name(const struct svc_xprt *xprt,
			     char *pos, int remaining)
{
	int len;

	len = snprintf(pos, remaining, "%s %u\n",
			xprt->xpt_class->xcl_name,
			svc_xprt_local_port(xprt));
	if (len >= remaining)
		return -ENAMETOOLONG;
	return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
	struct svc_xprt *xprt;
	int len, totlen;
	char *pos;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);

	pos = buf;
	totlen = 0;
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
		if (len < 0) {
			*buf = '\0';
			totlen = len;
		}
		if (len <= 0)
			break;

		pos += len;
		totlen += len;
	}

	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
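
/*
 * Illustrative example (not part of this file): a caller that needs the
 * port of an already-registered listener can look it up by class name,
 * remembering that svc_find_xprt() takes a reference which must be
 * dropped ("serv" here is a hypothetical svc_serv pointer):
 *
 *	struct svc_xprt *xprt;
 *	unsigned short port = 0;
 *
 *	xprt = svc_find_xprt(serv, "udp", AF_UNSPEC, 0);
 *	if (xprt) {
 *		port = svc_xprt_local_port(xprt);
 *		svc_xprt_put(xprt);
 *	}
 */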


/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
	unsigned int pidx = (unsigned int)*pos;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

	if (!pidx)
		return SEQ_START_TOKEN;
	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
	struct svc_pool *pool = p;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

	if (p == SEQ_START_TOKEN) {
		pool = &serv->sv_pools[0];
	} else {
		unsigned int pidx = (pool - &serv->sv_pools[0]);
		if (pidx < serv->sv_nrpools-1)
			pool = &serv->sv_pools[pidx+1];
		else
			pool = NULL;
	}
	++*pos;
	return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
	struct svc_pool *pool = p;

	if (p == SEQ_START_TOKEN) {
		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n");
		return 0;
	}

	seq_printf(m, "%u %lu %lu %lu %lu\n",
		pool->sp_id,
		pool->sp_stats.packets,
		pool->sp_stats.sockets_queued,
		pool->sp_stats.threads_woken,
		pool->sp_stats.threads_timedout);

	return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
	.start	= svc_pool_stats_start,
	.next	= svc_pool_stats_next,
	.stop	= svc_pool_stats_stop,
	.show	= svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
	int err;

	err = seq_open(file, &svc_pool_stats_seq_ops);
	if (!err)
		((struct seq_file *) file->private_data)->private = serv;
	return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/
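
/*
 * Illustrative sketch (not part of this file): a service exposes the pool
 * statistics by wiring svc_pool_stats_open() into the ->open method of a
 * procfs file and reusing the stock seq_file helpers.  The names below
 * are hypothetical:
 *
 *	static int my_pool_stats_open(struct inode *inode, struct file *file)
 *	{
 *		return svc_pool_stats_open(my_serv, file);
 *	}
 *
 *	static const struct file_operations my_pool_stats_fops = {
 *		.owner		= THIS_MODULE,
 *		.open		= my_pool_stats_open,
 *		.read		= seq_read,
 *		.llseek		= seq_lseek,
 *		.release	= seq_release,
 *	};
 */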