// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/log2.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc);
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);

/* Wait for outstanding transport work to finish. ib_drain_qp
 * handles the drains in the wrong order for us, so open code
 * them here.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(ia->ri_id->qp);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(ia->ri_id->qp);
}

/**
 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 * @event: details of the event
 * @context: ep that owns QP where event occurred
 *
 * Called from the RDMA provider (device driver) possibly in an interrupt
 * context.
 */
static void
rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_event(r_xprt, event);
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	rpcrdma_sendctx_put_locked((struct rpcrdma_xprt *)cq->cq_context, sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc);
	--r_xprt->rx_ep.rep_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	rpcrdma_reply_handler(rep);
	return;

out_flushed:
	rpcrdma_rep_destroy(rep);
}

static void rpcrdma_update_cm_private(struct rpcrdma_xprt *r_xprt,
				      struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < ep->rep_inline_recv)
		ep->rep_inline_recv = rsize;
	if (wsize < ep->rep_inline_send)
		ep->rep_inline_send = wsize;

	rpcrdma_set_max_header_sizes(r_xprt);
}

/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *r_xprt = id->context;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	might_sleep();

	trace_xprtrdma_cm_event(r_xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EPROTO;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_id->device->name,
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
		init_completion(&ia->ri_remove_done);
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		++xprt->connect_cookie;
		ep->rep_connected = 1;
		rpcrdma_update_cm_private(r_xprt, &event->param.conn);
		trace_xprtrdma_inline_thresh(r_xprt);
		wake_up_all(&ep->rep_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->rep_connected = -ENOTCONN;
		goto disconnected;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->rep_connected = -ENETUNREACH;
		goto disconnected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
			rdma_reject_msg(id, event->status));
		ep->rep_connected = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->rep_connected = -EAGAIN;
		goto disconnected;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->rep_connected = -ECONNABORTED;
disconnected:
		xprt_force_disconnect(xprt);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		break;
	}

	dprintk("RPC: %s: %s:%s on %s/frwr: %s\n", __func__,
		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
		ia->ri_id->device->name, rdma_event_msg(event->event));
	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
			    xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id))
		return id;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0)
		goto out;

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0)
		goto out;
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}

	ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 *
 * Caller must hold the transport send lock.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		rpcrdma_xprt_drain(r_xprt);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ep->rep_attr.recv_cq = NULL;
	ib_free_cq(ep->rep_attr.send_cq);
	ep->rep_attr.send_cq = NULL;

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	rpcrdma_reps_unmap(r_xprt);
	rpcrdma_reqs_reset(r_xprt);
	rpcrdma_mrs_destroy(r_xprt);
	rpcrdma_sendctxs_destroy(r_xprt);
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/**
 * rpcrdma_ep_create - Create unconnected endpoint
 * @r_xprt: transport to instantiate
 *
 * Returns zero on success, or a negative errno.
 */
int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	ep->rep_max_requests = r_xprt->rx_xprt.max_reqs;
	ep->rep_inline_send = xprt_rdma_max_inline_write;
	ep->rep_inline_recv = xprt_rdma_max_inline_read;

	rc = frwr_query_device(r_xprt, ia->ri_id->device);
	if (rc)
		return rc;
	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->rep_max_requests);

	ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	ep->rep_send_batch = ep->rep_max_requests >> 3;
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	ep->rep_receive_count = 0;

	sendcq = ib_alloc_cq_any(ia->ri_id->device, r_xprt,
				 ep->rep_attr.cap.max_send_wr + 1,
				 IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		goto out1;
	}

	recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
				 ep->rep_attr.cap.max_recv_wr + 1,
				 IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	ep->rep_remote_cma.responder_resources =
		min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/**
 * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
 * @r_xprt: transport instance to shut down
 *
 */
void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	if (ia->ri_id && ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	if (ep->rep_attr.recv_cq)
		ib_free_cq(ep->rep_attr.recv_cq);
	if (ep->rep_attr.send_cq)
		ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers are needed.
 */
static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
				    struct ib_qp_init_attr *qp_init_attr)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(r_xprt);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}
	memcpy(qp_init_attr, &ep->rep_attr, sizeof(*qp_init_attr));

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}
	return 0;

out3:
	rpcrdma_ep_destroy(r_xprt);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
				struct ib_qp_init_attr *qp_init_attr)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rdma_cm_id *id, *old;
	int err, rc;

	rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_id->device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
	if (err)
		goto out_destroy;

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct ib_qp_init_attr qp_init_attr;
	int rc;

retry:
	memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
	switch (ep->rep_connected) {
	case 0:
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
		if (rc) {
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;
	xprt_clear_connected(xprt);

	rpcrdma_reset_cwnd(r_xprt);
	rpcrdma_post_recvs(r_xprt, true);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc)
		goto out;

	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	rc = rpcrdma_reqs_setup(r_xprt);
	if (rc) {
		rpcrdma_ep_disconnect(ep, ia);
		goto out;
	}
	rpcrdma_mrs_create(r_xprt);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	trace_xprtrdma_connect(r_xprt, rc);
	return rc;
}

/**
 * rpcrdma_ep_disconnect - Disconnect underlying transport
 * @ep: endpoint to disconnect
 * @ia: associated interface adapter
 *
 * Caller serializes. Either the transport send lock is held,
 * or we're being called to destroy the transport.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);
	int rc;

	/* returns without wait if ID is not connected */
	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(r_xprt, rc);

	rpcrdma_xprt_drain(r_xprt);
	rpcrdma_reqs_reset(r_xprt);
	rpcrdma_mrs_destroy(r_xprt);
	rpcrdma_sendctxs_destroy(r_xprt);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 * Send requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long i;

	if (!buf->rb_sc_ctxs)
		return;
	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
	buf->rb_sc_ctxs = NULL;
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(struct_size(sc, sc_sges, ep->rep_attr.cap.max_send_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = r_xprt->rx_ep.rep_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ep);
		if (!sc)
			return -ENOMEM;

		buf->rb_sc_ctxs[i] = sc;
	}

	buf->rb_sc_head = 0;
	buf->rb_sc_tail = 0;
	return 0;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @r_xprt: controlling transport instance
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per transport), and
 * provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @r_xprt: controlling transport instance
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per transport).
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	xprt_write_space(&r_xprt->rx_xprt);
}

static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;

	for (count = 0; count < ia->ri_max_rdma_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_NOFS);
		if (!mr)
			break;

		rc = frwr_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		spin_lock(&buf->rb_lock);
		rpcrdma_mr_push(mr, &buf->rb_mrs);
		list_add(&mr->mr_all, &buf->rb_all_mrs);
		spin_unlock(&buf->rb_lock);
	}

	r_xprt->rx_stats.mrs_allocated += count;
	trace_xprtrdma_createmrs(r_xprt, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
	xprt_write_space(&r_xprt->rx_xprt);
}

/**
 * rpcrdma_mrs_refresh - Wake the MR refresh worker
 * @r_xprt: controlling transport instance
 *
 */
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;

	/* If there is no underlying device, it's no use to
	 * wake the refresh worker.
	 */
	if (ep->rep_connected != -ENODEV) {
		/* The work is scheduled on a WQ_MEM_RECLAIM
		 * workqueue in order to prevent MR allocation
		 * from recursing into NFS during direct reclaim.
		 */
		queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
	}
}

/**
 * rpcrdma_req_create - Allocate an rpcrdma_req object
 * @r_xprt: controlling r_xprt
 * @size: initial size, in bytes, of send and receive buffers
 * @flags: GFP flags passed to memory allocators
 *
 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
				       gfp_t flags)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), flags);
	if (req == NULL)
		goto out1;

	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
	if (!req->rl_sendbuf)
		goto out2;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
	if (!req->rl_recvbuf)
		goto out3;

	INIT_LIST_HEAD(&req->rl_free_mrs);
	INIT_LIST_HEAD(&req->rl_registered);
	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;

out3:
	kfree(req->rl_sendbuf);
out2:
	kfree(req);
out1:
	return NULL;
}

/**
 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req object to set up
 *
 * Returns zero on success, and a negative errno on failure.
 */
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct rpcrdma_regbuf *rb;
	size_t maxhdrsize;

	/* Compute maximum header buffer size in bytes */
	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
		     r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
	maxhdrsize *= sizeof(__be32);
	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (!rb)
		goto out;

	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
		goto out_free;

	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
	return 0;

out_free:
	rpcrdma_regbuf_free(rb);
out:
	return -ENOMEM;
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. E.g. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	int rc;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rc = rpcrdma_req_setup(r_xprt, req);
		if (rc)
			return rc;
	}
	return 0;
}

static void rpcrdma_req_reset(struct rpcrdma_req *req)
{
	/* Credits are valid for only one connection */
	req->rl_slot.rq_cong = 0;

	rpcrdma_regbuf_free(req->rl_rdmabuf);
	req->rl_rdmabuf = NULL;

	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. E.g. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
		rpcrdma_req_reset(req);
}

/* No locking needed here. This function is called only by the
 * Receive completion handler.
 */
static noinline
struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
				       bool temp)
{
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (!rep->rr_rdmabuf)
		goto out_free;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
		goto out_free_regbuf;

	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;
	list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
	return rep;

out_free_regbuf:
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
out_free:
	kfree(rep);
out:
	return NULL;
}

/* No locking needed here. This function is invoked only by the
 * Receive completion handler, or during transport shutdown.
 */
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
	list_del(&rep->rr_all);
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
	struct llist_node *node;

	/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}

static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
			    struct rpcrdma_rep *rep)
{
	llist_add(&rep->rr_node, &buf->rb_free_reps);
}

static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
		rep->rr_temp = true;
	}
}

static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
		rpcrdma_rep_destroy(rep);
}

/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	INIT_LIST_HEAD(&buf->rb_all_reps);

	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	init_llist_head(&buf->rb_free_reps);

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * Relies on caller holding the transport send lock to protect
 * removing req->rl_all from buf->rb_allreqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;

		spin_lock(&buf->rb_lock);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);
	}

	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

/**
 * rpcrdma_mrs_destroy - Release all of a transport's MRs
 * @r_xprt: controlling transport instance
 *
 * Relies on caller holding the transport send lock to protect
 * removing mr->mr_list from req->rl_free_mrs safely.
 */
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_list);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain:
 *	- No more Send or Receive completions can occur
 *	- All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	rpcrdma_reps_destroy(buf);

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_mr_put - DMA unmap an MR and release it
 * @mr: MR to release
 *
 */
void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}

	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply)
		rpcrdma_rep_put(buffers, req->rl_reply);
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
 * @rep: rep to release
 *
 * Used after error conditions.
 */
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb), flags);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc(size, flags);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ia.ri_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
	return true;
}

static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	rpcrdma_regbuf_dma_unmap(rb);
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
 * rpcrdma_ep_post - Post WRs to a transport's Send Queue
 * @ia: transport's device information
 * @ep: transport's RDMA endpoint information
 * @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
 * is returned.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_wr;
	int rc;

	if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = frwr_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

/**
 * rpcrdma_post_recvs - Refill the Receive Queue
 * @r_xprt: controlling transport instance
 * @temp: mark Receive buffers to be deleted after use
 *
 */
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int needed, count, rc;

	rc = 0;
	count = 0;

	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (likely(ep->rep_receive_count > needed))
		goto out;
	needed -= ep->rep_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (rep && rep->rr_temp) {
			rpcrdma_rep_destroy(rep);
			continue;
		}
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

		trace_xprtrdma_post_recv(rep);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		--needed;
		++count;
	}
	if (!wr)
		goto out;

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
out:
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
	if (rc) {
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	ep->rep_receive_count += count;
	return;
}