/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
        struct workqueue_struct *recv_wq;

        recv_wq = alloc_workqueue("xprtrdma_receive",
                                  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
                                  0);
        if (!recv_wq)
                return -ENOMEM;

        rpcrdma_receive_wq = recv_wq;
        return 0;
}

void
rpcrdma_destroy_wq(void)
{
        struct workqueue_struct *wq;

        if (rpcrdma_receive_wq) {
                wq = rpcrdma_receive_wq;
                rpcrdma_receive_wq = NULL;
                destroy_workqueue(wq);
        }
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
        struct rpcrdma_ep *ep = context;

        pr_err("RPC:       %s: %s on device %s ep %p\n",
               __func__, ib_event_msg(event->event),
               event->device->name, context);
        if (ep->rep_connected == 1) {
                ep->rep_connected = -EIO;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
        }
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
        /* WARNING: Only wr_cqe and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
                       ib_wc_status_msg(wc->status),
                       wc->status, wc->vendor_err);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
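 * A credit grant of zero would deadlock the transport, and a grant
 * larger than rb_max_requests exceeds the receive resources this
 * client has actually provisioned, so the decoded value is clamped
 * to that range below.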
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
        struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
        struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
        u32 credits;

        if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
                return;

        credits = be32_to_cpu(rmsgp->rm_credit);
        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > buffer->rb_max_requests)
                credits = buffer->rb_max_requests;

        atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
                                               rr_cqe);

        /* WARNING: Only wr_cqe and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS)
                goto out_fail;

        /* status == SUCCESS means all fields in wc are trustworthy */
        if (wc->opcode != IB_WC_RECV)
                return;

        dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
                __func__, rep, wc->byte_len);

        rep->rr_len = wc->byte_len;
        rep->rr_wc_flags = wc->wc_flags;
        rep->rr_inv_rkey = wc->ex.invalidate_rkey;

        ib_dma_sync_single_for_cpu(rep->rr_device,
                                   rdmab_addr(rep->rr_rdmabuf),
                                   rep->rr_len, DMA_FROM_DEVICE);

        rpcrdma_update_granted_credits(rep);

out_schedule:
        queue_work(rpcrdma_receive_wq, &rep->rr_work);
        return;

out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
                       ib_wc_status_msg(wc->status),
                       wc->status, wc->vendor_err);
        rep->rr_len = RPCRDMA_BAD_LEN;
        goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
                               struct rdma_conn_param *param)
{
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        const struct rpcrdma_connect_private *pmsg = param->private_data;
        unsigned int rsize, wsize;

        /* Default settings for RPC-over-RDMA Version One */
        r_xprt->rx_ia.ri_reminv_expected = false;
        rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
        wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

        if (pmsg &&
            pmsg->cp_magic == rpcrdma_cmp_magic &&
            pmsg->cp_version == RPCRDMA_CMP_VERSION) {
                r_xprt->rx_ia.ri_reminv_expected = true;
                rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
                wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
        }

        if (rsize < cdata->inline_rsize)
                cdata->inline_rsize = rsize;
        if (wsize < cdata->inline_wsize)
                cdata->inline_wsize = wsize;
        pr_info("rpcrdma: max send %u, max recv %u\n",
                cdata->inline_wsize, cdata->inline_rsize);
        rpcrdma_set_max_header_sizes(r_xprt);
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
        struct rpcrdma_xprt *xprt = id->context;
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
        struct ib_qp_attr *attr = &ia->ri_qp_attr;
        struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
        int connstate = 0;

        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                ia->ri_async_rc = 0;
                complete(&ia->ri_done);
                break;
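        /* Resolution failures: record an errno for the waiter in
         * rpcrdma_create_id() before waking it up.
         */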
        case RDMA_CM_EVENT_ADDR_ERROR:
                ia->ri_async_rc = -EHOSTUNREACH;
                dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                ia->ri_async_rc = -ENETUNREACH;
                dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
                        __func__, ep);
                complete(&ia->ri_done);
                break;
        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
                ib_query_qp(ia->ri_id->qp, attr,
                            IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
                            iattr);
                dprintk("RPC:       %s: %d responder resources"
                        " (%d initiator)\n",
                        __func__, attr->max_dest_rd_atomic,
                        attr->max_rd_atomic);
                rpcrdma_update_connect_private(xprt, &event->param.conn);
                goto connected;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
                goto connected;
        case RDMA_CM_EVENT_UNREACHABLE:
                connstate = -ENETDOWN;
                goto connected;
        case RDMA_CM_EVENT_REJECTED:
                connstate = -ECONNREFUSED;
                goto connected;
        case RDMA_CM_EVENT_DISCONNECTED:
                connstate = -ECONNABORTED;
                goto connected;
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                connstate = -ENODEV;
connected:
                dprintk("RPC:       %s: %sconnected\n",
                        __func__, connstate > 0 ? "" : "dis");
                atomic_set(&xprt->rx_buf.rb_credits, 1);
                ep->rep_connected = connstate;
                rpcrdma_conn_func(ep);
                wake_up_all(&ep->rep_connect_wait);
                /*FALLTHROUGH*/
        default:
                dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
                        __func__, sap, rpc_get_port(sap), ep,
                        rdma_event_msg(event->event));
                break;
        }

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
        if (connstate == 1) {
                int ird = attr->max_dest_rd_atomic;
                int tird = ep->rep_remote_cma.responder_resources;

                pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
                        sap, rpc_get_port(sap),
                        ia->ri_device->name,
                        ia->ri_ops->ro_displayname,
                        xprt->rx_buf.rb_max_requests,
                        ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
        } else if (connstate < 0) {
                pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
                        sap, rpc_get_port(sap), connstate);
        }
#endif

        return 0;
}

static void rpcrdma_destroy_id(struct rdma_cm_id *id)
{
        if (id) {
                module_put(id->device->owner);
                rdma_destroy_id(id);
        }
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
                  struct rpcrdma_ia *ia, struct sockaddr *addr)
{
        struct rdma_cm_id *id;
        int rc;

        init_completion(&ia->ri_done);

        id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
                            IB_QPT_RC);
        if (IS_ERR(id)) {
                rc = PTR_ERR(id);
                dprintk("RPC:       %s: rdma_create_id() failed %i\n",
                        __func__, rc);
                return id;
        }

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
                        __func__, rc);
                goto out;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);

        /* FIXME:
         * Until xprtrdma supports DEVICE_REMOVAL, the provider must
         * be pinned while there are active NFS/RDMA mounts to prevent
         * hangs and crashes at umount time.
         */
        if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
                dprintk("RPC:       %s: Failed to get device module\n",
                        __func__);
                ia->ri_async_rc = -ENODEV;
        }
        rc = ia->ri_async_rc;
        if (rc)
                goto out;

        ia->ri_async_rc = -ETIMEDOUT;
        rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
        if (rc) {
                dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
                        __func__, rc);
                goto put;
        }
        wait_for_completion_interruptible_timeout(&ia->ri_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        rc = ia->ri_async_rc;
        if (rc)
                goto put;

        return id;
put:
        module_put(id->device->owner);
out:
        rdma_destroy_id(id);
        return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/*
 * Open and initialize an Interface Adapter.
 *  o initializes fields of struct rpcrdma_ia, including
 *    interface and provider attributes and protection domain.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
        struct rpcrdma_ia *ia = &xprt->rx_ia;
        int rc;

        ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
        if (IS_ERR(ia->ri_id)) {
                rc = PTR_ERR(ia->ri_id);
                goto out1;
        }
        ia->ri_device = ia->ri_id->device;

        ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
        if (IS_ERR(ia->ri_pd)) {
                rc = PTR_ERR(ia->ri_pd);
                pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
                goto out2;
        }

        switch (memreg) {
        case RPCRDMA_FRMR:
                if (frwr_is_supported(ia)) {
                        ia->ri_ops = &rpcrdma_frwr_memreg_ops;
                        break;
                }
                /*FALLTHROUGH*/
        case RPCRDMA_MTHCAFMR:
                if (fmr_is_supported(ia)) {
                        ia->ri_ops = &rpcrdma_fmr_memreg_ops;
                        break;
                }
                /*FALLTHROUGH*/
        default:
                pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
                       memreg);
                rc = -EINVAL;
                goto out3;
        }

        return 0;

out3:
        ib_dealloc_pd(ia->ri_pd);
        ia->ri_pd = NULL;
out2:
        rpcrdma_destroy_id(ia->ri_id);
        ia->ri_id = NULL;
out1:
        return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
        dprintk("RPC:       %s: entering\n", __func__);
        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
                if (ia->ri_id->qp)
                        rdma_destroy_qp(ia->ri_id);
                rpcrdma_destroy_id(ia->ri_id);
                ia->ri_id = NULL;
        }

        /* If the pd is still busy, xprtrdma missed freeing a resource */
        if (ia->ri_pd && !IS_ERR(ia->ri_pd))
                ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                  struct rpcrdma_create_data_internal *cdata)
{
        struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
        struct ib_cq *sendcq, *recvcq;
        unsigned int max_qp_wr;
        int rc;

        if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_SEND_SGES) {
                dprintk("RPC:       %s: insufficient sge's available\n",
                        __func__);
                return -ENOMEM;
        }

        if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
                dprintk("RPC:       %s: insufficient wqe's available\n",
                        __func__);
                return -ENOMEM;
        }
        max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

        /* check provider's send/recv wr limits */
        if (cdata->max_requests > max_qp_wr)
                cdata->max_requests = max_qp_wr;

        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
        ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
        ep->rep_attr.cap.max_send_wr += 1;      /* drain cqe */
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
        ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
        ep->rep_attr.cap.max_recv_wr += 1;      /* drain cqe */
        ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_SEND_SGES;
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
        ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
        ep->rep_attr.qp_type = IB_QPT_RC;
        ep->rep_attr.port_num = ~0;

        dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
                "iovs: send %d recv %d\n",
                __func__,
                ep->rep_attr.cap.max_send_wr,
                ep->rep_attr.cap.max_recv_wr,
                ep->rep_attr.cap.max_send_sge,
                ep->rep_attr.cap.max_recv_sge);

        /* set trigger for requesting send completion */
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;     /* always signal? */
        INIT_CQCOUNT(ep);
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

        sendcq = ib_alloc_cq(ia->ri_device, NULL,
                             ep->rep_attr.cap.max_send_wr + 1,
                             0, IB_POLL_SOFTIRQ);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC:       %s: failed to create send CQ: %i\n",
                        __func__, rc);
                goto out1;
        }

        recvcq = ib_alloc_cq(ia->ri_device, NULL,
                             ep->rep_attr.cap.max_recv_wr + 1,
                             0, IB_POLL_SOFTIRQ);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
                        __func__, rc);
                goto out2;
        }

        ep->rep_attr.send_cq = sendcq;
        ep->rep_attr.recv_cq = recvcq;

        /* Initialize cma parameters */
        memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

        /* Prepare RDMA-CM private message */
        pmsg->cp_magic = rpcrdma_cmp_magic;
        pmsg->cp_version = RPCRDMA_CMP_VERSION;
        pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
        pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
        pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
        ep->rep_remote_cma.private_data = pmsg;
        ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

        /* Client offers RDMA Read but does not initiate */
        ep->rep_remote_cma.initiator_depth = 0;
        if (ia->ri_device->attrs.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
                ep->rep_remote_cma.responder_resources = 32;
        else
                ep->rep_remote_cma.responder_resources =
                                        ia->ri_device->attrs.max_qp_rd_atom;

        /* Limit transport retries so client can detect server
         * GID changes quickly. RPC layer handles re-establishing
         * transport connection and retransmission.
         */
        ep->rep_remote_cma.retry_count = 6;

        /* RPC-over-RDMA handles its own flow control. In addition,
         * make all RNR NAKs visible so we know that RPC-over-RDMA
         * flow control is working correctly (no NAKs should be seen).
         */
        ep->rep_remote_cma.flow_control = 0;
        ep->rep_remote_cma.rnr_retry_count = 0;

        return 0;

out2:
        ib_free_cq(sendcq);
out1:
        return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        dprintk("RPC:       %s: entering, connected is %d\n",
                __func__, ep->rep_connected);

        cancel_delayed_work_sync(&ep->rep_connect_worker);

        if (ia->ri_id->qp) {
                rpcrdma_ep_disconnect(ep, ia);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }

        ib_free_cq(ep->rep_attr.recv_cq);
        ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        struct rdma_cm_id *id, *old;
        int rc = 0;
        int retry_count = 0;

        if (ep->rep_connected != 0) {
                struct rpcrdma_xprt *xprt;
retry:
                dprintk("RPC:       %s: reconnecting...\n", __func__);

                rpcrdma_ep_disconnect(ep, ia);

                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                id = rpcrdma_create_id(xprt, ia,
                                (struct sockaddr *)&xprt->rx_data.addr);
                if (IS_ERR(id)) {
                        rc = -EHOSTUNREACH;
                        goto out;
                }
                /* TEMP TEMP TEMP - fail if new device:
                 *    Deregister/remarshal *all* requests!
                 *    Close and recreate adapter, pd, etc!
                 *    Re-determine all attributes still sane!
                 *    More stuff I haven't thought of!
                 *    Rrrgh!
                 */
                if (ia->ri_device != id->device) {
                        printk("RPC:       %s: can't reconnect on "
                                "different device!\n", __func__);
                        rpcrdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }
                /* END TEMP */
                rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
                if (rc) {
                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
                                __func__, rc);
                        rpcrdma_destroy_id(id);
                        rc = -ENETUNREACH;
                        goto out;
                }

                old = ia->ri_id;
                ia->ri_id = id;

                rdma_destroy_qp(old);
                rpcrdma_destroy_id(old);
        } else {
                dprintk("RPC:       %s: connecting...\n", __func__);
                rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
                if (rc) {
                        dprintk("RPC:       %s: rdma_create_qp failed %i\n",
                                __func__, rc);
                        /* do not update ep->rep_connected */
                        return -ENETUNREACH;
                }
        }

        ep->rep_connected = 0;

        rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
        if (rc) {
                dprintk("RPC:       %s: rdma_connect() failed with %i\n",
                        __func__, rc);
                goto out;
        }

        wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

        /*
         * Check state. A non-peer reject indicates no listener
         * (ECONNREFUSED), which may be a transient state. All
         * others indicate a transport condition which has already
         * undergone a best-effort recovery attempt.
         */
        if (ep->rep_connected == -ECONNREFUSED &&
            ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
                dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
                goto retry;
        }
        if (ep->rep_connected <= 0) {
                /* Sometimes, the only way to reliably connect to remote
                 * CMs is to use same nonzero values for ORD and IRD. */
                if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
                    (ep->rep_remote_cma.responder_resources == 0 ||
                     ep->rep_remote_cma.initiator_depth !=
                                ep->rep_remote_cma.responder_resources)) {
                        if (ep->rep_remote_cma.responder_resources == 0)
                                ep->rep_remote_cma.responder_resources = 1;
                        ep->rep_remote_cma.initiator_depth =
                                ep->rep_remote_cma.responder_resources;
                        goto retry;
                }
                rc = ep->rep_connected;
        } else {
                struct rpcrdma_xprt *r_xprt;
                unsigned int extras;

                dprintk("RPC:       %s: connected\n", __func__);

                r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

                if (extras) {
                        rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
                        if (rc) {
                                pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
                                        __func__, rc);
                                rc = 0;
                        }
                }
        }

out:
        if (rc)
                ep->rep_connected = rc;
        return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
        int rc;

        rc = rdma_disconnect(ia->ri_id);
        if (!rc) {
                /* returns without wait if not connected */
                wait_event_interruptible(ep->rep_connect_wait,
                                         ep->rep_connected != 1);
                dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
                        (ep->rep_connected == 1) ? "still " : "dis");
        } else {
                dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
                ep->rep_connected = rc;
        }

        ib_drain_qp(ia->ri_id->qp);
}

static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
                                                  rb_recovery_worker.work);
        struct rpcrdma_mw *mw;

        spin_lock(&buf->rb_recovery_lock);
        while (!list_empty(&buf->rb_stale_mrs)) {
                mw = list_first_entry(&buf->rb_stale_mrs,
                                      struct rpcrdma_mw, mw_list);
                list_del_init(&mw->mw_list);
                spin_unlock(&buf->rb_recovery_lock);

                dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
                mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

                spin_lock(&buf->rb_recovery_lock);
        }
        spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
        struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

        spin_lock(&buf->rb_recovery_lock);
        list_add(&mw->mw_list, &buf->rb_stale_mrs);
        spin_unlock(&buf->rb_recovery_lock);

        schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

static void
rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        unsigned int count;
        LIST_HEAD(free);
        LIST_HEAD(all);

        for (count = 0; count < 32; count++) {
                struct rpcrdma_mw *mw;
                int rc;

                mw = kzalloc(sizeof(*mw), GFP_KERNEL);
                if (!mw)
                        break;

                rc = ia->ri_ops->ro_init_mr(ia, mw);
                if (rc) {
                        kfree(mw);
                        break;
                }

                mw->mw_xprt = r_xprt;

                list_add(&mw->mw_list, &free);
                list_add(&mw->mw_all, &all);
        }

        spin_lock(&buf->rb_mwlock);
        list_splice(&free, &buf->rb_mws);
        list_splice(&all, &buf->rb_all);
        r_xprt->rx_stats.mrs_allocated += count;
        spin_unlock(&buf->rb_mwlock);

        dprintk("RPC:       %s: created %u MRs\n", __func__, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
        struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
                                                  rb_refresh_worker.work);
        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
                                                   rx_buf);

        rpcrdma_create_mrs(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpcrdma_req *req;

        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);

        INIT_LIST_HEAD(&req->rl_free);
        spin_lock(&buffer->rb_reqslock);
        list_add(&req->rl_all, &buffer->rb_allreqs);
        spin_unlock(&buffer->rb_reqslock);
        req->rl_cqe.done = rpcrdma_wc_send;
        req->rl_buffer = &r_xprt->rx_buf;
        INIT_LIST_HEAD(&req->rl_registered);
        req->rl_send_wr.next = NULL;
        req->rl_send_wr.wr_cqe = &req->rl_cqe;
        req->rl_send_wr.sg_list = req->rl_send_sge;
        req->rl_send_wr.opcode = IB_WR_SEND;
        return req;
}

struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_rep *rep;
        int rc;

        rc = -ENOMEM;
        rep = kzalloc(sizeof(*rep), GFP_KERNEL);
        if (rep == NULL)
                goto out;

        rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
                                               DMA_FROM_DEVICE, GFP_KERNEL);
        if (IS_ERR(rep->rr_rdmabuf)) {
                rc = PTR_ERR(rep->rr_rdmabuf);
                goto out_free;
        }

        rep->rr_device = ia->ri_device;
        rep->rr_cqe.done = rpcrdma_wc_receive;
        rep->rr_rxprt = r_xprt;
        INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
        rep->rr_recv_wr.next = NULL;
        rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
        rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
        rep->rr_recv_wr.num_sge = 1;
        return rep;

out_free:
        kfree(rep);
out:
        return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        int i, rc;

        buf->rb_max_requests = r_xprt->rx_data.max_requests;
        buf->rb_bc_srv_max_requests = 0;
        atomic_set(&buf->rb_credits, 1);
        spin_lock_init(&buf->rb_mwlock);
        spin_lock_init(&buf->rb_lock);
        spin_lock_init(&buf->rb_recovery_lock);
        INIT_LIST_HEAD(&buf->rb_mws);
        INIT_LIST_HEAD(&buf->rb_all);
        INIT_LIST_HEAD(&buf->rb_stale_mrs);
        INIT_DELAYED_WORK(&buf->rb_refresh_worker,
                          rpcrdma_mr_refresh_worker);
        INIT_DELAYED_WORK(&buf->rb_recovery_worker,
                          rpcrdma_mr_recovery_worker);

        rpcrdma_create_mrs(r_xprt);

        INIT_LIST_HEAD(&buf->rb_send_bufs);
        INIT_LIST_HEAD(&buf->rb_allreqs);
        spin_lock_init(&buf->rb_reqslock);
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;

                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req)) {
                        dprintk("RPC:       %s: request buffer %d alloc"
                                " failed\n", __func__, i);
                        rc = PTR_ERR(req);
                        goto out;
                }
                req->rl_backchannel = false;
                list_add(&req->rl_free, &buf->rb_send_bufs);
        }

        INIT_LIST_HEAD(&buf->rb_recv_bufs);
        for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
                struct rpcrdma_rep *rep;

                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        dprintk("RPC:       %s: reply buffer %d alloc failed\n",
                                __func__, i);
                        rc = PTR_ERR(rep);
                        goto out;
                }
                list_add(&rep->rr_list, &buf->rb_recv_bufs);
        }

        return 0;
out:
        rpcrdma_buffer_destroy(buf);
        return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_req *req;

        req = list_first_entry(&buf->rb_send_bufs,
                               struct rpcrdma_req, rl_free);
        list_del(&req->rl_free);
        return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_rep *rep;

        rep = list_first_entry(&buf->rb_recv_bufs,
                               struct rpcrdma_rep, rr_list);
        list_del(&rep->rr_list);
        return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
        rpcrdma_free_regbuf(rep->rr_rdmabuf);
        kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
        rpcrdma_free_regbuf(req->rl_recvbuf);
        rpcrdma_free_regbuf(req->rl_sendbuf);
        rpcrdma_free_regbuf(req->rl_rdmabuf);
        kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
                                                   rx_buf);
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
        struct rpcrdma_mw *mw;
        unsigned int count;

        count = 0;
        spin_lock(&buf->rb_mwlock);
        while (!list_empty(&buf->rb_all)) {
                mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
                list_del(&mw->mw_all);

                /* Releasing an MR can block, so drop the MW list lock
                 * while the release runs.
                 */
                spin_unlock(&buf->rb_mwlock);
                ia->ri_ops->ro_release_mr(mw);
                count++;
                spin_lock(&buf->rb_mwlock);
        }
        spin_unlock(&buf->rb_mwlock);
        r_xprt->rx_stats.mrs_allocated = 0;

        dprintk("RPC:       %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
        cancel_delayed_work_sync(&buf->rb_recovery_worker);

        while (!list_empty(&buf->rb_recv_bufs)) {
                struct rpcrdma_rep *rep;

                rep = rpcrdma_buffer_get_rep_locked(buf);
                rpcrdma_destroy_rep(rep);
        }
        buf->rb_send_count = 0;

        spin_lock(&buf->rb_reqslock);
        while (!list_empty(&buf->rb_allreqs)) {
                struct rpcrdma_req *req;

                req = list_first_entry(&buf->rb_allreqs,
                                       struct rpcrdma_req, rl_all);
                list_del(&req->rl_all);

                spin_unlock(&buf->rb_reqslock);
                rpcrdma_destroy_req(req);
                spin_lock(&buf->rb_reqslock);
        }
        spin_unlock(&buf->rb_reqslock);
        buf->rb_recv_count = 0;

        rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_mw *mw = NULL;

        spin_lock(&buf->rb_mwlock);
        if (!list_empty(&buf->rb_mws)) {
                mw = list_first_entry(&buf->rb_mws,
                                      struct rpcrdma_mw, mw_list);
                list_del_init(&mw->mw_list);
        }
        spin_unlock(&buf->rb_mwlock);

        if (!mw)
                goto out_nomws;
        return mw;

out_nomws:
        dprintk("RPC:       %s: no MWs available\n", __func__);
        schedule_delayed_work(&buf->rb_refresh_worker, 0);

        /* Allow the reply handler and refresh worker to run */
        cond_resched();

        return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

        spin_lock(&buf->rb_mwlock);
        list_add_tail(&mw->mw_list, &buf->rb_mws);
        spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
        /* If an RPC previously completed without a reply (say, a
         * credential problem or a soft timeout occurred) then hold off
         * on supplying more Receive buffers until the number of new
         * pending RPCs catches up to the number of posted Receives.
         */
        if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
                return NULL;

        if (unlikely(list_empty(&buffers->rb_recv_bufs)))
                return NULL;
        buffers->rb_recv_count++;
        return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
        struct rpcrdma_req *req;

        spin_lock(&buffers->rb_lock);
        if (list_empty(&buffers->rb_send_bufs))
                goto out_reqbuf;
        buffers->rb_send_count++;
        req = rpcrdma_buffer_get_req_locked(buffers);
        req->rl_reply = rpcrdma_buffer_get_rep(buffers);
        spin_unlock(&buffers->rb_lock);
        return req;

out_reqbuf:
        spin_unlock(&buffers->rb_lock);
        pr_warn("RPC:       %s: out of request buffers\n", __func__);
        return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        struct rpcrdma_rep *rep = req->rl_reply;

        req->rl_send_wr.num_sge = 0;
        req->rl_reply = NULL;

        spin_lock(&buffers->rb_lock);
        buffers->rb_send_count--;
        list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
        if (rep) {
                buffers->rb_recv_count--;
                list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
        }
        spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
        struct rpcrdma_buffer *buffers = req->rl_buffer;

        spin_lock(&buffers->rb_lock);
        req->rl_reply = rpcrdma_buffer_get_rep(buffers);
        spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
        struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

        spin_lock(&buffers->rb_lock);
        buffers->rb_recv_count--;
        list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
        spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
                     gfp_t flags)
{
        struct rpcrdma_regbuf *rb;

        rb = kmalloc(sizeof(*rb) + size, flags);
        if (rb == NULL)
                return ERR_PTR(-ENOMEM);

        rb->rg_device = NULL;
        rb->rg_direction = direction;
        rb->rg_iov.length = size;

        return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
        if (rb->rg_direction == DMA_NONE)
                return false;

        rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
                                            (void *)rb->rg_base,
                                            rdmab_length(rb),
                                            rb->rg_direction);
        if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
                return false;

        rb->rg_device = ia->ri_device;
        rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
        return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
        if (!rpcrdma_regbuf_is_mapped(rb))
                return;

        ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
                            rdmab_length(rb), rb->rg_direction);
        rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
        if (!rb)
                return;

        rpcrdma_dma_unmap_regbuf(rb);
        kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
{
        struct ib_send_wr *send_wr = &req->rl_send_wr;
        struct ib_send_wr *send_wr_fail;
        int rc;

        if (req->rl_reply) {
                rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
                if (rc)
                        return rc;
                req->rl_reply = NULL;
        }

        dprintk("RPC:       %s: posting %d s/g entries\n",
                __func__, send_wr->num_sge);

        if (DECR_CQCOUNT(ep) > 0)
                send_wr->send_flags = 0;
        else { /* Provider must take a send completion every now and then */
                INIT_CQCOUNT(ep);
                send_wr->send_flags = IB_SEND_SIGNALED;
        }

        rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
        if (rc)
                goto out_postsend_err;
        return 0;

out_postsend_err:
        pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
        return -ENOTCONN;
}

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
                     struct rpcrdma_rep *rep)
{
        struct ib_recv_wr *recv_wr_fail;
        int rc;

        if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
                goto out_map;
        rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
        if (rc)
                goto out_postrecv;
        return 0;

out_map:
        pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
        return -EIO;

out_postrecv:
        pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
        return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
        struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_rep *rep;
        int rc;

        while (count--) {
                spin_lock(&buffers->rb_lock);
                if (list_empty(&buffers->rb_recv_bufs))
                        goto out_reqbuf;
                rep = rpcrdma_buffer_get_rep_locked(buffers);
                spin_unlock(&buffers->rb_lock);

                rc = rpcrdma_ep_post_recv(ia, rep);
                if (rc)
                        goto out_rc;
        }

        return 0;

out_reqbuf:
        spin_unlock(&buffers->rb_lock);
        pr_warn("%s: no extra receive buffers\n", __func__);
        return -ENOMEM;

out_rc:
        rpcrdma_recv_buffer_put(rep);
        return rc;
}