// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2020, Oracle and/or its affiliates.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}

/**
 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 * @ep: endpoint to initialize
 *
 * The max_inline fields contain the maximum size of an RPC message
 * so the marshaling code doesn't have to repeat this calculation
 * for every RPC.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
	unsigned int maxsegs = ep->re_max_rdma_segs;

	ep->re_max_inline_send =
		ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
	ep->re_max_inline_recv =
		ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count, remaining, offset;

	if (xdr->len > ep->re_max_inline_send)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = RPCRDMA_MIN_SEND_SGES;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > ep->re_attr.cap.max_send_sge)
				return false;
		}
	}

	return true;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}

/* The client is required to provide a Reply chunk if the maximum
 * size of the non-payload part of the RPC Reply is larger than
 * the inline threshold.
 */
static bool
rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
			  const struct rpc_rqst *rqst)
{
	const struct xdr_buf *buf = &rqst->rq_rcv_buf;

	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
		r_xprt->rx_ep->re_max_inline_recv;
}

/* ACL likes to be lazy in allocating pages. For TCP, these
 * pages can be allocated during receive processing. Not true
 * for RDMA, which must always provision receive buffers
 * up front.
 */
static noinline int
rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
{
	struct page **ppages;
	int len;

	len = buf->page_len;
	ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
	while (len > 0) {
		if (!*ppages)
			*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
		if (!*ppages)
			return -ENOBUFS;
		ppages++;
		len -= PAGE_SIZE;
	}

	return 0;
}

/* Convert @vec to a single SGL element.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	seg->mr_page = virt_to_page(vec->iov_base);
	seg->mr_offset = offset_in_page(vec->iov_base);
	seg->mr_len = vec->iov_len;
	++seg;
	++(*n);
	return seg;
}

/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		seg->mr_page = *ppages;
		seg->mr_offset = page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	if (type == rpcrdma_readch || type == rpcrdma_writech)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;			/* Item present */
	xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
				mr->mr_offset);
	return 0;
}

static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
						 struct rpcrdma_req *req,
						 struct rpcrdma_mr_seg *seg,
						 int nsegs, bool writing,
						 struct rpcrdma_mr **mr)
{
	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
	if (!*mr) {
		*mr = rpcrdma_mr_get(r_xprt);
		if (!*mr)
			goto out_getmr_err;
		(*mr)->mr_req = req;
	}

	rpcrdma_mr_push(*mr, &req->rl_registered);
	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);

out_getmr_err:
	trace_xprtrdma_nomrs_err(r_xprt, req);
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	rpcrdma_mrs_refresh(r_xprt);
	return ERR_PTR(-EAGAIN);
}

/* Register and XDR encode the Read
 * list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct rpc_rqst *rqst,
				    enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
		goto done;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req,
				     struct rpc_rqst *rqst,
				     enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech)
		goto done;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	if (xdr_pad_size(rqst->rq_rcv_buf.page_len)) {
		if (encode_rdma_segment(xdr, ep->re_write_pad_mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_wp(rqst->rq_task, ep->re_write_pad_mr,
					nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	}

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
				      struct rpcrdma_req *req,
				      struct rpc_rqst *rqst,
				      enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		if (xdr_stream_encode_item_absent(xdr) < 0)
			return -EMSGSIZE;
		return 0;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

static void rpcrdma_sendctx_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);
	struct rpcrdma_rep *rep = req->rl_reply;

	rpcrdma_complete_rqst(rep);
	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}

/**
 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 * @sc: sendctx containing SGEs to unmap
 *
 */
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
	struct ib_sge *sge;

	if (!sc->sc_unmap_count)
		return;

	/* The first two SGEs contain the transport header and
	 * the inline buffer. These are always left mapped so
	 * they can be cheaply re-used.
	 */
	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
	     ++sge, --sc->sc_unmap_count)
		ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
				  DMA_TO_DEVICE);

	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}

/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req, u32 len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
}

/* The head iovec is straightforward, as it is usually already
 * DMA-mapped. Sync the content that has changed.
 */
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
		return false;

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
	return true;
}

/* If there is a page list present, DMA map and prepare an
 * SGE for each page to be sent.
 */
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
				     struct xdr_buf *xdr)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	unsigned int page_base, len, remaining;
	struct page **ppages;
	struct ib_sge *sge;

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		sge = &sc->sc_sges[req->rl_wr.num_sge++];
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
					    page_base, len, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
			goto out_mapping_err;

		sge->length = len;
		sge->lkey = rdmab_lkey(rb);

		sc->sc_unmap_count++;
		ppages++;
		remaining -= len;
		page_base = 0;
	}

	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* The tail iovec may include an XDR pad for the page list,
 * as well as additional content, and may not reside in the
 * same page as the head iovec.
 */
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
				     struct xdr_buf *xdr,
				     unsigned int page_base, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct page *page = virt_to_page(xdr->tail[0].iov_base);

	sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
				    DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
		goto out_mapping_err;

	sge->length = len;
	sge->lkey = rdmab_lkey(rb);
	++sc->sc_unmap_count;
	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* Copy the tail to the end of the head buffer.
 */
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned char *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len + xdr->page_len;
	memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
	r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}

/* Copy pagelist content into the head buffer.
 */
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned int len, page_base, remaining;
	struct page **ppages;
	unsigned char *src, *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len;
	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		src = page_address(*ppages);
		src += page_base;
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		memcpy(dst, src, len);
		r_xprt->rx_stats.pullup_copy_count += len;

		ppages++;
		dst += len;
		remaining -= len;
		page_base = 0;
	}
}

/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 * When the head, pagelist, and tail are small, a pull-up copy
 * is considerably less costly than DMA mapping the components
 * of @xdr.
 *
 * Assumptions:
 *  - the caller has already verified that the total length
 *    of the RPC Call body will fit into @rl_sendbuf.
 */
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	if (unlikely(xdr->tail[0].iov_len))
		rpcrdma_pullup_tail_iov(r_xprt, req, xdr);

	if (unlikely(xdr->page_len))
		rpcrdma_pullup_pagelist(r_xprt, req, xdr);

	/* The whole RPC message resides in the head iovec now */
	return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}

static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	struct kvec *tail = &xdr->tail[0];

	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;
	if (xdr->page_len)
		if (!rpcrdma_prepare_pagelist(req, xdr))
			return false;
	if (tail->iov_len)
		if (!rpcrdma_prepare_tail_iov(req, xdr,
					      offset_in_page(tail->iov_base),
					      tail->iov_len))
			return false;

	if (req->rl_sendctx->sc_unmap_count)
		kref_get(&req->rl_kref);
	return true;
}

static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
				   struct rpcrdma_req *req,
				   struct xdr_buf *xdr)
{
	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here.
	 */

	/* Do not include the tail if it is only an XDR pad */
	if (xdr->tail[0].iov_len > 3) {
		unsigned int page_base, len;

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() adds a pad at the beginning of
		 * the tail iovec. Force the tail's non-pad content to
		 * land at the next XDR position in the Send message.
		 */
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;
		page_base += len & 3;
		len -= len & 3;
		if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
			return false;
		kref_get(&req->rl_kref);
	}

	return true;
}

/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, u32 hdrlen,
				     struct xdr_buf *xdr,
				     enum rpcrdma_chunktype rtype)
{
	int ret;

	ret = -EAGAIN;
	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
	if (!req->rl_sendctx)
		goto out_nosc;
	req->rl_sendctx->sc_unmap_count = 0;
	req->rl_sendctx->sc_req = req;
	kref_init(&req->rl_kref);
	req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
	req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
	req->rl_wr.num_sge = 0;
	req->rl_wr.opcode = IB_WR_SEND;

	rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);

	ret = -EIO;
	switch (rtype) {
	case rpcrdma_noch_pullup:
		if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_noch_mapped:
		if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_readch:
		if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_areadch:
		break;
	default:
		goto out_unmap;
	}

	return 0;

out_unmap:
	rpcrdma_sendctx_unmap(req->rl_sendctx);
out_nosc:
	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
	return ret;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	struct xdr_buf *buf = &rqst->rq_snd_buf;
	bool ddp_allowed;
	__be32 *p;
	int ret;

	if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
		ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
		if (ret)
			return ret;
	}

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
			rqst);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = r_xprt->rx_buf.rb_max_requests;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
				&rqst->rq_cred->cr_auth->au_flags);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
		 rpcrdma_nonpayload_inline(r_xprt, rqst))
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
			rpcrdma_noch_pullup : rpcrdma_noch_mapped;
	} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
					buf, rtype);
	if (ret)
		goto out_err;

	trace_xprtrdma_marshal(req, rtype, wtype);
	return 0;

out_err:
	trace_xprtrdma_marshal_failed(rqst, ret);
	r_xprt->rx_stats.failed_marshal_count++;
	frwr_reset(req);
	return ret;
}

static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
					 struct rpcrdma_buffer *buf,
					 u32 grant)
{
	buf->rb_credits = grant;
	xprt->cwnd = grant << RPC_CWNDSHIFT;
}

static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
	spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
 * @r_xprt: controlling transport instance
 *
 * Prepare @r_xprt for the next connection by reinitializing
 * its credit grant to one (see RFC 8166, Section 3.3.3).
 */
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	xprt->cong = 0;
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
	spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	if (fixup_copy_count)
		trace_xprtrdma_fixup(rqst, fixup_copy_count);
	return fixup_copy_count;
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (rep->rr_proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		return true;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	u32 handle;
	u64 offset;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	xdr_decode_rdma_segment(p, &handle, length, &offset);
	trace_xprtrdma_decode_seg(handle, *length, offset);
	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	return 0;
}

/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(xdr_item_is_present(p)))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (xdr_item_is_absent(p))
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (xdr_item_is_present(p))
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}

static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		trace_xprtrdma_err_vers(rqst, p, p + 1);
		break;
	case err_chunk:
		trace_xprtrdma_err_chunk(rqst);
		break;
	default:
		trace_xprtrdma_err_unrecognized(rqst, p);
	}

	return -EIO;
}

/**
 * rpcrdma_unpin_rqst - Release rqst without completing it
 * @rep: RPC/RDMA Receive context
 *
 * This is done when a connection is lost so that a Reply
 * can be dropped and its matching Call can be subsequently
 * retransmitted on a new connection.
 */
void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	req->rl_reply = NULL;
	rep->rr_rqst = NULL;

	spin_lock(&xprt->queue_lock);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);
}

/**
 * rpcrdma_complete_rqst - Pass completed rqst back to RPC
 * @rep: RPC/RDMA Receive context
 *
 * Reconstruct the RPC reply and complete the transaction
 * while @rqst is still pinned to ensure the rep, rqst, and
 * rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	int status;

	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);
	return;

out_badheader:
	trace_xprtrdma_reply_hdr_err(rep);
	r_xprt->rx_stats.bad_reply_count++;
	rqst->rq_task->tk_status = status;
	status = 0;
	goto out;
}

static void rpcrdma_reply_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);

	rpcrdma_complete_rqst(req->rl_reply);
}

/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	u32 credits;
	__be32 *p;

	/* Any data means we had a useful conversation, so
	 * then we don't need to delay the next reconnect.
	 */
	if (xprt->reestablish_timeout)
		xprt->reestablish_timeout = 0;

	/* Fixed transport header fields */
	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base, NULL);
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	credits = be32_to_cpu(*p++);
	rep->rr_proc = *p++;

	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock(&xprt->queue_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);

	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_ep->re_max_requests)
		credits = r_xprt->rx_ep->re_max_requests;
	rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
			   false);
	if (buf->rb_credits != credits)
		rpcrdma_update_cwnd(r_xprt, credits);

	req = rpcr_to_rdmar(rqst);
	if (unlikely(req->rl_reply))
		rpcrdma_rep_put(buf, req->rl_reply);
	req->rl_reply = rep;
	rep->rr_rqst = rqst;

	trace_xprtrdma_reply(rqst->rq_task, rep, credits);

	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	if (!list_empty(&req->rl_registered))
		frwr_unmap_async(r_xprt, req);
		/* LocalInv completion will complete the RPC */
	else
		kref_put(&req->rl_kref, rpcrdma_reply_done);
	return;

out_badversion:
	trace_xprtrdma_reply_vers_err(rep);
	goto out;

out_norqst:
	spin_unlock(&xprt->queue_lock);
	trace_xprtrdma_reply_rqst_err(rep);
	goto out;

out_shortreply:
	trace_xprtrdma_reply_short_err(rep);

out:
	rpcrdma_rep_put(buf, rep);
}