1bcf3ffd4SChuck Lever // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2c06b540aSTom Tucker /* 3ecf85b23SChuck Lever * Copyright (c) 2016-2018 Oracle. All rights reserved. 40bf48289SSteve Wise * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 5c06b540aSTom Tucker * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. 6c06b540aSTom Tucker * 7c06b540aSTom Tucker * This software is available to you under a choice of one of two 8c06b540aSTom Tucker * licenses. You may choose to be licensed under the terms of the GNU 9c06b540aSTom Tucker * General Public License (GPL) Version 2, available from the file 10c06b540aSTom Tucker * COPYING in the main directory of this source tree, or the BSD-type 11c06b540aSTom Tucker * license below: 12c06b540aSTom Tucker * 13c06b540aSTom Tucker * Redistribution and use in source and binary forms, with or without 14c06b540aSTom Tucker * modification, are permitted provided that the following conditions 15c06b540aSTom Tucker * are met: 16c06b540aSTom Tucker * 17c06b540aSTom Tucker * Redistributions of source code must retain the above copyright 18c06b540aSTom Tucker * notice, this list of conditions and the following disclaimer. 19c06b540aSTom Tucker * 20c06b540aSTom Tucker * Redistributions in binary form must reproduce the above 21c06b540aSTom Tucker * copyright notice, this list of conditions and the following 22c06b540aSTom Tucker * disclaimer in the documentation and/or other materials provided 23c06b540aSTom Tucker * with the distribution. 24c06b540aSTom Tucker * 25c06b540aSTom Tucker * Neither the name of the Network Appliance, Inc. nor the names of 26c06b540aSTom Tucker * its contributors may be used to endorse or promote products 27c06b540aSTom Tucker * derived from this software without specific prior written 28c06b540aSTom Tucker * permission. 29c06b540aSTom Tucker * 30c06b540aSTom Tucker * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 31c06b540aSTom Tucker * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 32c06b540aSTom Tucker * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 33c06b540aSTom Tucker * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 34c06b540aSTom Tucker * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 35c06b540aSTom Tucker * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 36c06b540aSTom Tucker * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 37c06b540aSTom Tucker * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 38c06b540aSTom Tucker * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 39c06b540aSTom Tucker * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 40c06b540aSTom Tucker * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 41c06b540aSTom Tucker * 42c06b540aSTom Tucker * Author: Tom Tucker <tom@opengridcomputing.com> 43c06b540aSTom Tucker */ 44c06b540aSTom Tucker 459a6a180bSChuck Lever /* Operation 469a6a180bSChuck Lever * 479a6a180bSChuck Lever * The main entry point is svc_rdma_sendto. This is called by the 489a6a180bSChuck Lever * RPC server when an RPC Reply is ready to be transmitted to a client. 499a6a180bSChuck Lever * 509a6a180bSChuck Lever * The passed-in svc_rqst contains a struct xdr_buf which holds an 519a6a180bSChuck Lever * XDR-encoded RPC Reply message. 
sendto must construct the RPC-over-RDMA 529a6a180bSChuck Lever * transport header, post all Write WRs needed for this Reply, then post 539a6a180bSChuck Lever * a Send WR conveying the transport header and the RPC message itself to 549a6a180bSChuck Lever * the client. 559a6a180bSChuck Lever * 569a6a180bSChuck Lever * svc_rdma_sendto must fully transmit the Reply before returning, as 579a6a180bSChuck Lever * the svc_rqst will be recycled as soon as sendto returns. Remaining 589a6a180bSChuck Lever * resources referred to by the svc_rqst are also recycled at that time. 599a6a180bSChuck Lever * Therefore any resources that must remain longer must be detached 609a6a180bSChuck Lever * from the svc_rqst and released later. 619a6a180bSChuck Lever * 629a6a180bSChuck Lever * Page Management 639a6a180bSChuck Lever * 649a6a180bSChuck Lever * The I/O that performs Reply transmission is asynchronous, and may 659a6a180bSChuck Lever * complete well after sendto returns. Thus pages under I/O must be 669a6a180bSChuck Lever * removed from the svc_rqst before sendto returns. 679a6a180bSChuck Lever * 689a6a180bSChuck Lever * The logic here depends on Send Queue and completion ordering. Since 699a6a180bSChuck Lever * the Send WR is always posted last, it will always complete last. Thus 709a6a180bSChuck Lever * when it completes, it is guaranteed that all previous Write WRs have 719a6a180bSChuck Lever * also completed. 729a6a180bSChuck Lever * 739a6a180bSChuck Lever * Write WRs are constructed and posted. Each Write segment gets its own 749a6a180bSChuck Lever * svc_rdma_rw_ctxt, allowing the Write completion handler to find and 759a6a180bSChuck Lever * DMA-unmap the pages under I/O for that Write segment. The Write 769a6a180bSChuck Lever * completion handler does not release any pages. 779a6a180bSChuck Lever * 784201c746SChuck Lever * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. 799a6a180bSChuck Lever * The ownership of all of the Reply's pages is transferred into that 809a6a180bSChuck Lever * ctxt, the Send WR is posted, and sendto returns. 819a6a180bSChuck Lever * 824201c746SChuck Lever * The svc_rdma_send_ctxt is presented when the Send WR completes. The 839a6a180bSChuck Lever * Send completion handler finally releases the Reply's pages. 849a6a180bSChuck Lever * 859a6a180bSChuck Lever * This mechanism also assumes that completions on the transport's Send 869a6a180bSChuck Lever * Completion Queue do not run in parallel. Otherwise a Write completion 879a6a180bSChuck Lever * and Send completion running at the same time could release pages that 889a6a180bSChuck Lever * are still DMA-mapped. 899a6a180bSChuck Lever * 909a6a180bSChuck Lever * Error Handling 919a6a180bSChuck Lever * 929a6a180bSChuck Lever * - If the Send WR is posted successfully, it will either complete 939a6a180bSChuck Lever * successfully, or get flushed. Either way, the Send completion 949a6a180bSChuck Lever * handler releases the Reply's pages. 959a6a180bSChuck Lever * - If the Send WR cannot be posted, the forward path releases 969a6a180bSChuck Lever * the Reply's pages. 979a6a180bSChuck Lever * 989a6a180bSChuck Lever * This handles the case, without the use of page reference counting, 999a6a180bSChuck Lever * where two different Write segments send portions of the same page.
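 *
 * In outline, the transmit path described above is:
 *
 *	svc_rdma_sendto
 *	   svc_rdma_send_ctxt_get
 *	   svc_rdma_send_write_chunk / svc_rdma_send_reply_chunk
 *	   svc_rdma_sync_reply_hdr
 *	   svc_rdma_send_reply_msg
 *	      svc_rdma_map_reply_msg
 *	      svc_rdma_save_io_pages
 *	      svc_rdma_send
 *	         ib_post_send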
1009a6a180bSChuck Lever */ 1019a6a180bSChuck Lever 102c06b540aSTom Tucker #include <linux/spinlock.h> 103c06b540aSTom Tucker #include <asm/unaligned.h> 10498895edbSChuck Lever 105c06b540aSTom Tucker #include <rdma/ib_verbs.h> 106c06b540aSTom Tucker #include <rdma/rdma_cm.h> 10798895edbSChuck Lever 10898895edbSChuck Lever #include <linux/sunrpc/debug.h> 10998895edbSChuck Lever #include <linux/sunrpc/rpc_rdma.h> 110c06b540aSTom Tucker #include <linux/sunrpc/svc_rdma.h> 111c06b540aSTom Tucker 11298895edbSChuck Lever #include "xprt_rdma.h" 11398895edbSChuck Lever #include <trace/events/rpcrdma.h> 11498895edbSChuck Lever 115c06b540aSTom Tucker #define RPCDBG_FACILITY RPCDBG_SVCXPRT 116c06b540aSTom Tucker 1174201c746SChuck Lever static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); 1184201c746SChuck Lever 1194201c746SChuck Lever static inline struct svc_rdma_send_ctxt * 1204201c746SChuck Lever svc_rdma_next_send_ctxt(struct list_head *list) 1214201c746SChuck Lever { 1224201c746SChuck Lever return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, 1234201c746SChuck Lever sc_list); 1244201c746SChuck Lever } 1254201c746SChuck Lever 1264201c746SChuck Lever static struct svc_rdma_send_ctxt * 1274201c746SChuck Lever svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) 1284201c746SChuck Lever { 1294201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 13099722fe4SChuck Lever dma_addr_t addr; 13199722fe4SChuck Lever void *buffer; 13225fd86ecSChuck Lever size_t size; 1334201c746SChuck Lever int i; 1344201c746SChuck Lever 13525fd86ecSChuck Lever size = sizeof(*ctxt); 13625fd86ecSChuck Lever size += rdma->sc_max_send_sges * sizeof(struct ib_sge); 13725fd86ecSChuck Lever ctxt = kmalloc(size, GFP_KERNEL); 1384201c746SChuck Lever if (!ctxt) 13999722fe4SChuck Lever goto fail0; 14099722fe4SChuck Lever buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL); 14199722fe4SChuck Lever if (!buffer) 14299722fe4SChuck Lever goto fail1; 14399722fe4SChuck Lever addr = ib_dma_map_single(rdma->sc_pd->device, buffer, 14499722fe4SChuck Lever rdma->sc_max_req_size, DMA_TO_DEVICE); 14599722fe4SChuck Lever if (ib_dma_mapping_error(rdma->sc_pd->device, addr)) 14699722fe4SChuck Lever goto fail2; 1474201c746SChuck Lever 1484201c746SChuck Lever ctxt->sc_send_wr.next = NULL; 1494201c746SChuck Lever ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; 1504201c746SChuck Lever ctxt->sc_send_wr.sg_list = ctxt->sc_sges; 1514201c746SChuck Lever ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; 15299722fe4SChuck Lever ctxt->sc_cqe.done = svc_rdma_wc_send; 15399722fe4SChuck Lever ctxt->sc_xprt_buf = buffer; 15499722fe4SChuck Lever ctxt->sc_sges[0].addr = addr; 15599722fe4SChuck Lever 15625fd86ecSChuck Lever for (i = 0; i < rdma->sc_max_send_sges; i++) 1574201c746SChuck Lever ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; 1584201c746SChuck Lever return ctxt; 15999722fe4SChuck Lever 16099722fe4SChuck Lever fail2: 16199722fe4SChuck Lever kfree(buffer); 16299722fe4SChuck Lever fail1: 16399722fe4SChuck Lever kfree(ctxt); 16499722fe4SChuck Lever fail0: 16599722fe4SChuck Lever return NULL; 1664201c746SChuck Lever } 1674201c746SChuck Lever 1684201c746SChuck Lever /** 1694201c746SChuck Lever * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt 1704201c746SChuck Lever * @rdma: svcxprt_rdma being torn down 1714201c746SChuck Lever * 1724201c746SChuck Lever */ 1734201c746SChuck Lever void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) 1744201c746SChuck Lever { 1754201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 
1764201c746SChuck Lever 1774201c746SChuck Lever while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { 1784201c746SChuck Lever list_del(&ctxt->sc_list); 17999722fe4SChuck Lever ib_dma_unmap_single(rdma->sc_pd->device, 18099722fe4SChuck Lever ctxt->sc_sges[0].addr, 18199722fe4SChuck Lever rdma->sc_max_req_size, 18299722fe4SChuck Lever DMA_TO_DEVICE); 18399722fe4SChuck Lever kfree(ctxt->sc_xprt_buf); 1844201c746SChuck Lever kfree(ctxt); 1854201c746SChuck Lever } 1864201c746SChuck Lever } 1874201c746SChuck Lever 1884201c746SChuck Lever /** 1894201c746SChuck Lever * svc_rdma_send_ctxt_get - Get a free send_ctxt 1904201c746SChuck Lever * @rdma: controlling svcxprt_rdma 1914201c746SChuck Lever * 1924201c746SChuck Lever * Returns a ready-to-use send_ctxt, or NULL if none are 1934201c746SChuck Lever * available and a fresh one cannot be allocated. 1944201c746SChuck Lever */ 1954201c746SChuck Lever struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) 1964201c746SChuck Lever { 1974201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 1984201c746SChuck Lever 1994201c746SChuck Lever spin_lock(&rdma->sc_send_lock); 2004201c746SChuck Lever ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); 2014201c746SChuck Lever if (!ctxt) 2024201c746SChuck Lever goto out_empty; 2034201c746SChuck Lever list_del(&ctxt->sc_list); 2044201c746SChuck Lever spin_unlock(&rdma->sc_send_lock); 2054201c746SChuck Lever 2064201c746SChuck Lever out: 2074201c746SChuck Lever ctxt->sc_send_wr.num_sge = 0; 20899722fe4SChuck Lever ctxt->sc_cur_sge_no = 0; 2094201c746SChuck Lever ctxt->sc_page_count = 0; 2104201c746SChuck Lever return ctxt; 2114201c746SChuck Lever 2124201c746SChuck Lever out_empty: 2134201c746SChuck Lever spin_unlock(&rdma->sc_send_lock); 2144201c746SChuck Lever ctxt = svc_rdma_send_ctxt_alloc(rdma); 2154201c746SChuck Lever if (!ctxt) 2164201c746SChuck Lever return NULL; 2174201c746SChuck Lever goto out; 2184201c746SChuck Lever } 2194201c746SChuck Lever 2204201c746SChuck Lever /** 2214201c746SChuck Lever * svc_rdma_send_ctxt_put - Return send_ctxt to free list 2224201c746SChuck Lever * @rdma: controlling svcxprt_rdma 2234201c746SChuck Lever * @ctxt: object to return to the free list 2244201c746SChuck Lever * 2254201c746SChuck Lever * Pages left in sc_pages are DMA unmapped and released. 2264201c746SChuck Lever */ 2274201c746SChuck Lever void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, 2284201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt) 2294201c746SChuck Lever { 2304201c746SChuck Lever struct ib_device *device = rdma->sc_cm_id->device; 2314201c746SChuck Lever unsigned int i; 2324201c746SChuck Lever 23399722fe4SChuck Lever /* The first SGE contains the transport header, which 23499722fe4SChuck Lever * remains mapped until @ctxt is destroyed. 
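 * That is why the DMA-unmap loop below begins at index 1 rather
 * than index 0.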
23599722fe4SChuck Lever */ 236832b2cb9SChuck Lever for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) { 2374201c746SChuck Lever ib_dma_unmap_page(device, 2384201c746SChuck Lever ctxt->sc_sges[i].addr, 2394201c746SChuck Lever ctxt->sc_sges[i].length, 2404201c746SChuck Lever DMA_TO_DEVICE); 241832b2cb9SChuck Lever trace_svcrdma_dma_unmap_page(rdma, 242832b2cb9SChuck Lever ctxt->sc_sges[i].addr, 243832b2cb9SChuck Lever ctxt->sc_sges[i].length); 244832b2cb9SChuck Lever } 2454201c746SChuck Lever 2464201c746SChuck Lever for (i = 0; i < ctxt->sc_page_count; ++i) 2474201c746SChuck Lever put_page(ctxt->sc_pages[i]); 2484201c746SChuck Lever 2494201c746SChuck Lever spin_lock(&rdma->sc_send_lock); 2504201c746SChuck Lever list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); 2514201c746SChuck Lever spin_unlock(&rdma->sc_send_lock); 2524201c746SChuck Lever } 2534201c746SChuck Lever 2544201c746SChuck Lever /** 2554201c746SChuck Lever * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC 2564201c746SChuck Lever * @cq: Completion Queue context 2574201c746SChuck Lever * @wc: Work Completion object 2584201c746SChuck Lever * 2594201c746SChuck Lever * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that 2604201c746SChuck Lever * the Send completion handler could be running. 2614201c746SChuck Lever */ 2624201c746SChuck Lever static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 2634201c746SChuck Lever { 2644201c746SChuck Lever struct svcxprt_rdma *rdma = cq->cq_context; 2654201c746SChuck Lever struct ib_cqe *cqe = wc->wr_cqe; 2664201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 2674201c746SChuck Lever 2684201c746SChuck Lever trace_svcrdma_wc_send(wc); 2694201c746SChuck Lever 2704201c746SChuck Lever atomic_inc(&rdma->sc_sq_avail); 2714201c746SChuck Lever wake_up(&rdma->sc_send_wait); 2724201c746SChuck Lever 2734201c746SChuck Lever ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); 2744201c746SChuck Lever svc_rdma_send_ctxt_put(rdma, ctxt); 2754201c746SChuck Lever 2764201c746SChuck Lever if (unlikely(wc->status != IB_WC_SUCCESS)) { 2774201c746SChuck Lever set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 2784201c746SChuck Lever svc_xprt_enqueue(&rdma->sc_xprt); 2794201c746SChuck Lever } 2804201c746SChuck Lever 2814201c746SChuck Lever svc_xprt_put(&rdma->sc_xprt); 2824201c746SChuck Lever } 2834201c746SChuck Lever 2843abb03faSChuck Lever /** 2853abb03faSChuck Lever * svc_rdma_send - Post a single Send WR 2863abb03faSChuck Lever * @rdma: transport on which to post the WR 2873abb03faSChuck Lever * @wr: prepared Send WR to post 2883abb03faSChuck Lever * 2893abb03faSChuck Lever * Returns zero if the Send WR was posted successfully. Otherwise, a 2903abb03faSChuck Lever * negative errno is returned.
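 *
 * Send Queue accounting: sc_sq_avail is decremented before the WR is
 * posted. When no SQ entries are available, the caller sleeps on
 * sc_send_wait until svc_rdma_wc_send() increments sc_sq_avail and
 * wakes it, or until the transport is marked closed.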
2913abb03faSChuck Lever */ 2924201c746SChuck Lever int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) 2934201c746SChuck Lever { 2944201c746SChuck Lever int ret; 2954201c746SChuck Lever 2963abb03faSChuck Lever might_sleep(); 2974201c746SChuck Lever 2984201c746SChuck Lever /* If the SQ is full, wait until an SQ entry is available */ 2994201c746SChuck Lever while (1) { 3003abb03faSChuck Lever if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) { 3014201c746SChuck Lever atomic_inc(&rdma_stat_sq_starve); 3024201c746SChuck Lever trace_svcrdma_sq_full(rdma); 3033abb03faSChuck Lever atomic_inc(&rdma->sc_sq_avail); 3044201c746SChuck Lever wait_event(rdma->sc_send_wait, 3053abb03faSChuck Lever atomic_read(&rdma->sc_sq_avail) > 1); 3064201c746SChuck Lever if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) 3074201c746SChuck Lever return -ENOTCONN; 3084201c746SChuck Lever trace_svcrdma_sq_retry(rdma); 3094201c746SChuck Lever continue; 3104201c746SChuck Lever } 3114201c746SChuck Lever 3123abb03faSChuck Lever svc_xprt_get(&rdma->sc_xprt); 313ed288d74SBart Van Assche ret = ib_post_send(rdma->sc_qp, wr, NULL); 3144201c746SChuck Lever trace_svcrdma_post_send(wr, ret); 3154201c746SChuck Lever if (ret) { 3164201c746SChuck Lever set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 3174201c746SChuck Lever svc_xprt_put(&rdma->sc_xprt); 3184201c746SChuck Lever wake_up(&rdma->sc_send_wait); 3194201c746SChuck Lever } 3204201c746SChuck Lever break; 3214201c746SChuck Lever } 3224201c746SChuck Lever return ret; 3234201c746SChuck Lever } 3244201c746SChuck Lever 325cf570a93SChuck Lever static u32 xdr_padsize(u32 len) 326cf570a93SChuck Lever { 327cf570a93SChuck Lever return (len & 3) ? (4 - (len & 3)) : 0; 328cf570a93SChuck Lever } 329cf570a93SChuck Lever 3309a6a180bSChuck Lever /* Returns length of transport header, in bytes. 3319a6a180bSChuck Lever */ 3329a6a180bSChuck Lever static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) 3339a6a180bSChuck Lever { 3349a6a180bSChuck Lever unsigned int nsegs; 3359a6a180bSChuck Lever __be32 *p; 3369a6a180bSChuck Lever 3379a6a180bSChuck Lever p = rdma_resp; 3389a6a180bSChuck Lever 3399a6a180bSChuck Lever /* RPC-over-RDMA V1 replies never have a Read list. */ 3409a6a180bSChuck Lever p += rpcrdma_fixed_maxsz + 1; 3419a6a180bSChuck Lever 3429a6a180bSChuck Lever /* Skip Write list. */ 3439a6a180bSChuck Lever while (*p++ != xdr_zero) { 3449a6a180bSChuck Lever nsegs = be32_to_cpup(p++); 3459a6a180bSChuck Lever p += nsegs * rpcrdma_segment_maxsz; 3469a6a180bSChuck Lever } 3479a6a180bSChuck Lever 3489a6a180bSChuck Lever /* Skip Reply chunk. */ 3499a6a180bSChuck Lever if (*p++ != xdr_zero) { 3509a6a180bSChuck Lever nsegs = be32_to_cpup(p++); 3519a6a180bSChuck Lever p += nsegs * rpcrdma_segment_maxsz; 3529a6a180bSChuck Lever } 3539a6a180bSChuck Lever 3549a6a180bSChuck Lever return (unsigned long)p - (unsigned long)rdma_resp; 3559a6a180bSChuck Lever } 3569a6a180bSChuck Lever 3579a6a180bSChuck Lever /* One Write chunk is copied from Call transport header to Reply 3589a6a180bSChuck Lever * transport header. Each segment's length field is updated to 3599a6a180bSChuck Lever * reflect number of bytes consumed in the segment. 3609a6a180bSChuck Lever * 3619a6a180bSChuck Lever * Returns number of segments in this chunk. 
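 *
 * Each segment is an RPC-over-RDMA V1 RDMA segment:
 *
 *	handle (32 bits), length (32 bits), offset (64 bits)
 *
 * Only the length field is rewritten here; the handle and offset
 * fields are copied from the Call's Write chunk unchanged.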
3629a6a180bSChuck Lever */ 3639a6a180bSChuck Lever static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, 3649a6a180bSChuck Lever unsigned int remaining) 3659a6a180bSChuck Lever { 3669a6a180bSChuck Lever unsigned int i, nsegs; 3679a6a180bSChuck Lever u32 seg_len; 3689a6a180bSChuck Lever 3699a6a180bSChuck Lever /* Write list discriminator */ 3709a6a180bSChuck Lever *dst++ = *src++; 3719a6a180bSChuck Lever 3729a6a180bSChuck Lever /* number of segments in this chunk */ 3739a6a180bSChuck Lever nsegs = be32_to_cpup(src); 3749a6a180bSChuck Lever *dst++ = *src++; 3759a6a180bSChuck Lever 3769a6a180bSChuck Lever for (i = nsegs; i; i--) { 3779a6a180bSChuck Lever /* segment's RDMA handle */ 3789a6a180bSChuck Lever *dst++ = *src++; 3799a6a180bSChuck Lever 3809a6a180bSChuck Lever /* bytes returned in this segment */ 3819a6a180bSChuck Lever seg_len = be32_to_cpu(*src); 3829a6a180bSChuck Lever if (remaining >= seg_len) { 3839a6a180bSChuck Lever /* entire segment was consumed */ 3849a6a180bSChuck Lever *dst = *src; 3859a6a180bSChuck Lever remaining -= seg_len; 3869a6a180bSChuck Lever } else { 3879a6a180bSChuck Lever /* segment only partly filled */ 3889a6a180bSChuck Lever *dst = cpu_to_be32(remaining); 3899a6a180bSChuck Lever remaining = 0; 3909a6a180bSChuck Lever } 3919a6a180bSChuck Lever dst++; src++; 3929a6a180bSChuck Lever 3939a6a180bSChuck Lever /* segment's RDMA offset */ 3949a6a180bSChuck Lever *dst++ = *src++; 3959a6a180bSChuck Lever *dst++ = *src++; 3969a6a180bSChuck Lever } 3979a6a180bSChuck Lever 3989a6a180bSChuck Lever return nsegs; 3999a6a180bSChuck Lever } 4009a6a180bSChuck Lever 4019a6a180bSChuck Lever /* The client provided a Write list in the Call message. Fill in 4029a6a180bSChuck Lever * the segments in the first Write chunk in the Reply's transport 4039a6a180bSChuck Lever * header with the number of bytes consumed in each segment. 4049a6a180bSChuck Lever * Remaining chunks are returned unused. 4059a6a180bSChuck Lever * 4069a6a180bSChuck Lever * Assumptions: 4079a6a180bSChuck Lever * - Client has provided only one Write chunk 4089a6a180bSChuck Lever */ 4099a6a180bSChuck Lever static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, 4109a6a180bSChuck Lever unsigned int consumed) 4119a6a180bSChuck Lever { 4129a6a180bSChuck Lever unsigned int nsegs; 4139a6a180bSChuck Lever __be32 *p, *q; 4149a6a180bSChuck Lever 4159a6a180bSChuck Lever /* RPC-over-RDMA V1 replies never have a Read list. */ 4169a6a180bSChuck Lever p = rdma_resp + rpcrdma_fixed_maxsz + 1; 4179a6a180bSChuck Lever 4189a6a180bSChuck Lever q = wr_ch; 4199a6a180bSChuck Lever while (*q != xdr_zero) { 4209a6a180bSChuck Lever nsegs = xdr_encode_write_chunk(p, q, consumed); 4219a6a180bSChuck Lever q += 2 + nsegs * rpcrdma_segment_maxsz; 4229a6a180bSChuck Lever p += 2 + nsegs * rpcrdma_segment_maxsz; 4239a6a180bSChuck Lever consumed = 0; 4249a6a180bSChuck Lever } 4259a6a180bSChuck Lever 4269a6a180bSChuck Lever /* Terminate Write list */ 4279a6a180bSChuck Lever *p++ = xdr_zero; 4289a6a180bSChuck Lever 4299a6a180bSChuck Lever /* Reply chunk discriminator; may be replaced later */ 4309a6a180bSChuck Lever *p = xdr_zero; 4319a6a180bSChuck Lever } 4329a6a180bSChuck Lever 4339a6a180bSChuck Lever /* The client provided a Reply chunk in the Call message. Fill in 4349a6a180bSChuck Lever * the segments in the Reply chunk in the Reply message with the 4359a6a180bSChuck Lever * number of bytes consumed in each segment. 
4369a6a180bSChuck Lever * 4379a6a180bSChuck Lever * Assumptions: 4389a6a180bSChuck Lever * - Reply can always fit in the provided Reply chunk 4399a6a180bSChuck Lever */ 4409a6a180bSChuck Lever static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, 4419a6a180bSChuck Lever unsigned int consumed) 4429a6a180bSChuck Lever { 4439a6a180bSChuck Lever __be32 *p; 4449a6a180bSChuck Lever 4459a6a180bSChuck Lever /* Find the Reply chunk in the Reply's xprt header. 4469a6a180bSChuck Lever * RPC-over-RDMA V1 replies never have a Read list. 4479a6a180bSChuck Lever */ 4489a6a180bSChuck Lever p = rdma_resp + rpcrdma_fixed_maxsz + 1; 4499a6a180bSChuck Lever 4509a6a180bSChuck Lever /* Skip past Write list */ 4519a6a180bSChuck Lever while (*p++ != xdr_zero) 4529a6a180bSChuck Lever p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; 4539a6a180bSChuck Lever 4549a6a180bSChuck Lever xdr_encode_write_chunk(p, rp_ch, consumed); 4559a6a180bSChuck Lever } 4569a6a180bSChuck Lever 4575fdca653SChuck Lever /* Parse the RPC Call's transport header. 45810dc4512SChuck Lever */ 4599a6a180bSChuck Lever static void svc_rdma_get_write_arrays(__be32 *rdma_argp, 4609a6a180bSChuck Lever __be32 **write, __be32 **reply) 46110dc4512SChuck Lever { 4625fdca653SChuck Lever __be32 *p; 46310dc4512SChuck Lever 4649a6a180bSChuck Lever p = rdma_argp + rpcrdma_fixed_maxsz; 4655fdca653SChuck Lever 4665fdca653SChuck Lever /* Read list */ 4675fdca653SChuck Lever while (*p++ != xdr_zero) 4685fdca653SChuck Lever p += 5; 4695fdca653SChuck Lever 4705fdca653SChuck Lever /* Write list */ 4715fdca653SChuck Lever if (*p != xdr_zero) { 4729a6a180bSChuck Lever *write = p; 4735fdca653SChuck Lever while (*p++ != xdr_zero) 4745fdca653SChuck Lever p += 1 + be32_to_cpu(*p) * 4; 4755fdca653SChuck Lever } else { 4765fdca653SChuck Lever *write = NULL; 4775fdca653SChuck Lever p++; 47810dc4512SChuck Lever } 47910dc4512SChuck Lever 4805fdca653SChuck Lever /* Reply chunk */ 4815fdca653SChuck Lever if (*p != xdr_zero) 4829a6a180bSChuck Lever *reply = p; 4835fdca653SChuck Lever else 4845fdca653SChuck Lever *reply = NULL; 48510dc4512SChuck Lever } 48610dc4512SChuck Lever 4876e6092caSChuck Lever static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, 4884201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 4896e6092caSChuck Lever struct page *page, 490f016f305SChuck Lever unsigned long offset, 4916e6092caSChuck Lever unsigned int len) 4926e6092caSChuck Lever { 4936e6092caSChuck Lever struct ib_device *dev = rdma->sc_cm_id->device; 4946e6092caSChuck Lever dma_addr_t dma_addr; 4956e6092caSChuck Lever 4966e6092caSChuck Lever dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); 497832b2cb9SChuck Lever trace_svcrdma_dma_map_page(rdma, dma_addr, len); 4986e6092caSChuck Lever if (ib_dma_mapping_error(dev, dma_addr)) 49991a08eaeSChuck Lever goto out_maperr; 5006e6092caSChuck Lever 50125fd86ecSChuck Lever ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; 50225fd86ecSChuck Lever ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; 5034201c746SChuck Lever ctxt->sc_send_wr.num_sge++; 5046e6092caSChuck Lever return 0; 50591a08eaeSChuck Lever 50691a08eaeSChuck Lever out_maperr: 50791a08eaeSChuck Lever return -EIO; 5086e6092caSChuck Lever } 5096e6092caSChuck Lever 510f016f305SChuck Lever /* ib_dma_map_page() is used here because svc_rdma_dma_unmap() 511f016f305SChuck Lever * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. 
512f016f305SChuck Lever */ 513f016f305SChuck Lever static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, 5144201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 515f016f305SChuck Lever unsigned char *base, 516f016f305SChuck Lever unsigned int len) 517f016f305SChuck Lever { 51825fd86ecSChuck Lever return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base), 519f016f305SChuck Lever offset_in_page(base), len); 520f016f305SChuck Lever } 521f016f305SChuck Lever 5226e6092caSChuck Lever /** 52399722fe4SChuck Lever * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer 5246e6092caSChuck Lever * @rdma: controlling transport 52599722fe4SChuck Lever * @ctxt: send_ctxt for the Send WR 5266e6092caSChuck Lever * @len: length of transport header 5276e6092caSChuck Lever * 5286e6092caSChuck Lever */ 52999722fe4SChuck Lever void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma, 5304201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 5316e6092caSChuck Lever unsigned int len) 5326e6092caSChuck Lever { 53399722fe4SChuck Lever ctxt->sc_sges[0].length = len; 53499722fe4SChuck Lever ctxt->sc_send_wr.num_sge++; 53599722fe4SChuck Lever ib_dma_sync_single_for_device(rdma->sc_pd->device, 53699722fe4SChuck Lever ctxt->sc_sges[0].addr, len, 53799722fe4SChuck Lever DMA_TO_DEVICE); 5386e6092caSChuck Lever } 5396e6092caSChuck Lever 540e248aa7bSChuck Lever /* If the xdr_buf has more elements than the device can 541e248aa7bSChuck Lever * transmit in a single RDMA Send, then the reply will 542e248aa7bSChuck Lever * have to be copied into a bounce buffer. 543e248aa7bSChuck Lever */ 544e248aa7bSChuck Lever static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma, 545e248aa7bSChuck Lever struct xdr_buf *xdr, 546e248aa7bSChuck Lever __be32 *wr_lst) 547e248aa7bSChuck Lever { 548e248aa7bSChuck Lever int elements; 549e248aa7bSChuck Lever 550e248aa7bSChuck Lever /* xdr->head */ 551e248aa7bSChuck Lever elements = 1; 552e248aa7bSChuck Lever 553e248aa7bSChuck Lever /* xdr->pages */ 554e248aa7bSChuck Lever if (!wr_lst) { 555e248aa7bSChuck Lever unsigned int remaining; 556e248aa7bSChuck Lever unsigned long pageoff; 557e248aa7bSChuck Lever 558e248aa7bSChuck Lever pageoff = xdr->page_base & ~PAGE_MASK; 559e248aa7bSChuck Lever remaining = xdr->page_len; 560e248aa7bSChuck Lever while (remaining) { 561e248aa7bSChuck Lever ++elements; 562e248aa7bSChuck Lever remaining -= min_t(u32, PAGE_SIZE - pageoff, 563e248aa7bSChuck Lever remaining); 564e248aa7bSChuck Lever pageoff = 0; 565e248aa7bSChuck Lever } 566e248aa7bSChuck Lever } 567e248aa7bSChuck Lever 568e248aa7bSChuck Lever /* xdr->tail */ 569e248aa7bSChuck Lever if (xdr->tail[0].iov_len) 570e248aa7bSChuck Lever ++elements; 571e248aa7bSChuck Lever 572e248aa7bSChuck Lever /* assume 1 SGE is needed for the transport header */ 573e248aa7bSChuck Lever return elements >= rdma->sc_max_send_sges; 574e248aa7bSChuck Lever } 575e248aa7bSChuck Lever 576e248aa7bSChuck Lever /* The device is not capable of sending the reply directly. 577e248aa7bSChuck Lever * Assemble the elements of @xdr into the transport header 578e248aa7bSChuck Lever * buffer. 
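 *
 * This is the pull-up path: svc_rdma_pull_up_needed() determined that
 * the xdr_buf would need more SGEs than the device supports
 * (sc_max_send_sges), so the whole RPC message is copied into the
 * already-mapped sc_xprt_buf and carried by sc_sges[0].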
579e248aa7bSChuck Lever */ 580e248aa7bSChuck Lever static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma, 581e248aa7bSChuck Lever struct svc_rdma_send_ctxt *ctxt, 582e248aa7bSChuck Lever struct xdr_buf *xdr, __be32 *wr_lst) 583e248aa7bSChuck Lever { 584e248aa7bSChuck Lever unsigned char *dst, *tailbase; 585e248aa7bSChuck Lever unsigned int taillen; 586e248aa7bSChuck Lever 587e248aa7bSChuck Lever dst = ctxt->sc_xprt_buf; 588e248aa7bSChuck Lever dst += ctxt->sc_sges[0].length; 589e248aa7bSChuck Lever 590e248aa7bSChuck Lever memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len); 591e248aa7bSChuck Lever dst += xdr->head[0].iov_len; 592e248aa7bSChuck Lever 593e248aa7bSChuck Lever tailbase = xdr->tail[0].iov_base; 594e248aa7bSChuck Lever taillen = xdr->tail[0].iov_len; 595e248aa7bSChuck Lever if (wr_lst) { 596e248aa7bSChuck Lever u32 xdrpad; 597e248aa7bSChuck Lever 598e248aa7bSChuck Lever xdrpad = xdr_padsize(xdr->page_len); 599e248aa7bSChuck Lever if (taillen && xdrpad) { 600e248aa7bSChuck Lever tailbase += xdrpad; 601e248aa7bSChuck Lever taillen -= xdrpad; 602e248aa7bSChuck Lever } 603e248aa7bSChuck Lever } else { 604e248aa7bSChuck Lever unsigned int len, remaining; 605e248aa7bSChuck Lever unsigned long pageoff; 606e248aa7bSChuck Lever struct page **ppages; 607e248aa7bSChuck Lever 608e248aa7bSChuck Lever ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 609e248aa7bSChuck Lever pageoff = xdr->page_base & ~PAGE_MASK; 610e248aa7bSChuck Lever remaining = xdr->page_len; 611e248aa7bSChuck Lever while (remaining) { 612e248aa7bSChuck Lever len = min_t(u32, PAGE_SIZE - pageoff, remaining); 613e248aa7bSChuck Lever 614e248aa7bSChuck Lever memcpy(dst, page_address(*ppages), len); 615e248aa7bSChuck Lever remaining -= len; 616e248aa7bSChuck Lever dst += len; 617e248aa7bSChuck Lever pageoff = 0; 618e248aa7bSChuck Lever } 619e248aa7bSChuck Lever } 620e248aa7bSChuck Lever 621e248aa7bSChuck Lever if (taillen) 622e248aa7bSChuck Lever memcpy(dst, tailbase, taillen); 623e248aa7bSChuck Lever 624e248aa7bSChuck Lever ctxt->sc_sges[0].length += xdr->len; 625e248aa7bSChuck Lever ib_dma_sync_single_for_device(rdma->sc_pd->device, 626e248aa7bSChuck Lever ctxt->sc_sges[0].addr, 627e248aa7bSChuck Lever ctxt->sc_sges[0].length, 628e248aa7bSChuck Lever DMA_TO_DEVICE); 629e248aa7bSChuck Lever 630e248aa7bSChuck Lever return 0; 631e248aa7bSChuck Lever } 632e248aa7bSChuck Lever 63399722fe4SChuck Lever /* svc_rdma_map_reply_msg - Map the buffer holding RPC message 63499722fe4SChuck Lever * @rdma: controlling transport 63599722fe4SChuck Lever * @ctxt: send_ctxt for the Send WR 63699722fe4SChuck Lever * @xdr: prepared xdr_buf containing RPC message 63799722fe4SChuck Lever * @wr_lst: pointer to Call header's Write list, or NULL 63899722fe4SChuck Lever * 63999722fe4SChuck Lever * Load the xdr_buf into the ctxt's sge array, and DMA map each 6409a6a180bSChuck Lever * element as it is added. 6419a6a180bSChuck Lever * 64223262790SChuck Lever * Returns zero on success, or a negative errno on failure. 
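 *
 * sc_sges[0] always carries the transport header, which is already
 * DMA-mapped; the xdr_buf's head, page list, and tail are mapped into
 * the SGEs that follow. When the client provided a Write chunk, the
 * page list is omitted here because that payload is conveyed by
 * RDMA Write instead.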
643c06b540aSTom Tucker */ 64499722fe4SChuck Lever int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, 6454201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 6469a6a180bSChuck Lever struct xdr_buf *xdr, __be32 *wr_lst) 647c06b540aSTom Tucker { 64825fd86ecSChuck Lever unsigned int len, remaining; 649f016f305SChuck Lever unsigned long page_off; 6509a6a180bSChuck Lever struct page **ppages; 6519a6a180bSChuck Lever unsigned char *base; 6529a6a180bSChuck Lever u32 xdr_pad; 653c06b540aSTom Tucker int ret; 654c06b540aSTom Tucker 655e248aa7bSChuck Lever if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst)) 656e248aa7bSChuck Lever return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst); 657e248aa7bSChuck Lever 658e248aa7bSChuck Lever ++ctxt->sc_cur_sge_no; 65925fd86ecSChuck Lever ret = svc_rdma_dma_map_buf(rdma, ctxt, 6609a6a180bSChuck Lever xdr->head[0].iov_base, 6619a6a180bSChuck Lever xdr->head[0].iov_len); 6629a6a180bSChuck Lever if (ret < 0) 6639a6a180bSChuck Lever return ret; 664c06b540aSTom Tucker 6659a6a180bSChuck Lever /* If a Write chunk is present, the xdr_buf's page list 6669a6a180bSChuck Lever * is not included inline. However the Upper Layer may 6679a6a180bSChuck Lever * have added XDR padding in the tail buffer, and that 6689a6a180bSChuck Lever * should not be included inline. 6699a6a180bSChuck Lever */ 6709a6a180bSChuck Lever if (wr_lst) { 6719a6a180bSChuck Lever base = xdr->tail[0].iov_base; 6729a6a180bSChuck Lever len = xdr->tail[0].iov_len; 6739a6a180bSChuck Lever xdr_pad = xdr_padsize(xdr->page_len); 674c06b540aSTom Tucker 6759a6a180bSChuck Lever if (len && xdr_pad) { 6769a6a180bSChuck Lever base += xdr_pad; 6779a6a180bSChuck Lever len -= xdr_pad; 678c06b540aSTom Tucker } 679c06b540aSTom Tucker 6809a6a180bSChuck Lever goto tail; 681c06b540aSTom Tucker } 6829a6a180bSChuck Lever 6839a6a180bSChuck Lever ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 6849a6a180bSChuck Lever page_off = xdr->page_base & ~PAGE_MASK; 6859a6a180bSChuck Lever remaining = xdr->page_len; 6869a6a180bSChuck Lever while (remaining) { 6879a6a180bSChuck Lever len = min_t(u32, PAGE_SIZE - page_off, remaining); 6889a6a180bSChuck Lever 689e248aa7bSChuck Lever ++ctxt->sc_cur_sge_no; 69025fd86ecSChuck Lever ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++, 69125fd86ecSChuck Lever page_off, len); 6929a6a180bSChuck Lever if (ret < 0) 6939a6a180bSChuck Lever return ret; 6949a6a180bSChuck Lever 6959a6a180bSChuck Lever remaining -= len; 6969a6a180bSChuck Lever page_off = 0; 697c06b540aSTom Tucker } 698c06b540aSTom Tucker 6999a6a180bSChuck Lever base = xdr->tail[0].iov_base; 7009a6a180bSChuck Lever len = xdr->tail[0].iov_len; 7019a6a180bSChuck Lever tail: 7029a6a180bSChuck Lever if (len) { 703e248aa7bSChuck Lever ++ctxt->sc_cur_sge_no; 70425fd86ecSChuck Lever ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len); 7059a6a180bSChuck Lever if (ret < 0) 7069a6a180bSChuck Lever return ret; 7079a6a180bSChuck Lever } 70808ae4e7fSChuck Lever 70923262790SChuck Lever return 0; 710c06b540aSTom Tucker } 711c06b540aSTom Tucker 712c55ab070SChuck Lever /* The svc_rqst and all resources it owns are released as soon as 713c55ab070SChuck Lever * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt 714c55ab070SChuck Lever * so they are released by the Send completion handler. 
715c55ab070SChuck Lever */ 716c55ab070SChuck Lever static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, 7174201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt) 718c55ab070SChuck Lever { 719c55ab070SChuck Lever int i, pages = rqstp->rq_next_page - rqstp->rq_respages; 720c55ab070SChuck Lever 7214201c746SChuck Lever ctxt->sc_page_count += pages; 722c55ab070SChuck Lever for (i = 0; i < pages; i++) { 72399722fe4SChuck Lever ctxt->sc_pages[i] = rqstp->rq_respages[i]; 724c55ab070SChuck Lever rqstp->rq_respages[i] = NULL; 725c55ab070SChuck Lever } 726a53d5cb0SChuck Lever 727a53d5cb0SChuck Lever /* Prevent svc_xprt_release from releasing pages in rq_pages */ 728a53d5cb0SChuck Lever rqstp->rq_next_page = rqstp->rq_respages; 729c55ab070SChuck Lever } 730c55ab070SChuck Lever 7319a6a180bSChuck Lever /* Prepare the portion of the RPC Reply that will be transmitted 7329a6a180bSChuck Lever * via RDMA Send. The RPC-over-RDMA transport header is prepared 7334201c746SChuck Lever * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. 7349a6a180bSChuck Lever * 7359a6a180bSChuck Lever * Depending on whether a Write list or Reply chunk is present, 7369a6a180bSChuck Lever * the server may send all, a portion of, or none of the xdr_buf. 7374201c746SChuck Lever * In the latter case, only the transport header (sc_sges[0]) is 7389a6a180bSChuck Lever * transmitted. 7399a6a180bSChuck Lever * 7409a6a180bSChuck Lever * RDMA Send is the last step of transmitting an RPC reply. Pages 7419a6a180bSChuck Lever * involved in the earlier RDMA Writes are here transferred out 74297bce634SChuck Lever * of the rqstp and into the sctxt's page array. These pages are 7439a6a180bSChuck Lever * DMA unmapped by each Write completion, but the subsequent Send 7449a6a180bSChuck Lever * completion finally releases these pages. 7459a6a180bSChuck Lever * 7469a6a180bSChuck Lever * Assumptions: 7479a6a180bSChuck Lever * - The Reply's transport header will never be larger than a page. 748c06b540aSTom Tucker */ 7499a6a180bSChuck Lever static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, 75097bce634SChuck Lever struct svc_rdma_send_ctxt *sctxt, 75197bce634SChuck Lever struct svc_rdma_recv_ctxt *rctxt, 752c06b540aSTom Tucker struct svc_rqst *rqstp, 7539a6a180bSChuck Lever __be32 *wr_lst, __be32 *rp_ch) 754c06b540aSTom Tucker { 7559a6a180bSChuck Lever int ret; 7560e7f011aSTom Tucker 7579a6a180bSChuck Lever if (!rp_ch) { 75897bce634SChuck Lever ret = svc_rdma_map_reply_msg(rdma, sctxt, 7599a6a180bSChuck Lever &rqstp->rq_res, wr_lst); 7609a6a180bSChuck Lever if (ret < 0) 76199722fe4SChuck Lever return ret; 7623fe04ee9SChuck Lever } 763c06b540aSTom Tucker 76497bce634SChuck Lever svc_rdma_save_io_pages(rqstp, sctxt); 7650bf48289SSteve Wise 76697bce634SChuck Lever if (rctxt->rc_inv_rkey) { 76797bce634SChuck Lever sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; 76897bce634SChuck Lever sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey; 76997bce634SChuck Lever } else { 77097bce634SChuck Lever sctxt->sc_send_wr.opcode = IB_WR_SEND; 771986b7889SChuck Lever } 772986b7889SChuck Lever dprintk("svcrdma: posting Send WR with %u sge(s)\n", 77397bce634SChuck Lever sctxt->sc_send_wr.num_sge); 77497bce634SChuck Lever return svc_rdma_send(rdma, &sctxt->sc_send_wr); 775c06b540aSTom Tucker } 776c06b540aSTom Tucker 7774757d90bSChuck Lever /* Given the client-provided Write and Reply chunks, the server was not 7784757d90bSChuck Lever * able to form a complete reply. 
Return an RDMA_ERROR message so the 7794757d90bSChuck Lever * client can retire this RPC transaction. As above, the Send completion 7804757d90bSChuck Lever * routine releases payload pages that were part of a previous RDMA Write. 7814757d90bSChuck Lever * 7824757d90bSChuck Lever * Remote Invalidation is skipped for simplicity. 7834757d90bSChuck Lever */ 7844757d90bSChuck Lever static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, 78599722fe4SChuck Lever struct svc_rdma_send_ctxt *ctxt, 78699722fe4SChuck Lever struct svc_rqst *rqstp) 7874757d90bSChuck Lever { 7884757d90bSChuck Lever __be32 *p; 7894757d90bSChuck Lever int ret; 7904757d90bSChuck Lever 79199722fe4SChuck Lever p = ctxt->sc_xprt_buf; 79299722fe4SChuck Lever trace_svcrdma_err_chunk(*p); 79399722fe4SChuck Lever p += 3; 7944757d90bSChuck Lever *p++ = rdma_error; 7954757d90bSChuck Lever *p = err_chunk; 79699722fe4SChuck Lever svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR); 7974757d90bSChuck Lever 7984757d90bSChuck Lever svc_rdma_save_io_pages(rqstp, ctxt); 7994757d90bSChuck Lever 800986b7889SChuck Lever ctxt->sc_send_wr.opcode = IB_WR_SEND; 801986b7889SChuck Lever ret = svc_rdma_send(rdma, &ctxt->sc_send_wr); 80299722fe4SChuck Lever if (ret) { 8034201c746SChuck Lever svc_rdma_send_ctxt_put(rdma, ctxt); 8044757d90bSChuck Lever return ret; 8054757d90bSChuck Lever } 8064757d90bSChuck Lever 80799722fe4SChuck Lever return 0; 80899722fe4SChuck Lever } 80999722fe4SChuck Lever 8109a6a180bSChuck Lever /** 8119a6a180bSChuck Lever * svc_rdma_sendto - Transmit an RPC reply 8129a6a180bSChuck Lever * @rqstp: processed RPC request, reply XDR already in ::rq_res 8139a6a180bSChuck Lever * 8149a6a180bSChuck Lever * Any resources still associated with @rqstp are released upon return. 8159a6a180bSChuck Lever * If no reply message was possible, the connection is closed. 8169a6a180bSChuck Lever * 8179a6a180bSChuck Lever * Returns: 8189a6a180bSChuck Lever * %0 if an RPC reply has been successfully posted, 8199a6a180bSChuck Lever * %-ENOMEM if a resource shortage occurred (connection is lost), 8209a6a180bSChuck Lever * %-ENOTCONN if posting failed (connection is lost). 8219a6a180bSChuck Lever */ 822c06b540aSTom Tucker int svc_rdma_sendto(struct svc_rqst *rqstp) 823c06b540aSTom Tucker { 824c06b540aSTom Tucker struct svc_xprt *xprt = rqstp->rq_xprt; 825c06b540aSTom Tucker struct svcxprt_rdma *rdma = 826c06b540aSTom Tucker container_of(xprt, struct svcxprt_rdma, sc_xprt); 8273a88092eSChuck Lever struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; 8289a6a180bSChuck Lever __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch; 8299a6a180bSChuck Lever struct xdr_buf *xdr = &rqstp->rq_res; 83099722fe4SChuck Lever struct svc_rdma_send_ctxt *sctxt; 8319a6a180bSChuck Lever int ret; 832c06b540aSTom Tucker 8333316f063SChuck Lever rdma_argp = rctxt->rc_recv_buf; 8349a6a180bSChuck Lever svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch); 835c06b540aSTom Tucker 836e4eb42ceSChuck Lever /* Create the RDMA response header. xprt->xpt_mutex, 837e4eb42ceSChuck Lever * acquired in svc_send(), serializes RPC replies. The 838e4eb42ceSChuck Lever * code path below that inserts the credit grant value 839e4eb42ceSChuck Lever * into each transport header runs only inside this 840e4eb42ceSChuck Lever * critical section. 
841e4eb42ceSChuck Lever */ 84278da2b3cSChuck Lever ret = -ENOMEM; 84399722fe4SChuck Lever sctxt = svc_rdma_send_ctxt_get(rdma); 84499722fe4SChuck Lever if (!sctxt) 84578da2b3cSChuck Lever goto err0; 84699722fe4SChuck Lever rdma_resp = sctxt->sc_xprt_buf; 84798fc21d3SChuck Lever 8489a6a180bSChuck Lever p = rdma_resp; 8499a6a180bSChuck Lever *p++ = *rdma_argp; 8509a6a180bSChuck Lever *p++ = *(rdma_argp + 1); 85198fc21d3SChuck Lever *p++ = rdma->sc_fc_credits; 8529a6a180bSChuck Lever *p++ = rp_ch ? rdma_nomsg : rdma_msg; 85398fc21d3SChuck Lever 85498fc21d3SChuck Lever /* Start with empty chunks */ 85598fc21d3SChuck Lever *p++ = xdr_zero; 85698fc21d3SChuck Lever *p++ = xdr_zero; 85798fc21d3SChuck Lever *p = xdr_zero; 858c06b540aSTom Tucker 8599a6a180bSChuck Lever if (wr_lst) { 8609a6a180bSChuck Lever /* XXX: Presume the client sent only one Write chunk */ 86141205539SChuck Lever unsigned long offset; 86241205539SChuck Lever unsigned int length; 86341205539SChuck Lever 86441205539SChuck Lever if (rctxt->rc_read_payload_length) { 86541205539SChuck Lever offset = rctxt->rc_read_payload_offset; 86641205539SChuck Lever length = rctxt->rc_read_payload_length; 86741205539SChuck Lever } else { 86841205539SChuck Lever offset = xdr->head[0].iov_len; 86941205539SChuck Lever length = xdr->page_len; 87041205539SChuck Lever } 87141205539SChuck Lever ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset, 87241205539SChuck Lever length); 87308ae4e7fSChuck Lever if (ret < 0) 8744757d90bSChuck Lever goto err2; 8759a6a180bSChuck Lever svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); 87608ae4e7fSChuck Lever } 8779a6a180bSChuck Lever if (rp_ch) { 8789a6a180bSChuck Lever ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr); 87908ae4e7fSChuck Lever if (ret < 0) 8804757d90bSChuck Lever goto err2; 8819a6a180bSChuck Lever svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); 88208ae4e7fSChuck Lever } 883c06b540aSTom Tucker 88499722fe4SChuck Lever svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp)); 88597bce634SChuck Lever ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp, 8869a6a180bSChuck Lever wr_lst, rp_ch); 8873e1eeb98SChuck Lever if (ret < 0) 88899722fe4SChuck Lever goto err1; 8893a88092eSChuck Lever ret = 0; 8903a88092eSChuck Lever 8913a88092eSChuck Lever out: 8923a88092eSChuck Lever rqstp->rq_xprt_ctxt = NULL; 8933a88092eSChuck Lever svc_rdma_recv_ctxt_put(rdma, rctxt); 8943a88092eSChuck Lever return ret; 895afd566eaSTom Tucker 8964757d90bSChuck Lever err2: 897b20dae70SColin Ian King if (ret != -E2BIG && ret != -EINVAL) 8984757d90bSChuck Lever goto err1; 8994757d90bSChuck Lever 90099722fe4SChuck Lever ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp); 9014757d90bSChuck Lever if (ret < 0) 90299722fe4SChuck Lever goto err1; 9033a88092eSChuck Lever ret = 0; 9043a88092eSChuck Lever goto out; 9054757d90bSChuck Lever 906afd566eaSTom Tucker err1: 90799722fe4SChuck Lever svc_rdma_send_ctxt_put(rdma, sctxt); 908afd566eaSTom Tucker err0: 909bd2abef3SChuck Lever trace_svcrdma_send_failed(rqstp, ret); 9109a6a180bSChuck Lever set_bit(XPT_CLOSE, &xprt->xpt_flags); 9113a88092eSChuck Lever ret = -ENOTCONN; 9123a88092eSChuck Lever goto out; 913c06b540aSTom Tucker } 91441205539SChuck Lever 91541205539SChuck Lever /** 91641205539SChuck Lever * svc_rdma_read_payload - special processing for a READ payload 91741205539SChuck Lever * @rqstp: svc_rqst to operate on 91841205539SChuck Lever * @offset: payload's byte offset in @xdr 91941205539SChuck Lever * @length: size of payload, in bytes 
92041205539SChuck Lever * 92141205539SChuck Lever * Returns zero on success. 92241205539SChuck Lever * 92341205539SChuck Lever * For the moment, just record the xdr_buf location of the READ 92441205539SChuck Lever * payload. svc_rdma_sendto will use that location later when 92541205539SChuck Lever * we actually send the payload. 92641205539SChuck Lever */ 92741205539SChuck Lever int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset, 92841205539SChuck Lever unsigned int length) 92941205539SChuck Lever { 93041205539SChuck Lever struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; 93141205539SChuck Lever 93241205539SChuck Lever /* XXX: Just one READ payload slot for now, since our 93341205539SChuck Lever * transport implementation currently supports only one 93441205539SChuck Lever * Write chunk. 93541205539SChuck Lever */ 93641205539SChuck Lever rctxt->rc_read_payload_offset = offset; 93741205539SChuck Lever rctxt->rc_read_payload_length = length; 93841205539SChuck Lever 93941205539SChuck Lever return 0; 94041205539SChuck Lever } 941