xref: /openbmc/linux/net/sunrpc/xprtmultipath.c (revision 163f8821)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Multipath support for RPC
4  *
5  * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
6  *
7  * Trond Myklebust <trond.myklebust@primarydata.com>
8  *
9  */
10 #include <linux/types.h>
11 #include <linux/kref.h>
12 #include <linux/list.h>
13 #include <linux/rcupdate.h>
14 #include <linux/rculist.h>
15 #include <linux/slab.h>
16 #include <asm/cmpxchg.h>
17 #include <linux/spinlock.h>
18 #include <linux/sunrpc/xprt.h>
19 #include <linux/sunrpc/addr.h>
20 #include <linux/sunrpc/xprtmultipath.h>
21 
22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
23 		const struct rpc_xprt *cur);
24 
25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
28 
29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
30 		struct rpc_xprt *xprt)
31 {
32 	if (unlikely(xprt_get(xprt) == NULL))
33 		return;
34 	list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
35 	smp_wmb();
36 	if (xps->xps_nxprts == 0)
37 		xps->xps_net = xprt->xprt_net;
38 	xps->xps_nxprts++;
39 	xps->xps_nactive++;
40 }
41 
42 /**
43  * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
44  * @xps: pointer to struct rpc_xprt_switch
45  * @xprt: pointer to struct rpc_xprt
46  *
47  * Adds xprt to the end of the list of struct rpc_xprt in xps.
48  */
49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
50 		struct rpc_xprt *xprt)
51 {
52 	if (xprt == NULL)
53 		return;
54 	spin_lock(&xps->xps_lock);
55 	if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
56 		xprt_switch_add_xprt_locked(xps, xprt);
57 	spin_unlock(&xps->xps_lock);
58 }
59 
60 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
61 		struct rpc_xprt *xprt)
62 {
63 	if (unlikely(xprt == NULL))
64 		return;
65 	xps->xps_nactive--;
66 	xps->xps_nxprts--;
67 	if (xps->xps_nxprts == 0)
68 		xps->xps_net = NULL;
69 	smp_wmb();
70 	list_del_rcu(&xprt->xprt_switch);
71 }
72 
73 /**
74  * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
75  * @xps: pointer to struct rpc_xprt_switch
76  * @xprt: pointer to struct rpc_xprt
77  *
78  * Removes xprt from the list of struct rpc_xprt in xps.
79  */
80 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
81 		struct rpc_xprt *xprt)
82 {
83 	spin_lock(&xps->xps_lock);
84 	xprt_switch_remove_xprt_locked(xps, xprt);
85 	spin_unlock(&xps->xps_lock);
86 	xprt_put(xprt);
87 }
88 
89 /**
90  * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
91  * @xprt: pointer to struct rpc_xprt
92  * @gfp_flags: allocation flags
93  *
94  * On success, returns an initialised struct rpc_xprt_switch, containing
95  * the entry xprt. Returns NULL on failure.
96  */
97 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
98 		gfp_t gfp_flags)
99 {
100 	struct rpc_xprt_switch *xps;
101 
102 	xps = kmalloc(sizeof(*xps), gfp_flags);
103 	if (xps != NULL) {
104 		spin_lock_init(&xps->xps_lock);
105 		kref_init(&xps->xps_kref);
106 		xps->xps_nxprts = 0;
107 		INIT_LIST_HEAD(&xps->xps_xprt_list);
108 		xps->xps_iter_ops = &rpc_xprt_iter_singular;
109 		xprt_switch_add_xprt_locked(xps, xprt);
110 	}
111 
112 	return xps;
113 }
114 
115 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
116 {
117 	spin_lock(&xps->xps_lock);
118 	while (!list_empty(&xps->xps_xprt_list)) {
119 		struct rpc_xprt *xprt;
120 
121 		xprt = list_first_entry(&xps->xps_xprt_list,
122 				struct rpc_xprt, xprt_switch);
123 		xprt_switch_remove_xprt_locked(xps, xprt);
124 		spin_unlock(&xps->xps_lock);
125 		xprt_put(xprt);
126 		spin_lock(&xps->xps_lock);
127 	}
128 	spin_unlock(&xps->xps_lock);
129 }
130 
131 static void xprt_switch_free(struct kref *kref)
132 {
133 	struct rpc_xprt_switch *xps = container_of(kref,
134 			struct rpc_xprt_switch, xps_kref);
135 
136 	xprt_switch_free_entries(xps);
137 	kfree_rcu(xps, xps_rcu);
138 }
139 
140 /**
141  * xprt_switch_get - Return a reference to a rpc_xprt_switch
142  * @xps: pointer to struct rpc_xprt_switch
143  *
144  * Returns a reference to xps unless the refcount is already zero.
145  */
146 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
147 {
148 	if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
149 		return xps;
150 	return NULL;
151 }
152 
153 /**
154  * xprt_switch_put - Release a reference to a rpc_xprt_switch
155  * @xps: pointer to struct rpc_xprt_switch
156  *
157  * Release the reference to xps, and free it once the refcount is zero.
158  */
159 void xprt_switch_put(struct rpc_xprt_switch *xps)
160 {
161 	if (xps != NULL)
162 		kref_put(&xps->xps_kref, xprt_switch_free);
163 }
164 
165 /**
166  * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
167  * @xps: pointer to struct rpc_xprt_switch
168  *
169  * Sets a round-robin default policy for iterators acting on xps.
170  */
171 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
172 {
173 	if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
174 		WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
175 }
176 
177 static
178 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
179 {
180 	if (xpi->xpi_ops != NULL)
181 		return xpi->xpi_ops;
182 	return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
183 }
184 
/* Rewind callback for policies that have nothing to reset. */
static void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
{
	/* Intentionally empty. */
}
189 
190 static
191 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
192 {
193 	WRITE_ONCE(xpi->xpi_cursor, NULL);
194 }
195 
196 static
197 bool xprt_is_active(const struct rpc_xprt *xprt)
198 {
199 	return kref_read(&xprt->kref) != 0;
200 }
201 
202 static
203 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
204 {
205 	struct rpc_xprt *pos;
206 
207 	list_for_each_entry_rcu(pos, head, xprt_switch) {
208 		if (xprt_is_active(pos))
209 			return pos;
210 	}
211 	return NULL;
212 }
213 
214 static
215 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
216 {
217 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
218 
219 	if (xps == NULL)
220 		return NULL;
221 	return xprt_switch_find_first_entry(&xps->xps_xprt_list);
222 }
223 
224 static
225 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
226 		const struct rpc_xprt *cur)
227 {
228 	struct rpc_xprt *pos;
229 	bool found = false;
230 
231 	list_for_each_entry_rcu(pos, head, xprt_switch) {
232 		if (cur == pos)
233 			found = true;
234 		if (found && xprt_is_active(pos))
235 			return pos;
236 	}
237 	return NULL;
238 }
239 
240 static
241 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
242 {
243 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
244 	struct list_head *head;
245 
246 	if (xps == NULL)
247 		return NULL;
248 	head = &xps->xps_xprt_list;
249 	if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
250 		return xprt_switch_find_first_entry(head);
251 	return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
252 }
253 
254 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps,
255 			      const struct sockaddr *sap)
256 {
257 	struct list_head *head;
258 	struct rpc_xprt *pos;
259 
260 	if (xps == NULL || sap == NULL)
261 		return false;
262 
263 	head = &xps->xps_xprt_list;
264 	list_for_each_entry_rcu(pos, head, xprt_switch) {
265 		if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) {
266 			pr_info("RPC:   addr %s already in xprt switch\n",
267 				pos->address_strings[RPC_DISPLAY_ADDR]);
268 			return true;
269 		}
270 	}
271 	return false;
272 }
273 
274 static
275 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
276 		const struct rpc_xprt *cur)
277 {
278 	struct rpc_xprt *pos, *prev = NULL;
279 	bool found = false;
280 
281 	list_for_each_entry_rcu(pos, head, xprt_switch) {
282 		if (cur == prev)
283 			found = true;
284 		if (found && xprt_is_active(pos))
285 			return pos;
286 		prev = pos;
287 	}
288 	return NULL;
289 }
290 
291 static
292 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
293 		struct rpc_xprt **cursor,
294 		xprt_switch_find_xprt_t find_next)
295 {
296 	struct rpc_xprt *cur, *pos, *old;
297 
298 	cur = READ_ONCE(*cursor);
299 	for (;;) {
300 		old = cur;
301 		pos = find_next(head, old);
302 		if (pos == NULL)
303 			break;
304 		cur = cmpxchg_relaxed(cursor, old, pos);
305 		if (cur == old)
306 			break;
307 	}
308 	return pos;
309 }
310 
311 static
312 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
313 		xprt_switch_find_xprt_t find_next)
314 {
315 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
316 
317 	if (xps == NULL)
318 		return NULL;
319 	return xprt_switch_set_next_cursor(&xps->xps_xprt_list,
320 			&xpi->xpi_cursor,
321 			find_next);
322 }
323 
324 static
325 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
326 		const struct rpc_xprt *cur)
327 {
328 	struct rpc_xprt *ret;
329 
330 	ret = xprt_switch_find_next_entry(head, cur);
331 	if (ret != NULL)
332 		return ret;
333 	return xprt_switch_find_first_entry(head);
334 }
335 
336 static
337 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
338 {
339 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
340 	struct rpc_xprt *xprt;
341 	unsigned long xprt_queuelen;
342 	unsigned long xps_queuelen;
343 
344 	do {
345 		xprt = xprt_iter_next_entry_multiple(xpi,
346 			xprt_switch_find_next_entry_roundrobin);
347 		if (xprt == NULL)
348 			break;
349 		xprt_queuelen = atomic_long_read(&xprt->queuelen);
350 		if (xprt_queuelen <= 2)
351 			break;
352 		xps_queuelen = atomic_long_read(&xps->xps_queuelen);
353 		/* Exit loop if xprt_queuelen <= average queue length */
354 	} while (xprt_queuelen * READ_ONCE(xps->xps_nactive) > xps_queuelen);
355 	return xprt;
356 }
357 
358 static
359 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
360 {
361 	return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
362 }
363 
364 /*
365  * xprt_iter_rewind - Resets the xprt iterator
366  * @xpi: pointer to rpc_xprt_iter
367  *
368  * Resets xpi to ensure that it points to the first entry in the list
369  * of transports.
370  */
371 static
372 void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
373 {
374 	rcu_read_lock();
375 	xprt_iter_ops(xpi)->xpi_rewind(xpi);
376 	rcu_read_unlock();
377 }
378 
379 static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
380 		struct rpc_xprt_switch *xps,
381 		const struct rpc_xprt_iter_ops *ops)
382 {
383 	rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
384 	xpi->xpi_cursor = NULL;
385 	xpi->xpi_ops = ops;
386 }
387 
388 /**
389  * xprt_iter_init - Initialise an xprt iterator
390  * @xpi: pointer to rpc_xprt_iter
391  * @xps: pointer to rpc_xprt_switch
392  *
393  * Initialises the iterator to use the default iterator ops
394  * as set in xps. This function is mainly intended for internal
395  * use in the rpc_client.
396  */
397 void xprt_iter_init(struct rpc_xprt_iter *xpi,
398 		struct rpc_xprt_switch *xps)
399 {
400 	__xprt_iter_init(xpi, xps, NULL);
401 }
402 
403 /**
404  * xprt_iter_init_listall - Initialise an xprt iterator
405  * @xpi: pointer to rpc_xprt_iter
406  * @xps: pointer to rpc_xprt_switch
407  *
408  * Initialises the iterator to iterate once through the entire list
409  * of entries in xps.
410  */
411 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
412 		struct rpc_xprt_switch *xps)
413 {
414 	__xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
415 }
416 
417 /**
418  * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
419  * @xpi: pointer to rpc_xprt_iter
420  * @newswitch: pointer to a new rpc_xprt_switch or NULL
421  *
422  * Swaps out the existing xpi->xpi_xpswitch with a new value.
423  */
424 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
425 		struct rpc_xprt_switch *newswitch)
426 {
427 	struct rpc_xprt_switch __rcu *oldswitch;
428 
429 	/* Atomically swap out the old xpswitch */
430 	oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
431 	if (newswitch != NULL)
432 		xprt_iter_rewind(xpi);
433 	return rcu_dereference_protected(oldswitch, true);
434 }
435 
436 /**
437  * xprt_iter_destroy - Destroys the xprt iterator
438  * @xpi: pointer to rpc_xprt_iter
439  */
440 void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
441 {
442 	xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
443 }
444 
445 /**
446  * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
447  * @xpi: pointer to rpc_xprt_iter
448  *
449  * Returns a pointer to the struct rpc_xprt that is currently
450  * pointed to by the cursor.
451  * Caller must be holding rcu_read_lock().
452  */
453 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
454 {
455 	WARN_ON_ONCE(!rcu_read_lock_held());
456 	return xprt_iter_ops(xpi)->xpi_xprt(xpi);
457 }
458 
459 static
460 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
461 		struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
462 {
463 	struct rpc_xprt *ret;
464 
465 	do {
466 		ret = fn(xpi);
467 		if (ret == NULL)
468 			break;
469 		ret = xprt_get(ret);
470 	} while (ret == NULL);
471 	return ret;
472 }
473 
474 /**
475  * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
476  * @xpi: pointer to rpc_xprt_iter
477  *
478  * Returns a reference to the struct rpc_xprt that is currently
479  * pointed to by the cursor.
480  */
481 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
482 {
483 	struct rpc_xprt *xprt;
484 
485 	rcu_read_lock();
486 	xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
487 	rcu_read_unlock();
488 	return xprt;
489 }
490 
491 /**
492  * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
493  * @xpi: pointer to rpc_xprt_iter
494  *
495  * Returns a reference to the struct rpc_xprt that immediately follows the
496  * entry pointed to by the cursor.
497  */
498 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
499 {
500 	struct rpc_xprt *xprt;
501 
502 	rcu_read_lock();
503 	xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
504 	rcu_read_unlock();
505 	return xprt;
506 }
507 
508 /* Policy for always returning the first entry in the rpc_xprt_switch */
509 static
510 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
511 	.xpi_rewind = xprt_iter_no_rewind,
512 	.xpi_xprt = xprt_iter_first_entry,
513 	.xpi_next = xprt_iter_first_entry,
514 };
515 
516 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */
517 static
518 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
519 	.xpi_rewind = xprt_iter_default_rewind,
520 	.xpi_xprt = xprt_iter_current_entry,
521 	.xpi_next = xprt_iter_next_entry_roundrobin,
522 };
523 
524 /* Policy for once-through iteration of entries in the rpc_xprt_switch */
525 static
526 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
527 	.xpi_rewind = xprt_iter_default_rewind,
528 	.xpi_xprt = xprt_iter_current_entry,
529 	.xpi_next = xprt_iter_next_entry_all,
530 };
531