/*
 * Copyright (c) 2016 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

/* Operation
 *
 * The main entry point is svc_rdma_sendto. This is called by the
 * RPC server when an RPC Reply is ready to be transmitted to a client.
 *
 * The passed-in svc_rqst contains a struct xdr_buf which holds an
 * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
 * transport header, post all Write WRs needed for this Reply, then post
 * a Send WR conveying the transport header and the RPC message itself to
 * the client.
 *
 * svc_rdma_sendto must fully transmit the Reply before returning, as
 * the svc_rqst will be recycled as soon as sendto returns. Remaining
 * resources referred to by the svc_rqst are also recycled at that time.
 * Therefore any resources that must remain longer must be detached
 * from the svc_rqst and released later.
 *
 * Page Management
 *
 * The I/O that performs Reply transmission is asynchronous, and may
 * complete well after sendto returns. Thus pages under I/O must be
 * removed from the svc_rqst before sendto returns.
 *
 * The logic here depends on Send Queue and completion ordering. Since
 * the Send WR is always posted last, it will always complete last. Thus
 * when it completes, it is guaranteed that all previous Write WRs have
 * also completed.
 *
 * Write WRs are constructed and posted. Each Write segment gets its own
 * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
 * DMA-unmap the pages under I/O for that Write segment. The Write
 * completion handler does not release any pages.
 *
 * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
 * The ownership of all of the Reply's pages is transferred into that
 * ctxt, the Send WR is posted, and sendto returns.
 *
 * The svc_rdma_op_ctxt is presented when the Send WR completes. The
 * Send completion handler finally releases the Reply's pages.
 *
 * This mechanism also assumes that completions on the transport's Send
 * Completion Queue do not run in parallel. Otherwise a Write completion
 * and a Send completion running at the same time could release pages
 * that are still DMA-mapped.
 *
 * Error Handling
 *
 * - If the Send WR is posted successfully, it will either complete
 *   successfully, or get flushed. Either way, the Send completion
 *   handler releases the Reply's pages.
 * - If the Send WR cannot be posted, the forward path releases
 *   the Reply's pages.
 *
 * This handles the case, without the use of page reference counting,
 * where two different Write segments send portions of the same page.
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

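/* Returns the number of XDR pad bytes needed to bring @len up to the
 * next XDR fourbyte alignment boundary.
 */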
static u32 xdr_padsize(u32 len)
{
        return (len & 3) ? (4 - (len & 3)) : 0;
}

/* Returns length of transport header, in bytes.
 */
static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
{
        unsigned int nsegs;
        __be32 *p;

        p = rdma_resp;

        /* RPC-over-RDMA V1 replies never have a Read list. */
        p += rpcrdma_fixed_maxsz + 1;

        /* Skip Write list. */
        while (*p++ != xdr_zero) {
                nsegs = be32_to_cpup(p++);
                p += nsegs * rpcrdma_segment_maxsz;
        }

        /* Skip Reply chunk. */
        if (*p++ != xdr_zero) {
                nsegs = be32_to_cpup(p++);
                p += nsegs * rpcrdma_segment_maxsz;
        }

        return (unsigned long)p - (unsigned long)rdma_resp;
}

/* One Write chunk is copied from Call transport header to Reply
 * transport header. Each segment's length field is updated to
 * reflect number of bytes consumed in the segment.
 *
 * Returns number of segments in this chunk.
 */
static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
                                           unsigned int remaining)
{
        unsigned int i, nsegs;
        u32 seg_len;

        /* Write list discriminator */
        *dst++ = *src++;

        /* number of segments in this chunk */
        nsegs = be32_to_cpup(src);
        *dst++ = *src++;

        for (i = nsegs; i; i--) {
                /* segment's RDMA handle */
                *dst++ = *src++;

                /* bytes returned in this segment */
                seg_len = be32_to_cpu(*src);
                if (remaining >= seg_len) {
                        /* entire segment was consumed */
                        *dst = *src;
                        remaining -= seg_len;
                } else {
                        /* segment only partly filled */
                        *dst = cpu_to_be32(remaining);
                        remaining = 0;
                }
                dst++; src++;

                /* segment's RDMA offset */
                *dst++ = *src++;
                *dst++ = *src++;
        }

        return nsegs;
}

/* The client provided a Write list in the Call message. Fill in
 * the segments in the first Write chunk in the Reply's transport
 * header with the number of bytes consumed in each segment.
 * Remaining chunks are returned unused.
 *
 * Assumptions:
 *  - Client has provided only one Write chunk
 */
static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
                                           unsigned int consumed)
{
        unsigned int nsegs;
        __be32 *p, *q;

        /* RPC-over-RDMA V1 replies never have a Read list. */
        p = rdma_resp + rpcrdma_fixed_maxsz + 1;

        q = wr_ch;
        while (*q != xdr_zero) {
                nsegs = xdr_encode_write_chunk(p, q, consumed);
                q += 2 + nsegs * rpcrdma_segment_maxsz;
                p += 2 + nsegs * rpcrdma_segment_maxsz;
                consumed = 0;
        }

        /* Terminate Write list */
        *p++ = xdr_zero;

        /* Reply chunk discriminator; may be replaced later */
        *p = xdr_zero;
}

/* The client provided a Reply chunk in the Call message. Fill in
 * the segments in the Reply chunk in the Reply message with the
 * number of bytes consumed in each segment.
 *
 * Assumptions:
 *  - Reply can always fit in the provided Reply chunk
 */
static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
                                            unsigned int consumed)
{
        __be32 *p;

        /* Find the Reply chunk in the Reply's xprt header.
         * RPC-over-RDMA V1 replies never have a Read list.
         */
        p = rdma_resp + rpcrdma_fixed_maxsz + 1;

        /* Skip past Write list */
        while (*p++ != xdr_zero)
                p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;

        xdr_encode_write_chunk(p, rp_ch, consumed);
}

/* Parse the RPC Call's transport header.
 */
static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
                                      __be32 **write, __be32 **reply)
{
        __be32 *p;

        p = rdma_argp + rpcrdma_fixed_maxsz;

        /* Read list */
        while (*p++ != xdr_zero)
                p += 5;

        /* Write list */
        if (*p != xdr_zero) {
                *write = p;
                while (*p++ != xdr_zero)
                        p += 1 + be32_to_cpu(*p) * 4;
        } else {
                *write = NULL;
                p++;
        }

        /* Reply chunk */
        if (*p != xdr_zero)
                *reply = p;
        else
                *reply = NULL;
}

/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one rkey to invalidate.
 *
 * Find a candidate rkey to invalidate when sending a reply. Picks the
 * first R_key it finds in the chunk lists.
 *
 * Returns zero if RPC's chunk lists are empty.
 */
static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
                                 __be32 *wr_lst, __be32 *rp_ch)
{
        __be32 *p;

        p = rdma_argp + rpcrdma_fixed_maxsz;
        if (*p != xdr_zero)
                p += 2;
        else if (wr_lst && be32_to_cpup(wr_lst + 1))
                p = wr_lst + 2;
        else if (rp_ch && be32_to_cpup(rp_ch + 1))
                p = rp_ch + 2;
        else
                return 0;
        return be32_to_cpup(p);
}

/* ib_dma_map_page() is used here because svc_rdma_unmap_dma()
 * is used during completion to DMA-unmap this memory, and
 * it uses ib_dma_unmap_page() exclusively.
 */
static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
                                struct svc_rdma_op_ctxt *ctxt,
                                unsigned int sge_no,
                                unsigned char *base,
                                unsigned int len)
{
        unsigned long offset = (unsigned long)base & ~PAGE_MASK;
        struct ib_device *dev = rdma->sc_cm_id->device;
        dma_addr_t dma_addr;

        dma_addr = ib_dma_map_page(dev, virt_to_page(base),
                                   offset, len, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(dev, dma_addr))
                goto out_maperr;

        ctxt->sge[sge_no].addr = dma_addr;
        ctxt->sge[sge_no].length = len;
        ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
        svc_rdma_count_mappings(rdma, ctxt);
        return 0;

out_maperr:
        pr_err("svcrdma: failed to map buffer\n");
        return -EIO;
}

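/* DMA map part of a page for transmission via a single Send SGE.
 * The SGE's lkey is always the PD's local DMA lkey, so no explicit
 * memory registration is needed for Send buffers.
 */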
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
                                 struct svc_rdma_op_ctxt *ctxt,
                                 unsigned int sge_no,
                                 struct page *page,
                                 unsigned int offset,
                                 unsigned int len)
{
        struct ib_device *dev = rdma->sc_cm_id->device;
        dma_addr_t dma_addr;

        dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(dev, dma_addr))
                goto out_maperr;

        ctxt->sge[sge_no].addr = dma_addr;
        ctxt->sge[sge_no].length = len;
        ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
        svc_rdma_count_mappings(rdma, ctxt);
        return 0;

out_maperr:
        pr_err("svcrdma: failed to map page\n");
        return -EIO;
}

/**
 * svc_rdma_map_reply_hdr - DMA map the transport header buffer
 * @rdma: controlling transport
 * @ctxt: op_ctxt for the Send WR
 * @rdma_resp: buffer containing transport header
 * @len: length of transport header
 *
 * Returns:
 *      %0 if the header is DMA mapped,
 *      %-EIO if DMA mapping failed.
 */
int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
                           struct svc_rdma_op_ctxt *ctxt,
                           __be32 *rdma_resp,
                           unsigned int len)
{
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->pages[0] = virt_to_page(rdma_resp);
        ctxt->count = 1;
        return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
}

/* Load the xdr_buf into the ctxt's sge array, and DMA map each
 * element as it is added.
 *
 * Returns the number of sge elements loaded on success, or
 * a negative errno on failure.
 */
static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
                                  struct svc_rdma_op_ctxt *ctxt,
                                  struct xdr_buf *xdr, __be32 *wr_lst)
{
        unsigned int len, sge_no, remaining, page_off;
        struct page **ppages;
        unsigned char *base;
        u32 xdr_pad;
        int ret;

        sge_no = 1;

        ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
                                   xdr->head[0].iov_base,
                                   xdr->head[0].iov_len);
        if (ret < 0)
                return ret;

        /* If a Write chunk is present, the xdr_buf's page list
         * is not included inline. However the Upper Layer may
         * have added XDR padding in the tail buffer, and that
         * should not be included inline.
         */
        if (wr_lst) {
                base = xdr->tail[0].iov_base;
                len = xdr->tail[0].iov_len;
                xdr_pad = xdr_padsize(xdr->page_len);

                if (len && xdr_pad) {
                        base += xdr_pad;
                        len -= xdr_pad;
                }

                goto tail;
        }

        ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
        page_off = xdr->page_base & ~PAGE_MASK;
        remaining = xdr->page_len;
        while (remaining) {
                len = min_t(u32, PAGE_SIZE - page_off, remaining);

                ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
                                            *ppages++, page_off, len);
                if (ret < 0)
                        return ret;

                remaining -= len;
                page_off = 0;
        }

        base = xdr->tail[0].iov_base;
        len = xdr->tail[0].iov_len;
tail:
        if (len) {
                ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
                if (ret < 0)
                        return ret;
        }

        return sge_no - 1;
}

/* The svc_rqst and all resources it owns are released as soon as
 * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
 * so they are released by the Send completion handler.
 */
static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
                                   struct svc_rdma_op_ctxt *ctxt)
{
        int i, pages = rqstp->rq_next_page - rqstp->rq_respages;

        ctxt->count += pages;
        for (i = 0; i < pages; i++) {
                ctxt->pages[i + 1] = rqstp->rq_respages[i];
                rqstp->rq_respages[i] = NULL;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;
}

/**
 * svc_rdma_post_send_wr - Set up and post one Send Work Request
 * @rdma: controlling transport
 * @ctxt: op_ctxt for transmitting the Send WR
 * @num_sge: number of SGEs to send
 * @inv_rkey: R_key argument to Send With Invalidate, or zero
 *
 * Returns:
 *      %0 if the Send* was posted successfully,
 *      %-ENOTCONN if the connection was lost or dropped,
 *      %-EINVAL if there was a problem with the Send we built,
 *      %-ENOMEM if ib_post_send failed.
 */
int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
                          struct svc_rdma_op_ctxt *ctxt, int num_sge,
                          u32 inv_rkey)
{
        struct ib_send_wr *send_wr = &ctxt->send_wr;

        dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);

        send_wr->next = NULL;
        ctxt->cqe.done = svc_rdma_wc_send;
        send_wr->wr_cqe = &ctxt->cqe;
        send_wr->sg_list = ctxt->sge;
        send_wr->num_sge = num_sge;
        send_wr->send_flags = IB_SEND_SIGNALED;
        if (inv_rkey) {
                send_wr->opcode = IB_WR_SEND_WITH_INV;
                send_wr->ex.invalidate_rkey = inv_rkey;
        } else {
                send_wr->opcode = IB_WR_SEND;
        }

        return svc_rdma_send(rdma, send_wr);
}

/* Prepare the portion of the RPC Reply that will be transmitted
 * via RDMA Send. The RPC-over-RDMA transport header is prepared
 * in sge[0], and the RPC xdr_buf is prepared in following sges.
 *
 * Depending on whether a Write list or Reply chunk is present,
 * the server may send all, a portion of, or none of the xdr_buf.
 * In the latter case, only the transport header (sge[0]) is
 * transmitted.
 *
 * RDMA Send is the last step of transmitting an RPC reply. Pages
 * involved in the earlier RDMA Writes are here transferred out
 * of the rqstp and into the ctxt's page array. These pages are
 * DMA unmapped by each Write completion, but the subsequent Send
 * completion finally releases these pages.
 *
 * Assumptions:
 * - The Reply's transport header will never be larger than a page.
 */
static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
                                   __be32 *rdma_argp, __be32 *rdma_resp,
                                   struct svc_rqst *rqstp,
                                   __be32 *wr_lst, __be32 *rp_ch)
{
        struct svc_rdma_op_ctxt *ctxt;
        u32 inv_rkey;
        int ret;

        dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
                (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
                rqstp->rq_res.head[0].iov_len,
                rqstp->rq_res.page_len,
                rqstp->rq_res.tail[0].iov_len);

        ctxt = svc_rdma_get_context(rdma);

        ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
                                     svc_rdma_reply_hdr_len(rdma_resp));
        if (ret < 0)
                goto err;

        if (!rp_ch) {
                ret = svc_rdma_map_reply_msg(rdma, ctxt,
                                             &rqstp->rq_res, wr_lst);
                if (ret < 0)
                        goto err;
        }

        svc_rdma_save_io_pages(rqstp, ctxt);

        inv_rkey = 0;
        if (rdma->sc_snd_w_inv)
                inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
        ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
        if (ret)
                goto err;

        return 0;

err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        return ret;
}

/* Given the client-provided Write and Reply chunks, the server was not
 * able to form a complete reply. Return an RDMA_ERROR message so the
 * client can retire this RPC transaction. As above, the Send completion
 * routine releases payload pages that were part of a previous RDMA Write.
 *
 * Remote Invalidation is skipped for simplicity.
 */
static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
                                   __be32 *rdma_resp, struct svc_rqst *rqstp)
{
        struct svc_rdma_op_ctxt *ctxt;
        __be32 *p;
        int ret;

        ctxt = svc_rdma_get_context(rdma);

        /* Replace the original transport header with an
         * RDMA_ERROR response. XID etc are preserved.
         */
        p = rdma_resp + 3;
        *p++ = rdma_error;
        *p = err_chunk;

        ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
        if (ret < 0)
                goto err;

        svc_rdma_save_io_pages(rqstp, ctxt);

        ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0);
        if (ret)
                goto err;

        return 0;

err:
        pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        return ret;
}

/* The RPC-over-RDMA transport header is constructed in a separately
 * allocated buffer by svc_rdma_sendto, not in rq_res, so there is
 * nothing to prepare here.
 */
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

/**
 * svc_rdma_sendto - Transmit an RPC reply
 * @rqstp: processed RPC request, reply XDR already in ::rq_res
 *
 * Any resources still associated with @rqstp are released upon return.
 * If no reply message was possible, the connection is closed.
 *
 * Returns:
 *      %0 if an RPC reply has been successfully posted,
 *      %-ENOMEM if a resource shortage occurred (connection is lost),
 *      %-ENOTCONN if posting failed (connection is lost).
 */
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
        struct xdr_buf *xdr = &rqstp->rq_res;
        struct page *res_page;
        int ret;

        /* Find the call's chunk lists to decide how to send the reply.
         * Receive places the Call's xprt header at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
        svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);

        dprintk("svcrdma: preparing response for XID 0x%08x\n",
                be32_to_cpup(rdma_argp));

        /* Create the RDMA response header. xprt->xpt_mutex,
         * acquired in svc_send(), serializes RPC replies. The
         * code path below that inserts the credit grant value
         * into each transport header runs only inside this
         * critical section.
         */
        ret = -ENOMEM;
        res_page = alloc_page(GFP_KERNEL);
        if (!res_page)
                goto err0;
        rdma_resp = page_address(res_page);

        p = rdma_resp;
        *p++ = *rdma_argp;
        *p++ = *(rdma_argp + 1);
        *p++ = rdma->sc_fc_credits;
        *p++ = rp_ch ? rdma_nomsg : rdma_msg;

        /* Start with empty chunks */
        *p++ = xdr_zero;
        *p++ = xdr_zero;
        *p = xdr_zero;

        if (wr_lst) {
                /* XXX: Presume the client sent only one Write chunk */
                ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
                if (ret < 0)
                        goto err2;
                svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
        }
        if (rp_ch) {
                ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
                if (ret < 0)
                        goto err2;
                svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
        }

        ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
        if (ret)
                goto err1;
        ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
                                      wr_lst, rp_ch);
        if (ret < 0)
                goto err0;
        return 0;

err2:
        if (ret != -E2BIG && ret != -EINVAL)
                goto err1;

        ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
        if (ret)
                goto err1;
        ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
        if (ret < 0)
                goto err0;
        return 0;

err1:
        put_page(res_page);
err0:
        pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
               ret);
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
        return -ENOTCONN;
}