xref: /openbmc/linux/net/sunrpc/xprtrdma/backchannel.c (revision 124fa17d3e33060fbb28e995a42c7f5c8b31b345)
1f531a5dbSChuck Lever /*
2f531a5dbSChuck Lever  * Copyright (c) 2015 Oracle.  All rights reserved.
3f531a5dbSChuck Lever  *
4f531a5dbSChuck Lever  * Support for backward direction RPCs on RPC/RDMA.
5f531a5dbSChuck Lever  */
6f531a5dbSChuck Lever 
7f531a5dbSChuck Lever #include <linux/module.h>
8f531a5dbSChuck Lever 
9f531a5dbSChuck Lever #include "xprt_rdma.h"
10f531a5dbSChuck Lever 
11f531a5dbSChuck Lever #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
12f531a5dbSChuck Lever # define RPCDBG_FACILITY	RPCDBG_TRANS
13f531a5dbSChuck Lever #endif
14f531a5dbSChuck Lever 
15f531a5dbSChuck Lever static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
16f531a5dbSChuck Lever 				 struct rpc_rqst *rqst)
17f531a5dbSChuck Lever {
18f531a5dbSChuck Lever 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
19f531a5dbSChuck Lever 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
20f531a5dbSChuck Lever 
21f531a5dbSChuck Lever 	spin_lock(&buf->rb_reqslock);
22f531a5dbSChuck Lever 	list_del(&req->rl_all);
23f531a5dbSChuck Lever 	spin_unlock(&buf->rb_reqslock);
24f531a5dbSChuck Lever 
25f531a5dbSChuck Lever 	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
26f531a5dbSChuck Lever 
27f531a5dbSChuck Lever 	kfree(rqst);
28f531a5dbSChuck Lever }
29f531a5dbSChuck Lever 
30f531a5dbSChuck Lever static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
31f531a5dbSChuck Lever 				 struct rpc_rqst *rqst)
32f531a5dbSChuck Lever {
33f531a5dbSChuck Lever 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
34f531a5dbSChuck Lever 	struct rpcrdma_regbuf *rb;
35f531a5dbSChuck Lever 	struct rpcrdma_req *req;
36f531a5dbSChuck Lever 	struct xdr_buf *buf;
37f531a5dbSChuck Lever 	size_t size;
38f531a5dbSChuck Lever 
39f531a5dbSChuck Lever 	req = rpcrdma_create_req(r_xprt);
40f531a5dbSChuck Lever 	if (!req)
41f531a5dbSChuck Lever 		return -ENOMEM;
42f531a5dbSChuck Lever 	req->rl_backchannel = true;
43f531a5dbSChuck Lever 
44f531a5dbSChuck Lever 	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
45f531a5dbSChuck Lever 	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
46f531a5dbSChuck Lever 	if (IS_ERR(rb))
47f531a5dbSChuck Lever 		goto out_fail;
48f531a5dbSChuck Lever 	req->rl_rdmabuf = rb;
49f531a5dbSChuck Lever 
50f531a5dbSChuck Lever 	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
51f531a5dbSChuck Lever 	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
52f531a5dbSChuck Lever 	if (IS_ERR(rb))
53f531a5dbSChuck Lever 		goto out_fail;
54f531a5dbSChuck Lever 	rb->rg_owner = req;
55f531a5dbSChuck Lever 	req->rl_sendbuf = rb;
56f531a5dbSChuck Lever 	/* so that rpcr_to_rdmar works when receiving a request */
57f531a5dbSChuck Lever 	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
58f531a5dbSChuck Lever 
59f531a5dbSChuck Lever 	buf = &rqst->rq_snd_buf;
60f531a5dbSChuck Lever 	buf->head[0].iov_base = rqst->rq_buffer;
61f531a5dbSChuck Lever 	buf->head[0].iov_len = 0;
62f531a5dbSChuck Lever 	buf->tail[0].iov_base = NULL;
63f531a5dbSChuck Lever 	buf->tail[0].iov_len = 0;
64f531a5dbSChuck Lever 	buf->page_len = 0;
65f531a5dbSChuck Lever 	buf->len = 0;
66f531a5dbSChuck Lever 	buf->buflen = size;
67f531a5dbSChuck Lever 
68f531a5dbSChuck Lever 	return 0;
69f531a5dbSChuck Lever 
70f531a5dbSChuck Lever out_fail:
71f531a5dbSChuck Lever 	rpcrdma_bc_free_rqst(r_xprt, rqst);
72f531a5dbSChuck Lever 	return -ENOMEM;
73f531a5dbSChuck Lever }
74f531a5dbSChuck Lever 
75f531a5dbSChuck Lever /* Allocate and add receive buffers to the rpcrdma_buffer's
76f531a5dbSChuck Lever  * existing list of rep's. These are released when the
77f531a5dbSChuck Lever  * transport is destroyed.
78f531a5dbSChuck Lever  */
79f531a5dbSChuck Lever static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
80f531a5dbSChuck Lever 				 unsigned int count)
81f531a5dbSChuck Lever {
82f531a5dbSChuck Lever 	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
83f531a5dbSChuck Lever 	struct rpcrdma_rep *rep;
84f531a5dbSChuck Lever 	unsigned long flags;
85f531a5dbSChuck Lever 	int rc = 0;
86f531a5dbSChuck Lever 
87f531a5dbSChuck Lever 	while (count--) {
88f531a5dbSChuck Lever 		rep = rpcrdma_create_rep(r_xprt);
89f531a5dbSChuck Lever 		if (IS_ERR(rep)) {
90f531a5dbSChuck Lever 			pr_err("RPC:       %s: reply buffer alloc failed\n",
91f531a5dbSChuck Lever 			       __func__);
92f531a5dbSChuck Lever 			rc = PTR_ERR(rep);
93f531a5dbSChuck Lever 			break;
94f531a5dbSChuck Lever 		}
95f531a5dbSChuck Lever 
96f531a5dbSChuck Lever 		spin_lock_irqsave(&buffers->rb_lock, flags);
97f531a5dbSChuck Lever 		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
98f531a5dbSChuck Lever 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
99f531a5dbSChuck Lever 	}
100f531a5dbSChuck Lever 
101f531a5dbSChuck Lever 	return rc;
102f531a5dbSChuck Lever }
103f531a5dbSChuck Lever 
104f531a5dbSChuck Lever /**
105f531a5dbSChuck Lever  * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
106f531a5dbSChuck Lever  * @xprt: transport associated with these backchannel resources
107f531a5dbSChuck Lever  * @reqs: number of concurrent incoming requests to expect
108f531a5dbSChuck Lever  *
109f531a5dbSChuck Lever  * Returns 0 on success; otherwise a negative errno
110f531a5dbSChuck Lever  */
111f531a5dbSChuck Lever int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
112f531a5dbSChuck Lever {
113f531a5dbSChuck Lever 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
114f531a5dbSChuck Lever 	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
115f531a5dbSChuck Lever 	struct rpc_rqst *rqst;
116f531a5dbSChuck Lever 	unsigned int i;
117f531a5dbSChuck Lever 	int rc;
118f531a5dbSChuck Lever 
119f531a5dbSChuck Lever 	/* The backchannel reply path returns each rpc_rqst to the
120f531a5dbSChuck Lever 	 * bc_pa_list _after_ the reply is sent. If the server is
121f531a5dbSChuck Lever 	 * faster than the client, it can send another backward
122f531a5dbSChuck Lever 	 * direction request before the rpc_rqst is returned to the
123f531a5dbSChuck Lever 	 * list. The client rejects the request in this case.
124f531a5dbSChuck Lever 	 *
125f531a5dbSChuck Lever 	 * Twice as many rpc_rqsts are prepared to ensure there is
126f531a5dbSChuck Lever 	 * always an rpc_rqst available as soon as a reply is sent.
127f531a5dbSChuck Lever 	 */
128*124fa17dSChuck Lever 	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
129*124fa17dSChuck Lever 		goto out_err;
130*124fa17dSChuck Lever 
131f531a5dbSChuck Lever 	for (i = 0; i < (reqs << 1); i++) {
132f531a5dbSChuck Lever 		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
133f531a5dbSChuck Lever 		if (!rqst) {
134f531a5dbSChuck Lever 			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
135f531a5dbSChuck Lever 			       __func__);
136f531a5dbSChuck Lever 			goto out_free;
137f531a5dbSChuck Lever 		}
138f531a5dbSChuck Lever 
139f531a5dbSChuck Lever 		rqst->rq_xprt = &r_xprt->rx_xprt;
140f531a5dbSChuck Lever 		INIT_LIST_HEAD(&rqst->rq_list);
141f531a5dbSChuck Lever 		INIT_LIST_HEAD(&rqst->rq_bc_list);
142f531a5dbSChuck Lever 
143f531a5dbSChuck Lever 		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
144f531a5dbSChuck Lever 			goto out_free;
145f531a5dbSChuck Lever 
146f531a5dbSChuck Lever 		spin_lock_bh(&xprt->bc_pa_lock);
147f531a5dbSChuck Lever 		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
148f531a5dbSChuck Lever 		spin_unlock_bh(&xprt->bc_pa_lock);
149f531a5dbSChuck Lever 	}
150f531a5dbSChuck Lever 
151f531a5dbSChuck Lever 	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
152f531a5dbSChuck Lever 	if (rc)
153f531a5dbSChuck Lever 		goto out_free;
154f531a5dbSChuck Lever 
155f531a5dbSChuck Lever 	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
156f531a5dbSChuck Lever 	if (rc)
157f531a5dbSChuck Lever 		goto out_free;
158f531a5dbSChuck Lever 
159f531a5dbSChuck Lever 	buffer->rb_bc_srv_max_requests = reqs;
160f531a5dbSChuck Lever 	request_module("svcrdma");
161f531a5dbSChuck Lever 
162f531a5dbSChuck Lever 	return 0;
163f531a5dbSChuck Lever 
164f531a5dbSChuck Lever out_free:
165f531a5dbSChuck Lever 	xprt_rdma_bc_destroy(xprt, reqs);
166f531a5dbSChuck Lever 
167*124fa17dSChuck Lever out_err:
168f531a5dbSChuck Lever 	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
169f531a5dbSChuck Lever 	return -ENOMEM;
170f531a5dbSChuck Lever }
171f531a5dbSChuck Lever 
172f531a5dbSChuck Lever /**
173f531a5dbSChuck Lever  * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
174f531a5dbSChuck Lever  * @xprt: transport associated with these backchannel resources
175f531a5dbSChuck Lever  * @reqs: number of incoming requests to destroy; ignored
176f531a5dbSChuck Lever  */
177f531a5dbSChuck Lever void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
178f531a5dbSChuck Lever {
179f531a5dbSChuck Lever 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
180f531a5dbSChuck Lever 	struct rpc_rqst *rqst, *tmp;
181f531a5dbSChuck Lever 
182f531a5dbSChuck Lever 	spin_lock_bh(&xprt->bc_pa_lock);
183f531a5dbSChuck Lever 	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
184f531a5dbSChuck Lever 		list_del(&rqst->rq_bc_pa_list);
185f531a5dbSChuck Lever 		spin_unlock_bh(&xprt->bc_pa_lock);
186f531a5dbSChuck Lever 
187f531a5dbSChuck Lever 		rpcrdma_bc_free_rqst(r_xprt, rqst);
188f531a5dbSChuck Lever 
189f531a5dbSChuck Lever 		spin_lock_bh(&xprt->bc_pa_lock);
190f531a5dbSChuck Lever 	}
191f531a5dbSChuck Lever 	spin_unlock_bh(&xprt->bc_pa_lock);
192f531a5dbSChuck Lever }
193f531a5dbSChuck Lever 
194f531a5dbSChuck Lever /**
195f531a5dbSChuck Lever  * xprt_rdma_bc_free_rqst - Release a backchannel rqst
196f531a5dbSChuck Lever  * @rqst: request to release
197f531a5dbSChuck Lever  */
198f531a5dbSChuck Lever void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
199f531a5dbSChuck Lever {
200f531a5dbSChuck Lever 	struct rpc_xprt *xprt = rqst->rq_xprt;
201f531a5dbSChuck Lever 
202f531a5dbSChuck Lever 	smp_mb__before_atomic();
203f531a5dbSChuck Lever 	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
204f531a5dbSChuck Lever 	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
205f531a5dbSChuck Lever 	smp_mb__after_atomic();
206f531a5dbSChuck Lever 
207f531a5dbSChuck Lever 	spin_lock_bh(&xprt->bc_pa_lock);
208f531a5dbSChuck Lever 	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
209f531a5dbSChuck Lever 	spin_unlock_bh(&xprt->bc_pa_lock);
210f531a5dbSChuck Lever }
211