/*
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_release_rqst(struct svc_rqst *);
static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);

DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
static DEFINE_SPINLOCK(dto_lock);
static LIST_HEAD(dto_xprt_q);

static struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_release_rqst = svc_rdma_release_rqst,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
};

/* Grow the context cache by up to sc_ctxt_bump entries, bounded by
 * sc_ctxt_max. Returns non-zero if at least one context was added. */
static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
{
	int target;
	int at_least_one = 0;
	struct svc_rdma_op_ctxt *ctxt;

	target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
		     xprt->sc_ctxt_max);

	spin_lock_bh(&xprt->sc_ctxt_lock);
	while (xprt->sc_ctxt_cnt < target) {
		xprt->sc_ctxt_cnt++;
		spin_unlock_bh(&xprt->sc_ctxt_lock);

		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);

		spin_lock_bh(&xprt->sc_ctxt_lock);
		if (ctxt) {
			at_least_one = 1;
			ctxt->next = xprt->sc_ctxt_head;
			xprt->sc_ctxt_head = ctxt;
		} else {
			/* kmalloc failed...give up for now */
			xprt->sc_ctxt_cnt--;
			break;
		}
	}
	spin_unlock_bh(&xprt->sc_ctxt_lock);
	dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
		xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
	return at_least_one;
}

struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt;

	while (1) {
		spin_lock_bh(&xprt->sc_ctxt_lock);
		if (unlikely(xprt->sc_ctxt_head == NULL)) {
			/* Try to bump my cache. */
			spin_unlock_bh(&xprt->sc_ctxt_lock);

			if (rdma_bump_context_cache(xprt))
				continue;

			printk(KERN_INFO "svcrdma: sleeping waiting for "
			       "context memory on xprt=%p\n",
			       xprt);
			schedule_timeout_uninterruptible(msecs_to_jiffies(500));
			continue;
		}
		ctxt = xprt->sc_ctxt_head;
		xprt->sc_ctxt_head = ctxt->next;
		spin_unlock_bh(&xprt->sc_ctxt_lock);
		ctxt->xprt = xprt;
		INIT_LIST_HEAD(&ctxt->dto_q);
		ctxt->count = 0;
		break;
	}
	return ctxt;
}

void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
	struct svcxprt_rdma *xprt;
	int i;

	BUG_ON(!ctxt);
	xprt = ctxt->xprt;
	if (free_pages)
		for (i = 0; i < ctxt->count; i++)
			put_page(ctxt->pages[i]);

	for (i = 0; i < ctxt->count; i++)
		dma_unmap_single(xprt->sc_cm_id->device->dma_device,
				 ctxt->sge[i].addr,
				 ctxt->sge[i].length,
				 ctxt->direction);
	spin_lock_bh(&xprt->sc_ctxt_lock);
	ctxt->next = xprt->sc_ctxt_head;
	xprt->sc_ctxt_head = ctxt;
	spin_unlock_bh(&xprt->sc_ctxt_lock);
}

/* ib_cq event handler */
static void cq_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;
	dprintk("svcrdma: received CQ event id=%d, context=%p\n",
		event->event, context);
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
}

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		dprintk("svcrdma: QP event %d received for QP=%p\n",
			event->event, event->element.qp);
		break;
	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
			"closing transport\n",
			event->event, event->element.qp);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		break;
	}
}

/*
 * Data Transfer Operation Tasklet
 *
 * Walks a list of transports with I/O pending, removing entries as
 * they are added to the server's I/O pending list. Two bits indicate
 * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
 * spinlock that serializes access to the transport list with the RQ
 * and SQ interrupt handlers.
 */
static void dto_tasklet_func(unsigned long data)
{
	struct svcxprt_rdma *xprt;
	unsigned long flags;

	spin_lock_irqsave(&dto_lock, flags);
	while (!list_empty(&dto_xprt_q)) {
		xprt = list_entry(dto_xprt_q.next,
				  struct svcxprt_rdma, sc_dto_q);
		list_del_init(&xprt->sc_dto_q);
		spin_unlock_irqrestore(&dto_lock, flags);

		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
			rq_cq_reap(xprt);
			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
			/*
			 * If data arrived before established event,
			 * don't enqueue. This defers RPC I/O until the
			 * RDMA connection is complete.
			 */
			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
				svc_xprt_enqueue(&xprt->sc_xprt);
		}

		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
			sq_cq_reap(xprt);
		}

		svc_xprt_put(&xprt->sc_xprt);
		spin_lock_irqsave(&dto_lock, flags);
	}
	spin_unlock_irqrestore(&dto_lock, flags);
}

/*
 * Receive Queue Completion Handler
 *
 * Since an RQ completion handler is called in interrupt context, we
 * need to defer the handling of the I/O to a tasklet.
 */
static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an SQ
	 * completion.
	 */
	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

/*
 * rq_cq_reap - Process the RQ CQ.
 *
 * Take all completing WC off the CQ and enqueue the associated DTO
 * context on the dto_q for the transport.
 */
static void rq_cq_reap(struct svcxprt_rdma *xprt)
{
	int ret;
	struct ib_wc wc;
	struct svc_rdma_op_ctxt *ctxt = NULL;

	atomic_inc(&rdma_stat_rq_poll);

	spin_lock_bh(&xprt->sc_rq_dto_lock);
	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		ctxt->wc_status = wc.status;
		ctxt->byte_len = wc.byte_len;
		if (wc.status != IB_WC_SUCCESS) {
			/* Close the transport */
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_rdma_put_context(ctxt, 1);
			continue;
		}
		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
	}
	spin_unlock_bh(&xprt->sc_rq_dto_lock);

	if (ctxt)
		atomic_inc(&rdma_stat_rq_prod);
}

/*
 * Send Queue Completion Handler - potentially called in interrupt context.
 */
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct ib_wc wc;
	struct ib_cq *cq = xprt->sc_sq_cq;
	int ret;

	atomic_inc(&rdma_stat_sq_poll);
	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
		xprt = ctxt->xprt;

		if (wc.status != IB_WC_SUCCESS)
			/* Close the transport */
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);

		/* Decrement used SQ WR count */
		atomic_dec(&xprt->sc_sq_count);
		wake_up(&xprt->sc_send_wait);

		switch (ctxt->wr_op) {
		case IB_WR_SEND:
		case IB_WR_RDMA_WRITE:
			svc_rdma_put_context(ctxt, 1);
			break;

		case IB_WR_RDMA_READ:
			if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
				set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
				set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
				spin_lock_bh(&xprt->sc_read_complete_lock);
				list_add_tail(&ctxt->dto_q,
					      &xprt->sc_read_complete_q);
				spin_unlock_bh(&xprt->sc_read_complete_lock);
				svc_xprt_enqueue(&xprt->sc_xprt);
			}
			break;

		default:
			printk(KERN_ERR "svcrdma: unexpected completion type, "
			       "opcode=%d, status=%d\n",
			       wc.opcode, wc.status);
			break;
		}
	}

	if (ctxt)
		atomic_inc(&rdma_stat_sq_prod);
}

static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
{
	struct svcxprt_rdma *xprt = cq_context;
	unsigned long flags;

	/*
	 * Set the bit regardless of whether or not it's on the list
	 * because it may be on the list already due to an RQ
	 * completion.
	 */
	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);

	/*
	 * If this transport is not already on the DTO transport queue,
	 * add it
	 */
	spin_lock_irqsave(&dto_lock, flags);
	if (list_empty(&xprt->sc_dto_q)) {
		svc_xprt_get(&xprt->sc_xprt);
		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
	}
	spin_unlock_irqrestore(&dto_lock, flags);

	/* Tasklet does all the work to avoid irqsave locks. */
	tasklet_schedule(&dto_tasklet);
}

/* Pre-allocate ctxt_count contexts for the transport's context cache. */
static void create_context_cache(struct svcxprt_rdma *xprt,
				 int ctxt_count, int ctxt_bump, int ctxt_max)
{
	struct svc_rdma_op_ctxt *ctxt;
	int i;

	xprt->sc_ctxt_max = ctxt_max;
	xprt->sc_ctxt_bump = ctxt_bump;
	xprt->sc_ctxt_cnt = 0;
	xprt->sc_ctxt_head = NULL;
	for (i = 0; i < ctxt_count; i++) {
		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
		if (ctxt) {
			ctxt->next = xprt->sc_ctxt_head;
			xprt->sc_ctxt_head = ctxt;
			xprt->sc_ctxt_cnt++;
		}
	}
}

static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
{
	struct svc_rdma_op_ctxt *next;
	if (!ctxt)
		return;

	do {
		next = ctxt->next;
		kfree(ctxt);
		ctxt = next;
	} while (next);
}

static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
					     int listener)
{
	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);

	if (!cma_xprt)
		return NULL;
	svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_read_complete_lock);
	spin_lock_init(&cma_xprt->sc_ctxt_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);

	cma_xprt->sc_ord = svcrdma_ord;

	cma_xprt->sc_max_req_size = svcrdma_max_req_size;
	cma_xprt->sc_max_requests = svcrdma_max_requests;
	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
	atomic_set(&cma_xprt->sc_sq_count, 0);

	if (!listener) {
		int reqs = cma_xprt->sc_max_requests;
		create_context_cache(cma_xprt,
				     reqs << 1, /* starting size */
				     reqs,	/* bump amount */
				     reqs +
				     cma_xprt->sc_sq_depth +
				     RPCRDMA_MAX_THREADS + 1); /* max */
		if (!cma_xprt->sc_ctxt_head) {
			kfree(cma_xprt);
			return NULL;
		}
		clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
	} else
		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

struct page *svc_rdma_get_page(void)
{
	struct page *page;

	while ((page = alloc_page(GFP_KERNEL)) == NULL) {
		/* If we can't get memory, wait a bit and try again */
		printk(KERN_INFO "svcrdma: out of memory...retrying in 1000 "
		       "ms.\n");
		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
	}
	return page;
}

int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
{
	struct ib_recv_wr recv_wr, *bad_recv_wr;
	struct svc_rdma_op_ctxt *ctxt;
	struct page *page;
	unsigned long pa;
	int sge_no;
	int buflen;
	int ret;

	ctxt = svc_rdma_get_context(xprt);
	buflen = 0;
	ctxt->direction = DMA_FROM_DEVICE;
	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
		BUG_ON(sge_no >= xprt->sc_max_sge);
		page = svc_rdma_get_page();
		ctxt->pages[sge_no] = page;
		pa = ib_dma_map_page(xprt->sc_cm_id->device,
				     page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		ctxt->sge[sge_no].addr = pa;
		ctxt->sge[sge_no].length = PAGE_SIZE;
		ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
		buflen += PAGE_SIZE;
	}
	ctxt->count = sge_no;
	recv_wr.next = NULL;
	recv_wr.sg_list = &ctxt->sge[0];
	recv_wr.num_sge = ctxt->count;
	recv_wr.wr_id = (u64)(unsigned long)ctxt;

	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
	return ret;
}

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and is the svc_xprt
 * structure for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue for the listening xprt. When the listen thread is kicked,
 * it will call the recvfrom method on the listen xprt which will accept the
 * new connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;

	/* Create a new transport */
	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
	if (!newxprt) {
		dprintk("svcrdma: failed to create new transport\n");
		return;
	}
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
		newxprt, newxprt->sc_cm_id, listen_xprt);

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport
	 */
	spin_lock_bh(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock_bh(&listen_xprt->sc_lock);

	/*
	 * Can't use svc_xprt_received here because we are not on a
	 * rqstp thread
	 */
	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/*
 * Handles events generated on the listening endpoint. These events will
 * either be incoming connect requests or adapter removal events.
 */
static int rdma_listen_handler(struct rdma_cm_id *cma_id,
			       struct rdma_cm_event *event)
{
	struct svcxprt_rdma *xprt = cma_id->context;
	int ret = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
			"event=%d\n", cma_id, cma_id->context, event->event);
		handle_connect_req(cma_id);
		break;

	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		break;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt)
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		break;

	default:
		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
			"event=%d\n", cma_id, event->event);
		break;
	}

	return ret;
}

static int rdma_cma_handler(struct rdma_cm_id *cma_id,
			    struct rdma_cm_event *event)
{
	struct svc_xprt *xprt = cma_id->context;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Accept complete */
		svc_xprt_get(xprt);
		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
			"cm_id=%p\n", xprt, cma_id);
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
			xprt, cma_id);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
		}
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
			"event=%d\n", cma_id, xprt, event->event);
		if (xprt) {
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_enqueue(xprt);
		}
		break;
	default:
		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
			"event=%d\n", cma_id, event->event);
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	struct svc_xprt *xprt;
	int ret;

	dprintk("svcrdma: Creating RDMA socket\n");

	cma_xprt = rdma_create_xprt(serv, 1);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	xprt = &cma_xprt->sc_xprt;

	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
	if (IS_ERR(listen_id)) {
		svc_xprt_put(&cma_xprt->sc_xprt);
		dprintk("svcrdma: rdma_create_id failed = %ld\n",
			PTR_ERR(listen_id));
		return (void *)listen_id;
	}
	ret = rdma_bind_addr(listen_id, sa);
	if (ret) {
		rdma_destroy_id(listen_id);
		svc_xprt_put(&cma_xprt->sc_xprt);
		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
		return ERR_PTR(ret);
	}
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret) {
		rdma_destroy_id(listen_id);
		svc_xprt_put(&cma_xprt->sc_xprt);
		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
		return ERR_PTR(ret);
	}

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;
}

/*
 * This is the xpo_recvfrom function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains the new svc_xprt structure. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct ib_qp_init_attr qp_attr;
	struct ib_device_attr devattr;
	struct sockaddr *sa;
	int ret;
	int i;

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock_bh(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock_bh(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
		newxprt, newxprt->sc_cm_id);

	ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
	if (ret) {
		dprintk("svcrdma: could not query device attributes on "
			"device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
		goto errout;
	}

	/* Qualify the transport resource defaults with the
	 * capabilities of this particular device */
	newxprt->sc_max_sge = min((size_t)devattr.max_sge,
				  (size_t)RPCSVC_MAXPAGES);
	newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
				       (size_t)svcrdma_max_requests);
	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;

	newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
			      (size_t)svcrdma_ord);

	newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
	if (IS_ERR(newxprt->sc_pd)) {
		dprintk("svcrdma: error creating PD for connect request\n");
		goto errout;
	}
	newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
					 sq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 newxprt->sc_sq_depth,
					 0);
	if (IS_ERR(newxprt->sc_sq_cq)) {
		dprintk("svcrdma: error creating SQ CQ for connect request\n");
		goto errout;
	}
	newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
					 rq_comp_handler,
					 cq_event_handler,
					 newxprt,
					 newxprt->sc_max_requests,
					 0);
	if (IS_ERR(newxprt->sc_rq_cq)) {
		dprintk("svcrdma: error creating RQ CQ for connect request\n");
		goto errout;
	}

	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
		" cm_id->device=%p, sc_pd->device=%p\n"
		" cap.max_send_wr = %d\n"
		" cap.max_recv_wr = %d\n"
		" cap.max_send_sge = %d\n"
		" cap.max_recv_sge = %d\n",
		newxprt->sc_cm_id, newxprt->sc_pd,
		newxprt->sc_cm_id->device, newxprt->sc_pd->device,
		qp_attr.cap.max_send_wr,
		qp_attr.cap.max_recv_wr,
		qp_attr.cap.max_send_sge,
		qp_attr.cap.max_recv_sge);

	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		/*
		 * XXX: This is a hack. We need a xx_request_qp interface
		 * that will adjust the qp_attr's with a best-effort
		 * number
		 */
		qp_attr.cap.max_send_sge -= 2;
		qp_attr.cap.max_recv_sge -= 2;
		ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
				     &qp_attr);
		if (ret) {
			dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
			goto errout;
		}
		/* Use the smaller of the adjusted send/recv SGE limits */
		newxprt->sc_max_sge = min(qp_attr.cap.max_send_sge,
					  qp_attr.cap.max_recv_sge);
		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
	}
	svc_xprt_get(&newxprt->sc_xprt);
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	/* Register all of physical memory */
	newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
					    IB_ACCESS_LOCAL_WRITE |
					    IB_ACCESS_REMOTE_WRITE);
	if (IS_ERR(newxprt->sc_phys_mr)) {
		dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret);
		goto errout;
	}

	/* Post receive buffers */
	for (i = 0; i < newxprt->sc_max_requests; i++) {
		ret = svc_rdma_post_recv(newxprt);
		if (ret) {
			dprintk("svcrdma: failure posting receive buffers\n");
			goto errout;
		}
	}

	/* Swap out the handler */
	newxprt->sc_cm_id->event_handler = rdma_cma_handler;

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof conn_param);
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = newxprt->sc_ord;
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	if (ret) {
		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
			ret);
		goto errout;
	}

	dprintk("svcrdma: new connection %p accepted with the following "
		"attributes:\n"
		" local_ip : %d.%d.%d.%d\n"
		" local_port : %d\n"
		" remote_ip : %d.%d.%d.%d\n"
		" remote_port : %d\n"
		" max_sge : %d\n"
		" sq_depth : %d\n"
		" max_requests : %d\n"
		" ord : %d\n",
		newxprt,
		NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
			 route.addr.src_addr)->sin_addr.s_addr),
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.src_addr)->sin_port),
		NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
			 route.addr.dst_addr)->sin_addr.s_addr),
		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
		       route.addr.dst_addr)->sin_port),
		newxprt->sc_max_sge,
		newxprt->sc_sq_depth,
		newxprt->sc_max_requests,
		newxprt->sc_ord);

	/* Set the local and remote addresses in the transport */
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
	return &newxprt->sc_xprt;

 errout:
	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
		ib_destroy_qp(newxprt->sc_qp);
		svc_xprt_put(&newxprt->sc_xprt);
	}
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

/*
 * Post an RQ WQE to the RQ when the rqst is being released. This
 * effectively returns an RQ credit to the client. The rq_xprt_ctxt
 * will be NULL if the request is deferred due to an RDMA_READ or the
 * transport had no data ready (EAGAIN). Note that an RPC deferred in
 * svc_process will still return the credit; this is because the data
 * has been copied and no longer consumes a WQE/WC.
 */
static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
	int err;
	struct svcxprt_rdma *rdma =
		container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
	if (rqstp->rq_xprt_ctxt) {
		BUG_ON(rqstp->rq_xprt_ctxt != rdma);
		err = svc_rdma_post_recv(rdma);
		if (err)
			dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
				err);
	}
	rqstp->rq_xprt_ctxt = NULL;
}

/*
 * When connected, an svc_xprt has at least three references:
 *
 * - A reference held by the QP. We still hold that here because this
 *   code deletes the QP and puts the reference.
 *
 * - A reference held by the cm_id between the ESTABLISHED and
 *   DISCONNECTED events. If the remote peer disconnected first, this
 *   reference could be gone.
 *
 * - A reference held by the svc_recv code that called this function
 *   as part of close processing.
 *
 * At a minimum two references should still be held.
 */
static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	dprintk("svc: svc_rdma_detach(%p)\n", xprt);

	/* Disconnect and flush posted WQE */
	rdma_disconnect(rdma->sc_cm_id);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
		ib_destroy_qp(rdma->sc_qp);
		svc_xprt_put(xprt);
	}

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
	/* We should only be called from kref_put */
	BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_destroy_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_destroy_cq(rdma->sc_rq_cq);

	if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
		ib_dereg_mr(rdma->sc_phys_mr);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	destroy_context_cache(rdma->sc_ctxt_head);
	kfree(rdma);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are fewer SQ WR available than required to send a
	 * simple response, return false.
	 */
	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
		return 0;

	/*
	 * ...or there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
	struct ib_send_wr *bad_wr;
	int ret;

	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
		return 0;

	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
	BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
		wr->opcode);
	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		spin_lock_bh(&xprt->sc_lock);
		if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
			spin_unlock_bh(&xprt->sc_lock);
			atomic_inc(&rdma_stat_sq_starve);
			/* See if we can reap some SQ WR */
			sq_cq_reap(xprt);

			/* Wait until SQ WR available if SQ still full */
			wait_event(xprt->sc_send_wait,
				   atomic_read(&xprt->sc_sq_count) <
				   xprt->sc_sq_depth);
			continue;
		}
		/* Bump used SQ WR count and post */
		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
		if (!ret)
			atomic_inc(&xprt->sc_sq_count);
		else
			dprintk("svcrdma: failed to post SQ WR rc=%d, "
				"sc_sq_count=%d, sc_sq_depth=%d\n",
				ret, atomic_read(&xprt->sc_sq_count),
				xprt->sc_sq_depth);
		spin_unlock_bh(&xprt->sc_lock);
		break;
	}
	return ret;
}

int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
			enum rpcrdma_errcode err)
{
	struct ib_send_wr err_wr;
	struct ib_sge sge;
	struct page *p;
	struct svc_rdma_op_ctxt *ctxt;
	u32 *va;
	int length;
	int ret;

	p = svc_rdma_get_page();
	va = page_address(p);

	/* XDR encode error */
	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

	/* Prepare SGE for local address; the page is being sent, so map
	 * it to the device. */
	sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
				   p, 0, PAGE_SIZE, DMA_TO_DEVICE);
	sge.lkey = xprt->sc_phys_mr->lkey;
	sge.length = length;

	ctxt = svc_rdma_get_context(xprt);
	ctxt->count = 1;
	ctxt->pages[0] = p;

	/* Prepare SEND WR */
	memset(&err_wr, 0, sizeof err_wr);
	ctxt->wr_op = IB_WR_SEND;
	err_wr.wr_id = (unsigned long)ctxt;
	err_wr.sg_list = &sge;
	err_wr.num_sge = 1;
	err_wr.opcode = IB_WR_SEND;
	err_wr.send_flags = IB_SEND_SIGNALED;

	/* Post It */
	ret = svc_rdma_send(xprt, &err_wr);
	if (ret) {
		dprintk("svcrdma: Error posting send = %d\n", ret);
		svc_rdma_put_context(ctxt, 1);
	}

	return ret;
}
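
/*
 * Illustrative sketch, not part of the original file: svc_rdma_class
 * defined above only becomes reachable once it is registered with the
 * generic svc_xprt layer. That registration lives elsewhere in the
 * sunrpc tree (svc_rdma.c); assuming the svc_reg_xprt_class() /
 * svc_unreg_xprt_class() interface from svc_xprt.c, it looks roughly
 * like:
 *
 *	int svc_rdma_init(void)
 *	{
 *		return svc_reg_xprt_class(&svc_rdma_class);
 *	}
 *
 *	void svc_rdma_cleanup(void)
 *	{
 *		svc_unreg_xprt_class(&svc_rdma_class);
 *	}
 *
 * The real init/cleanup paths also set up module sysctls and debugging,
 * so the above is only a sketch of the class registration step.
 */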