xref: /openbmc/linux/net/sunrpc/xprtmultipath.c (revision 587bc725)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Multipath support for RPC
4  *
5  * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
6  *
7  * Trond Myklebust <trond.myklebust@primarydata.com>
8  *
9  */
10 #include <linux/types.h>
11 #include <linux/kref.h>
12 #include <linux/list.h>
13 #include <linux/rcupdate.h>
14 #include <linux/rculist.h>
15 #include <linux/slab.h>
16 #include <asm/cmpxchg.h>
17 #include <linux/spinlock.h>
18 #include <linux/sunrpc/xprt.h>
19 #include <linux/sunrpc/addr.h>
20 #include <linux/sunrpc/xprtmultipath.h>
21 
22 #include "sysfs.h"
23 
24 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps,
25 		const struct rpc_xprt *cur);
26 
27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
28 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
29 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
30 
31 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
32 		struct rpc_xprt *xprt)
33 {
34 	if (unlikely(xprt_get(xprt) == NULL))
35 		return;
36 	list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
37 	smp_wmb();
38 	if (xps->xps_nxprts == 0)
39 		xps->xps_net = xprt->xprt_net;
40 	xps->xps_nxprts++;
41 	xps->xps_nactive++;
42 }
43 
44 /**
45  * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
46  * @xps: pointer to struct rpc_xprt_switch
47  * @xprt: pointer to struct rpc_xprt
48  *
49  * Adds xprt to the end of the list of struct rpc_xprt in xps.
50  */
51 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
52 		struct rpc_xprt *xprt)
53 {
54 	if (xprt == NULL)
55 		return;
56 	spin_lock(&xps->xps_lock);
57 	if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
58 		xprt_switch_add_xprt_locked(xps, xprt);
59 	spin_unlock(&xps->xps_lock);
60 	rpc_sysfs_xprt_setup(xps, xprt, GFP_KERNEL);
61 }
62 
63 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
64 		struct rpc_xprt *xprt)
65 {
66 	if (unlikely(xprt == NULL))
67 		return;
68 	xps->xps_nactive--;
69 	xps->xps_nxprts--;
70 	if (xps->xps_nxprts == 0)
71 		xps->xps_net = NULL;
72 	smp_wmb();
73 	list_del_rcu(&xprt->xprt_switch);
74 }
75 
76 /**
77  * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
78  * @xps: pointer to struct rpc_xprt_switch
79  * @xprt: pointer to struct rpc_xprt
80  *
81  * Removes xprt from the list of struct rpc_xprt in xps.
82  */
83 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
84 		struct rpc_xprt *xprt)
85 {
86 	spin_lock(&xps->xps_lock);
87 	xprt_switch_remove_xprt_locked(xps, xprt);
88 	spin_unlock(&xps->xps_lock);
89 	xprt_put(xprt);
90 }
91 
92 static DEFINE_IDA(rpc_xprtswitch_ids);
93 
94 void xprt_multipath_cleanup_ids(void)
95 {
96 	ida_destroy(&rpc_xprtswitch_ids);
97 }
98 
99 static int xprt_switch_alloc_id(struct rpc_xprt_switch *xps, gfp_t gfp_flags)
100 {
101 	int id;
102 
103 	id = ida_simple_get(&rpc_xprtswitch_ids, 0, 0, gfp_flags);
104 	if (id < 0)
105 		return id;
106 
107 	xps->xps_id = id;
108 	return 0;
109 }
110 
111 static void xprt_switch_free_id(struct rpc_xprt_switch *xps)
112 {
113 	ida_simple_remove(&rpc_xprtswitch_ids, xps->xps_id);
114 }
115 
116 /**
117  * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
118  * @xprt: pointer to struct rpc_xprt
119  * @gfp_flags: allocation flags
120  *
121  * On success, returns an initialised struct rpc_xprt_switch, containing
122  * the entry xprt. Returns NULL on failure.
123  */
124 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
125 		gfp_t gfp_flags)
126 {
127 	struct rpc_xprt_switch *xps;
128 
129 	xps = kmalloc(sizeof(*xps), gfp_flags);
130 	if (xps != NULL) {
131 		spin_lock_init(&xps->xps_lock);
132 		kref_init(&xps->xps_kref);
133 		xprt_switch_alloc_id(xps, gfp_flags);
134 		xps->xps_nxprts = xps->xps_nactive = 0;
135 		atomic_long_set(&xps->xps_queuelen, 0);
136 		xps->xps_net = NULL;
137 		INIT_LIST_HEAD(&xps->xps_xprt_list);
138 		xps->xps_iter_ops = &rpc_xprt_iter_singular;
139 		rpc_sysfs_xprt_switch_setup(xps, xprt, gfp_flags);
140 		xprt_switch_add_xprt_locked(xps, xprt);
141 		rpc_sysfs_xprt_setup(xps, xprt, gfp_flags);
142 	}
143 
144 	return xps;
145 }
146 
147 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
148 {
149 	spin_lock(&xps->xps_lock);
150 	while (!list_empty(&xps->xps_xprt_list)) {
151 		struct rpc_xprt *xprt;
152 
153 		xprt = list_first_entry(&xps->xps_xprt_list,
154 				struct rpc_xprt, xprt_switch);
155 		xprt_switch_remove_xprt_locked(xps, xprt);
156 		spin_unlock(&xps->xps_lock);
157 		xprt_put(xprt);
158 		spin_lock(&xps->xps_lock);
159 	}
160 	spin_unlock(&xps->xps_lock);
161 }
162 
163 static void xprt_switch_free(struct kref *kref)
164 {
165 	struct rpc_xprt_switch *xps = container_of(kref,
166 			struct rpc_xprt_switch, xps_kref);
167 
168 	xprt_switch_free_entries(xps);
169 	rpc_sysfs_xprt_switch_destroy(xps);
170 	xprt_switch_free_id(xps);
171 	kfree_rcu(xps, xps_rcu);
172 }
173 
174 /**
175  * xprt_switch_get - Return a reference to a rpc_xprt_switch
176  * @xps: pointer to struct rpc_xprt_switch
177  *
178  * Returns a reference to xps unless the refcount is already zero.
179  */
180 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
181 {
182 	if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
183 		return xps;
184 	return NULL;
185 }
186 
187 /**
188  * xprt_switch_put - Release a reference to a rpc_xprt_switch
189  * @xps: pointer to struct rpc_xprt_switch
190  *
191  * Release the reference to xps, and free it once the refcount is zero.
192  */
193 void xprt_switch_put(struct rpc_xprt_switch *xps)
194 {
195 	if (xps != NULL)
196 		kref_put(&xps->xps_kref, xprt_switch_free);
197 }
198 
199 /**
200  * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
201  * @xps: pointer to struct rpc_xprt_switch
202  *
203  * Sets a round-robin default policy for iterators acting on xps.
204  */
205 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
206 {
207 	if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
208 		WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
209 }
210 
211 static
212 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
213 {
214 	if (xpi->xpi_ops != NULL)
215 		return xpi->xpi_ops;
216 	return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
217 }
218 
219 static
220 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
221 {
222 }
223 
224 static
225 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
226 {
227 	WRITE_ONCE(xpi->xpi_cursor, NULL);
228 }
229 
230 static
231 bool xprt_is_active(const struct rpc_xprt *xprt)
232 {
233 	return kref_read(&xprt->kref) != 0;
234 }
235 
236 static
237 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
238 {
239 	struct rpc_xprt *pos;
240 
241 	list_for_each_entry_rcu(pos, head, xprt_switch) {
242 		if (xprt_is_active(pos))
243 			return pos;
244 	}
245 	return NULL;
246 }
247 
248 static
249 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
250 {
251 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
252 
253 	if (xps == NULL)
254 		return NULL;
255 	return xprt_switch_find_first_entry(&xps->xps_xprt_list);
256 }
257 
258 static
259 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
260 		const struct rpc_xprt *cur)
261 {
262 	struct rpc_xprt *pos;
263 	bool found = false;
264 
265 	list_for_each_entry_rcu(pos, head, xprt_switch) {
266 		if (cur == pos)
267 			found = true;
268 		if (found && xprt_is_active(pos))
269 			return pos;
270 	}
271 	return NULL;
272 }
273 
274 static
275 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
276 {
277 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
278 	struct list_head *head;
279 
280 	if (xps == NULL)
281 		return NULL;
282 	head = &xps->xps_xprt_list;
283 	if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
284 		return xprt_switch_find_first_entry(head);
285 	return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
286 }
287 
288 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps,
289 			      const struct sockaddr *sap)
290 {
291 	struct list_head *head;
292 	struct rpc_xprt *pos;
293 
294 	if (xps == NULL || sap == NULL)
295 		return false;
296 
297 	head = &xps->xps_xprt_list;
298 	list_for_each_entry_rcu(pos, head, xprt_switch) {
299 		if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) {
300 			pr_info("RPC:   addr %s already in xprt switch\n",
301 				pos->address_strings[RPC_DISPLAY_ADDR]);
302 			return true;
303 		}
304 	}
305 	return false;
306 }
307 
308 static
309 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
310 		const struct rpc_xprt *cur)
311 {
312 	struct rpc_xprt *pos, *prev = NULL;
313 	bool found = false;
314 
315 	list_for_each_entry_rcu(pos, head, xprt_switch) {
316 		if (cur == prev)
317 			found = true;
318 		if (found && xprt_is_active(pos))
319 			return pos;
320 		prev = pos;
321 	}
322 	return NULL;
323 }
324 
325 static
326 struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps,
327 		struct rpc_xprt **cursor,
328 		xprt_switch_find_xprt_t find_next)
329 {
330 	struct rpc_xprt *pos, *old;
331 
332 	old = smp_load_acquire(cursor);
333 	pos = find_next(xps, old);
334 	smp_store_release(cursor, pos);
335 	return pos;
336 }
337 
338 static
339 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
340 		xprt_switch_find_xprt_t find_next)
341 {
342 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
343 
344 	if (xps == NULL)
345 		return NULL;
346 	return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next);
347 }
348 
349 static
350 struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head,
351 		const struct rpc_xprt *cur)
352 {
353 	struct rpc_xprt *ret;
354 
355 	ret = xprt_switch_find_next_entry(head, cur);
356 	if (ret != NULL)
357 		return ret;
358 	return xprt_switch_find_first_entry(head);
359 }
360 
361 static
362 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps,
363 		const struct rpc_xprt *cur)
364 {
365 	struct list_head *head = &xps->xps_xprt_list;
366 	struct rpc_xprt *xprt;
367 	unsigned int nactive;
368 
369 	for (;;) {
370 		unsigned long xprt_queuelen, xps_queuelen;
371 
372 		xprt = __xprt_switch_find_next_entry_roundrobin(head, cur);
373 		if (!xprt)
374 			break;
375 		xprt_queuelen = atomic_long_read(&xprt->queuelen);
376 		xps_queuelen = atomic_long_read(&xps->xps_queuelen);
377 		nactive = READ_ONCE(xps->xps_nactive);
378 		/* Exit loop if xprt_queuelen <= average queue length */
379 		if (xprt_queuelen * nactive <= xps_queuelen)
380 			break;
381 		cur = xprt;
382 	}
383 	return xprt;
384 }
385 
386 static
387 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
388 {
389 	return xprt_iter_next_entry_multiple(xpi,
390 			xprt_switch_find_next_entry_roundrobin);
391 }
392 
393 static
394 struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps,
395 		const struct rpc_xprt *cur)
396 {
397 	return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur);
398 }
399 
400 static
401 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
402 {
403 	return xprt_iter_next_entry_multiple(xpi,
404 			xprt_switch_find_next_entry_all);
405 }
406 
407 /*
408  * xprt_iter_rewind - Resets the xprt iterator
409  * @xpi: pointer to rpc_xprt_iter
410  *
411  * Resets xpi to ensure that it points to the first entry in the list
412  * of transports.
413  */
414 static
415 void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
416 {
417 	rcu_read_lock();
418 	xprt_iter_ops(xpi)->xpi_rewind(xpi);
419 	rcu_read_unlock();
420 }
421 
422 static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
423 		struct rpc_xprt_switch *xps,
424 		const struct rpc_xprt_iter_ops *ops)
425 {
426 	rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
427 	xpi->xpi_cursor = NULL;
428 	xpi->xpi_ops = ops;
429 }
430 
431 /**
432  * xprt_iter_init - Initialise an xprt iterator
433  * @xpi: pointer to rpc_xprt_iter
434  * @xps: pointer to rpc_xprt_switch
435  *
436  * Initialises the iterator to use the default iterator ops
437  * as set in xps. This function is mainly intended for internal
438  * use in the rpc_client.
439  */
440 void xprt_iter_init(struct rpc_xprt_iter *xpi,
441 		struct rpc_xprt_switch *xps)
442 {
443 	__xprt_iter_init(xpi, xps, NULL);
444 }
445 
446 /**
447  * xprt_iter_init_listall - Initialise an xprt iterator
448  * @xpi: pointer to rpc_xprt_iter
449  * @xps: pointer to rpc_xprt_switch
450  *
451  * Initialises the iterator to iterate once through the entire list
452  * of entries in xps.
453  */
454 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
455 		struct rpc_xprt_switch *xps)
456 {
457 	__xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
458 }
459 
460 /**
461  * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
462  * @xpi: pointer to rpc_xprt_iter
463  * @newswitch: pointer to a new rpc_xprt_switch or NULL
464  *
465  * Swaps out the existing xpi->xpi_xpswitch with a new value.
466  */
467 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
468 		struct rpc_xprt_switch *newswitch)
469 {
470 	struct rpc_xprt_switch __rcu *oldswitch;
471 
472 	/* Atomically swap out the old xpswitch */
473 	oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
474 	if (newswitch != NULL)
475 		xprt_iter_rewind(xpi);
476 	return rcu_dereference_protected(oldswitch, true);
477 }
478 
479 /**
480  * xprt_iter_destroy - Destroys the xprt iterator
481  * @xpi: pointer to rpc_xprt_iter
482  */
483 void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
484 {
485 	xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
486 }
487 
488 /**
489  * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
490  * @xpi: pointer to rpc_xprt_iter
491  *
492  * Returns a pointer to the struct rpc_xprt that is currently
493  * pointed to by the cursor.
494  * Caller must be holding rcu_read_lock().
495  */
496 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
497 {
498 	WARN_ON_ONCE(!rcu_read_lock_held());
499 	return xprt_iter_ops(xpi)->xpi_xprt(xpi);
500 }
501 
502 static
503 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
504 		struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
505 {
506 	struct rpc_xprt *ret;
507 
508 	do {
509 		ret = fn(xpi);
510 		if (ret == NULL)
511 			break;
512 		ret = xprt_get(ret);
513 	} while (ret == NULL);
514 	return ret;
515 }
516 
517 /**
518  * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
519  * @xpi: pointer to rpc_xprt_iter
520  *
521  * Returns a reference to the struct rpc_xprt that is currently
522  * pointed to by the cursor.
523  */
524 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
525 {
526 	struct rpc_xprt *xprt;
527 
528 	rcu_read_lock();
529 	xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
530 	rcu_read_unlock();
531 	return xprt;
532 }
533 
534 /**
535  * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
536  * @xpi: pointer to rpc_xprt_iter
537  *
538  * Returns a reference to the struct rpc_xprt that immediately follows the
539  * entry pointed to by the cursor.
540  */
541 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
542 {
543 	struct rpc_xprt *xprt;
544 
545 	rcu_read_lock();
546 	xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
547 	rcu_read_unlock();
548 	return xprt;
549 }
550 
551 /* Policy for always returning the first entry in the rpc_xprt_switch */
552 static
553 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
554 	.xpi_rewind = xprt_iter_no_rewind,
555 	.xpi_xprt = xprt_iter_first_entry,
556 	.xpi_next = xprt_iter_first_entry,
557 };
558 
559 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */
560 static
561 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
562 	.xpi_rewind = xprt_iter_default_rewind,
563 	.xpi_xprt = xprt_iter_current_entry,
564 	.xpi_next = xprt_iter_next_entry_roundrobin,
565 };
566 
567 /* Policy for once-through iteration of entries in the rpc_xprt_switch */
568 static
569 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
570 	.xpi_rewind = xprt_iter_default_rewind,
571 	.xpi_xprt = xprt_iter_current_entry,
572 	.xpi_next = xprt_iter_next_entry_all,
573 };
574