/*
 * Copyright (c) 2016, 2017 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

/* Operation
 *
 * The main entry point is svc_rdma_recvfrom. This is called from
 * svc_recv when the transport indicates there is incoming data to
 * be read. "Data Ready" is signaled when an RDMA Receive completes,
 * or when a set of RDMA Reads complete.
 *
 * An svc_rqst is passed in. This structure contains an array of
 * free pages (rq_pages) that will contain the incoming RPC message.
 *
 * Short messages are moved directly into svc_rqst::rq_arg, and
 * the RPC Call is ready to be processed by the Upper Layer.
 * svc_rdma_recvfrom returns the length of the RPC Call message,
 * completing the reception of the RPC Call.
 *
 * However, when an incoming message has Read chunks,
 * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
 * data payload from the client. svc_rdma_recvfrom sets up the
 * RDMA Reads using pages in svc_rqst::rq_pages, which are
 * transferred to an svc_rdma_op_ctxt for the duration of the
 * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
 * is not yet ready.
 *
 * When the Read chunk payloads have become available on the
 * server, "Data Ready" is raised again, and svc_recv calls
 * svc_rdma_recvfrom again. This second call may use a different
 * svc_rqst than the first one, thus any information that needs
 * to be preserved across these two calls is kept in an
 * svc_rdma_op_ctxt.
 *
 * The second call to svc_rdma_recvfrom performs final assembly
 * of the RPC Call message, using the RDMA Read sink pages kept in
 * the svc_rdma_op_ctxt. The xdr_buf is copied from the
 * svc_rdma_op_ctxt to the second svc_rqst. The second call returns
 * the length of the completed RPC Call message.
 *
 * Page Management
 *
 * Pages under I/O must be transferred from the first svc_rqst to an
 * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns.
 *
 * The first svc_rqst supplies pages for RDMA Reads. These are moved
 * from rqstp::rq_pages into ctxt::pages. The consumed elements of
 * the rq_pages array are set to NULL, and are refilled with fresh
 * pages during the next svc_recv call.
 *
 * During the second svc_rdma_recvfrom call, RDMA Read sink pages
 * are transferred from the svc_rdma_op_ctxt to the second svc_rqst
 * (see rdma_read_complete() below).
 */
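/* An illustrative timeline (not taken verbatim from any caller) of the
 * two-call flow described above, for a Call bearing a Read chunk:
 *
 *   svc_recv -> svc_rdma_recvfrom    dequeues the Receive ctxt, posts
 *                                    RDMA Reads, returns 0
 *   ... RDMA Read completions raise "Data Ready" again ...
 *   svc_recv -> svc_rdma_recvfrom    finds the ctxt on
 *                                    sc_read_complete_q, assembles
 *                                    rq_arg, returns its length
 */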
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/spinlock.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
                                   struct svc_rdma_op_ctxt *ctxt)
{
        struct page *page;
        int sge_no;
        u32 len;

        /* The reply path assumes the Call's transport header resides
         * in rqstp->rq_pages[0].
         */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len =
                min_t(size_t, ctxt->byte_len, ctxt->sge[0].length);
        rqstp->rq_arg.len = ctxt->byte_len;
        rqstp->rq_arg.buflen = ctxt->byte_len;

        /* Compute bytes past head in the SGL */
        len = ctxt->byte_len - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = len;
        rqstp->rq_arg.page_base = 0;

        sge_no = 1;
        while (len && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                len -= min_t(u32, len, ctxt->sge[sge_no].length);
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* If not all pages were used from the SGL, free the remaining ones */
        len = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = len;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}
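/* A worked example (assumed numbers, with PAGE_SIZE == 4096): a Receive
 * completes with ctxt->byte_len == 5000 and full-page SGEs. The head
 * iovec takes min(5000, 4096) == 4096 bytes backed by pages[0], leaving
 * page_len == 904 bytes, which consumes pages[1]. rq_respages then
 * points at rq_pages[2], and any SGL pages past index 1 are released.
 */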
/* This accommodates the largest possible Write chunk,
 * in one segment.
 */
#define MAX_BYTES_WRITE_SEG     ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))

/* This accommodates the largest possible Position-Zero
 * Read chunk or Reply chunk, in one segment.
 */
#define MAX_BYTES_SPECIAL_SEG   ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))

/* Sanity check the Read list.
 *
 * Implementation limits:
 * - This implementation supports only one Read chunk.
 *
 * Sanity checks:
 * - Read list does not overflow buffer.
 * - Segment size limited by largest NFS data payload.
 *
 * The segment count is limited to how many segments can
 * fit in the transport header without overflowing the
 * buffer. That's about 40 Read segments for a 1KB inline
 * threshold.
 *
 * Returns pointer to the following Write list.
 */
static __be32 *xdr_check_read_list(__be32 *p, const __be32 *end)
{
        u32 position;
        bool first;

        first = true;
        while (*p++ != xdr_zero) {
                if (first) {
                        position = be32_to_cpup(p++);
                        first = false;
                } else if (be32_to_cpup(p++) != position) {
                        return NULL;
                }
                p++;    /* handle */
                if (be32_to_cpup(p++) > MAX_BYTES_SPECIAL_SEG)
                        return NULL;
                p += 2; /* offset */

                if (p > end)
                        return NULL;
        }
        return p;
}

/* The segment count is limited to how many segments can
 * fit in the transport header without overflowing the
 * buffer. That's about 60 Write segments for a 1KB inline
 * threshold.
 */
static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
                                     u32 maxlen)
{
        u32 i, segcount;

        segcount = be32_to_cpup(p++);
        for (i = 0; i < segcount; i++) {
                p++;    /* handle */
                if (be32_to_cpup(p++) > maxlen)
                        return NULL;
                p += 2; /* offset */

                if (p > end)
                        return NULL;
        }

        return p;
}

/* Sanity check the Write list.
 *
 * Implementation limits:
 * - This implementation supports only one Write chunk.
 *
 * Sanity checks:
 * - Write list does not overflow buffer.
 * - Segment size limited by largest NFS data payload.
 *
 * Returns pointer to the following Reply chunk.
 */
static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end)
{
        u32 chcount;

        chcount = 0;
        while (*p++ != xdr_zero) {
                p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
                if (!p)
                        return NULL;
                if (chcount++ > 1)
                        return NULL;
        }
        return p;
}

/* Sanity check the Reply chunk.
 *
 * Sanity checks:
 * - Reply chunk does not overflow buffer.
 * - Segment size limited by largest NFS data payload.
 *
 * Returns pointer to the following RPC header.
 */
static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
{
        if (*p++ != xdr_zero) {
                p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
                if (!p)
                        return NULL;
        }
        return p;
}
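/* For reference, a sketch of the RPC-over-RDMA version 1 transport
 * header that the helpers above walk (per RFC 8166; the field names
 * here are descriptive, not kernel identifiers):
 *
 *   xid         : 1 XDR word, copied from the RPC Call's XID
 *   vers        : 1 XDR word, always 1
 *   credits     : 1 XDR word, requested credit limit
 *   proc        : 1 XDR word: rdma_msg, rdma_nomsg, rdma_done,
 *                 or rdma_error
 *   Read list   : zero or more (position, handle, length, offset)
 *                 segments, each preceded by a non-zero discriminator
 *                 word; the list ends with a zero word
 *   Write list  : zero or more (segcount, segments...) chunks, each
 *                 preceded by a non-zero discriminator word; the list
 *                 ends with a zero word
 *   Reply chunk : one optional (segcount, segments...) chunk, preceded
 *                 by a presence discriminator word
 *
 * Each segment's offset is a 64-bit quantity, hence the "p += 2"
 * steps above.
 */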
/* On entry, xdr->head[0].iov_base points to first byte in the
 * RPC-over-RDMA header.
 *
 * On successful exit, head[0] points to first byte past the
 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
 * The length of the RPC-over-RDMA header is returned.
 *
 * Assumptions:
 * - The transport header is entirely contained in the head iovec.
 */
static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg)
{
        __be32 *p, *end, *rdma_argp;
        unsigned int hdr_len;
        char *proc;

        /* Verify that there are enough bytes for header + something */
        if (rq_arg->len <= RPCRDMA_HDRLEN_ERR)
                goto out_short;

        rdma_argp = rq_arg->head[0].iov_base;
        if (*(rdma_argp + 1) != rpcrdma_version)
                goto out_version;

        switch (*(rdma_argp + 3)) {
        case rdma_msg:
                proc = "RDMA_MSG";
                break;
        case rdma_nomsg:
                proc = "RDMA_NOMSG";
                break;

        case rdma_done:
                goto out_drop;

        case rdma_error:
                goto out_drop;

        default:
                goto out_proc;
        }

        end = (__be32 *)((unsigned long)rdma_argp + rq_arg->len);
        p = xdr_check_read_list(rdma_argp + 4, end);
        if (!p)
                goto out_inval;
        p = xdr_check_write_list(p, end);
        if (!p)
                goto out_inval;
        p = xdr_check_reply_chunk(p, end);
        if (!p)
                goto out_inval;
        if (p > end)
                goto out_inval;

        rq_arg->head[0].iov_base = p;
        hdr_len = (unsigned long)p - (unsigned long)rdma_argp;
        rq_arg->head[0].iov_len -= hdr_len;
        rq_arg->len -= hdr_len;
        dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n",
                proc, be32_to_cpup(rdma_argp), hdr_len);
        return hdr_len;

out_short:
        dprintk("svcrdma: header too short = %d\n", rq_arg->len);
        return -EINVAL;

out_version:
        dprintk("svcrdma: bad xprt version: %u\n",
                be32_to_cpup(rdma_argp + 1));
        return -EPROTONOSUPPORT;

out_drop:
        dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n");
        return 0;

out_proc:
        dprintk("svcrdma: bad rdma procedure (%u)\n",
                be32_to_cpup(rdma_argp + 3));
        return -EINVAL;

out_inval:
        dprintk("svcrdma: failed to parse transport header\n");
        return -EINVAL;
}

static void rdma_read_complete(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *head)
{
        int page_no;

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }

        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_pages[page_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;
}
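/* A sketch (derived from the constants used below, not a normative
 * layout) of the two error messages svc_rdma_send_error() can build:
 *
 *   version mismatch: xid, vers, credits, rdma_error, err_vers,
 *                     lowest and highest version supported
 *   parsing failure:  xid, vers, credits, rdma_error, err_chunk
 */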
static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
                                __be32 *rdma_argp, int status)
{
        struct svc_rdma_op_ctxt *ctxt;
        __be32 *p, *err_msgp;
        unsigned int length;
        struct page *page;
        int ret;

        page = alloc_page(GFP_KERNEL);
        if (!page)
                return;
        err_msgp = page_address(page);

        p = err_msgp;
        *p++ = *rdma_argp;
        *p++ = *(rdma_argp + 1);
        *p++ = xprt->sc_fc_credits;
        *p++ = rdma_error;
        if (status == -EPROTONOSUPPORT) {
                *p++ = err_vers;
                *p++ = rpcrdma_version;
                *p++ = rpcrdma_version;
        } else {
                *p++ = err_chunk;
        }
        length = (unsigned long)p - (unsigned long)err_msgp;

        /* Map transport header; no RPC message payload */
        ctxt = svc_rdma_get_context(xprt);
        ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
        if (ret) {
                dprintk("svcrdma: Error %d mapping send for protocol error\n",
                        ret);
                /* Release the context, and the error page with it */
                svc_rdma_put_context(ctxt, 1);
                return;
        }

        ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
        if (ret) {
                dprintk("svcrdma: Error %d posting send for protocol error\n",
                        ret);
                svc_rdma_unmap_dma(ctxt);
                svc_rdma_put_context(ctxt, 1);
        }
}

/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
                                          __be32 *rdma_resp)
{
        __be32 *p;

        if (!xprt->xpt_bc_xprt)
                return false;

        p = rdma_resp + 3;
        if (*p++ != rdma_msg)
                return false;

        if (*p++ != xdr_zero)
                return false;
        if (*p++ != xdr_zero)
                return false;
        if (*p++ != xdr_zero)
                return false;

        /* XID sanity */
        if (*p++ != *rdma_resp)
                return false;
        /* call direction */
        if (*p == cpu_to_be32(RPC_CALL))
                return false;

        return true;
}
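/* An illustrative wire view (offsets in 4-byte XDR words) of what the
 * check above expects: words 0-3 carry the xid, vers, credits, and
 * rdma_msg fields; words 4-6 are the three empty chunk lists; word 7
 * repeats the XID (the start of the RPC header); and word 8 is the RPC
 * direction, which must not be RPC_CALL for a backchannel reply.
 */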
/**
 * svc_rdma_recvfrom - Receive an RPC call
 * @rqstp: request structure into which to receive an RPC Call
 *
 * Returns:
 *      The positive number of bytes in the RPC Call message,
 *      %0 if there were no Calls ready to return,
 *      %-EINVAL if the Read chunk data is too large,
 *      %-ENOMEM if rdma_rw context pool was exhausted,
 *      %-ENOTCONN if posting failed (connection is lost),
 *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
 * when there are no remaining ctxt's to process.
 *
 * The next ctxt is removed from the "receive" lists.
 *
 * - If the ctxt completes a Read, then finish assembling the Call
 *   message and return the number of bytes in the message.
 *
 * - If the ctxt completes a Receive, then construct the Call
 *   message from the contents of the Receive buffer.
 *
 *   - If there are no Read chunks in this message, then finish
 *     assembling the Call message and return the number of bytes
 *     in the message.
 *
 *   - If there are Read chunks in this message, post Read WRs to
 *     pull that payload and return 0.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt;
        __be32 *p;
        int ret;

        spin_lock(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q,
                                        struct svc_rdma_op_ctxt, list);
                list_del(&ctxt->list);
                spin_unlock(&rdma_xprt->sc_rq_dto_lock);
                rdma_read_complete(rqstp, ctxt);
                goto complete;
        } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q,
                                        struct svc_rdma_op_ctxt, list);
                list_del(&ctxt->list);
        } else {
                /* No new incoming requests, terminate the loop */
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                spin_unlock(&rdma_xprt->sc_rq_dto_lock);
                return 0;
        }
        spin_unlock(&rdma_xprt->sc_rq_dto_lock);

        dprintk("svcrdma: recvfrom: ctxt=%p on xprt=%p, rqstp=%p\n",
                ctxt, rdma_xprt, rqstp);
        atomic_inc(&rdma_stat_recv);

        svc_rdma_build_arg_xdr(rqstp, ctxt);

        p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
        ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg);
        if (ret < 0)
                goto out_err;
        if (ret == 0)
                goto out_drop;
        rqstp->rq_xprt_hlen = ret;

        if (svc_rdma_is_backchannel_reply(xprt, p)) {
                ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p,
                                               &rqstp->rq_arg);
                svc_rdma_put_context(ctxt, 0);
                return ret;
        }

        p += rpcrdma_fixed_maxsz;
        if (*p != xdr_zero)
                goto out_readchunk;

complete:
        svc_rdma_put_context(ctxt, 0);
        dprintk("svcrdma: recvfrom: xprt=%p, rqstp=%p, rq_arg.len=%u\n",
                rdma_xprt, rqstp, rqstp->rq_arg.len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return rqstp->rq_arg.len;

out_readchunk:
        ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
        if (ret < 0)
                goto out_postfail;
        return 0;

out_err:
        svc_rdma_send_error(rdma_xprt, p, ret);
        svc_rdma_put_context(ctxt, 0);
        return 0;

out_postfail:
        if (ret == -EINVAL)
                svc_rdma_send_error(rdma_xprt, p, ret);
        svc_rdma_put_context(ctxt, 1);
        return ret;

out_drop:
        svc_rdma_put_context(ctxt, 1);
        return 0;
}