/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <rdma/rw.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	set_bit(XPT_CONG_CTRL, &xprt->xpt_flags);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->list);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume one Receive and
	 * one Send WQE at the same time.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->list, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, list);
	list_del(&ctxt->list);
	spin_unlock(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->mapped_sges = 0;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	struct ib_device *device = xprt->sc_cm_id->device;
	unsigned int i;

	for (i = 0; i < ctxt->mapped_sges; i++)
		ib_dma_unmap_page(device,
				  ctxt->sge[i].addr,
				  ctxt->sge[i].length,
				  ctxt->direction);
	ctxt->mapped_sges = 0;
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	list_add(&ctxt->list, &xprt->sc_ctxts);
	spin_unlock(&xprt->sc_ctxt_lock);
}

static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_ctxts)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&xprt->sc_ctxts,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		kfree(ctxt);
	}
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received "
			"for QP=%p, closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		svc_xprt_enqueue(xprt);
		break;
	}
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue
 * @wc:	completed WR
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;

	/* All wc fields are now known to be valid */
	ctxt->byte_len = wc->byte_len;
	spin_lock(&xprt->sc_rq_dto_lock);
	list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
	spin_unlock(&xprt->sc_rq_dto_lock);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		goto out;
	goto out_enqueue;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
			ib_wc_status_msg(wc->status),
			wc->status, wc->vendor_err);
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	svc_rdma_put_context(ctxt, 1);

out_enqueue:
	svc_xprt_enqueue(&xprt->sc_xprt);
out:
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq:	completion queue
 * @wc:	completed WR
 *
 */
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	atomic_inc(&xprt->sc_sq_avail);
	wake_up(&xprt->sc_send_wait);

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&xprt->sc_xprt);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
	}

	svc_xprt_put(&xprt->sc_xprt);
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);

	/*
	 * Note that this implies that the underlying transport has some
	 * form of congestion control (see RFC 7530 section 3.1
	 * paragraph 2). For now, we assume that all supported RDMA
	 * transports are suitable here.
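	 *
	 * Setting XPT_CONG_CTRL tells the generic server code that this
	 * requirement is met, so RPC programs that insist on a
	 * congestion-controlled transport (such as NFSv4) may be served
	 * over this transport.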
	 */
	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->cqe.done = svc_rdma_wc_receive;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(flags);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		svc_rdma_count_mappings(xprt, ctxt);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_cqe = &ctxt->cqe;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
{
	int ret = 0;

	ret = svc_rdma_post_recv(xprt, flags);
	if (ret) {
		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
		       ret);
		pr_err("svcrdma: closing transport %p.\n", xprt);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		ret = -ENOTCONN;
	}
	return ret;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
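 *
 * The new transport is not used for I/O until svc_rdma_accept() has
 * allocated its RDMA resources (PD, CQs, QP) and called rdma_accept()
 * to complete the connection.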
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id, &event->param.conn);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_xprt_enqueue(&xprt->sc_xprt);
		}
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
		dprintk("svcrdma: Address family %d is not supported.\n",
			sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret) {
		dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
		goto err1;
	}
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
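 *
 * If resource allocation or rdma_accept() fails, the errout path below
 * drops the new transport's reference with svc_xprt_put(), which
 * destroys the half-constructed transport.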
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	struct sockaddr *sap;
	unsigned int i, ctxts;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;
	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = svcrdma_max_requests;
	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) {
		pr_warn("svcrdma: reducing receive depth to %d\n",
			dev->attrs.max_qp_wr);
		newxprt->sc_rq_depth = dev->attrs.max_qp_wr;
		newxprt->sc_max_requests = newxprt->sc_rq_depth - 2;
		newxprt->sc_max_bc_requests = 2;
	}
	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
	ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES);
	ctxts *= newxprt->sc_max_requests;
	newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts;
	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) {
		pr_warn("svcrdma: reducing send depth to %d\n",
			dev->attrs.max_qp_wr);
		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
	}
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);

	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
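	 *
	 * The resulting sc_ord is returned to the client as the
	 * initiator_depth in the rdma_accept() connection parameters
	 * below.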
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.port_num = newxprt->sc_port_num;
	qp_attr.cap.max_rdma_ctxs = ctxts;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n",
		newxprt->sc_cm_id, newxprt->sc_pd);
	dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
	dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		newxprt->sc_snd_w_inv = false;
	if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_port_num))
		goto errout;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt, GFP_KERNEL);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	dprintk("    remote address  : %pIS:%u\n",
		sap, rpc_get_port(sap));
	dprintk("    max_sge         : %d\n", newxprt->sc_max_sge);
	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
	dprintk("    ord             : %d\n", newxprt->sc_ord);

	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);

	/* We should only be called from kref_put */
	if (kref_read(&xprt->xpt_ref) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       kref_read(&xprt->xpt_ref));

	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&rdma->sc_read_complete_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&rdma->sc_rq_dto_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
(%d)\n", 952 rdma->sc_ctxt_used); 953 954 /* Final put of backchannel client transport */ 955 if (xprt->xpt_bc_xprt) { 956 xprt_put(xprt->xpt_bc_xprt); 957 xprt->xpt_bc_xprt = NULL; 958 } 959 960 svc_rdma_destroy_rw_ctxts(rdma); 961 svc_rdma_destroy_ctxts(rdma); 962 963 /* Destroy the QP if present (not a listener) */ 964 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) 965 ib_destroy_qp(rdma->sc_qp); 966 967 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) 968 ib_free_cq(rdma->sc_sq_cq); 969 970 if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq)) 971 ib_free_cq(rdma->sc_rq_cq); 972 973 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) 974 ib_dealloc_pd(rdma->sc_pd); 975 976 /* Destroy the CM ID */ 977 rdma_destroy_id(rdma->sc_cm_id); 978 979 kfree(rdma); 980 } 981 982 static void svc_rdma_free(struct svc_xprt *xprt) 983 { 984 struct svcxprt_rdma *rdma = 985 container_of(xprt, struct svcxprt_rdma, sc_xprt); 986 INIT_WORK(&rdma->sc_work, __svc_rdma_free); 987 queue_work(svc_rdma_wq, &rdma->sc_work); 988 } 989 990 static int svc_rdma_has_wspace(struct svc_xprt *xprt) 991 { 992 struct svcxprt_rdma *rdma = 993 container_of(xprt, struct svcxprt_rdma, sc_xprt); 994 995 /* 996 * If there are already waiters on the SQ, 997 * return false. 998 */ 999 if (waitqueue_active(&rdma->sc_send_wait)) 1000 return 0; 1001 1002 /* Otherwise return true. */ 1003 return 1; 1004 } 1005 1006 static int svc_rdma_secure_port(struct svc_rqst *rqstp) 1007 { 1008 return 1; 1009 } 1010 1011 static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt) 1012 { 1013 } 1014 1015 int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) 1016 { 1017 struct ib_send_wr *bad_wr, *n_wr; 1018 int wr_count; 1019 int i; 1020 int ret; 1021 1022 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1023 return -ENOTCONN; 1024 1025 wr_count = 1; 1026 for (n_wr = wr->next; n_wr; n_wr = n_wr->next) 1027 wr_count++; 1028 1029 /* If the SQ is full, wait until an SQ entry is available */ 1030 while (1) { 1031 if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) { 1032 atomic_inc(&rdma_stat_sq_starve); 1033 1034 /* Wait until SQ WR available if SQ still full */ 1035 atomic_add(wr_count, &xprt->sc_sq_avail); 1036 wait_event(xprt->sc_send_wait, 1037 atomic_read(&xprt->sc_sq_avail) > wr_count); 1038 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1039 return -ENOTCONN; 1040 continue; 1041 } 1042 /* Take a transport ref for each WR posted */ 1043 for (i = 0; i < wr_count; i++) 1044 svc_xprt_get(&xprt->sc_xprt); 1045 1046 /* Bump used SQ WR count and post */ 1047 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); 1048 if (ret) { 1049 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); 1050 for (i = 0; i < wr_count; i ++) 1051 svc_xprt_put(&xprt->sc_xprt); 1052 dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret); 1053 dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n", 1054 atomic_read(&xprt->sc_sq_avail), 1055 xprt->sc_sq_depth); 1056 wake_up(&xprt->sc_send_wait); 1057 } 1058 break; 1059 } 1060 return ret; 1061 } 1062