// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************

(c) 2007 Network Appliance, Inc.  All Rights Reserved.
(c) 2009 NetApp.  All Rights Reserved.


******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#define BC_MAX_SLOTS	64U

unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
	return BC_MAX_SLOTS;
}
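
/*
 * Illustrative sketch (an assumption about the callers, not part of
 * this file): transports that support the backchannel typically wire
 * the generic helpers defined here into their rpc_xprt_ops, roughly:
 *
 *	static const struct rpc_xprt_ops example_bc_ops = {
 *		// ... regular transport ops ...
 *		.bc_setup	= xprt_setup_bc,
 *		.bc_num_slots	= xprt_bc_max_slots,
 *		.bc_free_rqst	= xprt_free_bc_rqst,
 *		.bc_destroy	= xprt_destroy_bc,
 *	};
 *
 * ("example_bc_ops" is a hypothetical name.)  The exported
 * xprt_setup_backchannel()/xprt_destroy_backchannel() wrappers below
 * then dispatch through these ops.
 */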

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
	return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
	struct xdr_buf *xbufp;

	dprintk("RPC:        free allocations for req=%p\n", req);
	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
	xbufp = &req->rq_rcv_buf;
	free_page((unsigned long)xbufp->head[0].iov_base);
	xbufp = &req->rq_snd_buf;
	free_page((unsigned long)xbufp->head[0].iov_base);
	kfree(req);
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
	struct page *page;
	/* Preallocate one page-sized XDR buffer (used for both send and
	 * receive buffers) */
	page = alloc_page(gfp_flags);
	if (page == NULL)
		return -ENOMEM;
	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
	return 0;
}
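
/*
 * After xdr_buf_init() above, the buffer is a single page-backed kvec:
 * head[0] spans the whole page, the page and tail vectors are empty,
 * and buflen covers the page.  A minimal sketch of the resulting state:
 *
 *	buf->head[0].iov_base == page_address(page);
 *	buf->head[0].iov_len  == PAGE_SIZE;
 *	buf->buflen           == PAGE_SIZE;
 *	buf->len              == 0;	// no data queued yet
 */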

static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
	gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
	struct rpc_rqst *req;

	/* Pre-allocate one backchannel rpc_rqst */
	req = kzalloc(sizeof(*req), gfp_flags);
	if (req == NULL)
		return NULL;

	req->rq_xprt = xprt;
	INIT_LIST_HEAD(&req->rq_bc_list);

	/* Preallocate one XDR receive buffer */
	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
		printk(KERN_ERR "Failed to create bc receive xbuf\n");
		goto out_free;
	}
	req->rq_rcv_buf.len = PAGE_SIZE;

	/* Preallocate one XDR send buffer */
	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
		printk(KERN_ERR "Failed to create bc snd xbuf\n");
		goto out_free;
	}
	return req;
out_free:
	xprt_free_allocation(req);
	return NULL;
}

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large.  For example, a pNFS server
 * may send a list of multiple deviceIDs.  The list can be unbounded,
 * but the client can tell the server the maximum size of its callback
 * requests.  Each deviceID is 16 bytes, so one page gives the
 * arguments enough room to receive a reasonable number of deviceIDs.
 * The NFS client indicates to the pNFS server that its callback
 * requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
	if (!xprt->ops->bc_setup)
		return 0;
	return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
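
/*
 * Illustrative usage sketch (hypothetical caller; "session->bc_slots"
 * and "out_destroy_session" are assumed names, not real fields or
 * labels): a session-based upper layer such as NFSv4.1 would size the
 * backchannel from its negotiated session parameters once the
 * transport is up:
 *
 *	err = xprt_setup_backchannel(xprt, session->bc_slots);
 *	if (err < 0)
 *		goto out_destroy_session;
 *
 * Note that a transport without a ->bc_setup op silently reports
 * success, so callers need no transport-specific checks here.
 */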

int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
	struct rpc_rqst *req;
	struct list_head tmp_list;
	int i;

	dprintk("RPC:       setup backchannel transport\n");

	if (min_reqs > BC_MAX_SLOTS)
		min_reqs = BC_MAX_SLOTS;

	/*
	 * We use a temporary list to keep track of the preallocated
	 * buffers.  Once we're done building the list we splice it
	 * into the backchannel preallocation list off of the rpc_xprt
	 * struct.  This helps minimize the amount of time the list
	 * lock is held on the rpc_xprt struct.  It also makes cleanup
	 * easier in case of memory allocation errors.  (A minimal
	 * sketch of this pattern follows the function.)
	 */
	INIT_LIST_HEAD(&tmp_list);
	for (i = 0; i < min_reqs; i++) {
		/* Pre-allocate one backchannel rpc_rqst */
		req = xprt_alloc_bc_req(xprt);
		if (req == NULL) {
			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
			goto out_free;
		}

		/* Add the allocated buffer to the tmp list */
		dprintk("RPC:       adding req=%p\n", req);
		list_add(&req->rq_bc_pa_list, &tmp_list);
	}

	/*
	 * Add the temporary list to the backchannel preallocation list
	 */
	spin_lock(&xprt->bc_pa_lock);
	list_splice(&tmp_list, &xprt->bc_pa_list);
	xprt->bc_alloc_count += min_reqs;
	xprt->bc_alloc_max += min_reqs;
	atomic_add(min_reqs, &xprt->bc_slot_count);
	spin_unlock(&xprt->bc_pa_lock);

	dprintk("RPC:       setup backchannel transport done\n");
	return 0;

out_free:
	/*
	 * Memory allocation failed, free the temporary list
	 */
	while (!list_empty(&tmp_list)) {
		req = list_first_entry(&tmp_list,
				struct rpc_rqst,
				rq_bc_pa_list);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
	}

	dprintk("RPC:       setup backchannel transport failed\n");
	return -ENOMEM;
}
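
/*
 * The temporary-list-then-splice pattern used above, in miniature
 * (illustrative only; more_items(), make_item() and the "shared"
 * names are hypothetical):
 *
 *	LIST_HEAD(tmp);
 *
 *	while (more_items())
 *		list_add(&make_item()->node, &tmp);	// no lock held
 *
 *	spin_lock(&shared->lock);
 *	list_splice(&tmp, &shared->list);	// publish in O(1)
 *	spin_unlock(&shared->lock);
 *
 * On failure, everything still on the private tmp list can be torn
 * down without ever touching the shared lock.
 */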

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt:	the transport holding the preallocated structures
 * @max_reqs:	the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	if (xprt->ops->bc_destroy)
		xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
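
/*
 * Illustrative teardown sketch (hypothetical caller, mirroring the
 * setup sketch above): on session destruction the same slot count is
 * handed back, and the transport decides how many entries it can
 * actually release:
 *
 *	xprt_destroy_backchannel(xprt, session->bc_slots);
 */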

void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	struct rpc_rqst *req = NULL, *tmp = NULL;

	dprintk("RPC:        destroy backchannel transport\n");

	if (max_reqs == 0)
		goto out;

	spin_lock_bh(&xprt->bc_pa_lock);
	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		dprintk("RPC:        req=%p\n", req);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
		xprt->bc_alloc_count--;
		atomic_dec(&xprt->bc_slot_count);
		if (--max_reqs == 0)
			break;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);

out:
	dprintk("RPC:        backchannel list empty=%s\n",
		list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
		struct rpc_rqst *new)
{
	struct rpc_rqst *req = NULL;

	dprintk("RPC:       allocate a backchannel request\n");
	if (list_empty(&xprt->bc_pa_list)) {
		if (!new)
			goto not_found;
		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
			goto not_found;
		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
	}
	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
				rq_bc_pa_list);
	req->rq_reply_bytes_recvd = 0;
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));
	req->rq_xid = xid;
	req->rq_connect_cookie = xprt->connect_cookie;
	dprintk("RPC:       backchannel req=%p\n", req);
not_found:
	return req;
}

/*
 * Return the preallocated rpc_rqst structure and the XDR buffers
 * hanging off of it to the transport's pool.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	xprt->ops->bc_free_rqst(req);
}

void xprt_free_bc_rqst(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC:       free backchannel req=%p\n", req);

	req->rq_connect_cookie = xprt->connect_cookie - 1;
	smp_mb__before_atomic();
	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
	smp_mb__after_atomic();

	/*
	 * Return it to the list of preallocations so that it
	 * may be reused by a new callback request.
	 */
	spin_lock_bh(&xprt->bc_pa_lock);
	if (xprt_need_to_requeue(xprt)) {
		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
		req = NULL;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
	if (req != NULL) {
		/*
		 * The last remaining session was destroyed while this
		 * entry was in use.  Free the entry and don't attempt
		 * to add it back to the list because there is no need
		 * for any more preallocated entries.
		 */
		dprintk("RPC:       Last session removed req=%p\n", req);
		xprt_free_allocation(req);
	}
	xprt_put(xprt);
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_lookup_bc_request() to obtain
 * one of these structures, and xprt_free_bc_request() to return it.
 *
 * We know that we're called in soft interrupt context; grab the
 * spin_lock since there is no need to grab the bottom-half spin_lock.
 *
 * Return an available rpc_rqst, or NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *req, *new = NULL;

	do {
		spin_lock(&xprt->bc_pa_lock);
		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
			if (req->rq_connect_cookie != xprt->connect_cookie)
				continue;
			if (req->rq_xid == xid)
				goto found;
		}
		req = xprt_get_bc_request(xprt, xid, new);
found:
		spin_unlock(&xprt->bc_pa_lock);
		if (new) {
			if (req != new)
				xprt_free_allocation(new);
			break;
		} else if (req)
			break;
		new = xprt_alloc_bc_req(xprt);
	} while (new);
	return req;
}
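
/*
 * Illustrative sketch of the producer side (simplified; the copy step
 * and error value are assumptions about the calling transport, not
 * code from this file): when a transport decodes an incoming message
 * whose direction marks it as an RPC call on a client connection, it
 * typically matches or allocates a slot here, fills the receive
 * buffer, and then queues the slot for the callback service:
 *
 *	req = xprt_lookup_bc_request(xprt, xid);
 *	if (req == NULL)
 *		return -ESHUTDOWN;	// no slot available: drop it
 *	copied = copy_call_data(req);	// hypothetical helper
 *	xprt_complete_bc_request(req, copied);
 */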

/*
 * Add callback request to callback list.  The callback
 * service sleeps on the sv_cb_waitq waiting for new
 * requests.  Wake it up after enqueuing the request.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct svc_serv *bc_serv = xprt->bc_serv;

	spin_lock(&xprt->bc_pa_lock);
	list_del(&req->rq_bc_pa_list);
	xprt->bc_alloc_count--;
	spin_unlock(&xprt->bc_pa_lock);

	req->rq_private_buf.len = copied;
	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

	dprintk("RPC:       add callback request to list\n");
	xprt_get(xprt);
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
	wake_up(&bc_serv->sv_cb_waitq);
	spin_unlock(&bc_serv->sv_cb_lock);
}
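
/*
 * Illustrative sketch of the consumer side (a hypothetical callback
 * service thread; serv, rqstp and req are assumed local state, not
 * code from this file): it dequeues under sv_cb_lock and hands each
 * request to bc_svc_process(), sleeping on sv_cb_waitq when the list
 * is empty:
 *
 *	spin_lock_bh(&serv->sv_cb_lock);
 *	if (!list_empty(&serv->sv_cb_list)) {
 *		req = list_first_entry(&serv->sv_cb_list,
 *				       struct rpc_rqst, rq_bc_list);
 *		list_del(&req->rq_bc_list);
 *		spin_unlock_bh(&serv->sv_cb_lock);
 *		bc_svc_process(serv, req, rqstp);
 *	} else {
 *		spin_unlock_bh(&serv->sv_cb_lock);
 *		// wait on &serv->sv_cb_waitq for the wake_up() above
 *	}
 */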
363