/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */
#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

/* Consume the CM private message, if any, that the server sent during
 * connection establishment, and reduce this transport's inline
 * thresholds if the server advertised smaller buffers.
 */
static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}

/* Handle connection manager events for this transport's rdma_cm_id. */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: connection to %pIS:%u on %s rejected: %s\n",
			sap, rpc_get_port(sap), ia->ri_device->name,
			rdma_reject_msg(id, event->status));
#endif
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		dprintk("RPC: %s: %sconnected\n",
			__func__, connstate > 0 ? "" : "dis");
		atomic_set(&xprt->rx_buf.rb_credits, 1);
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap), ep,
			rdma_event_msg(event->event));
		break;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	if (connstate == 1) {
		int ird = attr->max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;

		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
			sap, rpc_get_port(sap),
			ia->ri_device->name,
			ia->ri_ops->ro_displayname,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
" (low!)" : ""); 323 } else if (connstate < 0) { 324 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n", 325 sap, rpc_get_port(sap), connstate); 326 } 327 #endif 328 329 return 0; 330 } 331 332 static void rpcrdma_destroy_id(struct rdma_cm_id *id) 333 { 334 if (id) { 335 module_put(id->device->owner); 336 rdma_destroy_id(id); 337 } 338 } 339 340 static struct rdma_cm_id * 341 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 342 struct rpcrdma_ia *ia, struct sockaddr *addr) 343 { 344 unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1; 345 struct rdma_cm_id *id; 346 int rc; 347 348 init_completion(&ia->ri_done); 349 350 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 351 IB_QPT_RC); 352 if (IS_ERR(id)) { 353 rc = PTR_ERR(id); 354 dprintk("RPC: %s: rdma_create_id() failed %i\n", 355 __func__, rc); 356 return id; 357 } 358 359 ia->ri_async_rc = -ETIMEDOUT; 360 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 361 if (rc) { 362 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 363 __func__, rc); 364 goto out; 365 } 366 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 367 if (rc < 0) { 368 dprintk("RPC: %s: wait() exited: %i\n", 369 __func__, rc); 370 goto out; 371 } 372 373 /* FIXME: 374 * Until xprtrdma supports DEVICE_REMOVAL, the provider must 375 * be pinned while there are active NFS/RDMA mounts to prevent 376 * hangs and crashes at umount time. 377 */ 378 if (!ia->ri_async_rc && !try_module_get(id->device->owner)) { 379 dprintk("RPC: %s: Failed to get device module\n", 380 __func__); 381 ia->ri_async_rc = -ENODEV; 382 } 383 rc = ia->ri_async_rc; 384 if (rc) 385 goto out; 386 387 ia->ri_async_rc = -ETIMEDOUT; 388 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 389 if (rc) { 390 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 391 __func__, rc); 392 goto put; 393 } 394 rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout); 395 if (rc < 0) { 396 dprintk("RPC: %s: wait() exited: %i\n", 397 __func__, rc); 398 goto put; 399 } 400 rc = ia->ri_async_rc; 401 if (rc) 402 goto put; 403 404 return id; 405 put: 406 module_put(id->device->owner); 407 out: 408 rdma_destroy_id(id); 409 return ERR_PTR(rc); 410 } 411 412 /* 413 * Exported functions. 414 */ 415 416 /* 417 * Open and initialize an Interface Adapter. 418 * o initializes fields of struct rpcrdma_ia, including 419 * interface and provider attributes and protection zone. 
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
		       memreg);
		rc = -EINVAL;
		goto out3;
	}

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	rpcrdma_init_cqcount(ep, 0);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
						ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rdma_cm_id *id, *old;
	struct sockaddr *sap;
	unsigned int extras;
	int rc = 0;

	if (ep->rep_connected != 0) {
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		sap = (struct sockaddr *)&r_xprt->rx_data.addr;
		id = rpcrdma_create_id(r_xprt, ia, sap);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC: %s: can't reconnect on "
			       "different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
"still " : "dis"); 763 } else { 764 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 765 ep->rep_connected = rc; 766 } 767 768 ib_drain_qp(ia->ri_id->qp); 769 } 770 771 static void 772 rpcrdma_mr_recovery_worker(struct work_struct *work) 773 { 774 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 775 rb_recovery_worker.work); 776 struct rpcrdma_mw *mw; 777 778 spin_lock(&buf->rb_recovery_lock); 779 while (!list_empty(&buf->rb_stale_mrs)) { 780 mw = rpcrdma_pop_mw(&buf->rb_stale_mrs); 781 spin_unlock(&buf->rb_recovery_lock); 782 783 dprintk("RPC: %s: recovering MR %p\n", __func__, mw); 784 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); 785 786 spin_lock(&buf->rb_recovery_lock); 787 } 788 spin_unlock(&buf->rb_recovery_lock); 789 } 790 791 void 792 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) 793 { 794 struct rpcrdma_xprt *r_xprt = mw->mw_xprt; 795 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 796 797 spin_lock(&buf->rb_recovery_lock); 798 rpcrdma_push_mw(mw, &buf->rb_stale_mrs); 799 spin_unlock(&buf->rb_recovery_lock); 800 801 schedule_delayed_work(&buf->rb_recovery_worker, 0); 802 } 803 804 static void 805 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) 806 { 807 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 808 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 809 unsigned int count; 810 LIST_HEAD(free); 811 LIST_HEAD(all); 812 813 for (count = 0; count < 32; count++) { 814 struct rpcrdma_mw *mw; 815 int rc; 816 817 mw = kzalloc(sizeof(*mw), GFP_KERNEL); 818 if (!mw) 819 break; 820 821 rc = ia->ri_ops->ro_init_mr(ia, mw); 822 if (rc) { 823 kfree(mw); 824 break; 825 } 826 827 mw->mw_xprt = r_xprt; 828 829 list_add(&mw->mw_list, &free); 830 list_add(&mw->mw_all, &all); 831 } 832 833 spin_lock(&buf->rb_mwlock); 834 list_splice(&free, &buf->rb_mws); 835 list_splice(&all, &buf->rb_all); 836 r_xprt->rx_stats.mrs_allocated += count; 837 spin_unlock(&buf->rb_mwlock); 838 839 dprintk("RPC: %s: created %u MRs\n", __func__, count); 840 } 841 842 static void 843 rpcrdma_mr_refresh_worker(struct work_struct *work) 844 { 845 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 846 rb_refresh_worker.work); 847 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 848 rx_buf); 849 850 rpcrdma_create_mrs(r_xprt); 851 } 852 853 struct rpcrdma_req * 854 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 855 { 856 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 857 struct rpcrdma_req *req; 858 859 req = kzalloc(sizeof(*req), GFP_KERNEL); 860 if (req == NULL) 861 return ERR_PTR(-ENOMEM); 862 863 INIT_LIST_HEAD(&req->rl_free); 864 spin_lock(&buffer->rb_reqslock); 865 list_add(&req->rl_all, &buffer->rb_allreqs); 866 spin_unlock(&buffer->rb_reqslock); 867 req->rl_cqe.done = rpcrdma_wc_send; 868 req->rl_buffer = &r_xprt->rx_buf; 869 INIT_LIST_HEAD(&req->rl_registered); 870 req->rl_send_wr.next = NULL; 871 req->rl_send_wr.wr_cqe = &req->rl_cqe; 872 req->rl_send_wr.sg_list = req->rl_send_sge; 873 req->rl_send_wr.opcode = IB_WR_SEND; 874 return req; 875 } 876 877 struct rpcrdma_rep * 878 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 879 { 880 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 881 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 882 struct rpcrdma_rep *rep; 883 int rc; 884 885 rc = -ENOMEM; 886 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 887 if (rep == NULL) 888 goto out; 889 890 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize, 891 DMA_FROM_DEVICE, GFP_KERNEL); 892 if (IS_ERR(rep->rr_rdmabuf)) { 893 rc = PTR_ERR(rep->rr_rdmabuf); 894 
		goto out_free;
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws))
		mw = rpcrdma_pop_mw(&buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	rpcrdma_push_mw(mw, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;

	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_send_wr.num_sge = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
		return false;

	rb->rg_device = ia->ri_device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_send_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr->num_sge);

	rpcrdma_set_signaled(ep, send_wr);
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}