/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}
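/* Completion handling note (added for clarity): Send completions are
 * handled entirely in rpcrdma_wc_send() above; anything other than a
 * flush error is logged and otherwise ignored, because most Send WRs
 * are posted unsignaled (see the completion trigger set up in
 * rpcrdma_ep_create() and used in rpcrdma_ep_post()). Receive
 * completions carry RPC Replies, so rpcrdma_wc_receive() below bounces
 * them to the rpcrdma_receive_wq workqueue, and reply processing runs
 * in process context instead of the IB_POLL_SOFTIRQ completion context.
 */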
/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %pIS:%u\n",
			ia->ri_device->name,
			sap, rpc_get_port(sap));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_pd = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n",
			sap, rpc_get_port(sap),
			rdma_reject_msg(id, event->status));
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
connected:
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u on %s/%s (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap),
			ia->ri_device->name, ia->ri_ops->ro_displayname,
			ep, rdma_event_msg(event->event));
		break;
	}

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
		  struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}
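/* Note on rpcrdma_create_id(): address and route resolution are both
 * asynchronous. rdma_resolve_addr() and rdma_resolve_route() each start
 * a request whose outcome is delivered to rpcrdma_conn_upcall(), which
 * records the result in ia->ri_async_rc and completes ia->ri_done.
 * ri_async_rc is primed with -ETIMEDOUT before each step so that a wait
 * that expires (after roughly RDMA_RESOLVE_TIMEOUT) is reported to the
 * caller as a failure.
 */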
/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: controlling transport
 * @addr: IP address of remote peer
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_destroy_mrs(buf);

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}
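/* Interface Adapter lifecycle, as implemented above:
 *
 *   rpcrdma_ia_open()   - resolves the peer address, allocates a PD, and
 *                         selects a memory registration strategy (FRWR
 *                         preferred, FMR as the fallback when FRWR is
 *                         not supported by the device);
 *   rpcrdma_ia_remove() - invoked on RDMA_CM_EVENT_DEVICE_REMOVAL to
 *                         divest hardware resources while the transport
 *                         object itself remains;
 *   rpcrdma_ia_close()  - final teardown of the cm_id and the PD.
 */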
/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	rpcrdma_init_cqcount(ep, 0);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	int rc, err;

	pr_info("%s: r_xprt = %p\n", __func__, r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt, sap))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_create_mrs(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	struct rdma_cm_id *id, *old;
	int err, rc;

	dprintk("RPC: %s: reconnecting...\n", __func__);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia, sap);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC: %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}
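/* Two distinct recovery paths feed rpcrdma_ep_connect() below:
 *
 *   rpcrdma_ep_recreate_xprt() - used after device removal (-ENODEV);
 *         the IA, endpoint, QP, and MRs must all be rebuilt from scratch.
 *   rpcrdma_ep_reconnect()     - ordinary reconnect; a fresh cm_id and QP
 *         are created, but the existing PD, MRs, and DMA mappings are
 *         reused as long as the new cm_id resolves to the same device.
 */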
/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	unsigned int extras;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mw *mw;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mw = rpcrdma_pop_mw(&buf->rb_stale_mrs);
		spin_unlock(&buf->rb_recovery_lock);

		dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	rpcrdma_push_mw(mw, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

static void
rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 32; count++) {
		struct rpcrdma_mw *mw;
		int rc;

		mw = kzalloc(sizeof(*mw), GFP_KERNEL);
		if (!mw)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mw);
		if (rc) {
			kfree(mw);
			break;
		}

		mw->mw_xprt = r_xprt;

		list_add(&mw->mw_list, &free);
		list_add(&mw->mw_all, &all);
	}

	spin_lock(&buf->rb_mwlock);
	list_splice(&free, &buf->rb_mws);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mwlock);

	dprintk("RPC: %s: created %u MRs\n", __func__, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_create_mrs(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	INIT_LIST_HEAD(&req->rl_registered);
	req->rl_send_wr.next = NULL;
	req->rl_send_wr.wr_cqe = &req->rl_cqe;
	req->rl_send_wr.sg_list = req->rl_send_sge;
	req->rl_send_wr.opcode = IB_WR_SEND;
	return req;
}
struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_pending);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_list);
	list_del_init(&req->rl_list);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws))
		mw = rpcrdma_pop_mw(&buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	mw->mw_flags = 0;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	rpcrdma_push_mw(mw, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}
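/* rb_send_count and rb_recv_count track how many request and reply
 * buffers are currently checked out of the pools. rpcrdma_buffer_get()
 * bumps rb_send_count, and rpcrdma_buffer_get_rep() refuses to hand out
 * a Receive buffer while rb_send_count < rb_recv_count; this throttles
 * Receive posting after RPCs that terminated without ever seeing a
 * reply. rpcrdma_buffer_put() below reverses both counts.
 */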
/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_send_wr.num_sge = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_send_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr->num_sge);

	rpcrdma_set_signaled(ep, send_wr);
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}
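/* Posting notes: rpcrdma_ep_post() replenishes the Receive queue first
 * (when a reply buffer is attached to the request) so that a Receive is
 * already posted for the expected Reply before the Send goes out. Send
 * WRs are signaled only periodically: rpcrdma_set_signaled() (defined in
 * xprt_rdma.h) is expected to request a completion roughly once every
 * ep->rep_cqinit Sends, per the trigger set up in rpcrdma_ep_create(),
 * which keeps Send completion overhead low.
 */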
/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}