// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */
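/* Usage sketch (illustrative): this transport is reached through the
 * "rdma" netid registered at the bottom of this file, so an NFS client
 * would typically select it at mount time with something like:
 *
 *	mount -t nfs -o proto=rdma,port=20049 server:/export /mnt
 *
 * 20049 being the well-known port for NFS/RDMA.
 */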
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/smp.h>

#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * tunables
 */

unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRWR;
int xprt_rdma_pad_optimize;

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;
static unsigned int dummy;

static struct ctl_table_header *sunrpc_table_header;

static struct ctl_table xr_tunables_table[] = {
	{
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_inline_size,
		.extra2		= &max_inline_size,
	},
	{
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_inline_size,
		.extra2		= &max_inline_size,
	},
	{
		.procname	= "rdma_inline_write_padding",
		.data		= &dummy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= SYSCTL_ZERO,
		.extra2		= &max_padding,
	},
	{
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.procname	= "rdma_pad_optimize",
		.data		= &xprt_rdma_pad_optimize,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec,
	},
	{ },
};

static struct ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{ },
};

#endif
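/* With CONFIG_SUNRPC_DEBUG enabled, the tunables above are exposed
 * under /proc/sys/sunrpc/. For example, the slot table size can be
 * raised at runtime (value chosen for illustration only):
 *
 *	sysctl sunrpc.rdma_slot_table_entries=128
 *
 * The new value takes effect for transports created afterward, since
 * xprt_setup_rdma() samples it when the transport is allocated.
 */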
static const struct rpc_xprt_ops xprt_rdma_procs;

static void
xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in *sin = (struct sockaddr_in *)sap;
	char buf[20];

	snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
}

static void
xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
	char buf[40];

	snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
}
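/**
 * xprt_rdma_format_addresses - fill in human-readable address strings
 * @xprt: controlling RPC transport
 * @sap: server address, AF_INET or AF_INET6
 *
 * The strings built here are consumed by the generic sunrpc layer when
 * it displays transport state. Only the dynamically allocated entries
 * are freed again by xprt_rdma_free_addresses(); RPC_DISPLAY_PROTO and
 * RPC_DISPLAY_NETID point at static strings.
 */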
void
xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap)
{
	char buf[128];

	switch (sap->sa_family) {
	case AF_INET:
		xprt_rdma_format_addresses4(xprt, sap);
		break;
	case AF_INET6:
		xprt_rdma_format_addresses6(xprt, sap);
		break;
	default:
		pr_err("rpcrdma: Unrecognized address family\n");
		return;
	}

	(void)rpc_ntop(sap, buf, sizeof(buf));
	xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
}

void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

/**
 * xprt_rdma_connect_worker - establish connection in the background
 * @work: worker thread context
 *
 * Requester holds the xprt's send lock to prevent activity on this
 * transport while a fresh connection is being established. RPC tasks
 * sleep on the xprt's pending queue waiting for connect to complete.
 */
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
						   rx_connect_worker.work);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	int rc;

	rc = rpcrdma_xprt_connect(r_xprt);
	xprt_clear_connecting(xprt);
	if (r_xprt->rx_ep && r_xprt->rx_ep->re_connect_status > 0) {
		xprt->connect_cookie++;
		xprt->stat.connect_count++;
		xprt->stat.connect_time += (long)jiffies -
					   xprt->stat.connect_start;
		xprt_set_connected(xprt);
		rc = -EAGAIN;
	}
	xprt_wake_pending_tasks(xprt, rc);
}

/**
 * xprt_rdma_inject_disconnect - inject a connection fault
 * @xprt: transport context
 *
 * If @xprt is connected, disconnect it to simulate spurious
 * connection loss.
 */
static void
xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_inject_dsc(r_xprt);
	rdma_disconnect(r_xprt->rx_ep->re_id);
}

/**
 * xprt_rdma_destroy - Full tear down of transport
 * @xprt: doomed transport context
 *
 * Caller guarantees there will be no more calls to us with
 * this @xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_destroy(r_xprt);

	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

	rpcrdma_xprt_disconnect(r_xprt);
	rpcrdma_buffer_destroy(&r_xprt->rx_buf);

	xprt_rdma_free_addresses(xprt);
	xprt_free(xprt);

	module_put(THIS_MODULE);
}

/* 60 second timeout, no retries */
static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct sockaddr *sap;
	int rc;

	if (args->addrlen > sizeof(xprt->addr))
		return ERR_PTR(-EBADF);

	if (!try_module_get(THIS_MODULE))
		return ERR_PTR(-EIO);

	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt), 0,
			  xprt_rdma_slot_table_entries);
	if (!xprt) {
		module_put(THIS_MODULE);
		return ERR_PTR(-ENOMEM);
	}

	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->connect_timeout = xprt->timeout->to_initval;
	xprt->max_reconnect_timeout = xprt->timeout->to_maxval;
	xprt->bind_timeout = RPCRDMA_BIND_TO;
	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;

	xprt->resvport = 0;	/* privileged port not needed */
	xprt->ops = &xprt_rdma_procs;

	/*
	 * Set up RDMA-specific connect data.
	 */
	sap = args->dstaddr;

	/* Ensure xprt->addr holds valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it */
	xprt->prot = IPPROTO_TCP;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, sap, xprt->addrlen);

	if (rpc_get_port(sap))
		xprt_set_bound(xprt);
	xprt_rdma_format_addresses(xprt, sap);

	new_xprt = rpcx_to_rdmax(xprt);
	rc = rpcrdma_buffer_create(new_xprt);
	if (rc) {
		xprt_rdma_free_addresses(xprt);
		xprt_free(xprt);
		module_put(THIS_MODULE);
		return ERR_PTR(rc);
	}

	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
			  xprt_rdma_connect_worker);

	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;

	dprintk("RPC:       %s: %s:%s\n", __func__,
		xprt->address_strings[RPC_DISPLAY_ADDR],
		xprt->address_strings[RPC_DISPLAY_PORT]);
	trace_xprtrdma_create(new_xprt);
	return xprt;
}

/**
 * xprt_rdma_close - close a transport connection
 * @xprt: transport context
 *
 * Called during autoclose or device removal.
 *
 * Caller holds @xprt's send lock to prevent activity on this
 * transport while the connection is torn down.
 */
void xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_close(r_xprt);

	rpcrdma_xprt_disconnect(r_xprt);

	xprt->reestablish_timeout = 0;
	++xprt->connect_cookie;
	xprt_disconnect_done(xprt);
}

/**
 * xprt_rdma_set_port - update server port with rpcbind result
 * @xprt: controlling RPC transport
 * @port: new port value
 *
 * Transport connect status is unchanged.
 */
static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr *sap = (struct sockaddr *)&xprt->addr;
	char buf[8];

	rpc_set_port(sap, port);

	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
	snprintf(buf, sizeof(buf), "%u", port);
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	snprintf(buf, sizeof(buf), "%4hx", port);
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);

	trace_xprtrdma_op_setport(container_of(xprt, struct rpcrdma_xprt,
					       rx_xprt));
}

/**
 * xprt_rdma_timer - invoked when an RPC times out
 * @xprt: controlling RPC transport
 * @task: RPC task that timed out
 *
 * Invoked when the transport is still connected, but an RPC
 * retransmit timeout occurs.
 *
 * Since RDMA connections don't have a keep-alive, forcibly
 * disconnect and retry to connect. This drives full
 * detection of the network path, and retransmissions of
 * all pending RPCs.
 */
static void
xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt_force_disconnect(xprt);
}

/**
 * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection
 * @xprt: controlling transport instance
 * @connect_timeout: reconnect timeout after client disconnects
 * @reconnect_timeout: reconnect timeout after server disconnects
 *
 */
static void xprt_rdma_set_connect_timeout(struct rpc_xprt *xprt,
					  unsigned long connect_timeout,
					  unsigned long reconnect_timeout)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout);

	spin_lock(&xprt->transport_lock);

	if (connect_timeout < xprt->connect_timeout) {
		struct rpc_timeout to;
		unsigned long initval;

		to = *xprt->timeout;
		initval = connect_timeout;
		if (initval < RPCRDMA_INIT_REEST_TO << 1)
			initval = RPCRDMA_INIT_REEST_TO << 1;
		to.to_initval = initval;
		to.to_maxval = initval;
		r_xprt->rx_timeout = to;
		xprt->timeout = &r_xprt->rx_timeout;
		xprt->connect_timeout = connect_timeout;
	}

	if (reconnect_timeout < xprt->max_reconnect_timeout)
		xprt->max_reconnect_timeout = reconnect_timeout;

	spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_rdma_connect - schedule an attempt to reconnect
 * @xprt: transport state
 * @task: RPC scheduler context (unused)
 *
 */
static void
xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned long delay;

	delay = 0;
	if (ep && ep->re_connect_status != 0) {
		delay = xprt_reconnect_delay(xprt);
		xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO);
	}
	trace_xprtrdma_op_connect(r_xprt, delay);
	queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker,
			   delay);
}

/**
 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 * @xprt: controlling RPC transport
 * @task: RPC task requesting a fresh rpc_rqst
 *
 * tk_status values:
 *	%0 if task->tk_rqstp points to a fresh rpc_rqst
 *	%-EAGAIN if no rpc_rqst is available; queued on backlog
 */
static void
xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req;

	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (!req)
		goto out_sleep;
	task->tk_rqstp = &req->rl_slot;
	task->tk_status = 0;
	return;

out_sleep:
	set_bit(XPRT_CONGESTED, &xprt->state);
	rpc_sleep_on(&xprt->backlog, task, NULL);
	task->tk_status = -EAGAIN;
}

/**
 * xprt_rdma_free_slot - release an rpc_rqst
 * @xprt: controlling RPC transport
 * @rqst: rpc_rqst to release
 *
 */
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(xprt, struct rpcrdma_xprt, rx_xprt);

	memset(rqst, 0, sizeof(*rqst));
	rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst));
	if (unlikely(!rpc_wake_up_next(&xprt->backlog)))
		clear_bit(XPRT_CONGESTED, &xprt->state);
}
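/* rpcrdma_check_regbuf() guarantees that regbuf @rb can hold at least
 * @size bytes, reallocating it with @flags if it is currently too
 * small. Having to grow a buffer here is the "hardway" registration
 * path tallied in rx_stats.
 */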
static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
				 struct rpcrdma_regbuf *rb, size_t size,
				 gfp_t flags)
{
	if (unlikely(rdmab_length(rb) < size)) {
		if (!rpcrdma_regbuf_realloc(rb, size, flags))
			return false;
		r_xprt->rx_stats.hardway_register_count += size;
	}
	return true;
}

/**
 * xprt_rdma_allocate - allocate transport resources for an RPC
 * @task: RPC task
 *
 * Return values:
 *	0:	Success; rq_buffer points to RPC buffer to use
 *	ENOMEM:	Out of memory, call again later
 *	EIO:	A permanent error occurred, do not retry
 */
static int
xprt_rdma_allocate(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	gfp_t flags;

	flags = RPCRDMA_DEF_GFP;
	if (RPC_IS_SWAPPER(task))
		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	if (!rpcrdma_check_regbuf(r_xprt, req->rl_sendbuf, rqst->rq_callsize,
				  flags))
		goto out_fail;
	if (!rpcrdma_check_regbuf(r_xprt, req->rl_recvbuf, rqst->rq_rcvsize,
				  flags))
		goto out_fail;

	rqst->rq_buffer = rdmab_data(req->rl_sendbuf);
	rqst->rq_rbuffer = rdmab_data(req->rl_recvbuf);
	trace_xprtrdma_op_allocate(task, req);
	return 0;

out_fail:
	trace_xprtrdma_op_allocate(task, NULL);
	return -ENOMEM;
}

/**
 * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
 * @task: RPC task
 *
 * Caller guarantees rqst->rq_buffer is non-NULL.
 */
static void
xprt_rdma_free(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	trace_xprtrdma_op_free(task, req);

	if (!list_empty(&req->rl_registered))
		frwr_unmap_sync(r_xprt, req);

	/* XXX: If the RPC is completing because of a signal and
	 * not because a reply was received, we ought to ensure
	 * that the Send completion has fired, so that memory
	 * involved with the Send is not still visible to the NIC.
	 */
}

/**
 * xprt_rdma_send_request - marshal and send an RPC request
 * @rqst: RPC message in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EAGAIN if the caller should call again
 *	%-ENOBUFS if the caller should call again after a delay
 *	%-EMSGSIZE if encoding ran out of buffer space. The request
 *		was not sent. Do not try to send this message again.
 *	%-EIO if an I/O error occurred. The request was not sent.
 *		Do not try to send this message again.
 */
static int
xprt_rdma_send_request(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc = 0;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (unlikely(!rqst->rq_buffer))
		return xprt_rdma_bc_send_reply(rqst);
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

	if (!xprt_connected(xprt))
		return -ENOTCONN;

	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_marshal_req(r_xprt, rqst);
	if (rc < 0)
		goto failed_marshal;

	/* Must suppress retransmit to maintain credits */
	if (rqst->rq_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	rqst->rq_xtime = ktime_get();

	if (rpcrdma_post_sends(r_xprt, req))
		goto drop_connection;

	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;

	/* An RPC with no reply will throw off credit accounting,
	 * so drop the connection to reset the credit grant.
	 */
	if (!rpc_reply_expected(rqst->rq_task))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}
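/* The statistics below form the "xprt: rdma ..." line that appears in
 * /proc/self/mountstats for NFS mounts using this transport. The
 * first seq_printf() mirrors the generic fields that socket-based
 * transports emit; the remaining two print rpcrdma-specific counters.
 */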
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_puts(seq, "\txprt:\trdma ");
	seq_printf(seq, "%u %lu %lu %lu %ld %lu %lu %lu %llu %llu ",
		   0,	/* need a local port? */
		   xprt->stat.bind_count,
		   xprt->stat.connect_count,
		   xprt->stat.connect_time / HZ,
		   idle_time,
		   xprt->stat.sends,
		   xprt->stat.recvs,
		   xprt->stat.bad_xids,
		   xprt->stat.req_u,
		   xprt->stat.bklog_u);
	seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
		   r_xprt->rx_stats.read_chunk_count,
		   r_xprt->rx_stats.write_chunk_count,
		   r_xprt->rx_stats.reply_chunk_count,
		   r_xprt->rx_stats.total_rdma_request,
		   r_xprt->rx_stats.total_rdma_reply,
		   r_xprt->rx_stats.pullup_copy_count,
		   r_xprt->rx_stats.fixup_copy_count,
		   r_xprt->rx_stats.hardway_register_count,
		   r_xprt->rx_stats.failed_marshal_count,
		   r_xprt->rx_stats.bad_reply_count,
		   r_xprt->rx_stats.nomsg_call_count);
	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
		   r_xprt->rx_stats.mrs_recycled,
		   r_xprt->rx_stats.mrs_orphaned,
		   r_xprt->rx_stats.mrs_allocated,
		   r_xprt->rx_stats.local_inv_needed,
		   r_xprt->rx_stats.empty_sendctx_q,
		   r_xprt->rx_stats.reply_waits_for_send);
}
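/* Returning success from ->enable_swap signals that no extra setup is
 * needed for swap-over-NFS on this transport; presumably this is safe
 * because xprt_rdma_allocate() above already switches to __GFP_MEMALLOC
 * allocations when RPC_IS_SWAPPER() is set on the task.
 */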
static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
	return 0;
}

static void
xprt_rdma_disable_swap(struct rpc_xprt *xprt)
{
}

/*
 * Plumbing for rpc transport switch and kernel module
 */

static const struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.alloc_slot		= xprt_rdma_alloc_slot,
	.free_slot		= xprt_rdma_free_slot,
	.release_request	= xprt_release_rqst_cong,	/* ditto */
	.wait_for_reply_request	= xprt_wait_for_reply_request_def, /* ditto */
	.timer			= xprt_rdma_timer,
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.set_connect_timeout	= xprt_rdma_set_connect_timeout,
	.print_stats		= xprt_rdma_print_stats,
	.enable_swap		= xprt_rdma_enable_swap,
	.disable_swap		= xprt_rdma_disable_swap,
	.inject_disconnect	= xprt_rdma_inject_disconnect,
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	.bc_setup		= xprt_rdma_bc_setup,
	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
	.bc_num_slots		= xprt_rdma_bc_max_slots,
	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
	.bc_destroy		= xprt_rdma_bc_destroy,
#endif
};
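/* Registering this xprt_class hooks the transport into the RPC client:
 * an rpc_clnt created with XPRT_TRANSPORT_RDMA as its transport ident
 * is dispatched to xprt_setup_rdma() above.
 */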
static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
};

void xprt_rdma_cleanup(void)
{
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif

	xprt_unregister_transport(&xprt_rdma);
	xprt_unregister_transport(&xprt_rdma_bc);
}

int xprt_rdma_init(void)
{
	int rc;

	rc = xprt_register_transport(&xprt_rdma);
	if (rc)
		return rc;

	rc = xprt_register_transport(&xprt_rdma_bc);
	if (rc) {
		xprt_unregister_transport(&xprt_rdma);
		return rc;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;
}