/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/sunrpc/addr.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
int xprt_rdma_pad_optimize;
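
/* When SUNRPC debugging is enabled, the tunables above are exposed as
 * sysctls under sunrpc.*, e.g. (illustrative value only):
 *
 *      sysctl -w sunrpc.rdma_slot_table_entries=32
 *
 * Most of these knobs are range-checked against the min/max bounds
 * declared below; rdma_pad_optimize is a plain integer toggle.
 */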
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

static struct ctl_table_header *sunrpc_table_header;

static struct ctl_table xr_tunables_table[] = {
        {
                .procname = "rdma_slot_table_entries",
                .data = &xprt_rdma_slot_table_entries,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &min_slot_table_size,
                .extra2 = &max_slot_table_size
        },
        {
                .procname = "rdma_max_inline_read",
                .data = &xprt_rdma_max_inline_read,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &min_inline_size,
                .extra2 = &max_inline_size,
        },
        {
                .procname = "rdma_max_inline_write",
                .data = &xprt_rdma_max_inline_write,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &min_inline_size,
                .extra2 = &max_inline_size,
        },
        {
                .procname = "rdma_inline_write_padding",
                .data = &xprt_rdma_inline_write_padding,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &zero,
                .extra2 = &max_padding,
        },
        {
                .procname = "rdma_memreg_strategy",
                .data = &xprt_rdma_memreg_strategy,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec_minmax,
                .extra1 = &min_memreg,
                .extra2 = &max_memreg,
        },
        {
                .procname = "rdma_pad_optimize",
                .data = &xprt_rdma_pad_optimize,
                .maxlen = sizeof(unsigned int),
                .mode = 0644,
                .proc_handler = proc_dointvec,
        },
        { },
};

static struct ctl_table sunrpc_table[] = {
        {
                .procname = "sunrpc",
                .mode = 0555,
                .child = xr_tunables_table
        },
        { },
};

#endif

static const struct rpc_xprt_ops xprt_rdma_procs;

static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
{
        struct sockaddr_in *sin = (struct sockaddr_in *)sap;
        char buf[20];

        snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
}

static void
xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
{
        struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
        char buf[40];

        snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
        xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

        xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
}

void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
        char buf[128];

        switch (sap->sa_family) {
        case AF_INET:
                xprt_rdma_format_addresses4(xprt, sap);
                break;
        case AF_INET6:
                xprt_rdma_format_addresses6(xprt, sap);
                break;
        default:
                pr_err("rpcrdma: Unrecognized address family\n");
                return;
        }

        (void)rpc_ntop(sap, buf, sizeof(buf));
        xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

        snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
        xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

        snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
        xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

        xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}

void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
        unsigned int i;

        for (i = 0; i < RPC_DISPLAY_MAX; i++)
                switch (i) {
                case RPC_DISPLAY_PROTO:
                case RPC_DISPLAY_NETID:
                        continue;
                default:
                        kfree(xprt->address_strings[i]);
                }
}
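
/* rpcrdma_conn_func is invoked when the underlying connection changes
 * state. The rpc_xprt state update is deferred to a workqueue so it
 * runs in process context rather than directly in the (possibly
 * atomic) connection event upcall.
 */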
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
        schedule_delayed_work(&ep->rep_connect_worker, 0);
}

void
rpcrdma_connect_worker(struct work_struct *work)
{
        struct rpcrdma_ep *ep =
                container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
        struct rpcrdma_xprt *r_xprt =
                container_of(ep, struct rpcrdma_xprt, rx_ep);
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;

        spin_lock_bh(&xprt->transport_lock);
        if (++xprt->connect_cookie == 0)        /* maintain a reserved value */
                ++xprt->connect_cookie;
        if (ep->rep_connected > 0) {
                if (!xprt_test_and_set_connected(xprt))
                        xprt_wake_pending_tasks(xprt, 0);
        } else {
                if (xprt_test_and_clear_connected(xprt))
                        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_rdma_connect_worker(struct work_struct *work)
{
        struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
                                                   rx_connect_worker.work);
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        int rc = 0;

        xprt_clear_connected(xprt);

        dprintk("RPC: %s: %sconnect\n", __func__,
                r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
        rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
        if (rc)
                xprt_wake_pending_tasks(xprt, rc);

        dprintk("RPC: %s: exit\n", __func__);
        xprt_clear_connecting(xprt);
}

static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
                                                   rx_xprt);

        pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt);
        rdma_disconnect(r_xprt->rx_ia.ri_id);
}

/*
 * xprt_rdma_destroy
 *
 * Destroy the xprt.
 * Free all memory associated with the object, including its own.
 * NOTE: none of the *destroy methods free memory for their top-level
 * objects, even though they may have allocated it (they do free
 * private memory). It's up to the caller to handle it. In this
 * case (RDMA transport), all structure memory is inlined with the
 * struct rpcrdma_xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

        dprintk("RPC: %s: called\n", __func__);

        cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

        xprt_clear_connected(xprt);

        rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
        rpcrdma_buffer_destroy(&r_xprt->rx_buf);
        rpcrdma_ia_close(&r_xprt->rx_ia);

        xprt_rdma_free_addresses(xprt);

        xprt_free(xprt);

        dprintk("RPC: %s: returning\n", __func__);

        module_put(THIS_MODULE);
}
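
/* A flat 60-second timeout: to_initval equals to_maxval and no retry
 * count is specified, so the RPC layer never backs off. Instead,
 * xprt_rdma_timer() below forces a disconnect when a request times
 * out, and the reconnect logic drives recovery.
 */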
static const struct rpc_timeout xprt_rdma_default_timeout = {
        .to_initval = 60 * HZ,
        .to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
        struct rpcrdma_create_data_internal cdata;
        struct rpc_xprt *xprt;
        struct rpcrdma_xprt *new_xprt;
        struct rpcrdma_ep *new_ep;
        struct sockaddr *sap;
        int rc;

        if (args->addrlen > sizeof(xprt->addr)) {
                dprintk("RPC: %s: address too large\n", __func__);
                return ERR_PTR(-EBADF);
        }

        xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
                          xprt_rdma_slot_table_entries,
                          xprt_rdma_slot_table_entries);
        if (xprt == NULL) {
                dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
                        __func__);
                return ERR_PTR(-ENOMEM);
        }

        /* 60 second timeout, no retries */
        xprt->timeout = &xprt_rdma_default_timeout;
        xprt->bind_timeout = RPCRDMA_BIND_TO;
        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

        xprt->resvport = 0;             /* privileged port not needed */
        xprt->tsh_size = 0;             /* RPC-RDMA handles framing */
        xprt->ops = &xprt_rdma_procs;

        /*
         * Set up RDMA-specific connect data.
         */

        sap = (struct sockaddr *)&cdata.addr;
        memcpy(sap, args->dstaddr, args->addrlen);

        /* Ensure xprt->addr holds a valid server TCP (not RDMA)
         * address, for any side protocols which peek at it */
        xprt->prot = IPPROTO_TCP;
        xprt->addrlen = args->addrlen;
        memcpy(&xprt->addr, sap, xprt->addrlen);

        if (rpc_get_port(sap))
                xprt_set_bound(xprt);

        cdata.max_requests = xprt->max_reqs;

        cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
        cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */

        cdata.inline_wsize = xprt_rdma_max_inline_write;
        if (cdata.inline_wsize > cdata.wsize)
                cdata.inline_wsize = cdata.wsize;

        cdata.inline_rsize = xprt_rdma_max_inline_read;
        if (cdata.inline_rsize > cdata.rsize)
                cdata.inline_rsize = cdata.rsize;

        cdata.padding = xprt_rdma_inline_write_padding;

        /*
         * Create new transport instance, which includes initialized
         *  o ia
         *  o endpoint
         *  o buffers
         */

        new_xprt = rpcx_to_rdmax(xprt);

        rc = rpcrdma_ia_open(new_xprt, sap);
        if (rc)
                goto out1;

        /*
         * initialize and create ep
         */
        new_xprt->rx_data = cdata;
        new_ep = &new_xprt->rx_ep;
        new_ep->rep_remote_addr = cdata.addr;

        rc = rpcrdma_ep_create(&new_xprt->rx_ep,
                               &new_xprt->rx_ia, &new_xprt->rx_data);
        if (rc)
                goto out2;

        /*
         * Allocate pre-registered send and receive buffers for headers and
         * any inline data. Also specify any padding which will be provided
         * from a preregistered zero buffer.
         */
        rc = rpcrdma_buffer_create(new_xprt);
        if (rc)
                goto out3;

        /*
         * Register a callback for connection events. This is necessary because
         * connection loss notification is async. We also catch connection loss
         * when reaping receives.
         */
        INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
                          xprt_rdma_connect_worker);

        xprt_rdma_format_addresses(xprt, sap);
        xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
        if (xprt->max_payload == 0)
                goto out4;
        xprt->max_payload <<= PAGE_SHIFT;
        dprintk("RPC: %s: transport data payload maximum: %zu bytes\n",
                __func__, xprt->max_payload);

        if (!try_module_get(THIS_MODULE))
                goto out4;

        dprintk("RPC: %s: %s:%s\n", __func__,
                xprt->address_strings[RPC_DISPLAY_ADDR],
                xprt->address_strings[RPC_DISPLAY_PORT]);
        return xprt;
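
        /* Error unwinding: the outN labels below tear down state in
         * the reverse order of its construction above.
         */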
out4:
        xprt_rdma_free_addresses(xprt);
        rc = -EINVAL;
out3:
        rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
        rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
        xprt_free(xprt);
        return ERR_PTR(rc);
}

/**
 * xprt_rdma_close - Close down RDMA connection
 * @xprt: generic transport to be closed
 *
 * Called during transport shutdown, reconnect, or device
 * removal. Caller holds the transport's write lock.
 */
static void
xprt_rdma_close(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;

        dprintk("RPC: %s: closing xprt %p\n", __func__, xprt);

        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
                xprt_clear_connected(xprt);
                rpcrdma_ia_remove(ia);
                return;
        }
        if (ep->rep_connected == -ENODEV)
                return;
        if (ep->rep_connected > 0)
                xprt->reestablish_timeout = 0;
        xprt_disconnect_done(xprt);
        rpcrdma_ep_disconnect(ep, ia);
}

static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
        struct sockaddr_in *sap;

        sap = (struct sockaddr_in *)&xprt->addr;
        sap->sin_port = htons(port);
        sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
        sap->sin_port = htons(port);
        dprintk("RPC: %s: %u\n", __func__, port);
}

/**
 * xprt_rdma_timer - invoked when an RPC times out
 * @xprt: controlling RPC transport
 * @task: RPC task that timed out
 *
 * Invoked when the transport is still connected, but an RPC
 * retransmit timeout occurs.
 *
 * Since RDMA connections don't have a keep-alive, forcibly
 * disconnect and retry connecting. This drives full
 * detection of the network path, and retransmissions of
 * all pending RPCs.
 */
static void
xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
        dprintk("RPC: %5u %s: xprt = %p\n", task->tk_pid, __func__, xprt);

        xprt_force_disconnect(xprt);
}

static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

        if (r_xprt->rx_ep.rep_connected != 0) {
                /* Reconnect, doubling the backoff within its bounds */
                schedule_delayed_work(&r_xprt->rx_connect_worker,
                                      xprt->reestablish_timeout);
                xprt->reestablish_timeout <<= 1;
                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        } else {
                schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
                if (!RPC_IS_ASYNC(task))
                        flush_delayed_work(&r_xprt->rx_connect_worker);
        }
}
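
/* Each rpcrdma_req carries three pre-registered buffers, refreshed by
 * the helpers below: rl_rdmabuf for the RPC-over-RDMA transport header,
 * rl_sendbuf for the RPC call message, and rl_recvbuf for the RPC
 * reply when a Reply chunk is used.
 */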
/* Allocate a fixed-size buffer in which to construct and send the
 * RPC-over-RDMA header for this request.
 */
static bool
rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                    gfp_t flags)
{
        size_t size = RPCRDMA_HDRBUF_SIZE;
        struct rpcrdma_regbuf *rb;

        if (req->rl_rdmabuf)
                return true;

        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
        if (IS_ERR(rb))
                return false;

        r_xprt->rx_stats.hardway_register_count += size;
        req->rl_rdmabuf = rb;
        xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
        return true;
}

static bool
rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                    size_t size, gfp_t flags)
{
        struct rpcrdma_regbuf *rb;

        if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
                return true;

        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
        if (IS_ERR(rb))
                return false;

        rpcrdma_free_regbuf(req->rl_sendbuf);
        r_xprt->rx_stats.hardway_register_count += size;
        req->rl_sendbuf = rb;
        return true;
}

/* The rq_rcv_buf is used only if a Reply chunk is necessary.
 * The decision to use a Reply chunk is made later in
 * rpcrdma_marshal_req. This buffer is registered at that time.
 *
 * Otherwise, the associated RPC Reply arrives in a separate
 * Receive buffer, arbitrarily chosen by the HCA. The buffer
 * allocated here for the RPC Reply is not utilized in that
 * case. See rpcrdma_inline_fixup.
 *
 * A regbuf is used here to remember the buffer size.
 */
static bool
rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                    size_t size, gfp_t flags)
{
        struct rpcrdma_regbuf *rb;

        if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
                return true;

        rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
        if (IS_ERR(rb))
                return false;

        rpcrdma_free_regbuf(req->rl_recvbuf);
        r_xprt->rx_stats.hardway_register_count += size;
        req->rl_recvbuf = rb;
        return true;
}

/**
 * xprt_rdma_allocate - allocate transport resources for an RPC
 * @task: RPC task
 *
 * Return values:
 *      0:      Success; rq_buffer points to RPC buffer to use
 *      ENOMEM: Out of memory, call again later
 *      EIO:    A permanent error occurred, do not retry
 *
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual
 * send/recv sequence.
 *
 * xprt_rdma_allocate provides buffers that are already mapped for
 * DMA, and a local DMA lkey is provided for each.
 */
static int
xprt_rdma_allocate(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
        struct rpcrdma_req *req;
        gfp_t flags;

        req = rpcrdma_buffer_get(&r_xprt->rx_buf);
        if (req == NULL)
                return -ENOMEM;

        flags = RPCRDMA_DEF_GFP;
        if (RPC_IS_SWAPPER(task))
                flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

        if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
                goto out_fail;
        if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
                goto out_fail;
        if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
                goto out_fail;

        dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n",
                task->tk_pid, __func__, rqst->rq_callsize,
                rqst->rq_rcvsize, req);

        req->rl_connect_cookie = 0;     /* our reserved value */
        rpcrdma_set_xprtdata(rqst, req);
        rqst->rq_buffer = req->rl_sendbuf->rg_base;
        rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
        return 0;

out_fail:
        rpcrdma_buffer_put(req);
        return -ENOMEM;
}

/**
 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 * @task: RPC task
 *
 * Caller guarantees rqst->rq_buffer is non-NULL.
 */
static void
xprt_rdma_free(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags))
                return;

        dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);

        if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
                rpcrdma_release_rqst(r_xprt, req);
        rpcrdma_buffer_put(req);
}
/**
 * xprt_rdma_send_request - marshal and send an RPC request
 * @task: RPC task with an RPC message in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Return values:
 *      0:        The request has been sent
 *      ENOTCONN: Caller needs to invoke connect logic then call again
 *      ENOBUFS:  Call again later to send the request
 *      EIO:      A permanent error occurred; the request was not sent,
 *                and do not try it again
 *
 * send_request invokes the meat of RPC RDMA. It must do the following:
 *
 *  1. Marshal the RPC request into an RPC RDMA request, which means
 *     putting a header in front of data, and creating IOVs for RDMA
 *     from those in the request.
 *  2. In marshaling, detect opportunities for RDMA, and use them.
 *  3. Post a recv message to set up async completion, then send
 *     the request (rpcrdma_ep_post).
 *  4. No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 */
static int
xprt_rdma_send_request(struct rpc_task *task)
{
        struct rpc_rqst *rqst = task->tk_rqstp;
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        int rc = 0;

        if (!xprt_connected(xprt))
                goto drop_connection;

        /* On retransmit, remove any previously registered chunks */
        if (unlikely(!list_empty(&req->rl_registered)))
                r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
                                                    &req->rl_registered);

        rc = rpcrdma_marshal_req(r_xprt, rqst);
        if (rc < 0)
                goto failed_marshal;

        if (req->rl_reply == NULL)              /* e.g. reconnection */
                rpcrdma_recv_buffer_get(req);

        /* Must suppress retransmit to maintain credits */
        if (req->rl_connect_cookie == xprt->connect_cookie)
                goto drop_connection;
        req->rl_connect_cookie = xprt->connect_cookie;

        set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
        if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
                goto drop_connection;

        rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
        rqst->rq_bytes_sent = 0;
        return 0;

failed_marshal:
        if (rc != -ENOTCONN)
                return rc;
drop_connection:
        xprt_disconnect_done(xprt);
        return -ENOTCONN;       /* implies disconnect */
}
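
/* These transport and RPC-over-RDMA counters are reported on the
 * "xprt: rdma" line of each mount's transport statistics (typically
 * surfaced via /proc/self/mountstats and the mountstats(8) tool).
 */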
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        long idle_time = 0;

        if (xprt_connected(xprt))
                idle_time = (long)(jiffies - xprt->last_used) / HZ;

        seq_puts(seq, "\txprt:\trdma ");
        seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
                   0,   /* need a local port? */
                   xprt->stat.bind_count,
                   xprt->stat.connect_count,
                   xprt->stat.connect_time,
                   idle_time,
                   xprt->stat.sends,
                   xprt->stat.recvs,
                   xprt->stat.bad_xids,
                   xprt->stat.req_u,
                   xprt->stat.bklog_u);
        seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
                   r_xprt->rx_stats.read_chunk_count,
                   r_xprt->rx_stats.write_chunk_count,
                   r_xprt->rx_stats.reply_chunk_count,
                   r_xprt->rx_stats.total_rdma_request,
                   r_xprt->rx_stats.total_rdma_reply,
                   r_xprt->rx_stats.pullup_copy_count,
                   r_xprt->rx_stats.fixup_copy_count,
                   r_xprt->rx_stats.hardway_register_count,
                   r_xprt->rx_stats.failed_marshal_count,
                   r_xprt->rx_stats.bad_reply_count,
                   r_xprt->rx_stats.nomsg_call_count);
        seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
                   r_xprt->rx_stats.mrs_recovered,
                   r_xprt->rx_stats.mrs_orphaned,
                   r_xprt->rx_stats.mrs_allocated,
                   r_xprt->rx_stats.local_inv_needed,
                   r_xprt->rx_stats.empty_sendctx_q,
                   r_xprt->rx_stats.reply_waits_for_send);
}

static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
        return 0;
}

static void
xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}

/*
 * Plumbing for rpc transport switch and kernel module
 */

static const struct rpc_xprt_ops xprt_rdma_procs = {
        .reserve_xprt           = xprt_reserve_xprt_cong,
        .release_xprt           = xprt_release_xprt_cong, /* sunrpc/xprt.c */
        .alloc_slot             = xprt_alloc_slot,
        .release_request        = xprt_release_rqst_cong, /* ditto */
        .set_retrans_timeout    = xprt_set_retrans_timeout_def, /* ditto */
        .timer                  = xprt_rdma_timer,
        .rpcbind                = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
        .set_port               = xprt_rdma_set_port,
        .connect                = xprt_rdma_connect,
        .buf_alloc              = xprt_rdma_allocate,
        .buf_free               = xprt_rdma_free,
        .send_request           = xprt_rdma_send_request,
        .close                  = xprt_rdma_close,
        .destroy                = xprt_rdma_destroy,
        .print_stats            = xprt_rdma_print_stats,
        .enable_swap            = xprt_rdma_enable_swap,
        .disable_swap           = xprt_rdma_disable_swap,
        .inject_disconnect      = xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
        .bc_setup               = xprt_rdma_bc_setup,
        .bc_up                  = xprt_rdma_bc_up,
        .bc_maxpayload          = xprt_rdma_bc_maxpayload,
        .bc_free_rqst           = xprt_rdma_bc_free_rqst,
        .bc_destroy             = xprt_rdma_bc_destroy,
#endif
};
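
/* Registering this xprt_class makes the transport selectable by the
 * generic RPC client; an NFS mount typically reaches it with
 * proto=rdma (netid "rdma", transport ident XPRT_TRANSPORT_RDMA).
 */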
static struct xprt_class xprt_rdma = {
        .list                   = LIST_HEAD_INIT(xprt_rdma.list),
        .name                   = "rdma",
        .owner                  = THIS_MODULE,
        .ident                  = XPRT_TRANSPORT_RDMA,
        .setup                  = xprt_setup_rdma,
};

void xprt_rdma_cleanup(void)
{
        int rc;

        dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        if (sunrpc_table_header) {
                unregister_sysctl_table(sunrpc_table_header);
                sunrpc_table_header = NULL;
        }
#endif
        rc = xprt_unregister_transport(&xprt_rdma);
        if (rc)
                dprintk("RPC: %s: xprt_unregister returned %i\n",
                        __func__, rc);

        rpcrdma_destroy_wq();

        rc = xprt_unregister_transport(&xprt_rdma_bc);
        if (rc)
                dprintk("RPC: %s: xprt_unregister(bc) returned %i\n",
                        __func__, rc);
}

int xprt_rdma_init(void)
{
        int rc;

        rc = rpcrdma_alloc_wq();
        if (rc)
                return rc;

        rc = xprt_register_transport(&xprt_rdma);
        if (rc) {
                rpcrdma_destroy_wq();
                return rc;
        }

        rc = xprt_register_transport(&xprt_rdma_bc);
        if (rc) {
                xprt_unregister_transport(&xprt_rdma);
                rpcrdma_destroy_wq();
                return rc;
        }

        dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");

        dprintk("Defaults:\n");
        dprintk("\tSlots %d\n"
                "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
                xprt_rdma_slot_table_entries,
                xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
        dprintk("\tPadding %d\n\tMemreg %d\n",
                xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        if (!sunrpc_table_header)
                sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
        return 0;
}