/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

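/* svc_rdma_op_ctxt pool management
 *
 * A svc_rdma_op_ctxt carries the SGE array, page list, and DMA
 * mapping state for one posted work request. A pool of contexts is
 * pre-allocated when a connection is accepted (one per send and
 * receive WQE) and recycled through xprt->sc_ctxts under
 * sc_ctxt_lock. If the pool is ever exhausted, svc_rdma_get_context()
 * falls back to a GFP_NOIO allocation rather than failing the I/O.
 */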
static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->free);
		INIT_LIST_HEAD(&ctxt->dto_q);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume a number of send
	 * and receive WQEs. One ctxt is allocated for each.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->free, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, free);
	list_del_init(&ctxt->free);
	spin_unlock_bh(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->mapped_sges = 0;
	ctxt->frmr = NULL;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock_bh(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock_bh(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	struct ib_device *device = xprt->sc_cm_id->device;
	u32 lkey = xprt->sc_pd->local_dma_lkey;
	unsigned int i, count;

	for (count = 0, i = 0; i < ctxt->mapped_sges; i++) {
		/*
		 * Unmap the DMA addr in the SGE if the lkey matches
		 * the local_dma_lkey, otherwise, ignore it since it is
		 * an FRMR lkey and will be unmapped later when the
		 * last WR that uses it completes.
		 */
		if (ctxt->sge[i].lkey == lkey) {
			count++;
			ib_dma_unmap_page(device,
					  ctxt->sge[i].addr,
					  ctxt->sge[i].length,
					  ctxt->direction);
		}
	}
	ctxt->mapped_sges = 0;
	atomic_sub(count, &xprt->sc_dma_used);
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	list_add(&ctxt->free, &xprt->sc_ctxts);
	spin_unlock_bh(&xprt->sc_ctxt_lock);
}

static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_ctxts)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&xprt->sc_ctxts,
					struct svc_rdma_op_ctxt, free);
		list_del(&ctxt->free);
		kfree(ctxt);
	}
}

static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
{
	struct svc_rdma_req_map *map;

	map = kmalloc(sizeof(*map), flags);
	if (map)
		INIT_LIST_HEAD(&map->free);
	return map;
}

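/* svc_rdma_req_map objects are pooled the same way: one map is
 * pre-allocated per credit (sc_max_requests) at accept time and
 * recycled through xprt->sc_maps under sc_map_lock. If the pool runs
 * dry, svc_rdma_get_req_map() falls back to a GFP_NOIO allocation.
 */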
static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* One for each receive buffer on this connection.
	 */
	i = xprt->sc_max_requests;

	while (i--) {
		struct svc_rdma_req_map *map;

		map = alloc_req_map(GFP_KERNEL);
		if (!map) {
			dprintk("svcrdma: No memory for request map\n");
			return false;
		}
		list_add(&map->free, &xprt->sc_maps);
	}
	return true;
}

struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_req_map *map = NULL;

	spin_lock(&xprt->sc_map_lock);
	if (list_empty(&xprt->sc_maps))
		goto out_empty;

	map = list_first_entry(&xprt->sc_maps,
			       struct svc_rdma_req_map, free);
	list_del_init(&map->free);
	spin_unlock(&xprt->sc_map_lock);

out:
	map->count = 0;
	return map;

out_empty:
	spin_unlock(&xprt->sc_map_lock);

	/* Pre-allocation amount was incorrect */
	map = alloc_req_map(GFP_NOIO);
	if (map)
		goto out;

	WARN_ONCE(1, "svcrdma: empty request map list?\n");
	return NULL;
}

void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
			  struct svc_rdma_req_map *map)
{
	spin_lock(&xprt->sc_map_lock);
	list_add(&map->free, &xprt->sc_maps);
	spin_unlock(&xprt->sc_map_lock);
}

static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_maps)) {
		struct svc_rdma_req_map *map;

		map = list_first_entry(&xprt->sc_maps,
				       struct svc_rdma_req_map, free);
		list_del(&map->free);
		kfree(map);
	}
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
			"closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:        completion queue
 * @wc:        completed WR
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	ctxt->wc_status = wc->status;
	svc_rdma_unmap_dma(ctxt);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;

	/* All wc fields are now known to be valid */
	ctxt->byte_len = wc->byte_len;
	spin_lock(&xprt->sc_rq_dto_lock);
	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
	spin_unlock(&xprt->sc_rq_dto_lock);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		goto out;
	svc_xprt_enqueue(&xprt->sc_xprt);
	goto out;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
			ib_wc_status_msg(wc->status),
			wc->status, wc->vendor_err);
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	svc_rdma_put_context(ctxt, 1);

out:
	svc_xprt_put(&xprt->sc_xprt);
}

static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
				    struct ib_wc *wc,
				    const char *opname)
{
	if (wc->status != IB_WC_SUCCESS)
		goto err;

out:
	atomic_dec(&xprt->sc_sq_count);
	wake_up(&xprt->sc_send_wait);
	return;

err:
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
		       opname, ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	goto out;
}

static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
					const char *opname)
{
	struct svcxprt_rdma *xprt = cq->cq_context;

	svc_rdma_send_wc_common(xprt, wc, opname);
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:        completion queue
 * @wc:        completed WR
 *
 */
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common_put(cq, wc, "send");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
}

/**
 * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
 * @cq:        completion queue
 * @wc:        completed WR
 *
 */
void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common_put(cq, wc, "write");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
}

/**
 * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
 * @cq:        completion queue
 * @wc:        completed WR
 *
 */
void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
{
	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
}

/**
 * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
 * @cq:        completion queue
 * @wc:        completed WR
 *
 */
void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	svc_rdma_send_wc_common(xprt, wc, "read");

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_frmr(xprt, ctxt->frmr);

	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
		struct svc_rdma_op_ctxt *read_hdr;

		read_hdr = ctxt->read_hdr;
		spin_lock(&xprt->sc_rq_dto_lock);
		list_add_tail(&read_hdr->dto_q,
			      &xprt->sc_read_complete_q);
		spin_unlock(&xprt->sc_rq_dto_lock);

		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&xprt->sc_xprt);
	}

	svc_rdma_put_context(ctxt, 0);
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
 * @cq:        completion queue
 * @wc:        completed WR
 *
 */
void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
{
	svc_rdma_send_wc_common_put(cq, wc, "localInv");
}

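/* Allocate and initialize a transport instance. This helper is shared
 * by the listener (listener != 0 sets XPT_LISTENER), by accepted
 * connections, and by the backchannel transport; the caller is
 * responsible for attaching the CM ID and QP resources afterwards.
 */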
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_maps);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_map_lock);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->cqe.done = svc_rdma_wc_receive;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(flags);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		svc_rdma_count_mappings(xprt, ctxt);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_cqe = &ctxt->cqe;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	int ret = 0;

	ret = svc_rdma_post_recv(xprt, flags);
	if (ret) {
		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
		       ret);
		pr_err("svcrdma: closing transport %p.\n", xprt);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		ret = -ENOTCONN;
	}
	return ret;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events are
 * either incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id, &event->param.conn);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
		dprintk("svcrdma: Address family %d is not supported.\n",
			sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret) {
		dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
		goto err1;
	}
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

err1:
	rdma_destroy_id(listen_id);
err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

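/* Fast registration MRs are cached on sc_frmr_q. svc_rdma_get_frmr()
 * hands out a cached MR when one is available and otherwise allocates
 * a fresh one; svc_rdma_put_frmr() unmaps the scatterlist and returns
 * the MR to the cache. The whole cache is torn down by
 * rdma_dealloc_frmr_q() when the transport is freed.
 */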
static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
	struct ib_mr *mr;
	struct scatterlist *sg;
	struct svc_rdma_fastreg_mr *frmr;
	u32 num_sg;

	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
	if (!frmr)
		goto err;

	num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
	mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
	if (IS_ERR(mr))
		goto err_free_frmr;

	sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
	if (!sg)
		goto err_free_mr;

	sg_init_table(sg, RPCSVC_MAXPAGES);

	frmr->mr = mr;
	frmr->sg = sg;
	INIT_LIST_HEAD(&frmr->frmr_list);
	return frmr;

err_free_mr:
	ib_dereg_mr(mr);
err_free_frmr:
	kfree(frmr);
err:
	return ERR_PTR(-ENOMEM);
}

static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;

	while (!list_empty(&xprt->sc_frmr_q)) {
		frmr = list_entry(xprt->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		kfree(frmr->sg);
		ib_dereg_mr(frmr->mr);
		kfree(frmr);
	}
}

struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_fastreg_mr *frmr = NULL;

	spin_lock_bh(&rdma->sc_frmr_q_lock);
	if (!list_empty(&rdma->sc_frmr_q)) {
		frmr = list_entry(rdma->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		frmr->sg_nents = 0;
	}
	spin_unlock_bh(&rdma->sc_frmr_q_lock);
	if (frmr)
		return frmr;

	return rdma_alloc_frmr(rdma);
}

void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
		       struct svc_rdma_fastreg_mr *frmr)
{
	if (frmr) {
		ib_dma_unmap_sg(rdma->sc_cm_id->device,
				frmr->sg, frmr->sg_nents, frmr->direction);
		atomic_dec(&rdma->sc_dma_used);
		spin_lock_bh(&rdma->sc_frmr_q_lock);
		WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
		spin_unlock_bh(&rdma->sc_frmr_q_lock);
	}
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	unsigned int i;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
				       RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
					 svcrdma_max_requests);
	newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
					    svcrdma_max_bc_requests);
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;

	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;
	if (!svc_rdma_prealloc_maps(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
					0, IB_POLL_SOFTIRQ);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_SOFTIRQ);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
		"    cm_id->device=%p, sc_pd->device=%p\n"
		"    cap.max_send_wr = %d\n"
		"    cap.max_recv_wr = %d\n"
		"    cap.max_send_sge = %d\n"
		"    cap.max_recv_sge = %d\n",
		newxprt->sc_cm_id, newxprt->sc_pd,
		dev, newxprt->sc_pd->device,
		qp_attr.cap.max_send_wr,
		qp_attr.cap.max_recv_wr,
		qp_attr.cap.max_send_sge,
		qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/*
	 * Use the most secure set of MR resources based on the
	 * transport type and available memory management features in
	 * the device. Here's the table implemented below:
	 *
	 *		Fast	Global	DMA	Remote WR
	 *		Reg	LKEY	MR	Access
	 *		Sup'd	Sup'd	Needed	Needed
	 *
	 * IWARP	N	N	Y	Y
	 *		N	Y	Y	Y
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * IB		N	N	Y	N
	 *		N	Y	N	-
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * NB:	iWARP requires remote write access for the data sink
	 *	of an RDMA_READ. IB does not.
	 */
	newxprt->sc_reader = rdma_read_chunk_lcl;
	if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
		newxprt->sc_frmr_pg_list_len =
			dev->attrs.max_fast_reg_page_list_len;
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
		newxprt->sc_reader = rdma_read_chunk_frmr;
	} else
		newxprt->sc_snd_w_inv = false;

	/*
	 * Determine if a DMA MR is required and if so, what privs are required
	 */
	if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
		goto errout;

	if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted with the following "
		"attributes:\n"
		"    local_ip        : %pI4\n"
		"    local_port      : %d\n"
		"    remote_ip       : %pI4\n"
		"    remote_port     : %d\n"
		"    max_sge         : %d\n"
		"    max_sge_rd      : %d\n"
		"    sq_depth        : %d\n"
		"    max_requests    : %d\n"
		"    ord             : %d\n",
		newxprt,
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.src_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.src_addr)->sin_port),
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.dst_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.dst_addr)->sin_port),
		newxprt->sc_max_sge,
		newxprt->sc_max_sge_rd,
		newxprt->sc_sq_depth,
		newxprt->sc_max_requests,
		newxprt->sc_ord);

	return &newxprt->sc_xprt;

errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);

	/* We should only be called from kref_put */
	if (atomic_read(&xprt->xpt_ref.refcount) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       atomic_read(&xprt->xpt_ref.refcount));

	/*
	 * Destroy queued, but not processed read completions. Note
	 * that this cleanup has to be done before destroying the
	 * cm_id because the device ptr is needed to unmap the dma in
	 * svc_rdma_put_context.
	 */
	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Destroy queued, but not processed recv completions */
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_entry(rdma->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
		       rdma->sc_ctxt_used);
	if (atomic_read(&rdma->sc_dma_used) != 0)
		pr_err("svcrdma: dma still in use? (%d)\n",
		       atomic_read(&rdma->sc_dma_used));

	/* Final put of backchannel client transport */
	if (xprt->xpt_bc_xprt) {
		xprt_put(xprt->xpt_bc_xprt);
		xprt->xpt_bc_xprt = NULL;
	}

	rdma_dealloc_frmr_q(rdma);
	svc_rdma_destroy_ctxts(rdma);
	svc_rdma_destroy_maps(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static int svc_rdma_secure_port(struct svc_rqst *rqstp)
{
	return 1;
}

static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}

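/* Post a chain of send WRs, throttling against the Send Queue depth.
 *
 * The number of WRs in the chain is counted first. If posting them
 * would exceed sc_sq_depth, the caller sleeps on sc_send_wait until
 * the send completion handlers (via svc_rdma_send_wc_common) retire
 * enough WRs and wake it up. One transport reference is taken per WR
 * posted; the matching puts happen in the completion handlers.
 */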
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		spin_lock_bh(&xprt->sc_lock);
		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
			spin_unlock_bh(&xprt->sc_lock);
			atomic_inc(&rdma_stat_sq_starve);

			/* Wait until SQ WR available if SQ still full */
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_count) <
				   xprt->sc_sq_depth);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		atomic_add(wr_count, &xprt->sc_sq_count);
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			atomic_sub(wr_count, &xprt->sc_sq_count);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d, "
				"sc_sq_count=%d, sc_sq_depth=%d\n",
				ret, atomic_read(&xprt->sc_sq_count),
				xprt->sc_sq_depth);
		}
		spin_unlock_bh(&xprt->sc_lock);
		if (ret)
			wake_up(&xprt->sc_send_wait);
		break;
	}
	return ret;
}