1bcf3ffd4SChuck Lever // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2c06b540aSTom Tucker /* 3ecf85b23SChuck Lever * Copyright (c) 2016-2018 Oracle. All rights reserved. 40bf48289SSteve Wise * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. 5c06b540aSTom Tucker * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. 6c06b540aSTom Tucker * 7c06b540aSTom Tucker * This software is available to you under a choice of one of two 8c06b540aSTom Tucker * licenses. You may choose to be licensed under the terms of the GNU 9c06b540aSTom Tucker * General Public License (GPL) Version 2, available from the file 10c06b540aSTom Tucker * COPYING in the main directory of this source tree, or the BSD-type 11c06b540aSTom Tucker * license below: 12c06b540aSTom Tucker * 13c06b540aSTom Tucker * Redistribution and use in source and binary forms, with or without 14c06b540aSTom Tucker * modification, are permitted provided that the following conditions 15c06b540aSTom Tucker * are met: 16c06b540aSTom Tucker * 17c06b540aSTom Tucker * Redistributions of source code must retain the above copyright 18c06b540aSTom Tucker * notice, this list of conditions and the following disclaimer. 19c06b540aSTom Tucker * 20c06b540aSTom Tucker * Redistributions in binary form must reproduce the above 21c06b540aSTom Tucker * copyright notice, this list of conditions and the following 22c06b540aSTom Tucker * disclaimer in the documentation and/or other materials provided 23c06b540aSTom Tucker * with the distribution. 24c06b540aSTom Tucker * 25c06b540aSTom Tucker * Neither the name of the Network Appliance, Inc. nor the names of 26c06b540aSTom Tucker * its contributors may be used to endorse or promote products 27c06b540aSTom Tucker * derived from this software without specific prior written 28c06b540aSTom Tucker * permission. 
29c06b540aSTom Tucker * 30c06b540aSTom Tucker * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 31c06b540aSTom Tucker * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 32c06b540aSTom Tucker * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 33c06b540aSTom Tucker * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 34c06b540aSTom Tucker * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 35c06b540aSTom Tucker * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 36c06b540aSTom Tucker * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 37c06b540aSTom Tucker * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 38c06b540aSTom Tucker * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 39c06b540aSTom Tucker * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 40c06b540aSTom Tucker * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 41c06b540aSTom Tucker * 42c06b540aSTom Tucker * Author: Tom Tucker <tom@opengridcomputing.com> 43c06b540aSTom Tucker */ 44c06b540aSTom Tucker 459a6a180bSChuck Lever /* Operation 469a6a180bSChuck Lever * 479a6a180bSChuck Lever * The main entry point is svc_rdma_sendto. This is called by the 489a6a180bSChuck Lever * RPC server when an RPC Reply is ready to be transmitted to a client. 499a6a180bSChuck Lever * 509a6a180bSChuck Lever * The passed-in svc_rqst contains a struct xdr_buf which holds an 519a6a180bSChuck Lever * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA 529a6a180bSChuck Lever * transport header, post all Write WRs needed for this Reply, then post 539a6a180bSChuck Lever * a Send WR conveying the transport header and the RPC message itself to 549a6a180bSChuck Lever * the client. 
559a6a180bSChuck Lever * 569a6a180bSChuck Lever * svc_rdma_sendto must fully transmit the Reply before returning, as 579a6a180bSChuck Lever * the svc_rqst will be recycled as soon as sendto returns. Remaining 589a6a180bSChuck Lever * resources referred to by the svc_rqst are also recycled at that time. 599a6a180bSChuck Lever * Therefore any resources that must remain longer must be detached 609a6a180bSChuck Lever * from the svc_rqst and released later. 619a6a180bSChuck Lever * 629a6a180bSChuck Lever * Page Management 639a6a180bSChuck Lever * 649a6a180bSChuck Lever * The I/O that performs Reply transmission is asynchronous, and may 659a6a180bSChuck Lever * complete well after sendto returns. Thus pages under I/O must be 669a6a180bSChuck Lever * removed from the svc_rqst before sendto returns. 679a6a180bSChuck Lever * 689a6a180bSChuck Lever * The logic here depends on Send Queue and completion ordering. Since 699a6a180bSChuck Lever * the Send WR is always posted last, it will always complete last. Thus 709a6a180bSChuck Lever * when it completes, it is guaranteed that all previous Write WRs have 719a6a180bSChuck Lever * also completed. 729a6a180bSChuck Lever * 739a6a180bSChuck Lever * Write WRs are constructed and posted. Each Write segment gets its own 749a6a180bSChuck Lever * svc_rdma_rw_ctxt, allowing the Write completion handler to find and 759a6a180bSChuck Lever * DMA-unmap the pages under I/O for that Write segment. The Write 769a6a180bSChuck Lever * completion handler does not release any pages. 779a6a180bSChuck Lever * 784201c746SChuck Lever * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. 799a6a180bSChuck Lever * The ownership of all of the Reply's pages are transferred into that 809a6a180bSChuck Lever * ctxt, the Send WR is posted, and sendto returns. 819a6a180bSChuck Lever * 824201c746SChuck Lever * The svc_rdma_send_ctxt is presented when the Send WR completes. 
The 839a6a180bSChuck Lever  * Send completion handler finally releases the Reply's pages. 849a6a180bSChuck Lever  * 859a6a180bSChuck Lever  * This mechanism also assumes that completions on the transport's Send 869a6a180bSChuck Lever  * Completion Queue do not run in parallel. Otherwise a Write completion 879a6a180bSChuck Lever  * and Send completion running at the same time could release pages that 889a6a180bSChuck Lever  * are still DMA-mapped. 899a6a180bSChuck Lever  * 909a6a180bSChuck Lever  * Error Handling 919a6a180bSChuck Lever  * 929a6a180bSChuck Lever  * - If the Send WR is posted successfully, it will either complete 939a6a180bSChuck Lever  *   successfully, or get flushed. Either way, the Send completion 949a6a180bSChuck Lever  *   handler releases the Reply's pages. 959a6a180bSChuck Lever  * - If the Send WR cannot be posted, the forward path releases 969a6a180bSChuck Lever  *   the Reply's pages. 979a6a180bSChuck Lever  * 989a6a180bSChuck Lever  * This handles the case, without the use of page reference counting, 999a6a180bSChuck Lever  * where two different Write segments send portions of the same page.
 */

#include <linux/spinlock.h>
#include <asm/unaligned.h>

#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc);

/* Return the first send_ctxt on @list, or NULL if @list is empty.
 * Callers serialize access to @list via rdma->sc_send_lock.
 */
static inline struct svc_rdma_send_ctxt *
svc_rdma_next_send_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_send_ctxt,
					sc_list);
}

/* Allocate one send_ctxt, sized to carry sc_max_send_sges SGEs, plus
 * a DMA-mapped buffer for the transport header. The header buffer
 * stays mapped for the lifetime of the ctxt and is always described
 * by sc_sges[0]; it is unmapped only in svc_rdma_send_ctxts_destroy().
 *
 * Returns the new ctxt, or NULL on allocation or DMA-mapping failure.
 */
static struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_send_ctxt *ctxt;
	dma_addr_t addr;
	void *buffer;
	size_t size;
	int i;

	/* The SGE array is carried as a flexible tail of the ctxt. */
	size = sizeof(*ctxt);
	size += rdma->sc_max_send_sges * sizeof(struct ib_sge);
	ctxt = kmalloc(size, GFP_KERNEL);
	if (!ctxt)
		goto fail0;
	buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
	if (!buffer)
		goto fail1;
	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
				 rdma->sc_max_req_size, DMA_TO_DEVICE);
	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
		goto fail2;

	ctxt->sc_send_wr.next = NULL;
	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
	/* Signaled: svc_rdma_wc_send() must run for every Send WR. */
	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
	ctxt->sc_cqe.done = svc_rdma_wc_send;
	ctxt->sc_xprt_buf = buffer;
	ctxt->sc_sges[0].addr = addr;

	/* Every SGE uses the PD's local DMA lkey. */
	for (i = 0; i < rdma->sc_max_send_sges; i++)
		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
	return ctxt;

fail2:
	kfree(buffer);
fail1:
	kfree(ctxt);
fail0:
	return NULL;
}

/**
 * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt
 * @rdma: svcxprt_rdma being torn down
 *
 */
void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_send_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
		list_del(&ctxt->sc_list);
		/* Unmap the long-lived transport header buffer that
		 * was mapped in svc_rdma_send_ctxt_alloc().
		 */
		ib_dma_unmap_single(rdma->sc_pd->device,
				    ctxt->sc_sges[0].addr,
				    rdma->sc_max_req_size,
				    DMA_TO_DEVICE);
		kfree(ctxt->sc_xprt_buf);
		kfree(ctxt);
	}
}

/**
 * svc_rdma_send_ctxt_get - Get a free send_ctxt
 * @rdma: controlling svcxprt_rdma
 *
 * Returns a ready-to-use send_ctxt, or NULL if none are
 * available and a fresh one cannot be allocated.
 */
struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_send_ctxt *ctxt;

	spin_lock(&rdma->sc_send_lock);
	ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts);
	if (!ctxt)
		goto out_empty;
	list_del(&ctxt->sc_list);
	spin_unlock(&rdma->sc_send_lock);

out:
	/* Reset per-use state. sc_sges[0] keeps the DMA address of
	 * the permanently-mapped transport header buffer.
	 */
	ctxt->sc_send_wr.num_sge = 0;
	ctxt->sc_cur_sge_no = 0;
	ctxt->sc_page_count = 0;
	return ctxt;

out_empty:
	/* Free list was empty: drop the lock, then allocate fresh. */
	spin_unlock(&rdma->sc_send_lock);
	ctxt = svc_rdma_send_ctxt_alloc(rdma);
	if (!ctxt)
		return NULL;
	goto out;
}

/**
 * svc_rdma_send_ctxt_put - Return send_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 * Pages left in sc_pages are DMA unmapped and released.
 */
void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
			    struct svc_rdma_send_ctxt *ctxt)
{
	struct ib_device *device = rdma->sc_cm_id->device;
	unsigned int i;

	/* The first SGE contains the transport header, which
	 * remains mapped until @ctxt is destroyed.
	 */
	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) {
		ib_dma_unmap_page(device,
				  ctxt->sc_sges[i].addr,
				  ctxt->sc_sges[i].length,
				  DMA_TO_DEVICE);
		trace_svcrdma_dma_unmap_page(rdma,
					     ctxt->sc_sges[i].addr,
					     ctxt->sc_sges[i].length);
	}

	/* Release the pages that were transferred from the svc_rqst
	 * (see svc_rdma_save_io_pages); transmission is finished.
	 */
	for (i = 0; i < ctxt->sc_page_count; ++i)
		put_page(ctxt->sc_pages[i]);

	spin_lock(&rdma->sc_send_lock);
	list_add(&ctxt->sc_list, &rdma->sc_send_ctxts);
	spin_unlock(&rdma->sc_send_lock);
}

/**
 * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: Completion Queue context
 * @wc: Work Completion object
 *
 * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
 * the Send completion handler could be running.
 */
static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_send_ctxt *ctxt;

	trace_svcrdma_wc_send(wc);

	/* Return the SQ credit taken in svc_rdma_send(), and wake
	 * any sender waiting for SQ space.
	 */
	atomic_inc(&rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);

	ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe);
	svc_rdma_send_ctxt_put(rdma, ctxt);

	/* A failed or flushed Send means the connection is broken. */
	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&rdma->sc_xprt);
	}

	/* Drop the xprt reference taken when this Send was posted. */
	svc_xprt_put(&rdma->sc_xprt);
}

/**
 * svc_rdma_send - Post a single Send WR
 * @rdma: transport on which to post the WR
 * @wr: prepared Send WR to post
 *
 * Returns zero if the Send WR was posted successfully. Otherwise, a
 * negative errno is returned.
 */
int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr)
{
	int ret;

	might_sleep();

	/* If the SQ is full, wait until an SQ entry is available */
	while (1) {
		/* Take one SQ credit; a negative result means the SQ
		 * was already exhausted.
		 */
		if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) {
			atomic_inc(&rdma_stat_sq_starve);
			trace_svcrdma_sq_full(rdma);
			/* Give the credit back before sleeping. */
			atomic_inc(&rdma->sc_sq_avail);
			wait_event(rdma->sc_send_wait,
				   atomic_read(&rdma->sc_sq_avail) > 1);
			if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
				return -ENOTCONN;
			trace_svcrdma_sq_retry(rdma);
			continue;
		}

		/* Pin the xprt until the Send completion fires;
		 * svc_rdma_wc_send() drops this reference.
		 */
		svc_xprt_get(&rdma->sc_xprt);
		ret = ib_post_send(rdma->sc_qp, wr, NULL);
		trace_svcrdma_post_send(wr, ret);
		if (ret) {
			/* No completion will run for a WR that failed
			 * to post: drop the reference here and close.
			 */
			set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
			svc_xprt_put(&rdma->sc_xprt);
			wake_up(&rdma->sc_send_wait);
		}
		break;
	}
	return ret;
}

/* Returns length of transport header, in bytes.
3269a6a180bSChuck Lever */ 3279a6a180bSChuck Lever static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) 3289a6a180bSChuck Lever { 3299a6a180bSChuck Lever unsigned int nsegs; 3309a6a180bSChuck Lever __be32 *p; 3319a6a180bSChuck Lever 3329a6a180bSChuck Lever p = rdma_resp; 3339a6a180bSChuck Lever 3349a6a180bSChuck Lever /* RPC-over-RDMA V1 replies never have a Read list. */ 3359a6a180bSChuck Lever p += rpcrdma_fixed_maxsz + 1; 3369a6a180bSChuck Lever 3379a6a180bSChuck Lever /* Skip Write list. */ 3389a6a180bSChuck Lever while (*p++ != xdr_zero) { 3399a6a180bSChuck Lever nsegs = be32_to_cpup(p++); 3409a6a180bSChuck Lever p += nsegs * rpcrdma_segment_maxsz; 3419a6a180bSChuck Lever } 3429a6a180bSChuck Lever 3439a6a180bSChuck Lever /* Skip Reply chunk. */ 3449a6a180bSChuck Lever if (*p++ != xdr_zero) { 3459a6a180bSChuck Lever nsegs = be32_to_cpup(p++); 3469a6a180bSChuck Lever p += nsegs * rpcrdma_segment_maxsz; 3479a6a180bSChuck Lever } 3489a6a180bSChuck Lever 3499a6a180bSChuck Lever return (unsigned long)p - (unsigned long)rdma_resp; 3509a6a180bSChuck Lever } 3519a6a180bSChuck Lever 3529a6a180bSChuck Lever /* One Write chunk is copied from Call transport header to Reply 3539a6a180bSChuck Lever * transport header. Each segment's length field is updated to 3549a6a180bSChuck Lever * reflect number of bytes consumed in the segment. 3559a6a180bSChuck Lever * 3569a6a180bSChuck Lever * Returns number of segments in this chunk. 
3579a6a180bSChuck Lever */ 3589a6a180bSChuck Lever static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, 3599a6a180bSChuck Lever unsigned int remaining) 3609a6a180bSChuck Lever { 3619a6a180bSChuck Lever unsigned int i, nsegs; 3629a6a180bSChuck Lever u32 seg_len; 3639a6a180bSChuck Lever 3649a6a180bSChuck Lever /* Write list discriminator */ 3659a6a180bSChuck Lever *dst++ = *src++; 3669a6a180bSChuck Lever 3679a6a180bSChuck Lever /* number of segments in this chunk */ 3689a6a180bSChuck Lever nsegs = be32_to_cpup(src); 3699a6a180bSChuck Lever *dst++ = *src++; 3709a6a180bSChuck Lever 3719a6a180bSChuck Lever for (i = nsegs; i; i--) { 3729a6a180bSChuck Lever /* segment's RDMA handle */ 3739a6a180bSChuck Lever *dst++ = *src++; 3749a6a180bSChuck Lever 3759a6a180bSChuck Lever /* bytes returned in this segment */ 3769a6a180bSChuck Lever seg_len = be32_to_cpu(*src); 3779a6a180bSChuck Lever if (remaining >= seg_len) { 3789a6a180bSChuck Lever /* entire segment was consumed */ 3799a6a180bSChuck Lever *dst = *src; 3809a6a180bSChuck Lever remaining -= seg_len; 3819a6a180bSChuck Lever } else { 3829a6a180bSChuck Lever /* segment only partly filled */ 3839a6a180bSChuck Lever *dst = cpu_to_be32(remaining); 3849a6a180bSChuck Lever remaining = 0; 3859a6a180bSChuck Lever } 3869a6a180bSChuck Lever dst++; src++; 3879a6a180bSChuck Lever 3889a6a180bSChuck Lever /* segment's RDMA offset */ 3899a6a180bSChuck Lever *dst++ = *src++; 3909a6a180bSChuck Lever *dst++ = *src++; 3919a6a180bSChuck Lever } 3929a6a180bSChuck Lever 3939a6a180bSChuck Lever return nsegs; 3949a6a180bSChuck Lever } 3959a6a180bSChuck Lever 3969a6a180bSChuck Lever /* The client provided a Write list in the Call message. Fill in 3979a6a180bSChuck Lever * the segments in the first Write chunk in the Reply's transport 3989a6a180bSChuck Lever * header with the number of bytes consumed in each segment. 3999a6a180bSChuck Lever * Remaining chunks are returned unused. 
4009a6a180bSChuck Lever * 4019a6a180bSChuck Lever * Assumptions: 4029a6a180bSChuck Lever * - Client has provided only one Write chunk 4039a6a180bSChuck Lever */ 4049a6a180bSChuck Lever static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, 4059a6a180bSChuck Lever unsigned int consumed) 4069a6a180bSChuck Lever { 4079a6a180bSChuck Lever unsigned int nsegs; 4089a6a180bSChuck Lever __be32 *p, *q; 4099a6a180bSChuck Lever 4109a6a180bSChuck Lever /* RPC-over-RDMA V1 replies never have a Read list. */ 4119a6a180bSChuck Lever p = rdma_resp + rpcrdma_fixed_maxsz + 1; 4129a6a180bSChuck Lever 4139a6a180bSChuck Lever q = wr_ch; 4149a6a180bSChuck Lever while (*q != xdr_zero) { 4159a6a180bSChuck Lever nsegs = xdr_encode_write_chunk(p, q, consumed); 4169a6a180bSChuck Lever q += 2 + nsegs * rpcrdma_segment_maxsz; 4179a6a180bSChuck Lever p += 2 + nsegs * rpcrdma_segment_maxsz; 4189a6a180bSChuck Lever consumed = 0; 4199a6a180bSChuck Lever } 4209a6a180bSChuck Lever 4219a6a180bSChuck Lever /* Terminate Write list */ 4229a6a180bSChuck Lever *p++ = xdr_zero; 4239a6a180bSChuck Lever 4249a6a180bSChuck Lever /* Reply chunk discriminator; may be replaced later */ 4259a6a180bSChuck Lever *p = xdr_zero; 4269a6a180bSChuck Lever } 4279a6a180bSChuck Lever 4289a6a180bSChuck Lever /* The client provided a Reply chunk in the Call message. Fill in 4299a6a180bSChuck Lever * the segments in the Reply chunk in the Reply message with the 4309a6a180bSChuck Lever * number of bytes consumed in each segment. 4319a6a180bSChuck Lever * 4329a6a180bSChuck Lever * Assumptions: 4339a6a180bSChuck Lever * - Reply can always fit in the provided Reply chunk 4349a6a180bSChuck Lever */ 4359a6a180bSChuck Lever static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, 4369a6a180bSChuck Lever unsigned int consumed) 4379a6a180bSChuck Lever { 4389a6a180bSChuck Lever __be32 *p; 4399a6a180bSChuck Lever 4409a6a180bSChuck Lever /* Find the Reply chunk in the Reply's xprt header. 
4419a6a180bSChuck Lever * RPC-over-RDMA V1 replies never have a Read list. 4429a6a180bSChuck Lever */ 4439a6a180bSChuck Lever p = rdma_resp + rpcrdma_fixed_maxsz + 1; 4449a6a180bSChuck Lever 4459a6a180bSChuck Lever /* Skip past Write list */ 4469a6a180bSChuck Lever while (*p++ != xdr_zero) 4479a6a180bSChuck Lever p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; 4489a6a180bSChuck Lever 4499a6a180bSChuck Lever xdr_encode_write_chunk(p, rp_ch, consumed); 4509a6a180bSChuck Lever } 4519a6a180bSChuck Lever 4525fdca653SChuck Lever /* Parse the RPC Call's transport header. 45310dc4512SChuck Lever */ 4549a6a180bSChuck Lever static void svc_rdma_get_write_arrays(__be32 *rdma_argp, 4559a6a180bSChuck Lever __be32 **write, __be32 **reply) 45610dc4512SChuck Lever { 4575fdca653SChuck Lever __be32 *p; 45810dc4512SChuck Lever 4599a6a180bSChuck Lever p = rdma_argp + rpcrdma_fixed_maxsz; 4605fdca653SChuck Lever 4615fdca653SChuck Lever /* Read list */ 4625fdca653SChuck Lever while (*p++ != xdr_zero) 4635fdca653SChuck Lever p += 5; 4645fdca653SChuck Lever 4655fdca653SChuck Lever /* Write list */ 4665fdca653SChuck Lever if (*p != xdr_zero) { 4679a6a180bSChuck Lever *write = p; 4685fdca653SChuck Lever while (*p++ != xdr_zero) 4695fdca653SChuck Lever p += 1 + be32_to_cpu(*p) * 4; 4705fdca653SChuck Lever } else { 4715fdca653SChuck Lever *write = NULL; 4725fdca653SChuck Lever p++; 47310dc4512SChuck Lever } 47410dc4512SChuck Lever 4755fdca653SChuck Lever /* Reply chunk */ 4765fdca653SChuck Lever if (*p != xdr_zero) 4779a6a180bSChuck Lever *reply = p; 4785fdca653SChuck Lever else 4795fdca653SChuck Lever *reply = NULL; 48010dc4512SChuck Lever } 48110dc4512SChuck Lever 4826e6092caSChuck Lever static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, 4834201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 4846e6092caSChuck Lever struct page *page, 485f016f305SChuck Lever unsigned long offset, 4866e6092caSChuck Lever unsigned int len) 4876e6092caSChuck Lever { 4886e6092caSChuck Lever struct 
ib_device *dev = rdma->sc_cm_id->device; 4896e6092caSChuck Lever dma_addr_t dma_addr; 4906e6092caSChuck Lever 4916e6092caSChuck Lever dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); 492832b2cb9SChuck Lever trace_svcrdma_dma_map_page(rdma, dma_addr, len); 4936e6092caSChuck Lever if (ib_dma_mapping_error(dev, dma_addr)) 49491a08eaeSChuck Lever goto out_maperr; 4956e6092caSChuck Lever 49625fd86ecSChuck Lever ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; 49725fd86ecSChuck Lever ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; 4984201c746SChuck Lever ctxt->sc_send_wr.num_sge++; 4996e6092caSChuck Lever return 0; 50091a08eaeSChuck Lever 50191a08eaeSChuck Lever out_maperr: 50291a08eaeSChuck Lever return -EIO; 5036e6092caSChuck Lever } 5046e6092caSChuck Lever 505f016f305SChuck Lever /* ib_dma_map_page() is used here because svc_rdma_dma_unmap() 506f016f305SChuck Lever * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. 507f016f305SChuck Lever */ 508f016f305SChuck Lever static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, 5094201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 510f016f305SChuck Lever unsigned char *base, 511f016f305SChuck Lever unsigned int len) 512f016f305SChuck Lever { 51325fd86ecSChuck Lever return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base), 514f016f305SChuck Lever offset_in_page(base), len); 515f016f305SChuck Lever } 516f016f305SChuck Lever 5176e6092caSChuck Lever /** 51899722fe4SChuck Lever * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer 5196e6092caSChuck Lever * @rdma: controlling transport 52099722fe4SChuck Lever * @ctxt: send_ctxt for the Send WR 5216e6092caSChuck Lever * @len: length of transport header 5226e6092caSChuck Lever * 5236e6092caSChuck Lever */ 52499722fe4SChuck Lever void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma, 5254201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt, 5266e6092caSChuck Lever unsigned int len) 5276e6092caSChuck Lever { 52899722fe4SChuck 
Lever ctxt->sc_sges[0].length = len; 52999722fe4SChuck Lever ctxt->sc_send_wr.num_sge++; 53099722fe4SChuck Lever ib_dma_sync_single_for_device(rdma->sc_pd->device, 53199722fe4SChuck Lever ctxt->sc_sges[0].addr, len, 53299722fe4SChuck Lever DMA_TO_DEVICE); 5336e6092caSChuck Lever } 5346e6092caSChuck Lever 535e248aa7bSChuck Lever /* If the xdr_buf has more elements than the device can 536e248aa7bSChuck Lever * transmit in a single RDMA Send, then the reply will 537e248aa7bSChuck Lever * have to be copied into a bounce buffer. 538e248aa7bSChuck Lever */ 539e248aa7bSChuck Lever static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma, 540e248aa7bSChuck Lever struct xdr_buf *xdr, 541e248aa7bSChuck Lever __be32 *wr_lst) 542e248aa7bSChuck Lever { 543e248aa7bSChuck Lever int elements; 544e248aa7bSChuck Lever 545e248aa7bSChuck Lever /* xdr->head */ 546e248aa7bSChuck Lever elements = 1; 547e248aa7bSChuck Lever 548e248aa7bSChuck Lever /* xdr->pages */ 549e248aa7bSChuck Lever if (!wr_lst) { 550e248aa7bSChuck Lever unsigned int remaining; 551e248aa7bSChuck Lever unsigned long pageoff; 552e248aa7bSChuck Lever 553e248aa7bSChuck Lever pageoff = xdr->page_base & ~PAGE_MASK; 554e248aa7bSChuck Lever remaining = xdr->page_len; 555e248aa7bSChuck Lever while (remaining) { 556e248aa7bSChuck Lever ++elements; 557e248aa7bSChuck Lever remaining -= min_t(u32, PAGE_SIZE - pageoff, 558e248aa7bSChuck Lever remaining); 559e248aa7bSChuck Lever pageoff = 0; 560e248aa7bSChuck Lever } 561e248aa7bSChuck Lever } 562e248aa7bSChuck Lever 563e248aa7bSChuck Lever /* xdr->tail */ 564e248aa7bSChuck Lever if (xdr->tail[0].iov_len) 565e248aa7bSChuck Lever ++elements; 566e248aa7bSChuck Lever 567e248aa7bSChuck Lever /* assume 1 SGE is needed for the transport header */ 568e248aa7bSChuck Lever return elements >= rdma->sc_max_send_sges; 569e248aa7bSChuck Lever } 570e248aa7bSChuck Lever 571e248aa7bSChuck Lever /* The device is not capable of sending the reply directly. 
572e248aa7bSChuck Lever * Assemble the elements of @xdr into the transport header 573e248aa7bSChuck Lever * buffer. 574e248aa7bSChuck Lever */ 575e248aa7bSChuck Lever static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma, 576e248aa7bSChuck Lever struct svc_rdma_send_ctxt *ctxt, 577e248aa7bSChuck Lever struct xdr_buf *xdr, __be32 *wr_lst) 578e248aa7bSChuck Lever { 579e248aa7bSChuck Lever unsigned char *dst, *tailbase; 580e248aa7bSChuck Lever unsigned int taillen; 581e248aa7bSChuck Lever 582e248aa7bSChuck Lever dst = ctxt->sc_xprt_buf; 583e248aa7bSChuck Lever dst += ctxt->sc_sges[0].length; 584e248aa7bSChuck Lever 585e248aa7bSChuck Lever memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len); 586e248aa7bSChuck Lever dst += xdr->head[0].iov_len; 587e248aa7bSChuck Lever 588e248aa7bSChuck Lever tailbase = xdr->tail[0].iov_base; 589e248aa7bSChuck Lever taillen = xdr->tail[0].iov_len; 590e248aa7bSChuck Lever if (wr_lst) { 591e248aa7bSChuck Lever u32 xdrpad; 592e248aa7bSChuck Lever 59396f194b7SChuck Lever xdrpad = xdr_pad_size(xdr->page_len); 594e248aa7bSChuck Lever if (taillen && xdrpad) { 595e248aa7bSChuck Lever tailbase += xdrpad; 596e248aa7bSChuck Lever taillen -= xdrpad; 597e248aa7bSChuck Lever } 598e248aa7bSChuck Lever } else { 599e248aa7bSChuck Lever unsigned int len, remaining; 600e248aa7bSChuck Lever unsigned long pageoff; 601e248aa7bSChuck Lever struct page **ppages; 602e248aa7bSChuck Lever 603e248aa7bSChuck Lever ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); 604e248aa7bSChuck Lever pageoff = xdr->page_base & ~PAGE_MASK; 605e248aa7bSChuck Lever remaining = xdr->page_len; 606e248aa7bSChuck Lever while (remaining) { 607e248aa7bSChuck Lever len = min_t(u32, PAGE_SIZE - pageoff, remaining); 608e248aa7bSChuck Lever 609e248aa7bSChuck Lever memcpy(dst, page_address(*ppages), len); 610e248aa7bSChuck Lever remaining -= len; 611e248aa7bSChuck Lever dst += len; 612e248aa7bSChuck Lever pageoff = 0; 613e248aa7bSChuck Lever } 614e248aa7bSChuck Lever } 
615e248aa7bSChuck Lever 616e248aa7bSChuck Lever if (taillen) 617e248aa7bSChuck Lever memcpy(dst, tailbase, taillen); 618e248aa7bSChuck Lever 619e248aa7bSChuck Lever ctxt->sc_sges[0].length += xdr->len; 620e248aa7bSChuck Lever ib_dma_sync_single_for_device(rdma->sc_pd->device, 621e248aa7bSChuck Lever ctxt->sc_sges[0].addr, 622e248aa7bSChuck Lever ctxt->sc_sges[0].length, 623e248aa7bSChuck Lever DMA_TO_DEVICE); 624e248aa7bSChuck Lever 625e248aa7bSChuck Lever return 0; 626e248aa7bSChuck Lever } 627e248aa7bSChuck Lever 62899722fe4SChuck Lever /* svc_rdma_map_reply_msg - Map the buffer holding RPC message 62999722fe4SChuck Lever * @rdma: controlling transport 63099722fe4SChuck Lever * @ctxt: send_ctxt for the Send WR 63199722fe4SChuck Lever * @xdr: prepared xdr_buf containing RPC message 63299722fe4SChuck Lever * @wr_lst: pointer to Call header's Write list, or NULL 63399722fe4SChuck Lever * 63499722fe4SChuck Lever * Load the xdr_buf into the ctxt's sge array, and DMA map each 6359a6a180bSChuck Lever * element as it is added. 6369a6a180bSChuck Lever * 63723262790SChuck Lever * Returns zero on success, or a negative errno on failure. 
 */
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
			   struct svc_rdma_send_ctxt *ctxt,
			   struct xdr_buf *xdr, __be32 *wr_lst)
{
	unsigned int len, remaining;
	unsigned long page_off;
	struct page **ppages;
	unsigned char *base;
	u32 xdr_pad;
	int ret;

	/* Too many xdr_buf elements for the device's SGE limit:
	 * linearize the reply into the header buffer instead.
	 */
	if (svc_rdma_pull_up_needed(rdma, xdr, wr_lst))
		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);

	/* sc_sges[0] carries the transport header; the RPC message
	 * starts at sc_sges[1].
	 */
	++ctxt->sc_cur_sge_no;
	ret = svc_rdma_dma_map_buf(rdma, ctxt,
				   xdr->head[0].iov_base,
				   xdr->head[0].iov_len);
	if (ret < 0)
		return ret;

	/* If a Write chunk is present, the xdr_buf's page list
	 * is not included inline. However the Upper Layer may
	 * have added XDR padding in the tail buffer, and that
	 * should not be included inline.
	 */
	if (wr_lst) {
		base = xdr->tail[0].iov_base;
		len = xdr->tail[0].iov_len;
		xdr_pad = xdr_pad_size(xdr->page_len);

		if (len && xdr_pad) {
			base += xdr_pad;
			len -= xdr_pad;
		}

		goto tail;
	}

	/* No Write chunk: map the page list, one SGE per page
	 * fragment.
	 */
	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
	page_off = xdr->page_base & ~PAGE_MASK;
	remaining = xdr->page_len;
	while (remaining) {
		len = min_t(u32, PAGE_SIZE - page_off, remaining);

		++ctxt->sc_cur_sge_no;
		ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++,
					    page_off, len);
		if (ret < 0)
			return ret;

		remaining -= len;
		page_off = 0;
	}

	base = xdr->tail[0].iov_base;
	len = xdr->tail[0].iov_len;
tail:
	if (len) {
		++ctxt->sc_cur_sge_no;
		ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len);
		if (ret < 0)
			return ret;
	}

	return 0;
}

/* The svc_rqst and all resources it owns are released as soon as
 * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
 * so they are released by the Send completion handler.
710c55ab070SChuck Lever */ 711c55ab070SChuck Lever static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, 7124201c746SChuck Lever struct svc_rdma_send_ctxt *ctxt) 713c55ab070SChuck Lever { 714c55ab070SChuck Lever int i, pages = rqstp->rq_next_page - rqstp->rq_respages; 715c55ab070SChuck Lever 7164201c746SChuck Lever ctxt->sc_page_count += pages; 717c55ab070SChuck Lever for (i = 0; i < pages; i++) { 71899722fe4SChuck Lever ctxt->sc_pages[i] = rqstp->rq_respages[i]; 719c55ab070SChuck Lever rqstp->rq_respages[i] = NULL; 720c55ab070SChuck Lever } 721a53d5cb0SChuck Lever 722a53d5cb0SChuck Lever /* Prevent svc_xprt_release from releasing pages in rq_pages */ 723a53d5cb0SChuck Lever rqstp->rq_next_page = rqstp->rq_respages; 724c55ab070SChuck Lever } 725c55ab070SChuck Lever 7269a6a180bSChuck Lever /* Prepare the portion of the RPC Reply that will be transmitted 7279a6a180bSChuck Lever * via RDMA Send. The RPC-over-RDMA transport header is prepared 7284201c746SChuck Lever * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. 7299a6a180bSChuck Lever * 7309a6a180bSChuck Lever * Depending on whether a Write list or Reply chunk is present, 7319a6a180bSChuck Lever * the server may send all, a portion of, or none of the xdr_buf. 7324201c746SChuck Lever * In the latter case, only the transport header (sc_sges[0]) is 7339a6a180bSChuck Lever * transmitted. 7349a6a180bSChuck Lever * 7359a6a180bSChuck Lever * RDMA Send is the last step of transmitting an RPC reply. Pages 7369a6a180bSChuck Lever * involved in the earlier RDMA Writes are here transferred out 73797bce634SChuck Lever * of the rqstp and into the sctxt's page array. These pages are 7389a6a180bSChuck Lever * DMA unmapped by each Write completion, but the subsequent Send 7399a6a180bSChuck Lever * completion finally releases these pages. 7409a6a180bSChuck Lever * 7419a6a180bSChuck Lever * Assumptions: 7429a6a180bSChuck Lever * - The Reply's transport header will never be larger than a page. 
743c06b540aSTom Tucker */ 7449a6a180bSChuck Lever static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, 74597bce634SChuck Lever struct svc_rdma_send_ctxt *sctxt, 74697bce634SChuck Lever struct svc_rdma_recv_ctxt *rctxt, 747c06b540aSTom Tucker struct svc_rqst *rqstp, 7489a6a180bSChuck Lever __be32 *wr_lst, __be32 *rp_ch) 749c06b540aSTom Tucker { 7509a6a180bSChuck Lever int ret; 7510e7f011aSTom Tucker 7529a6a180bSChuck Lever if (!rp_ch) { 75397bce634SChuck Lever ret = svc_rdma_map_reply_msg(rdma, sctxt, 7549a6a180bSChuck Lever &rqstp->rq_res, wr_lst); 7559a6a180bSChuck Lever if (ret < 0) 75699722fe4SChuck Lever return ret; 7573fe04ee9SChuck Lever } 758c06b540aSTom Tucker 75997bce634SChuck Lever svc_rdma_save_io_pages(rqstp, sctxt); 7600bf48289SSteve Wise 76197bce634SChuck Lever if (rctxt->rc_inv_rkey) { 76297bce634SChuck Lever sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; 76397bce634SChuck Lever sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey; 76497bce634SChuck Lever } else { 76597bce634SChuck Lever sctxt->sc_send_wr.opcode = IB_WR_SEND; 766986b7889SChuck Lever } 767986b7889SChuck Lever dprintk("svcrdma: posting Send WR with %u sge(s)\n", 76897bce634SChuck Lever sctxt->sc_send_wr.num_sge); 76997bce634SChuck Lever return svc_rdma_send(rdma, &sctxt->sc_send_wr); 770c06b540aSTom Tucker } 771c06b540aSTom Tucker 7724757d90bSChuck Lever /* Given the client-provided Write and Reply chunks, the server was not 7734757d90bSChuck Lever * able to form a complete reply. Return an RDMA_ERROR message so the 7744757d90bSChuck Lever * client can retire this RPC transaction. As above, the Send completion 7754757d90bSChuck Lever * routine releases payload pages that were part of a previous RDMA Write. 7764757d90bSChuck Lever * 7774757d90bSChuck Lever * Remote Invalidation is skipped for simplicity. 
7784757d90bSChuck Lever */ 7794757d90bSChuck Lever static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, 78099722fe4SChuck Lever struct svc_rdma_send_ctxt *ctxt, 78199722fe4SChuck Lever struct svc_rqst *rqstp) 7824757d90bSChuck Lever { 7834757d90bSChuck Lever __be32 *p; 7844757d90bSChuck Lever 78599722fe4SChuck Lever p = ctxt->sc_xprt_buf; 78699722fe4SChuck Lever trace_svcrdma_err_chunk(*p); 78799722fe4SChuck Lever p += 3; 7884757d90bSChuck Lever *p++ = rdma_error; 7894757d90bSChuck Lever *p = err_chunk; 79099722fe4SChuck Lever svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR); 7914757d90bSChuck Lever 7924757d90bSChuck Lever svc_rdma_save_io_pages(rqstp, ctxt); 7934757d90bSChuck Lever 794986b7889SChuck Lever ctxt->sc_send_wr.opcode = IB_WR_SEND; 795758a3bf9SChuck Lever return svc_rdma_send(rdma, &ctxt->sc_send_wr); 79699722fe4SChuck Lever } 79799722fe4SChuck Lever 7989a6a180bSChuck Lever /** 7999a6a180bSChuck Lever * svc_rdma_sendto - Transmit an RPC reply 8009a6a180bSChuck Lever * @rqstp: processed RPC request, reply XDR already in ::rq_res 8019a6a180bSChuck Lever * 8029a6a180bSChuck Lever * Any resources still associated with @rqstp are released upon return. 8039a6a180bSChuck Lever * If no reply message was possible, the connection is closed. 8049a6a180bSChuck Lever * 8059a6a180bSChuck Lever * Returns: 8069a6a180bSChuck Lever * %0 if an RPC reply has been successfully posted, 8079a6a180bSChuck Lever * %-ENOMEM if a resource shortage occurred (connection is lost), 8089a6a180bSChuck Lever * %-ENOTCONN if posting failed (connection is lost). 
 */
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
	struct xdr_buf *xdr = &rqstp->rq_res;
	struct svc_rdma_send_ctxt *sctxt;
	int ret;

	/* Locate the Write list and Reply chunk in the Call's header */
	rdma_argp = rctxt->rc_recv_buf;
	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);

	/* Create the RDMA response header. xprt->xpt_mutex,
	 * acquired in svc_send(), serializes RPC replies. The
	 * code path below that inserts the credit grant value
	 * into each transport header runs only inside this
	 * critical section.
	 */
	ret = -ENOMEM;
	sctxt = svc_rdma_send_ctxt_get(rdma);
	if (!sctxt)
		goto err0;
	rdma_resp = sctxt->sc_xprt_buf;

	/* Fixed header fields: XID, version, credits, message type */
	p = rdma_resp;
	*p++ = *rdma_argp;
	*p++ = *(rdma_argp + 1);
	*p++ = rdma->sc_fc_credits;
	*p++ = rp_ch ? rdma_nomsg : rdma_msg;

	/* Start with empty chunks */
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (wr_lst) {
		/* XXX: Presume the client sent only one Write chunk */
		unsigned long offset;
		unsigned int length;

		/* Prefer the READ payload location recorded by
		 * svc_rdma_read_payload(); otherwise send the whole
		 * page list starting after the head buffer.
		 */
		if (rctxt->rc_read_payload_length) {
			offset = rctxt->rc_read_payload_offset;
			length = rctxt->rc_read_payload_length;
		} else {
			offset = xdr->head[0].iov_len;
			length = xdr->page_len;
		}
		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
						length);
		if (ret < 0)
			goto err2;
		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
	}
	if (rp_ch) {
		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
		if (ret < 0)
			goto err2;
		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
	}

	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
				      wr_lst, rp_ch);
	if (ret < 0)
		goto err1;
	ret = 0;

out:
	/* Common exit: release the receive context in all cases */
	rqstp->rq_xprt_ctxt = NULL;
	svc_rdma_recv_ctxt_put(rdma, rctxt);
	return ret;

err2:
	/* A chunk transfer failed. Only -E2BIG and -EINVAL are
	 * recoverable by reporting ERR_CHUNK to the client; any
	 * other error closes the connection.
	 */
	if (ret != -E2BIG && ret != -EINVAL)
		goto err1;

	ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
	if (ret < 0)
		goto err1;
	/* The error message was posted; from the transport's point
	 * of view the reply succeeded.
	 */
	ret = 0;
	goto out;

err1:
	svc_rdma_send_ctxt_put(rdma, sctxt);
err0:
	trace_svcrdma_send_failed(rqstp, ret);
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	ret = -ENOTCONN;
	goto out;
}

/**
 * svc_rdma_read_payload - special processing for a READ payload
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in @xdr
 * @length: size of payload, in bytes
 *
 * Returns zero on success.
 *
 * For the moment, just record the xdr_buf location of the READ
 * payload. svc_rdma_sendto will use that location later when
 * we actually send the payload.
 */
int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
			  unsigned int length)
{
	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;

	/* XXX: Just one READ payload slot for now, since our
	 * transport implementation currently supports only one
	 * Write chunk.
	 */
	rctxt->rc_read_payload_offset = offset;
	rctxt->rc_read_payload_length = length;

	return 0;
}