/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_error(r_xprt, event);
	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);

	rpcrdma_sendctx_put_locked(sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(rep, wc);
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

out_schedule:
	rpcrdma_reply_handler(rep);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
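		/* Flush errors are expected while the QP is being
		 * drained or torn down, so they are not reported.
		 */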
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC:       %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
	int connstate = 0;

	trace_xprtrdma_conn_upcall(xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_device->name,
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_pd = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
			rdma_reject_msg(id, event->status));
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
connected:
		xprt->rx_buf.rb_credits = 1;
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC:       %s: %s:%s on %s/%s (ep 0x%p): %s\n",
			__func__,
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
			ia->ri_device->name, ia->ri_ops->ro_displayname,
			ep, rdma_event_msg(event->event));
		break;
	}

	return 0;
}
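
/* Note on the flow below: rdma_resolve_addr() and rdma_resolve_route()
 * complete asynchronously. rpcrdma_create_id() therefore parks on
 * ia->ri_done until rpcrdma_conn_upcall() records the outcome in
 * ia->ri_async_rc, or the RDMA_RESOLVE_TIMEOUT wait expires.
 */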

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	trace_xprtrdma_conn_start(xprt);

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRWR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_mrs_destroy(buf);

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/*
 * Create unconnected endpoint.
 */
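/* A note on Work Request budgeting below: each direction is provisioned
 * with cdata->max_requests WRs, plus RPCRDMA_BACKWARD_WRS for backchannel
 * traffic, plus one WR reserved for draining the QP. The registration
 * mode's ->ro_open callback may adjust the Send WR count further, and
 * each completion queue is sized one entry larger than its WR count.
 */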
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC:       %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
				   cdata->max_requests >> 2);
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     1, IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC:       %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_mrs_create(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int err, rc;

	trace_xprtrdma_reconnect(r_xprt);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC:       %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP.
	 */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	unsigned int extras;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC:       %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC:       %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
					       rx_ep), rc);

	ib_drain_qp(ia->ri_id->qp);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */
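
/* Illustration: rb_sc_head is advanced only by rpcrdma_sendctx_get_locked()
 * and rb_sc_tail only by rpcrdma_sendctx_put_locked(). As in a classic
 * circular buffer, one slot is always left unused: when advancing the head
 * would make it collide with the tail, no free sendctx is handed out and
 * ->send_request backs off until a Send completion advances the tail.
 */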

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and ib_drain_qp has flushed all remaining Send
 * requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(sizeof(*sc) +
		     ia->ri_max_send_sges * sizeof(struct ib_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			goto out_destroy;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}

	return 0;

out_destroy:
	rpcrdma_sendctxs_destroy(buf);
	return -ENOMEM;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @buf: transport buffers from which to acquire an unused context
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer),
 * and provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	dprintk("RPC:       %s: empty sendctx queue\n", __func__);
	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer).
 */
void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);
}

static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
		spin_unlock(&buf->rb_recovery_lock);

		trace_xprtrdma_recover_mr(mr);
		mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 32; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
		if (!mr)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		list_add(&mr->mr_list, &free);
		list_add(&mr->mr_all, &all);
	}

	spin_lock(&buf->rb_mrlock);
	list_splice(&free, &buf->rb_mrs);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mrlock);

	trace_xprtrdma_createmrs(r_xprt, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);
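
	/* Track each allocated req on rb_allreqs so that transport
	 * teardown and device removal can find and release it later.
	 */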
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_buffer = &r_xprt->rx_buf;
	INIT_LIST_HEAD(&req->rl_registered);
	return req;
}

/**
 * rpcrdma_create_rep - Allocate an rpcrdma_rep object
 * @r_xprt: controlling transport
 *
 * Returns 0 on success or a negative errno on failure.
 */
int
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return 0;

out_free:
	kfree(rep);
out:
	dprintk("RPC:       %s: reply buffer %d alloc failed\n",
		__func__, rc);
	return rc;
}
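
/* Set up a transport's buffer resources: the MR free lists,
 * rb_max_requests rpcrdma_req objects for Sends, rb_max_requests + 1
 * rpcrdma_rep objects for Receives, and the circular queue of Send
 * contexts.
 */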
int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_mrs_create(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC:       %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i <= buf->rb_max_requests; i++) {
		rc = rpcrdma_create_rep(r_xprt);
		if (rc)
			goto out;
	}

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_list);
	list_del_init(&req->rl_list);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);

		spin_unlock(&buf->rb_mrlock);
		ia->ri_ops->ro_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC:       %s: released %u MRs\n", __func__, count);
}
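
/* Tear down everything rpcrdma_buffer_create() set up: stop the MR
 * recovery and refresh workers, then release the Send contexts, reply
 * buffers, request buffers, and MRs.
 */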
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;

	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);

	if (!mr)
		goto out_nomrs;
	return mr;

out_nomrs:
	trace_xprtrdma_nomrs(r_xprt);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}

/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}

/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_dma_unmap(mr);
	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
			mr->mr_sg, mr->mr_nents, mr->mr_dir);
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);

	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
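
/* A note on Send signaling below: roughly every rep_send_batch-th Send,
 * or any Send that must release TX resources (RPCRDMA_REQ_F_TX_RESOURCES),
 * is posted with IB_SEND_SIGNALED. The completion of a signaled Send then
 * retires it and all earlier unsignaled Sends via
 * rpcrdma_sendctx_put_locked().
 */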
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	trace_xprtrdma_post_recv(rep, rc);
	if (rc)
		return -ENOTCONN;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	trace_xprtrdma_noreps(r_xprt);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}