/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <rdma/rw.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/export.h>
#include "xprt_rdma.h"

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static int svc_rdma_post_recv(struct svcxprt_rdma *xprt);
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static int svc_rdma_secure_port(struct svc_rqst *);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_secure_port = svc_rdma_secure_port,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
					   struct sockaddr *, int, int);
static void svc_rdma_bc_detach(struct svc_xprt *);
static void svc_rdma_bc_free(struct svc_xprt *);

static const struct svc_xprt_ops svc_rdma_bc_ops = {
	.xpo_create = svc_rdma_bc_create,
	.xpo_detach = svc_rdma_bc_detach,
	.xpo_free = svc_rdma_bc_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_secure_port = svc_rdma_secure_port,
};

struct svc_xprt_class svc_rdma_bc_class = {
	.xcl_name = "rdma-bc",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_bc_ops,
	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
};

static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
					   struct net *net,
					   struct sockaddr *sa, int salen,
					   int flags)
{
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;

	cma_xprt = rdma_create_xprt(serv, 0);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
	set_bit(XPT_CONG_CTRL, &xprt->xpt_flags);
	serv->sv_bc_xprt = xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	return xprt;
}

static void svc_rdma_bc_detach(struct svc_xprt *xprt)
{
	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
}

static void svc_rdma_bc_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
	if (xprt)
		kfree(rdma);
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

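/*
 * svc_rdma_op_ctxt pool
 *
 * Contexts are pre-allocated when a connection is accepted (one for
 * each possible Send and Receive WQE) and are recycled through
 * xprt->sc_ctxts under sc_ctxt_lock, so the I/O paths normally do not
 * need to call the memory allocator.
 */
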
static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
					   gfp_t flags)
{
	struct svc_rdma_op_ctxt *ctxt;

	ctxt = kmalloc(sizeof(*ctxt), flags);
	if (ctxt) {
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->list);
	}
	return ctxt;
}

static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
	unsigned int i;

	/* Each RPC/RDMA credit can consume one Receive and
	 * one Send WQE at the same time.
	 */
	i = xprt->sc_sq_depth + xprt->sc_rq_depth;

	while (i--) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = alloc_ctxt(xprt, GFP_KERNEL);
		if (!ctxt) {
			dprintk("svcrdma: No memory for RDMA ctxt\n");
			return false;
		}
		list_add(&ctxt->list, &xprt->sc_ctxts);
	}
	return true;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used++;
	if (list_empty(&xprt->sc_ctxts))
		goto out_empty;

	ctxt = list_first_entry(&xprt->sc_ctxts,
				struct svc_rdma_op_ctxt, list);
	list_del(&ctxt->list);
	spin_unlock(&xprt->sc_ctxt_lock);

out:
	ctxt->count = 0;
	ctxt->mapped_sges = 0;
	return ctxt;

out_empty:
	/* Either pre-allocation missed the mark, or send
	 * queue accounting is broken.
	 */
	spin_unlock(&xprt->sc_ctxt_lock);

	ctxt = alloc_ctxt(xprt, GFP_NOIO);
	if (ctxt)
		goto out;

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	spin_unlock(&xprt->sc_ctxt_lock);
	WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
	return NULL;
}

void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	struct ib_device *device = xprt->sc_cm_id->device;
	unsigned int i;

	for (i = 0; i < ctxt->mapped_sges; i++)
		ib_dma_unmap_page(device,
				  ctxt->sge[i].addr,
				  ctxt->sge[i].length,
				  ctxt->direction);
	ctxt->mapped_sges = 0;
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt = ctxt->xprt;
	int i;

	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	spin_lock(&xprt->sc_ctxt_lock);
	xprt->sc_ctxt_used--;
	list_add(&ctxt->list, &xprt->sc_ctxts);
	spin_unlock(&xprt->sc_ctxt_lock);
}

static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
	while (!list_empty(&xprt->sc_ctxts)) {
		struct svc_rdma_op_ctxt *ctxt;

		ctxt = list_first_entry(&xprt->sc_ctxts,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		kfree(ctxt);
	}
}

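/*
 * Event and completion handlers
 *
 * Each posted WR carries a pointer to the ib_cqe embedded in its
 * svc_rdma_op_ctxt, so the completion handlers recover the context
 * with container_of(). On a fatal error, the handlers mark the
 * transport XPT_CLOSE and enqueue it so that teardown happens in
 * process context.
 */
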
/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
			"closing transport\n",
			ib_event_msg(event->event), event->event,
			event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		svc_xprt_enqueue(xprt);
		break;
	}
}

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;

	/* All wc fields are now known to be valid */
	ctxt->byte_len = wc->byte_len;
	spin_lock(&xprt->sc_rq_dto_lock);
	list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q);
	spin_unlock(&xprt->sc_rq_dto_lock);

	svc_rdma_post_recv(xprt);

	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
		goto out;
	goto out_enqueue;

flushed:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
			ib_wc_status_msg(wc->status),
			wc->status, wc->vendor_err);
	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
	svc_rdma_put_context(ctxt, 1);

out_enqueue:
	svc_xprt_enqueue(&xprt->sc_xprt);
out:
	svc_xprt_put(&xprt->sc_xprt);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: completed WR
 *
 */
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *xprt = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_op_ctxt *ctxt;

	atomic_inc(&xprt->sc_sq_avail);
	wake_up(&xprt->sc_send_wait);

	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&xprt->sc_xprt);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: Send: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
	}

	svc_xprt_put(&xprt->sc_xprt);
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);

	/*
	 * Note that this implies that the underlying transport support
	 * has some form of congestion control (see RFC 7530 section 3.1
	 * paragraph 2). For now, we assume that all supported RDMA
	 * transports are suitable here.
	 */
	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);

	if (listener)
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

static int
svc_rdma_post_recv(struct svcxprt_rdma *xprt)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	dma_addr_t pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->cqe.done = svc_rdma_wc_receive;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		if (sge_no >= xprt->sc_max_sge) {
			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
			goto err_put_ctxt;
		}
		page = alloc_page(GFP_KERNEL);
		if (!page)
			goto err_put_ctxt;
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
			goto err_put_ctxt;
		svc_rdma_count_mappings(xprt, ctxt);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
		ctxt->count = sge_no + 1;
		buflen += PAGE_SIZE;
	}
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_cqe = &ctxt->cqe;

	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	if (ret) {
		svc_rdma_unmap_dma(ctxt);
		svc_rdma_put_context(ctxt, 1);
		svc_xprt_put(&xprt->sc_xprt);
	}
	return ret;

err_put_ctxt:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 1);
	return -ENOMEM;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listen xprt. When the listen thread is kicked, it
 * will call the recvfrom method on the listen xprt which will accept the new
 * connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save client advertised inbound read limit for use later in accept. */
	newxprt->sc_ord = param->initiator_depth;

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events are
 * either incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, cma_id->context,
			rdma_event_msg(event->event), event->event);
		handle_connect_req(cma_id, &event->param.conn);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_xprt_enqueue(&xprt->sc_xprt);
		}
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event = %s (%d)\n", cma_id, xprt,
			rdma_event_msg(event->event), event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event = %s (%d)\n", cma_id,
			rdma_event_msg(event->event), event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");
	if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) {
		dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
		return ERR_PTR(-EAFNOSUPPORT);
	}
	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);

	listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret) {
		dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret);
		goto err1;
	}
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		goto err1;
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		goto err1;
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

err1:
	rdma_destroy_id(listen_id);
err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	struct ib_device *dev;
	struct sockaddr *sap;
	unsigned int i, ctxts;
	int ret = 0;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	dev = newxprt->sc_cm_id->device;
	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = svcrdma_max_requests;
	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
	newxprt->sc_rq_depth = newxprt->sc_max_requests +
			       newxprt->sc_max_bc_requests;
	if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) {
		pr_warn("svcrdma: reducing receive depth to %d\n",
			dev->attrs.max_qp_wr);
		newxprt->sc_rq_depth = dev->attrs.max_qp_wr;
		newxprt->sc_max_requests = newxprt->sc_rq_depth - 2;
		newxprt->sc_max_bc_requests = 2;
	}
	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);
	ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES);
	ctxts *= newxprt->sc_max_requests;
	newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts;
	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) {
		pr_warn("svcrdma: reducing send depth to %d\n",
			dev->attrs.max_qp_wr);
		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
	}
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);

	if (!svc_rdma_prealloc_ctxts(newxprt))
		goto errout;

	/*
	 * Limit ORD based on client limit, local device limit, and
	 * configured svcrdma limit.
	 */
	newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord);
	newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
					0, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.port_num = newxprt->sc_port_num;
	qp_attr.cap.max_rdma_ctxs = ctxts;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n",
		newxprt->sc_cm_id, newxprt->sc_pd);
	dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
	dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
		goto errout;
	}
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		newxprt->sc_snd_w_inv = false;
	if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_port_num))
		goto errout;

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted:\n", newxprt);
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
	dprintk("    max_sge         : %d\n", newxprt->sc_max_sge);
	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
	dprintk("    ord             : %d\n", newxprt->sc_ord);

	return &newxprt->sc_xprt;

errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
}

/*
 * When connected, an svc_xprt has at least two references:
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum one reference should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);
}

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);
	struct svc_xprt *xprt = &rdma->sc_xprt;

	dprintk("svcrdma: %s(%p)\n", __func__, rdma);

	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);

	/* We should only be called from kref_put */
	if (kref_read(&xprt->xpt_ref) != 0)
		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
		       kref_read(&xprt->xpt_ref));

	while (!list_empty(&rdma->sc_read_complete_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_first_entry(&rdma->sc_read_complete_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}
	while (!list_empty(&rdma->sc_rq_dto_q)) {
		struct svc_rdma_op_ctxt *ctxt;
		ctxt = list_first_entry(&rdma->sc_rq_dto_q,
					struct svc_rdma_op_ctxt, list);
		list_del(&ctxt->list);
		svc_rdma_put_context(ctxt, 1);
	}

	/* Warn if we leaked a resource or under-referenced */
	if (rdma->sc_ctxt_used != 0)
		pr_err("svcrdma: ctxt still in use? (%d)\n",
		       rdma->sc_ctxt_used);

	/* Final put of backchannel client transport */
	if (xprt->xpt_bc_xprt) {
		xprt_put(xprt->xpt_bc_xprt);
		xprt->xpt_bc_xprt = NULL;
	}

	svc_rdma_destroy_rw_ctxts(rdma);
	svc_rdma_destroy_ctxts(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	queue_work(svc_rdma_wq, &rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static int svc_rdma_secure_port(struct svc_rqst *rqstp)
{
	return 1;
}

static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}

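/* Post a chain of Work Requests on the connection's Send Queue.
 * sc_sq_avail tracks free SQ slots: if the chain does not fit, the
 * caller sleeps on sc_send_wait until completions release enough
 * slots. A reference on the transport is taken for each WR posted so
 * that the transport cannot disappear while WRs are in flight.
 */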
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr, *n_wr;
	int wr_count;
	int i;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return -ENOTCONN;

	wr_count = 1;
	for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
		wr_count++;

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
			atomic_inc(&rdma_stat_sq_starve);

			/* Wait until SQ WR available if SQ still full */
			atomic_add(wr_count, &xprt->sc_sq_avail);
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_avail) > wr_count);
			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
				return -ENOTCONN;
			continue;
		}
		/* Take a transport ref for each WR posted */
		for (i = 0; i < wr_count; i++)
			svc_xprt_get(&xprt->sc_xprt);

		/* Bump used SQ WR count and post */
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (ret) {
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			for (i = 0; i < wr_count; i++)
				svc_xprt_put(&xprt->sc_xprt);
			dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
			dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
				atomic_read(&xprt->sc_sq_avail),
				xprt->sc_sq_depth);
			wake_up(&xprt->sc_send_wait);
		}
		break;
	}
	return ret;
}