xref: /openbmc/linux/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c (revision ecc23d0a422a3118fcf6e4f0a46e17a6c2047b02)
1bcf3ffd4SChuck Lever // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2d5b31be6STom Tucker /*
3ecf85b23SChuck Lever  * Copyright (c) 2016-2018 Oracle. All rights reserved.
40bf48289SSteve Wise  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
5d5b31be6STom Tucker  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
6d5b31be6STom Tucker  *
7d5b31be6STom Tucker  * This software is available to you under a choice of one of two
8d5b31be6STom Tucker  * licenses.  You may choose to be licensed under the terms of the GNU
9d5b31be6STom Tucker  * General Public License (GPL) Version 2, available from the file
10d5b31be6STom Tucker  * COPYING in the main directory of this source tree, or the BSD-type
11d5b31be6STom Tucker  * license below:
12d5b31be6STom Tucker  *
13d5b31be6STom Tucker  * Redistribution and use in source and binary forms, with or without
14d5b31be6STom Tucker  * modification, are permitted provided that the following conditions
15d5b31be6STom Tucker  * are met:
16d5b31be6STom Tucker  *
17d5b31be6STom Tucker  *      Redistributions of source code must retain the above copyright
18d5b31be6STom Tucker  *      notice, this list of conditions and the following disclaimer.
19d5b31be6STom Tucker  *
20d5b31be6STom Tucker  *      Redistributions in binary form must reproduce the above
21d5b31be6STom Tucker  *      copyright notice, this list of conditions and the following
22d5b31be6STom Tucker  *      disclaimer in the documentation and/or other materials provided
23d5b31be6STom Tucker  *      with the distribution.
24d5b31be6STom Tucker  *
25d5b31be6STom Tucker  *      Neither the name of the Network Appliance, Inc. nor the names of
26d5b31be6STom Tucker  *      its contributors may be used to endorse or promote products
27d5b31be6STom Tucker  *      derived from this software without specific prior written
28d5b31be6STom Tucker  *      permission.
29d5b31be6STom Tucker  *
30d5b31be6STom Tucker  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
31d5b31be6STom Tucker  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
32d5b31be6STom Tucker  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
33d5b31be6STom Tucker  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
34d5b31be6STom Tucker  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35d5b31be6STom Tucker  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
36d5b31be6STom Tucker  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
37d5b31be6STom Tucker  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
38d5b31be6STom Tucker  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
39d5b31be6STom Tucker  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
40d5b31be6STom Tucker  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
41d5b31be6STom Tucker  *
42d5b31be6STom Tucker  * Author: Tom Tucker <tom@opengridcomputing.com>
43d5b31be6STom Tucker  */
44d5b31be6STom Tucker 
45cafc7398SChuck Lever /* Operation
46cafc7398SChuck Lever  *
47cafc7398SChuck Lever  * The main entry point is svc_rdma_recvfrom. This is called from
48cafc7398SChuck Lever  * svc_recv when the transport indicates there is incoming data to
49cafc7398SChuck Lever  * be read. "Data Ready" is signaled when an RDMA Receive completes,
50cafc7398SChuck Lever  * or when a set of RDMA Reads complete.
51cafc7398SChuck Lever  *
52cafc7398SChuck Lever  * An svc_rqst is passed in. This structure contains an array of
53cafc7398SChuck Lever  * free pages (rq_pages) that will contain the incoming RPC message.
54cafc7398SChuck Lever  *
55cafc7398SChuck Lever  * Short messages are moved directly into svc_rqst::rq_arg, and
56cafc7398SChuck Lever  * the RPC Call is ready to be processed by the Upper Layer.
57cafc7398SChuck Lever  * svc_rdma_recvfrom returns the length of the RPC Call message,
58cafc7398SChuck Lever  * completing the reception of the RPC Call.
59cafc7398SChuck Lever  *
60cafc7398SChuck Lever  * However, when an incoming message has Read chunks,
61cafc7398SChuck Lever  * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's
62cafc7398SChuck Lever  * data payload from the client. svc_rdma_recvfrom sets up the
63cafc7398SChuck Lever  * RDMA Reads using pages in svc_rqst::rq_pages, which are
64ecf85b23SChuck Lever  * transferred to an svc_rdma_recv_ctxt for the duration of the
65cafc7398SChuck Lever  * I/O. svc_rdma_recvfrom then returns zero, since the RPC message
66cafc7398SChuck Lever  * is still not yet ready.
67cafc7398SChuck Lever  *
68cafc7398SChuck Lever  * When the Read chunk payloads have become available on the
69cafc7398SChuck Lever  * server, "Data Ready" is raised again, and svc_recv calls
70cafc7398SChuck Lever  * svc_rdma_recvfrom again. This second call may use a different
71cafc7398SChuck Lever  * svc_rqst than the first one, thus any information that needs
72cafc7398SChuck Lever  * to be preserved across these two calls is kept in an
73ecf85b23SChuck Lever  * svc_rdma_recv_ctxt.
74cafc7398SChuck Lever  *
75cafc7398SChuck Lever  * The second call to svc_rdma_recvfrom performs final assembly
76cafc7398SChuck Lever  * of the RPC Call message, using the RDMA Read sink pages kept in
77ecf85b23SChuck Lever  * the svc_rdma_recv_ctxt. The xdr_buf is copied from the
78ecf85b23SChuck Lever  * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns
79cafc7398SChuck Lever  * the length of the completed RPC Call message.
80cafc7398SChuck Lever  *
81cafc7398SChuck Lever  * Page Management
82cafc7398SChuck Lever  *
83cafc7398SChuck Lever  * Pages under I/O must be transferred from the first svc_rqst to an
84ecf85b23SChuck Lever  * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns.
85cafc7398SChuck Lever  *
86cafc7398SChuck Lever  * The first svc_rqst supplies pages for RDMA Reads. These are moved
87cafc7398SChuck Lever  * from rqstp::rq_pages into ctxt::pages. The consumed elements of
 88cafc7398SChuck Lever  * the rq_pages array are set to NULL and refilled before the first
 89cafc7398SChuck Lever  * svc_rdma_recvfrom call returns.
90cafc7398SChuck Lever  *
91cafc7398SChuck Lever  * During the second svc_rdma_recvfrom call, RDMA Read sink pages
929af723beSChuck Lever  * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst.
93cafc7398SChuck Lever  */
94cafc7398SChuck Lever 
9578147ca8SChuck Lever #include <linux/slab.h>
9698895edbSChuck Lever #include <linux/spinlock.h>
97d5b31be6STom Tucker #include <asm/unaligned.h>
98d5b31be6STom Tucker #include <rdma/ib_verbs.h>
99d5b31be6STom Tucker #include <rdma/rdma_cm.h>
100cafc7398SChuck Lever 
101cafc7398SChuck Lever #include <linux/sunrpc/xdr.h>
102cafc7398SChuck Lever #include <linux/sunrpc/debug.h>
103cafc7398SChuck Lever #include <linux/sunrpc/rpc_rdma.h>
104d5b31be6STom Tucker #include <linux/sunrpc/svc_rdma.h>
105d5b31be6STom Tucker 
10698895edbSChuck Lever #include "xprt_rdma.h"
10798895edbSChuck Lever #include <trace/events/rpcrdma.h>
10898895edbSChuck Lever 
109ecf85b23SChuck Lever static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);
110ecf85b23SChuck Lever 
111ecf85b23SChuck Lever static inline struct svc_rdma_recv_ctxt *
svc_rdma_next_recv_ctxt(struct list_head * list)112ecf85b23SChuck Lever svc_rdma_next_recv_ctxt(struct list_head *list)
113ecf85b23SChuck Lever {
114ecf85b23SChuck Lever 	return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
115ecf85b23SChuck Lever 					rc_list);
116ecf85b23SChuck Lever }
117ecf85b23SChuck Lever 
/* Initialize the completion ID in @cid so Receive completions can be
 * correlated in trace output: the queue ID is the Receive CQ's
 * resource ID, and the completion ID comes from a per-transport
 * monotonically increasing counter.
 */
static void svc_rdma_recv_cid_init(struct svcxprt_rdma *rdma,
				   struct rpc_rdma_cid *cid)
{
	cid->ci_queue_id = rdma->sc_rq_cq->res.id;
	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}
1249b3bcf8cSChuck Lever 
1253316f063SChuck Lever static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma * rdma)1263316f063SChuck Lever svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
1273316f063SChuck Lever {
128c5d68d25SChuck Lever 	int node = ibdev_to_node(rdma->sc_cm_id->device);
1293316f063SChuck Lever 	struct svc_rdma_recv_ctxt *ctxt;
1303316f063SChuck Lever 	dma_addr_t addr;
1313316f063SChuck Lever 	void *buffer;
1323316f063SChuck Lever 
133c5d68d25SChuck Lever 	ctxt = kmalloc_node(sizeof(*ctxt), GFP_KERNEL, node);
1343316f063SChuck Lever 	if (!ctxt)
1353316f063SChuck Lever 		goto fail0;
136c5d68d25SChuck Lever 	buffer = kmalloc_node(rdma->sc_max_req_size, GFP_KERNEL, node);
1373316f063SChuck Lever 	if (!buffer)
1383316f063SChuck Lever 		goto fail1;
1393316f063SChuck Lever 	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
1403316f063SChuck Lever 				 rdma->sc_max_req_size, DMA_FROM_DEVICE);
1413316f063SChuck Lever 	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
1423316f063SChuck Lever 		goto fail2;
1433316f063SChuck Lever 
1449b3bcf8cSChuck Lever 	svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
14578147ca8SChuck Lever 	pcl_init(&ctxt->rc_call_pcl);
14678147ca8SChuck Lever 	pcl_init(&ctxt->rc_read_pcl);
14778147ca8SChuck Lever 	pcl_init(&ctxt->rc_write_pcl);
14878147ca8SChuck Lever 	pcl_init(&ctxt->rc_reply_pcl);
1499b3bcf8cSChuck Lever 
1503316f063SChuck Lever 	ctxt->rc_recv_wr.next = NULL;
1513316f063SChuck Lever 	ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
1523316f063SChuck Lever 	ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
1533316f063SChuck Lever 	ctxt->rc_recv_wr.num_sge = 1;
1543316f063SChuck Lever 	ctxt->rc_cqe.done = svc_rdma_wc_receive;
1553316f063SChuck Lever 	ctxt->rc_recv_sge.addr = addr;
1563316f063SChuck Lever 	ctxt->rc_recv_sge.length = rdma->sc_max_req_size;
1573316f063SChuck Lever 	ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey;
1583316f063SChuck Lever 	ctxt->rc_recv_buf = buffer;
1593316f063SChuck Lever 	return ctxt;
1603316f063SChuck Lever 
1613316f063SChuck Lever fail2:
1623316f063SChuck Lever 	kfree(buffer);
1633316f063SChuck Lever fail1:
1643316f063SChuck Lever 	kfree(ctxt);
1653316f063SChuck Lever fail0:
1663316f063SChuck Lever 	return NULL;
1673316f063SChuck Lever }
1683316f063SChuck Lever 
/* Tear down one Receive context: unmap its DMA-mapped receive
 * buffer, then free the buffer and the context structure.
 */
static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
				       struct svc_rdma_recv_ctxt *ctxt)
{
	ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
			    ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
	kfree(ctxt->rc_recv_buf);
	kfree(ctxt);
}
177eb5d7a62SChuck Lever 
178ecf85b23SChuck Lever /**
179ecf85b23SChuck Lever  * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
180ecf85b23SChuck Lever  * @rdma: svcxprt_rdma being torn down
181ecf85b23SChuck Lever  *
182ecf85b23SChuck Lever  */
svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma * rdma)183ecf85b23SChuck Lever void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
184ecf85b23SChuck Lever {
185ecf85b23SChuck Lever 	struct svc_rdma_recv_ctxt *ctxt;
1864866073eSChuck Lever 	struct llist_node *node;
187ecf85b23SChuck Lever 
1884866073eSChuck Lever 	while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
1894866073eSChuck Lever 		ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
190eb5d7a62SChuck Lever 		svc_rdma_recv_ctxt_destroy(rdma, ctxt);
191ecf85b23SChuck Lever 	}
192ecf85b23SChuck Lever }
193ecf85b23SChuck Lever 
1949d0b09d5SChuck Lever /**
1959d0b09d5SChuck Lever  * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
1969d0b09d5SChuck Lever  * @rdma: controlling svcxprt_rdma
1979d0b09d5SChuck Lever  *
1989d0b09d5SChuck Lever  * Returns a recv_ctxt or (rarely) NULL if none are available.
1999d0b09d5SChuck Lever  */
svc_rdma_recv_ctxt_get(struct svcxprt_rdma * rdma)2009d0b09d5SChuck Lever struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
201ecf85b23SChuck Lever {
202ecf85b23SChuck Lever 	struct svc_rdma_recv_ctxt *ctxt;
2034866073eSChuck Lever 	struct llist_node *node;
204ecf85b23SChuck Lever 
2054866073eSChuck Lever 	node = llist_del_first(&rdma->sc_recv_ctxts);
2064866073eSChuck Lever 	if (!node)
207ecf85b23SChuck Lever 		goto out_empty;
2084866073eSChuck Lever 	ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
209ecf85b23SChuck Lever 
210ecf85b23SChuck Lever out:
211ecf85b23SChuck Lever 	ctxt->rc_page_count = 0;
212ecf85b23SChuck Lever 	return ctxt;
213ecf85b23SChuck Lever 
214ecf85b23SChuck Lever out_empty:
2153316f063SChuck Lever 	ctxt = svc_rdma_recv_ctxt_alloc(rdma);
216ecf85b23SChuck Lever 	if (!ctxt)
217ecf85b23SChuck Lever 		return NULL;
218ecf85b23SChuck Lever 	goto out;
219ecf85b23SChuck Lever }
220ecf85b23SChuck Lever 
/**
 * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 */
void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
			    struct svc_rdma_recv_ctxt *ctxt)
{
	/* Release the chunk lists built while parsing the transport
	 * header before recycling the context. */
	pcl_free(&ctxt->rc_call_pcl);
	pcl_free(&ctxt->rc_read_pcl);
	pcl_free(&ctxt->rc_write_pcl);
	pcl_free(&ctxt->rc_reply_pcl);

	llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
}
237ecf85b23SChuck Lever 
23823cf1ee1SChuck Lever /**
239948f072aSNeilBrown  * svc_rdma_release_ctxt - Release transport-specific per-rqst resources
240948f072aSNeilBrown  * @xprt: the transport which owned the context
241948f072aSNeilBrown  * @vctxt: the context from rqstp->rq_xprt_ctxt or dr->xprt_ctxt
24223cf1ee1SChuck Lever  *
24323cf1ee1SChuck Lever  * Ensure that the recv_ctxt is released whether or not a Reply
24423cf1ee1SChuck Lever  * was sent. For example, the client could close the connection,
24523cf1ee1SChuck Lever  * or svc_process could drop an RPC, before the Reply is sent.
24623cf1ee1SChuck Lever  */
svc_rdma_release_ctxt(struct svc_xprt * xprt,void * vctxt)247948f072aSNeilBrown void svc_rdma_release_ctxt(struct svc_xprt *xprt, void *vctxt)
24823cf1ee1SChuck Lever {
249948f072aSNeilBrown 	struct svc_rdma_recv_ctxt *ctxt = vctxt;
25023cf1ee1SChuck Lever 	struct svcxprt_rdma *rdma =
25123cf1ee1SChuck Lever 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
25223cf1ee1SChuck Lever 
25323cf1ee1SChuck Lever 	if (ctxt)
25423cf1ee1SChuck Lever 		svc_rdma_recv_ctxt_put(rdma, ctxt);
25523cf1ee1SChuck Lever }
25623cf1ee1SChuck Lever 
/* Post up to @wanted fresh Receive WRs on @rdma's receive queue in
 * a single ib_post_recv() call.
 *
 * Returns true if the chain was posted successfully. Returns false
 * if the transport is closing, no Receive context could be obtained,
 * or the post itself failed.
 */
static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
				   unsigned int wanted)
{
	const struct ib_recv_wr *bad_wr = NULL;
	struct svc_rdma_recv_ctxt *ctxt;
	struct ib_recv_wr *recv_chain;
	int ret;

	/* Don't post fresh Receives on a transport that is shutting down. */
	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
		return false;

	/* Build a singly-linked chain of Receive WRs so the provider
	 * can be handed the whole batch at once. */
	recv_chain = NULL;
	while (wanted--) {
		ctxt = svc_rdma_recv_ctxt_get(rdma);
		if (!ctxt)
			break;

		trace_svcrdma_post_recv(ctxt);
		ctxt->rc_recv_wr.next = recv_chain;
		recv_chain = &ctxt->rc_recv_wr;
		rdma->sc_pending_recvs++;
	}
	if (!recv_chain)
		return false;

	ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
	if (ret)
		goto err_free;
	return true;

err_free:
	trace_svcrdma_rq_post_err(rdma, ret);
	/* @bad_wr points at the first WR that was not posted; return
	 * it and everything chained behind it to the free list. */
	while (bad_wr) {
		ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
				    rc_recv_wr);
		bad_wr = bad_wr->next;
		svc_rdma_recv_ctxt_put(rdma, ctxt);
	}
	/* Since we're destroying the xprt, no need to reset
	 * sc_pending_recvs. */
	return false;
}
29977f0a2aaSChuck Lever 
/**
 * svc_rdma_post_recvs - Post initial set of Recv WRs
 * @rdma: fresh svcxprt_rdma
 *
 * Returns true if successful, otherwise false.
 */
bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{
	/* Post one Receive for each of sc_max_requests. */
	return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests);
}
310ecf85b23SChuck Lever 
/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: Completion Queue context
 * @wc: Work Completion object
 *
 */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_recv_ctxt *ctxt;

	/* One fewer Receive is outstanding on the queue. */
	rdma->sc_pending_recvs--;

	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
	ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);

	if (wc->status != IB_WC_SUCCESS)
		goto flushed;
	trace_svcrdma_wc_recv(wc, &ctxt->rc_cid);

	/* If receive posting fails, the connection is about to be
	 * lost anyway. The server will not be able to send a reply
	 * for this RPC, and the client will retransmit this RPC
	 * anyway when it reconnects.
	 *
	 * Therefore we drop the Receive, even if status was SUCCESS
	 * to reduce the likelihood of replayed requests once the
	 * client reconnects.
	 */
	if (rdma->sc_pending_recvs < rdma->sc_max_requests)
		if (!svc_rdma_refresh_recvs(rdma, rdma->sc_recv_batch))
			goto dropped;

	/* All wc fields are now known to be valid */
	ctxt->rc_byte_len = wc->byte_len;

	/* Queue the completed Receive for svc_rdma_recvfrom and mark
	 * the transport as having data ready. */
	spin_lock(&rdma->sc_rq_dto_lock);
	list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);
	/* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
	spin_unlock(&rdma->sc_rq_dto_lock);
	if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
		svc_xprt_enqueue(&rdma->sc_xprt);
	return;

flushed:
	if (wc->status == IB_WC_WR_FLUSH_ERR)
		trace_svcrdma_wc_recv_flush(wc, &ctxt->rc_cid);
	else
		trace_svcrdma_wc_recv_err(wc, &ctxt->rc_cid);
dropped:
	/* Discard the Receive and begin closing the connection. */
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
366ecf85b23SChuck Lever 
367ecf85b23SChuck Lever /**
368ecf85b23SChuck Lever  * svc_rdma_flush_recv_queues - Drain pending Receive work
369ecf85b23SChuck Lever  * @rdma: svcxprt_rdma being shut down
370ecf85b23SChuck Lever  *
371ecf85b23SChuck Lever  */
svc_rdma_flush_recv_queues(struct svcxprt_rdma * rdma)372ecf85b23SChuck Lever void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
373ecf85b23SChuck Lever {
374ecf85b23SChuck Lever 	struct svc_rdma_recv_ctxt *ctxt;
375ecf85b23SChuck Lever 
376ecf85b23SChuck Lever 	while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
377ecf85b23SChuck Lever 		list_del(&ctxt->rc_list);
3781e5f4160SChuck Lever 		svc_rdma_recv_ctxt_put(rdma, ctxt);
379ecf85b23SChuck Lever 	}
380ecf85b23SChuck Lever }
381ecf85b23SChuck Lever 
svc_rdma_build_arg_xdr(struct svc_rqst * rqstp,struct svc_rdma_recv_ctxt * ctxt)3826f29d07cSChuck Lever static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
383ecf85b23SChuck Lever 				   struct svc_rdma_recv_ctxt *ctxt)
384d5b31be6STom Tucker {
3853316f063SChuck Lever 	struct xdr_buf *arg = &rqstp->rq_arg;
386d5b31be6STom Tucker 
3873316f063SChuck Lever 	arg->head[0].iov_base = ctxt->rc_recv_buf;
3883316f063SChuck Lever 	arg->head[0].iov_len = ctxt->rc_byte_len;
3893316f063SChuck Lever 	arg->tail[0].iov_base = NULL;
3903316f063SChuck Lever 	arg->tail[0].iov_len = 0;
3913316f063SChuck Lever 	arg->page_len = 0;
3923316f063SChuck Lever 	arg->page_base = 0;
3933316f063SChuck Lever 	arg->buflen = ctxt->rc_byte_len;
3943316f063SChuck Lever 	arg->len = ctxt->rc_byte_len;
395d5b31be6STom Tucker }
396d5b31be6STom Tucker 
/**
 * xdr_count_read_segments - Count number of Read segments in Read list
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Before allocating anything, ensure the ingress Read list is safe
 * to use.
 *
 * The segment count is limited to how many segments can fit in the
 * transport header without overflowing the buffer. That's about 40
 * Read segments for a 1KB inline threshold.
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
 *	    to the first byte past the Read list. rc_read_pcl and
 *	    rc_call_pcl cl_count fields are set to the number of
 *	    Read segments in the list.
 *  %false: Read list is corrupt. @rctxt's xdr_stream is left in an
 *	    unknown state.
 */
static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	rctxt->rc_call_pcl.cl_count = 0;
	rctxt->rc_read_pcl.cl_count = 0;
	while (xdr_item_is_present(p)) {
		u32 position, handle, length;
		u64 offset;

		/* Consume one full Read segment from the stream. */
		p = xdr_inline_decode(&rctxt->rc_stream,
				      rpcrdma_readseg_maxsz * sizeof(*p));
		if (!p)
			return false;

		xdr_decode_read_segment(p, &position, &handle,
					    &length, &offset);
		if (position) {
			/* Non-zero positions must be XDR (4-byte)
			 * aligned; counted in rc_read_pcl. */
			if (position & 3)
				return false;
			++rctxt->rc_read_pcl.cl_count;
		} else {
			/* Position zero segments are counted in
			 * rc_call_pcl. */
			++rctxt->rc_call_pcl.cl_count;
		}

		/* Peek at the next list discriminator. */
		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
		if (!p)
			return false;
	}
	return true;
}
446a80a3234SChuck Lever 
447e77340e0SChuck Lever /* Sanity check the Read list.
448e77340e0SChuck Lever  *
449e77340e0SChuck Lever  * Sanity checks:
450e604aad2SChuck Lever  * - Read list does not overflow Receive buffer.
45178147ca8SChuck Lever  * - Chunk size limited by largest NFS data payload.
452e77340e0SChuck Lever  *
453e604aad2SChuck Lever  * Return values:
454e604aad2SChuck Lever  *   %true: Read list is valid. @rctxt's xdr_stream is updated
455e604aad2SChuck Lever  *	    to point to the first byte past the Read list.
456e604aad2SChuck Lever  *  %false: Read list is corrupt. @rctxt's xdr_stream is left
457e604aad2SChuck Lever  *	    in an unknown state.
458e77340e0SChuck Lever  */
xdr_check_read_list(struct svc_rdma_recv_ctxt * rctxt)459e604aad2SChuck Lever static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
460e77340e0SChuck Lever {
461e604aad2SChuck Lever 	__be32 *p;
462e77340e0SChuck Lever 
463e604aad2SChuck Lever 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
464e604aad2SChuck Lever 	if (!p)
465e604aad2SChuck Lever 		return false;
46678147ca8SChuck Lever 	if (!xdr_count_read_segments(rctxt, p))
467e604aad2SChuck Lever 		return false;
46878147ca8SChuck Lever 	if (!pcl_alloc_call(rctxt, p))
469e604aad2SChuck Lever 		return false;
47078147ca8SChuck Lever 	return pcl_alloc_read(rctxt, p);
471a80a3234SChuck Lever }
472a80a3234SChuck Lever 
/* Decode one Write chunk: a segment count followed by that many
 * RDMA segments. On success, @rctxt's xdr_stream points just past
 * the chunk; on failure, the stream is left in an unknown state.
 */
static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
	u32 segcount;
	__be32 *p;

	if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount))
		return false;

	/* Before trusting the segcount value enough to use it in
	 * a computation, perform a simple range check. This is an
	 * arbitrary but sensible limit (ie, not architectural).
	 */
	if (unlikely(segcount > RPCSVC_MAXPAGES))
		return false;

	/* Consume all of the chunk's segments in one decode. */
	p = xdr_inline_decode(&rctxt->rc_stream,
			      segcount * rpcrdma_segment_maxsz * sizeof(*p));
	return p != NULL;
}
49278147ca8SChuck Lever 
/**
 * xdr_count_write_chunks - Count number of Write chunks in Write list
 * @rctxt: Received header and decoding state
 * @p: start of an un-decoded Write list
 *
 * Before allocating anything, ensure the ingress Write list is
 * safe to use.
 *
 * Return values:
 *       %true: Write list is valid. @rctxt's xdr_stream is updated
 *		to point to the first byte past the Write list, and
 *		the number of Write chunks is in rc_write_pcl.cl_count.
 *      %false: Write list is corrupt. @rctxt's xdr_stream is left
 *		in an indeterminate state.
 */
static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	rctxt->rc_write_pcl.cl_count = 0;
	while (xdr_item_is_present(p)) {
		/* Validate and skip over one Write chunk. */
		if (!xdr_check_write_chunk(rctxt))
			return false;
		++rctxt->rc_write_pcl.cl_count;
		/* Peek at the next list discriminator. */
		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
		if (!p)
			return false;
	}
	return true;
}
5213c22f326SChuck Lever 
5223c22f326SChuck Lever /* Sanity check the Write list.
5233c22f326SChuck Lever  *
5243c22f326SChuck Lever  * Implementation limits:
525e604aad2SChuck Lever  * - This implementation currently supports only one Write chunk.
5263c22f326SChuck Lever  *
5273c22f326SChuck Lever  * Sanity checks:
528e604aad2SChuck Lever  * - Write list does not overflow Receive buffer.
529e604aad2SChuck Lever  * - Chunk size limited by largest NFS data payload.
5303c22f326SChuck Lever  *
531e604aad2SChuck Lever  * Return values:
532e604aad2SChuck Lever  *       %true: Write list is valid. @rctxt's xdr_stream is updated
533e604aad2SChuck Lever  *		to point to the first byte past the Write list.
534e604aad2SChuck Lever  *      %false: Write list is corrupt. @rctxt's xdr_stream is left
535e604aad2SChuck Lever  *		in an unknown state.
5363c22f326SChuck Lever  */
xdr_check_write_list(struct svc_rdma_recv_ctxt * rctxt)537e604aad2SChuck Lever static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
5383c22f326SChuck Lever {
539e604aad2SChuck Lever 	__be32 *p;
5403c22f326SChuck Lever 
541e604aad2SChuck Lever 	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
5423c22f326SChuck Lever 	if (!p)
543e604aad2SChuck Lever 		return false;
54478147ca8SChuck Lever 	if (!xdr_count_write_chunks(rctxt, p))
54578147ca8SChuck Lever 		return false;
54678147ca8SChuck Lever 	if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
54778147ca8SChuck Lever 		return false;
54878147ca8SChuck Lever 
54978147ca8SChuck Lever 	rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
5507954c850SChuck Lever 	return true;
551a80a3234SChuck Lever }
552a80a3234SChuck Lever 
/* Sanity check the Reply chunk.
 *
 * Sanity checks:
 * - Reply chunk does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *       %true: Reply chunk is valid. @rctxt's xdr_stream is updated
 *		to point to the first byte past the Reply chunk.
 *      %false: Reply chunk is corrupt. @rctxt's xdr_stream is left
 *		in an unknown state.
 */
static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p;

	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
	if (!p)
		return false;

	/* An absent Reply chunk is not an error. */
	if (!xdr_item_is_present(p))
		return true;
	if (!xdr_check_write_chunk(rctxt))
		return false;

	/* At most one Reply chunk is tracked. */
	rctxt->rc_reply_pcl.cl_count = 1;
	return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
}
581a80a3234SChuck Lever 
58297bce634SChuck Lever /* RPC-over-RDMA Version One private extension: Remote Invalidation.
58397bce634SChuck Lever  * Responder's choice: requester signals it can handle Send With
58497bce634SChuck Lever  * Invalidate, and responder chooses one R_key to invalidate.
58597bce634SChuck Lever  *
58697bce634SChuck Lever  * If there is exactly one distinct R_key in the received transport
58797bce634SChuck Lever  * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
58897bce634SChuck Lever  */
svc_rdma_get_inv_rkey(struct svcxprt_rdma * rdma,struct svc_rdma_recv_ctxt * ctxt)58997bce634SChuck Lever static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
59097bce634SChuck Lever 				  struct svc_rdma_recv_ctxt *ctxt)
59197bce634SChuck Lever {
592eb3de6a4SChuck Lever 	struct svc_rdma_segment *segment;
593eb3de6a4SChuck Lever 	struct svc_rdma_chunk *chunk;
594eb3de6a4SChuck Lever 	u32 inv_rkey;
59597bce634SChuck Lever 
59697bce634SChuck Lever 	ctxt->rc_inv_rkey = 0;
59797bce634SChuck Lever 
59897bce634SChuck Lever 	if (!rdma->sc_snd_w_inv)
59997bce634SChuck Lever 		return;
60097bce634SChuck Lever 
601eb3de6a4SChuck Lever 	inv_rkey = 0;
602eb3de6a4SChuck Lever 	pcl_for_each_chunk(chunk, &ctxt->rc_call_pcl) {
603eb3de6a4SChuck Lever 		pcl_for_each_segment(segment, chunk) {
604eb3de6a4SChuck Lever 			if (inv_rkey == 0)
605eb3de6a4SChuck Lever 				inv_rkey = segment->rs_handle;
606eb3de6a4SChuck Lever 			else if (inv_rkey != segment->rs_handle)
60797bce634SChuck Lever 				return;
60897bce634SChuck Lever 		}
609eb3de6a4SChuck Lever 	}
610eb3de6a4SChuck Lever 	pcl_for_each_chunk(chunk, &ctxt->rc_read_pcl) {
611eb3de6a4SChuck Lever 		pcl_for_each_segment(segment, chunk) {
612eb3de6a4SChuck Lever 			if (inv_rkey == 0)
613eb3de6a4SChuck Lever 				inv_rkey = segment->rs_handle;
614eb3de6a4SChuck Lever 			else if (inv_rkey != segment->rs_handle)
61597bce634SChuck Lever 				return;
61697bce634SChuck Lever 		}
61797bce634SChuck Lever 	}
618eb3de6a4SChuck Lever 	pcl_for_each_chunk(chunk, &ctxt->rc_write_pcl) {
619eb3de6a4SChuck Lever 		pcl_for_each_segment(segment, chunk) {
620eb3de6a4SChuck Lever 			if (inv_rkey == 0)
621eb3de6a4SChuck Lever 				inv_rkey = segment->rs_handle;
622eb3de6a4SChuck Lever 			else if (inv_rkey != segment->rs_handle)
62397bce634SChuck Lever 				return;
62497bce634SChuck Lever 		}
62597bce634SChuck Lever 	}
626eb3de6a4SChuck Lever 	pcl_for_each_chunk(chunk, &ctxt->rc_reply_pcl) {
627eb3de6a4SChuck Lever 		pcl_for_each_segment(segment, chunk) {
628eb3de6a4SChuck Lever 			if (inv_rkey == 0)
629eb3de6a4SChuck Lever 				inv_rkey = segment->rs_handle;
630eb3de6a4SChuck Lever 			else if (inv_rkey != segment->rs_handle)
631eb3de6a4SChuck Lever 				return;
632eb3de6a4SChuck Lever 		}
633eb3de6a4SChuck Lever 	}
634eb3de6a4SChuck Lever 	ctxt->rc_inv_rkey = inv_rkey;
63597bce634SChuck Lever }
63697bce634SChuck Lever 
/**
 * svc_rdma_xdr_decode_req - Decode the transport header
 * @rq_arg: xdr_buf containing ingress RPC/RDMA message
 * @rctxt: state of decoding
 *
 * On entry, xdr->head[0].iov_base points to first byte of the
 * RPC-over-RDMA transport header.
 *
 * On successful exit, head[0] points to first byte past the
 * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message.
 *
 * The length of the RPC-over-RDMA header is returned.
 *
 * Return values:
 *	Positive header length on success,
 *	%0 if the message should be silently dropped (rdma_done,
 *	   rdma_error),
 *	%-EPROTONOSUPPORT if the RPC-over-RDMA version is not supported,
 *	%-EINVAL if the header is too short, the procedure is unknown,
 *	   or a chunk list fails validation.
 *
 * Assumptions:
 * - The transport header is entirely contained in the head iovec.
 */
static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
				   struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p, *rdma_argp;
	unsigned int hdr_len;

	rdma_argp = rq_arg->head[0].iov_base;
	xdr_init_decode(&rctxt->rc_stream, rq_arg, rdma_argp, NULL);

	/* Decode the four fixed header fields (XID, version,
	 * credits, procedure) in one shot.
	 */
	p = xdr_inline_decode(&rctxt->rc_stream,
			      rpcrdma_fixed_maxsz * sizeof(*p));
	if (unlikely(!p))
		goto out_short;
	p++;			/* skip XID */
	if (*p != rpcrdma_version)
		goto out_version;
	p += 2;			/* skip version and credits fields */
	rctxt->rc_msgtype = *p;
	switch (rctxt->rc_msgtype) {
	case rdma_msg:
		break;
	case rdma_nomsg:
		break;
	case rdma_done:
		goto out_drop;
	case rdma_error:
		goto out_drop;
	default:
		goto out_proc;
	}

	/* Parse and validate the three chunk lists, in wire order */
	if (!xdr_check_read_list(rctxt))
		goto out_inval;
	if (!xdr_check_write_list(rctxt))
		goto out_inval;
	if (!xdr_check_reply_chunk(rctxt))
		goto out_inval;

	/* Advance head[0] past the transport header so it now points
	 * at the RPC message payload (if any).
	 */
	rq_arg->head[0].iov_base = rctxt->rc_stream.p;
	hdr_len = xdr_stream_pos(&rctxt->rc_stream);
	rq_arg->head[0].iov_len -= hdr_len;
	rq_arg->len -= hdr_len;
	trace_svcrdma_decode_rqst(rctxt, rdma_argp, hdr_len);
	return hdr_len;

out_short:
	trace_svcrdma_decode_short_err(rctxt, rq_arg->len);
	return -EINVAL;

out_version:
	trace_svcrdma_decode_badvers_err(rctxt, rdma_argp);
	return -EPROTONOSUPPORT;

out_drop:
	trace_svcrdma_decode_drop_err(rctxt, rdma_argp);
	return 0;

out_proc:
	trace_svcrdma_decode_badproc_err(rctxt, rdma_argp);
	return -EINVAL;

out_inval:
	trace_svcrdma_decode_parse_err(rctxt, rdma_argp);
	return -EINVAL;
}
718a80a3234SChuck Lever 
/* Respond to a failed ingress message with an RDMA_ERROR reply.
 * Best-effort: if no send ctxt can be obtained, the error reply
 * is silently dropped.
 */
static void svc_rdma_send_error(struct svcxprt_rdma *rdma,
				struct svc_rdma_recv_ctxt *rctxt,
				int status)
{
	struct svc_rdma_send_ctxt *sctxt = svc_rdma_send_ctxt_get(rdma);

	if (sctxt)
		svc_rdma_send_error_msg(rdma, sctxt, rctxt, status);
}
7306b19cc5cSChuck Lever 
7315d252f90SChuck Lever /* By convention, backchannel calls arrive via rdma_msg type
7325d252f90SChuck Lever  * messages, and never populate the chunk lists. This makes
7335d252f90SChuck Lever  * the RPC/RDMA header small and fixed in size, so it is
7345d252f90SChuck Lever  * straightforward to check the RPC header's direction field.
7355d252f90SChuck Lever  */
svc_rdma_is_reverse_direction_reply(struct svc_xprt * xprt,struct svc_rdma_recv_ctxt * rctxt)73658b2e0feSChuck Lever static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
73758b2e0feSChuck Lever 						struct svc_rdma_recv_ctxt *rctxt)
7385d252f90SChuck Lever {
73958b2e0feSChuck Lever 	__be32 *p = rctxt->rc_recv_buf;
7405d252f90SChuck Lever 
7415d252f90SChuck Lever 	if (!xprt->xpt_bc_xprt)
7425d252f90SChuck Lever 		return false;
7435d252f90SChuck Lever 
74458b2e0feSChuck Lever 	if (rctxt->rc_msgtype != rdma_msg)
7455d252f90SChuck Lever 		return false;
7465d252f90SChuck Lever 
74758b2e0feSChuck Lever 	if (!pcl_is_empty(&rctxt->rc_call_pcl))
748f5821c76SChuck Lever 		return false;
74958b2e0feSChuck Lever 	if (!pcl_is_empty(&rctxt->rc_read_pcl))
750f5821c76SChuck Lever 		return false;
75158b2e0feSChuck Lever 	if (!pcl_is_empty(&rctxt->rc_write_pcl))
75258b2e0feSChuck Lever 		return false;
75358b2e0feSChuck Lever 	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
754f5821c76SChuck Lever 		return false;
755f5821c76SChuck Lever 
75658b2e0feSChuck Lever 	/* RPC call direction */
75758b2e0feSChuck Lever 	if (*(p + 8) == cpu_to_be32(RPC_CALL))
7585d252f90SChuck Lever 		return false;
7595d252f90SChuck Lever 
7605d252f90SChuck Lever 	return true;
7615d252f90SChuck Lever }
7625d252f90SChuck Lever 
763cafc7398SChuck Lever /**
764cafc7398SChuck Lever  * svc_rdma_recvfrom - Receive an RPC call
765cafc7398SChuck Lever  * @rqstp: request structure into which to receive an RPC Call
766cafc7398SChuck Lever  *
767cafc7398SChuck Lever  * Returns:
768cafc7398SChuck Lever  *	The positive number of bytes in the RPC Call message,
769cafc7398SChuck Lever  *	%0 if there were no Calls ready to return,
770cafc7398SChuck Lever  *	%-EINVAL if the Read chunk data is too large,
771cafc7398SChuck Lever  *	%-ENOMEM if rdma_rw context pool was exhausted,
772cafc7398SChuck Lever  *	%-ENOTCONN if posting failed (connection is lost),
773cafc7398SChuck Lever  *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
774cafc7398SChuck Lever  *
775cafc7398SChuck Lever  * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
776cafc7398SChuck Lever  * when there are no remaining ctxt's to process.
777cafc7398SChuck Lever  *
778cafc7398SChuck Lever  * The next ctxt is removed from the "receive" lists.
779cafc7398SChuck Lever  *
780cafc7398SChuck Lever  * - If the ctxt completes a Receive, then construct the Call
781cafc7398SChuck Lever  *   message from the contents of the Receive buffer.
782cafc7398SChuck Lever  *
783cafc7398SChuck Lever  *   - If there are no Read chunks in this message, then finish
784cafc7398SChuck Lever  *     assembling the Call message and return the number of bytes
785cafc7398SChuck Lever  *     in the message.
786cafc7398SChuck Lever  *
787cafc7398SChuck Lever  *   - If there are Read chunks in this message, post Read WRs to
78888770b8dSChuck Lever  *     pull that payload. When the Read WRs complete, build the
78988770b8dSChuck Lever  *     full message and return the number of bytes in it.
790d5b31be6STom Tucker  */
svc_rdma_recvfrom(struct svc_rqst * rqstp)791d5b31be6STom Tucker int svc_rdma_recvfrom(struct svc_rqst *rqstp)
792d5b31be6STom Tucker {
793d5b31be6STom Tucker 	struct svc_xprt *xprt = rqstp->rq_xprt;
794d5b31be6STom Tucker 	struct svcxprt_rdma *rdma_xprt =
795d5b31be6STom Tucker 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
796ecf85b23SChuck Lever 	struct svc_rdma_recv_ctxt *ctxt;
7972d6491a5SChuck Lever 	int ret;
798d5b31be6STom Tucker 
799baf6d18bSChuck Lever 	/* Prevent svc_xprt_release() from releasing pages in rq_pages
800baf6d18bSChuck Lever 	 * when returning 0 or an error.
801baf6d18bSChuck Lever 	 */
802baf6d18bSChuck Lever 	rqstp->rq_respages = rqstp->rq_pages;
803baf6d18bSChuck Lever 	rqstp->rq_next_page = rqstp->rq_respages;
804baf6d18bSChuck Lever 
80523cf1ee1SChuck Lever 	rqstp->rq_xprt_ctxt = NULL;
80623cf1ee1SChuck Lever 
807e3eded5eSChuck Lever 	ctxt = NULL;
80881fa3275SChuck Lever 	spin_lock(&rdma_xprt->sc_rq_dto_lock);
809ecf85b23SChuck Lever 	ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
810e3eded5eSChuck Lever 	if (ctxt)
811e3eded5eSChuck Lever 		list_del(&ctxt->rc_list);
812e3eded5eSChuck Lever 	else
8132d6491a5SChuck Lever 		/* No new incoming requests, terminate the loop */
814d5b31be6STom Tucker 		clear_bit(XPT_DATA, &xprt->xpt_flags);
8152d6491a5SChuck Lever 	spin_unlock(&rdma_xprt->sc_rq_dto_lock);
816d5b31be6STom Tucker 
8177d81ee87SChuck Lever 	/* Unblock the transport for the next receive */
8187d81ee87SChuck Lever 	svc_xprt_received(xprt);
819e3eded5eSChuck Lever 	if (!ctxt)
820e3eded5eSChuck Lever 		return 0;
8217d81ee87SChuck Lever 
822e3eded5eSChuck Lever 	percpu_counter_inc(&svcrdma_stat_recv);
823dd2d055bSChuck Lever 	ib_dma_sync_single_for_cpu(rdma_xprt->sc_pd->device,
824dd2d055bSChuck Lever 				   ctxt->rc_recv_sge.addr, ctxt->rc_byte_len,
825dd2d055bSChuck Lever 				   DMA_FROM_DEVICE);
8266f29d07cSChuck Lever 	svc_rdma_build_arg_xdr(rqstp, ctxt);
827d5b31be6STom Tucker 
828e604aad2SChuck Lever 	ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
829a6081b82SChuck Lever 	if (ret < 0)
830a6081b82SChuck Lever 		goto out_err;
831a0544c94SChuck Lever 	if (ret == 0)
832a0544c94SChuck Lever 		goto out_drop;
833d5b31be6STom Tucker 
83458b2e0feSChuck Lever 	if (svc_rdma_is_reverse_direction_reply(xprt, ctxt))
835ea740bd5SChuck Lever 		goto out_backchannel;
836ea740bd5SChuck Lever 
83797bce634SChuck Lever 	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
8385d252f90SChuck Lever 
839d96962e6SChuck Lever 	if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
8409af723beSChuck Lever 	    !pcl_is_empty(&ctxt->rc_call_pcl)) {
8419af723beSChuck Lever 		ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
8429af723beSChuck Lever 		if (ret < 0)
8439af723beSChuck Lever 			goto out_readfail;
8449af723beSChuck Lever 	}
845d5b31be6STom Tucker 
8463a88092eSChuck Lever 	rqstp->rq_xprt_ctxt = ctxt;
847d5b31be6STom Tucker 	rqstp->rq_prot = IPPROTO_MAX;
848d5b31be6STom Tucker 	svc_xprt_copy_addrs(rqstp, xprt);
849319951ebSChuck Lever 	set_bit(RQ_SECURE, &rqstp->rq_flags);
85071641d99SChuck Lever 	return rqstp->rq_arg.len;
851d5b31be6STom Tucker 
852a6081b82SChuck Lever out_err:
853d1f6e236SChuck Lever 	svc_rdma_send_error(rdma_xprt, ctxt, ret);
8541e5f4160SChuck Lever 	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
855a6081b82SChuck Lever 	return 0;
856a6081b82SChuck Lever 
857d96962e6SChuck Lever out_readfail:
858cafc7398SChuck Lever 	if (ret == -EINVAL)
859d1f6e236SChuck Lever 		svc_rdma_send_error(rdma_xprt, ctxt, ret);
8601e5f4160SChuck Lever 	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
861be739f32SChuck Lever 	svc_xprt_deferred_close(xprt);
862be739f32SChuck Lever 	return -ENOTCONN;
8635d252f90SChuck Lever 
864ea740bd5SChuck Lever out_backchannel:
865ea740bd5SChuck Lever 	svc_rdma_handle_bc_reply(rqstp, ctxt);
866a0544c94SChuck Lever out_drop:
8671e5f4160SChuck Lever 	svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
86848272502SChuck Lever 	return 0;
869d5b31be6STom Tucker }
870