/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
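
/*
 * svc_rdma_op_ctxt pool
 *
 * Each Send or Receive WR posted by this transport carries a
 * struct svc_rdma_op_ctxt that records its SGEs, pages, DMA
 * direction, and completion state. Contexts are pre-allocated when
 * a connection is accepted and recycled through xprt->sc_ctxts so
 * the I/O paths do not normally allocate memory; alloc_ctxt() also
 * serves as a GFP_NOIO fallback if the pool ever runs dry.
 */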
static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->list);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume a number of send
	 * and receive WQEs. One ctxt is allocated for each.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->list, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, list);
	list_del(&ctxt->list);
	spin_unlock(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->mapped_sges = 0;
	ctxt->frmr = NULL;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	struct ib_device *device = xprt->sc_cm_id->device;
	u32 lkey = xprt->sc_pd->local_dma_lkey;
	unsigned int i;

	for (i = 0; i < ctxt->mapped_sges; i++) {
		/*
		 * Unmap the DMA addr in the SGE if the lkey matches
		 * the local_dma_lkey, otherwise, ignore it since it is
		 * an FRMR lkey and will be unmapped later when the
		 * last WR that uses it completes.
		 */
		if (ctxt->sge[i].lkey == lkey)
			ib_dma_unmap_page(device,
					  ctxt->sge[i].addr,
					  ctxt->sge[i].length,
					  ctxt->direction);
	}
	ctxt->mapped_sges = 0;
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	list_add(&ctxt->list, &xprt->sc_ctxts);
	spin_unlock(&xprt->sc_ctxt_lock);
}

static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_ctxts)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&xprt->sc_ctxts,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		kfree(ctxt);
	}
}
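
/*
 * svc_rdma_req_map pool
 *
 * Request maps (struct svc_rdma_req_map) are pre-allocated at accept
 * time, one per receive buffer, and recycled through xprt->sc_maps.
 * svc_rdma_get_req_map() falls back to a GFP_NOIO allocation if the
 * pool was sized too small.
 */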
static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
{
	struct svc_rdma_req_map *map;

	map = kmalloc(sizeof(*map), flags);
	if (map)
		INIT_LIST_HEAD(&map->free);
	return map;
}

static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* One for each receive buffer on this connection. */
	i = xprt->sc_max_requests;

	while (i--) {
		struct svc_rdma_req_map *map;

		map = alloc_req_map(GFP_KERNEL);
		if (!map) {
			dprintk("svcrdma: No memory for request map\n");
			return false;
		}
		list_add(&map->free, &xprt->sc_maps);
	}
	return true;
}

struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_req_map *map = NULL;

	spin_lock(&xprt->sc_map_lock);
	if (list_empty(&xprt->sc_maps))
		goto out_empty;

	map = list_first_entry(&xprt->sc_maps,
			       struct svc_rdma_req_map, free);
	list_del_init(&map->free);
	spin_unlock(&xprt->sc_map_lock);

out:
	map->count = 0;
	return map;

out_empty:
	spin_unlock(&xprt->sc_map_lock);

	/* Pre-allocation amount was incorrect */
	map = alloc_req_map(GFP_NOIO);
	if (map)
		goto out;

	WARN_ONCE(1, "svcrdma: empty request map list?\n");
	return NULL;
}

void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
			  struct svc_rdma_req_map *map)
{
	spin_lock(&xprt->sc_map_lock);
	list_add(&map->free, &xprt->sc_maps);
	spin_unlock(&xprt->sc_map_lock);
}

static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_maps)) {
		struct svc_rdma_req_map *map;

		map = list_first_entry(&xprt->sc_maps,
				       struct svc_rdma_req_map, free);
		list_del(&map->free);
		kfree(map);
	}
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
			"closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;

	/* All wc fields are now known to be valid */
	ctxt->byte_len = wc->byte_len;
	spin_lock(&xprt->sc_rq_dto_lock);
	list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
	spin_unlock(&xprt->sc_rq_dto_lock);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		goto out;
	svc_xprt_enqueue(&xprt->sc_xprt);
	goto out;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
			ib_wc_status_msg(wc->status),
			wc->status, wc->vendor_err);
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	svc_rdma_put_context(ctxt, 1);

out:
	svc_xprt_put(&xprt->sc_xprt);
}
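
/*
 * Common Send Queue completion handling: on success, return the SQ
 * slot to sc_sq_avail and wake any sender waiting for SQ space; on
 * error, report it and mark the transport for closing.
 */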
static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
				    struct ib_wc *wc,
				    const char *opname)
{
	if (wc->status != IB_WC_SUCCESS)
		goto err;

out:
	atomic_inc(&xprt->sc_sq_avail);
	wake_up(&xprt->sc_send_wait);
	return;

err:
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
		       opname, ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	goto out;
}

static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
					const char *opname)
{
	struct svcxprt_rdma *xprt = cq->cq_context;

	svc_rdma_send_wc_common(xprt, wc, opname);
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common_put(cq, wc, "send");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
}

/**
 * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common_put(cq, wc, "write");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
}

/**
 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
{
	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
}

/**
 * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common(xprt, wc, "read");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_frmr(xprt, ctxt->frmr);

	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
		struct svc_rdma_op_ctxt *read_hdr;

		read_hdr = ctxt->read_hdr;
		spin_lock(&xprt->sc_rq_dto_lock);
		list_add_tail(&read_hdr->list,
			      &xprt->sc_read_complete_q);
		spin_unlock(&xprt->sc_rq_dto_lock);

		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&xprt->sc_xprt);
	}

	svc_rdma_put_context(ctxt, 0);
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
{
	svc_rdma_send_wc_common_put(cq, wc, "localInv");
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_maps);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_map_lock);

	/*
	 * Note that this implies that the underlying transport support
	 * has some form of congestion control (see RFC 7530 section 3.1
	 * paragraph 2). For now, we assume that all supported RDMA
	 * transports are suitable here.
	 */
	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}
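
/*
 * Post a Receive WR large enough for an inline RPC Call: page-sized
 * SGEs are added until sc_max_req_size bytes are covered, each page
 * DMA-mapped with the PD's local_dma_lkey. A transport reference is
 * taken before posting and dropped again if the post fails.
 */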
int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->cqe.done = svc_rdma_wc_receive;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(flags);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		svc_rdma_count_mappings(xprt, ctxt);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_cqe = &ctxt->cqe;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	int ret = 0;

	ret = svc_rdma_post_recv(xprt, flags);
	if (ret) {
		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
		       ret);
		pr_err("svcrdma: closing transport %p.\n", xprt);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		ret = -ENOTCONN;
	}
	return ret;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id, &event->param.conn);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
		dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret) {
		dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
		goto err1;
	}
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
	struct ib_mr *mr;
	struct scatterlist *sg;
	struct svc_rdma_fastreg_mr *frmr;
	u32 num_sg;

	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
	if (!frmr)
		goto err;

	num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
	mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
	if (IS_ERR(mr))
		goto err_free_frmr;

	sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
	if (!sg)
		goto err_free_mr;

	sg_init_table(sg, RPCSVC_MAXPAGES);

	frmr->mr = mr;
	frmr->sg = sg;
	INIT_LIST_HEAD(&frmr->frmr_list);
	return frmr;

 err_free_mr:
	ib_dereg_mr(mr);
 err_free_frmr:
	kfree(frmr);
 err:
	return ERR_PTR(-ENOMEM);
}

static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;

	while (!list_empty(&xprt->sc_frmr_q)) {
		frmr = list_entry(xprt->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		kfree(frmr->sg);
		ib_dereg_mr(frmr->mr);
		kfree(frmr);
	}
}

struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_fastreg_mr *frmr = NULL;

	spin_lock(&rdma->sc_frmr_q_lock);
	if (!list_empty(&rdma->sc_frmr_q)) {
		frmr = list_entry(rdma->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		frmr->sg_nents = 0;
	}
	spin_unlock(&rdma->sc_frmr_q_lock);
	if (frmr)
		return frmr;

	return rdma_alloc_frmr(rdma);
}

void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
		       struct svc_rdma_fastreg_mr *frmr)
{
	if (frmr) {
		ib_dma_unmap_sg(rdma->sc_cm_id->device,
				frmr->sg, frmr->sg_nents, frmr->direction);
		spin_lock(&rdma->sc_frmr_q_lock);
		WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
		spin_unlock(&rdma->sc_frmr_q_lock);
	}
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	struct sockaddr *sap;
	unsigned int i;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
				       RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
					 svcrdma_max_requests);
	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
	newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
					    svcrdma_max_bc_requests);
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);

	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;
	if (!svc_rdma_prealloc_maps(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n",
		newxprt->sc_cm_id, newxprt->sc_pd);
	dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
	dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/*
	 * Use the most secure set of MR resources based on the
	 * transport type and available memory management features in
	 * the device. Here's the table implemented below:
	 *
	 *		Fast	Global	DMA	Remote WR
	 *		Reg	LKEY	MR	Access
	 *		Sup'd	Sup'd	Needed	Needed
	 *
	 * IWARP	N	N	Y	Y
	 *		N	Y	Y	Y
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * IB		N	N	Y	N
	 *		N	Y	N	-
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * NB:	iWARP requires remote write access for the data sink
	 *	of an RDMA_READ. IB does not.
	 */
	newxprt->sc_reader = rdma_read_chunk_lcl;
	if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
		newxprt->sc_frmr_pg_list_len =
			dev->attrs.max_fast_reg_page_list_len;
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
		newxprt->sc_reader = rdma_read_chunk_frmr;
	} else
		newxprt->sc_snd_w_inv = false;

	/*
	 * Determine if a DMA MR is required and if so, what privs are required
	 */
	if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
		goto errout;

	if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
	dprintk("    max_sge         : %d\n", newxprt->sc_max_sge);
	dprintk("    max_sge_rd      : %d\n", newxprt->sc_max_sge_rd);
	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
	dprintk("    ord             : %d\n", newxprt->sc_ord);

	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum, one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);

	/* We should only be called from kref_put */
	if (kref_read(&xprt->xpt_ref) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       kref_read(&xprt->xpt_ref));

	/*
	 * Destroy queued, but not processed read completions. Note
	 * that this cleanup has to be done before destroying the
	 * cm_id because the device ptr is needed to unmap the dma in
	 * svc_rdma_put_context.
	 */
	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_first_entry(&rdma->sc_read_complete_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Destroy queued, but not processed recv completions */
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_first_entry(&rdma->sc_rq_dto_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
		       rdma->sc_ctxt_used);

	/* Final put of backchannel client transport */
	if (xprt->xpt_bc_xprt) {
		xprt_put(xprt->xpt_bc_xprt);
		xprt->xpt_bc_xprt = NULL;
	}

	rdma_dealloc_frmr_q(rdma);
	svc_rdma_destroy_ctxts(rdma);
	svc_rdma_destroy_maps(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static int svc_rdma_secure_port(struct svc_rqst *rqstp)
{
	return 1;
}

static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}
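
/*
 * Post one or more Send Queue WRs. SQ space is accounted in
 * sc_sq_avail: the WR count is debited up front, and if that would
 * overcommit the SQ, the caller sleeps on sc_send_wait until Send
 * completions return credits. A transport reference is taken for
 * each WR posted.
 */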
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
			atomic_inc(&rdma_stat_sq_starve);

			/* Wait until SQ WR available if SQ still full */
			atomic_add(wr_count, &xprt->sc_sq_avail);
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_avail) > wr_count);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
			dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
				atomic_read(&xprt->sc_sq_avail),
				xprt->sc_sq_depth);
			wake_up(&xprt->sc_send_wait);
		}
		break;
	}
	return ret;
}