// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size = maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}
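/* Illustration (actual values depend on the rpcrdma_*_maxsz constants
 * in xprt_rdma.h and on the device's ri_max_segs): the worst-case Call
 * header size computed above is subtracted from the connection's
 * inline threshold in rpcrdma_set_max_header_sizes() below, so a
 * larger estimate directly shrinks ri_max_inline_write.
 */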
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size = sizeof(__be32);		/* segment count */
	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}

void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int maxsegs = ia->ri_max_segs;

	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	unsigned int count, remaining, offset;

	if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = RPCRDMA_MIN_SEND_SGES;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > r_xprt->rx_ia.ri_max_send_sges)
				return false;
		}
	}

	return true;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}

/* Split @vec on page boundaries into SGEs. FMR registers pages, not
 * a byte range. Other modes coalesce these SGEs into a single MR
 * when they can.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	u32 remaining, page_offset;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining) {
		seg->mr_page = NULL;
		seg->mr_offset = base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg->mr_len;
		base += seg->mr_len;
		++seg;
		++(*n);
		page_offset = 0;
	}
	return seg;
}
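/* For illustration, assuming 4KB pages: a 6000-byte kvec that begins
 * 3000 bytes into its first page is split by the loop above into
 * three segments of 1096, 4096, and 808 bytes. Other registration
 * modes may then coalesce those segments into a single MR.
 */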
/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		if (unlikely(!*ppages)) {
			/* XXX: Certain upper layer operations do
			 * not provide receive buffer pages.
			 */
			*ppages = alloc_page(GFP_ATOMIC);
			if (!*ppages)
				return -ENOBUFS;
		}
		seg->mr_page = *ppages;
		seg->mr_offset = (char *)page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
		goto out;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks. The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}

static inline int
encode_item_present(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p = xdr_one;
	return 0;
}

static inline int
encode_item_not_present(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p = xdr_zero;
	return 0;
}

static void
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
{
	*iptr++ = cpu_to_be32(mr->mr_handle);
	*iptr++ = cpu_to_be32(mr->mr_length);
	xdr_encode_hyper(iptr, mr->mr_offset);
}

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mr);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;			/* Item present */
	*p++ = cpu_to_be32(position);
	xdr_encode_rdma_segment(p, mr);
	return 0;
}
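/* Each Read segment encoded above occupies six XDR words (24 bytes)
 * on the wire: a list-item discriminator, the Position field, and
 * the four words of the handle/length/offset triplet written by
 * xdr_encode_rdma_segment().
 */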
/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static noinline int
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						   false, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_mr_push(mr, &req->rl_registered);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	return 0;
}

/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static noinline int
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						   true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_mr_push(mr, &req->rl_registered);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}
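/* For example, a Write chunk registered as two MRs is emitted as:
 * 1 (list item present), 2 (segment count), HLOO, HLOO. The Write
 * list's terminating discriminator is encoded afterward by the
 * caller, rpcrdma_marshal_req().
 */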
/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static noinline int
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						   true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);
		rpcrdma_mr_push(mr, &req->rl_registered);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

/**
 * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
 * @sc: sendctx containing SGEs to unmap
 *
 */
void
rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
	struct ib_sge *sge;
	unsigned int count;

	/* The first two SGEs contain the transport header and
	 * the inline buffer. These are always left mapped so
	 * they can be cheaply re-used.
	 */
	sge = &sc->sc_sges[2];
	for (count = sc->sc_unmap_count; count; ++sge, --count)
		ib_dma_unmap_page(ia->ri_device,
				  sge->addr, sge->length, DMA_TO_DEVICE);

	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
		smp_mb__after_atomic();
		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
	}
}

/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static bool
rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			u32 len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = sc->sc_sges;

	if (!rpcrdma_dma_map_regbuf(ia, rb))
		goto out_regbuf;
	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
				      sge->length, DMA_TO_DEVICE);
	sc->sc_wr.num_sge++;
	return true;

out_regbuf:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;
}
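/* Send SGE layout: sc_sges[0] always carries the transport header
 * prepared above, sc_sges[1] carries the head iovec of the Send
 * buffer, and any further SGEs map the page list and tail iovec.
 * Only that last group is DMA-unmapped by rpcrdma_unmap_sendctx().
 */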
/* Prepare the Send SGEs. The head and tail iovec, and each entry
 * in the page list, gets its own SGE.
 */
static bool
rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	unsigned int sge_no, page_base, len, remaining;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct ib_device *device = ia->ri_device;
	struct ib_sge *sge = sc->sc_sges;
	u32 lkey = ia->ri_pd->local_dma_lkey;
	struct page *page, **ppages;

	/* The head iovec is straightforward, as it is already
	 * DMA-mapped. Sync the content that has changed.
	 */
	if (!rpcrdma_dma_map_regbuf(ia, rb))
		goto out_regbuf;
	sge_no = 1;
	sge[sge_no].addr = rdmab_addr(rb);
	sge[sge_no].length = xdr->head[0].iov_len;
	sge[sge_no].lkey = rdmab_lkey(rb);
	ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
				      sge[sge_no].length, DMA_TO_DEVICE);

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here. However, the
	 * tail iovec may include an XDR pad for the page list, as
	 * well as additional content, and may not reside in the
	 * same page as the head iovec.
	 */
	if (rtype == rpcrdma_readch) {
		len = xdr->tail[0].iov_len;

		/* Do not include the tail if it is only an XDR pad */
		if (len < 4)
			goto out;

		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = offset_in_page(xdr->tail[0].iov_base);

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() has added a pad at the beginning
		 * of the tail iovec. Force the tail's non-pad content
		 * to land at the next XDR position in the Send message.
		 */
		page_base += len & 3;
		len -= len & 3;
		goto map_tail;
	}

	/* If there is a page list present, temporarily DMA map
	 * and prepare an SGE for each page to be sent.
	 */
	if (xdr->page_len) {
		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
		page_base = offset_in_page(xdr->page_base);
		remaining = xdr->page_len;
		while (remaining) {
			sge_no++;
			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
				goto out_mapping_overflow;

			len = min_t(u32, PAGE_SIZE - page_base, remaining);
			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
							   page_base, len,
							   DMA_TO_DEVICE);
			if (ib_dma_mapping_error(device, sge[sge_no].addr))
				goto out_mapping_err;
			sge[sge_no].length = len;
			sge[sge_no].lkey = lkey;

			sc->sc_unmap_count++;
			ppages++;
			remaining -= len;
			page_base = 0;
		}
	}

	/* The tail iovec is not always constructed in the same
	 * page where the head iovec resides (see, for example,
	 * gss_wrap_req_priv). To neatly accommodate that case,
	 * DMA map it separately.
	 */
	if (xdr->tail[0].iov_len) {
		page = virt_to_page(xdr->tail[0].iov_base);
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;

map_tail:
		sge_no++;
		sge[sge_no].addr = ib_dma_map_page(device, page,
						   page_base, len,
						   DMA_TO_DEVICE);
		if (ib_dma_mapping_error(device, sge[sge_no].addr))
			goto out_mapping_err;
		sge[sge_no].length = len;
		sge[sge_no].lkey = lkey;
		sc->sc_unmap_count++;
	}

out:
	sc->sc_wr.num_sge += sge_no;
	if (sc->sc_unmap_count)
		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
	return true;

out_regbuf:
	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
	return false;

out_mapping_overflow:
	rpcrdma_unmap_sendctx(sc);
	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
	return false;

out_mapping_err:
	rpcrdma_unmap_sendctx(sc);
	pr_err("rpcrdma: Send mapping error\n");
	return false;
}
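/* Note: the page-list loop above stops at RPCRDMA_MAX_SEND_SGES - 2
 * so that sc_sges[] retains room for the transport header SGE and a
 * possible tail iovec SGE. rpcrdma_args_inline() checks the device's
 * ri_max_send_sges limit when deciding whether a Call may be sent
 * this way in the first place.
 */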
/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
int
rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
			  struct rpcrdma_req *req, u32 hdrlen,
			  struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
{
	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
	if (!req->rl_sendctx)
		return -EAGAIN;
	req->rl_sendctx->sc_wr.num_sge = 0;
	req->rl_sendctx->sc_unmap_count = 0;
	req->rl_sendctx->sc_req = req;
	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);

	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
		return -EIO;

	if (rtype != rpcrdma_areadch)
		if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
			return -EIO;

	return 0;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	bool ddp_allowed;
	__be32 *p;
	int ret;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
			RPCAUTH_AUTH_DATATOUCH);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = rpcrdma_noch;
	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}

	/* If this is a retransmit, discard previously registered
	 * chunks. Very likely the connection has been replaced,
	 * so these registrations are invalid and unusable.
	 */
	while (unlikely(!list_empty(&req->rl_registered))) {
		struct rpcrdma_mr *mr;

		mr = rpcrdma_mr_pop(&req->rl_registered);
		rpcrdma_mr_defer_recovery(mr);
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	if (rtype != rpcrdma_noch) {
		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
		if (ret)
			goto out_err;
	}
	ret = encode_item_not_present(xdr);
	if (ret)
		goto out_err;

	if (wtype == rpcrdma_writech) {
		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
		if (ret)
			goto out_err;
	}
	ret = encode_item_not_present(xdr);
	if (ret)
		goto out_err;

	if (wtype != rpcrdma_replych)
		ret = encode_item_not_present(xdr);
	else
		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);

	ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
					&rqst->rq_snd_buf, rtype);
	if (ret)
		goto out_err;
	return 0;

out_err:
	switch (ret) {
	case -EAGAIN:
		xprt_wait_for_buffer_space(rqst->rq_task, NULL);
		break;
	case -ENOBUFS:
		break;
	default:
		r_xprt->rx_stats.failed_marshal_count++;
	}
	return ret;
}
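/* The marshaled transport header thus always begins with four fixed
 * words (XID, version, credits, procedure) followed by the three
 * chunk lists. For instance, an entirely inline Call is encoded as:
 * xid, vers, credits, rdma_msg, 0, 0, 0, with the RPC Call message
 * itself following immediately in the Send buffer.
 */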
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	trace_xprtrdma_fixup(rqst, copy_len, curlen);
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			trace_xprtrdma_fixup_pg(rqst, i, srcp,
						copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	return fixup_copy_count;
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (rep->rr_proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (*p++ != xdr_zero)
		return false;
	if (*p++ != xdr_zero)
		return false;
	if (*p++ != xdr_zero)
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		goto out_short;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;

out_short:
	pr_warn("RPC/RDMA short backward direction call\n");
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	u32 handle;
	u64 offset;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	handle = be32_to_cpup(p++);
	*length = be32_to_cpup(p++);
	xdr_decode_hyper(p, &offset);

	trace_xprtrdma_decode_seg(handle, *length, offset);
	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	return 0;
}

/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(*p != xdr_zero))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (*p == xdr_zero)
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (*p != xdr_zero)
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}
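/* The non-negative value returned by rpcrdma_decode_msg(), and by
 * rpcrdma_decode_nomsg() below, is the length in bytes of the
 * reconstructed RPC reply; rpcrdma_complete_rqst() passes it to
 * xprt_complete_rqst() as the reply's copied length.
 */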
static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
			rqst->rq_task->tk_pid, __func__,
			be32_to_cpup(p), be32_to_cpu(*(p + 1)));
		break;
	case err_chunk:
		dprintk("RPC: %5u: %s: server reports header decoding error\n",
			rqst->rq_task->tk_pid, __func__);
		break;
	default:
		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
	}

	r_xprt->rx_stats.bad_reply_count++;
	return -EREMOTEIO;
}

/* Perform XID lookup, reconstruction of the RPC reply, and
 * RPC completion while holding the transport lock to ensure
 * the rep, rqst, and rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	unsigned long cwnd;
	int status;

	xprt->reestablish_timeout = 0;

	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->recv_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->recv_lock);
	return;

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badheader:
	trace_xprtrdma_reply_hdr(rep);
	r_xprt->rx_stats.bad_reply_count++;
	status = -EIO;
	goto out;
}

void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	/* Invalidate and unmap the data payloads before waking
	 * the waiting application. This guarantees the memory
	 * regions are properly fenced from the server before the
	 * application accesses the data. It also ensures proper
	 * send flow control: waking the next RPC waits until this
	 * RPC has relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
						    &req->rl_registered);

	/* Ensure that any DMA mapped pages associated with
	 * the Send of the RPC Call have been unmapped before
	 * allowing the RPC to complete. This protects argument
	 * memory not controlled by the RPC client from being
	 * re-used before we're done with it.
	 */
	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		r_xprt->rx_stats.reply_waits_for_send++;
		out_of_line_wait_on_bit(&req->rl_flags,
					RPCRDMA_REQ_F_TX_RESOURCES,
					bit_wait,
					TASK_UNINTERRUPTIBLE);
	}
}

/* Reply handling runs in the poll worker thread. Anything that
 * might wait is deferred to a separate workqueue.
 */
void rpcrdma_deferred_completion(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);
	struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;

	trace_xprtrdma_defer_cmp(rep);
	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
	rpcrdma_release_rqst(r_xprt, req);
	rpcrdma_complete_rqst(rep);
}

/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	u32 credits;
	__be32 *p;

	--buf->rb_posted_receives;

	if (rep->rr_hdrbuf.head[0].iov_len == 0)
		goto out_badstatus;

	/* Fixed transport header fields */
	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base);
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	credits = be32_to_cpu(*p++);
	rep->rr_proc = *p++;

	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock(&xprt->recv_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);

	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > buf->rb_max_requests)
		credits = buf->rb_max_requests;
	buf->rb_credits = credits;

	spin_unlock(&xprt->recv_lock);

	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	rep->rr_rqst = rqst;
	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);

	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);

	rpcrdma_post_recvs(r_xprt, false);
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	return;

out_badversion:
	trace_xprtrdma_reply_vers(rep);
	goto repost;

/* The RPC transaction has already been terminated, or the header
 * is corrupt.
 */
out_norqst:
	spin_unlock(&xprt->recv_lock);
	trace_xprtrdma_reply_rqst(rep);
	goto repost;

out_shortreply:
	trace_xprtrdma_reply_short(rep);

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
repost:
	rpcrdma_post_recvs(r_xprt, false);
out_badstatus:
	rpcrdma_recv_buffer_put(rep);
}