/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);

static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
static DEFINE_SPINLOCK(dto_lock);
static LIST_HEAD(dto_xprt_q);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

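/* RPC/RDMA operation contexts (svc_rdma_op_ctxt) carry the pages and
 * SGEs for individual Send, Receive, and RDMA Read/Write work requests.
 * A pool of contexts is pre-allocated per connection on the sc_ctxts
 * free list; svc_rdma_get_context() falls back to a GFP_NOIO allocation
 * only if that pool is unexpectedly empty.
 */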
static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->free);
		INIT_LIST_HEAD(&ctxt->dto_q);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume a number of send
	 * and receive WQEs. One ctxt is allocated for each.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->free, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, free);
	list_del_init(&ctxt->free);
	spin_unlock_bh(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->frmr = NULL;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock_bh(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock_bh(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock_bh(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
		/*
		 * Unmap the DMA addr in the SGE if the lkey matches
		 * the local_dma_lkey, otherwise, ignore it since it is
		 * an FRMR lkey and will be unmapped later when the
		 * last WR that uses it completes.
238 */ 239 if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) { 240 atomic_dec(&xprt->sc_dma_used); 241 ib_dma_unmap_page(xprt->sc_cm_id->device, 242 ctxt->sge[i].addr, 243 ctxt->sge[i].length, 244 ctxt->direction); 245 } 246 } 247 } 248 249 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) 250 { 251 struct svcxprt_rdma *xprt = ctxt->xprt; 252 int i; 253 254 if (free_pages) 255 for (i = 0; i < ctxt->count; i++) 256 put_page(ctxt->pages[i]); 257 258 spin_lock_bh(&xprt->sc_ctxt_lock); 259 xprt->sc_ctxt_used--; 260 list_add(&ctxt->free, &xprt->sc_ctxts); 261 spin_unlock_bh(&xprt->sc_ctxt_lock); 262 } 263 264 static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) 265 { 266 while (!list_empty(&xprt->sc_ctxts)) { 267 struct svc_rdma_op_ctxt *ctxt; 268 269 ctxt = list_first_entry(&xprt->sc_ctxts, 270 struct svc_rdma_op_ctxt, free); 271 list_del(&ctxt->free); 272 kfree(ctxt); 273 } 274 } 275 276 static struct svc_rdma_req_map *alloc_req_map(gfp_t flags) 277 { 278 struct svc_rdma_req_map *map; 279 280 map = kmalloc(sizeof(*map), flags); 281 if (map) 282 INIT_LIST_HEAD(&map->free); 283 return map; 284 } 285 286 static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt) 287 { 288 unsigned int i; 289 290 /* One for each receive buffer on this connection. */ 291 i = xprt->sc_max_requests; 292 293 while (i--) { 294 struct svc_rdma_req_map *map; 295 296 map = alloc_req_map(GFP_KERNEL); 297 if (!map) { 298 dprintk("svcrdma: No memory for request map\n"); 299 return false; 300 } 301 list_add(&map->free, &xprt->sc_maps); 302 } 303 return true; 304 } 305 306 struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt) 307 { 308 struct svc_rdma_req_map *map = NULL; 309 310 spin_lock(&xprt->sc_map_lock); 311 if (list_empty(&xprt->sc_maps)) 312 goto out_empty; 313 314 map = list_first_entry(&xprt->sc_maps, 315 struct svc_rdma_req_map, free); 316 list_del_init(&map->free); 317 spin_unlock(&xprt->sc_map_lock); 318 319 out: 320 map->count = 0; 321 return map; 322 323 out_empty: 324 spin_unlock(&xprt->sc_map_lock); 325 326 /* Pre-allocation amount was incorrect */ 327 map = alloc_req_map(GFP_NOIO); 328 if (map) 329 goto out; 330 331 WARN_ONCE(1, "svcrdma: empty request map list?\n"); 332 return NULL; 333 } 334 335 void svc_rdma_put_req_map(struct svcxprt_rdma *xprt, 336 struct svc_rdma_req_map *map) 337 { 338 spin_lock(&xprt->sc_map_lock); 339 list_add(&map->free, &xprt->sc_maps); 340 spin_unlock(&xprt->sc_map_lock); 341 } 342 343 static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt) 344 { 345 while (!list_empty(&xprt->sc_maps)) { 346 struct svc_rdma_req_map *map; 347 348 map = list_first_entry(&xprt->sc_maps, 349 struct svc_rdma_req_map, free); 350 list_del(&map->free); 351 kfree(map); 352 } 353 } 354 355 /* ib_cq event handler */ 356 static void cq_event_handler(struct ib_event *event, void *context) 357 { 358 struct svc_xprt *xprt = context; 359 dprintk("svcrdma: received CQ event %s (%d), context=%p\n", 360 ib_event_msg(event->event), event->event, context); 361 set_bit(XPT_CLOSE, &xprt->xpt_flags); 362 } 363 364 /* QP event handler */ 365 static void qp_event_handler(struct ib_event *event, void *context) 366 { 367 struct svc_xprt *xprt = context; 368 369 switch (event->event) { 370 /* These are considered benign events */ 371 case IB_EVENT_PATH_MIG: 372 case IB_EVENT_COMM_EST: 373 case IB_EVENT_SQ_DRAINED: 374 case IB_EVENT_QP_LAST_WQE_REACHED: 375 dprintk("svcrdma: QP event %s (%d) received for QP=%p\n", 376 ib_event_msg(event->event), event->event, 377 
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
			"closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/*
 * Data Transfer Operation Tasklet
 *
 * Walks a list of transports with I/O pending, removing entries as
 * they are added to the server's I/O pending list. Two bits indicate
 * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
 * spinlock that serializes access to the transport list with the RQ
 * and SQ interrupt handlers.
 */
static void dto_tasklet_func(unsigned long data)
{
	struct svcxprt_rdma *xprt;
	unsigned long flags;

	spin_lock_irqsave(&dto_lock, flags);
	while (!list_empty(&dto_xprt_q)) {
		xprt = list_entry(dto_xprt_q.next,
				  struct svcxprt_rdma, sc_dto_q);
		list_del_init(&xprt->sc_dto_q);
		spin_unlock_irqrestore(&dto_lock, flags);

		rq_cq_reap(xprt);
		sq_cq_reap(xprt);

		svc_xprt_put(&xprt->sc_xprt);
		spin_lock_irqsave(&dto_lock, flags);
	}
	spin_unlock_irqrestore(&dto_lock, flags);
}

/*
 * Receive Queue Completion Handler
 *
 * Since an RQ completion handler is called on interrupt context, we
 * need to defer the handling of the I/O to a tasklet
 */
static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/* Guard against unconditional flush call for destroyed QP */
	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
		return;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an SQ
	 * completion.
	 */
	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

/*
 * rq_cq_reap - Process the RQ CQ.
 *
 * Take all completing WC off the CQE and enqueue the associated DTO
 * context on the dto_q for the transport.
 *
 * Note that caller must hold a transport reference.
 */
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
	int ret;
	struct ib_wc wc;
	struct svc_rdma_op_ctxt *ctxt = NULL;

	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
		return;

	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
	atomic_inc(&rdma_stat_rq_poll);

	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		ctxt->wc_status = wc.status;
		ctxt->byte_len = wc.byte_len;
		svc_rdma_unmap_dma(ctxt);
		if (wc.status != IB_WC_SUCCESS) {
			/* Close the transport */
			dprintk("svcrdma: transport closing putting ctxt %p\n",
				ctxt);
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_rdma_put_context(ctxt, 1);
			svc_xprt_put(&xprt->sc_xprt);
			continue;
		}
		spin_lock_bh(&xprt->sc_rq_dto_lock);
		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
		spin_unlock_bh(&xprt->sc_rq_dto_lock);
		svc_xprt_put(&xprt->sc_xprt);
	}

	if (ctxt)
		atomic_inc(&rdma_stat_rq_prod);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	/*
	 * If data arrived before established event,
	 * don't enqueue. This defers RPC I/O until the
	 * RDMA connection is complete.
	 */
	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		svc_xprt_enqueue(&xprt->sc_xprt);
}

/*
 * Process a completion context
 */
static void process_context(struct svcxprt_rdma *xprt,
			    struct svc_rdma_op_ctxt *ctxt)
{
	struct svc_rdma_op_ctxt *read_hdr;
	int free_pages = 0;

	svc_rdma_unmap_dma(ctxt);

	switch (ctxt->wr_op) {
	case IB_WR_SEND:
		free_pages = 1;
		break;

	case IB_WR_RDMA_WRITE:
		break;

	case IB_WR_RDMA_READ:
	case IB_WR_RDMA_READ_WITH_INV:
		svc_rdma_put_frmr(xprt, ctxt->frmr);

		if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
			break;

		read_hdr = ctxt->read_hdr;
		svc_rdma_put_context(ctxt, 0);

		spin_lock_bh(&xprt->sc_rq_dto_lock);
		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
		list_add_tail(&read_hdr->dto_q,
			      &xprt->sc_read_complete_q);
		spin_unlock_bh(&xprt->sc_rq_dto_lock);
		svc_xprt_enqueue(&xprt->sc_xprt);
		return;

	default:
		dprintk("svcrdma: unexpected completion opcode=%d\n",
			ctxt->wr_op);
		break;
	}

	svc_rdma_put_context(ctxt, free_pages);
}

/*
 * Send Queue Completion Handler - potentially called on interrupt context.
 *
 * Note that caller must hold a transport reference.
 */
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct ib_wc wc_a[6];
	struct ib_wc *wc;
	struct ib_cq *cq = xprt->sc_sq_cq;
	int ret;

	memset(wc_a, 0, sizeof(wc_a));

	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
		return;

	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	atomic_inc(&rdma_stat_sq_poll);
	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
		int i;

		for (i = 0; i < ret; i++) {
			wc = &wc_a[i];
			if (wc->status != IB_WC_SUCCESS) {
				dprintk("svcrdma: sq wc err status %s (%d)\n",
					ib_wc_status_msg(wc->status),
					wc->status);

				/* Close the transport */
				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			}

			/* Decrement used SQ WR count */
			atomic_dec(&xprt->sc_sq_count);
			wake_up(&xprt->sc_send_wait);

			ctxt = (struct svc_rdma_op_ctxt *)
				(unsigned long)wc->wr_id;
			if (ctxt)
				process_context(xprt, ctxt);

			svc_xprt_put(&xprt->sc_xprt);
		}
	}

	if (ctxt)
		atomic_inc(&rdma_stat_sq_prod);
}

static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/* Guard against unconditional flush call for destroyed QP */
	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
		return;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an RQ
	 * completion.
	 */
	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_maps);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_map_lock);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(flags);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		atomic_inc(&xprt->sc_dma_used);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_id = (u64)(unsigned long)ctxt;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

 err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listen xprt. When the listen thread is kicked, it
 * will call the recvfrom method on the listen xprt which will accept the new
 * connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       size_t client_ird)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = client_ird;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id,
				   event->param.conn.initiator_depth);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

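/* CM event handler for connected endpoints. svc_rdma_accept() swaps the
 * listening handler out for this one. ESTABLISHED clears
 * RDMAXPRT_CONN_PENDING and enqueues the transport; DISCONNECTED and
 * DEVICE_REMOVAL mark the transport for close processing.
 */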
static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if (sa->sa_family != AF_INET) {
		dprintk("svcrdma: Address family %d is not supported.\n",
			sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
	struct ib_mr *mr;
	struct scatterlist *sg;
	struct svc_rdma_fastreg_mr *frmr;
	u32 num_sg;

	frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
	if (!frmr)
		goto err;

	num_sg = min_t(u32, RPCSVC_MAXPAGES, xprt->sc_frmr_pg_list_len);
	mr = ib_alloc_mr(xprt->sc_pd, IB_MR_TYPE_MEM_REG, num_sg);
	if (IS_ERR(mr))
		goto err_free_frmr;

	sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
	if (!sg)
		goto err_free_mr;

	sg_init_table(sg, RPCSVC_MAXPAGES);

	frmr->mr = mr;
	frmr->sg = sg;
	INIT_LIST_HEAD(&frmr->frmr_list);
	return frmr;

 err_free_mr:
	ib_dereg_mr(mr);
 err_free_frmr:
	kfree(frmr);
 err:
	return ERR_PTR(-ENOMEM);
}

static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_fastreg_mr *frmr;

	while (!list_empty(&xprt->sc_frmr_q)) {
		frmr = list_entry(xprt->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		kfree(frmr->sg);
		ib_dereg_mr(frmr->mr);
		kfree(frmr);
	}
}

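/* Fast registration MRs are cached on the transport's sc_frmr_q free
 * list. svc_rdma_get_frmr() reuses a cached entry when one is available
 * and otherwise allocates a fresh one; svc_rdma_put_frmr() unmaps the
 * scatterlist and returns the entry to the free list.
 */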
struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_fastreg_mr *frmr = NULL;

	spin_lock_bh(&rdma->sc_frmr_q_lock);
	if (!list_empty(&rdma->sc_frmr_q)) {
		frmr = list_entry(rdma->sc_frmr_q.next,
				  struct svc_rdma_fastreg_mr, frmr_list);
		list_del_init(&frmr->frmr_list);
		frmr->sg_nents = 0;
	}
	spin_unlock_bh(&rdma->sc_frmr_q_lock);
	if (frmr)
		return frmr;

	return rdma_alloc_frmr(rdma);
}

void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
		       struct svc_rdma_fastreg_mr *frmr)
{
	if (frmr) {
		ib_dma_unmap_sg(rdma->sc_cm_id->device,
				frmr->sg, frmr->sg_nents, frmr->direction);
		atomic_dec(&rdma->sc_dma_used);
		spin_lock_bh(&rdma->sc_frmr_q_lock);
		WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
		list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
		spin_unlock_bh(&rdma->sc_frmr_q_lock);
	}
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct ib_cq_init_attr cq_attr = {};
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	unsigned int i;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device.
	 */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd,
				       RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr,
					 svcrdma_max_requests);
	newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr,
					    svcrdma_max_bc_requests);
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;

	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;
	if (!svc_rdma_prealloc_maps(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	cq_attr.cqe = newxprt->sc_sq_depth;
	newxprt->sc_sq_cq = ib_create_cq(dev,
					 sq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 &cq_attr);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	cq_attr.cqe = newxprt->sc_rq_depth;
	newxprt->sc_rq_cq = ib_create_cq(dev,
					 rq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 &cq_attr);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
		"    cm_id->device=%p, sc_pd->device=%p\n"
		"    cap.max_send_wr = %d\n"
		"    cap.max_recv_wr = %d\n"
		"    cap.max_send_sge = %d\n"
		"    cap.max_recv_sge = %d\n",
		newxprt->sc_cm_id, newxprt->sc_pd,
		dev, newxprt->sc_pd->device,
		qp_attr.cap.max_send_wr,
		qp_attr.cap.max_recv_wr,
		qp_attr.cap.max_send_sge,
		qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/*
	 * Use the most secure set of MR resources based on the
	 * transport type and available memory management features in
	 * the device. Here's the table implemented below:
	 *
	 *		Fast	Global	DMA	Remote WR
	 *		Reg	LKEY	MR	Access
	 *		Sup'd	Sup'd	Needed	Needed
	 *
	 * IWARP	N	N	Y	Y
	 *		N	Y	Y	Y
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * IB		N	N	Y	N
	 *		N	Y	N	-
	 *		Y	N	Y	N
	 *		Y	Y	N	-
	 *
	 * NB:	iWARP requires remote write access for the data sink
	 *	of an RDMA_READ. IB does not.
	 */
	newxprt->sc_reader = rdma_read_chunk_lcl;
	if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
		newxprt->sc_frmr_pg_list_len =
			dev->attrs.max_fast_reg_page_list_len;
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
		newxprt->sc_reader = rdma_read_chunk_frmr;
	}

	/*
	 * Determine if a DMA MR is required and if so, what privs are required
	 */
	if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num))
		goto errout;

	if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num))
		newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_rq_depth; i++) {
		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/*
	 * Arm the CQs for the SQ and RQ before accepting so we can't
	 * miss the first message
	 */
	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted with the following "
		"attributes:\n"
		"    local_ip        : %pI4\n"
		"    local_port      : %d\n"
		"    remote_ip       : %pI4\n"
		"    remote_port     : %d\n"
		"    max_sge         : %d\n"
		"    max_sge_rd      : %d\n"
		"    sq_depth        : %d\n"
		"    max_requests    : %d\n"
		"    ord             : %d\n",
		newxprt,
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.src_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.src_addr)->sin_port),
		&((struct sockaddr_in *)&newxprt->sc_cm_id->
			route.addr.dst_addr)->sin_addr.s_addr,
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.dst_addr)->sin_port),
		newxprt->sc_max_sge,
		newxprt->sc_max_sge_rd,
		newxprt->sc_sq_depth,
		newxprt->sc_max_requests,
		newxprt->sc_ord);

	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum, one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

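/* Transport destruction runs from the svc_rdma_wq workqueue. Queued but
 * unprocessed receive and read-chunk contexts are released first (they
 * need the device pointer to unmap their DMA), then the FRMR, context,
 * and request-map pools, and finally the QP, CQs, PD, and CM ID.
 */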
static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	/* We should only be called from kref_put */
	if (atomic_read(&xprt->xpt_ref.refcount) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       atomic_read(&xprt->xpt_ref.refcount));

	/*
	 * Destroy queued, but not processed read completions. Note
	 * that this cleanup has to be done before destroying the
	 * cm_id because the device ptr is needed to unmap the dma in
	 * svc_rdma_put_context.
	 */
	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_entry(rdma->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Destroy queued, but not processed recv completions */
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_entry(rdma->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
		       rdma->sc_ctxt_used);
	if (atomic_read(&rdma->sc_dma_used) != 0)
		pr_err("svcrdma: dma still in use? (%d)\n",
		       atomic_read(&rdma->sc_dma_used));

	/* Final put of backchannel client transport */
	if (xprt->xpt_bc_xprt) {
		xprt_put(xprt->xpt_bc_xprt);
		xprt->xpt_bc_xprt = NULL;
	}

	rdma_dealloc_frmr_q(rdma);
	svc_rdma_destroy_ctxts(rdma);
	svc_rdma_destroy_maps(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_destroy_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_destroy_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static int svc_rdma_secure_port(struct svc_rqst *rqstp)
{
	return 1;
}

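/* Post a chain of Send Queue WRs on the connection's QP. sc_sq_count
 * tracks the number of SQ WRs outstanding; if the chain does not fit,
 * the caller reaps the SQ CQ and then sleeps on sc_send_wait until
 * completions free up space. Each posted WR holds a transport reference
 * that is released when its completion is processed in sq_cq_reap().
 */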
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		spin_lock_bh(&xprt->sc_lock);
		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
			spin_unlock_bh(&xprt->sc_lock);
			atomic_inc(&rdma_stat_sq_starve);

			/* See if we can opportunistically reap SQ WR to make room */
			sq_cq_reap(xprt);

			/* Wait until SQ WR available if SQ still full */
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_count) <
				   xprt->sc_sq_depth);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		atomic_add(wr_count, &xprt->sc_sq_count);
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			atomic_sub(wr_count, &xprt->sc_sq_count);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d, "
				"sc_sq_count=%d, sc_sq_depth=%d\n",
				ret, atomic_read(&xprt->sc_sq_count),
				xprt->sc_sq_depth);
		}
		spin_unlock_bh(&xprt->sc_lock);
		if (ret)
			wake_up(&xprt->sc_send_wait);
		break;
	}
	return ret;
}

void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
			 enum rpcrdma_errcode err)
{
	struct ib_send_wr err_wr;
	struct page *p;
	struct svc_rdma_op_ctxt *ctxt;
	__be32 *va;
	int length;
	int ret;

	p = alloc_page(GFP_KERNEL);
	if (!p)
		return;
	va = page_address(p);

	/* XDR encode error */
	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

	ctxt = svc_rdma_get_context(xprt);
	ctxt->direction = DMA_TO_DEVICE;
	ctxt->count = 1;
	ctxt->pages[0] = p;

	/* Prepare SGE for local address */
	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
					    p, 0, length, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
		put_page(p);
		svc_rdma_put_context(ctxt, 1);
		return;
	}
	atomic_inc(&xprt->sc_dma_used);
	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
	ctxt->sge[0].length = length;

	/* Prepare SEND WR */
	memset(&err_wr, 0, sizeof err_wr);
	ctxt->wr_op = IB_WR_SEND;
	err_wr.wr_id = (unsigned long)ctxt;
	err_wr.sg_list = ctxt->sge;
	err_wr.num_sge = 1;
	err_wr.opcode = IB_WR_SEND;
	err_wr.send_flags = IB_SEND_SIGNALED;

	/* Post It */
	ret = svc_rdma_send(xprt, &err_wr);
	if (ret) {
		dprintk("svcrdma: Error %d posting send for protocol error\n",
			ret);
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
	}
}