// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015, 2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 */

/* Lightweight memory registration using Fast Registration Work
 * Requests (FRWR).
 *
 * FRWR features ordered asynchronous registration and invalidation
 * of arbitrarily-sized memory regions. This is the fastest and safest
 * but most complex memory registration mode.
 */

/* Normal operation
 *
 * A Memory Region is prepared for RDMA Read or Write using a FAST_REG
 * Work Request (frwr_map). When the RDMA operation is finished, this
 * Memory Region is invalidated using a LOCAL_INV Work Request
 * (frwr_unmap_async and frwr_unmap_sync).
 *
 * Typically FAST_REG Work Requests are not signaled, and neither are
 * RDMA Send Work Requests (with the exception of signaling occasionally
 * to prevent provider work queue overflows). This greatly reduces HCA
 * interrupt workload.
 */

/* Transport recovery
 *
 * frwr_map and frwr_unmap_* cannot run at the same time the transport
 * connect worker is running. The connect worker holds the transport
 * send lock, just as ->send_request does. This prevents frwr_map and
 * the connect worker from running concurrently. When a connection is
 * closed, the Receive completion queue is drained before allowing the
 * connect worker to get control. This prevents frwr_unmap and the
 * connect worker from running concurrently.
 *
 * When the underlying transport disconnects, MRs that are in flight
 * are flushed and are likely unusable. Thus all flushed MRs are
 * destroyed. New MRs are created on demand.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/**
 * frwr_is_supported - Check if device supports FRWR
 * @device: interface adapter to check
 *
 * Returns true if device supports FRWR, otherwise false
 */
bool frwr_is_supported(struct ib_device *device)
{
        struct ib_device_attr *attrs = &device->attrs;

        if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
                goto out_not_supported;
        if (attrs->max_fast_reg_page_list_len == 0)
                goto out_not_supported;
        return true;

out_not_supported:
        pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n",
                device->name);
        return false;
}

/**
 * frwr_release_mr - Destroy one MR
 * @mr: MR allocated by frwr_init_mr
 *
 */
void frwr_release_mr(struct rpcrdma_mr *mr)
{
        int rc;

        rc = ib_dereg_mr(mr->frwr.fr_mr);
        if (rc)
                trace_xprtrdma_frwr_dereg(mr, rc);
        kfree(mr->mr_sg);
        kfree(mr);
}

static void frwr_mr_recycle(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
{
        trace_xprtrdma_mr_recycle(mr);

        if (mr->mr_dir != DMA_NONE) {
                trace_xprtrdma_mr_unmap(mr);
                ib_dma_unmap_sg(r_xprt->rx_ia.ri_id->device,
                                mr->mr_sg, mr->mr_nents, mr->mr_dir);
                mr->mr_dir = DMA_NONE;
        }

        spin_lock(&r_xprt->rx_buf.rb_lock);
        list_del(&mr->mr_all);
        r_xprt->rx_stats.mrs_recycled++;
        spin_unlock(&r_xprt->rx_buf.rb_lock);

        frwr_release_mr(mr);
}

/* MRs are dynamically allocated, so simply clean up and release the MR.
 * A replacement MR will subsequently be allocated on demand.
 */
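/* Recycling is deferred to the mr_recycle work item, presumably to keep
 * ib_dereg_mr(), which can sleep, out of the completion handlers that
 * detect flushed MRs.
 */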
static void
frwr_mr_recycle_worker(struct work_struct *work)
{
        struct rpcrdma_mr *mr = container_of(work, struct rpcrdma_mr,
                                             mr_recycle);

        frwr_mr_recycle(mr->mr_xprt, mr);
}

/* frwr_recycle - Discard MRs
 * @req: request to reset
 *
 * Used after a reconnect. These MRs could be in flight, we can't
 * tell. Safe thing to do is release them.
 */
void frwr_recycle(struct rpcrdma_req *req)
{
        struct rpcrdma_mr *mr;

        while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
                frwr_mr_recycle(mr->mr_xprt, mr);
}

/* frwr_reset - Place MRs back on the free list
 * @req: request to reset
 *
 * Used after a failed marshal. For FRWR, this means the MRs
 * don't have to be fully released and recreated.
 *
 * NB: This is safe only as long as none of @req's MRs are
 * involved with an ongoing asynchronous FAST_REG or LOCAL_INV
 * Work Request.
 */
void frwr_reset(struct rpcrdma_req *req)
{
        struct rpcrdma_mr *mr;

        while ((mr = rpcrdma_mr_pop(&req->rl_registered)))
                rpcrdma_mr_put(mr);
}

/**
 * frwr_init_mr - Initialize one MR
 * @ia: interface adapter
 * @mr: generic MR to prepare for FRWR
 *
 * Returns zero if successful. Otherwise a negative errno
 * is returned.
 */
int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr)
{
        unsigned int depth = ia->ri_max_frwr_depth;
        struct scatterlist *sg;
        struct ib_mr *frmr;
        int rc;

        /* NB: ib_alloc_mr and device drivers typically allocate
         *     memory with GFP_KERNEL.
         */
        frmr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
        if (IS_ERR(frmr))
                goto out_mr_err;

        sg = kcalloc(depth, sizeof(*sg), GFP_NOFS);
        if (!sg)
                goto out_list_err;

        mr->frwr.fr_mr = frmr;
        mr->mr_dir = DMA_NONE;
        INIT_LIST_HEAD(&mr->mr_list);
        INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker);
        init_completion(&mr->frwr.fr_linv_done);

        sg_init_table(sg, depth);
        mr->mr_sg = sg;
        return 0;

out_mr_err:
        rc = PTR_ERR(frmr);
        trace_xprtrdma_frwr_alloc(mr, rc);
        return rc;

out_list_err:
        ib_dereg_mr(frmr);
        return -ENOMEM;
}

/**
 * frwr_open - Prepare an endpoint for use with FRWR
 * @ia: interface adapter this endpoint will use
 * @ep: endpoint to prepare
 *
 * On success, sets:
 *	ep->rep_attr.cap.max_send_wr
 *	ep->rep_attr.cap.max_recv_wr
 *	ep->rep_max_requests
 *	ia->ri_max_segs
 *
 * And these FRWR-related fields:
 *	ia->ri_max_frwr_depth
 *	ia->ri_mrtype
 *
 * On failure, a negative errno is returned.
 */
int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep)
{
        struct ib_device_attr *attrs = &ia->ri_id->device->attrs;
        int max_qp_wr, depth, delta;

        ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
        if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
                ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;

        /* Quirk: Some devices advertise a large max_fast_reg_page_list_len
         * capability, but perform optimally when the MRs are not larger
         * than a page.
         */
        if (attrs->max_sge_rd > 1)
                ia->ri_max_frwr_depth = attrs->max_sge_rd;
        else
                ia->ri_max_frwr_depth = attrs->max_fast_reg_page_list_len;
        if (ia->ri_max_frwr_depth > RPCRDMA_MAX_DATA_SEGS)
                ia->ri_max_frwr_depth = RPCRDMA_MAX_DATA_SEGS;
        dprintk("RPC: %s: max FR page list depth = %u\n",
                __func__, ia->ri_max_frwr_depth);

        /* Add room for frwr register and invalidate WRs.
         * 1. FRWR reg WR for head
         * 2. FRWR invalidate WR for head
         * 3. N FRWR reg WRs for pagelist
         * 4. N FRWR invalidate WRs for pagelist
         * 5. FRWR reg WR for tail
         * 6. FRWR invalidate WR for tail
         * 7. The RDMA_SEND WR
         */
        depth = 7;

        /* Calculate N if the device max FRWR depth is smaller than
         * RPCRDMA_MAX_DATA_SEGS.
         */
        if (ia->ri_max_frwr_depth < RPCRDMA_MAX_DATA_SEGS) {
                delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frwr_depth;
                do {
                        depth += 2; /* FRWR reg + invalidate */
                        delta -= ia->ri_max_frwr_depth;
                } while (delta > 0);
        }
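
        /* Worked example (illustrative numbers only): with a device FRWR
         * depth of 16 and RPCRDMA_MAX_DATA_SEGS of 64, the pagelist needs
         * four MRs, so the loop above runs three times and depth ends up
         * at 13: four reg and four invalidate WRs for the pagelist, a reg
         * and an invalidate WR each for head and tail, plus the Send WR.
         */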

        max_qp_wr = ia->ri_id->device->attrs.max_qp_wr;
        max_qp_wr -= RPCRDMA_BACKWARD_WRS;
        max_qp_wr -= 1;
        if (max_qp_wr < RPCRDMA_MIN_SLOT_TABLE)
                return -ENOMEM;
        if (ep->rep_max_requests > max_qp_wr)
                ep->rep_max_requests = max_qp_wr;
        ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
        if (ep->rep_attr.cap.max_send_wr > max_qp_wr) {
                ep->rep_max_requests = max_qp_wr / depth;
                if (!ep->rep_max_requests)
                        return -EINVAL;
                ep->rep_attr.cap.max_send_wr = ep->rep_max_requests * depth;
        }
        ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
        ep->rep_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
        ep->rep_attr.cap.max_recv_wr = ep->rep_max_requests;
        ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
        ep->rep_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */

        ia->ri_max_segs =
                DIV_ROUND_UP(RPCRDMA_MAX_DATA_SEGS, ia->ri_max_frwr_depth);
        /* Reply chunks require segments for head and tail buffers */
        ia->ri_max_segs += 2;
        if (ia->ri_max_segs > RPCRDMA_MAX_HDR_SEGS)
                ia->ri_max_segs = RPCRDMA_MAX_HDR_SEGS;
        return 0;
}

/**
 * frwr_maxpages - Compute size of largest payload
 * @r_xprt: transport
 *
 * Returns maximum size of an RPC message, in pages.
 *
 * FRWR mode conveys a list of pages per chunk segment. The
 * maximum length of that list is the FRWR page list depth.
 */
size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;

        return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
                     (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth);
}

/**
 * frwr_map - Register a memory region
 * @r_xprt: controlling transport
 * @seg: memory region co-ordinates
 * @nsegs: number of segments remaining
 * @writing: true when RDMA Write will be used
 * @xid: XID of RPC using the registered memory
 * @mr: MR to fill in
 *
 * Prepare a REG_MR Work Request to register a memory region
 * for remote access via RDMA READ or RDMA WRITE.
 *
 * Returns the next segment or a negative errno pointer.
 * On success, @mr is filled in.
 */
struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
                                struct rpcrdma_mr_seg *seg,
                                int nsegs, bool writing, __be32 xid,
                                struct rpcrdma_mr *mr)
{
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct ib_reg_wr *reg_wr;
        struct ib_mr *ibmr;
        int i, n;
        u8 key;

        if (nsegs > ia->ri_max_frwr_depth)
                nsegs = ia->ri_max_frwr_depth;
        for (i = 0; i < nsegs;) {
                if (seg->mr_page)
                        sg_set_page(&mr->mr_sg[i],
                                    seg->mr_page,
                                    seg->mr_len,
                                    offset_in_page(seg->mr_offset));
                else
                        sg_set_buf(&mr->mr_sg[i], seg->mr_offset,
                                   seg->mr_len);

                ++seg;
                ++i;
                if (ia->ri_mrtype == IB_MR_TYPE_SG_GAPS)
                        continue;
                if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
        mr->mr_dir = rpcrdma_data_dir(writing);

        mr->mr_nents =
                ib_dma_map_sg(ia->ri_id->device, mr->mr_sg, i, mr->mr_dir);
        if (!mr->mr_nents)
                goto out_dmamap_err;

        ibmr = mr->frwr.fr_mr;
        n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE);
        if (unlikely(n != mr->mr_nents))
                goto out_mapmr_err;

        ibmr->iova &= 0x00000000ffffffff;
        ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
        key = (u8)(ibmr->rkey & 0x000000FF);
        ib_update_fast_reg_key(ibmr, ++key);
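
        /* Folding the XID into the upper half of the iova and bumping the
         * rkey's low-order byte, as done above, appear intended to make it
         * easier to match this MR to its RPC and to catch a peer that uses
         * a stale rkey; neither is required for the registration itself.
         */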

        reg_wr = &mr->frwr.fr_regwr;
        reg_wr->mr = ibmr;
        reg_wr->key = ibmr->rkey;
        reg_wr->access = writing ?
                         IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
                         IB_ACCESS_REMOTE_READ;

        mr->mr_handle = ibmr->rkey;
        mr->mr_length = ibmr->length;
        mr->mr_offset = ibmr->iova;
        trace_xprtrdma_mr_map(mr);

        return seg;

out_dmamap_err:
        mr->mr_dir = DMA_NONE;
        trace_xprtrdma_frwr_sgerr(mr, i);
        return ERR_PTR(-EIO);

out_mapmr_err:
        trace_xprtrdma_frwr_maperr(mr, n);
        return ERR_PTR(-EIO);
}

/**
 * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct rpcrdma_frwr *frwr =
                container_of(cqe, struct rpcrdma_frwr, fr_cqe);

        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_fastreg(wc, frwr);
        /* The MR will get recycled when the associated req is retransmitted */
}

/**
 * frwr_send - post Send WR containing the RPC Call message
 * @ia: interface adapter
 * @req: Prepared RPC Call
 *
 * For FRWR, chain any FastReg WRs to the Send WR. Only a
 * single ib_post_send call is needed to register memory
 * and then post the Send WR.
 *
 * Returns the result of ib_post_send.
 */
int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
        struct ib_send_wr *post_wr;
        struct rpcrdma_mr *mr;

        post_wr = &req->rl_sendctx->sc_wr;
        list_for_each_entry(mr, &req->rl_registered, mr_list) {
                struct rpcrdma_frwr *frwr;

                frwr = &mr->frwr;

                frwr->fr_cqe.done = frwr_wc_fastreg;
                frwr->fr_regwr.wr.next = post_wr;
                frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
                frwr->fr_regwr.wr.num_sge = 0;
                frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
                frwr->fr_regwr.wr.send_flags = 0;

                post_wr = &frwr->fr_regwr.wr;
        }
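
        /* The chain is built in reverse: the last MR walked above heads
         * the chain and the Send WR sits at its tail, so the provider
         * executes every registration before the Send.
         */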

        /* If ib_post_send fails, the next ->send_request for
         * @req will queue these MRs for recovery.
         */
        return ib_post_send(ia->ri_id->qp, post_wr, NULL);
}

/**
 * frwr_reminv - handle a remotely invalidated mr on the @mrs list
 * @rep: Received reply
 * @mrs: list of MRs to check
 *
 */
void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
{
        struct rpcrdma_mr *mr;

        list_for_each_entry(mr, mrs, mr_list)
                if (mr->mr_handle == rep->rr_inv_rkey) {
                        list_del_init(&mr->mr_list);
                        trace_xprtrdma_mr_remoteinv(mr);
                        rpcrdma_mr_put(mr);
                        break;  /* only one invalidated MR per RPC */
                }
}

static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr)
{
        if (wc->status != IB_WC_SUCCESS)
                rpcrdma_mr_recycle(mr);
        else
                rpcrdma_mr_put(mr);
}

/**
 * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct rpcrdma_frwr *frwr =
                container_of(cqe, struct rpcrdma_frwr, fr_cqe);
        struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_li(wc, frwr);
        __frwr_release_mr(wc, mr);
}

/**
 * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 * Awaken anyone waiting for an MR to finish being fenced.
 */
static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct rpcrdma_frwr *frwr =
                container_of(cqe, struct rpcrdma_frwr, fr_cqe);
        struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);

        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_li_wake(wc, frwr);
        __frwr_release_mr(wc, mr);
        complete(&frwr->fr_linv_done);
}

/**
 * frwr_unmap_sync - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * Sleeps until it is safe for the host CPU to access the previously mapped
 * memory regions. This guarantees that registered MRs are properly fenced
 * from the server before the RPC consumer accesses the data in them. It
 * also ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
        struct ib_send_wr *first, **prev, *last;
        const struct ib_send_wr *bad_wr;
        struct rpcrdma_frwr *frwr;
        struct rpcrdma_mr *mr;
        int rc;

        /* ORDER: Invalidate all of the MRs first
         *
         * Chain the LOCAL_INV Work Requests and post them with
         * a single ib_post_send() call.
         */
        frwr = NULL;
        prev = &first;
        while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

                trace_xprtrdma_mr_localinv(mr);
                r_xprt->rx_stats.local_inv_needed++;

                frwr = &mr->frwr;
                frwr->fr_cqe.done = frwr_wc_localinv;
                last = &frwr->fr_invwr;
                last->next = NULL;
                last->wr_cqe = &frwr->fr_cqe;
                last->sg_list = NULL;
                last->num_sge = 0;
                last->opcode = IB_WR_LOCAL_INV;
                last->send_flags = IB_SEND_SIGNALED;
                last->ex.invalidate_rkey = mr->mr_handle;

                *prev = last;
                prev = &last->next;
        }
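
        /* Each LOCAL_INV WR above is signaled so that its completion
         * handler can release the corresponding MR; the handler on the
         * final WR in the chain is replaced below to also wake this
         * waiter.
         */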

        /* Strong send queue ordering guarantees that when the
         * last WR in the chain completes, all WRs in the chain
         * are complete.
         */
        frwr->fr_cqe.done = frwr_wc_localinv_wake;
        reinit_completion(&frwr->fr_linv_done);

        /* Transport disconnect drains the receive CQ before it
         * replaces the QP. The RPC reply handler won't call us
         * unless ri_id->qp is a valid pointer.
         */
        bad_wr = NULL;
        rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
        trace_xprtrdma_post_send(req, rc);

        /* The final LOCAL_INV WR in the chain is supposed to
         * do the wake. If it was never posted, the wake will
         * not happen, so don't wait in that case.
         */
        if (bad_wr != first)
                wait_for_completion(&frwr->fr_linv_done);
        if (!rc)
                return;

        /* Recycle MRs in the LOCAL_INV chain that did not get posted.
         */
        while (bad_wr) {
                frwr = container_of(bad_wr, struct rpcrdma_frwr,
                                    fr_invwr);
                mr = container_of(frwr, struct rpcrdma_mr, frwr);
                bad_wr = bad_wr->next;

                list_del_init(&mr->mr_list);
                rpcrdma_mr_recycle(mr);
        }
}

/**
 * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct rpcrdma_frwr *frwr =
                container_of(cqe, struct rpcrdma_frwr, fr_cqe);
        struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
        struct rpcrdma_rep *rep = mr->mr_req->rl_reply;

        /* WARNING: Only wr_cqe and status are reliable at this point */
        trace_xprtrdma_wc_li_done(wc, frwr);
        __frwr_release_mr(wc, mr);

        /* Ensure @rep is generated before __frwr_release_mr */
        smp_rmb();
        rpcrdma_complete_rqst(rep);
}

/**
 * frwr_unmap_async - invalidate memory regions that were registered for @req
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req with a non-empty list of MRs to process
 *
 * This guarantees that registered MRs are properly fenced from the
 * server before the RPC consumer accesses the data in them. It also
 * ensures proper Send flow control: waking the next RPC waits until
 * this RPC has relinquished all its Send Queue entries.
 */
void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
        struct ib_send_wr *first, *last, **prev;
        const struct ib_send_wr *bad_wr;
        struct rpcrdma_frwr *frwr;
        struct rpcrdma_mr *mr;
        int rc;

        /* Chain the LOCAL_INV Work Requests and post them with
         * a single ib_post_send() call.
         */
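        /* Unlike frwr_unmap_sync(), no caller sleeps here; the final
         * LOCAL_INV completion (frwr_wc_localinv_done) completes the
         * RPC by calling rpcrdma_complete_rqst().
         */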
        frwr = NULL;
        prev = &first;
        while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {

                trace_xprtrdma_mr_localinv(mr);
                r_xprt->rx_stats.local_inv_needed++;

                frwr = &mr->frwr;
                frwr->fr_cqe.done = frwr_wc_localinv;
                last = &frwr->fr_invwr;
                last->next = NULL;
                last->wr_cqe = &frwr->fr_cqe;
                last->sg_list = NULL;
                last->num_sge = 0;
                last->opcode = IB_WR_LOCAL_INV;
                last->send_flags = IB_SEND_SIGNALED;
                last->ex.invalidate_rkey = mr->mr_handle;

                *prev = last;
                prev = &last->next;
        }

        /* Strong send queue ordering guarantees that when the
         * last WR in the chain completes, all WRs in the chain
         * are complete. The last completion will wake up the
         * RPC waiter.
         */
        frwr->fr_cqe.done = frwr_wc_localinv_done;

        /* Transport disconnect drains the receive CQ before it
         * replaces the QP. The RPC reply handler won't call us
         * unless ri_id->qp is a valid pointer.
         */
        bad_wr = NULL;
        rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr);
        trace_xprtrdma_post_send(req, rc);
        if (!rc)
                return;

        /* Recycle MRs in the LOCAL_INV chain that did not get posted.
         */
        while (bad_wr) {
                frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
                mr = container_of(frwr, struct rpcrdma_mr, frwr);
                bad_wr = bad_wr->next;

                rpcrdma_mr_recycle(mr);
        }

        /* The final LOCAL_INV WR in the chain is supposed to
         * do the wake. If it was never posted, the wake will
         * not happen, so wake here in that case.
         */
        rpcrdma_complete_rqst(req->rl_reply);
}