/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	__be32 *p = rep->rr_rdmabuf->rg_base;
	u32 credits;

	credits = be32_to_cpup(p + 2);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	if (wc->byte_len >= RPCRDMA_HDRLEN_ERR)
		rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %pIS:%u\n",
			ia->ri_device->name,
			sap, rpc_get_port(sap));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_pd = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n",
			sap, rpc_get_port(sap),
			rdma_reject_msg(id, event->status));
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
connected:
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u on %s/%s (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap),
			ia->ri_device->name, ia->ri_ops->ro_displayname,
			ep, rdma_event_msg(event->event));
		break;
	}

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
		  struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: controlling transport
 * @addr: IP address of remote peer
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_destroy_mrs(buf);

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	rpcrdma_init_cqcount(ep, 0);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	int rc, err;

	pr_info("%s: r_xprt = %p\n", __func__, r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt, sap))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_create_mrs(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	struct rdma_cm_id *id, *old;
	int err, rc;

	dprintk("RPC: %s: reconnecting...\n", __func__);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia, sap);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC: %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	unsigned int extras;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
"still " : "dis"); 841 } else { 842 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 843 ep->rep_connected = rc; 844 } 845 846 ib_drain_qp(ia->ri_id->qp); 847 } 848 849 static void 850 rpcrdma_mr_recovery_worker(struct work_struct *work) 851 { 852 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 853 rb_recovery_worker.work); 854 struct rpcrdma_mw *mw; 855 856 spin_lock(&buf->rb_recovery_lock); 857 while (!list_empty(&buf->rb_stale_mrs)) { 858 mw = rpcrdma_pop_mw(&buf->rb_stale_mrs); 859 spin_unlock(&buf->rb_recovery_lock); 860 861 dprintk("RPC: %s: recovering MR %p\n", __func__, mw); 862 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); 863 864 spin_lock(&buf->rb_recovery_lock); 865 } 866 spin_unlock(&buf->rb_recovery_lock); 867 } 868 869 void 870 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) 871 { 872 struct rpcrdma_xprt *r_xprt = mw->mw_xprt; 873 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 874 875 spin_lock(&buf->rb_recovery_lock); 876 rpcrdma_push_mw(mw, &buf->rb_stale_mrs); 877 spin_unlock(&buf->rb_recovery_lock); 878 879 schedule_delayed_work(&buf->rb_recovery_worker, 0); 880 } 881 882 static void 883 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) 884 { 885 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 886 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 887 unsigned int count; 888 LIST_HEAD(free); 889 LIST_HEAD(all); 890 891 for (count = 0; count < 32; count++) { 892 struct rpcrdma_mw *mw; 893 int rc; 894 895 mw = kzalloc(sizeof(*mw), GFP_KERNEL); 896 if (!mw) 897 break; 898 899 rc = ia->ri_ops->ro_init_mr(ia, mw); 900 if (rc) { 901 kfree(mw); 902 break; 903 } 904 905 mw->mw_xprt = r_xprt; 906 907 list_add(&mw->mw_list, &free); 908 list_add(&mw->mw_all, &all); 909 } 910 911 spin_lock(&buf->rb_mwlock); 912 list_splice(&free, &buf->rb_mws); 913 list_splice(&all, &buf->rb_all); 914 r_xprt->rx_stats.mrs_allocated += count; 915 spin_unlock(&buf->rb_mwlock); 916 917 dprintk("RPC: %s: created %u MRs\n", __func__, count); 918 } 919 920 static void 921 rpcrdma_mr_refresh_worker(struct work_struct *work) 922 { 923 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 924 rb_refresh_worker.work); 925 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 926 rx_buf); 927 928 rpcrdma_create_mrs(r_xprt); 929 } 930 931 struct rpcrdma_req * 932 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 933 { 934 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 935 struct rpcrdma_req *req; 936 937 req = kzalloc(sizeof(*req), GFP_KERNEL); 938 if (req == NULL) 939 return ERR_PTR(-ENOMEM); 940 941 spin_lock(&buffer->rb_reqslock); 942 list_add(&req->rl_all, &buffer->rb_allreqs); 943 spin_unlock(&buffer->rb_reqslock); 944 req->rl_cqe.done = rpcrdma_wc_send; 945 req->rl_buffer = &r_xprt->rx_buf; 946 INIT_LIST_HEAD(&req->rl_registered); 947 req->rl_send_wr.next = NULL; 948 req->rl_send_wr.wr_cqe = &req->rl_cqe; 949 req->rl_send_wr.sg_list = req->rl_send_sge; 950 req->rl_send_wr.opcode = IB_WR_SEND; 951 return req; 952 } 953 954 struct rpcrdma_rep * 955 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 956 { 957 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 958 struct rpcrdma_rep *rep; 959 int rc; 960 961 rc = -ENOMEM; 962 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 963 if (rep == NULL) 964 goto out; 965 966 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize, 967 DMA_FROM_DEVICE, GFP_KERNEL); 968 if (IS_ERR(rep->rr_rdmabuf)) { 969 rc = PTR_ERR(rep->rr_rdmabuf); 970 goto out_free; 971 } 972 xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base, 
973 rdmab_length(rep->rr_rdmabuf)); 974 975 rep->rr_cqe.done = rpcrdma_wc_receive; 976 rep->rr_rxprt = r_xprt; 977 INIT_WORK(&rep->rr_work, rpcrdma_reply_handler); 978 rep->rr_recv_wr.next = NULL; 979 rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; 980 rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; 981 rep->rr_recv_wr.num_sge = 1; 982 return rep; 983 984 out_free: 985 kfree(rep); 986 out: 987 return ERR_PTR(rc); 988 } 989 990 int 991 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) 992 { 993 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 994 int i, rc; 995 996 buf->rb_max_requests = r_xprt->rx_data.max_requests; 997 buf->rb_bc_srv_max_requests = 0; 998 atomic_set(&buf->rb_credits, 1); 999 spin_lock_init(&buf->rb_mwlock); 1000 spin_lock_init(&buf->rb_lock); 1001 spin_lock_init(&buf->rb_recovery_lock); 1002 INIT_LIST_HEAD(&buf->rb_mws); 1003 INIT_LIST_HEAD(&buf->rb_all); 1004 INIT_LIST_HEAD(&buf->rb_stale_mrs); 1005 INIT_DELAYED_WORK(&buf->rb_refresh_worker, 1006 rpcrdma_mr_refresh_worker); 1007 INIT_DELAYED_WORK(&buf->rb_recovery_worker, 1008 rpcrdma_mr_recovery_worker); 1009 1010 rpcrdma_create_mrs(r_xprt); 1011 1012 INIT_LIST_HEAD(&buf->rb_send_bufs); 1013 INIT_LIST_HEAD(&buf->rb_allreqs); 1014 spin_lock_init(&buf->rb_reqslock); 1015 for (i = 0; i < buf->rb_max_requests; i++) { 1016 struct rpcrdma_req *req; 1017 1018 req = rpcrdma_create_req(r_xprt); 1019 if (IS_ERR(req)) { 1020 dprintk("RPC: %s: request buffer %d alloc" 1021 " failed\n", __func__, i); 1022 rc = PTR_ERR(req); 1023 goto out; 1024 } 1025 req->rl_backchannel = false; 1026 list_add(&req->rl_list, &buf->rb_send_bufs); 1027 } 1028 1029 INIT_LIST_HEAD(&buf->rb_recv_bufs); 1030 for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) { 1031 struct rpcrdma_rep *rep; 1032 1033 rep = rpcrdma_create_rep(r_xprt); 1034 if (IS_ERR(rep)) { 1035 dprintk("RPC: %s: reply buffer %d alloc failed\n", 1036 __func__, i); 1037 rc = PTR_ERR(rep); 1038 goto out; 1039 } 1040 list_add(&rep->rr_list, &buf->rb_recv_bufs); 1041 } 1042 1043 return 0; 1044 out: 1045 rpcrdma_buffer_destroy(buf); 1046 return rc; 1047 } 1048 1049 static struct rpcrdma_req * 1050 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) 1051 { 1052 struct rpcrdma_req *req; 1053 1054 req = list_first_entry(&buf->rb_send_bufs, 1055 struct rpcrdma_req, rl_list); 1056 list_del_init(&req->rl_list); 1057 return req; 1058 } 1059 1060 static struct rpcrdma_rep * 1061 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) 1062 { 1063 struct rpcrdma_rep *rep; 1064 1065 rep = list_first_entry(&buf->rb_recv_bufs, 1066 struct rpcrdma_rep, rr_list); 1067 list_del(&rep->rr_list); 1068 return rep; 1069 } 1070 1071 static void 1072 rpcrdma_destroy_rep(struct rpcrdma_rep *rep) 1073 { 1074 rpcrdma_free_regbuf(rep->rr_rdmabuf); 1075 kfree(rep); 1076 } 1077 1078 void 1079 rpcrdma_destroy_req(struct rpcrdma_req *req) 1080 { 1081 rpcrdma_free_regbuf(req->rl_recvbuf); 1082 rpcrdma_free_regbuf(req->rl_sendbuf); 1083 rpcrdma_free_regbuf(req->rl_rdmabuf); 1084 kfree(req); 1085 } 1086 1087 static void 1088 rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) 1089 { 1090 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 1091 rx_buf); 1092 struct rpcrdma_ia *ia = rdmab_to_ia(buf); 1093 struct rpcrdma_mw *mw; 1094 unsigned int count; 1095 1096 count = 0; 1097 spin_lock(&buf->rb_mwlock); 1098 while (!list_empty(&buf->rb_all)) { 1099 mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); 1100 list_del(&mw->mw_all); 1101 1102 spin_unlock(&buf->rb_mwlock); 1103 
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

/* Acquire an unused MR from the transport's free list, or return
 * NULL if none are currently available.
 */
struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws))
		mw = rpcrdma_pop_mw(&buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	mw->mw_flags = 0;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

/* Return an MR to the transport's free list. */
void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	rpcrdma_push_mw(mw, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_send_wr.num_sge = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_send_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr->num_sge);

	rpcrdma_set_signaled(ep, send_wr);
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}