1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, 23 const struct rpc_xprt *cur); 24 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 28 29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 30 struct rpc_xprt *xprt) 31 { 32 if (unlikely(xprt_get(xprt) == NULL)) 33 return; 34 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 35 smp_wmb(); 36 if (xps->xps_nxprts == 0) 37 xps->xps_net = xprt->xprt_net; 38 xps->xps_nxprts++; 39 xps->xps_nactive++; 40 } 41 42 /** 43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 44 * @xps: pointer to struct rpc_xprt_switch 45 * @xprt: pointer to struct rpc_xprt 46 * 47 * Adds xprt to the end of the list of struct rpc_xprt in xps. 48 */ 49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 50 struct rpc_xprt *xprt) 51 { 52 if (xprt == NULL) 53 return; 54 spin_lock(&xps->xps_lock); 55 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 56 xprt_switch_add_xprt_locked(xps, xprt); 57 spin_unlock(&xps->xps_lock); 58 } 59 60 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 61 struct rpc_xprt *xprt) 62 { 63 if (unlikely(xprt == NULL)) 64 return; 65 xps->xps_nactive--; 66 xps->xps_nxprts--; 67 if (xps->xps_nxprts == 0) 68 xps->xps_net = NULL; 69 smp_wmb(); 70 list_del_rcu(&xprt->xprt_switch); 71 } 72 73 /** 74 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 75 * @xps: pointer to struct rpc_xprt_switch 76 * @xprt: pointer to struct rpc_xprt 77 * 78 * Removes xprt from the list of struct rpc_xprt in xps. 79 */ 80 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 81 struct rpc_xprt *xprt) 82 { 83 spin_lock(&xps->xps_lock); 84 xprt_switch_remove_xprt_locked(xps, xprt); 85 spin_unlock(&xps->xps_lock); 86 xprt_put(xprt); 87 } 88 89 /** 90 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 91 * @xprt: pointer to struct rpc_xprt 92 * @gfp_flags: allocation flags 93 * 94 * On success, returns an initialised struct rpc_xprt_switch, containing 95 * the entry xprt. Returns NULL on failure. 96 */ 97 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 98 gfp_t gfp_flags) 99 { 100 struct rpc_xprt_switch *xps; 101 102 xps = kmalloc(sizeof(*xps), gfp_flags); 103 if (xps != NULL) { 104 spin_lock_init(&xps->xps_lock); 105 kref_init(&xps->xps_kref); 106 xps->xps_nxprts = 0; 107 INIT_LIST_HEAD(&xps->xps_xprt_list); 108 xps->xps_iter_ops = &rpc_xprt_iter_singular; 109 xprt_switch_add_xprt_locked(xps, xprt); 110 } 111 112 return xps; 113 } 114 115 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 116 { 117 spin_lock(&xps->xps_lock); 118 while (!list_empty(&xps->xps_xprt_list)) { 119 struct rpc_xprt *xprt; 120 121 xprt = list_first_entry(&xps->xps_xprt_list, 122 struct rpc_xprt, xprt_switch); 123 xprt_switch_remove_xprt_locked(xps, xprt); 124 spin_unlock(&xps->xps_lock); 125 xprt_put(xprt); 126 spin_lock(&xps->xps_lock); 127 } 128 spin_unlock(&xps->xps_lock); 129 } 130 131 static void xprt_switch_free(struct kref *kref) 132 { 133 struct rpc_xprt_switch *xps = container_of(kref, 134 struct rpc_xprt_switch, xps_kref); 135 136 xprt_switch_free_entries(xps); 137 kfree_rcu(xps, xps_rcu); 138 } 139 140 /** 141 * xprt_switch_get - Return a reference to a rpc_xprt_switch 142 * @xps: pointer to struct rpc_xprt_switch 143 * 144 * Returns a reference to xps unless the refcount is already zero. 145 */ 146 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 147 { 148 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 149 return xps; 150 return NULL; 151 } 152 153 /** 154 * xprt_switch_put - Release a reference to a rpc_xprt_switch 155 * @xps: pointer to struct rpc_xprt_switch 156 * 157 * Release the reference to xps, and free it once the refcount is zero. 158 */ 159 void xprt_switch_put(struct rpc_xprt_switch *xps) 160 { 161 if (xps != NULL) 162 kref_put(&xps->xps_kref, xprt_switch_free); 163 } 164 165 /** 166 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 167 * @xps: pointer to struct rpc_xprt_switch 168 * 169 * Sets a round-robin default policy for iterators acting on xps. 170 */ 171 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 172 { 173 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 174 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 175 } 176 177 static 178 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 179 { 180 if (xpi->xpi_ops != NULL) 181 return xpi->xpi_ops; 182 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 183 } 184 185 static 186 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 187 { 188 } 189 190 static 191 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 192 { 193 WRITE_ONCE(xpi->xpi_cursor, NULL); 194 } 195 196 static 197 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 198 { 199 return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); 200 } 201 202 static 203 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 204 { 205 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 206 207 if (xps == NULL) 208 return NULL; 209 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 210 } 211 212 static 213 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 214 const struct rpc_xprt *cur) 215 { 216 struct rpc_xprt *pos; 217 218 list_for_each_entry_rcu(pos, head, xprt_switch) { 219 if (cur == pos) 220 return pos; 221 } 222 return NULL; 223 } 224 225 static 226 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 227 { 228 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 229 struct list_head *head; 230 231 if (xps == NULL) 232 return NULL; 233 head = &xps->xps_xprt_list; 234 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 235 return xprt_switch_find_first_entry(head); 236 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 237 } 238 239 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 240 const struct sockaddr *sap) 241 { 242 struct list_head *head; 243 struct rpc_xprt *pos; 244 245 if (xps == NULL || sap == NULL) 246 return false; 247 248 head = &xps->xps_xprt_list; 249 list_for_each_entry_rcu(pos, head, xprt_switch) { 250 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 251 pr_info("RPC: addr %s already in xprt switch\n", 252 pos->address_strings[RPC_DISPLAY_ADDR]); 253 return true; 254 } 255 } 256 return false; 257 } 258 259 static 260 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 261 const struct rpc_xprt *cur) 262 { 263 struct rpc_xprt *pos, *prev = NULL; 264 265 list_for_each_entry_rcu(pos, head, xprt_switch) { 266 if (cur == prev) 267 return pos; 268 prev = pos; 269 } 270 return NULL; 271 } 272 273 static 274 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, 275 struct rpc_xprt **cursor, 276 xprt_switch_find_xprt_t find_next) 277 { 278 struct rpc_xprt *cur, *pos, *old; 279 280 cur = READ_ONCE(*cursor); 281 for (;;) { 282 old = cur; 283 pos = find_next(head, old); 284 if (pos == NULL) 285 break; 286 cur = cmpxchg_relaxed(cursor, old, pos); 287 if (cur == old) 288 break; 289 } 290 return pos; 291 } 292 293 static 294 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 295 xprt_switch_find_xprt_t find_next) 296 { 297 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 298 299 if (xps == NULL) 300 return NULL; 301 return xprt_switch_set_next_cursor(&xps->xps_xprt_list, 302 &xpi->xpi_cursor, 303 find_next); 304 } 305 306 static 307 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, 308 const struct rpc_xprt *cur) 309 { 310 struct rpc_xprt *ret; 311 312 ret = xprt_switch_find_next_entry(head, cur); 313 if (ret != NULL) 314 return ret; 315 return xprt_switch_find_first_entry(head); 316 } 317 318 static 319 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 320 { 321 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 322 struct rpc_xprt *xprt; 323 unsigned long xprt_queuelen; 324 unsigned long xps_queuelen; 325 326 do { 327 xprt = xprt_iter_next_entry_multiple(xpi, 328 xprt_switch_find_next_entry_roundrobin); 329 if (xprt == NULL) 330 break; 331 xprt_queuelen = atomic_long_read(&xprt->queuelen); 332 if (xprt_queuelen <= 2) 333 break; 334 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 335 /* Exit loop if xprt_queuelen <= average queue length */ 336 } while (xprt_queuelen * READ_ONCE(xps->xps_nactive) > xps_queuelen); 337 return xprt; 338 } 339 340 static 341 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 342 { 343 return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); 344 } 345 346 /* 347 * xprt_iter_rewind - Resets the xprt iterator 348 * @xpi: pointer to rpc_xprt_iter 349 * 350 * Resets xpi to ensure that it points to the first entry in the list 351 * of transports. 352 */ 353 static 354 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 355 { 356 rcu_read_lock(); 357 xprt_iter_ops(xpi)->xpi_rewind(xpi); 358 rcu_read_unlock(); 359 } 360 361 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 362 struct rpc_xprt_switch *xps, 363 const struct rpc_xprt_iter_ops *ops) 364 { 365 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 366 xpi->xpi_cursor = NULL; 367 xpi->xpi_ops = ops; 368 } 369 370 /** 371 * xprt_iter_init - Initialise an xprt iterator 372 * @xpi: pointer to rpc_xprt_iter 373 * @xps: pointer to rpc_xprt_switch 374 * 375 * Initialises the iterator to use the default iterator ops 376 * as set in xps. This function is mainly intended for internal 377 * use in the rpc_client. 378 */ 379 void xprt_iter_init(struct rpc_xprt_iter *xpi, 380 struct rpc_xprt_switch *xps) 381 { 382 __xprt_iter_init(xpi, xps, NULL); 383 } 384 385 /** 386 * xprt_iter_init_listall - Initialise an xprt iterator 387 * @xpi: pointer to rpc_xprt_iter 388 * @xps: pointer to rpc_xprt_switch 389 * 390 * Initialises the iterator to iterate once through the entire list 391 * of entries in xps. 392 */ 393 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 394 struct rpc_xprt_switch *xps) 395 { 396 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 397 } 398 399 /** 400 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 401 * @xpi: pointer to rpc_xprt_iter 402 * @newswitch: pointer to a new rpc_xprt_switch or NULL 403 * 404 * Swaps out the existing xpi->xpi_xpswitch with a new value. 405 */ 406 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 407 struct rpc_xprt_switch *newswitch) 408 { 409 struct rpc_xprt_switch __rcu *oldswitch; 410 411 /* Atomically swap out the old xpswitch */ 412 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 413 if (newswitch != NULL) 414 xprt_iter_rewind(xpi); 415 return rcu_dereference_protected(oldswitch, true); 416 } 417 418 /** 419 * xprt_iter_destroy - Destroys the xprt iterator 420 * @xpi: pointer to rpc_xprt_iter 421 */ 422 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 423 { 424 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 425 } 426 427 /** 428 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 429 * @xpi: pointer to rpc_xprt_iter 430 * 431 * Returns a pointer to the struct rpc_xprt that is currently 432 * pointed to by the cursor. 433 * Caller must be holding rcu_read_lock(). 434 */ 435 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 436 { 437 WARN_ON_ONCE(!rcu_read_lock_held()); 438 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 439 } 440 441 static 442 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 443 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 444 { 445 struct rpc_xprt *ret; 446 447 do { 448 ret = fn(xpi); 449 if (ret == NULL) 450 break; 451 ret = xprt_get(ret); 452 } while (ret == NULL); 453 return ret; 454 } 455 456 /** 457 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 458 * @xpi: pointer to rpc_xprt_iter 459 * 460 * Returns a reference to the struct rpc_xprt that is currently 461 * pointed to by the cursor. 462 */ 463 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 464 { 465 struct rpc_xprt *xprt; 466 467 rcu_read_lock(); 468 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 469 rcu_read_unlock(); 470 return xprt; 471 } 472 473 /** 474 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 475 * @xpi: pointer to rpc_xprt_iter 476 * 477 * Returns a reference to the struct rpc_xprt that immediately follows the 478 * entry pointed to by the cursor. 479 */ 480 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 481 { 482 struct rpc_xprt *xprt; 483 484 rcu_read_lock(); 485 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 486 rcu_read_unlock(); 487 return xprt; 488 } 489 490 /* Policy for always returning the first entry in the rpc_xprt_switch */ 491 static 492 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 493 .xpi_rewind = xprt_iter_no_rewind, 494 .xpi_xprt = xprt_iter_first_entry, 495 .xpi_next = xprt_iter_first_entry, 496 }; 497 498 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 499 static 500 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 501 .xpi_rewind = xprt_iter_default_rewind, 502 .xpi_xprt = xprt_iter_current_entry, 503 .xpi_next = xprt_iter_next_entry_roundrobin, 504 }; 505 506 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 507 static 508 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 509 .xpi_rewind = xprt_iter_default_rewind, 510 .xpi_xprt = xprt_iter_current_entry, 511 .xpi_next = xprt_iter_next_entry_all, 512 }; 513