1 /* 2 * Copyright (c) 2014-2017 Oracle. All rights reserved. 3 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 4 * 5 * This software is available to you under a choice of one of two 6 * licenses. You may choose to be licensed under the terms of the GNU 7 * General Public License (GPL) Version 2, available from the file 8 * COPYING in the main directory of this source tree, or the BSD-type 9 * license below: 10 * 11 * Redistribution and use in source and binary forms, with or without 12 * modification, are permitted provided that the following conditions 13 * are met: 14 * 15 * Redistributions of source code must retain the above copyright 16 * notice, this list of conditions and the following disclaimer. 17 * 18 * Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials provided 21 * with the distribution. 22 * 23 * Neither the name of the Network Appliance, Inc. nor the names of 24 * its contributors may be used to endorse or promote products 25 * derived from this software without specific prior written 26 * permission. 27 * 28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 39 */ 40 41 /* 42 * verbs.c 43 * 44 * Encapsulates the major functions managing: 45 * o adapters 46 * o endpoints 47 * o connections 48 * o buffer memory 49 */ 50 51 #include <linux/interrupt.h> 52 #include <linux/slab.h> 53 #include <linux/sunrpc/addr.h> 54 #include <linux/sunrpc/svc_rdma.h> 55 56 #include <asm-generic/barrier.h> 57 #include <asm/bitops.h> 58 59 #include <rdma/ib_cm.h> 60 61 #include "xprt_rdma.h" 62 63 /* 64 * Globals/Macros 65 */ 66 67 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 68 # define RPCDBG_FACILITY RPCDBG_TRANS 69 #endif 70 71 /* 72 * internal functions 73 */ 74 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt); 75 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf); 76 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb); 77 78 struct workqueue_struct *rpcrdma_receive_wq __read_mostly; 79 80 int 81 rpcrdma_alloc_wq(void) 82 { 83 struct workqueue_struct *recv_wq; 84 85 recv_wq = alloc_workqueue("xprtrdma_receive", 86 WQ_MEM_RECLAIM | WQ_HIGHPRI, 87 0); 88 if (!recv_wq) 89 return -ENOMEM; 90 91 rpcrdma_receive_wq = recv_wq; 92 return 0; 93 } 94 95 void 96 rpcrdma_destroy_wq(void) 97 { 98 struct workqueue_struct *wq; 99 100 if (rpcrdma_receive_wq) { 101 wq = rpcrdma_receive_wq; 102 rpcrdma_receive_wq = NULL; 103 destroy_workqueue(wq); 104 } 105 } 106 107 static void 108 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) 109 { 110 struct rpcrdma_ep *ep = context; 111 struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt, 112 rx_ep); 113 114 trace_xprtrdma_qp_error(r_xprt, event); 115 pr_err("rpcrdma: %s on device %s ep %p\n", 116 ib_event_msg(event->event), event->device->name, context); 117 118 if (ep->rep_connected == 1) { 119 ep->rep_connected = -EIO; 120 rpcrdma_conn_func(ep); 121 wake_up_all(&ep->rep_connect_wait); 122 } 123 } 124 125 /** 126 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC 127 * @cq: completion queue (ignored) 128 * @wc: completed WR 129 * 130 */ 131 static void 132 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 133 { 134 struct ib_cqe *cqe = wc->wr_cqe; 135 struct rpcrdma_sendctx *sc = 136 container_of(cqe, struct rpcrdma_sendctx, sc_cqe); 137 138 /* WARNING: Only wr_cqe and status are reliable at this point */ 139 trace_xprtrdma_wc_send(sc, wc); 140 if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) 141 pr_err("rpcrdma: Send: %s (%u/0x%x)\n", 142 ib_wc_status_msg(wc->status), 143 wc->status, wc->vendor_err); 144 145 rpcrdma_sendctx_put_locked(sc); 146 } 147 148 /** 149 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC 150 * @cq: completion queue (ignored) 151 * @wc: completed WR 152 * 153 */ 154 static void 155 rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) 156 { 157 struct ib_cqe *cqe = wc->wr_cqe; 158 struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, 159 rr_cqe); 160 161 /* WARNING: Only wr_id and status are reliable at this point */ 162 trace_xprtrdma_wc_receive(rep, wc); 163 if (wc->status != IB_WC_SUCCESS) 164 goto out_fail; 165 166 /* status == SUCCESS means all fields in wc are trustworthy */ 167 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len); 168 rep->rr_wc_flags = wc->wc_flags; 169 rep->rr_inv_rkey = wc->ex.invalidate_rkey; 170 171 ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf), 172 rdmab_addr(rep->rr_rdmabuf), 173 wc->byte_len, DMA_FROM_DEVICE); 174 175 out_schedule: 176 rpcrdma_reply_handler(rep); 177 return; 178 179 out_fail: 180 if (wc->status != IB_WC_WR_FLUSH_ERR) 181 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", 182 ib_wc_status_msg(wc->status), 183 wc->status, wc->vendor_err); 184 rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0); 185 goto out_schedule; 186 } 187 188 static void 189 rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt, 190 struct rdma_conn_param *param) 191 { 192 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 193 const struct rpcrdma_connect_private *pmsg = param->private_data; 194 unsigned int rsize, wsize; 195 196 /* Default settings for RPC-over-RDMA Version One */ 197 r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize; 198 rsize = RPCRDMA_V1_DEF_INLINE_SIZE; 199 wsize = RPCRDMA_V1_DEF_INLINE_SIZE; 200 201 if (pmsg && 202 pmsg->cp_magic == rpcrdma_cmp_magic && 203 pmsg->cp_version == RPCRDMA_CMP_VERSION) { 204 r_xprt->rx_ia.ri_implicit_roundup = true; 205 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size); 206 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size); 207 } 208 209 if (rsize < cdata->inline_rsize) 210 cdata->inline_rsize = rsize; 211 if (wsize < cdata->inline_wsize) 212 cdata->inline_wsize = wsize; 213 dprintk("RPC: %s: max send %u, max recv %u\n", 214 __func__, cdata->inline_wsize, cdata->inline_rsize); 215 rpcrdma_set_max_header_sizes(r_xprt); 216 } 217 218 static int 219 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) 220 { 221 struct rpcrdma_xprt *xprt = id->context; 222 struct rpcrdma_ia *ia = &xprt->rx_ia; 223 struct rpcrdma_ep *ep = &xprt->rx_ep; 224 int connstate = 0; 225 226 trace_xprtrdma_conn_upcall(xprt, event); 227 switch (event->event) { 228 case RDMA_CM_EVENT_ADDR_RESOLVED: 229 case RDMA_CM_EVENT_ROUTE_RESOLVED: 230 ia->ri_async_rc = 0; 231 complete(&ia->ri_done); 232 break; 233 case RDMA_CM_EVENT_ADDR_ERROR: 234 ia->ri_async_rc = -EHOSTUNREACH; 235 complete(&ia->ri_done); 236 break; 237 case RDMA_CM_EVENT_ROUTE_ERROR: 238 ia->ri_async_rc = -ENETUNREACH; 239 complete(&ia->ri_done); 240 break; 241 case RDMA_CM_EVENT_DEVICE_REMOVAL: 242 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 243 pr_info("rpcrdma: removing device %s for %s:%s\n", 244 ia->ri_device->name, 245 rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt)); 246 #endif 247 set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags); 248 ep->rep_connected = -ENODEV; 249 xprt_force_disconnect(&xprt->rx_xprt); 250 wait_for_completion(&ia->ri_remove_done); 251 252 ia->ri_id = NULL; 253 ia->ri_device = NULL; 254 /* Return 1 to ensure the core destroys the id. */ 255 return 1; 256 case RDMA_CM_EVENT_ESTABLISHED: 257 ++xprt->rx_xprt.connect_cookie; 258 connstate = 1; 259 rpcrdma_update_connect_private(xprt, &event->param.conn); 260 goto connected; 261 case RDMA_CM_EVENT_CONNECT_ERROR: 262 connstate = -ENOTCONN; 263 goto connected; 264 case RDMA_CM_EVENT_UNREACHABLE: 265 connstate = -ENETDOWN; 266 goto connected; 267 case RDMA_CM_EVENT_REJECTED: 268 dprintk("rpcrdma: connection to %s:%s rejected: %s\n", 269 rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt), 270 rdma_reject_msg(id, event->status)); 271 connstate = -ECONNREFUSED; 272 if (event->status == IB_CM_REJ_STALE_CONN) 273 connstate = -EAGAIN; 274 goto connected; 275 case RDMA_CM_EVENT_DISCONNECTED: 276 ++xprt->rx_xprt.connect_cookie; 277 connstate = -ECONNABORTED; 278 connected: 279 xprt->rx_buf.rb_credits = 1; 280 ep->rep_connected = connstate; 281 rpcrdma_conn_func(ep); 282 wake_up_all(&ep->rep_connect_wait); 283 /*FALLTHROUGH*/ 284 default: 285 dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n", 286 __func__, 287 rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt), 288 ia->ri_device->name, ia->ri_ops->ro_displayname, 289 ep, rdma_event_msg(event->event)); 290 break; 291 } 292 293 return 0; 294 } 295 296 static struct rdma_cm_id * 297 rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia) 298 { 299 unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1; 300 struct rdma_cm_id *id; 301 int rc; 302 303 trace_xprtrdma_conn_start(xprt); 304 305 init_completion(&ia->ri_done); 306 init_completion(&ia->ri_remove_done); 307 308 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 309 IB_QPT_RC); 310 if (IS_ERR(id)) { 311 rc = PTR_ERR(id); 312 dprintk("RPC: %s: rdma_create_id() failed %i\n", 313 __func__, rc); 314 return id; 315 } 316 317 ia->ri_async_rc = -ETIMEDOUT; 318 rc = rdma_resolve_addr(id, NULL, 319 (struct sockaddr *)&xprt->rx_xprt.addr, 320 RDMA_RESOLVE_TIMEOUT); 321 if (rc) { 322 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 323 __func__, rc); 324 goto out; 325 } 326 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 327 if (rc < 0) { 328 trace_xprtrdma_conn_tout(xprt); 329 goto out; 330 } 331 332 rc = ia->ri_async_rc; 333 if (rc) 334 goto out; 335 336 ia->ri_async_rc = -ETIMEDOUT; 337 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 338 if (rc) { 339 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 340 __func__, rc); 341 goto out; 342 } 343 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 344 if (rc < 0) { 345 trace_xprtrdma_conn_tout(xprt); 346 goto out; 347 } 348 rc = ia->ri_async_rc; 349 if (rc) 350 goto out; 351 352 return id; 353 354 out: 355 rdma_destroy_id(id); 356 return ERR_PTR(rc); 357 } 358 359 /* 360 * Exported functions. 361 */ 362 363 /** 364 * rpcrdma_ia_open - Open and initialize an Interface Adapter. 365 * @xprt: transport with IA to (re)initialize 366 * 367 * Returns 0 on success, negative errno if an appropriate 368 * Interface Adapter could not be found and opened. 369 */ 370 int 371 rpcrdma_ia_open(struct rpcrdma_xprt *xprt) 372 { 373 struct rpcrdma_ia *ia = &xprt->rx_ia; 374 int rc; 375 376 ia->ri_id = rpcrdma_create_id(xprt, ia); 377 if (IS_ERR(ia->ri_id)) { 378 rc = PTR_ERR(ia->ri_id); 379 goto out_err; 380 } 381 ia->ri_device = ia->ri_id->device; 382 383 ia->ri_pd = ib_alloc_pd(ia->ri_device, 0); 384 if (IS_ERR(ia->ri_pd)) { 385 rc = PTR_ERR(ia->ri_pd); 386 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc); 387 goto out_err; 388 } 389 390 switch (xprt_rdma_memreg_strategy) { 391 case RPCRDMA_FRWR: 392 if (frwr_is_supported(ia)) { 393 ia->ri_ops = &rpcrdma_frwr_memreg_ops; 394 break; 395 } 396 /*FALLTHROUGH*/ 397 case RPCRDMA_MTHCAFMR: 398 if (fmr_is_supported(ia)) { 399 ia->ri_ops = &rpcrdma_fmr_memreg_ops; 400 break; 401 } 402 /*FALLTHROUGH*/ 403 default: 404 pr_err("rpcrdma: Device %s does not support memreg mode %d\n", 405 ia->ri_device->name, xprt_rdma_memreg_strategy); 406 rc = -EINVAL; 407 goto out_err; 408 } 409 410 return 0; 411 412 out_err: 413 rpcrdma_ia_close(ia); 414 return rc; 415 } 416 417 /** 418 * rpcrdma_ia_remove - Handle device driver unload 419 * @ia: interface adapter being removed 420 * 421 * Divest transport H/W resources associated with this adapter, 422 * but allow it to be restored later. 423 */ 424 void 425 rpcrdma_ia_remove(struct rpcrdma_ia *ia) 426 { 427 struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, 428 rx_ia); 429 struct rpcrdma_ep *ep = &r_xprt->rx_ep; 430 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 431 struct rpcrdma_req *req; 432 struct rpcrdma_rep *rep; 433 434 cancel_delayed_work_sync(&buf->rb_refresh_worker); 435 436 /* This is similar to rpcrdma_ep_destroy, but: 437 * - Don't cancel the connect worker. 438 * - Don't call rpcrdma_ep_disconnect, which waits 439 * for another conn upcall, which will deadlock. 440 * - rdma_disconnect is unneeded, the underlying 441 * connection is already gone. 442 */ 443 if (ia->ri_id->qp) { 444 ib_drain_qp(ia->ri_id->qp); 445 rdma_destroy_qp(ia->ri_id); 446 ia->ri_id->qp = NULL; 447 } 448 ib_free_cq(ep->rep_attr.recv_cq); 449 ep->rep_attr.recv_cq = NULL; 450 ib_free_cq(ep->rep_attr.send_cq); 451 ep->rep_attr.send_cq = NULL; 452 453 /* The ULP is responsible for ensuring all DMA 454 * mappings and MRs are gone. 455 */ 456 list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list) 457 rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf); 458 list_for_each_entry(req, &buf->rb_allreqs, rl_all) { 459 rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf); 460 rpcrdma_dma_unmap_regbuf(req->rl_sendbuf); 461 rpcrdma_dma_unmap_regbuf(req->rl_recvbuf); 462 } 463 rpcrdma_mrs_destroy(buf); 464 ib_dealloc_pd(ia->ri_pd); 465 ia->ri_pd = NULL; 466 467 /* Allow waiters to continue */ 468 complete(&ia->ri_remove_done); 469 470 trace_xprtrdma_remove(r_xprt); 471 } 472 473 /** 474 * rpcrdma_ia_close - Clean up/close an IA. 475 * @ia: interface adapter to close 476 * 477 */ 478 void 479 rpcrdma_ia_close(struct rpcrdma_ia *ia) 480 { 481 if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) { 482 if (ia->ri_id->qp) 483 rdma_destroy_qp(ia->ri_id); 484 rdma_destroy_id(ia->ri_id); 485 } 486 ia->ri_id = NULL; 487 ia->ri_device = NULL; 488 489 /* If the pd is still busy, xprtrdma missed freeing a resource */ 490 if (ia->ri_pd && !IS_ERR(ia->ri_pd)) 491 ib_dealloc_pd(ia->ri_pd); 492 ia->ri_pd = NULL; 493 } 494 495 /* 496 * Create unconnected endpoint. 497 */ 498 int 499 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, 500 struct rpcrdma_create_data_internal *cdata) 501 { 502 struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private; 503 unsigned int max_qp_wr, max_sge; 504 struct ib_cq *sendcq, *recvcq; 505 int rc; 506 507 max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge, 508 RPCRDMA_MAX_SEND_SGES); 509 if (max_sge < RPCRDMA_MIN_SEND_SGES) { 510 pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge); 511 return -ENOMEM; 512 } 513 ia->ri_max_send_sges = max_sge; 514 515 if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) { 516 dprintk("RPC: %s: insufficient wqe's available\n", 517 __func__); 518 return -ENOMEM; 519 } 520 max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; 521 522 /* check provider's send/recv wr limits */ 523 if (cdata->max_requests > max_qp_wr) 524 cdata->max_requests = max_qp_wr; 525 526 ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; 527 ep->rep_attr.qp_context = ep; 528 ep->rep_attr.srq = NULL; 529 ep->rep_attr.cap.max_send_wr = cdata->max_requests; 530 ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; 531 ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ 532 rc = ia->ri_ops->ro_open(ia, ep, cdata); 533 if (rc) 534 return rc; 535 ep->rep_attr.cap.max_recv_wr = cdata->max_requests; 536 ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; 537 ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ 538 ep->rep_attr.cap.max_send_sge = max_sge; 539 ep->rep_attr.cap.max_recv_sge = 1; 540 ep->rep_attr.cap.max_inline_data = 0; 541 ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR; 542 ep->rep_attr.qp_type = IB_QPT_RC; 543 ep->rep_attr.port_num = ~0; 544 545 dprintk("RPC: %s: requested max: dtos: send %d recv %d; " 546 "iovs: send %d recv %d\n", 547 __func__, 548 ep->rep_attr.cap.max_send_wr, 549 ep->rep_attr.cap.max_recv_wr, 550 ep->rep_attr.cap.max_send_sge, 551 ep->rep_attr.cap.max_recv_sge); 552 553 /* set trigger for requesting send completion */ 554 ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH, 555 cdata->max_requests >> 2); 556 ep->rep_send_count = ep->rep_send_batch; 557 init_waitqueue_head(&ep->rep_connect_wait); 558 INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); 559 560 sendcq = ib_alloc_cq(ia->ri_device, NULL, 561 ep->rep_attr.cap.max_send_wr + 1, 562 1, IB_POLL_WORKQUEUE); 563 if (IS_ERR(sendcq)) { 564 rc = PTR_ERR(sendcq); 565 dprintk("RPC: %s: failed to create send CQ: %i\n", 566 __func__, rc); 567 goto out1; 568 } 569 570 recvcq = ib_alloc_cq(ia->ri_device, NULL, 571 ep->rep_attr.cap.max_recv_wr + 1, 572 0, IB_POLL_WORKQUEUE); 573 if (IS_ERR(recvcq)) { 574 rc = PTR_ERR(recvcq); 575 dprintk("RPC: %s: failed to create recv CQ: %i\n", 576 __func__, rc); 577 goto out2; 578 } 579 580 ep->rep_attr.send_cq = sendcq; 581 ep->rep_attr.recv_cq = recvcq; 582 583 /* Initialize cma parameters */ 584 memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); 585 586 /* Prepare RDMA-CM private message */ 587 pmsg->cp_magic = rpcrdma_cmp_magic; 588 pmsg->cp_version = RPCRDMA_CMP_VERSION; 589 pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok; 590 pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize); 591 pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize); 592 ep->rep_remote_cma.private_data = pmsg; 593 ep->rep_remote_cma.private_data_len = sizeof(*pmsg); 594 595 /* Client offers RDMA Read but does not initiate */ 596 ep->rep_remote_cma.initiator_depth = 0; 597 ep->rep_remote_cma.responder_resources = 598 min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom); 599 600 /* Limit transport retries so client can detect server 601 * GID changes quickly. RPC layer handles re-establishing 602 * transport connection and retransmission. 603 */ 604 ep->rep_remote_cma.retry_count = 6; 605 606 /* RPC-over-RDMA handles its own flow control. In addition, 607 * make all RNR NAKs visible so we know that RPC-over-RDMA 608 * flow control is working correctly (no NAKs should be seen). 609 */ 610 ep->rep_remote_cma.flow_control = 0; 611 ep->rep_remote_cma.rnr_retry_count = 0; 612 613 return 0; 614 615 out2: 616 ib_free_cq(sendcq); 617 out1: 618 return rc; 619 } 620 621 /* 622 * rpcrdma_ep_destroy 623 * 624 * Disconnect and destroy endpoint. After this, the only 625 * valid operations on the ep are to free it (if dynamically 626 * allocated) or re-create it. 627 */ 628 void 629 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 630 { 631 cancel_delayed_work_sync(&ep->rep_connect_worker); 632 633 if (ia->ri_id && ia->ri_id->qp) { 634 rpcrdma_ep_disconnect(ep, ia); 635 rdma_destroy_qp(ia->ri_id); 636 ia->ri_id->qp = NULL; 637 } 638 639 if (ep->rep_attr.recv_cq) 640 ib_free_cq(ep->rep_attr.recv_cq); 641 if (ep->rep_attr.send_cq) 642 ib_free_cq(ep->rep_attr.send_cq); 643 } 644 645 /* Re-establish a connection after a device removal event. 646 * Unlike a normal reconnection, a fresh PD and a new set 647 * of MRs and buffers is needed. 648 */ 649 static int 650 rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt, 651 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 652 { 653 int rc, err; 654 655 trace_xprtrdma_reinsert(r_xprt); 656 657 rc = -EHOSTUNREACH; 658 if (rpcrdma_ia_open(r_xprt)) 659 goto out1; 660 661 rc = -ENOMEM; 662 err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data); 663 if (err) { 664 pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err); 665 goto out2; 666 } 667 668 rc = -ENETUNREACH; 669 err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); 670 if (err) { 671 pr_err("rpcrdma: rdma_create_qp returned %d\n", err); 672 goto out3; 673 } 674 675 rpcrdma_mrs_create(r_xprt); 676 return 0; 677 678 out3: 679 rpcrdma_ep_destroy(ep, ia); 680 out2: 681 rpcrdma_ia_close(ia); 682 out1: 683 return rc; 684 } 685 686 static int 687 rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep, 688 struct rpcrdma_ia *ia) 689 { 690 struct rdma_cm_id *id, *old; 691 int err, rc; 692 693 trace_xprtrdma_reconnect(r_xprt); 694 695 rpcrdma_ep_disconnect(ep, ia); 696 697 rc = -EHOSTUNREACH; 698 id = rpcrdma_create_id(r_xprt, ia); 699 if (IS_ERR(id)) 700 goto out; 701 702 /* As long as the new ID points to the same device as the 703 * old ID, we can reuse the transport's existing PD and all 704 * previously allocated MRs. Also, the same device means 705 * the transport's previous DMA mappings are still valid. 706 * 707 * This is a sanity check only. There should be no way these 708 * point to two different devices here. 709 */ 710 old = id; 711 rc = -ENETUNREACH; 712 if (ia->ri_device != id->device) { 713 pr_err("rpcrdma: can't reconnect on different device!\n"); 714 goto out_destroy; 715 } 716 717 err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr); 718 if (err) { 719 dprintk("RPC: %s: rdma_create_qp returned %d\n", 720 __func__, err); 721 goto out_destroy; 722 } 723 724 /* Atomically replace the transport's ID and QP. */ 725 rc = 0; 726 old = ia->ri_id; 727 ia->ri_id = id; 728 rdma_destroy_qp(old); 729 730 out_destroy: 731 rdma_destroy_id(old); 732 out: 733 return rc; 734 } 735 736 /* 737 * Connect unconnected endpoint. 738 */ 739 int 740 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 741 { 742 struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt, 743 rx_ia); 744 unsigned int extras; 745 int rc; 746 747 retry: 748 switch (ep->rep_connected) { 749 case 0: 750 dprintk("RPC: %s: connecting...\n", __func__); 751 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr); 752 if (rc) { 753 dprintk("RPC: %s: rdma_create_qp failed %i\n", 754 __func__, rc); 755 rc = -ENETUNREACH; 756 goto out_noupdate; 757 } 758 break; 759 case -ENODEV: 760 rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia); 761 if (rc) 762 goto out_noupdate; 763 break; 764 default: 765 rc = rpcrdma_ep_reconnect(r_xprt, ep, ia); 766 if (rc) 767 goto out; 768 } 769 770 ep->rep_connected = 0; 771 772 rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma); 773 if (rc) { 774 dprintk("RPC: %s: rdma_connect() failed with %i\n", 775 __func__, rc); 776 goto out; 777 } 778 779 wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0); 780 if (ep->rep_connected <= 0) { 781 if (ep->rep_connected == -EAGAIN) 782 goto retry; 783 rc = ep->rep_connected; 784 goto out; 785 } 786 787 dprintk("RPC: %s: connected\n", __func__); 788 extras = r_xprt->rx_buf.rb_bc_srv_max_requests; 789 if (extras) 790 rpcrdma_ep_post_extra_recv(r_xprt, extras); 791 792 out: 793 if (rc) 794 ep->rep_connected = rc; 795 796 out_noupdate: 797 return rc; 798 } 799 800 /* 801 * rpcrdma_ep_disconnect 802 * 803 * This is separate from destroy to facilitate the ability 804 * to reconnect without recreating the endpoint. 805 * 806 * This call is not reentrant, and must not be made in parallel 807 * on the same endpoint. 808 */ 809 void 810 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) 811 { 812 int rc; 813 814 rc = rdma_disconnect(ia->ri_id); 815 if (!rc) 816 /* returns without wait if not connected */ 817 wait_event_interruptible(ep->rep_connect_wait, 818 ep->rep_connected != 1); 819 else 820 ep->rep_connected = rc; 821 trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt, 822 rx_ep), rc); 823 824 ib_drain_qp(ia->ri_id->qp); 825 } 826 827 /* Fixed-size circular FIFO queue. This implementation is wait-free and 828 * lock-free. 829 * 830 * Consumer is the code path that posts Sends. This path dequeues a 831 * sendctx for use by a Send operation. Multiple consumer threads 832 * are serialized by the RPC transport lock, which allows only one 833 * ->send_request call at a time. 834 * 835 * Producer is the code path that handles Send completions. This path 836 * enqueues a sendctx that has been completed. Multiple producer 837 * threads are serialized by the ib_poll_cq() function. 838 */ 839 840 /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced 841 * queue activity, and ib_drain_qp has flushed all remaining Send 842 * requests. 843 */ 844 static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf) 845 { 846 unsigned long i; 847 848 for (i = 0; i <= buf->rb_sc_last; i++) 849 kfree(buf->rb_sc_ctxs[i]); 850 kfree(buf->rb_sc_ctxs); 851 } 852 853 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia) 854 { 855 struct rpcrdma_sendctx *sc; 856 857 sc = kzalloc(sizeof(*sc) + 858 ia->ri_max_send_sges * sizeof(struct ib_sge), 859 GFP_KERNEL); 860 if (!sc) 861 return NULL; 862 863 sc->sc_wr.wr_cqe = &sc->sc_cqe; 864 sc->sc_wr.sg_list = sc->sc_sges; 865 sc->sc_wr.opcode = IB_WR_SEND; 866 sc->sc_cqe.done = rpcrdma_wc_send; 867 return sc; 868 } 869 870 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt) 871 { 872 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 873 struct rpcrdma_sendctx *sc; 874 unsigned long i; 875 876 /* Maximum number of concurrent outstanding Send WRs. Capping 877 * the circular queue size stops Send Queue overflow by causing 878 * the ->send_request call to fail temporarily before too many 879 * Sends are posted. 880 */ 881 i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; 882 dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i); 883 buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL); 884 if (!buf->rb_sc_ctxs) 885 return -ENOMEM; 886 887 buf->rb_sc_last = i - 1; 888 for (i = 0; i <= buf->rb_sc_last; i++) { 889 sc = rpcrdma_sendctx_create(&r_xprt->rx_ia); 890 if (!sc) 891 goto out_destroy; 892 893 sc->sc_xprt = r_xprt; 894 buf->rb_sc_ctxs[i] = sc; 895 } 896 897 return 0; 898 899 out_destroy: 900 rpcrdma_sendctxs_destroy(buf); 901 return -ENOMEM; 902 } 903 904 /* The sendctx queue is not guaranteed to have a size that is a 905 * power of two, thus the helpers in circ_buf.h cannot be used. 906 * The other option is to use modulus (%), which can be expensive. 907 */ 908 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf, 909 unsigned long item) 910 { 911 return likely(item < buf->rb_sc_last) ? item + 1 : 0; 912 } 913 914 /** 915 * rpcrdma_sendctx_get_locked - Acquire a send context 916 * @buf: transport buffers from which to acquire an unused context 917 * 918 * Returns pointer to a free send completion context; or NULL if 919 * the queue is empty. 920 * 921 * Usage: Called to acquire an SGE array before preparing a Send WR. 922 * 923 * The caller serializes calls to this function (per rpcrdma_buffer), 924 * and provides an effective memory barrier that flushes the new value 925 * of rb_sc_head. 926 */ 927 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf) 928 { 929 struct rpcrdma_xprt *r_xprt; 930 struct rpcrdma_sendctx *sc; 931 unsigned long next_head; 932 933 next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head); 934 935 if (next_head == READ_ONCE(buf->rb_sc_tail)) 936 goto out_emptyq; 937 938 /* ORDER: item must be accessed _before_ head is updated */ 939 sc = buf->rb_sc_ctxs[next_head]; 940 941 /* Releasing the lock in the caller acts as a memory 942 * barrier that flushes rb_sc_head. 943 */ 944 buf->rb_sc_head = next_head; 945 946 return sc; 947 948 out_emptyq: 949 /* The queue is "empty" if there have not been enough Send 950 * completions recently. This is a sign the Send Queue is 951 * backing up. Cause the caller to pause and try again. 952 */ 953 dprintk("RPC: %s: empty sendctx queue\n", __func__); 954 r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf); 955 r_xprt->rx_stats.empty_sendctx_q++; 956 return NULL; 957 } 958 959 /** 960 * rpcrdma_sendctx_put_locked - Release a send context 961 * @sc: send context to release 962 * 963 * Usage: Called from Send completion to return a sendctxt 964 * to the queue. 965 * 966 * The caller serializes calls to this function (per rpcrdma_buffer). 967 */ 968 void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) 969 { 970 struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf; 971 unsigned long next_tail; 972 973 /* Unmap SGEs of previously completed by unsignaled 974 * Sends by walking up the queue until @sc is found. 975 */ 976 next_tail = buf->rb_sc_tail; 977 do { 978 next_tail = rpcrdma_sendctx_next(buf, next_tail); 979 980 /* ORDER: item must be accessed _before_ tail is updated */ 981 rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]); 982 983 } while (buf->rb_sc_ctxs[next_tail] != sc); 984 985 /* Paired with READ_ONCE */ 986 smp_store_release(&buf->rb_sc_tail, next_tail); 987 } 988 989 static void 990 rpcrdma_mr_recovery_worker(struct work_struct *work) 991 { 992 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 993 rb_recovery_worker.work); 994 struct rpcrdma_mr *mr; 995 996 spin_lock(&buf->rb_recovery_lock); 997 while (!list_empty(&buf->rb_stale_mrs)) { 998 mr = rpcrdma_mr_pop(&buf->rb_stale_mrs); 999 spin_unlock(&buf->rb_recovery_lock); 1000 1001 trace_xprtrdma_recover_mr(mr); 1002 mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr); 1003 1004 spin_lock(&buf->rb_recovery_lock); 1005 } 1006 spin_unlock(&buf->rb_recovery_lock); 1007 } 1008 1009 void 1010 rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr) 1011 { 1012 struct rpcrdma_xprt *r_xprt = mr->mr_xprt; 1013 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1014 1015 spin_lock(&buf->rb_recovery_lock); 1016 rpcrdma_mr_push(mr, &buf->rb_stale_mrs); 1017 spin_unlock(&buf->rb_recovery_lock); 1018 1019 schedule_delayed_work(&buf->rb_recovery_worker, 0); 1020 } 1021 1022 static void 1023 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) 1024 { 1025 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1026 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 1027 unsigned int count; 1028 LIST_HEAD(free); 1029 LIST_HEAD(all); 1030 1031 for (count = 0; count < 3; count++) { 1032 struct rpcrdma_mr *mr; 1033 int rc; 1034 1035 mr = kzalloc(sizeof(*mr), GFP_KERNEL); 1036 if (!mr) 1037 break; 1038 1039 rc = ia->ri_ops->ro_init_mr(ia, mr); 1040 if (rc) { 1041 kfree(mr); 1042 break; 1043 } 1044 1045 mr->mr_xprt = r_xprt; 1046 1047 list_add(&mr->mr_list, &free); 1048 list_add(&mr->mr_all, &all); 1049 } 1050 1051 spin_lock(&buf->rb_mrlock); 1052 list_splice(&free, &buf->rb_mrs); 1053 list_splice(&all, &buf->rb_all); 1054 r_xprt->rx_stats.mrs_allocated += count; 1055 spin_unlock(&buf->rb_mrlock); 1056 trace_xprtrdma_createmrs(r_xprt, count); 1057 1058 xprt_write_space(&r_xprt->rx_xprt); 1059 } 1060 1061 static void 1062 rpcrdma_mr_refresh_worker(struct work_struct *work) 1063 { 1064 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 1065 rb_refresh_worker.work); 1066 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 1067 rx_buf); 1068 1069 rpcrdma_mrs_create(r_xprt); 1070 } 1071 1072 struct rpcrdma_req * 1073 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 1074 { 1075 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 1076 struct rpcrdma_regbuf *rb; 1077 struct rpcrdma_req *req; 1078 1079 req = kzalloc(sizeof(*req), GFP_KERNEL); 1080 if (req == NULL) 1081 return ERR_PTR(-ENOMEM); 1082 1083 rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE, 1084 DMA_TO_DEVICE, GFP_KERNEL); 1085 if (IS_ERR(rb)) { 1086 kfree(req); 1087 return ERR_PTR(-ENOMEM); 1088 } 1089 req->rl_rdmabuf = rb; 1090 xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb)); 1091 req->rl_buffer = buffer; 1092 INIT_LIST_HEAD(&req->rl_registered); 1093 1094 spin_lock(&buffer->rb_reqslock); 1095 list_add(&req->rl_all, &buffer->rb_allreqs); 1096 spin_unlock(&buffer->rb_reqslock); 1097 return req; 1098 } 1099 1100 /** 1101 * rpcrdma_create_rep - Allocate an rpcrdma_rep object 1102 * @r_xprt: controlling transport 1103 * 1104 * Returns 0 on success or a negative errno on failure. 1105 */ 1106 int 1107 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 1108 { 1109 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 1110 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1111 struct rpcrdma_rep *rep; 1112 int rc; 1113 1114 rc = -ENOMEM; 1115 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 1116 if (rep == NULL) 1117 goto out; 1118 1119 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize, 1120 DMA_FROM_DEVICE, GFP_KERNEL); 1121 if (IS_ERR(rep->rr_rdmabuf)) { 1122 rc = PTR_ERR(rep->rr_rdmabuf); 1123 goto out_free; 1124 } 1125 xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base, 1126 rdmab_length(rep->rr_rdmabuf)); 1127 1128 rep->rr_cqe.done = rpcrdma_wc_receive; 1129 rep->rr_rxprt = r_xprt; 1130 INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion); 1131 rep->rr_recv_wr.next = NULL; 1132 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; 1133 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; 1134 rep->rr_recv_wr.num_sge = 1; 1135 1136 spin_lock(&buf->rb_lock); 1137 list_add(&rep->rr_list, &buf->rb_recv_bufs); 1138 spin_unlock(&buf->rb_lock); 1139 return 0; 1140 1141 out_free: 1142 kfree(rep); 1143 out: 1144 dprintk("RPC: %s: reply buffer %d alloc failed\n", 1145 __func__, rc); 1146 return rc; 1147 } 1148 1149 int 1150 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) 1151 { 1152 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1153 int i, rc; 1154 1155 buf->rb_max_requests = r_xprt->rx_data.max_requests; 1156 buf->rb_bc_srv_max_requests = 0; 1157 spin_lock_init(&buf->rb_mrlock); 1158 spin_lock_init(&buf->rb_lock); 1159 spin_lock_init(&buf->rb_recovery_lock); 1160 INIT_LIST_HEAD(&buf->rb_mrs); 1161 INIT_LIST_HEAD(&buf->rb_all); 1162 INIT_LIST_HEAD(&buf->rb_stale_mrs); 1163 INIT_DELAYED_WORK(&buf->rb_refresh_worker, 1164 rpcrdma_mr_refresh_worker); 1165 INIT_DELAYED_WORK(&buf->rb_recovery_worker, 1166 rpcrdma_mr_recovery_worker); 1167 1168 rpcrdma_mrs_create(r_xprt); 1169 1170 INIT_LIST_HEAD(&buf->rb_send_bufs); 1171 INIT_LIST_HEAD(&buf->rb_allreqs); 1172 spin_lock_init(&buf->rb_reqslock); 1173 for (i = 0; i < buf->rb_max_requests; i++) { 1174 struct rpcrdma_req *req; 1175 1176 req = rpcrdma_create_req(r_xprt); 1177 if (IS_ERR(req)) { 1178 dprintk("RPC: %s: request buffer %d alloc" 1179 " failed\n", __func__, i); 1180 rc = PTR_ERR(req); 1181 goto out; 1182 } 1183 list_add(&req->rl_list, &buf->rb_send_bufs); 1184 } 1185 1186 INIT_LIST_HEAD(&buf->rb_recv_bufs); 1187 for (i = 0; i <= buf->rb_max_requests; i++) { 1188 rc = rpcrdma_create_rep(r_xprt); 1189 if (rc) 1190 goto out; 1191 } 1192 1193 rc = rpcrdma_sendctxs_create(r_xprt); 1194 if (rc) 1195 goto out; 1196 1197 return 0; 1198 out: 1199 rpcrdma_buffer_destroy(buf); 1200 return rc; 1201 } 1202 1203 static struct rpcrdma_req * 1204 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) 1205 { 1206 struct rpcrdma_req *req; 1207 1208 req = list_first_entry(&buf->rb_send_bufs, 1209 struct rpcrdma_req, rl_list); 1210 list_del_init(&req->rl_list); 1211 return req; 1212 } 1213 1214 static struct rpcrdma_rep * 1215 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) 1216 { 1217 struct rpcrdma_rep *rep; 1218 1219 rep = list_first_entry(&buf->rb_recv_bufs, 1220 struct rpcrdma_rep, rr_list); 1221 list_del(&rep->rr_list); 1222 return rep; 1223 } 1224 1225 static void 1226 rpcrdma_destroy_rep(struct rpcrdma_rep *rep) 1227 { 1228 rpcrdma_free_regbuf(rep->rr_rdmabuf); 1229 kfree(rep); 1230 } 1231 1232 void 1233 rpcrdma_destroy_req(struct rpcrdma_req *req) 1234 { 1235 rpcrdma_free_regbuf(req->rl_recvbuf); 1236 rpcrdma_free_regbuf(req->rl_sendbuf); 1237 rpcrdma_free_regbuf(req->rl_rdmabuf); 1238 kfree(req); 1239 } 1240 1241 static void 1242 rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf) 1243 { 1244 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 1245 rx_buf); 1246 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 1247 struct rpcrdma_mr *mr; 1248 unsigned int count; 1249 1250 count = 0; 1251 spin_lock(&buf->rb_mrlock); 1252 while (!list_empty(&buf->rb_all)) { 1253 mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all); 1254 list_del(&mr->mr_all); 1255 1256 spin_unlock(&buf->rb_mrlock); 1257 1258 /* Ensure MW is not on any rl_registered list */ 1259 if (!list_empty(&mr->mr_list)) 1260 list_del(&mr->mr_list); 1261 1262 ia->ri_ops->ro_release_mr(mr); 1263 count++; 1264 spin_lock(&buf->rb_mrlock); 1265 } 1266 spin_unlock(&buf->rb_mrlock); 1267 r_xprt->rx_stats.mrs_allocated = 0; 1268 1269 dprintk("RPC: %s: released %u MRs\n", __func__, count); 1270 } 1271 1272 void 1273 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) 1274 { 1275 cancel_delayed_work_sync(&buf->rb_recovery_worker); 1276 cancel_delayed_work_sync(&buf->rb_refresh_worker); 1277 1278 rpcrdma_sendctxs_destroy(buf); 1279 1280 while (!list_empty(&buf->rb_recv_bufs)) { 1281 struct rpcrdma_rep *rep; 1282 1283 rep = rpcrdma_buffer_get_rep_locked(buf); 1284 rpcrdma_destroy_rep(rep); 1285 } 1286 buf->rb_send_count = 0; 1287 1288 spin_lock(&buf->rb_reqslock); 1289 while (!list_empty(&buf->rb_allreqs)) { 1290 struct rpcrdma_req *req; 1291 1292 req = list_first_entry(&buf->rb_allreqs, 1293 struct rpcrdma_req, rl_all); 1294 list_del(&req->rl_all); 1295 1296 spin_unlock(&buf->rb_reqslock); 1297 rpcrdma_destroy_req(req); 1298 spin_lock(&buf->rb_reqslock); 1299 } 1300 spin_unlock(&buf->rb_reqslock); 1301 buf->rb_recv_count = 0; 1302 1303 rpcrdma_mrs_destroy(buf); 1304 } 1305 1306 /** 1307 * rpcrdma_mr_get - Allocate an rpcrdma_mr object 1308 * @r_xprt: controlling transport 1309 * 1310 * Returns an initialized rpcrdma_mr or NULL if no free 1311 * rpcrdma_mr objects are available. 1312 */ 1313 struct rpcrdma_mr * 1314 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt) 1315 { 1316 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 1317 struct rpcrdma_mr *mr = NULL; 1318 1319 spin_lock(&buf->rb_mrlock); 1320 if (!list_empty(&buf->rb_mrs)) 1321 mr = rpcrdma_mr_pop(&buf->rb_mrs); 1322 spin_unlock(&buf->rb_mrlock); 1323 1324 if (!mr) 1325 goto out_nomrs; 1326 return mr; 1327 1328 out_nomrs: 1329 trace_xprtrdma_nomrs(r_xprt); 1330 if (r_xprt->rx_ep.rep_connected != -ENODEV) 1331 schedule_delayed_work(&buf->rb_refresh_worker, 0); 1332 1333 /* Allow the reply handler and refresh worker to run */ 1334 cond_resched(); 1335 1336 return NULL; 1337 } 1338 1339 static void 1340 __rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr) 1341 { 1342 spin_lock(&buf->rb_mrlock); 1343 rpcrdma_mr_push(mr, &buf->rb_mrs); 1344 spin_unlock(&buf->rb_mrlock); 1345 } 1346 1347 /** 1348 * rpcrdma_mr_put - Release an rpcrdma_mr object 1349 * @mr: object to release 1350 * 1351 */ 1352 void 1353 rpcrdma_mr_put(struct rpcrdma_mr *mr) 1354 { 1355 __rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr); 1356 } 1357 1358 /** 1359 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it 1360 * @mr: object to release 1361 * 1362 */ 1363 void 1364 rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr) 1365 { 1366 struct rpcrdma_xprt *r_xprt = mr->mr_xprt; 1367 1368 trace_xprtrdma_dma_unmap(mr); 1369 ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, 1370 mr->mr_sg, mr->mr_nents, mr->mr_dir); 1371 __rpcrdma_mr_put(&r_xprt->rx_buf, mr); 1372 } 1373 1374 static struct rpcrdma_rep * 1375 rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers) 1376 { 1377 /* If an RPC previously completed without a reply (say, a 1378 * credential problem or a soft timeout occurs) then hold off 1379 * on supplying more Receive buffers until the number of new 1380 * pending RPCs catches up to the number of posted Receives. 1381 */ 1382 if (unlikely(buffers->rb_send_count < buffers->rb_recv_count)) 1383 return NULL; 1384 1385 if (unlikely(list_empty(&buffers->rb_recv_bufs))) 1386 return NULL; 1387 buffers->rb_recv_count++; 1388 return rpcrdma_buffer_get_rep_locked(buffers); 1389 } 1390 1391 /* 1392 * Get a set of request/reply buffers. 1393 * 1394 * Reply buffer (if available) is attached to send buffer upon return. 1395 */ 1396 struct rpcrdma_req * 1397 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) 1398 { 1399 struct rpcrdma_req *req; 1400 1401 spin_lock(&buffers->rb_lock); 1402 if (list_empty(&buffers->rb_send_bufs)) 1403 goto out_reqbuf; 1404 buffers->rb_send_count++; 1405 req = rpcrdma_buffer_get_req_locked(buffers); 1406 req->rl_reply = rpcrdma_buffer_get_rep(buffers); 1407 spin_unlock(&buffers->rb_lock); 1408 1409 return req; 1410 1411 out_reqbuf: 1412 spin_unlock(&buffers->rb_lock); 1413 return NULL; 1414 } 1415 1416 /* 1417 * Put request/reply buffers back into pool. 1418 * Pre-decrement counter/array index. 1419 */ 1420 void 1421 rpcrdma_buffer_put(struct rpcrdma_req *req) 1422 { 1423 struct rpcrdma_buffer *buffers = req->rl_buffer; 1424 struct rpcrdma_rep *rep = req->rl_reply; 1425 1426 req->rl_reply = NULL; 1427 1428 spin_lock(&buffers->rb_lock); 1429 buffers->rb_send_count--; 1430 list_add_tail(&req->rl_list, &buffers->rb_send_bufs); 1431 if (rep) { 1432 buffers->rb_recv_count--; 1433 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); 1434 } 1435 spin_unlock(&buffers->rb_lock); 1436 } 1437 1438 /* 1439 * Recover reply buffers from pool. 1440 * This happens when recovering from disconnect. 1441 */ 1442 void 1443 rpcrdma_recv_buffer_get(struct rpcrdma_req *req) 1444 { 1445 struct rpcrdma_buffer *buffers = req->rl_buffer; 1446 1447 spin_lock(&buffers->rb_lock); 1448 req->rl_reply = rpcrdma_buffer_get_rep(buffers); 1449 spin_unlock(&buffers->rb_lock); 1450 } 1451 1452 /* 1453 * Put reply buffers back into pool when not attached to 1454 * request. This happens in error conditions. 1455 */ 1456 void 1457 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 1458 { 1459 struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; 1460 1461 spin_lock(&buffers->rb_lock); 1462 buffers->rb_recv_count--; 1463 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); 1464 spin_unlock(&buffers->rb_lock); 1465 } 1466 1467 /** 1468 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers 1469 * @size: size of buffer to be allocated, in bytes 1470 * @direction: direction of data movement 1471 * @flags: GFP flags 1472 * 1473 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that 1474 * can be persistently DMA-mapped for I/O. 1475 * 1476 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for 1477 * receiving the payload of RDMA RECV operations. During Long Calls 1478 * or Replies they may be registered externally via ro_map. 1479 */ 1480 struct rpcrdma_regbuf * 1481 rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction, 1482 gfp_t flags) 1483 { 1484 struct rpcrdma_regbuf *rb; 1485 1486 rb = kmalloc(sizeof(*rb) + size, flags); 1487 if (rb == NULL) 1488 return ERR_PTR(-ENOMEM); 1489 1490 rb->rg_device = NULL; 1491 rb->rg_direction = direction; 1492 rb->rg_iov.length = size; 1493 1494 return rb; 1495 } 1496 1497 /** 1498 * __rpcrdma_map_regbuf - DMA-map a regbuf 1499 * @ia: controlling rpcrdma_ia 1500 * @rb: regbuf to be mapped 1501 */ 1502 bool 1503 __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) 1504 { 1505 struct ib_device *device = ia->ri_device; 1506 1507 if (rb->rg_direction == DMA_NONE) 1508 return false; 1509 1510 rb->rg_iov.addr = ib_dma_map_single(device, 1511 (void *)rb->rg_base, 1512 rdmab_length(rb), 1513 rb->rg_direction); 1514 if (ib_dma_mapping_error(device, rdmab_addr(rb))) 1515 return false; 1516 1517 rb->rg_device = device; 1518 rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey; 1519 return true; 1520 } 1521 1522 static void 1523 rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb) 1524 { 1525 if (!rb) 1526 return; 1527 1528 if (!rpcrdma_regbuf_is_mapped(rb)) 1529 return; 1530 1531 ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), 1532 rdmab_length(rb), rb->rg_direction); 1533 rb->rg_device = NULL; 1534 } 1535 1536 /** 1537 * rpcrdma_free_regbuf - deregister and free registered buffer 1538 * @rb: regbuf to be deregistered and freed 1539 */ 1540 void 1541 rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb) 1542 { 1543 rpcrdma_dma_unmap_regbuf(rb); 1544 kfree(rb); 1545 } 1546 1547 /* 1548 * Prepost any receive buffer, then post send. 1549 * 1550 * Receive buffer is donated to hardware, reclaimed upon recv completion. 1551 */ 1552 int 1553 rpcrdma_ep_post(struct rpcrdma_ia *ia, 1554 struct rpcrdma_ep *ep, 1555 struct rpcrdma_req *req) 1556 { 1557 struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; 1558 int rc; 1559 1560 if (req->rl_reply) { 1561 rc = rpcrdma_ep_post_recv(ia, req->rl_reply); 1562 if (rc) 1563 return rc; 1564 req->rl_reply = NULL; 1565 } 1566 1567 if (!ep->rep_send_count || 1568 test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { 1569 send_wr->send_flags |= IB_SEND_SIGNALED; 1570 ep->rep_send_count = ep->rep_send_batch; 1571 } else { 1572 send_wr->send_flags &= ~IB_SEND_SIGNALED; 1573 --ep->rep_send_count; 1574 } 1575 1576 rc = ia->ri_ops->ro_send(ia, req); 1577 trace_xprtrdma_post_send(req, rc); 1578 if (rc) 1579 return -ENOTCONN; 1580 return 0; 1581 } 1582 1583 int 1584 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, 1585 struct rpcrdma_rep *rep) 1586 { 1587 struct ib_recv_wr *recv_wr_fail; 1588 int rc; 1589 1590 if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf)) 1591 goto out_map; 1592 rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail); 1593 trace_xprtrdma_post_recv(rep, rc); 1594 if (rc) 1595 return -ENOTCONN; 1596 return 0; 1597 1598 out_map: 1599 pr_err("rpcrdma: failed to DMA map the Receive buffer\n"); 1600 return -EIO; 1601 } 1602 1603 /** 1604 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests 1605 * @r_xprt: transport associated with these backchannel resources 1606 * @count: minimum number of incoming requests expected 1607 * 1608 * Returns zero if all requested buffers were posted, or a negative errno. 1609 */ 1610 int 1611 rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) 1612 { 1613 struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; 1614 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 1615 struct rpcrdma_rep *rep; 1616 int rc; 1617 1618 while (count--) { 1619 spin_lock(&buffers->rb_lock); 1620 if (list_empty(&buffers->rb_recv_bufs)) 1621 goto out_reqbuf; 1622 rep = rpcrdma_buffer_get_rep_locked(buffers); 1623 spin_unlock(&buffers->rb_lock); 1624 1625 rc = rpcrdma_ep_post_recv(ia, rep); 1626 if (rc) 1627 goto out_rc; 1628 } 1629 1630 return 0; 1631 1632 out_reqbuf: 1633 spin_unlock(&buffers->rb_lock); 1634 trace_xprtrdma_noreps(r_xprt); 1635 return -ENOMEM; 1636 1637 out_rc: 1638 rpcrdma_recv_buffer_put(rep); 1639 return rc; 1640 } 1641