/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */
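
/*
 * This file implements the server-side transport methods for
 * RPC-over-RDMA: listener setup via the RDMA CM, connection accept,
 * completion handling, send queue accounting, and transport teardown.
 */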

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <rdma/rw.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	set_bit(XPT_CONG_CTRL, &xprt->xpt_flags);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
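
/*
 * These transport classes take effect only once they are registered
 * with the generic svc layer. In this tree that happens at module
 * init time (see svc_rdma.c), roughly:
 *
 *	svc_reg_xprt_class(&svc_rdma_class);
 *	#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 *	svc_reg_xprt_class(&svc_rdma_bc_class);
 *	#endif
 *
 * with matching svc_unreg_xprt_class() calls at module exit.
 */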

static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->list);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume one Receive and
	 * one Send WQE at the same time.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->list, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, list);
	list_del(&ctxt->list);
	spin_unlock(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->mapped_sges = 0;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	struct ib_device *device = xprt->sc_cm_id->device;
	unsigned int i;

	for (i = 0; i < ctxt->mapped_sges; i++)
		ib_dma_unmap_page(device,
				  ctxt->sge[i].addr,
				  ctxt->sge[i].length,
				  ctxt->direction);
	ctxt->mapped_sges = 0;
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	list_add(&ctxt->list, &xprt->sc_ctxts);
	spin_unlock(&xprt->sc_ctxt_lock);
}

static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_ctxts)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&xprt->sc_ctxts,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		kfree(ctxt);
	}
}
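
/*
 * The ctxt pool above is a simple free list guarded by sc_ctxt_lock.
 * Each ctxt embeds a struct ib_cqe; the I/O paths set ->cqe.done to
 * the appropriate completion handler before posting a WR, and the
 * handler recovers the ctxt with container_of() on wc->wr_cqe (see
 * svc_rdma_wc_receive() below). A typical pairing, as in
 * svc_rdma_post_recv(), is:
 *
 *	ctxt = svc_rdma_get_context(xprt);
 *	ctxt->direction = DMA_FROM_DEVICE;
 *	ctxt->cqe.done = svc_rdma_wc_receive;
 *	... map pages and post the WR ...
 *	svc_rdma_unmap_dma(ctxt);
 *	svc_rdma_put_context(ctxt, 1);
 *
 * GFP_NOIO is used for the fallback allocation in
 * svc_rdma_get_context() because it can run on the server's I/O
 * path, where recursing into filesystem or block I/O during memory
 * reclaim could deadlock.
 */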

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
			"closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;

	/* All wc fields are now known to be valid */
	ctxt->byte_len = wc->byte_len;
	spin_lock(&xprt->sc_rq_dto_lock);
	list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
	spin_unlock(&xprt->sc_rq_dto_lock);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		goto out;
	svc_xprt_enqueue(&xprt->sc_xprt);
	goto out;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
			ib_wc_status_msg(wc->status),
			wc->status, wc->vendor_err);
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	svc_rdma_put_context(ctxt, 1);

out:
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	atomic_inc(&xprt->sc_sq_avail);
	wake_up(&xprt->sc_send_wait);

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
	}

	svc_xprt_put(&xprt->sc_xprt);
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);

	/*
	 * Note that this implies that the underlying transport supports
	 * some form of congestion control (see RFC 7530 section 3.1
	 * paragraph 2). For now, we assume that all supported RDMA
	 * transports are suitable here.
	 */
	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->cqe.done = svc_rdma_wc_receive;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(flags);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		svc_rdma_count_mappings(xprt, ctxt);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_cqe = &ctxt->cqe;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	int ret = 0;

	ret = svc_rdma_post_recv(xprt, flags);
	if (ret) {
		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
		       ret);
		pr_err("svcrdma: closing transport %p.\n", xprt);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		ret = -ENOTCONN;
	}
	return ret;
}

/* Parse the transport-specific RDMA-CM private data, if present. The
 * format is struct rpcrdma_connect_private (see
 * linux/sunrpc/rpc_rdma.h): magic, version, the client's inline
 * send/recv buffer sizes, and a flag indicating whether the client
 * can accept Send With Invalidate. An absent or unrecognized message
 * is simply ignored, which keeps the exchange backward-compatible.
 */
static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events
 * are either incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id, &event->param.conn);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

/*
 * Note the division of labor between CM event handlers:
 * rdma_listen_handler() above serves only the listening cm_id. Once
 * a new connection is accepted, svc_rdma_accept() swaps the new
 * cm_id's event handler to rdma_cma_handler() below, which handles
 * events for the life of that connection.
 */
static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 *
 * This is not called directly; the generic svc code invokes it
 * through svc_rdma_ops when a listener is requested, roughly:
 *
 *	svc_create_xprt(serv, "rdma", net, PF_INET, port, flags);
 *
 * which looks up svc_rdma_class by name and calls ->xpo_create.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
		dprintk("svcrdma: Address family %d is not supported.\n",
			sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret) {
		dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
		goto err1;
	}
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

err1:
	rdma_destroy_id(listen_id);
err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	struct sockaddr *sap;
	unsigned int i, ctxts;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;
	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = svcrdma_max_requests;
	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) {
		pr_warn("svcrdma: reducing receive depth to %d\n",
			dev->attrs.max_qp_wr);
		newxprt->sc_rq_depth = dev->attrs.max_qp_wr;
		newxprt->sc_max_requests = newxprt->sc_rq_depth - 2;
		newxprt->sc_max_bc_requests = 2;
	}
	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
	ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES);
	ctxts *= newxprt->sc_max_requests;
	newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts;
	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) {
		pr_warn("svcrdma: reducing send depth to %d\n",
			dev->attrs.max_qp_wr);
		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
	}
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);

	/* A worked example of the accounting above, with assumed
	 * (not default-guaranteed) values: if sc_max_requests is 32
	 * and sc_max_bc_requests is 2, sc_rq_depth is 34. If
	 * rdma_rw_mr_factor() reports that a single MR can cover
	 * RPCSVC_MAXPAGES pages, ctxts is 1 * 32 = 32, and
	 * sc_sq_depth is 34 + 32 = 66.
	 */
	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.port_num = newxprt->sc_port_num;
	qp_attr.cap.max_rdma_ctxs = ctxts;
	/* The rdma_rw API posts its Read/Write WRs on this same send
	 * queue; max_rdma_ctxs tells the RDMA core to reserve headroom
	 * for them, so only sc_sq_depth - ctxts Send WRs are requested
	 * here for svcrdma's own use.
	 */
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n",
		newxprt->sc_cm_id, newxprt->sc_pd);
	dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
	dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		newxprt->sc_snd_w_inv = false;
	if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_port_num))
		goto errout;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
	dprintk("    max_sge         : %d\n", newxprt->sc_max_sge);
	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
	dprintk("    ord             : %d\n", newxprt->sc_ord);

	return &newxprt->sc_xprt;

errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum, one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

/*
 * Transport teardown can sleep (ib_drain_qp, ib_destroy_qp), but the
 * final kref_put() of the svc_xprt may run in a context that cannot.
 * svc_rdma_free() therefore defers the actual destruction to a
 * workqueue by scheduling __svc_rdma_free().
 */
static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);

	/* We should only be called from kref_put */
	if (kref_read(&xprt->xpt_ref) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       kref_read(&xprt->xpt_ref));

	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_first_entry(&rdma->sc_read_complete_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_first_entry(&rdma->sc_rq_dto_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
		       rdma->sc_ctxt_used);

	/* Final put of backchannel client transport */
	if (xprt->xpt_bc_xprt) {
		xprt_put(xprt->xpt_bc_xprt);
		xprt->xpt_bc_xprt = NULL;
	}

	svc_rdma_destroy_rw_ctxts(rdma);
	svc_rdma_destroy_ctxts(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static int svc_rdma_secure_port(struct svc_rqst *rqstp)
{
	return 1;
}

static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}
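
/*
 * A sketch of how callers use svc_rdma_send(): WRs may be chained
 * via wr->next and posted with one call (hypothetical caller shown,
 * error handling elided):
 *
 *	send_wr.opcode = IB_WR_SEND;
 *	send_wr.wr_cqe = &ctxt->cqe;
 *	send_wr.next = NULL;
 *	ret = svc_rdma_send(rdma, &send_wr);
 *
 * sc_sq_avail begins at sc_sq_depth. Each WR in the chain consumes
 * one slot; svc_rdma_wc_send() returns the slot and wakes waiters.
 */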

int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
			atomic_inc(&rdma_stat_sq_starve);

			/* Wait until SQ WR available if SQ still full */
			atomic_add(wr_count, &xprt->sc_sq_avail);
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_avail) > wr_count);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
			dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
				atomic_read(&xprt->sc_sq_avail),
				xprt->sc_sq_depth);
			wake_up(&xprt->sc_send_wait);
		}
		break;
	}
	return ret;
}
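
/*
 * Note: rdma_stat_sq_starve above is one of the svcrdma statistics
 * exported via sysctl (see svc_rdma.c). A steadily rising value
 * suggests the send queue is routinely full and sc_sq_depth may be
 * undersized for the workload.
 */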