// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/log2.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_mr_free(struct rpcrdma_mr *mr);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);

/* Wait for outstanding transport work to finish. ib_drain_qp
 * handles the drains in the wrong order for us, so open code
 * them here.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(ia->ri_id->qp);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(ia->ri_id->qp);
}

/**
 * rpcrdma_qp_event_handler - Handle one QP event (error notification)
 * @event: details of the event
 * @context: ep that owns QP where event occurred
 *
 * Called from the RDMA provider (device driver) possibly in an interrupt
 * context.
 */
static void
rpcrdma_qp_event_handler(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_event(r_xprt, event);
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	rpcrdma_sendctx_put_locked(sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc);
	--r_xprt->rx_ep.rep_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	rpcrdma_post_recvs(r_xprt, false);
	rpcrdma_reply_handler(rep);
	return;

out_flushed:
	rpcrdma_recv_buffer_put(rep);
}

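/* Parse the RPC-over-RDMA private message, if any, that the peer
 * included in its CM connection response, and update this transport's
 * inline thresholds accordingly. When no valid private message is
 * present, the RPC-over-RDMA Version One defaults apply.
 */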
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < r_xprt->rx_ep.rep_inline_recv)
		r_xprt->rx_ep.rep_inline_recv = rsize;
	if (wsize < r_xprt->rx_ep.rep_inline_send)
		r_xprt->rx_ep.rep_inline_send = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n", __func__,
		r_xprt->rx_ep.rep_inline_send,
		r_xprt->rx_ep.rep_inline_recv);
	rpcrdma_set_max_header_sizes(r_xprt);
}

/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *r_xprt = id->context;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	might_sleep();

	trace_xprtrdma_cm_event(r_xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EPROTO;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		return 0;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_id->device->name,
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		++xprt->connect_cookie;
		ep->rep_connected = 1;
		rpcrdma_update_connect_private(r_xprt, &event->param.conn);
		wake_up_all(&ep->rep_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->rep_connected = -ENOTCONN;
		goto disconnected;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->rep_connected = -ENETUNREACH;
		goto disconnected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
			rdma_reject_msg(id, event->status));
		ep->rep_connected = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->rep_connected = -EAGAIN;
		goto disconnected;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->rep_connected = -ECONNABORTED;
disconnected:
		xprt_force_disconnect(xprt);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		break;
	}

	dprintk("RPC: %s: %s:%s on %s/frwr: %s\n", __func__,
		rpcrdma_addrstr(r_xprt), rpcrdma_portstr(r_xprt),
		ia->ri_id->device->name, rdma_event_msg(event->event));
	return 0;
}

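/* Create an rdma_cm_id for this transport, then synchronously resolve
 * the server's address and a route to it. Each resolution step waits
 * up to RDMA_RESOLVE_TIMEOUT; the CM event handler records the outcome
 * in ia->ri_async_rc before completing ia->ri_done.
 */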
static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	trace_xprtrdma_conn_start(xprt);

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(xprt->rx_xprt.xprt_net, rpcrdma_cm_event_handler,
			    xprt, RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id))
		return id;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}

	ia->ri_pd = ib_alloc_pd(ia->ri_id->device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRWR:
		if (frwr_is_supported(ia->ri_id->device))
			break;
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_id->device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	cancel_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		rpcrdma_xprt_drain(r_xprt);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ep->rep_attr.recv_cq = NULL;
	ib_free_cq(ep->rep_attr.send_cq);
	ep->rep_attr.send_cq = NULL;

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	rpcrdma_reps_destroy(buf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
		rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
		rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
	}
	rpcrdma_mrs_destroy(buf);
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/**
 * rpcrdma_ep_create - Create unconnected endpoint
 * @r_xprt: transport to instantiate
 *
 * Returns zero on success, or a negative errno.
 */
int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_sge;
	int rc;

	ep->rep_max_requests = xprt_rdma_slot_table_entries;
	ep->rep_inline_send = xprt_rdma_max_inline_write;
	ep->rep_inline_recv = xprt_rdma_max_inline_read;

	max_sge = min_t(unsigned int, ia->ri_id->device->attrs.max_send_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge;

	rc = frwr_open(ia, ep);
	if (rc)
		return rc;

	ep->rep_attr.event_handler = rpcrdma_qp_event_handler;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	ep->rep_send_batch = ep->rep_max_requests >> 3;
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	ep->rep_receive_count = 0;

	sendcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
				 ep->rep_attr.cap.max_send_wr + 1,
				 IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		goto out1;
	}

	recvcq = ib_alloc_cq_any(ia->ri_id->device, NULL,
				 ep->rep_attr.cap.max_recv_wr + 1,
				 IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->rep_inline_send);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->rep_inline_recv);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	ep->rep_remote_cma.responder_resources =
		min_t(int, U8_MAX, ia->ri_id->device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/**
 * rpcrdma_ep_destroy - Disconnect and destroy endpoint.
 * @r_xprt: transport instance to shut down
 *
 */
void rpcrdma_ep_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	if (ia->ri_id && ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	if (ep->rep_attr.recv_cq)
		ib_free_cq(ep->rep_attr.recv_cq);
	if (ep->rep_attr.send_cq)
		ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
				    struct ib_qp_init_attr *qp_init_attr)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(r_xprt);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, qp_init_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_mrs_create(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(r_xprt);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt,
				struct ib_qp_init_attr *qp_init_attr)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rdma_cm_id *id, *old;
	int err, rc;

	trace_xprtrdma_reconnect(r_xprt);

	rpcrdma_ep_disconnect(&r_xprt->rx_ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_id->device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, qp_init_attr);
	if (err)
		goto out_destroy;

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct ib_qp_init_attr qp_init_attr;
	int rc;

retry:
	memcpy(&qp_init_attr, &ep->rep_attr, sizeof(qp_init_attr));
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &qp_init_attr);
		if (rc) {
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, &qp_init_attr);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, &qp_init_attr);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;
	xprt_clear_connected(xprt);

	rpcrdma_post_recvs(r_xprt, true);

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc)
		goto out;

	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/**
 * rpcrdma_ep_disconnect - Disconnect underlying transport
 * @ep: endpoint to disconnect
 * @ia: associated interface adapter
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);
	int rc;

	/* returns without wait if ID is not connected */
	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(r_xprt, rc);

	rpcrdma_xprt_drain(r_xprt);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

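/* Illustrative sketch only (not a verbatim call path in this file):
 * assuming the RPC transport lock already serializes ->send_request,
 * a consumer pairs with the completion-side producer roughly like so:
 *
 *	sc = rpcrdma_sendctx_get_locked(r_xprt);
 *	if (!sc)
 *		return -EAGAIN;	(hypothetical caller handling)
 *	... fill sc->sc_sges and post sc->sc_wr ...
 *
 * When that Send eventually completes, rpcrdma_wc_send() hands the
 * sendctx back via rpcrdma_sendctx_put_locked(), which also retires
 * any unsignaled sendctxs that completed before it.
 */
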
/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 * Send requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(struct_size(sc, sc_sges, ia->ri_max_send_sges),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			return -ENOMEM;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}

	return 0;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @r_xprt: controlling transport instance
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per transport), and
 * provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per transport).
 */
static void
rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	xprt_write_space(&sc->sc_xprt->rx_xprt);
}

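/* Populate the transport's MR free list. rpcrdma_mrs_create() runs
 * when the buffers are first created and again from rb_refresh_worker
 * to replenish the pool asynchronously. Callers obtain an MR from
 * rb_mrs via rpcrdma_mr_get(); rpcrdma_mr_put() unmaps a used MR and
 * caches it on its rpcrdma_req's rl_free_mrs list for reuse.
 */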
static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;

	for (count = 0; count < ia->ri_max_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_NOFS);
		if (!mr)
			break;

		rc = frwr_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		spin_lock(&buf->rb_lock);
		list_add(&mr->mr_list, &buf->rb_mrs);
		list_add(&mr->mr_all, &buf->rb_all_mrs);
		spin_unlock(&buf->rb_lock);
	}

	r_xprt->rx_stats.mrs_allocated += count;
	trace_xprtrdma_createmrs(r_xprt, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
	xprt_write_space(&r_xprt->rx_xprt);
}

/**
 * rpcrdma_req_create - Allocate an rpcrdma_req object
 * @r_xprt: controlling r_xprt
 * @size: initial size, in bytes, of send and receive buffers
 * @flags: GFP flags passed to memory allocators
 *
 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
				       gfp_t flags)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	size_t maxhdrsize;

	req = kzalloc(sizeof(*req), flags);
	if (req == NULL)
		goto out1;

	/* Compute maximum header buffer size in bytes */
	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
		     r_xprt->rx_ia.ri_max_segs * rpcrdma_readchunk_maxsz;
	maxhdrsize *= sizeof(__be32);
	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
				  DMA_TO_DEVICE, flags);
	if (!rb)
		goto out2;
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));

	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
	if (!req->rl_sendbuf)
		goto out3;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
	if (!req->rl_recvbuf)
		goto out4;

	INIT_LIST_HEAD(&req->rl_free_mrs);
	INIT_LIST_HEAD(&req->rl_registered);
	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;

out4:
	kfree(req->rl_sendbuf);
out3:
	kfree(req->rl_rdmabuf);
out2:
	kfree(req);
out1:
	return NULL;
}

static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
					      bool temp)
{
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep.rep_inline_recv,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (!rep->rr_rdmabuf)
		goto out_free;

	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;
	return rep;

out_free:
	kfree(rep);
out:
	return NULL;
}

static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
	struct llist_node *node;

	/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}

static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
			    struct rpcrdma_rep *rep)
{
	if (!rep->rr_temp)
		llist_add(&rep->rr_node, &buf->rb_free_reps);
	else
		rpcrdma_rep_destroy(rep);
}

static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
		rpcrdma_rep_destroy(rep);
}

/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

	rpcrdma_mrs_create(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);

	rc = -ENOMEM;
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	buf->rb_credits = 1;
	init_llist_head(&buf->rb_free_reps);

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * This function assumes that the caller prevents concurrent device
 * unload and transport tear-down.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	list_del(&req->rl_all);

	while (!list_empty(&req->rl_free_mrs))
		rpcrdma_mr_free(rpcrdma_mr_pop(&req->rl_free_mrs));

	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_release_mr(mr);
		count++;
		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
	r_xprt->rx_stats.mrs_allocated = 0;
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain:
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_work_sync(&buf->rb_refresh_worker);

	rpcrdma_sendctxs_destroy(buf);
	rpcrdma_reps_destroy(buf);

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_mr_put - DMA unmap an MR and release it
 * @mr: MR to release
 *
 */
void rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	if (mr->mr_dir != DMA_NONE) {
		trace_xprtrdma_mr_unmap(mr);
		ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
				mr->mr_sg, mr->mr_nents, mr->mr_dir);
		mr->mr_dir = DMA_NONE;
	}

	rpcrdma_mr_push(mr, &mr->mr_req->rl_free_mrs);
}

static void rpcrdma_mr_free(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	mr->mr_req = NULL;
	spin_lock(&buf->rb_lock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply)
		rpcrdma_rep_put(buffers, req->rl_reply);
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
 * @rep: rep to release
 *
 * Used after error conditions.
 */
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb), flags);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc(size, flags);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

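/* Illustrative regbuf lifecycle (a sketch, not a verbatim call path):
 * allocation only reserves memory; the DMA mapping is deferred until
 * the buffer is first used.
 *
 *	rb = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, GFP_KERNEL);
 *	if (!rb)
 *		return -ENOMEM;	(hypothetical caller handling)
 *	...
 *	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
 *		return -EIO;	(hypothetical caller handling)
 *	...
 *	rpcrdma_regbuf_free(rb);	(unmaps, then frees)
 */
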
/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ia.ri_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ia.ri_pd->local_dma_lkey;
	return true;
}

static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	rpcrdma_regbuf_dma_unmap(rb);
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

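/* A worked example of the Send-signaling policy implemented in
 * rpcrdma_ep_post() below: if rep_max_requests were 128, then
 * rep_send_batch would be 128 >> 3 = 16, and roughly one Send in
 * every sixteen is posted with IB_SEND_SIGNALED (a Send is also
 * signaled whenever the req's rl_kref is still shared). The one
 * signaled completion then releases that sendctx and the unsignaled
 * ones queued before it (see rpcrdma_sendctx_put_locked), which
 * bounds both completion traffic and the number of un-reaped WRs on
 * the Send Queue.
 */
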
/**
 * rpcrdma_ep_post - Post WRs to a transport's Send Queue
 * @ia: transport's device information
 * @ep: transport's RDMA endpoint information
 * @req: rpcrdma_req containing the Send WR to post
 *
 * Returns 0 if the post was successful, otherwise -ENOTCONN
 * is returned.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

	if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = frwr_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

static void
rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct ib_recv_wr *i, *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int needed, count, rc;

	rc = 0;
	count = 0;

	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
	if (likely(ep->rep_receive_count > needed))
		goto out;
	needed -= ep->rep_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		--needed;
	}
	if (!wr)
		goto out;

	for (i = wr; i; i = i->next) {
		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);

		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
			goto release_wrs;

		trace_xprtrdma_post_recv(rep);
		++count;
	}

	rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
out:
	trace_xprtrdma_post_recvs(r_xprt, count, rc);
	if (rc) {
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_recv_buffer_put(rep);
			--count;
		}
	}
	ep->rep_receive_count += count;
	return;

release_wrs:
	for (i = wr; i;) {
		rep = container_of(i, struct rpcrdma_rep, rr_recv_wr);
		i = i->next;
		rpcrdma_recv_buffer_put(rep);
	}
}