/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */
#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
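 *
 * The server advertises its credit grant in the rm_credit field of
 * each RPC-over-RDMA header. Replies shorter than RPCRDMA_HDRLEN_ERR
 * are ignored, and the decoded value is clamped to the range
 * [1, rb_max_requests] before rb_credits is updated.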
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
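	/* Results of address and route resolution are handed back to
	 * rpcrdma_create_id() via ri_async_rc and the ri_done completion;
	 * connection state changes are recorded in ep->rep_connected and
	 * wake any thread sleeping in rpcrdma_ep_connect().
	 */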
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: connection to %pIS:%u on %s rejected: %s\n",
			sap, rpc_get_port(sap), ia->ri_device->name,
			rdma_reject_msg(id, event->status));
#endif
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC: %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
			sap, rpc_get_port(sap), connstate);
	}
#endif

	return 0;
}

static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
	if (id) {
		module_put(id->device->owner);
		rdma_destroy_id(id);
	}
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
		  struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}

	/* FIXME:
	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
	 * be pinned while there are active NFS/RDMA mounts to prevent
	 * hangs and crashes at umount time.
	 */
	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
		dprintk("RPC: %s: Failed to get device module\n",
			__func__);
		ia->ri_async_rc = -ENODEV;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto put;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto put;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto put;

	return id;
put:
	module_put(id->device->owner);
out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection zone.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
		       memreg);
		rc = -EINVAL;
		goto out3;
	}

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min(ia->ri_device->attrs.max_sge, RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

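	/* Each work request ring reserves RPCRDMA_BACKWARD_WRS entries
	 * for backchannel operations, plus one extra slot for the drain
	 * CQE.
	 */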
	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	rpcrdma_init_cqcount(ep, 0);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
			ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
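 *
 * Also handles reconnect: if the endpoint was previously connected,
 * the old QP and rdma_cm_id are torn down and replaced before the
 * new rdma_connect() attempt.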
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rdma_cm_id *id, *old;
	struct sockaddr *sap;
	unsigned int extras;
	int rc = 0;

	if (ep->rep_connected != 0) {
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		sap = (struct sockaddr *)&r_xprt->rx_data.addr;
		id = rpcrdma_create_id(r_xprt, ia, sap);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 *    Deregister/remarshal *all* requests!
		 *    Close and recreate adapter, pd, etc!
		 *    Re-determine all attributes still sane!
		 *    More stuff I haven't thought of!
		 *    Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC: %s: can't reconnect on "
			       "different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mw *mw;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mw = rpcrdma_pop_mw(&buf->rb_stale_mrs);
		spin_unlock(&buf->rb_recovery_lock);

		dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	rpcrdma_push_mw(mw, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

static void
rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 32; count++) {
		struct rpcrdma_mw *mw;
		int rc;

		mw = kzalloc(sizeof(*mw), GFP_KERNEL);
		if (!mw)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mw);
		if (rc) {
			kfree(mw);
			break;
		}

		mw->mw_xprt = r_xprt;

		list_add(&mw->mw_list, &free);
		list_add(&mw->mw_all, &all);
	}

	spin_lock(&buf->rb_mwlock);
	list_splice(&free, &buf->rb_mws);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mwlock);

	dprintk("RPC: %s: created %u MRs\n", __func__, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_create_mrs(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&req->rl_free);
	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_cqe.done = rpcrdma_wc_send;
	req->rl_buffer = &r_xprt->rx_buf;
	INIT_LIST_HEAD(&req->rl_registered);
	req->rl_send_wr.next = NULL;
	req->rl_send_wr.wr_cqe = &req->rl_cqe;
	req->rl_send_wr.sg_list = req->rl_send_sge;
	req->rl_send_wr.opcode = IB_WR_SEND;
	return req;
}

struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws))
		mw = rpcrdma_pop_mw(&buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	rpcrdma_push_mw(mw, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
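 *
 * The attached reply buffer, if any, is returned to rb_recv_bufs at
 * the same time, so rb_send_count and rb_recv_count stay balanced.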
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_send_wr.num_sge = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
		return false;

	rb->rg_device = ia->ri_device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
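 * The Send WR is signaled only periodically, as governed by
 * rpcrdma_set_signaled() and ep->rep_cqinit, to limit Send completion
 * processing.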
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_send_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr->num_sge);

	rpcrdma_set_signaled(ep, send_wr);
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}