/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

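/* svc_rdma_op_ctxt pool
 *
 * Contexts are pre-allocated by svc_rdma_prealloc_ctxts() at accept time,
 * one for each send and receive WQE, and are recycled through the
 * transport's sc_ctxts list by svc_rdma_get_context() and
 * svc_rdma_put_context(). If the pool ever runs dry, a context is
 * allocated on demand with GFP_NOIO.
 */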
static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->free);
		INIT_LIST_HEAD(&ctxt->dto_q);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume a number of send
	 * and receive WQEs. One ctxt is allocated for each.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->free, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, free);
	list_del_init(&ctxt->free);
	spin_unlock_bh(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->mapped_sges = 0;
	ctxt->frmr = NULL;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock_bh(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock_bh(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	struct ib_device *device = xprt->sc_cm_id->device;
	u32 lkey = xprt->sc_pd->local_dma_lkey;
	unsigned int i, count;

	for (count = 0, i = 0; i < ctxt->mapped_sges; i++) {
		/*
		 * Unmap the DMA addr in the SGE if the lkey matches
		 * the local_dma_lkey, otherwise, ignore it since it is
		 * an FRMR lkey and will be unmapped later when the
		 * last WR that uses it completes.
		 */
		if (ctxt->sge[i].lkey == lkey) {
			count++;
			ib_dma_unmap_page(device,
					  ctxt->sge[i].addr,
					  ctxt->sge[i].length,
					  ctxt->direction);
		}
	}
	ctxt->mapped_sges = 0;
	atomic_sub(count, &xprt->sc_dma_used);
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	list_add(&ctxt->free, &xprt->sc_ctxts);
	spin_unlock_bh(&xprt->sc_ctxt_lock);
}

static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_ctxts)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&xprt->sc_ctxts,
					struct svc_rdma_op_ctxt, free);
		list_del(&ctxt->free);
		kfree(ctxt);
	}
}

static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
{
	struct svc_rdma_req_map *map;

	map = kmalloc(sizeof(*map), flags);
	if (map)
		INIT_LIST_HEAD(&map->free);
	return map;
}

static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* One for each receive buffer on this connection.
	 */
	i = xprt->sc_max_requests;

	while (i--) {
		struct svc_rdma_req_map *map;

		map = alloc_req_map(GFP_KERNEL);
		if (!map) {
			dprintk("svcrdma: No memory for request map\n");
			return false;
		}
		list_add(&map->free, &xprt->sc_maps);
	}
	return true;
}

struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_req_map *map = NULL;

	spin_lock(&xprt->sc_map_lock);
	if (list_empty(&xprt->sc_maps))
		goto out_empty;

	map = list_first_entry(&xprt->sc_maps,
			       struct svc_rdma_req_map, free);
	list_del_init(&map->free);
	spin_unlock(&xprt->sc_map_lock);

out:
	map->count = 0;
	return map;

out_empty:
	spin_unlock(&xprt->sc_map_lock);

	/* Pre-allocation amount was incorrect */
	map = alloc_req_map(GFP_NOIO);
	if (map)
		goto out;

	WARN_ONCE(1, "svcrdma: empty request map list?\n");
	return NULL;
}

void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
			  struct svc_rdma_req_map *map)
{
	spin_lock(&xprt->sc_map_lock);
	list_add(&map->free, &xprt->sc_maps);
	spin_unlock(&xprt->sc_map_lock);
}

static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_maps)) {
		struct svc_rdma_req_map *map;

		map = list_first_entry(&xprt->sc_maps,
				       struct svc_rdma_req_map, free);
		list_del(&map->free);
		kfree(map);
	}
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
			"closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	ctxt->wc_status = wc->status;
	svc_rdma_unmap_dma(ctxt);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;

	/* All wc fields are now known to be valid */
	ctxt->byte_len = wc->byte_len;
	spin_lock(&xprt->sc_rq_dto_lock);
	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
	spin_unlock(&xprt->sc_rq_dto_lock);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		goto out;
	svc_xprt_enqueue(&xprt->sc_xprt);
	goto out;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
			ib_wc_status_msg(wc->status),
			wc->status, wc->vendor_err);
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	svc_rdma_put_context(ctxt, 1);

out:
	svc_xprt_put(&xprt->sc_xprt);
}

static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
				    struct ib_wc *wc,
				    const char *opname)
{
	if (wc->status != IB_WC_SUCCESS)
		goto err;

out:
	atomic_dec(&xprt->sc_sq_count);
	wake_up(&xprt->sc_send_wait);
	return;

err:
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
		       opname, ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	goto out;
}

static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
					const char *opname)
{
	struct svcxprt_rdma *xprt = cq->cq_context;

	svc_rdma_send_wc_common(xprt, wc, opname);
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common_put(cq, wc, "send");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
}

/**
 * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common_put(cq, wc, "write");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
}

/**
 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
{
	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
}

/**
 * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common(xprt, wc, "read");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_frmr(xprt, ctxt->frmr);

	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
		struct svc_rdma_op_ctxt *read_hdr;

		read_hdr = ctxt->read_hdr;
		spin_lock(&xprt->sc_rq_dto_lock);
		list_add_tail(&read_hdr->dto_q,
			      &xprt->sc_read_complete_q);
		spin_unlock(&xprt->sc_rq_dto_lock);

		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&xprt->sc_xprt);
	}

	svc_rdma_put_context(ctxt, 0);
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
{
	svc_rdma_send_wc_common_put(cq, wc, "localInv");
}

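/* Allocate and initialize a new svcxprt_rdma: its svc_xprt, work lists,
 * locks, and send wait queue. @listener is non-zero when this transport
 * will be the listening endpoint, in which case XPT_LISTENER is set.
 */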
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_maps);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_map_lock);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

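/* Allocate a receive context, back it with freshly allocated pages that
 * are DMA-mapped for the device, and post it as one receive WR. A
 * transport reference is held for the posted receive and dropped if the
 * post fails.
 */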
int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->cqe.done = svc_rdma_wc_receive;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(flags);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		svc_rdma_count_mappings(xprt, ctxt);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_cqe = &ctxt->cqe;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	int ret = 0;

	ret = svc_rdma_post_recv(xprt, flags);
	if (ret) {
		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
		       ret);
		pr_err("svcrdma: closing transport %p.\n", xprt);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		ret = -ENOTCONN;
	}
	return ret;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id, &event->param.conn);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
		dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret) {
		dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
		goto err1;
	}
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
	struct ib_mr *mr;
	struct scatterlist *sg;
	struct svc_rdma_fastreg_mr *frmr;
	u32 num_sg;

	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
	if (!frmr)
		goto err;

	num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
	mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
	if (IS_ERR(mr))
		goto err_free_frmr;

	sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
	if (!sg)
		goto err_free_mr;

	sg_init_table(sg, RPCSVC_MAXPAGES);

	frmr->mr = mr;
	frmr->sg = sg;
	INIT_LIST_HEAD(&frmr->frmr_list);
	return frmr;

 err_free_mr:
	ib_dereg_mr(mr);
 err_free_frmr:
	kfree(frmr);
 err:
	return ERR_PTR(-ENOMEM);
}

static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;

	while (!list_empty(&xprt->sc_frmr_q)) {
		frmr = list_entry(xprt->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		kfree(frmr->sg);
		ib_dereg_mr(frmr->mr);
		kfree(frmr);
	}
}

struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_fastreg_mr *frmr = NULL;

	spin_lock_bh(&rdma->sc_frmr_q_lock);
	if (!list_empty(&rdma->sc_frmr_q)) {
		frmr = list_entry(rdma->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		frmr->sg_nents = 0;
	}
	spin_unlock_bh(&rdma->sc_frmr_q_lock);
	if (frmr)
		return frmr;

	return rdma_alloc_frmr(rdma);
}

void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
		       struct svc_rdma_fastreg_mr *frmr)
{
	if (frmr) {
		ib_dma_unmap_sg(rdma->sc_cm_id->device,
				frmr->sg, frmr->sg_nents, frmr->direction);
		atomic_dec(&rdma->sc_dma_used);
		spin_lock_bh(&rdma->sc_frmr_q_lock);
		WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
		spin_unlock_bh(&rdma->sc_frmr_q_lock);
	}
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	unsigned int i;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
				       RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
					 svcrdma_max_requests);
	newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
					    svcrdma_max_bc_requests);
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;

	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;
	if (!svc_rdma_prealloc_maps(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
					0, IB_POLL_SOFTIRQ);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_SOFTIRQ);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
		"    cm_id->device=%p, sc_pd->device=%p\n"
		"    cap.max_send_wr = %d\n"
		"    cap.max_recv_wr = %d\n"
		"    cap.max_send_sge = %d\n"
		"    cap.max_recv_sge = %d\n",
		newxprt->sc_cm_id, newxprt->sc_pd,
		dev, newxprt->sc_pd->device,
		qp_attr.cap.max_send_wr,
		qp_attr.cap.max_recv_wr,
		qp_attr.cap.max_send_sge,
		qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/*
	 * Use the most secure set of MR resources based on the
	 * transport type and available memory management features in
	 * the device. Here's the table implemented below:
	 *
	 *		Fast	Global	DMA	Remote WR
	 *		Reg	LKEY	MR	Access
	 *		Sup'd	Sup'd	Needed	Needed
	 *
	 * IWARP	N	N	Y	Y
	 *		N	Y	Y	Y
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * IB		N	N	Y	N
	 *		N	Y	N	-
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * NB:	iWARP requires remote write access for the data sink
	 *	of an RDMA_READ. IB does not.
	 */
	newxprt->sc_reader = rdma_read_chunk_lcl;
	if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
		newxprt->sc_frmr_pg_list_len =
			dev->attrs.max_fast_reg_page_list_len;
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
		newxprt->sc_reader = rdma_read_chunk_frmr;
	} else
		newxprt->sc_snd_w_inv = false;

	/*
	 * Determine if a DMA MR is required and if so, what privs are required
	 */
	if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
		goto errout;

	if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted with the following "
		"attributes:\n"
		"    local_ip     : %pI4\n"
		"    local_port   : %d\n"
		"    remote_ip    : %pI4\n"
		"    remote_port  : %d\n"
		"    max_sge      : %d\n"
		"    max_sge_rd   : %d\n"
		"    sq_depth     : %d\n"
		"    max_requests : %d\n"
		"    ord          : %d\n",
		newxprt,
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.src_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.src_addr)->sin_port),
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.dst_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.dst_addr)->sin_port),
		newxprt->sc_max_sge,
		newxprt->sc_max_sge_rd,
		newxprt->sc_sq_depth,
		newxprt->sc_max_requests,
		newxprt->sc_ord);

	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);

	/* We should only be called from kref_put */
	if (atomic_read(&xprt->xpt_ref.refcount) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       atomic_read(&xprt->xpt_ref.refcount));

	/*
	 * Destroy queued, but not processed read completions. Note
	 * that this cleanup has to be done before destroying the
	 * cm_id because the device ptr is needed to unmap the dma in
	 * svc_rdma_put_context.
	 */
	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Destroy queued, but not processed recv completions */
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
		       rdma->sc_ctxt_used);
	if (atomic_read(&rdma->sc_dma_used) != 0)
		pr_err("svcrdma: dma still in use? (%d)\n",
		       atomic_read(&rdma->sc_dma_used));

	/* Final put of backchannel client transport */
	if (xprt->xpt_bc_xprt) {
		xprt_put(xprt->xpt_bc_xprt);
		xprt->xpt_bc_xprt = NULL;
	}

	rdma_dealloc_frmr_q(rdma);
	svc_rdma_destroy_ctxts(rdma);
	svc_rdma_destroy_maps(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static int svc_rdma_secure_port(struct svc_rqst *rqstp)
{
	return 1;
}

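/* Post one or more send WRs on the connection's QP. sc_sq_count tracks
 * WRs in flight against sc_sq_depth; when the SQ is full, the caller
 * sleeps on sc_send_wait until send completions release entries. A
 * transport reference is taken for each WR posted and is released by
 * the send completion handlers, or here if the post fails.
 */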
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		spin_lock_bh(&xprt->sc_lock);
		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
			spin_unlock_bh(&xprt->sc_lock);
			atomic_inc(&rdma_stat_sq_starve);

			/* Wait until SQ WR available if SQ still full */
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_count) <
				   xprt->sc_sq_depth);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		atomic_add(wr_count, &xprt->sc_sq_count);
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			atomic_sub(wr_count, &xprt->sc_sq_count);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d, "
				"sc_sq_count=%d, sc_sq_depth=%d\n",
				ret, atomic_read(&xprt->sc_sq_count),
				xprt->sc_sq_depth);
		}
		spin_unlock_bh(&xprt->sc_lock);
		if (ret)
			wake_up(&xprt->sc_send_wait);
		break;
	}
	return ret;
}