/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
#include <asm/bitops.h>
#include <linux/module.h>	/* try_module_get()/module_put() */

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("RPC: %s: %s on device %s ep %p\n",
	       __func__, ib_event_msg(event->event),
	       event->device->name, context);
	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
}

static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Perform basic sanity checking to avoid using garbage
 * to update the credit grant value.
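 *
 * The credit value comes from the rm_credit field of the incoming
 * RPC-over-RDMA header. It is clamped to at least 1 (to avoid
 * deadlock) and to at most rb_max_requests.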
 */
static void
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
	u32 credits;

	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		return;

	credits = be32_to_cpu(rmsgp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buffer->rb_max_requests)
		credits = buffer->rb_max_requests;

	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	if (wc->opcode != IB_WC_RECV)
		return;

	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);

	rpcrdma_update_granted_credits(rep);

out_schedule:
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	struct ib_qp_attr *attr = &ia->ri_qp_attr;
	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		ib_query_qp(ia->ri_id->qp, attr,
			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
			    iattr);
		dprintk("RPC: %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr->max_dest_rd_atomic,
			attr->max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
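		/* All connection state changes funnel through here:
		 * record the new state, reset the credit grant, and
		 * wake up any waiters on rep_connect_wait.
		 */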
dprintk("RPC: %s: %sconnected\n", 263 __func__, connstate > 0 ? "" : "dis"); 264 atomic_set(&xprt->rx_buf.rb_credits, 1); 265 ep->rep_connected = connstate; 266 rpcrdma_conn_func(ep); 267 wake_up_all(&ep->rep_connect_wait); 268 /*FALLTHROUGH*/ 269 default: 270 dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n", 271 __func__, sap, rpc_get_port(sap), ep, 272 rdma_event_msg(event->event)); 273 break; 274 } 275 276 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 277 if (connstate == 1) { 278 int ird = attr->max_dest_rd_atomic; 279 int tird = ep->rep_remote_cma.responder_resources; 280 281 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n", 282 sap, rpc_get_port(sap), 283 ia->ri_device->name, 284 ia->ri_ops->ro_displayname, 285 xprt->rx_buf.rb_max_requests, 286 ird, ird < 4 && ird < tird / 2 ? " (low!)" : ""); 287 } else if (connstate < 0) { 288 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n", 289 sap, rpc_get_port(sap), connstate); 290 } 291 #endif 292 293 return 0; 294 } 295 296 static void rpcrdma_destroy_id(struct rdma_cm_id *id) 297 { 298 if (id) { 299 module_put(id->device->owner); 300 rdma_destroy_id(id); 301 } 302 } 303 304 static struct rdma_cm_id * 305 rpcrdma_create_id(struct rpcrdma_xprt *xprt, 306 struct rpcrdma_ia *ia, struct sockaddr *addr) 307 { 308 struct rdma_cm_id *id; 309 int rc; 310 311 init_completion(&ia->ri_done); 312 313 id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, 314 IB_QPT_RC); 315 if (IS_ERR(id)) { 316 rc = PTR_ERR(id); 317 dprintk("RPC: %s: rdma_create_id() failed %i\n", 318 __func__, rc); 319 return id; 320 } 321 322 ia->ri_async_rc = -ETIMEDOUT; 323 rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT); 324 if (rc) { 325 dprintk("RPC: %s: rdma_resolve_addr() failed %i\n", 326 __func__, rc); 327 goto out; 328 } 329 wait_for_completion_interruptible_timeout(&ia->ri_done, 330 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 331 332 /* FIXME: 333 * Until xprtrdma supports DEVICE_REMOVAL, the provider must 334 * be pinned while there are active NFS/RDMA mounts to prevent 335 * hangs and crashes at umount time. 336 */ 337 if (!ia->ri_async_rc && !try_module_get(id->device->owner)) { 338 dprintk("RPC: %s: Failed to get device module\n", 339 __func__); 340 ia->ri_async_rc = -ENODEV; 341 } 342 rc = ia->ri_async_rc; 343 if (rc) 344 goto out; 345 346 ia->ri_async_rc = -ETIMEDOUT; 347 rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT); 348 if (rc) { 349 dprintk("RPC: %s: rdma_resolve_route() failed %i\n", 350 __func__, rc); 351 goto put; 352 } 353 wait_for_completion_interruptible_timeout(&ia->ri_done, 354 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1); 355 rc = ia->ri_async_rc; 356 if (rc) 357 goto put; 358 359 return id; 360 put: 361 module_put(id->device->owner); 362 out: 363 rdma_destroy_id(id); 364 return ERR_PTR(rc); 365 } 366 367 /* 368 * Exported functions. 369 */ 370 371 /* 372 * Open and initialize an Interface Adapter. 373 * o initializes fields of struct rpcrdma_ia, including 374 * interface and provider attributes and protection zone. 
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out1;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out2;
	}

	switch (memreg) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
		       memreg);
		rc = -EINVAL;
		goto out3;
	}

	return 0;

out3:
	ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
out2:
	rpcrdma_destroy_id(ia->ri_id);
	ia->ri_id = NULL;
out1:
	return rc;
}

/*
 * Clean up/close an IA.
 *   o if event handles and PD have been initialized, free them.
 *   o close the IA
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rpcrdma_destroy_id(ia->ri_id);
		ia->ri_id = NULL;
	}

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct ib_cq *sendcq, *recvcq;
	unsigned int max_qp_wr;
	int rc;

	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
		dprintk("RPC: %s: insufficient sge's available\n",
			__func__);
		return -ENOMEM;
	}

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
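	/* A Send completion is requested only once every rep_cqinit
	 * posted Sends (see DECR_CQCOUNT in rpcrdma_ep_post()); a
	 * rep_cqinit of zero means every Send is signaled.
	 */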
	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
	if (ep->rep_cqinit <= 2)
		ep->rep_cqinit = 0;	/* always signal? */
	INIT_CQCOUNT(ep);
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* RPC/RDMA does not use private data */
	ep->rep_remote_cma.private_data = NULL;
	ep->rep_remote_cma.private_data_len = 0;

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
					ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rdma_cm_id *id, *old;
	int rc = 0;
	int retry_count = 0;

	if (ep->rep_connected != 0) {
		struct rpcrdma_xprt *xprt;
retry:
		dprintk("RPC: %s: reconnecting...\n", __func__);

		rpcrdma_ep_disconnect(ep, ia);

		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		id = rpcrdma_create_id(xprt, ia,
				(struct sockaddr *)&xprt->rx_data.addr);
		if (IS_ERR(id)) {
			rc = -EHOSTUNREACH;
			goto out;
		}
		/* TEMP TEMP TEMP - fail if new device:
		 * Deregister/remarshal *all* requests!
		 * Close and recreate adapter, pd, etc!
		 * Re-determine all attributes still sane!
		 * More stuff I haven't thought of!
		 * Rrrgh!
		 */
		if (ia->ri_device != id->device) {
			printk("RPC: %s: can't reconnect on "
				"different device!\n", __func__);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}
		/* END TEMP */
		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rpcrdma_destroy_id(id);
			rc = -ENETUNREACH;
			goto out;
		}

		old = ia->ri_id;
		ia->ri_id = id;

		rdma_destroy_qp(old);
		rpcrdma_destroy_id(old);
	} else {
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			/* do not update ep->rep_connected */
			return -ENETUNREACH;
		}
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);

	/*
	 * Check state. A non-peer reject indicates no listener
	 * (ECONNREFUSED), which may be a transient state. All
	 * others indicate a transport condition which has already
	 * undergone a best-effort.
	 */
	if (ep->rep_connected == -ECONNREFUSED &&
	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
		dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
		goto retry;
	}
	if (ep->rep_connected <= 0) {
		/* Sometimes, the only way to reliably connect to remote
		 * CMs is to use same nonzero values for ORD and IRD. */
		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
		    (ep->rep_remote_cma.responder_resources == 0 ||
		     ep->rep_remote_cma.initiator_depth !=
				ep->rep_remote_cma.responder_resources)) {
			if (ep->rep_remote_cma.responder_resources == 0)
				ep->rep_remote_cma.responder_resources = 1;
			ep->rep_remote_cma.initiator_depth =
				ep->rep_remote_cma.responder_resources;
			goto retry;
		}
		rc = ep->rep_connected;
	} else {
		struct rpcrdma_xprt *r_xprt;
		unsigned int extras;

		dprintk("RPC: %s: connected\n", __func__);

		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;

		if (extras) {
			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
			if (rc) {
				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
					__func__, rc);
				rc = 0;
			}
		}
	}

out:
	if (rc)
		ep->rep_connected = rc;
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
"still " : "dis"); 742 } else { 743 dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); 744 ep->rep_connected = rc; 745 } 746 747 ib_drain_qp(ia->ri_id->qp); 748 } 749 750 static void 751 rpcrdma_mr_recovery_worker(struct work_struct *work) 752 { 753 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 754 rb_recovery_worker.work); 755 struct rpcrdma_mw *mw; 756 757 spin_lock(&buf->rb_recovery_lock); 758 while (!list_empty(&buf->rb_stale_mrs)) { 759 mw = list_first_entry(&buf->rb_stale_mrs, 760 struct rpcrdma_mw, mw_list); 761 list_del_init(&mw->mw_list); 762 spin_unlock(&buf->rb_recovery_lock); 763 764 dprintk("RPC: %s: recovering MR %p\n", __func__, mw); 765 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); 766 767 spin_lock(&buf->rb_recovery_lock); 768 } 769 spin_unlock(&buf->rb_recovery_lock); 770 } 771 772 void 773 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) 774 { 775 struct rpcrdma_xprt *r_xprt = mw->mw_xprt; 776 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 777 778 spin_lock(&buf->rb_recovery_lock); 779 list_add(&mw->mw_list, &buf->rb_stale_mrs); 780 spin_unlock(&buf->rb_recovery_lock); 781 782 schedule_delayed_work(&buf->rb_recovery_worker, 0); 783 } 784 785 static void 786 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) 787 { 788 struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 789 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 790 unsigned int count; 791 LIST_HEAD(free); 792 LIST_HEAD(all); 793 794 for (count = 0; count < 32; count++) { 795 struct rpcrdma_mw *mw; 796 int rc; 797 798 mw = kzalloc(sizeof(*mw), GFP_KERNEL); 799 if (!mw) 800 break; 801 802 rc = ia->ri_ops->ro_init_mr(ia, mw); 803 if (rc) { 804 kfree(mw); 805 break; 806 } 807 808 mw->mw_xprt = r_xprt; 809 810 list_add(&mw->mw_list, &free); 811 list_add(&mw->mw_all, &all); 812 } 813 814 spin_lock(&buf->rb_mwlock); 815 list_splice(&free, &buf->rb_mws); 816 list_splice(&all, &buf->rb_all); 817 r_xprt->rx_stats.mrs_allocated += count; 818 spin_unlock(&buf->rb_mwlock); 819 820 dprintk("RPC: %s: created %u MRs\n", __func__, count); 821 } 822 823 static void 824 rpcrdma_mr_refresh_worker(struct work_struct *work) 825 { 826 struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, 827 rb_refresh_worker.work); 828 struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, 829 rx_buf); 830 831 rpcrdma_create_mrs(r_xprt); 832 } 833 834 struct rpcrdma_req * 835 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) 836 { 837 struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 838 struct rpcrdma_req *req; 839 840 req = kzalloc(sizeof(*req), GFP_KERNEL); 841 if (req == NULL) 842 return ERR_PTR(-ENOMEM); 843 844 INIT_LIST_HEAD(&req->rl_free); 845 spin_lock(&buffer->rb_reqslock); 846 list_add(&req->rl_all, &buffer->rb_allreqs); 847 spin_unlock(&buffer->rb_reqslock); 848 req->rl_cqe.done = rpcrdma_wc_send; 849 req->rl_buffer = &r_xprt->rx_buf; 850 INIT_LIST_HEAD(&req->rl_registered); 851 return req; 852 } 853 854 struct rpcrdma_rep * 855 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) 856 { 857 struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; 858 struct rpcrdma_ia *ia = &r_xprt->rx_ia; 859 struct rpcrdma_rep *rep; 860 int rc; 861 862 rc = -ENOMEM; 863 rep = kzalloc(sizeof(*rep), GFP_KERNEL); 864 if (rep == NULL) 865 goto out; 866 867 rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, 868 GFP_KERNEL); 869 if (IS_ERR(rep->rr_rdmabuf)) { 870 rc = PTR_ERR(rep->rr_rdmabuf); 871 goto out_free; 872 } 873 874 rep->rr_device = ia->ri_device; 875 rep->rr_cqe.done = rpcrdma_receive_wc; 876 
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	atomic_set(&buf->rb_credits, 1);
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		req->rl_backchannel = false;
		list_add(&req->rl_free, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_free);
	list_del(&req->rl_free);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);

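	/* Ensure the MR recovery worker is no longer running before
	 * the MRs and buffer lists below are torn down.
	 */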
	cancel_delayed_work_sync(&buf->rb_recovery_worker);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(ia, rep);
	}

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(ia, req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws)) {
		mw = list_first_entry(&buf->rb_mws,
				      struct rpcrdma_mw, mw_list);
		list_del_init(&mw->mw_list);
	}
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	list_add_tail(&mw->mw_list, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

/*
 * Get a set of request/reply buffers.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	req = rpcrdma_buffer_get_req_locked(buffers);
	if (list_empty(&buffers->rb_recv_bufs))
		goto out_repbuf;
	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("rpcrdma: out of request buffers (%p)\n", buffers);
	return NULL;
out_repbuf:
	list_add(&req->rl_free, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
	pr_warn("rpcrdma: out of reply buffers (%p)\n", buffers);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_niovs = 0;
	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
	if (rep)
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	if (!list_empty(&buffers->rb_recv_bufs))
		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Wrappers for internal-use kmalloc memory registration, used by buffer code.
 */

/**
 * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
 * @ia: controlling rpcrdma_ia
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns pointer to private header of an area of internally
 * registered memory, or an ERR_PTR. The registered buffer follows
 * the end of the private header.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. regbufs are not
 * used for RDMA READ/WRITE operations, thus are registered only for
 * LOCAL access.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
{
	struct rpcrdma_regbuf *rb;
	struct ib_sge *iov;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		goto out;

	iov = &rb->rg_iov;
	iov->addr = ib_dma_map_single(ia->ri_device,
				      (void *)rb->rg_base, size,
				      DMA_BIDIRECTIONAL);
	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
		goto out_free;

	iov->length = size;
	iov->lkey = ia->ri_pd->local_dma_lkey;
	rb->rg_size = size;
	rb->rg_owner = NULL;
	return rb;

out_free:
	kfree(rb);
out:
	return ERR_PTR(-ENOMEM);
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_sge *iov;

	if (!rb)
		return;

	iov = &rb->rg_iov;
	ib_dma_unmap_single(ia->ri_device,
			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
	kfree(rb);
}

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
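 *
 * Posting the Receive first ensures a receive buffer is available
 * for the server's reply before the Send carrying the call is issued.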
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_device *device = ia->ri_device;
	struct ib_send_wr send_wr, *send_wr_fail;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct ib_sge *iov = req->rl_send_iov;
	int i, rc;

	if (rep) {
		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	send_wr.next = NULL;
	send_wr.wr_cqe = &req->rl_cqe;
	send_wr.sg_list = iov;
	send_wr.num_sge = req->rl_niovs;
	send_wr.opcode = IB_WR_SEND;

	for (i = 0; i < send_wr.num_sge; i++)
		ib_dma_sync_single_for_device(device, iov[i].addr,
					      iov[i].length, DMA_TO_DEVICE);
	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr.num_sge);

	if (DECR_CQCOUNT(ep) > 0)
		send_wr.send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr.send_flags = IB_SEND_SIGNALED;
	}

	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}

/*
 * (Re)post a receive buffer.
 */
int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_ep *ep,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr recv_wr, *recv_wr_fail;
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

	ib_dma_sync_single_for_cpu(ia->ri_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rdmab_length(rep->rr_rdmabuf),
				   DMA_BIDIRECTIONAL);

	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, ep, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}