xref: /openbmc/linux/net/sunrpc/xprtrdma/backchannel.c (revision 83128a60ca74e996c5e0336c4fff0579f4a8c909)
1f531a5dbSChuck Lever /*
2f531a5dbSChuck Lever  * Copyright (c) 2015 Oracle.  All rights reserved.
3f531a5dbSChuck Lever  *
4f531a5dbSChuck Lever  * Support for backward direction RPCs on RPC/RDMA.
5f531a5dbSChuck Lever  */
6f531a5dbSChuck Lever 
7f531a5dbSChuck Lever #include <linux/module.h>
8f531a5dbSChuck Lever 
9f531a5dbSChuck Lever #include "xprt_rdma.h"
10f531a5dbSChuck Lever 
11f531a5dbSChuck Lever #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
12f531a5dbSChuck Lever # define RPCDBG_FACILITY	RPCDBG_TRANS
13f531a5dbSChuck Lever #endif
14f531a5dbSChuck Lever 
15f531a5dbSChuck Lever static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
16f531a5dbSChuck Lever 				 struct rpc_rqst *rqst)
17f531a5dbSChuck Lever {
18f531a5dbSChuck Lever 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
19f531a5dbSChuck Lever 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
20f531a5dbSChuck Lever 
21f531a5dbSChuck Lever 	spin_lock(&buf->rb_reqslock);
22f531a5dbSChuck Lever 	list_del(&req->rl_all);
23f531a5dbSChuck Lever 	spin_unlock(&buf->rb_reqslock);
24f531a5dbSChuck Lever 
25f531a5dbSChuck Lever 	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
26f531a5dbSChuck Lever 
27f531a5dbSChuck Lever 	kfree(rqst);
28f531a5dbSChuck Lever }
29f531a5dbSChuck Lever 
30f531a5dbSChuck Lever static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
31f531a5dbSChuck Lever 				 struct rpc_rqst *rqst)
32f531a5dbSChuck Lever {
33f531a5dbSChuck Lever 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
34f531a5dbSChuck Lever 	struct rpcrdma_regbuf *rb;
35f531a5dbSChuck Lever 	struct rpcrdma_req *req;
36f531a5dbSChuck Lever 	struct xdr_buf *buf;
37f531a5dbSChuck Lever 	size_t size;
38f531a5dbSChuck Lever 
39f531a5dbSChuck Lever 	req = rpcrdma_create_req(r_xprt);
40f531a5dbSChuck Lever 	if (!req)
41f531a5dbSChuck Lever 		return -ENOMEM;
42f531a5dbSChuck Lever 	req->rl_backchannel = true;
43f531a5dbSChuck Lever 
44f531a5dbSChuck Lever 	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
45f531a5dbSChuck Lever 	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
46f531a5dbSChuck Lever 	if (IS_ERR(rb))
47f531a5dbSChuck Lever 		goto out_fail;
48f531a5dbSChuck Lever 	req->rl_rdmabuf = rb;
49f531a5dbSChuck Lever 
50f531a5dbSChuck Lever 	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
51f531a5dbSChuck Lever 	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
52f531a5dbSChuck Lever 	if (IS_ERR(rb))
53f531a5dbSChuck Lever 		goto out_fail;
54f531a5dbSChuck Lever 	rb->rg_owner = req;
55f531a5dbSChuck Lever 	req->rl_sendbuf = rb;
56f531a5dbSChuck Lever 	/* so that rpcr_to_rdmar works when receiving a request */
57f531a5dbSChuck Lever 	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
58f531a5dbSChuck Lever 
59f531a5dbSChuck Lever 	buf = &rqst->rq_snd_buf;
60f531a5dbSChuck Lever 	buf->head[0].iov_base = rqst->rq_buffer;
61f531a5dbSChuck Lever 	buf->head[0].iov_len = 0;
62f531a5dbSChuck Lever 	buf->tail[0].iov_base = NULL;
63f531a5dbSChuck Lever 	buf->tail[0].iov_len = 0;
64f531a5dbSChuck Lever 	buf->page_len = 0;
65f531a5dbSChuck Lever 	buf->len = 0;
66f531a5dbSChuck Lever 	buf->buflen = size;
67f531a5dbSChuck Lever 
68f531a5dbSChuck Lever 	return 0;
69f531a5dbSChuck Lever 
70f531a5dbSChuck Lever out_fail:
71f531a5dbSChuck Lever 	rpcrdma_bc_free_rqst(r_xprt, rqst);
72f531a5dbSChuck Lever 	return -ENOMEM;
73f531a5dbSChuck Lever }
74f531a5dbSChuck Lever 
75f531a5dbSChuck Lever /* Allocate and add receive buffers to the rpcrdma_buffer's
76f531a5dbSChuck Lever  * existing list of rep's. These are released when the
77f531a5dbSChuck Lever  * transport is destroyed.
78f531a5dbSChuck Lever  */
79f531a5dbSChuck Lever static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
80f531a5dbSChuck Lever 				 unsigned int count)
81f531a5dbSChuck Lever {
82f531a5dbSChuck Lever 	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
83f531a5dbSChuck Lever 	struct rpcrdma_rep *rep;
84f531a5dbSChuck Lever 	unsigned long flags;
85f531a5dbSChuck Lever 	int rc = 0;
86f531a5dbSChuck Lever 
87f531a5dbSChuck Lever 	while (count--) {
88f531a5dbSChuck Lever 		rep = rpcrdma_create_rep(r_xprt);
89f531a5dbSChuck Lever 		if (IS_ERR(rep)) {
90f531a5dbSChuck Lever 			pr_err("RPC:       %s: reply buffer alloc failed\n",
91f531a5dbSChuck Lever 			       __func__);
92f531a5dbSChuck Lever 			rc = PTR_ERR(rep);
93f531a5dbSChuck Lever 			break;
94f531a5dbSChuck Lever 		}
95f531a5dbSChuck Lever 
96f531a5dbSChuck Lever 		spin_lock_irqsave(&buffers->rb_lock, flags);
97f531a5dbSChuck Lever 		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
98f531a5dbSChuck Lever 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
99f531a5dbSChuck Lever 	}
100f531a5dbSChuck Lever 
101f531a5dbSChuck Lever 	return rc;
102f531a5dbSChuck Lever }
103f531a5dbSChuck Lever 
/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpc_rqst *rqst;
	unsigned int i;
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	/* Since the rqst count is doubled below, refuse any @reqs
	 * that would overrun the backward-direction WR budget.
	 */
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	/* Allocate and register 2 * @reqs rqsts; each goes on the
	 * transport's bc_pa_list, from which the receive path draws.
	 */
	for (i = 0; i < (reqs << 1); i++) {
		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
		if (!rqst) {
			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
			       __func__);
			goto out_free;
		}

		rqst->rq_xprt = &r_xprt->rx_xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);

		/* On failure this frees @rqst itself; @rqst is not yet
		 * on bc_pa_list, so out_free will not touch it again.
		 */
		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
			goto out_free;

		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);
	}

	/* Receive buffers for incoming backward direction calls */
	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
	if (rc)
		goto out_free;

	/* Post enough extra receives to accept @reqs incoming calls */
	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
	if (rc)
		goto out_free;

	buffer->rb_bc_srv_max_requests = reqs;
	/* Load the server-side RPC/RDMA module, which handles the
	 * backward direction; best-effort, return value ignored.
	 */
	request_module("svcrdma");

	return 0;

out_free:
	/* Releases every rqst already on bc_pa_list */
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}
171f531a5dbSChuck Lever 
172f531a5dbSChuck Lever /**
173*83128a60SChuck Lever  * rpcrdma_bc_marshal_reply - Send backwards direction reply
174*83128a60SChuck Lever  * @rqst: buffer containing RPC reply data
175*83128a60SChuck Lever  *
176*83128a60SChuck Lever  * Returns zero on success.
177*83128a60SChuck Lever  */
178*83128a60SChuck Lever int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
179*83128a60SChuck Lever {
180*83128a60SChuck Lever 	struct rpc_xprt *xprt = rqst->rq_xprt;
181*83128a60SChuck Lever 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
182*83128a60SChuck Lever 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
183*83128a60SChuck Lever 	struct rpcrdma_msg *headerp;
184*83128a60SChuck Lever 	size_t rpclen;
185*83128a60SChuck Lever 
186*83128a60SChuck Lever 	headerp = rdmab_to_msg(req->rl_rdmabuf);
187*83128a60SChuck Lever 	headerp->rm_xid = rqst->rq_xid;
188*83128a60SChuck Lever 	headerp->rm_vers = rpcrdma_version;
189*83128a60SChuck Lever 	headerp->rm_credit =
190*83128a60SChuck Lever 			cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
191*83128a60SChuck Lever 	headerp->rm_type = rdma_msg;
192*83128a60SChuck Lever 	headerp->rm_body.rm_chunks[0] = xdr_zero;
193*83128a60SChuck Lever 	headerp->rm_body.rm_chunks[1] = xdr_zero;
194*83128a60SChuck Lever 	headerp->rm_body.rm_chunks[2] = xdr_zero;
195*83128a60SChuck Lever 
196*83128a60SChuck Lever 	rpclen = rqst->rq_svec[0].iov_len;
197*83128a60SChuck Lever 
198*83128a60SChuck Lever 	pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
199*83128a60SChuck Lever 		__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
200*83128a60SChuck Lever 	pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
201*83128a60SChuck Lever 		__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
202*83128a60SChuck Lever 	pr_info("RPC:       %s:      RPC: %*ph\n",
203*83128a60SChuck Lever 		__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
204*83128a60SChuck Lever 
205*83128a60SChuck Lever 	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
206*83128a60SChuck Lever 	req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
207*83128a60SChuck Lever 	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
208*83128a60SChuck Lever 
209*83128a60SChuck Lever 	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
210*83128a60SChuck Lever 	req->rl_send_iov[1].length = rpclen;
211*83128a60SChuck Lever 	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
212*83128a60SChuck Lever 
213*83128a60SChuck Lever 	req->rl_niovs = 2;
214*83128a60SChuck Lever 	return 0;
215*83128a60SChuck Lever }
216*83128a60SChuck Lever 
/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 *
 * Frees every rqst remaining on the transport's bc_pa_list.
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		/* Drop the lock around the free — presumably because
		 * rpcrdma_bc_free_rqst() can sleep (it takes other
		 * locks and releases DMA resources); TODO confirm.
		 * @rqst was already unlinked, and @tmp (cached by the
		 * _safe iterator) keeps the walk valid across the
		 * unlock/relock, assuming no concurrent list mutation
		 * on this teardown path.
		 */
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}
238f531a5dbSChuck Lever 
/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 *
 * Clears the in-use flag and returns @rqst to the transport's
 * bc_pa_list so it can service another incoming backchannel call.
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;

	/* Full barriers around clear_bit() keep the flag update
	 * ordered with respect to surrounding memory accesses, so
	 * other CPUs that see the bit cleared also see this rqst's
	 * prior state. Warn (once) if the bit was not set: that
	 * would mean a double free of this rqst.
	 */
	smp_mb__before_atomic();
	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	smp_mb__after_atomic();

	/* Make the rqst available to the receive path again */
	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}
256