// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include <linux/highmem.h>

#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
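 *
 * The value computed here is subtracted from the inline receive
 * threshold to arrive at ep->re_max_inline_recv (see
 * rpcrdma_set_max_header_sizes below).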
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}

/**
 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 * @ep: endpoint to initialize
 *
 * The max_inline fields contain the maximum size of an RPC message
 * so the marshaling code doesn't have to repeat this calculation
 * for every RPC.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
	unsigned int maxsegs = ep->re_max_rdma_segs;

	ep->re_max_inline_send =
		ep->re_inline_send - rpcrdma_max_call_header_size(maxsegs);
	ep->re_max_inline_recv =
		ep->re_inline_recv - rpcrdma_max_reply_header_size(maxsegs);
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count, remaining, offset;

	if (xdr->len > ep->re_max_inline_send)
		return false;

	if (xdr->page_len) {
		remaining = xdr->page_len;
		offset = offset_in_page(xdr->page_base);
		count = RPCRDMA_MIN_SEND_SGES;
		while (remaining) {
			remaining -= min_t(unsigned int,
					   PAGE_SIZE - offset, remaining);
			offset = 0;
			if (++count > ep->re_attr.cap.max_send_sge)
				return false;
		}
	}

	return true;
}

/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}

/* The client is required to provide a Reply chunk if the maximum
 * size of the non-payload part of the RPC Reply is larger than
 * the inline threshold.
 */
static bool
rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
			  const struct rpc_rqst *rqst)
{
	const struct xdr_buf *buf = &rqst->rq_rcv_buf;

	return (buf->head[0].iov_len + buf->tail[0].iov_len) <
		r_xprt->rx_ep->re_max_inline_recv;
}

/* Split @vec on page boundaries into SGEs. FMR registers pages, not
 * a byte range. Other modes coalesce these SGEs into a single MR
 * when they can.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
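 *
 * The shared @n counter lets rpcrdma_convert_iovs() detect when the
 * caller's segment array would overflow (checked against
 * RPCRDMA_MAX_SEGS).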
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
	u32 remaining, page_offset;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining) {
		seg->mr_page = NULL;
		seg->mr_offset = base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg->mr_len;
		base += seg->mr_len;
		++seg;
		++(*n);
		page_offset = 0;
	}
	return seg;
}

/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
		     unsigned int pos, enum rpcrdma_chunktype type,
		     struct rpcrdma_mr_seg *seg)
{
	unsigned long page_base;
	unsigned int len, n;
	struct page **ppages;

	n = 0;
	if (pos == 0)
		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdrbuf->page_base);
	while (len) {
		/* ACL likes to be lazy in allocating pages - ACLs
		 * are small by default but can get huge.
		 */
		if (unlikely(xdrbuf->flags & XDRBUF_SPARSE_PAGES)) {
			if (!*ppages)
				*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
			if (!*ppages)
				return -ENOBUFS;
		}
		seg->mr_page = *ppages;
		seg->mr_offset = (char *)page_base;
		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		len -= seg->mr_len;
		++ppages;
		++seg;
		++n;
		page_base = 0;
	}

	/* When encoding a Read chunk, the tail iovec contains an
	 * XDR pad and may be omitted.
	 */
	if (type == rpcrdma_readch && r_xprt->rx_ep->re_implicit_roundup)
		goto out;

	/* When encoding a Write chunk, some servers need to see an
	 * extra segment for non-XDR-aligned Write chunks. The upper
	 * layer provides space in the tail iovec that may be used
	 * for this purpose.
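	 * When the peer handles XDR roundup implicitly, that extra
	 * segment is unnecessary, so the tail is skipped below.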
	 */
	if (type == rpcrdma_writech && r_xprt->rx_ep->re_implicit_roundup)
		goto out;

	if (xdrbuf->tail[0].iov_len)
		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);

out:
	if (unlikely(n > RPCRDMA_MAX_SEGS))
		return -EIO;
	return n;
}

static int
encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	xdr_encode_rdma_segment(p, mr->mr_handle, mr->mr_length, mr->mr_offset);
	return 0;
}

static int
encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
		    u32 position)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
	if (unlikely(!p))
		return -EMSGSIZE;

	*p++ = xdr_one;		/* Item present */
	xdr_encode_read_segment(p, position, mr->mr_handle, mr->mr_length,
				mr->mr_offset);
	return 0;
}

static struct rpcrdma_mr_seg *rpcrdma_mr_prepare(struct rpcrdma_xprt *r_xprt,
						 struct rpcrdma_req *req,
						 struct rpcrdma_mr_seg *seg,
						 int nsegs, bool writing,
						 struct rpcrdma_mr **mr)
{
	*mr = rpcrdma_mr_pop(&req->rl_free_mrs);
	if (!*mr) {
		*mr = rpcrdma_mr_get(r_xprt);
		if (!*mr)
			goto out_getmr_err;
		trace_xprtrdma_mr_get(req);
		(*mr)->mr_req = req;
	}

	rpcrdma_mr_push(*mr, &req->rl_registered);
	return frwr_map(r_xprt, seg, nsegs, writing, req->rl_slot.rq_xid, *mr);

out_getmr_err:
	trace_xprtrdma_nomrs(req);
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	rpcrdma_mrs_refresh(r_xprt);
	return ERR_PTR(-EAGAIN);
}

/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct rpc_rqst *rqst,
				    enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
		goto done;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;

	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_read(rqst->rq_task, pos, mr, nsegs);
		r_xprt->rx_stats.read_chunk_count++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req,
				     struct rpc_rqst *rqst,
				     enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech)
		goto done;

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_write(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}

/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
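 *
 * When @wtype is not rpcrdma_replych, only an item-not-present
 * discriminator is encoded.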
 */
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
				      struct rpcrdma_req *req,
				      struct rpc_rqst *rqst,
				      enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		if (xdr_stream_encode_item_absent(xdr) < 0)
			return -EMSGSIZE;
		return 0;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */

	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		trace_xprtrdma_chunk_reply(rqst->rq_task, mr, nsegs);
		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += mr->mr_length;
		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}

static void rpcrdma_sendctx_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);
	struct rpcrdma_rep *rep = req->rl_reply;

	rpcrdma_complete_rqst(rep);
	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}

/**
 * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
 * @sc: sendctx containing SGEs to unmap
 *
 */
void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_regbuf *rb = sc->sc_req->rl_sendbuf;
	struct ib_sge *sge;

	if (!sc->sc_unmap_count)
		return;

	/* The first two SGEs contain the transport header and
	 * the inline buffer. These are always left mapped so
	 * they can be cheaply re-used.
	 */
	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
	     ++sge, --sc->sc_unmap_count)
		ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
				  DMA_TO_DEVICE);

	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
}

/* Prepare an SGE for the RPC-over-RDMA transport header.
 */
static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req, u32 len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
}

/* The head iovec is straightforward, as it is usually already
 * DMA-mapped. Sync the content that has changed.
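 *
 * Returns false if the send buffer could not be DMA-mapped.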
 */
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
		return false;

	sge->addr = rdmab_addr(rb);
	sge->length = len;
	sge->lkey = rdmab_lkey(rb);

	ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
				      DMA_TO_DEVICE);
	return true;
}

/* If there is a page list present, DMA map and prepare an
 * SGE for each page to be sent.
 */
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
				     struct xdr_buf *xdr)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	unsigned int page_base, len, remaining;
	struct page **ppages;
	struct ib_sge *sge;

	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		sge = &sc->sc_sges[req->rl_wr.num_sge++];
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		sge->addr = ib_dma_map_page(rdmab_device(rb), *ppages,
					    page_base, len, DMA_TO_DEVICE);
		if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
			goto out_mapping_err;

		sge->length = len;
		sge->lkey = rdmab_lkey(rb);

		sc->sc_unmap_count++;
		ppages++;
		remaining -= len;
		page_base = 0;
	}

	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* The tail iovec may include an XDR pad for the page list,
 * as well as additional content, and may not reside in the
 * same page as the head iovec.
 */
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
				     struct xdr_buf *xdr,
				     unsigned int page_base, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct page *page = virt_to_page(xdr->tail[0].iov_base);

	sge->addr = ib_dma_map_page(rdmab_device(rb), page, page_base, len,
				    DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdmab_device(rb), sge->addr))
		goto out_mapping_err;

	sge->length = len;
	sge->lkey = rdmab_lkey(rb);
	++sc->sc_unmap_count;
	return true;

out_mapping_err:
	trace_xprtrdma_dma_maperr(sge->addr);
	return false;
}

/* Copy the tail to the end of the head buffer.
 */
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned char *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len + xdr->page_len;
	memmove(dst, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
	r_xprt->rx_stats.pullup_copy_count += xdr->tail[0].iov_len;
}

/* Copy pagelist content into the head buffer.
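 *
 * The caller has already verified that the whole Call body fits in
 * @rl_sendbuf (see rpcrdma_prepare_noch_pullup below).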
 */
static void rpcrdma_pullup_pagelist(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned int len, page_base, remaining;
	struct page **ppages;
	unsigned char *src, *dst;

	dst = (unsigned char *)xdr->head[0].iov_base;
	dst += xdr->head[0].iov_len;
	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_base = offset_in_page(xdr->page_base);
	remaining = xdr->page_len;
	while (remaining) {
		src = page_address(*ppages);
		src += page_base;
		len = min_t(unsigned int, PAGE_SIZE - page_base, remaining);
		memcpy(dst, src, len);
		r_xprt->rx_stats.pullup_copy_count += len;

		ppages++;
		dst += len;
		remaining -= len;
		page_base = 0;
	}
}

/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 * When the head, pagelist, and tail are small, a pull-up copy
 * is considerably less costly than DMA mapping the components
 * of @xdr.
 *
 * Assumptions:
 *  - the caller has already verified that the total length
 *    of the RPC Call body will fit into @rl_sendbuf.
 */
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	if (unlikely(xdr->tail[0].iov_len))
		rpcrdma_pullup_tail_iov(r_xprt, req, xdr);

	if (unlikely(xdr->page_len))
		rpcrdma_pullup_pagelist(r_xprt, req, xdr);

	/* The whole RPC message resides in the head iovec now */
	return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}

static bool rpcrdma_prepare_noch_mapped(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	struct kvec *tail = &xdr->tail[0];

	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;
	if (xdr->page_len)
		if (!rpcrdma_prepare_pagelist(req, xdr))
			return false;
	if (tail->iov_len)
		if (!rpcrdma_prepare_tail_iov(req, xdr,
					      offset_in_page(tail->iov_base),
					      tail->iov_len))
			return false;

	if (req->rl_sendctx->sc_unmap_count)
		kref_get(&req->rl_kref);
	return true;
}

static bool rpcrdma_prepare_readch(struct rpcrdma_xprt *r_xprt,
				   struct rpcrdma_req *req,
				   struct xdr_buf *xdr)
{
	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;

	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here.
	 */

	/* Do not include the tail if it is only an XDR pad */
	if (xdr->tail[0].iov_len > 3) {
		unsigned int page_base, len;

		/* If the content in the page list is an odd length,
		 * xdr_write_pages() adds a pad at the beginning of
		 * the tail iovec. Force the tail's non-pad content to
		 * land at the next XDR position in the Send message.
		 */
		page_base = offset_in_page(xdr->tail[0].iov_base);
		len = xdr->tail[0].iov_len;
		page_base += len & 3;
		len -= len & 3;
		if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
			return false;
		kref_get(&req->rl_kref);
	}

	return true;
}

/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
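 * On failure, any SGEs that were already DMA-mapped are released via
 * rpcrdma_sendctx_unmap() before returning.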
 */
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, u32 hdrlen,
				     struct xdr_buf *xdr,
				     enum rpcrdma_chunktype rtype)
{
	int ret;

	ret = -EAGAIN;
	req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt);
	if (!req->rl_sendctx)
		goto out_nosc;
	req->rl_sendctx->sc_unmap_count = 0;
	req->rl_sendctx->sc_req = req;
	kref_init(&req->rl_kref);
	req->rl_wr.wr_cqe = &req->rl_sendctx->sc_cqe;
	req->rl_wr.sg_list = req->rl_sendctx->sc_sges;
	req->rl_wr.num_sge = 0;
	req->rl_wr.opcode = IB_WR_SEND;

	rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);

	ret = -EIO;
	switch (rtype) {
	case rpcrdma_noch_pullup:
		if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_noch_mapped:
		if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_readch:
		if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_areadch:
		break;
	default:
		goto out_unmap;
	}

	return 0;

out_unmap:
	rpcrdma_sendctx_unmap(req->rl_sendctx);
out_nosc:
	trace_xprtrdma_prepsend_failed(&req->rl_slot, ret);
	return ret;
}

/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (e.g., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	struct xdr_buf *buf = &rqst->rq_snd_buf;
	bool ddp_allowed;
	__be32 *p;
	int ret;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(xdr, &req->rl_hdrbuf, rdmab_data(req->rl_rdmabuf),
			rqst);

	/* Fixed header fields */
	ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = r_xprt->rx_buf.rb_max_requests;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
				&rqst->rq_cred->cr_auth->au_flags);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
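	 * o When direct data placement is not permitted (ddp_allowed
	 *   is false), large replies always use a Reply chunk instead
	 *   of a Write chunk.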
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
		 rpcrdma_nonpayload_inline(r_xprt, rqst))
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		*p++ = rdma_msg;
		rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
			rpcrdma_noch_pullup : rpcrdma_noch_mapped;
	} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
		*p++ = rdma_msg;
		rtype = rpcrdma_readch;
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		*p++ = rdma_nomsg;
		rtype = rpcrdma_areadch;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;

	ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
					buf, rtype);
	if (ret)
		goto out_err;

	trace_xprtrdma_marshal(req, rtype, wtype);
	return 0;

out_err:
	trace_xprtrdma_marshal_failed(rqst, ret);
	r_xprt->rx_stats.failed_marshal_count++;
	frwr_reset(req);
	return ret;
}

static void __rpcrdma_update_cwnd_locked(struct rpc_xprt *xprt,
					 struct rpcrdma_buffer *buf,
					 u32 grant)
{
	buf->rb_credits = grant;
	xprt->cwnd = grant << RPC_CWNDSHIFT;
}

static void rpcrdma_update_cwnd(struct rpcrdma_xprt *r_xprt, u32 grant)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, grant);
	spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
 * @r_xprt: controlling transport instance
 *
 * Prepare @r_xprt for the next connection by reinitializing
 * its credit grant to one (see RFC 8166, Section 3.3.3).
 */
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock(&xprt->transport_lock);
	xprt->cong = 0;
	__rpcrdma_update_cwnd_locked(xprt, &r_xprt->rx_buf, 1);
	spin_unlock(&xprt->transport_lock);
}

/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	srcp += curlen;
	copy_len -= curlen;

	ppages = rqst->rq_rcv_buf.pages +
		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
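	 * Both rq_rcv_buf and rq_private_buf are updated so the two
	 * copies of the buffer stay consistent.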
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	if (fixup_copy_count)
		trace_xprtrdma_fixup(rqst, fixup_copy_count);
	return fixup_copy_count;
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	if (rep->rr_proc != rdma_msg)
		return false;

	/* Peek at stream contents without advancing. */
	p = xdr_inline_decode(xdr, 0);

	/* Chunk lists */
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
	 */
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		goto out_short;

	rpcrdma_bc_receive_call(r_xprt, rep);
	return true;

out_short:
	pr_warn("RPC/RDMA short backward direction call\n");
	return true;
}
#else	/* CONFIG_SUNRPC_BACKCHANNEL */
{
	return false;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */

static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
{
	u32 handle;
	u64 offset;
	__be32 *p;

	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	xdr_decode_rdma_segment(p, &handle, length, &offset);
	trace_xprtrdma_decode_seg(handle, *length, offset);
	return 0;
}

static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
{
	u32 segcount, seglength;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
		*length += seglength;
	}

	return 0;
}

/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(xdr_item_is_present(p)))
		return -EIO;
	return 0;
}

/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
	__be32 *p;

	*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (xdr_item_is_absent(p))
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
		*length += chunklen;
		first = false;
	} while (true);
	return 0;
}

static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	*length = 0;
	if (xdr_item_is_present(p))
		if (decode_write_chunk(xdr, length))
			return -EIO;
	return 0;
}

static int
rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		   struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk, rpclen;
	char *base;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;

	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
	base = (char *)xdr_inline_decode(xdr, 0);
	rpclen = xdr_stream_remaining(xdr);
	r_xprt->rx_stats.fixup_copy_count +=
		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);

	r_xprt->rx_stats.total_rdma_reply += writelist;
	return rpclen + xdr_align_size(writelist);
}

static noinline int
rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	u32 writelist, replychunk;

	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;

	/* Reply chunk buffer already is the reply vector */
	r_xprt->rx_stats.total_rdma_reply += replychunk;
	return replychunk;
}

static noinline int
rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
		     struct rpc_rqst *rqst)
{
	struct xdr_stream *xdr = &rep->rr_stream;
	__be32 *p;

	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		dprintk("RPC: %s: server reports "
			"version error (%u-%u), xid %08x\n", __func__,
			be32_to_cpup(p), be32_to_cpu(*(p + 1)),
			be32_to_cpu(rep->rr_xid));
		break;
	case err_chunk:
		dprintk("RPC: %s: server reports "
			"header decoding error, xid %08x\n", __func__,
			be32_to_cpu(rep->rr_xid));
		break;
	default:
		dprintk("RPC: %s: server reports "
			"unrecognized error %d, xid %08x\n", __func__,
			be32_to_cpup(p), be32_to_cpu(rep->rr_xid));
	}

	return -EIO;
}

/* Perform XID lookup, reconstruction of the RPC reply, and
 * RPC completion while holding the transport lock to ensure
 * the rep, rqst, and rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	int status;

	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
		status = -EIO;
	}
	if (status < 0)
		goto out_badheader;

out:
	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(rqst->rq_task, status);
	xprt_unpin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);
	return;

out_badheader:
	trace_xprtrdma_reply_hdr(rep);
	r_xprt->rx_stats.bad_reply_count++;
	rqst->rq_task->tk_status = status;
	status = 0;
	goto out;
}

static void rpcrdma_reply_done(struct kref *kref)
{
	struct rpcrdma_req *req =
		container_of(kref, struct rpcrdma_req, rl_kref);

	rpcrdma_complete_rqst(req->rl_reply);
}

/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	u32 credits;
	__be32 *p;

	/* Any data means we had a useful conversation, so
	 * then we don't need to delay the next reconnect.
	 */
	if (xprt->reestablish_timeout)
		xprt->reestablish_timeout = 0;

	/* Fixed transport header fields */
	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
			rep->rr_hdrbuf.head[0].iov_base, NULL);
	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
	if (unlikely(!p))
		goto out_shortreply;
	rep->rr_xid = *p++;
	rep->rr_vers = *p++;
	credits = be32_to_cpu(*p++);
	rep->rr_proc = *p++;

	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
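	 * The rqst is pinned while the reply is processed so that it
	 * cannot be released out from under this handler.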
	 */
	spin_lock(&xprt->queue_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
	xprt_pin_rqst(rqst);
	spin_unlock(&xprt->queue_lock);

	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_ep->re_max_requests)
		credits = r_xprt->rx_ep->re_max_requests;
	if (buf->rb_credits != credits)
		rpcrdma_update_cwnd(r_xprt, credits);
	rpcrdma_post_recvs(r_xprt, false);

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply) {
		trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
		rpcrdma_recv_buffer_put(req->rl_reply);
	}
	req->rl_reply = rep;
	rep->rr_rqst = rqst;

	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);

	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
		frwr_reminv(rep, &req->rl_registered);
	if (!list_empty(&req->rl_registered))
		frwr_unmap_async(r_xprt, req);
		/* LocalInv completion will complete the RPC */
	else
		kref_put(&req->rl_kref, rpcrdma_reply_done);
	return;

out_badversion:
	trace_xprtrdma_reply_vers(rep);
	goto out;

out_norqst:
	spin_unlock(&xprt->queue_lock);
	trace_xprtrdma_reply_rqst(rep);
	goto out;

out_shortreply:
	trace_xprtrdma_reply_short(rep);

out:
	rpcrdma_recv_buffer_put(rep);
}