/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);

	trace_xprtrdma_qp_error(r_xprt, event);
	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(sc, wc);
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);

	rpcrdma_sendctx_put_locked(sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(rep, wc);
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

out_schedule:
	rpcrdma_reply_handler(rep);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}
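
/* Connection state is reported to the RPC layer through ep->rep_connected:
 * 1 means the transport is connected, and a negative errno records why a
 * connection attempt or an established connection failed. A stale
 * connection rejection is mapped to -EAGAIN, which makes
 * rpcrdma_ep_connect() retry.
 */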

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
	int connstate = 0;

	trace_xprtrdma_conn_upcall(xprt, event);
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %s:%s\n",
			ia->ri_device->name,
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		++xprt->rx_xprt.connect_cookie;
		connstate = 1;
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %s:%s rejected: %s\n",
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
			rdma_reject_msg(id, event->status));
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		++xprt->rx_xprt.connect_cookie;
		connstate = -ECONNABORTED;
connected:
		xprt->rx_buf.rb_credits = 1;
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %s:%s on %s/%s (ep 0x%p): %s\n",
			__func__,
			rpcrdma_addrstr(xprt), rpcrdma_portstr(xprt),
			ia->ri_device->name, ia->ri_ops->ro_displayname,
			ep, rdma_event_msg(event->event));
		break;
	}

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt, struct rpcrdma_ia *ia)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	trace_xprtrdma_conn_start(xprt);

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL,
			       (struct sockaddr *)&xprt->rx_xprt.addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		trace_xprtrdma_conn_tout(xprt);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */
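
/* rpcrdma_ia_open() below chooses a memory registration mode based on
 * the xprt_rdma_memreg_strategy setting: FRWR is used when the device
 * supports it, falling back to FMR; if neither mode is supported, the
 * open fails with -EINVAL.
 */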

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: transport with IA to (re)initialize
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRWR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ep->rep_attr.recv_cq = NULL;
	ib_free_cq(ep->rep_attr.send_cq);
	ep->rep_attr.send_cq = NULL;

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_mrs_destroy(buf);
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);

	trace_xprtrdma_remove(r_xprt);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}
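
/* rpcrdma_ep_create() below reserves an extra Send and Receive WR for
 * the drain cqe, sizes each CQ slightly larger than its Work Queue,
 * and sets rep_send_batch, which controls how often rpcrdma_ep_post()
 * requests a signaled Send completion.
 */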

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
				   cdata->max_requests >> 2);
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     1, IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	ep->rep_remote_cma.responder_resources =
		min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id && ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	if (ep->rep_attr.recv_cq)
		ib_free_cq(ep->rep_attr.recv_cq);
	if (ep->rep_attr.send_cq)
		ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers are needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc, err;

	trace_xprtrdma_reinsert(r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_mrs_create(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int err, rc;

	trace_xprtrdma_reconnect(r_xprt);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC: %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	unsigned int extras;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
					       rx_ep), rc);

	ib_drain_qp(ia->ri_id->qp);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and ib_drain_qp has flushed all remaining Send
 * requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(sizeof(*sc) +
		     ia->ri_max_send_sges * sizeof(struct ib_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			goto out_destroy;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}

	return 0;

out_destroy:
	rpcrdma_sendctxs_destroy(buf);
	return -ENOMEM;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @buf: transport buffers from which to acquire an unused context
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer),
 * and provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	dprintk("RPC: %s: empty sendctx queue\n", __func__);
	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer).
 */
void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);
}

static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mr = rpcrdma_mr_pop(&buf->rb_stale_mrs);
		spin_unlock(&buf->rb_recovery_lock);

		trace_xprtrdma_recover_mr(mr);
		mr->mr_xprt->rx_ia.ri_ops->ro_recover_mr(mr);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_mr_defer_recovery(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	rpcrdma_mr_push(mr, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 3; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_KERNEL);
		if (!mr)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		mr->mr_xprt = r_xprt;

		list_add(&mr->mr_list, &free);
		list_add(&mr->mr_all, &all);
	}

	spin_lock(&buf->rb_mrlock);
	list_splice(&free, &buf->rb_mrs);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mrlock);
	trace_xprtrdma_createmrs(r_xprt, count);

	xprt_write_space(&r_xprt->rx_xprt);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb)) {
		kfree(req);
		return ERR_PTR(-ENOMEM);
	}
	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
	req->rl_buffer = buffer;
	INIT_LIST_HEAD(&req->rl_registered);

	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	return req;
}

/**
 * rpcrdma_create_rep - Allocate an rpcrdma_rep object
 * @r_xprt: controlling transport
 *
 * Returns 0 on success or a negative errno on failure.
 */
int
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_list, &buf->rb_recv_bufs);
	spin_unlock(&buf->rb_lock);
	return 0;

out_free:
	kfree(rep);
out:
	dprintk("RPC: %s: reply buffer %d alloc failed\n",
		__func__, rc);
	return rc;
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mrlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_mrs_create(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i <= buf->rb_max_requests; i++) {
		rc = rpcrdma_create_rep(r_xprt);
		if (rc)
			goto out;
	}

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}
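
/* rpcrdma_buffer_create() above provisions rb_max_requests rpcrdma_reqs,
 * rb_max_requests + 1 rpcrdma_reps, and the sendctx queue. The helpers
 * below hand reqs and reps out under rb_lock; rb_send_count and
 * rb_recv_count track how many of each are currently in use.
 */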

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_list);
	list_del_init(&req->rl_list);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mr *mr;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mrlock);
	while (!list_empty(&buf->rb_all)) {
		mr = list_entry(buf->rb_all.next, struct rpcrdma_mr, mr_all);
		list_del(&mr->mr_all);

		spin_unlock(&buf->rb_mrlock);
		ia->ri_ops->ro_release_mr(mr);
		count++;
		spin_lock(&buf->rb_mrlock);
	}
	spin_unlock(&buf->rb_mrlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_mrs_destroy(buf);
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr = NULL;

	spin_lock(&buf->rb_mrlock);
	if (!list_empty(&buf->rb_mrs))
		mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);

	if (!mr)
		goto out_nomrs;
	return mr;

out_nomrs:
	trace_xprtrdma_nomrs(r_xprt);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

static void
__rpcrdma_mr_put(struct rpcrdma_buffer *buf, struct rpcrdma_mr *mr)
{
	spin_lock(&buf->rb_mrlock);
	rpcrdma_mr_push(mr, &buf->rb_mrs);
	spin_unlock(&buf->rb_mrlock);
}

/**
 * rpcrdma_mr_put - Release an rpcrdma_mr object
 * @mr: object to release
 *
 */
void
rpcrdma_mr_put(struct rpcrdma_mr *mr)
{
	__rpcrdma_mr_put(&mr->mr_xprt->rx_buf, mr);
}

/**
 * rpcrdma_mr_unmap_and_put - DMA unmap an MR and release it
 * @mr: object to release
 *
 */
void
rpcrdma_mr_unmap_and_put(struct rpcrdma_mr *mr)
{
	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;

	trace_xprtrdma_dma_unmap(mr);
	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
			mr->mr_sg, mr->mr_nents, mr->mr_dir);
	__rpcrdma_mr_put(&r_xprt->rx_buf, mr);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);

	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = ia->ri_ops->ro_send(ia, req);
	trace_xprtrdma_post_send(req, rc);
	if (rc)
		return -ENOTCONN;
	return 0;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	trace_xprtrdma_post_recv(rep, rc);
	if (rc)
		return -ENOTCONN;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	trace_xprtrdma_noreps(r_xprt);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}