1bcf3ffd4SChuck Lever // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2c06b540aSTom Tucker /* 3ecf85b23SChuck Lever * Copyright (c) 2016-2018 Oracle. All rights reserved. 40bf48289SSteve Wise * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 5c06b540aSTom Tucker * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. 6c06b540aSTom Tucker * 7c06b540aSTom Tucker * This software is available to you under a choice of one of two 8c06b540aSTom Tucker * licenses. You may choose to be licensed under the terms of the GNU 9c06b540aSTom Tucker * General Public License (GPL) Version 2, available from the file 10c06b540aSTom Tucker * COPYING in the main directory of this source tree, or the BSD-type 11c06b540aSTom Tucker * license below: 12c06b540aSTom Tucker * 13c06b540aSTom Tucker * Redistribution and use in source and binary forms, with or without 14c06b540aSTom Tucker * modification, are permitted provided that the following conditions 15c06b540aSTom Tucker * are met: 16c06b540aSTom Tucker * 17c06b540aSTom Tucker * Redistributions of source code must retain the above copyright 18c06b540aSTom Tucker * notice, this list of conditions and the following disclaimer. 19c06b540aSTom Tucker * 20c06b540aSTom Tucker * Redistributions in binary form must reproduce the above 21c06b540aSTom Tucker * copyright notice, this list of conditions and the following 22c06b540aSTom Tucker * disclaimer in the documentation and/or other materials provided 23c06b540aSTom Tucker * with the distribution. 24c06b540aSTom Tucker * 25c06b540aSTom Tucker * Neither the name of the Network Appliance, Inc. nor the names of 26c06b540aSTom Tucker * its contributors may be used to endorse or promote products 27c06b540aSTom Tucker * derived from this software without specific prior written 28c06b540aSTom Tucker * permission. 29c06b540aSTom Tucker * 30c06b540aSTom Tucker * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 31c06b540aSTom Tucker * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 32c06b540aSTom Tucker * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 33c06b540aSTom Tucker * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 34c06b540aSTom Tucker * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 35c06b540aSTom Tucker * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 36c06b540aSTom Tucker * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 37c06b540aSTom Tucker * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 38c06b540aSTom Tucker * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 39c06b540aSTom Tucker * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 40c06b540aSTom Tucker * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 41c06b540aSTom Tucker * 42c06b540aSTom Tucker * Author: Tom Tucker <tom@opengridcomputing.com> 43c06b540aSTom Tucker */ 44c06b540aSTom Tucker 459a6a180bSChuck Lever /* Operation 469a6a180bSChuck Lever * 479a6a180bSChuck Lever * The main entry point is svc_rdma_sendto. This is called by the 489a6a180bSChuck Lever * RPC server when an RPC Reply is ready to be transmitted to a client. 499a6a180bSChuck Lever * 509a6a180bSChuck Lever * The passed-in svc_rqst contains a struct xdr_buf which holds an 519a6a180bSChuck Lever * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA 529a6a180bSChuck Lever * transport header, post all Write WRs needed for this Reply, then post 539a6a180bSChuck Lever * a Send WR conveying the transport header and the RPC message itself to 549a6a180bSChuck Lever * the client. 559a6a180bSChuck Lever * 569a6a180bSChuck Lever * svc_rdma_sendto must fully transmit the Reply before returning, as 579a6a180bSChuck Lever * the svc_rqst will be recycled as soon as sendto returns. Remaining 589a6a180bSChuck Lever * resources referred to by the svc_rqst are also recycled at that time. 599a6a180bSChuck Lever * Therefore any resources that must remain longer must be detached 609a6a180bSChuck Lever * from the svc_rqst and released later. 619a6a180bSChuck Lever * 629a6a180bSChuck Lever * Page Management 639a6a180bSChuck Lever * 649a6a180bSChuck Lever * The I/O that performs Reply transmission is asynchronous, and may 659a6a180bSChuck Lever * complete well after sendto returns. Thus pages under I/O must be 669a6a180bSChuck Lever * removed from the svc_rqst before sendto returns. 679a6a180bSChuck Lever * 689a6a180bSChuck Lever * The logic here depends on Send Queue and completion ordering. Since 699a6a180bSChuck Lever * the Send WR is always posted last, it will always complete last. Thus 709a6a180bSChuck Lever * when it completes, it is guaranteed that all previous Write WRs have 719a6a180bSChuck Lever * also completed. 729a6a180bSChuck Lever * 739a6a180bSChuck Lever * Write WRs are constructed and posted. Each Write segment gets its own 749a6a180bSChuck Lever * svc_rdma_rw_ctxt, allowing the Write completion handler to find and 759a6a180bSChuck Lever * DMA-unmap the pages under I/O for that Write segment. The Write 769a6a180bSChuck Lever * completion handler does not release any pages. 779a6a180bSChuck Lever * 784201c746SChuck Lever * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. 799a6a180bSChuck Lever * The ownership of all of the Reply's pages are transferred into that 809a6a180bSChuck Lever * ctxt, the Send WR is posted, and sendto returns. 819a6a180bSChuck Lever * 824201c746SChuck Lever * The svc_rdma_send_ctxt is presented when the Send WR completes. The 839a6a180bSChuck Lever * Send completion handler finally releases the Reply's pages. 849a6a180bSChuck Lever * 859a6a180bSChuck Lever * This mechanism also assumes that completions on the transport's Send 869a6a180bSChuck Lever * Completion Queue do not run in parallel. Otherwise a Write completion 879a6a180bSChuck Lever * and Send completion running at the same time could release pages that 889a6a180bSChuck Lever * are still DMA-mapped. 899a6a180bSChuck Lever * 909a6a180bSChuck Lever * Error Handling 919a6a180bSChuck Lever * 929a6a180bSChuck Lever * - If the Send WR is posted successfully, it will either complete 939a6a180bSChuck Lever * successfully, or get flushed. Either way, the Send completion 949a6a180bSChuck Lever * handler releases the Reply's pages. 959a6a180bSChuck Lever * - If the Send WR cannot be not posted, the forward path releases 969a6a180bSChuck Lever * the Reply's pages. 979a6a180bSChuck Lever * 989a6a180bSChuck Lever * This handles the case, without the use of page reference counting, 999a6a180bSChuck Lever * where two different Write segments send portions of the same page. 1009a6a180bSChuck Lever */ 1019a6a180bSChuck Lever 102c06b540aSTom Tucker #include <linux/spinlock.h> 103c06b540aSTom Tucker #include <asm/unaligned.h> 10498895edbSChuck Lever 105c06b540aSTom Tucker #include <rdma/ib_verbs.h> 106c06b540aSTom Tucker #include <rdma/rdma_cm.h> 10798895edbSChuck Lever 10898895edbSChuck Lever #include <linux/sunrpc/debug.h> 10998895edbSChuck Lever #include <linux/sunrpc/rpc_rdma.h> 110c06b540aSTom Tucker #include <linux/sunrpc/svc_rdma.h> 111c06b540aSTom Tucker 11298895edbSChuck Lever #include "xprt_rdma.h" 11398895edbSChuck Lever #include <trace/events/rpcrdma.h> 11498895edbSChuck Lever 115c06b540aSTom Tucker #define RPCDBG_FACILITY RPCDBG_SVCXPRT 116c06b540aSTom Tucker 1174201c746SChuck Lever static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); 1184201c746SChuck Lever 1194201c746SChuck Lever static inline struct svc_rdma_send_ctxt * 1204201c746SChuck Lever svc_rdma_next_send_ctxt(struct list_head *list) 1214201c746SChuck Lever { 1224201c746SChuck Lever return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, 1234201c746SChuck Lever sc_list); 1244201c746SChuck Lever } 1254201c746SChuck Lever 1264201c746SChuck Lever static struct svc_rdma_send_ctxt * 1274201c746SChuck Lever svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) 1284201c746SChuck Lever { 1294201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 13099722fe4SChuck Lever dma_addr_t addr; 13199722fe4SChuck Lever void *buffer; 13225fd86ecSChuck Lever size_t size; 1334201c746SChuck Lever int i; 1344201c746SChuck Lever 13525fd86ecSChuck Lever size = sizeof(*ctxt); 13625fd86ecSChuck Lever size += rdma->sc_max_send_sges * sizeof(struct ib_sge); 13725fd86ecSChuck Lever ctxt = kmalloc(size, GFP_KERNEL); 1384201c746SChuck Lever if (!ctxt) 13999722fe4SChuck Lever goto fail0; 14099722fe4SChuck Lever buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL); 14199722fe4SChuck Lever if (!buffer) 14299722fe4SChuck Lever goto fail1; 14399722fe4SChuck Lever addr = ib_dma_map_single(rdma->sc_pd->device, buffer, 14499722fe4SChuck Lever rdma->sc_max_req_size, DMA_TO_DEVICE); 14599722fe4SChuck Lever if (ib_dma_mapping_error(rdma->sc_pd->device, addr)) 14699722fe4SChuck Lever goto fail2; 1474201c746SChuck Lever 1484201c746SChuck Lever ctxt->sc_send_wr.next = NULL; 1494201c746SChuck Lever ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; 1504201c746SChuck Lever ctxt->sc_send_wr.sg_list = ctxt->sc_sges; 1514201c746SChuck Lever ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; 15299722fe4SChuck Lever ctxt->sc_cqe.done = svc_rdma_wc_send; 15399722fe4SChuck Lever ctxt->sc_xprt_buf = buffer; 15499722fe4SChuck Lever ctxt->sc_sges[0].addr = addr; 15599722fe4SChuck Lever 15625fd86ecSChuck Lever for (i = 0; i < rdma->sc_max_send_sges; i++) 1574201c746SChuck Lever ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; 1584201c746SChuck Lever return ctxt; 15999722fe4SChuck Lever 16099722fe4SChuck Lever fail2: 16199722fe4SChuck Lever kfree(buffer); 16299722fe4SChuck Lever fail1: 16399722fe4SChuck Lever kfree(ctxt); 16499722fe4SChuck Lever fail0: 16599722fe4SChuck Lever return NULL; 1664201c746SChuck Lever } 1674201c746SChuck Lever 1684201c746SChuck Lever /** 1694201c746SChuck Lever * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt 1704201c746SChuck Lever * @rdma: svcxprt_rdma being torn down 1714201c746SChuck Lever * 1724201c746SChuck Lever */ 1734201c746SChuck Lever void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) 1744201c746SChuck Lever { 1754201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 1764201c746SChuck Lever 1774201c746SChuck Lever while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { 1784201c746SChuck Lever list_del(&ctxt->sc_list); 17999722fe4SChuck Lever ib_dma_unmap_single(rdma->sc_pd->device, 18099722fe4SChuck Lever ctxt->sc_sges[0].addr, 18199722fe4SChuck Lever rdma->sc_max_req_size, 18299722fe4SChuck Lever DMA_TO_DEVICE); 18399722fe4SChuck Lever kfree(ctxt->sc_xprt_buf); 1844201c746SChuck Lever kfree(ctxt); 1854201c746SChuck Lever } 1864201c746SChuck Lever } 1874201c746SChuck Lever 1884201c746SChuck Lever /** 1894201c746SChuck Lever * svc_rdma_send_ctxt_get - Get a free send_ctxt 1904201c746SChuck Lever * @rdma: controlling svcxprt_rdma 1914201c746SChuck Lever * 1924201c746SChuck Lever * Returns a ready-to-use send_ctxt, or NULL if none are 1934201c746SChuck Lever * available and a fresh one cannot be allocated. 1944201c746SChuck Lever */ 1954201c746SChuck Lever struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) 1964201c746SChuck Lever { 1974201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 1984201c746SChuck Lever 1994201c746SChuck Lever spin_lock(&rdma->sc_send_lock); 2004201c746SChuck Lever ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); 2014201c746SChuck Lever if (!ctxt) 2024201c746SChuck Lever goto out_empty; 2034201c746SChuck Lever list_del(&ctxt->sc_list); 2044201c746SChuck Lever spin_unlock(&rdma->sc_send_lock); 2054201c746SChuck Lever 2064201c746SChuck Lever out: 2074201c746SChuck Lever ctxt->sc_send_wr.num_sge = 0; 20899722fe4SChuck Lever ctxt->sc_cur_sge_no = 0; 2094201c746SChuck Lever ctxt->sc_page_count = 0; 2104201c746SChuck Lever return ctxt; 2114201c746SChuck Lever 2124201c746SChuck Lever out_empty: 2134201c746SChuck Lever spin_unlock(&rdma->sc_send_lock); 2144201c746SChuck Lever ctxt = svc_rdma_send_ctxt_alloc(rdma); 2154201c746SChuck Lever if (!ctxt) 2164201c746SChuck Lever return NULL; 2174201c746SChuck Lever goto out; 2184201c746SChuck Lever } 2194201c746SChuck Lever 2204201c746SChuck Lever /** 2214201c746SChuck Lever * svc_rdma_send_ctxt_put - Return send_ctxt to free list 2224201c746SChuck Lever * @rdma: controlling svcxprt_rdma 2234201c746SChuck Lever * @ctxt: object to return to the free list 2244201c746SChuck Lever * 2254201c746SChuck Lever * Pages left in sc_pages are DMA unmapped and released. 2264201c746SChuck Lever */ 2274201c746SChuck Lever void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, 2284201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt) 2294201c746SChuck Lever { 2304201c746SChuck Lever struct ib_device *device = rdma->sc_cm_id->device; 2314201c746SChuck Lever unsigned int i; 2324201c746SChuck Lever 23399722fe4SChuck Lever /* The first SGE contains the transport header, which 23499722fe4SChuck Lever * remains mapped until @ctxt is destroyed. 23599722fe4SChuck Lever */ 236832b2cb9SChuck Lever for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) { 2374201c746SChuck Lever ib_dma_unmap_page(device, 2384201c746SChuck Lever ctxt->sc_sges[i].addr, 2394201c746SChuck Lever ctxt->sc_sges[i].length, 2404201c746SChuck Lever DMA_TO_DEVICE); 241832b2cb9SChuck Lever trace_svcrdma_dma_unmap_page(rdma, 242832b2cb9SChuck Lever ctxt->sc_sges[i].addr, 243832b2cb9SChuck Lever ctxt->sc_sges[i].length); 244832b2cb9SChuck Lever } 2454201c746SChuck Lever 2464201c746SChuck Lever for (i = 0; i < ctxt->sc_page_count; ++i) 2474201c746SChuck Lever put_page(ctxt->sc_pages[i]); 2484201c746SChuck Lever 2494201c746SChuck Lever spin_lock(&rdma->sc_send_lock); 2504201c746SChuck Lever list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); 2514201c746SChuck Lever spin_unlock(&rdma->sc_send_lock); 2524201c746SChuck Lever } 2534201c746SChuck Lever 2544201c746SChuck Lever /** 2554201c746SChuck Lever * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC 2564201c746SChuck Lever * @cq: Completion Queue context 2574201c746SChuck Lever * @wc: Work Completion object 2584201c746SChuck Lever * 2594201c746SChuck Lever * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that 2604201c746SChuck Lever * the Send completion handler could be running. 2614201c746SChuck Lever */ 2624201c746SChuck Lever static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) 2634201c746SChuck Lever { 2644201c746SChuck Lever struct svcxprt_rdma *rdma = cq->cq_context; 2654201c746SChuck Lever struct ib_cqe *cqe = wc->wr_cqe; 2664201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt; 2674201c746SChuck Lever 2684201c746SChuck Lever trace_svcrdma_wc_send(wc); 2694201c746SChuck Lever 2704201c746SChuck Lever atomic_inc(&rdma->sc_sq_avail); 2714201c746SChuck Lever wake_up(&rdma->sc_send_wait); 2724201c746SChuck Lever 2734201c746SChuck Lever ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); 2744201c746SChuck Lever svc_rdma_send_ctxt_put(rdma, ctxt); 2754201c746SChuck Lever 2764201c746SChuck Lever if (unlikely(wc->status != IB_WC_SUCCESS)) { 2774201c746SChuck Lever set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 2784201c746SChuck Lever svc_xprt_enqueue(&rdma->sc_xprt); 2794201c746SChuck Lever } 2804201c746SChuck Lever 2814201c746SChuck Lever svc_xprt_put(&rdma->sc_xprt); 2824201c746SChuck Lever } 2834201c746SChuck Lever 2843abb03faSChuck Lever /** 2853abb03faSChuck Lever * svc_rdma_send - Post a single Send WR 2863abb03faSChuck Lever * @rdma: transport on which to post the WR 2873abb03faSChuck Lever * @wr: prepared Send WR to post 2883abb03faSChuck Lever * 2893abb03faSChuck Lever * Returns zero the Send WR was posted successfully. Otherwise, a 2903abb03faSChuck Lever * negative errno is returned. 2913abb03faSChuck Lever */ 2924201c746SChuck Lever int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) 2934201c746SChuck Lever { 2944201c746SChuck Lever int ret; 2954201c746SChuck Lever 2963abb03faSChuck Lever might_sleep(); 2974201c746SChuck Lever 2984201c746SChuck Lever /* If the SQ is full, wait until an SQ entry is available */ 2994201c746SChuck Lever while (1) { 3003abb03faSChuck Lever if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) { 3014201c746SChuck Lever atomic_inc(&rdma_stat_sq_starve); 3024201c746SChuck Lever trace_svcrdma_sq_full(rdma); 3033abb03faSChuck Lever atomic_inc(&rdma->sc_sq_avail); 3044201c746SChuck Lever wait_event(rdma->sc_send_wait, 3053abb03faSChuck Lever atomic_read(&rdma->sc_sq_avail) > 1); 3064201c746SChuck Lever if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) 3074201c746SChuck Lever return -ENOTCONN; 3084201c746SChuck Lever trace_svcrdma_sq_retry(rdma); 3094201c746SChuck Lever continue; 3104201c746SChuck Lever } 3114201c746SChuck Lever 3123abb03faSChuck Lever svc_xprt_get(&rdma->sc_xprt); 313ed288d74SBart Van Assche ret = ib_post_send(rdma->sc_qp, wr, NULL); 3144201c746SChuck Lever trace_svcrdma_post_send(wr, ret); 3154201c746SChuck Lever if (ret) { 3164201c746SChuck Lever set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 3174201c746SChuck Lever svc_xprt_put(&rdma->sc_xprt); 3184201c746SChuck Lever wake_up(&rdma->sc_send_wait); 3194201c746SChuck Lever } 3204201c746SChuck Lever break; 3214201c746SChuck Lever } 3224201c746SChuck Lever return ret; 3234201c746SChuck Lever } 3244201c746SChuck Lever 3259a6a180bSChuck Lever /* Returns length of transport header, in bytes. 3269a6a180bSChuck Lever */ 3279a6a180bSChuck Lever static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) 3289a6a180bSChuck Lever { 3299a6a180bSChuck Lever unsigned int nsegs; 3309a6a180bSChuck Lever __be32 *p; 3319a6a180bSChuck Lever 3329a6a180bSChuck Lever p = rdma_resp; 3339a6a180bSChuck Lever 3349a6a180bSChuck Lever /* RPC-over-RDMA V1 replies never have a Read list. */ 3359a6a180bSChuck Lever p += rpcrdma_fixed_maxsz + 1; 3369a6a180bSChuck Lever 3379a6a180bSChuck Lever /* Skip Write list. */ 3389a6a180bSChuck Lever while (*p++ != xdr_zero) { 3399a6a180bSChuck Lever nsegs = be32_to_cpup(p++); 3409a6a180bSChuck Lever p += nsegs * rpcrdma_segment_maxsz; 3419a6a180bSChuck Lever } 3429a6a180bSChuck Lever 3439a6a180bSChuck Lever /* Skip Reply chunk. */ 3449a6a180bSChuck Lever if (*p++ != xdr_zero) { 3459a6a180bSChuck Lever nsegs = be32_to_cpup(p++); 3469a6a180bSChuck Lever p += nsegs * rpcrdma_segment_maxsz; 3479a6a180bSChuck Lever } 3489a6a180bSChuck Lever 3499a6a180bSChuck Lever return (unsigned long)p - (unsigned long)rdma_resp; 3509a6a180bSChuck Lever } 3519a6a180bSChuck Lever 3529a6a180bSChuck Lever /* One Write chunk is copied from Call transport header to Reply 3539a6a180bSChuck Lever * transport header. Each segment's length field is updated to 3549a6a180bSChuck Lever * reflect number of bytes consumed in the segment. 3559a6a180bSChuck Lever * 3569a6a180bSChuck Lever * Returns number of segments in this chunk. 3579a6a180bSChuck Lever */ 3589a6a180bSChuck Lever static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, 3599a6a180bSChuck Lever unsigned int remaining) 3609a6a180bSChuck Lever { 3619a6a180bSChuck Lever unsigned int i, nsegs; 3629a6a180bSChuck Lever u32 seg_len; 3639a6a180bSChuck Lever 3649a6a180bSChuck Lever /* Write list discriminator */ 3659a6a180bSChuck Lever *dst++ = *src++; 3669a6a180bSChuck Lever 3679a6a180bSChuck Lever /* number of segments in this chunk */ 3689a6a180bSChuck Lever nsegs = be32_to_cpup(src); 3699a6a180bSChuck Lever *dst++ = *src++; 3709a6a180bSChuck Lever 3719a6a180bSChuck Lever for (i = nsegs; i; i--) { 3729a6a180bSChuck Lever /* segment's RDMA handle */ 3739a6a180bSChuck Lever *dst++ = *src++; 3749a6a180bSChuck Lever 3759a6a180bSChuck Lever /* bytes returned in this segment */ 3769a6a180bSChuck Lever seg_len = be32_to_cpu(*src); 3779a6a180bSChuck Lever if (remaining >= seg_len) { 3789a6a180bSChuck Lever /* entire segment was consumed */ 3799a6a180bSChuck Lever *dst = *src; 3809a6a180bSChuck Lever remaining -= seg_len; 3819a6a180bSChuck Lever } else { 3829a6a180bSChuck Lever /* segment only partly filled */ 3839a6a180bSChuck Lever *dst = cpu_to_be32(remaining); 3849a6a180bSChuck Lever remaining = 0; 3859a6a180bSChuck Lever } 3869a6a180bSChuck Lever dst++; src++; 3879a6a180bSChuck Lever 3889a6a180bSChuck Lever /* segment's RDMA offset */ 3899a6a180bSChuck Lever *dst++ = *src++; 3909a6a180bSChuck Lever *dst++ = *src++; 3919a6a180bSChuck Lever } 3929a6a180bSChuck Lever 3939a6a180bSChuck Lever return nsegs; 3949a6a180bSChuck Lever } 3959a6a180bSChuck Lever 3969a6a180bSChuck Lever /* The client provided a Write list in the Call message. Fill in 3979a6a180bSChuck Lever * the segments in the first Write chunk in the Reply's transport 3989a6a180bSChuck Lever * header with the number of bytes consumed in each segment. 3999a6a180bSChuck Lever * Remaining chunks are returned unused. 4009a6a180bSChuck Lever * 4019a6a180bSChuck Lever * Assumptions: 4029a6a180bSChuck Lever * - Client has provided only one Write chunk 4039a6a180bSChuck Lever */ 4049a6a180bSChuck Lever static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, 4059a6a180bSChuck Lever unsigned int consumed) 4069a6a180bSChuck Lever { 4079a6a180bSChuck Lever unsigned int nsegs; 4089a6a180bSChuck Lever __be32 *p, *q; 4099a6a180bSChuck Lever 4109a6a180bSChuck Lever /* RPC-over-RDMA V1 replies never have a Read list. */ 4119a6a180bSChuck Lever p = rdma_resp + rpcrdma_fixed_maxsz + 1; 4129a6a180bSChuck Lever 4139a6a180bSChuck Lever q = wr_ch; 4149a6a180bSChuck Lever while (*q != xdr_zero) { 4159a6a180bSChuck Lever nsegs = xdr_encode_write_chunk(p, q, consumed); 4169a6a180bSChuck Lever q += 2 + nsegs * rpcrdma_segment_maxsz; 4179a6a180bSChuck Lever p += 2 + nsegs * rpcrdma_segment_maxsz; 4189a6a180bSChuck Lever consumed = 0; 4199a6a180bSChuck Lever } 4209a6a180bSChuck Lever 4219a6a180bSChuck Lever /* Terminate Write list */ 4229a6a180bSChuck Lever *p++ = xdr_zero; 4239a6a180bSChuck Lever 4249a6a180bSChuck Lever /* Reply chunk discriminator; may be replaced later */ 4259a6a180bSChuck Lever *p = xdr_zero; 4269a6a180bSChuck Lever } 4279a6a180bSChuck Lever 4289a6a180bSChuck Lever /* The client provided a Reply chunk in the Call message. Fill in 4299a6a180bSChuck Lever * the segments in the Reply chunk in the Reply message with the 4309a6a180bSChuck Lever * number of bytes consumed in each segment. 4319a6a180bSChuck Lever * 4329a6a180bSChuck Lever * Assumptions: 4339a6a180bSChuck Lever * - Reply can always fit in the provided Reply chunk 4349a6a180bSChuck Lever */ 4359a6a180bSChuck Lever static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, 4369a6a180bSChuck Lever unsigned int consumed) 4379a6a180bSChuck Lever { 4389a6a180bSChuck Lever __be32 *p; 4399a6a180bSChuck Lever 4409a6a180bSChuck Lever /* Find the Reply chunk in the Reply's xprt header. 4419a6a180bSChuck Lever * RPC-over-RDMA V1 replies never have a Read list. 4429a6a180bSChuck Lever */ 4439a6a180bSChuck Lever p = rdma_resp + rpcrdma_fixed_maxsz + 1; 4449a6a180bSChuck Lever 4459a6a180bSChuck Lever /* Skip past Write list */ 4469a6a180bSChuck Lever while (*p++ != xdr_zero) 4479a6a180bSChuck Lever p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; 4489a6a180bSChuck Lever 4499a6a180bSChuck Lever xdr_encode_write_chunk(p, rp_ch, consumed); 4509a6a180bSChuck Lever } 4519a6a180bSChuck Lever 4526e6092caSChuck Lever static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, 4534201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 4546e6092caSChuck Lever struct page *page, 455f016f305SChuck Lever unsigned long offset, 4566e6092caSChuck Lever unsigned int len) 4576e6092caSChuck Lever { 4586e6092caSChuck Lever struct ib_device *dev = rdma->sc_cm_id->device; 4596e6092caSChuck Lever dma_addr_t dma_addr; 4606e6092caSChuck Lever 4616e6092caSChuck Lever dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); 462832b2cb9SChuck Lever trace_svcrdma_dma_map_page(rdma, dma_addr, len); 4636e6092caSChuck Lever if (ib_dma_mapping_error(dev, dma_addr)) 46491a08eaeSChuck Lever goto out_maperr; 4656e6092caSChuck Lever 46625fd86ecSChuck Lever ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; 46725fd86ecSChuck Lever ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; 4684201c746SChuck Lever ctxt->sc_send_wr.num_sge++; 4696e6092caSChuck Lever return 0; 47091a08eaeSChuck Lever 47191a08eaeSChuck Lever out_maperr: 47291a08eaeSChuck Lever return -EIO; 4736e6092caSChuck Lever } 4746e6092caSChuck Lever 475f016f305SChuck Lever /* ib_dma_map_page() is used here because svc_rdma_dma_unmap() 476f016f305SChuck Lever * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. 477f016f305SChuck Lever */ 478f016f305SChuck Lever static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, 4794201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 480f016f305SChuck Lever unsigned char *base, 481f016f305SChuck Lever unsigned int len) 482f016f305SChuck Lever { 48325fd86ecSChuck Lever return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base), 484f016f305SChuck Lever offset_in_page(base), len); 485f016f305SChuck Lever } 486f016f305SChuck Lever 4876e6092caSChuck Lever /** 48899722fe4SChuck Lever * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer 4896e6092caSChuck Lever * @rdma: controlling transport 49099722fe4SChuck Lever * @ctxt: send_ctxt for the Send WR 4916e6092caSChuck Lever * @len: length of transport header 4926e6092caSChuck Lever * 4936e6092caSChuck Lever */ 49499722fe4SChuck Lever void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma, 4954201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 4966e6092caSChuck Lever unsigned int len) 4976e6092caSChuck Lever { 49899722fe4SChuck Lever ctxt->sc_sges[0].length = len; 49999722fe4SChuck Lever ctxt->sc_send_wr.num_sge++; 50099722fe4SChuck Lever ib_dma_sync_single_for_device(rdma->sc_pd->device, 50199722fe4SChuck Lever ctxt->sc_sges[0].addr, len, 50299722fe4SChuck Lever DMA_TO_DEVICE); 5036e6092caSChuck Lever } 5046e6092caSChuck Lever 5054554755eSChuck Lever /** 5064554755eSChuck Lever * svc_rdma_pull_up_needed - Determine whether to use pull-up 5074554755eSChuck Lever * @rdma: controlling transport 5084554755eSChuck Lever * @rctxt: Write and Reply chunks provided by client 5094554755eSChuck Lever * @xdr: xdr_buf containing RPC message to transmit 5104554755eSChuck Lever * 5114554755eSChuck Lever * Returns: 5124554755eSChuck Lever * %true if pull-up must be used 5134554755eSChuck Lever * %false otherwise 514e248aa7bSChuck Lever */ 515e248aa7bSChuck Lever static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma, 5164554755eSChuck Lever const struct svc_rdma_recv_ctxt *rctxt, 5174554755eSChuck Lever struct xdr_buf *xdr) 518e248aa7bSChuck Lever { 519e248aa7bSChuck Lever int elements; 520e248aa7bSChuck Lever 521e248aa7bSChuck Lever /* xdr->head */ 522e248aa7bSChuck Lever elements = 1; 523e248aa7bSChuck Lever 524e248aa7bSChuck Lever /* xdr->pages */ 5254554755eSChuck Lever if (!rctxt || !rctxt->rc_write_list) { 526e248aa7bSChuck Lever unsigned int remaining; 527e248aa7bSChuck Lever unsigned long pageoff; 528e248aa7bSChuck Lever 529e248aa7bSChuck Lever pageoff = xdr->page_base & ~PAGE_MASK; 530e248aa7bSChuck Lever remaining = xdr->page_len; 531e248aa7bSChuck Lever while (remaining) { 532e248aa7bSChuck Lever ++elements; 533e248aa7bSChuck Lever remaining -= min_t(u32, PAGE_SIZE - pageoff, 534e248aa7bSChuck Lever remaining); 535e248aa7bSChuck Lever pageoff = 0; 536e248aa7bSChuck Lever } 537e248aa7bSChuck Lever } 538e248aa7bSChuck Lever 539e248aa7bSChuck Lever /* xdr->tail */ 540e248aa7bSChuck Lever if (xdr->tail[0].iov_len) 541e248aa7bSChuck Lever ++elements; 542e248aa7bSChuck Lever 543e248aa7bSChuck Lever /* assume 1 SGE is needed for the transport header */ 544e248aa7bSChuck Lever return elements >= rdma->sc_max_send_sges; 545e248aa7bSChuck Lever } 546e248aa7bSChuck Lever 5474554755eSChuck Lever /** 5484554755eSChuck Lever * svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer 5494554755eSChuck Lever * @rdma: controlling transport 5504554755eSChuck Lever * @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared 5514554755eSChuck Lever * @rctxt: Write and Reply chunks provided by client 5524554755eSChuck Lever * @xdr: prepared xdr_buf containing RPC message 5534554755eSChuck Lever * 5544554755eSChuck Lever * The device is not capable of sending the reply directly. 5554554755eSChuck Lever * Assemble the elements of @xdr into the transport header buffer. 5564554755eSChuck Lever * 5574554755eSChuck Lever * Returns zero on success, or a negative errno on failure. 558e248aa7bSChuck Lever */ 559e248aa7bSChuck Lever static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma, 5604554755eSChuck Lever struct svc_rdma_send_ctxt *sctxt, 5614554755eSChuck Lever const struct svc_rdma_recv_ctxt *rctxt, 5624554755eSChuck Lever const struct xdr_buf *xdr) 563e248aa7bSChuck Lever { 564e248aa7bSChuck Lever unsigned char *dst, *tailbase; 565e248aa7bSChuck Lever unsigned int taillen; 566e248aa7bSChuck Lever 5674554755eSChuck Lever dst = sctxt->sc_xprt_buf; 5684554755eSChuck Lever dst += sctxt->sc_sges[0].length; 569e248aa7bSChuck Lever 570e248aa7bSChuck Lever memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len); 571e248aa7bSChuck Lever dst += xdr->head[0].iov_len; 572e248aa7bSChuck Lever 573e248aa7bSChuck Lever tailbase = xdr->tail[0].iov_base; 574e248aa7bSChuck Lever taillen = xdr->tail[0].iov_len; 5754554755eSChuck Lever if (rctxt && rctxt->rc_write_list) { 576e248aa7bSChuck Lever u32 xdrpad; 577e248aa7bSChuck Lever 57896f194b7SChuck Lever xdrpad = xdr_pad_size(xdr->page_len); 579e248aa7bSChuck Lever if (taillen && xdrpad) { 580e248aa7bSChuck Lever tailbase += xdrpad; 581e248aa7bSChuck Lever taillen -= xdrpad; 582e248aa7bSChuck Lever } 583e248aa7bSChuck Lever } else { 584e248aa7bSChuck Lever unsigned int len, remaining; 585e248aa7bSChuck Lever unsigned long pageoff; 586e248aa7bSChuck Lever struct page **ppages; 587e248aa7bSChuck Lever 588e248aa7bSChuck Lever ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 589e248aa7bSChuck Lever pageoff = xdr->page_base & ~PAGE_MASK; 590e248aa7bSChuck Lever remaining = xdr->page_len; 591e248aa7bSChuck Lever while (remaining) { 592e248aa7bSChuck Lever len = min_t(u32, PAGE_SIZE - pageoff, remaining); 593e248aa7bSChuck Lever 594e248aa7bSChuck Lever memcpy(dst, page_address(*ppages), len); 595e248aa7bSChuck Lever remaining -= len; 596e248aa7bSChuck Lever dst += len; 597e248aa7bSChuck Lever pageoff = 0; 598e248aa7bSChuck Lever } 599e248aa7bSChuck Lever } 600e248aa7bSChuck Lever 601e248aa7bSChuck Lever if (taillen) 602e248aa7bSChuck Lever memcpy(dst, tailbase, taillen); 603e248aa7bSChuck Lever 6044554755eSChuck Lever sctxt->sc_sges[0].length += xdr->len; 605e248aa7bSChuck Lever ib_dma_sync_single_for_device(rdma->sc_pd->device, 6064554755eSChuck Lever sctxt->sc_sges[0].addr, 6074554755eSChuck Lever sctxt->sc_sges[0].length, 608e248aa7bSChuck Lever DMA_TO_DEVICE); 609e248aa7bSChuck Lever 610e248aa7bSChuck Lever return 0; 611e248aa7bSChuck Lever } 612e248aa7bSChuck Lever 6134554755eSChuck Lever /* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message 61499722fe4SChuck Lever * @rdma: controlling transport 6154554755eSChuck Lever * @sctxt: send_ctxt for the Send WR 6164554755eSChuck Lever * @rctxt: Write and Reply chunks provided by client 61799722fe4SChuck Lever * @xdr: prepared xdr_buf containing RPC message 61899722fe4SChuck Lever * 61999722fe4SChuck Lever * Load the xdr_buf into the ctxt's sge array, and DMA map each 6209a6a180bSChuck Lever * element as it is added. 6219a6a180bSChuck Lever * 62223262790SChuck Lever * Returns zero on success, or a negative errno on failure. 623c06b540aSTom Tucker */ 62499722fe4SChuck Lever int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, 6254554755eSChuck Lever struct svc_rdma_send_ctxt *sctxt, 6264554755eSChuck Lever const struct svc_rdma_recv_ctxt *rctxt, 6274554755eSChuck Lever struct xdr_buf *xdr) 628c06b540aSTom Tucker { 62925fd86ecSChuck Lever unsigned int len, remaining; 630f016f305SChuck Lever unsigned long page_off; 6319a6a180bSChuck Lever struct page **ppages; 6329a6a180bSChuck Lever unsigned char *base; 6339a6a180bSChuck Lever u32 xdr_pad; 634c06b540aSTom Tucker int ret; 635c06b540aSTom Tucker 6364554755eSChuck Lever if (svc_rdma_pull_up_needed(rdma, rctxt, xdr)) 6374554755eSChuck Lever return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr); 638e248aa7bSChuck Lever 6394554755eSChuck Lever ++sctxt->sc_cur_sge_no; 6404554755eSChuck Lever ret = svc_rdma_dma_map_buf(rdma, sctxt, 6419a6a180bSChuck Lever xdr->head[0].iov_base, 6429a6a180bSChuck Lever xdr->head[0].iov_len); 6439a6a180bSChuck Lever if (ret < 0) 6449a6a180bSChuck Lever return ret; 645c06b540aSTom Tucker 6469a6a180bSChuck Lever /* If a Write chunk is present, the xdr_buf's page list 6479a6a180bSChuck Lever * is not included inline. However the Upper Layer may 6489a6a180bSChuck Lever * have added XDR padding in the tail buffer, and that 6499a6a180bSChuck Lever * should not be included inline. 6509a6a180bSChuck Lever */ 6514554755eSChuck Lever if (rctxt && rctxt->rc_write_list) { 6529a6a180bSChuck Lever base = xdr->tail[0].iov_base; 6539a6a180bSChuck Lever len = xdr->tail[0].iov_len; 65496f194b7SChuck Lever xdr_pad = xdr_pad_size(xdr->page_len); 655c06b540aSTom Tucker 6569a6a180bSChuck Lever if (len && xdr_pad) { 6579a6a180bSChuck Lever base += xdr_pad; 6589a6a180bSChuck Lever len -= xdr_pad; 659c06b540aSTom Tucker } 660c06b540aSTom Tucker 6619a6a180bSChuck Lever goto tail; 662c06b540aSTom Tucker } 6639a6a180bSChuck Lever 6649a6a180bSChuck Lever ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 6659a6a180bSChuck Lever page_off = xdr->page_base & ~PAGE_MASK; 6669a6a180bSChuck Lever remaining = xdr->page_len; 6679a6a180bSChuck Lever while (remaining) { 6689a6a180bSChuck Lever len = min_t(u32, PAGE_SIZE - page_off, remaining); 6699a6a180bSChuck Lever 6704554755eSChuck Lever ++sctxt->sc_cur_sge_no; 6714554755eSChuck Lever ret = svc_rdma_dma_map_page(rdma, sctxt, *ppages++, 67225fd86ecSChuck Lever page_off, len); 6739a6a180bSChuck Lever if (ret < 0) 6749a6a180bSChuck Lever return ret; 6759a6a180bSChuck Lever 6769a6a180bSChuck Lever remaining -= len; 6779a6a180bSChuck Lever page_off = 0; 678c06b540aSTom Tucker } 679c06b540aSTom Tucker 6809a6a180bSChuck Lever base = xdr->tail[0].iov_base; 6819a6a180bSChuck Lever len = xdr->tail[0].iov_len; 6829a6a180bSChuck Lever tail: 6839a6a180bSChuck Lever if (len) { 6844554755eSChuck Lever ++sctxt->sc_cur_sge_no; 6854554755eSChuck Lever ret = svc_rdma_dma_map_buf(rdma, sctxt, base, len); 6869a6a180bSChuck Lever if (ret < 0) 6879a6a180bSChuck Lever return ret; 6889a6a180bSChuck Lever } 68908ae4e7fSChuck Lever 69023262790SChuck Lever return 0; 691c06b540aSTom Tucker } 692c06b540aSTom Tucker 693c55ab070SChuck Lever /* The svc_rqst and all resources it owns are released as soon as 694c55ab070SChuck Lever * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt 695c55ab070SChuck Lever * so they are released by the Send completion handler. 696c55ab070SChuck Lever */ 697c55ab070SChuck Lever static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, 6984201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt) 699c55ab070SChuck Lever { 700c55ab070SChuck Lever int i, pages = rqstp->rq_next_page - rqstp->rq_respages; 701c55ab070SChuck Lever 7024201c746SChuck Lever ctxt->sc_page_count += pages; 703c55ab070SChuck Lever for (i = 0; i < pages; i++) { 70499722fe4SChuck Lever ctxt->sc_pages[i] = rqstp->rq_respages[i]; 705c55ab070SChuck Lever rqstp->rq_respages[i] = NULL; 706c55ab070SChuck Lever } 707a53d5cb0SChuck Lever 708a53d5cb0SChuck Lever /* Prevent svc_xprt_release from releasing pages in rq_pages */ 709a53d5cb0SChuck Lever rqstp->rq_next_page = rqstp->rq_respages; 710c55ab070SChuck Lever } 711c55ab070SChuck Lever 7129a6a180bSChuck Lever /* Prepare the portion of the RPC Reply that will be transmitted 7139a6a180bSChuck Lever * via RDMA Send. The RPC-over-RDMA transport header is prepared 7144201c746SChuck Lever * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. 7159a6a180bSChuck Lever * 7169a6a180bSChuck Lever * Depending on whether a Write list or Reply chunk is present, 7179a6a180bSChuck Lever * the server may send all, a portion of, or none of the xdr_buf. 7184201c746SChuck Lever * In the latter case, only the transport header (sc_sges[0]) is 7199a6a180bSChuck Lever * transmitted. 7209a6a180bSChuck Lever * 7219a6a180bSChuck Lever * RDMA Send is the last step of transmitting an RPC reply. Pages 7229a6a180bSChuck Lever * involved in the earlier RDMA Writes are here transferred out 72397bce634SChuck Lever * of the rqstp and into the sctxt's page array. These pages are 7249a6a180bSChuck Lever * DMA unmapped by each Write completion, but the subsequent Send 7259a6a180bSChuck Lever * completion finally releases these pages. 7269a6a180bSChuck Lever * 7279a6a180bSChuck Lever * Assumptions: 7289a6a180bSChuck Lever * - The Reply's transport header will never be larger than a page. 729c06b540aSTom Tucker */ 7309a6a180bSChuck Lever static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, 73197bce634SChuck Lever struct svc_rdma_send_ctxt *sctxt, 732db9602e4SChuck Lever const struct svc_rdma_recv_ctxt *rctxt, 733db9602e4SChuck Lever struct svc_rqst *rqstp) 734c06b540aSTom Tucker { 7359a6a180bSChuck Lever int ret; 7360e7f011aSTom Tucker 737db9602e4SChuck Lever if (!rctxt->rc_reply_chunk) { 7384554755eSChuck Lever ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, 7394554755eSChuck Lever &rqstp->rq_res); 7409a6a180bSChuck Lever if (ret < 0) 74199722fe4SChuck Lever return ret; 7423fe04ee9SChuck Lever } 743c06b540aSTom Tucker 74497bce634SChuck Lever svc_rdma_save_io_pages(rqstp, sctxt); 7450bf48289SSteve Wise 74697bce634SChuck Lever if (rctxt->rc_inv_rkey) { 74797bce634SChuck Lever sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; 74897bce634SChuck Lever sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey; 74997bce634SChuck Lever } else { 75097bce634SChuck Lever sctxt->sc_send_wr.opcode = IB_WR_SEND; 751986b7889SChuck Lever } 752986b7889SChuck Lever dprintk("svcrdma: posting Send WR with %u sge(s)\n", 75397bce634SChuck Lever sctxt->sc_send_wr.num_sge); 75497bce634SChuck Lever return svc_rdma_send(rdma, &sctxt->sc_send_wr); 755c06b540aSTom Tucker } 756c06b540aSTom Tucker 7574757d90bSChuck Lever /* Given the client-provided Write and Reply chunks, the server was not 7584757d90bSChuck Lever * able to form a complete reply. Return an RDMA_ERROR message so the 7594757d90bSChuck Lever * client can retire this RPC transaction. As above, the Send completion 7604757d90bSChuck Lever * routine releases payload pages that were part of a previous RDMA Write. 7614757d90bSChuck Lever * 7624757d90bSChuck Lever * Remote Invalidation is skipped for simplicity. 7634757d90bSChuck Lever */ 7644757d90bSChuck Lever static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, 76599722fe4SChuck Lever struct svc_rdma_send_ctxt *ctxt, 76699722fe4SChuck Lever struct svc_rqst *rqstp) 7674757d90bSChuck Lever { 7684757d90bSChuck Lever __be32 *p; 7694757d90bSChuck Lever 77099722fe4SChuck Lever p = ctxt->sc_xprt_buf; 77199722fe4SChuck Lever trace_svcrdma_err_chunk(*p); 77299722fe4SChuck Lever p += 3; 7734757d90bSChuck Lever *p++ = rdma_error; 7744757d90bSChuck Lever *p = err_chunk; 77599722fe4SChuck Lever svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR); 7764757d90bSChuck Lever 7774757d90bSChuck Lever svc_rdma_save_io_pages(rqstp, ctxt); 7784757d90bSChuck Lever 779986b7889SChuck Lever ctxt->sc_send_wr.opcode = IB_WR_SEND; 780758a3bf9SChuck Lever return svc_rdma_send(rdma, &ctxt->sc_send_wr); 78199722fe4SChuck Lever } 78299722fe4SChuck Lever 7839a6a180bSChuck Lever /** 7849a6a180bSChuck Lever * svc_rdma_sendto - Transmit an RPC reply 7859a6a180bSChuck Lever * @rqstp: processed RPC request, reply XDR already in ::rq_res 7869a6a180bSChuck Lever * 7879a6a180bSChuck Lever * Any resources still associated with @rqstp are released upon return. 7889a6a180bSChuck Lever * If no reply message was possible, the connection is closed. 7899a6a180bSChuck Lever * 7909a6a180bSChuck Lever * Returns: 7919a6a180bSChuck Lever * %0 if an RPC reply has been successfully posted, 7929a6a180bSChuck Lever * %-ENOMEM if a resource shortage occurred (connection is lost), 7939a6a180bSChuck Lever * %-ENOTCONN if posting failed (connection is lost). 7949a6a180bSChuck Lever */ 795c06b540aSTom Tucker int svc_rdma_sendto(struct svc_rqst *rqstp) 796c06b540aSTom Tucker { 797c06b540aSTom Tucker struct svc_xprt *xprt = rqstp->rq_xprt; 798c06b540aSTom Tucker struct svcxprt_rdma *rdma = 799c06b540aSTom Tucker container_of(xprt, struct svcxprt_rdma, sc_xprt); 8003a88092eSChuck Lever struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; 8012fe8c446SChuck Lever __be32 *rdma_argp = rctxt->rc_recv_buf; 8022fe8c446SChuck Lever __be32 *wr_lst = rctxt->rc_write_list; 8032fe8c446SChuck Lever __be32 *rp_ch = rctxt->rc_reply_chunk; 8049a6a180bSChuck Lever struct xdr_buf *xdr = &rqstp->rq_res; 80599722fe4SChuck Lever struct svc_rdma_send_ctxt *sctxt; 8062fe8c446SChuck Lever __be32 *p, *rdma_resp; 8079a6a180bSChuck Lever int ret; 808c06b540aSTom Tucker 809e4eb42ceSChuck Lever /* Create the RDMA response header. xprt->xpt_mutex, 810e4eb42ceSChuck Lever * acquired in svc_send(), serializes RPC replies. The 811e4eb42ceSChuck Lever * code path below that inserts the credit grant value 812e4eb42ceSChuck Lever * into each transport header runs only inside this 813e4eb42ceSChuck Lever * critical section. 814e4eb42ceSChuck Lever */ 81578da2b3cSChuck Lever ret = -ENOMEM; 81699722fe4SChuck Lever sctxt = svc_rdma_send_ctxt_get(rdma); 81799722fe4SChuck Lever if (!sctxt) 81878da2b3cSChuck Lever goto err0; 81999722fe4SChuck Lever rdma_resp = sctxt->sc_xprt_buf; 82098fc21d3SChuck Lever 8219a6a180bSChuck Lever p = rdma_resp; 8229a6a180bSChuck Lever *p++ = *rdma_argp; 8239a6a180bSChuck Lever *p++ = *(rdma_argp + 1); 82498fc21d3SChuck Lever *p++ = rdma->sc_fc_credits; 8259a6a180bSChuck Lever *p++ = rp_ch ? rdma_nomsg : rdma_msg; 82698fc21d3SChuck Lever 82798fc21d3SChuck Lever /* Start with empty chunks */ 82898fc21d3SChuck Lever *p++ = xdr_zero; 82998fc21d3SChuck Lever *p++ = xdr_zero; 83098fc21d3SChuck Lever *p = xdr_zero; 831c06b540aSTom Tucker 8329a6a180bSChuck Lever if (wr_lst) { 8339a6a180bSChuck Lever /* XXX: Presume the client sent only one Write chunk */ 83441205539SChuck Lever unsigned long offset; 83541205539SChuck Lever unsigned int length; 83641205539SChuck Lever 83741205539SChuck Lever if (rctxt->rc_read_payload_length) { 83841205539SChuck Lever offset = rctxt->rc_read_payload_offset; 83941205539SChuck Lever length = rctxt->rc_read_payload_length; 84041205539SChuck Lever } else { 84141205539SChuck Lever offset = xdr->head[0].iov_len; 84241205539SChuck Lever length = xdr->page_len; 84341205539SChuck Lever } 84441205539SChuck Lever ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset, 84541205539SChuck Lever length); 84608ae4e7fSChuck Lever if (ret < 0) 8474757d90bSChuck Lever goto err2; 8489a6a180bSChuck Lever svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); 84908ae4e7fSChuck Lever } 8509a6a180bSChuck Lever if (rp_ch) { 8516fa5785eSChuck Lever ret = svc_rdma_send_reply_chunk(rdma, rctxt, &rqstp->rq_res); 85208ae4e7fSChuck Lever if (ret < 0) 8534757d90bSChuck Lever goto err2; 8549a6a180bSChuck Lever svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); 85508ae4e7fSChuck Lever } 856c06b540aSTom Tucker 85799722fe4SChuck Lever svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp)); 858db9602e4SChuck Lever ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp); 8593e1eeb98SChuck Lever if (ret < 0) 86099722fe4SChuck Lever goto err1; 8613a88092eSChuck Lever ret = 0; 8623a88092eSChuck Lever 8633a88092eSChuck Lever out: 8643a88092eSChuck Lever rqstp->rq_xprt_ctxt = NULL; 8653a88092eSChuck Lever svc_rdma_recv_ctxt_put(rdma, rctxt); 8663a88092eSChuck Lever return ret; 867afd566eaSTom Tucker 8684757d90bSChuck Lever err2: 869b20dae70SColin Ian King if (ret != -E2BIG && ret != -EINVAL) 8704757d90bSChuck Lever goto err1; 8714757d90bSChuck Lever 87299722fe4SChuck Lever ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp); 8734757d90bSChuck Lever if (ret < 0) 87499722fe4SChuck Lever goto err1; 8753a88092eSChuck Lever ret = 0; 8763a88092eSChuck Lever goto out; 8774757d90bSChuck Lever 878afd566eaSTom Tucker err1: 87999722fe4SChuck Lever svc_rdma_send_ctxt_put(rdma, sctxt); 880afd566eaSTom Tucker err0: 881bd2abef3SChuck Lever trace_svcrdma_send_failed(rqstp, ret); 8829a6a180bSChuck Lever set_bit(XPT_CLOSE, &xprt->xpt_flags); 8833a88092eSChuck Lever ret = -ENOTCONN; 8843a88092eSChuck Lever goto out; 885c06b540aSTom Tucker } 88641205539SChuck Lever 88741205539SChuck Lever /** 88841205539SChuck Lever * svc_rdma_read_payload - special processing for a READ payload 88941205539SChuck Lever * @rqstp: svc_rqst to operate on 89041205539SChuck Lever * @offset: payload's byte offset in @xdr 89141205539SChuck Lever * @length: size of payload, in bytes 89241205539SChuck Lever * 89341205539SChuck Lever * Returns zero on success. 89441205539SChuck Lever * 89541205539SChuck Lever * For the moment, just record the xdr_buf location of the READ 89641205539SChuck Lever * payload. svc_rdma_sendto will use that location later when 89741205539SChuck Lever * we actually send the payload. 89841205539SChuck Lever */ 89941205539SChuck Lever int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset, 90041205539SChuck Lever unsigned int length) 90141205539SChuck Lever { 90241205539SChuck Lever struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; 90341205539SChuck Lever 90441205539SChuck Lever /* XXX: Just one READ payload slot for now, since our 90541205539SChuck Lever * transport implementation currently supports only one 90641205539SChuck Lever * Write chunk. 90741205539SChuck Lever */ 90841205539SChuck Lever rctxt->rc_read_payload_offset = offset; 90941205539SChuck Lever rctxt->rc_read_payload_length = length; 91041205539SChuck Lever 91141205539SChuck Lever return 0; 91241205539SChuck Lever } 913