// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

/* Operation
 *
 * The main entry point is svc_rdma_recvfrom. This is called from
 * svc_recv when the transport indicates there is incoming data to
 * be read. "Data Ready" is signaled when an RDMA Receive completes,
 * or when a set of RDMA Reads complete.
 *
 * An svc_rqst is passed in. This structure contains an array of
 * free pages (rq_pages) that will contain the incoming RPC message.
 *
 * Short messages are moved directly into svc_rqst::rq_arg, and
 * the RPC Call is ready to be processed by the Upper Layer.
 * svc_rdma_recvfrom returns the length of the RPC Call message,
 * completing the reception of the RPC Call.
 *
 * However, when an incoming message has Read chunks,
 * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
 * data payload from the client. svc_rdma_recvfrom sets up the
 * RDMA Reads using pages in svc_rqst::rq_pages, which are
 * transferred to an svc_rdma_recv_ctxt for the duration of the
 * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
 * is not yet complete.
 *
 * When the Read chunk payloads have become available on the
 * server, "Data Ready" is raised again, and svc_recv calls
 * svc_rdma_recvfrom again. This second call may use a different
 * svc_rqst than the first one, so any information that needs
 * to be preserved across these two calls is kept in an
 * svc_rdma_recv_ctxt.
 *
 * The second call to svc_rdma_recvfrom performs final assembly
 * of the RPC Call message, using the RDMA Read sink pages kept in
 * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
 * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
 * the length of the completed RPC Call message.
 *
 * Page Management
 *
 * Pages under I/O must be transferred from the first svc_rqst to an
 * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
 *
 * The first svc_rqst supplies pages for RDMA Reads. These are moved
 * from rqstp::rq_pages into ctxt::pages. The consumed elements of
 * the rq_pages array are set to NULL and refilled before the first
 * svc_rdma_recvfrom call returns.
 *
 * During the second svc_rdma_recvfrom call, RDMA Read sink pages
 * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst
 * (see rdma_read_complete() below).
 */
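
/* Illustrative sketch (not part of this file, and simplified): how a
 * server thread ends up calling svc_rdma_recvfrom() twice for a Call
 * that carries a Read chunk. svc_recv() and svc_process() are the
 * generic sunrpc entry points; error handling and the exact return
 * values of svc_recv() are glossed over here.
 *
 *      len = svc_recv(rqstp, timeout); // invokes svc_rdma_recvfrom()
 *      if (len <= 0)
 *              continue;               // RDMA Reads may be in flight; the
 *                                      // thread loops and calls svc_recv()
 *                                      // again when "Data Ready" is raised
 *      svc_process(rqstp);             // the complete Call is now in rq_arg
 */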

#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY         RPCDBG_SVCXPRT

static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);

static inline struct svc_rdma_recv_ctxt *
svc_rdma_next_recv_ctxt(struct list_head *list)
{
        return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
                                        rc_list);
}

static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        dma_addr_t addr;
        void *buffer;

        ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
        if (!ctxt)
                goto fail0;
        buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
        if (!buffer)
                goto fail1;
        addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
                                 rdma->sc_max_req_size, DMA_FROM_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
                goto fail2;

        ctxt->rc_recv_wr.next = NULL;
        ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
        ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
        ctxt->rc_recv_wr.num_sge = 1;
        ctxt->rc_cqe.done = svc_rdma_wc_receive;
        ctxt->rc_recv_sge.addr = addr;
        ctxt->rc_recv_sge.length = rdma->sc_max_req_size;
        ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->rc_recv_buf = buffer;
        ctxt->rc_temp = false;
        return ctxt;

fail2:
        kfree(buffer);
fail1:
        kfree(ctxt);
fail0:
        return NULL;
}

static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
                                       struct svc_rdma_recv_ctxt *ctxt)
{
        ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
                            ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
        kfree(ctxt->rc_recv_buf);
        kfree(ctxt);
}

/**
 * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
 * @rdma: svcxprt_rdma being torn down
 *
 */
void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        struct llist_node *node;

        while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
                ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
                svc_rdma_recv_ctxt_destroy(rdma, ctxt);
        }
}

static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        struct llist_node *node;

        node = llist_del_first(&rdma->sc_recv_ctxts);
        if (!node)
                goto out_empty;
        ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);

out:
        ctxt->rc_page_count = 0;
        ctxt->rc_read_payload_length = 0;
        return ctxt;

out_empty:
        ctxt = svc_rdma_recv_ctxt_alloc(rdma);
        if (!ctxt)
                return NULL;
        goto out;
}

/**
 * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 */
void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
                            struct svc_rdma_recv_ctxt *ctxt)
{
        unsigned int i;

        for (i = 0; i < ctxt->rc_page_count; i++)
                put_page(ctxt->rc_pages[i]);

        if (!ctxt->rc_temp)
                llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
        else
                svc_rdma_recv_ctxt_destroy(rdma, ctxt);
}

/**
 * svc_rdma_release_rqst - Release transport-specific per-rqst resources
 * @rqstp: svc_rqst being released
 *
 * Ensure that the recv_ctxt is released whether or not a Reply
 * was sent. For example, the client could close the connection,
 * or svc_process could drop an RPC, before the Reply is sent.
 */
void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
        struct svc_rdma_recv_ctxt *ctxt = rqstp->rq_xprt_ctxt;
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);

        rqstp->rq_xprt_ctxt = NULL;
        if (ctxt)
                svc_rdma_recv_ctxt_put(rdma, ctxt);
}

static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma,
                                struct svc_rdma_recv_ctxt *ctxt)
{
        int ret;

        svc_xprt_get(&rdma->sc_xprt);
        ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL);
        trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret);
        if (ret)
                goto err_post;
        return 0;

err_post:
        svc_rdma_recv_ctxt_put(rdma, ctxt);
        svc_xprt_put(&rdma->sc_xprt);
        return ret;
}

static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;

        ctxt = svc_rdma_recv_ctxt_get(rdma);
        if (!ctxt)
                return -ENOMEM;
        return __svc_rdma_post_recv(rdma, ctxt);
}

/**
 * svc_rdma_post_recvs - Post initial set of Recv WRs
 * @rdma: fresh svcxprt_rdma
 *
 * Returns true if successful, otherwise false.
 */
bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        unsigned int i;
        int ret;

        for (i = 0; i < rdma->sc_max_requests; i++) {
                ctxt = svc_rdma_recv_ctxt_get(rdma);
                if (!ctxt)
                        return false;
                ctxt->rc_temp = true;
                ret = __svc_rdma_post_recv(rdma, ctxt);
                if (ret)
                        return false;
        }
        return true;
}
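
/* Illustrative sketch (not part of this file): the typical lifetime of
 * a svc_rdma_recv_ctxt for a normal (non-flushed) Receive completion.
 *
 *      ctxt = svc_rdma_recv_ctxt_get(rdma);    // pop from sc_recv_ctxts,
 *                                              // or allocate a fresh one
 *      __svc_rdma_post_recv(rdma, ctxt);       // hand the Recv WR to the HCA
 *      ...                                     // svc_rdma_wc_receive() later
 *                                              // queues ctxt on sc_rq_dto_q
 *                                              // and sets XPT_DATA
 *      svc_rdma_recv_ctxt_put(rdma, ctxt);     // release pages, then recycle
 *                                              // the ctxt onto sc_recv_ctxts,
 *                                              // or destroy it if rc_temp
 */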

/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: Completion Queue context
 * @wc: Work Completion object
 *
 * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
 * the Receive completion handler could be running.
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
        struct svcxprt_rdma *rdma = cq->cq_context;
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_recv_ctxt *ctxt;

        trace_svcrdma_wc_receive(wc);

        /* WARNING: Only wc->wr_cqe and wc->status are reliable */
        ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);

        if (wc->status != IB_WC_SUCCESS)
                goto flushed;

        if (svc_rdma_post_recv(rdma))
                goto post_err;

        /* All wc fields are now known to be valid */
        ctxt->rc_byte_len = wc->byte_len;
        ib_dma_sync_single_for_cpu(rdma->sc_pd->device,
                                   ctxt->rc_recv_sge.addr,
                                   wc->byte_len, DMA_FROM_DEVICE);

        spin_lock(&rdma->sc_rq_dto_lock);
        list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
        /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
        set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
        spin_unlock(&rdma->sc_rq_dto_lock);
        if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
                svc_xprt_enqueue(&rdma->sc_xprt);
        goto out;

flushed:
post_err:
        svc_rdma_recv_ctxt_put(rdma, ctxt);
        set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
        svc_xprt_enqueue(&rdma->sc_xprt);
out:
        svc_xprt_put(&rdma->sc_xprt);
}

/**
 * svc_rdma_flush_recv_queues - Drain pending Receive work
 * @rdma: svcxprt_rdma being shut down
 *
 */
void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) {
                list_del(&ctxt->rc_list);
                svc_rdma_recv_ctxt_put(rdma, ctxt);
        }
        while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
                list_del(&ctxt->rc_list);
                svc_rdma_recv_ctxt_put(rdma, ctxt);
        }
}

static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
                                   struct svc_rdma_recv_ctxt *ctxt)
{
        struct xdr_buf *arg = &rqstp->rq_arg;

        arg->head[0].iov_base = ctxt->rc_recv_buf;
        arg->head[0].iov_len = ctxt->rc_byte_len;
        arg->tail[0].iov_base = NULL;
        arg->tail[0].iov_len = 0;
        arg->page_len = 0;
        arg->page_base = 0;
        arg->buflen = ctxt->rc_byte_len;
        arg->len = ctxt->rc_byte_len;
}

/* This accommodates the largest possible Write chunk.
 */
#define MAX_BYTES_WRITE_CHUNK ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))

/* This accommodates the largest possible Position-Zero
 * Read chunk or Reply chunk.
 */
#define MAX_BYTES_SPECIAL_CHUNK ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))

/* Sanity check the Read list.
 *
 * Implementation limits:
 * - This implementation supports only one Read chunk.
 *
 * Sanity checks:
 * - Read list does not overflow Receive buffer.
 * - Segment size limited by largest NFS data payload.
 *
 * The segment count is limited to how many segments can
 * fit in the transport header without overflowing the
 * buffer. That's about 40 Read segments for a 1KB inline
 * threshold.
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Read list.
 *   %false: Read list is corrupt. @rctxt's xdr_stream is left
 *          in an unknown state.
 */
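/* A rough consistency check behind the "about 40" figure above,
 * assuming a 1KB inline threshold: each Read segment on the wire costs
 * a one-word list discriminator plus rpcrdma_readseg_maxsz (5) words of
 * position/handle/length/offset, or 24 bytes, and 1024 / 24 is roughly
 * 42 segments. Write segments (see xdr_check_write_chunk below) carry
 * no per-segment discriminator or position, so they cost 16 bytes each,
 * and about 60 of them fit in the same 1KB.
 */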
static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
{
        u32 position, len;
        bool first;
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;

        len = 0;
        first = true;
        while (*p != xdr_zero) {
                p = xdr_inline_decode(&rctxt->rc_stream,
                                      rpcrdma_readseg_maxsz * sizeof(*p));
                if (!p)
                        return false;

                if (first) {
                        position = be32_to_cpup(p);
                        first = false;
                } else if (be32_to_cpup(p) != position) {
                        return false;
                }
                p += 2;
                len += be32_to_cpup(p);

                p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
                if (!p)
                        return false;
        }
        return len <= MAX_BYTES_SPECIAL_CHUNK;
}

/* The segment count is limited to how many segments can
 * fit in the transport header without overflowing the
 * buffer. That's about 60 Write segments for a 1KB inline
 * threshold.
 */
static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
{
        u32 i, segcount, total;
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;
        segcount = be32_to_cpup(p);

        total = 0;
        for (i = 0; i < segcount; i++) {
                u32 handle, length;
                u64 offset;

                p = xdr_inline_decode(&rctxt->rc_stream,
                                      rpcrdma_segment_maxsz * sizeof(*p));
                if (!p)
                        return false;

                handle = be32_to_cpup(p++);
                length = be32_to_cpup(p++);
                xdr_decode_hyper(p, &offset);
                trace_svcrdma_decode_wseg(handle, length, offset);

                total += length;
        }
        return total <= maxlen;
}

/* Sanity check the Write list.
 *
 * Implementation limits:
 * - This implementation currently supports only one Write chunk.
 *
 * Sanity checks:
 * - Write list does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *   %true: Write list is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Write list.
 *   %false: Write list is corrupt. @rctxt's xdr_stream is left
 *          in an unknown state.
 */
static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
        u32 chcount = 0;
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;
        rctxt->rc_write_list = p;
        while (*p != xdr_zero) {
                if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
                        return false;
                ++chcount;
                p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
                if (!p)
                        return false;
        }
        if (!chcount)
                rctxt->rc_write_list = NULL;
        return chcount < 2;
}
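
/* Illustrative example (all values made up): the XDR words that
 * xdr_check_write_list() and xdr_check_write_chunk() walk for a Write
 * list holding a single one-segment chunk:
 *
 *      0x00000001                      // list discriminator: chunk follows
 *      0x00000001                      // segment count: 1
 *      0x0000abcd                      // handle (R_key)
 *      0x00010000                      // length: 64KB
 *      0x00000000 0x00000000           // 64-bit offset
 *      0x00000000                      // list discriminator: end of list
 */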

/* Sanity check the Reply chunk.
 *
 * Sanity checks:
 * - Reply chunk does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *   %true: Reply chunk is valid. @rctxt's xdr_stream is updated
 *          to point to the first byte past the Reply chunk.
 *   %false: Reply chunk is corrupt. @rctxt's xdr_stream is left
 *          in an unknown state.
 */
static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;
        rctxt->rc_reply_chunk = p;
        if (*p != xdr_zero) {
                if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK))
                        return false;
        } else {
                rctxt->rc_reply_chunk = NULL;
        }
        return true;
}

/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one R_key to invalidate.
 *
 * If there is exactly one distinct R_key in the received transport
 * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
 *
 * Perform this operation while the received transport header is
 * still in the CPU cache.
 */
static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
                                  struct svc_rdma_recv_ctxt *ctxt)
{
        __be32 inv_rkey, *p;
        u32 i, segcount;

        ctxt->rc_inv_rkey = 0;

        if (!rdma->sc_snd_w_inv)
                return;

        inv_rkey = xdr_zero;
        p = ctxt->rc_recv_buf;
        p += rpcrdma_fixed_maxsz;

        /* Read list */
        while (*p++ != xdr_zero) {
                p++;    /* position */
                if (inv_rkey == xdr_zero)
                        inv_rkey = *p;
                else if (inv_rkey != *p)
                        return;
                p += 4;
        }

        /* Write list */
        while (*p++ != xdr_zero) {
                segcount = be32_to_cpup(p++);
                for (i = 0; i < segcount; i++) {
                        if (inv_rkey == xdr_zero)
                                inv_rkey = *p;
                        else if (inv_rkey != *p)
                                return;
                        p += 4;
                }
        }

        /* Reply chunk */
        if (*p++ != xdr_zero) {
                segcount = be32_to_cpup(p++);
                for (i = 0; i < segcount; i++) {
                        if (inv_rkey == xdr_zero)
                                inv_rkey = *p;
                        else if (inv_rkey != *p)
                                return;
                        p += 4;
                }
        }

        ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);
}
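
/* Example (R_key values made up): if the Read list and the Reply chunk
 * both carry R_key 0x1234 and no other R_key appears in the transport
 * header, rc_inv_rkey becomes 0x1234, and the send path can complete
 * this RPC with Send With Invalidate. If a second distinct R_key (say
 * 0x5678) shows up anywhere, rc_inv_rkey stays zero and a plain Send
 * is used instead.
 */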

/**
 * svc_rdma_xdr_decode_req - Decode the transport header
 * @rq_arg: xdr_buf containing ingress RPC/RDMA message
 * @rctxt: state of decoding
 *
 * On entry, rq_arg->head[0].iov_base points to first byte of the
 * RPC-over-RDMA transport header.
 *
 * On successful exit, head[0] points to first byte past the
 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
 *
 * The length of the RPC-over-RDMA header is returned.
 *
 * Assumptions:
 * - The transport header is entirely contained in the head iovec.
 */
static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
                                   struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p, *rdma_argp;
        unsigned int hdr_len;

        rdma_argp = rq_arg->head[0].iov_base;
        xdr_init_decode(&rctxt->rc_stream, rq_arg, rdma_argp, NULL);

        p = xdr_inline_decode(&rctxt->rc_stream,
                              rpcrdma_fixed_maxsz * sizeof(*p));
        if (unlikely(!p))
                goto out_short;
        p++;
        if (*p != rpcrdma_version)
                goto out_version;
        p += 2;
        switch (*p) {
        case rdma_msg:
                break;
        case rdma_nomsg:
                break;
        case rdma_done:
                goto out_drop;
        case rdma_error:
                goto out_drop;
        default:
                goto out_proc;
        }

        if (!xdr_check_read_list(rctxt))
                goto out_inval;
        if (!xdr_check_write_list(rctxt))
                goto out_inval;
        if (!xdr_check_reply_chunk(rctxt))
                goto out_inval;

        rq_arg->head[0].iov_base = rctxt->rc_stream.p;
        hdr_len = xdr_stream_pos(&rctxt->rc_stream);
        rq_arg->head[0].iov_len -= hdr_len;
        rq_arg->len -= hdr_len;
        trace_svcrdma_decode_rqst(rdma_argp, hdr_len);
        return hdr_len;

out_short:
        trace_svcrdma_decode_short_err(rq_arg->len);
        return -EINVAL;

out_version:
        trace_svcrdma_decode_badvers_err(rdma_argp);
        return -EPROTONOSUPPORT;

out_drop:
        trace_svcrdma_decode_drop_err(rdma_argp);
        return 0;

out_proc:
        trace_svcrdma_decode_badproc_err(rdma_argp);
        return -EINVAL;

out_inval:
        trace_svcrdma_decode_parse_err(rdma_argp);
        return -EINVAL;
}
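
/* Worked example (XID and credit values are arbitrary): the smallest
 * header svc_rdma_xdr_decode_req() accepts is an RDMA_MSG with empty
 * chunk lists, seven XDR words in all:
 *
 *      xid, rpcrdma_version, credits, rdma_msg, 0, 0, 0
 *
 * The function returns 28 (bytes), and rq_arg's head[0] is advanced so
 * that it now begins at the RPC Call header.
 */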

static void rdma_read_complete(struct svc_rqst *rqstp,
                               struct svc_rdma_recv_ctxt *head)
{
        int page_no;

        /* Move Read chunk pages to rqstp so that they will be released
         * when svc_process is done with them.
         */
        for (page_no = 0; page_no < head->rc_page_count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->rc_pages[page_no];
        }
        head->rc_page_count = 0;

        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count];
        rqstp->rq_arg.page_len = head->rc_arg.page_len;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_pages[page_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->rc_arg.head[0];
        rqstp->rq_arg.tail[0] = head->rc_arg.tail[0];
        rqstp->rq_arg.len = head->rc_arg.len;
        rqstp->rq_arg.buflen = head->rc_arg.buflen;
}

static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
                                __be32 *rdma_argp, int status)
{
        struct svc_rdma_send_ctxt *ctxt;
        __be32 *p;
        int ret;

        ctxt = svc_rdma_send_ctxt_get(xprt);
        if (!ctxt)
                return;

        p = xdr_reserve_space(&ctxt->sc_stream,
                              rpcrdma_fixed_maxsz * sizeof(*p));
        if (!p)
                goto put_ctxt;

        *p++ = *rdma_argp;
        *p++ = *(rdma_argp + 1);
        *p++ = xprt->sc_fc_credits;
        *p = rdma_error;

        switch (status) {
        case -EPROTONOSUPPORT:
                p = xdr_reserve_space(&ctxt->sc_stream, 3 * sizeof(*p));
                if (!p)
                        goto put_ctxt;

                *p++ = err_vers;
                *p++ = rpcrdma_version;
                *p = rpcrdma_version;
                trace_svcrdma_err_vers(*rdma_argp);
                break;
        default:
                p = xdr_reserve_space(&ctxt->sc_stream, sizeof(*p));
                if (!p)
                        goto put_ctxt;

                *p = err_chunk;
                trace_svcrdma_err_chunk(*rdma_argp);
        }

        ctxt->sc_send_wr.num_sge = 1;
        ctxt->sc_send_wr.opcode = IB_WR_SEND;
        ctxt->sc_sges[0].length = ctxt->sc_hdrbuf.len;
        ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
        if (ret)
                goto put_ctxt;
        return;

put_ctxt:
        svc_rdma_send_ctxt_put(xprt, ctxt);
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
                                          __be32 *rdma_resp)
{
        __be32 *p;

        if (!xprt->xpt_bc_xprt)
                return false;

        p = rdma_resp + 3;
        if (*p++ != rdma_msg)
                return false;

        if (*p++ != xdr_zero)
                return false;
        if (*p++ != xdr_zero)
                return false;
        if (*p++ != xdr_zero)
                return false;

        /* XID sanity */
        if (*p++ != *rdma_resp)
                return false;
        /* call direction */
        if (*p == cpu_to_be32(RPC_CALL))
                return false;

        return true;
}
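
/* Illustrative layout of the words examined above for a backchannel
 * Reply arriving as RDMA_MSG with empty chunk lists:
 *
 *      rdma_resp[0]    RPC-over-RDMA XID
 *      rdma_resp[1]    version
 *      rdma_resp[2]    credits
 *      rdma_resp[3]    rdma_msg
 *      rdma_resp[4-6]  empty Read list, Write list, and Reply chunk
 *      rdma_resp[7]    RPC XID, which must match rdma_resp[0]
 *      rdma_resp[8]    RPC message direction, which must not be RPC_CALL
 */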

/**
 * svc_rdma_recvfrom - Receive an RPC call
 * @rqstp: request structure into which to receive an RPC Call
 *
 * Returns:
 *      The positive number of bytes in the RPC Call message,
 *      %0 if there were no Calls ready to return,
 *      %-EINVAL if the Read chunk data is too large,
 *      %-ENOMEM if rdma_rw context pool was exhausted,
 *      %-ENOTCONN if posting failed (connection is lost),
 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
 * when there are no remaining ctxt's to process.
 *
 * The next ctxt is removed from the "receive" lists.
 *
 * - If the ctxt completes a Read, then finish assembling the Call
 *   message and return the number of bytes in the message.
 *
 * - If the ctxt completes a Receive, then construct the Call
 *   message from the contents of the Receive buffer.
 *
 *   - If there are no Read chunks in this message, then finish
 *     assembling the Call message and return the number of bytes
 *     in the message.
 *
 *   - If there are Read chunks in this message, post Read WRs to
 *     pull that payload and return 0.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_recv_ctxt *ctxt;
        __be32 *p;
        int ret;

        rqstp->rq_xprt_ctxt = NULL;

        spin_lock(&rdma_xprt->sc_rq_dto_lock);
        ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
        if (ctxt) {
                list_del(&ctxt->rc_list);
                spin_unlock(&rdma_xprt->sc_rq_dto_lock);
                rdma_read_complete(rqstp, ctxt);
                goto complete;
        }
        ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
        if (!ctxt) {
                /* No new incoming requests, terminate the loop */
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                spin_unlock(&rdma_xprt->sc_rq_dto_lock);
                return 0;
        }
        list_del(&ctxt->rc_list);
        spin_unlock(&rdma_xprt->sc_rq_dto_lock);

        atomic_inc(&rdma_stat_recv);

        svc_rdma_build_arg_xdr(rqstp, ctxt);

        /* Prevent svc_xprt_release from releasing pages in rq_pages
         * if we return 0 or an error.
         */
        rqstp->rq_respages = rqstp->rq_pages;
        rqstp->rq_next_page = rqstp->rq_respages;

        p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
        ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
        if (ret < 0)
                goto out_err;
        if (ret == 0)
                goto out_drop;
        rqstp->rq_xprt_hlen = ret;

        if (svc_rdma_is_backchannel_reply(xprt, p))
                goto out_backchannel;

        svc_rdma_get_inv_rkey(rdma_xprt, ctxt);

        p += rpcrdma_fixed_maxsz;
        if (*p != xdr_zero)
                goto out_readchunk;

complete:
        rqstp->rq_xprt_ctxt = ctxt;
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return rqstp->rq_arg.len;

out_readchunk:
        ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
        if (ret < 0)
                goto out_postfail;
        return 0;

out_err:
        svc_rdma_send_error(rdma_xprt, p, ret);
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return 0;

out_postfail:
        if (ret == -EINVAL)
                svc_rdma_send_error(rdma_xprt, p, ret);
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return ret;

out_backchannel:
        svc_rdma_handle_bc_reply(rqstp, ctxt);
out_drop:
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return 0;
}