1 /* 2 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 3 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved. 4 * 5 * This software is available to you under a choice of one of two 6 * licenses. You may choose to be licensed under the terms of the GNU 7 * General Public License (GPL) Version 2, available from the file 8 * COPYING in the main directory of this source tree, or the BSD-type 9 * license below: 10 * 11 * Redistribution and use in source and binary forms, with or without 12 * modification, are permitted provided that the following conditions 13 * are met: 14 * 15 * Redistributions of source code must retain the above copyright 16 * notice, this list of conditions and the following disclaimer. 17 * 18 * Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials provided 21 * with the distribution. 22 * 23 * Neither the name of the Network Appliance, Inc. nor the names of 24 * its contributors may be used to endorse or promote products 25 * derived from this software without specific prior written 26 * permission. 27 * 28 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 29 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 30 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 31 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 32 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 33 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 34 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 35 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 36 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 37 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 38 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 39 * 40 * Author: Tom Tucker <tom@opengridcomputing.com> 41 */ 42 43 #include <linux/sunrpc/svc_xprt.h> 44 #include <linux/sunrpc/addr.h> 45 #include <linux/sunrpc/debug.h> 46 #include <linux/sunrpc/rpc_rdma.h> 47 #include <linux/interrupt.h> 48 #include <linux/sched.h> 49 #include <linux/slab.h> 50 #include <linux/spinlock.h> 51 #include <linux/workqueue.h> 52 #include <rdma/ib_verbs.h> 53 #include <rdma/rdma_cm.h> 54 #include <rdma/rw.h> 55 #include <linux/sunrpc/svc_rdma.h> 56 #include <linux/export.h> 57 #include "xprt_rdma.h" 58 59 #define RPCDBG_FACILITY RPCDBG_SVCXPRT 60 61 static int svc_rdma_post_recv(struct svcxprt_rdma *xprt); 62 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int); 63 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 64 struct net *net, 65 struct sockaddr *sa, int salen, 66 int flags); 67 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt); 68 static void svc_rdma_release_rqst(struct svc_rqst *); 69 static void svc_rdma_detach(struct svc_xprt *xprt); 70 static void svc_rdma_free(struct svc_xprt *xprt); 71 static int svc_rdma_has_wspace(struct svc_xprt *xprt); 72 static void svc_rdma_secure_port(struct svc_rqst *); 73 static void svc_rdma_kill_temp_xprt(struct svc_xprt *); 74 75 static const struct svc_xprt_ops svc_rdma_ops = { 76 .xpo_create = svc_rdma_create, 77 .xpo_recvfrom = svc_rdma_recvfrom, 78 .xpo_sendto = svc_rdma_sendto, 79 .xpo_release_rqst = svc_rdma_release_rqst, 80 .xpo_detach = svc_rdma_detach, 81 .xpo_free = svc_rdma_free, 82 .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr, 83 .xpo_has_wspace = svc_rdma_has_wspace, 84 .xpo_accept = svc_rdma_accept, 85 .xpo_secure_port = svc_rdma_secure_port, 86 .xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt, 87 }; 88 89 struct svc_xprt_class svc_rdma_class = { 90 .xcl_name = "rdma", 91 .xcl_owner = THIS_MODULE, 92 .xcl_ops = &svc_rdma_ops, 93 .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA, 94 .xcl_ident = XPRT_TRANSPORT_RDMA, 95 }; 96 97 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 98 static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *, 99 struct sockaddr *, int, int); 100 static void svc_rdma_bc_detach(struct svc_xprt *); 101 static void svc_rdma_bc_free(struct svc_xprt *); 102 103 static const struct svc_xprt_ops svc_rdma_bc_ops = { 104 .xpo_create = svc_rdma_bc_create, 105 .xpo_detach = svc_rdma_bc_detach, 106 .xpo_free = svc_rdma_bc_free, 107 .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr, 108 .xpo_secure_port = svc_rdma_secure_port, 109 }; 110 111 struct svc_xprt_class svc_rdma_bc_class = { 112 .xcl_name = "rdma-bc", 113 .xcl_owner = THIS_MODULE, 114 .xcl_ops = &svc_rdma_bc_ops, 115 .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN) 116 }; 117 118 static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv, 119 struct net *net, 120 struct sockaddr *sa, int salen, 121 int flags) 122 { 123 struct svcxprt_rdma *cma_xprt; 124 struct svc_xprt *xprt; 125 126 cma_xprt = rdma_create_xprt(serv, 0); 127 if (!cma_xprt) 128 return ERR_PTR(-ENOMEM); 129 xprt = &cma_xprt->sc_xprt; 130 131 svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv); 132 set_bit(XPT_CONG_CTRL, &xprt->xpt_flags); 133 serv->sv_bc_xprt = xprt; 134 135 dprintk("svcrdma: %s(%p)\n", __func__, xprt); 136 return xprt; 137 } 138 139 static void svc_rdma_bc_detach(struct svc_xprt *xprt) 140 { 141 dprintk("svcrdma: %s(%p)\n", __func__, xprt); 142 } 143 144 static void svc_rdma_bc_free(struct svc_xprt *xprt) 145 { 146 struct svcxprt_rdma *rdma = 147 container_of(xprt, struct svcxprt_rdma, sc_xprt); 148 149 dprintk("svcrdma: %s(%p)\n", __func__, xprt); 150 if (xprt) 151 kfree(rdma); 152 } 153 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 154 155 static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt, 156 gfp_t flags) 157 { 158 struct svc_rdma_op_ctxt *ctxt; 159 160 ctxt = kmalloc(sizeof(*ctxt), flags); 161 if (ctxt) { 162 ctxt->xprt = xprt; 163 INIT_LIST_HEAD(&ctxt->list); 164 } 165 return ctxt; 166 } 167 168 static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) 169 { 170 unsigned int i; 171 172 /* Each RPC/RDMA credit can consume one Receive and 173 * one Send WQE at the same time. 174 */ 175 i = xprt->sc_sq_depth + xprt->sc_rq_depth; 176 177 while (i--) { 178 struct svc_rdma_op_ctxt *ctxt; 179 180 ctxt = alloc_ctxt(xprt, GFP_KERNEL); 181 if (!ctxt) { 182 dprintk("svcrdma: No memory for RDMA ctxt\n"); 183 return false; 184 } 185 list_add(&ctxt->list, &xprt->sc_ctxts); 186 } 187 return true; 188 } 189 190 struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) 191 { 192 struct svc_rdma_op_ctxt *ctxt = NULL; 193 194 spin_lock(&xprt->sc_ctxt_lock); 195 xprt->sc_ctxt_used++; 196 if (list_empty(&xprt->sc_ctxts)) 197 goto out_empty; 198 199 ctxt = list_first_entry(&xprt->sc_ctxts, 200 struct svc_rdma_op_ctxt, list); 201 list_del(&ctxt->list); 202 spin_unlock(&xprt->sc_ctxt_lock); 203 204 out: 205 ctxt->count = 0; 206 ctxt->mapped_sges = 0; 207 return ctxt; 208 209 out_empty: 210 /* Either pre-allocation missed the mark, or send 211 * queue accounting is broken. 212 */ 213 spin_unlock(&xprt->sc_ctxt_lock); 214 215 ctxt = alloc_ctxt(xprt, GFP_NOIO); 216 if (ctxt) 217 goto out; 218 219 spin_lock(&xprt->sc_ctxt_lock); 220 xprt->sc_ctxt_used--; 221 spin_unlock(&xprt->sc_ctxt_lock); 222 WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n"); 223 return NULL; 224 } 225 226 void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) 227 { 228 struct svcxprt_rdma *xprt = ctxt->xprt; 229 struct ib_device *device = xprt->sc_cm_id->device; 230 unsigned int i; 231 232 for (i = 0; i < ctxt->mapped_sges; i++) 233 ib_dma_unmap_page(device, 234 ctxt->sge[i].addr, 235 ctxt->sge[i].length, 236 ctxt->direction); 237 ctxt->mapped_sges = 0; 238 } 239 240 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) 241 { 242 struct svcxprt_rdma *xprt = ctxt->xprt; 243 int i; 244 245 if (free_pages) 246 for (i = 0; i < ctxt->count; i++) 247 put_page(ctxt->pages[i]); 248 249 spin_lock(&xprt->sc_ctxt_lock); 250 xprt->sc_ctxt_used--; 251 list_add(&ctxt->list, &xprt->sc_ctxts); 252 spin_unlock(&xprt->sc_ctxt_lock); 253 } 254 255 static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) 256 { 257 while (!list_empty(&xprt->sc_ctxts)) { 258 struct svc_rdma_op_ctxt *ctxt; 259 260 ctxt = list_first_entry(&xprt->sc_ctxts, 261 struct svc_rdma_op_ctxt, list); 262 list_del(&ctxt->list); 263 kfree(ctxt); 264 } 265 } 266 267 /* QP event handler */ 268 static void qp_event_handler(struct ib_event *event, void *context) 269 { 270 struct svc_xprt *xprt = context; 271 272 switch (event->event) { 273 /* These are considered benign events */ 274 case IB_EVENT_PATH_MIG: 275 case IB_EVENT_COMM_EST: 276 case IB_EVENT_SQ_DRAINED: 277 case IB_EVENT_QP_LAST_WQE_REACHED: 278 dprintk("svcrdma: QP event %s (%d) received for QP=%p\n", 279 ib_event_msg(event->event), event->event, 280 event->element.qp); 281 break; 282 /* These are considered fatal events */ 283 case IB_EVENT_PATH_MIG_ERR: 284 case IB_EVENT_QP_FATAL: 285 case IB_EVENT_QP_REQ_ERR: 286 case IB_EVENT_QP_ACCESS_ERR: 287 case IB_EVENT_DEVICE_FATAL: 288 default: 289 dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, " 290 "closing transport\n", 291 ib_event_msg(event->event), event->event, 292 event->element.qp); 293 set_bit(XPT_CLOSE, &xprt->xpt_flags); 294 svc_xprt_enqueue(xprt); 295 break; 296 } 297 } 298 299 /** 300 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC 301 * @cq: completion queue 302 * @wc: completed WR 303 * 304 */ 305 static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) 306 { 307 struct svcxprt_rdma *xprt = cq->cq_context; 308 struct ib_cqe *cqe = wc->wr_cqe; 309 struct svc_rdma_op_ctxt *ctxt; 310 311 /* WARNING: Only wc->wr_cqe and wc->status are reliable */ 312 ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); 313 svc_rdma_unmap_dma(ctxt); 314 315 if (wc->status != IB_WC_SUCCESS) 316 goto flushed; 317 318 /* All wc fields are now known to be valid */ 319 ctxt->byte_len = wc->byte_len; 320 spin_lock(&xprt->sc_rq_dto_lock); 321 list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q); 322 spin_unlock(&xprt->sc_rq_dto_lock); 323 324 svc_rdma_post_recv(xprt); 325 326 set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); 327 if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) 328 goto out; 329 goto out_enqueue; 330 331 flushed: 332 if (wc->status != IB_WC_WR_FLUSH_ERR) 333 pr_err("svcrdma: Recv: %s (%u/0x%x)\n", 334 ib_wc_status_msg(wc->status), 335 wc->status, wc->vendor_err); 336 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 337 svc_rdma_put_context(ctxt, 1); 338 339 out_enqueue: 340 svc_xprt_enqueue(&xprt->sc_xprt); 341 out: 342 svc_xprt_put(&xprt->sc_xprt); 343 } 344 345 /** 346 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC 347 * @cq: completion queue 348 * @wc: completed WR 349 * 350 */ 351 void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 352 { 353 struct svcxprt_rdma *xprt = cq->cq_context; 354 struct ib_cqe *cqe = wc->wr_cqe; 355 struct svc_rdma_op_ctxt *ctxt; 356 357 atomic_inc(&xprt->sc_sq_avail); 358 wake_up(&xprt->sc_send_wait); 359 360 ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); 361 svc_rdma_unmap_dma(ctxt); 362 svc_rdma_put_context(ctxt, 1); 363 364 if (unlikely(wc->status != IB_WC_SUCCESS)) { 365 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 366 svc_xprt_enqueue(&xprt->sc_xprt); 367 if (wc->status != IB_WC_WR_FLUSH_ERR) 368 pr_err("svcrdma: Send: %s (%u/0x%x)\n", 369 ib_wc_status_msg(wc->status), 370 wc->status, wc->vendor_err); 371 } 372 373 svc_xprt_put(&xprt->sc_xprt); 374 } 375 376 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, 377 int listener) 378 { 379 struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL); 380 381 if (!cma_xprt) 382 return NULL; 383 svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv); 384 INIT_LIST_HEAD(&cma_xprt->sc_accept_q); 385 INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); 386 INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); 387 INIT_LIST_HEAD(&cma_xprt->sc_ctxts); 388 INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); 389 init_waitqueue_head(&cma_xprt->sc_send_wait); 390 391 spin_lock_init(&cma_xprt->sc_lock); 392 spin_lock_init(&cma_xprt->sc_rq_dto_lock); 393 spin_lock_init(&cma_xprt->sc_ctxt_lock); 394 spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); 395 396 /* 397 * Note that this implies that the underlying transport support 398 * has some form of congestion control (see RFC 7530 section 3.1 399 * paragraph 2). For now, we assume that all supported RDMA 400 * transports are suitable here. 401 */ 402 set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags); 403 404 if (listener) { 405 strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener"); 406 set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); 407 } 408 409 return cma_xprt; 410 } 411 412 static int 413 svc_rdma_post_recv(struct svcxprt_rdma *xprt) 414 { 415 struct ib_recv_wr recv_wr, *bad_recv_wr; 416 struct svc_rdma_op_ctxt *ctxt; 417 struct page *page; 418 dma_addr_t pa; 419 int sge_no; 420 int buflen; 421 int ret; 422 423 ctxt = svc_rdma_get_context(xprt); 424 buflen = 0; 425 ctxt->direction = DMA_FROM_DEVICE; 426 ctxt->cqe.done = svc_rdma_wc_receive; 427 for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) { 428 if (sge_no >= xprt->sc_max_sge) { 429 pr_err("svcrdma: Too many sges (%d)\n", sge_no); 430 goto err_put_ctxt; 431 } 432 page = alloc_page(GFP_KERNEL); 433 if (!page) 434 goto err_put_ctxt; 435 ctxt->pages[sge_no] = page; 436 pa = ib_dma_map_page(xprt->sc_cm_id->device, 437 page, 0, PAGE_SIZE, 438 DMA_FROM_DEVICE); 439 if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa)) 440 goto err_put_ctxt; 441 svc_rdma_count_mappings(xprt, ctxt); 442 ctxt->sge[sge_no].addr = pa; 443 ctxt->sge[sge_no].length = PAGE_SIZE; 444 ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; 445 ctxt->count = sge_no + 1; 446 buflen += PAGE_SIZE; 447 } 448 recv_wr.next = NULL; 449 recv_wr.sg_list = &ctxt->sge[0]; 450 recv_wr.num_sge = ctxt->count; 451 recv_wr.wr_cqe = &ctxt->cqe; 452 453 svc_xprt_get(&xprt->sc_xprt); 454 ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); 455 if (ret) { 456 svc_rdma_unmap_dma(ctxt); 457 svc_rdma_put_context(ctxt, 1); 458 svc_xprt_put(&xprt->sc_xprt); 459 } 460 return ret; 461 462 err_put_ctxt: 463 svc_rdma_unmap_dma(ctxt); 464 svc_rdma_put_context(ctxt, 1); 465 return -ENOMEM; 466 } 467 468 static void 469 svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt, 470 struct rdma_conn_param *param) 471 { 472 const struct rpcrdma_connect_private *pmsg = param->private_data; 473 474 if (pmsg && 475 pmsg->cp_magic == rpcrdma_cmp_magic && 476 pmsg->cp_version == RPCRDMA_CMP_VERSION) { 477 newxprt->sc_snd_w_inv = pmsg->cp_flags & 478 RPCRDMA_CMP_F_SND_W_INV_OK; 479 480 dprintk("svcrdma: client send_size %u, recv_size %u " 481 "remote inv %ssupported\n", 482 rpcrdma_decode_buffer_size(pmsg->cp_send_size), 483 rpcrdma_decode_buffer_size(pmsg->cp_recv_size), 484 newxprt->sc_snd_w_inv ? "" : "un"); 485 } 486 } 487 488 /* 489 * This function handles the CONNECT_REQUEST event on a listening 490 * endpoint. It is passed the cma_id for the _new_ connection. The context in 491 * this cma_id is inherited from the listening cma_id and is the svc_xprt 492 * structure for the listening endpoint. 493 * 494 * This function creates a new xprt for the new connection and enqueues it on 495 * the accept queue for the listent xprt. When the listen thread is kicked, it 496 * will call the recvfrom method on the listen xprt which will accept the new 497 * connection. 498 */ 499 static void handle_connect_req(struct rdma_cm_id *new_cma_id, 500 struct rdma_conn_param *param) 501 { 502 struct svcxprt_rdma *listen_xprt = new_cma_id->context; 503 struct svcxprt_rdma *newxprt; 504 struct sockaddr *sa; 505 506 /* Create a new transport */ 507 newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); 508 if (!newxprt) { 509 dprintk("svcrdma: failed to create new transport\n"); 510 return; 511 } 512 newxprt->sc_cm_id = new_cma_id; 513 new_cma_id->context = newxprt; 514 dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", 515 newxprt, newxprt->sc_cm_id, listen_xprt); 516 svc_rdma_parse_connect_private(newxprt, param); 517 518 /* Save client advertised inbound read limit for use later in accept. */ 519 newxprt->sc_ord = param->initiator_depth; 520 521 /* Set the local and remote addresses in the transport */ 522 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 523 svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 524 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 525 svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa)); 526 527 /* 528 * Enqueue the new transport on the accept queue of the listening 529 * transport 530 */ 531 spin_lock_bh(&listen_xprt->sc_lock); 532 list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q); 533 spin_unlock_bh(&listen_xprt->sc_lock); 534 535 set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags); 536 svc_xprt_enqueue(&listen_xprt->sc_xprt); 537 } 538 539 /* 540 * Handles events generated on the listening endpoint. These events will be 541 * either be incoming connect requests or adapter removal events. 542 */ 543 static int rdma_listen_handler(struct rdma_cm_id *cma_id, 544 struct rdma_cm_event *event) 545 { 546 struct svcxprt_rdma *xprt = cma_id->context; 547 int ret = 0; 548 549 switch (event->event) { 550 case RDMA_CM_EVENT_CONNECT_REQUEST: 551 dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, " 552 "event = %s (%d)\n", cma_id, cma_id->context, 553 rdma_event_msg(event->event), event->event); 554 handle_connect_req(cma_id, &event->param.conn); 555 break; 556 557 case RDMA_CM_EVENT_ESTABLISHED: 558 /* Accept complete */ 559 dprintk("svcrdma: Connection completed on LISTEN xprt=%p, " 560 "cm_id=%p\n", xprt, cma_id); 561 break; 562 563 case RDMA_CM_EVENT_DEVICE_REMOVAL: 564 dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n", 565 xprt, cma_id); 566 if (xprt) { 567 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 568 svc_xprt_enqueue(&xprt->sc_xprt); 569 } 570 break; 571 572 default: 573 dprintk("svcrdma: Unexpected event on listening endpoint %p, " 574 "event = %s (%d)\n", cma_id, 575 rdma_event_msg(event->event), event->event); 576 break; 577 } 578 579 return ret; 580 } 581 582 static int rdma_cma_handler(struct rdma_cm_id *cma_id, 583 struct rdma_cm_event *event) 584 { 585 struct svc_xprt *xprt = cma_id->context; 586 struct svcxprt_rdma *rdma = 587 container_of(xprt, struct svcxprt_rdma, sc_xprt); 588 switch (event->event) { 589 case RDMA_CM_EVENT_ESTABLISHED: 590 /* Accept complete */ 591 svc_xprt_get(xprt); 592 dprintk("svcrdma: Connection completed on DTO xprt=%p, " 593 "cm_id=%p\n", xprt, cma_id); 594 clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags); 595 svc_xprt_enqueue(xprt); 596 break; 597 case RDMA_CM_EVENT_DISCONNECTED: 598 dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n", 599 xprt, cma_id); 600 if (xprt) { 601 set_bit(XPT_CLOSE, &xprt->xpt_flags); 602 svc_xprt_enqueue(xprt); 603 svc_xprt_put(xprt); 604 } 605 break; 606 case RDMA_CM_EVENT_DEVICE_REMOVAL: 607 dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, " 608 "event = %s (%d)\n", cma_id, xprt, 609 rdma_event_msg(event->event), event->event); 610 if (xprt) { 611 set_bit(XPT_CLOSE, &xprt->xpt_flags); 612 svc_xprt_enqueue(xprt); 613 svc_xprt_put(xprt); 614 } 615 break; 616 default: 617 dprintk("svcrdma: Unexpected event on DTO endpoint %p, " 618 "event = %s (%d)\n", cma_id, 619 rdma_event_msg(event->event), event->event); 620 break; 621 } 622 return 0; 623 } 624 625 /* 626 * Create a listening RDMA service endpoint. 627 */ 628 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, 629 struct net *net, 630 struct sockaddr *sa, int salen, 631 int flags) 632 { 633 struct rdma_cm_id *listen_id; 634 struct svcxprt_rdma *cma_xprt; 635 int ret; 636 637 dprintk("svcrdma: Creating RDMA socket\n"); 638 if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) { 639 dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family); 640 return ERR_PTR(-EAFNOSUPPORT); 641 } 642 cma_xprt = rdma_create_xprt(serv, 1); 643 if (!cma_xprt) 644 return ERR_PTR(-ENOMEM); 645 646 listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt, 647 RDMA_PS_TCP, IB_QPT_RC); 648 if (IS_ERR(listen_id)) { 649 ret = PTR_ERR(listen_id); 650 dprintk("svcrdma: rdma_create_id failed = %d\n", ret); 651 goto err0; 652 } 653 654 /* Allow both IPv4 and IPv6 sockets to bind a single port 655 * at the same time. 656 */ 657 #if IS_ENABLED(CONFIG_IPV6) 658 ret = rdma_set_afonly(listen_id, 1); 659 if (ret) { 660 dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret); 661 goto err1; 662 } 663 #endif 664 ret = rdma_bind_addr(listen_id, sa); 665 if (ret) { 666 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); 667 goto err1; 668 } 669 cma_xprt->sc_cm_id = listen_id; 670 671 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); 672 if (ret) { 673 dprintk("svcrdma: rdma_listen failed = %d\n", ret); 674 goto err1; 675 } 676 677 /* 678 * We need to use the address from the cm_id in case the 679 * caller specified 0 for the port number. 680 */ 681 sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr; 682 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); 683 684 return &cma_xprt->sc_xprt; 685 686 err1: 687 rdma_destroy_id(listen_id); 688 err0: 689 kfree(cma_xprt); 690 return ERR_PTR(ret); 691 } 692 693 /* 694 * This is the xpo_recvfrom function for listening endpoints. Its 695 * purpose is to accept incoming connections. The CMA callback handler 696 * has already created a new transport and attached it to the new CMA 697 * ID. 698 * 699 * There is a queue of pending connections hung on the listening 700 * transport. This queue contains the new svc_xprt structure. This 701 * function takes svc_xprt structures off the accept_q and completes 702 * the connection. 703 */ 704 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) 705 { 706 struct svcxprt_rdma *listen_rdma; 707 struct svcxprt_rdma *newxprt = NULL; 708 struct rdma_conn_param conn_param; 709 struct rpcrdma_connect_private pmsg; 710 struct ib_qp_init_attr qp_attr; 711 struct ib_device *dev; 712 struct sockaddr *sap; 713 unsigned int i, ctxts; 714 int ret = 0; 715 716 listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); 717 clear_bit(XPT_CONN, &xprt->xpt_flags); 718 /* Get the next entry off the accept list */ 719 spin_lock_bh(&listen_rdma->sc_lock); 720 if (!list_empty(&listen_rdma->sc_accept_q)) { 721 newxprt = list_entry(listen_rdma->sc_accept_q.next, 722 struct svcxprt_rdma, sc_accept_q); 723 list_del_init(&newxprt->sc_accept_q); 724 } 725 if (!list_empty(&listen_rdma->sc_accept_q)) 726 set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags); 727 spin_unlock_bh(&listen_rdma->sc_lock); 728 if (!newxprt) 729 return NULL; 730 731 dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n", 732 newxprt, newxprt->sc_cm_id); 733 734 dev = newxprt->sc_cm_id->device; 735 newxprt->sc_port_num = newxprt->sc_cm_id->port_num; 736 737 /* Qualify the transport resource defaults with the 738 * capabilities of this particular device */ 739 newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge, 740 (size_t)RPCSVC_MAXPAGES); 741 newxprt->sc_max_req_size = svcrdma_max_req_size; 742 newxprt->sc_max_requests = svcrdma_max_requests; 743 newxprt->sc_max_bc_requests = svcrdma_max_bc_requests; 744 newxprt->sc_rq_depth = newxprt->sc_max_requests + 745 newxprt->sc_max_bc_requests; 746 if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) { 747 pr_warn("svcrdma: reducing receive depth to %d\n", 748 dev->attrs.max_qp_wr); 749 newxprt->sc_rq_depth = dev->attrs.max_qp_wr; 750 newxprt->sc_max_requests = newxprt->sc_rq_depth - 2; 751 newxprt->sc_max_bc_requests = 2; 752 } 753 newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); 754 ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES); 755 ctxts *= newxprt->sc_max_requests; 756 newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts; 757 if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) { 758 pr_warn("svcrdma: reducing send depth to %d\n", 759 dev->attrs.max_qp_wr); 760 newxprt->sc_sq_depth = dev->attrs.max_qp_wr; 761 } 762 atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); 763 764 if (!svc_rdma_prealloc_ctxts(newxprt)) 765 goto errout; 766 767 newxprt->sc_pd = ib_alloc_pd(dev, 0); 768 if (IS_ERR(newxprt->sc_pd)) { 769 dprintk("svcrdma: error creating PD for connect request\n"); 770 goto errout; 771 } 772 newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth, 773 0, IB_POLL_WORKQUEUE); 774 if (IS_ERR(newxprt->sc_sq_cq)) { 775 dprintk("svcrdma: error creating SQ CQ for connect request\n"); 776 goto errout; 777 } 778 newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth, 779 0, IB_POLL_WORKQUEUE); 780 if (IS_ERR(newxprt->sc_rq_cq)) { 781 dprintk("svcrdma: error creating RQ CQ for connect request\n"); 782 goto errout; 783 } 784 785 memset(&qp_attr, 0, sizeof qp_attr); 786 qp_attr.event_handler = qp_event_handler; 787 qp_attr.qp_context = &newxprt->sc_xprt; 788 qp_attr.port_num = newxprt->sc_port_num; 789 qp_attr.cap.max_rdma_ctxs = ctxts; 790 qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts; 791 qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; 792 qp_attr.cap.max_send_sge = newxprt->sc_max_sge; 793 qp_attr.cap.max_recv_sge = newxprt->sc_max_sge; 794 qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR; 795 qp_attr.qp_type = IB_QPT_RC; 796 qp_attr.send_cq = newxprt->sc_sq_cq; 797 qp_attr.recv_cq = newxprt->sc_rq_cq; 798 dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n", 799 newxprt->sc_cm_id, newxprt->sc_pd); 800 dprintk(" cap.max_send_wr = %d, cap.max_recv_wr = %d\n", 801 qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr); 802 dprintk(" cap.max_send_sge = %d, cap.max_recv_sge = %d\n", 803 qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge); 804 805 ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr); 806 if (ret) { 807 dprintk("svcrdma: failed to create QP, ret=%d\n", ret); 808 goto errout; 809 } 810 newxprt->sc_qp = newxprt->sc_cm_id->qp; 811 812 if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) 813 newxprt->sc_snd_w_inv = false; 814 if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) && 815 !rdma_ib_or_roce(dev, newxprt->sc_port_num)) 816 goto errout; 817 818 /* Post receive buffers */ 819 for (i = 0; i < newxprt->sc_max_requests; i++) { 820 ret = svc_rdma_post_recv(newxprt); 821 if (ret) { 822 dprintk("svcrdma: failure posting receive buffers\n"); 823 goto errout; 824 } 825 } 826 827 /* Swap out the handler */ 828 newxprt->sc_cm_id->event_handler = rdma_cma_handler; 829 830 /* Construct RDMA-CM private message */ 831 pmsg.cp_magic = rpcrdma_cmp_magic; 832 pmsg.cp_version = RPCRDMA_CMP_VERSION; 833 pmsg.cp_flags = 0; 834 pmsg.cp_send_size = pmsg.cp_recv_size = 835 rpcrdma_encode_buffer_size(newxprt->sc_max_req_size); 836 837 /* Accept Connection */ 838 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); 839 memset(&conn_param, 0, sizeof conn_param); 840 conn_param.responder_resources = 0; 841 conn_param.initiator_depth = min_t(int, newxprt->sc_ord, 842 dev->attrs.max_qp_init_rd_atom); 843 if (!conn_param.initiator_depth) { 844 dprintk("svcrdma: invalid ORD setting\n"); 845 ret = -EINVAL; 846 goto errout; 847 } 848 conn_param.private_data = &pmsg; 849 conn_param.private_data_len = sizeof(pmsg); 850 ret = rdma_accept(newxprt->sc_cm_id, &conn_param); 851 if (ret) 852 goto errout; 853 854 dprintk("svcrdma: new connection %p accepted:\n", newxprt); 855 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr; 856 dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap)); 857 sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; 858 dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap)); 859 dprintk(" max_sge : %d\n", newxprt->sc_max_sge); 860 dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth); 861 dprintk(" rdma_rw_ctxs : %d\n", ctxts); 862 dprintk(" max_requests : %d\n", newxprt->sc_max_requests); 863 dprintk(" ord : %d\n", conn_param.initiator_depth); 864 865 return &newxprt->sc_xprt; 866 867 errout: 868 dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); 869 /* Take a reference in case the DTO handler runs */ 870 svc_xprt_get(&newxprt->sc_xprt); 871 if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) 872 ib_destroy_qp(newxprt->sc_qp); 873 rdma_destroy_id(newxprt->sc_cm_id); 874 /* This call to put will destroy the transport */ 875 svc_xprt_put(&newxprt->sc_xprt); 876 return NULL; 877 } 878 879 static void svc_rdma_release_rqst(struct svc_rqst *rqstp) 880 { 881 } 882 883 /* 884 * When connected, an svc_xprt has at least two references: 885 * 886 * - A reference held by the cm_id between the ESTABLISHED and 887 * DISCONNECTED events. If the remote peer disconnected first, this 888 * reference could be gone. 889 * 890 * - A reference held by the svc_recv code that called this function 891 * as part of close processing. 892 * 893 * At a minimum one references should still be held. 894 */ 895 static void svc_rdma_detach(struct svc_xprt *xprt) 896 { 897 struct svcxprt_rdma *rdma = 898 container_of(xprt, struct svcxprt_rdma, sc_xprt); 899 dprintk("svc: svc_rdma_detach(%p)\n", xprt); 900 901 /* Disconnect and flush posted WQE */ 902 rdma_disconnect(rdma->sc_cm_id); 903 } 904 905 static void __svc_rdma_free(struct work_struct *work) 906 { 907 struct svcxprt_rdma *rdma = 908 container_of(work, struct svcxprt_rdma, sc_work); 909 struct svc_xprt *xprt = &rdma->sc_xprt; 910 911 dprintk("svcrdma: %s(%p)\n", __func__, rdma); 912 913 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 914 ib_drain_qp(rdma->sc_qp); 915 916 /* We should only be called from kref_put */ 917 if (kref_read(&xprt->xpt_ref) != 0) 918 pr_err("svcrdma: sc_xprt still in use? (%d)\n", 919 kref_read(&xprt->xpt_ref)); 920 921 while (!list_empty(&rdma->sc_read_complete_q)) { 922 struct svc_rdma_op_ctxt *ctxt; 923 ctxt = list_first_entry(&rdma->sc_read_complete_q, 924 struct svc_rdma_op_ctxt, list); 925 list_del(&ctxt->list); 926 svc_rdma_put_context(ctxt, 1); 927 } 928 while (!list_empty(&rdma->sc_rq_dto_q)) { 929 struct svc_rdma_op_ctxt *ctxt; 930 ctxt = list_first_entry(&rdma->sc_rq_dto_q, 931 struct svc_rdma_op_ctxt, list); 932 list_del(&ctxt->list); 933 svc_rdma_put_context(ctxt, 1); 934 } 935 936 /* Warn if we leaked a resource or under-referenced */ 937 if (rdma->sc_ctxt_used != 0) 938 pr_err("svcrdma: ctxt still in use? (%d)\n", 939 rdma->sc_ctxt_used); 940 941 /* Final put of backchannel client transport */ 942 if (xprt->xpt_bc_xprt) { 943 xprt_put(xprt->xpt_bc_xprt); 944 xprt->xpt_bc_xprt = NULL; 945 } 946 947 svc_rdma_destroy_rw_ctxts(rdma); 948 svc_rdma_destroy_ctxts(rdma); 949 950 /* Destroy the QP if present (not a listener) */ 951 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 952 ib_destroy_qp(rdma->sc_qp); 953 954 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) 955 ib_free_cq(rdma->sc_sq_cq); 956 957 if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq)) 958 ib_free_cq(rdma->sc_rq_cq); 959 960 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) 961 ib_dealloc_pd(rdma->sc_pd); 962 963 /* Destroy the CM ID */ 964 rdma_destroy_id(rdma->sc_cm_id); 965 966 kfree(rdma); 967 } 968 969 static void svc_rdma_free(struct svc_xprt *xprt) 970 { 971 struct svcxprt_rdma *rdma = 972 container_of(xprt, struct svcxprt_rdma, sc_xprt); 973 INIT_WORK(&rdma->sc_work, __svc_rdma_free); 974 queue_work(svc_rdma_wq, &rdma->sc_work); 975 } 976 977 static int svc_rdma_has_wspace(struct svc_xprt *xprt) 978 { 979 struct svcxprt_rdma *rdma = 980 container_of(xprt, struct svcxprt_rdma, sc_xprt); 981 982 /* 983 * If there are already waiters on the SQ, 984 * return false. 985 */ 986 if (waitqueue_active(&rdma->sc_send_wait)) 987 return 0; 988 989 /* Otherwise return true. */ 990 return 1; 991 } 992 993 static void svc_rdma_secure_port(struct svc_rqst *rqstp) 994 { 995 set_bit(RQ_SECURE, &rqstp->rq_flags); 996 } 997 998 static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt) 999 { 1000 } 1001 1002 int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) 1003 { 1004 struct ib_send_wr *bad_wr, *n_wr; 1005 int wr_count; 1006 int i; 1007 int ret; 1008 1009 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1010 return -ENOTCONN; 1011 1012 wr_count = 1; 1013 for (n_wr = wr->next; n_wr; n_wr = n_wr->next) 1014 wr_count++; 1015 1016 /* If the SQ is full, wait until an SQ entry is available */ 1017 while (1) { 1018 if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) { 1019 atomic_inc(&rdma_stat_sq_starve); 1020 1021 /* Wait until SQ WR available if SQ still full */ 1022 atomic_add(wr_count, &xprt->sc_sq_avail); 1023 wait_event(xprt->sc_send_wait, 1024 atomic_read(&xprt->sc_sq_avail) > wr_count); 1025 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1026 return -ENOTCONN; 1027 continue; 1028 } 1029 /* Take a transport ref for each WR posted */ 1030 for (i = 0; i < wr_count; i++) 1031 svc_xprt_get(&xprt->sc_xprt); 1032 1033 /* Bump used SQ WR count and post */ 1034 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); 1035 if (ret) { 1036 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 1037 for (i = 0; i < wr_count; i ++) 1038 svc_xprt_put(&xprt->sc_xprt); 1039 dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret); 1040 dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n", 1041 atomic_read(&xprt->sc_sq_avail), 1042 xprt->sc_sq_depth); 1043 wake_up(&xprt->sc_send_wait); 1044 } 1045 break; 1046 } 1047 return ret; 1048 } 1049