1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 #include "sysfs.h" 23 24 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps, 25 const struct rpc_xprt *cur); 26 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 28 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 29 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 30 31 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 32 struct rpc_xprt *xprt) 33 { 34 if (unlikely(xprt_get(xprt) == NULL)) 35 return; 36 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 37 smp_wmb(); 38 if (xps->xps_nxprts == 0) 39 xps->xps_net = xprt->xprt_net; 40 xps->xps_nxprts++; 41 xps->xps_nactive++; 42 } 43 44 /** 45 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 46 * @xps: pointer to struct rpc_xprt_switch 47 * @xprt: pointer to struct rpc_xprt 48 * 49 * Adds xprt to the end of the list of struct rpc_xprt in xps. 50 */ 51 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 52 struct rpc_xprt *xprt) 53 { 54 if (xprt == NULL) 55 return; 56 spin_lock(&xps->xps_lock); 57 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 58 xprt_switch_add_xprt_locked(xps, xprt); 59 spin_unlock(&xps->xps_lock); 60 rpc_sysfs_xprt_setup(xps, xprt, GFP_KERNEL); 61 } 62 63 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 64 struct rpc_xprt *xprt) 65 { 66 if (unlikely(xprt == NULL)) 67 return; 68 xps->xps_nactive--; 69 xps->xps_nxprts--; 70 if (xps->xps_nxprts == 0) 71 xps->xps_net = NULL; 72 smp_wmb(); 73 list_del_rcu(&xprt->xprt_switch); 74 } 75 76 /** 77 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 78 * @xps: pointer to struct rpc_xprt_switch 79 * @xprt: pointer to struct rpc_xprt 80 * 81 * Removes xprt from the list of struct rpc_xprt in xps. 82 */ 83 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 84 struct rpc_xprt *xprt) 85 { 86 spin_lock(&xps->xps_lock); 87 xprt_switch_remove_xprt_locked(xps, xprt); 88 spin_unlock(&xps->xps_lock); 89 xprt_put(xprt); 90 } 91 92 static DEFINE_IDA(rpc_xprtswitch_ids); 93 94 void xprt_multipath_cleanup_ids(void) 95 { 96 ida_destroy(&rpc_xprtswitch_ids); 97 } 98 99 static int xprt_switch_alloc_id(struct rpc_xprt_switch *xps, gfp_t gfp_flags) 100 { 101 int id; 102 103 id = ida_simple_get(&rpc_xprtswitch_ids, 0, 0, gfp_flags); 104 if (id < 0) 105 return id; 106 107 xps->xps_id = id; 108 return 0; 109 } 110 111 static void xprt_switch_free_id(struct rpc_xprt_switch *xps) 112 { 113 ida_simple_remove(&rpc_xprtswitch_ids, xps->xps_id); 114 } 115 116 /** 117 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 118 * @xprt: pointer to struct rpc_xprt 119 * @gfp_flags: allocation flags 120 * 121 * On success, returns an initialised struct rpc_xprt_switch, containing 122 * the entry xprt. Returns NULL on failure. 123 */ 124 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 125 gfp_t gfp_flags) 126 { 127 struct rpc_xprt_switch *xps; 128 129 xps = kmalloc(sizeof(*xps), gfp_flags); 130 if (xps != NULL) { 131 spin_lock_init(&xps->xps_lock); 132 kref_init(&xps->xps_kref); 133 xprt_switch_alloc_id(xps, gfp_flags); 134 xps->xps_nxprts = xps->xps_nactive = 0; 135 atomic_long_set(&xps->xps_queuelen, 0); 136 xps->xps_net = NULL; 137 INIT_LIST_HEAD(&xps->xps_xprt_list); 138 xps->xps_iter_ops = &rpc_xprt_iter_singular; 139 rpc_sysfs_xprt_switch_setup(xps, xprt, gfp_flags); 140 xprt_switch_add_xprt_locked(xps, xprt); 141 rpc_sysfs_xprt_setup(xps, xprt, gfp_flags); 142 } 143 144 return xps; 145 } 146 147 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 148 { 149 spin_lock(&xps->xps_lock); 150 while (!list_empty(&xps->xps_xprt_list)) { 151 struct rpc_xprt *xprt; 152 153 xprt = list_first_entry(&xps->xps_xprt_list, 154 struct rpc_xprt, xprt_switch); 155 xprt_switch_remove_xprt_locked(xps, xprt); 156 spin_unlock(&xps->xps_lock); 157 xprt_put(xprt); 158 spin_lock(&xps->xps_lock); 159 } 160 spin_unlock(&xps->xps_lock); 161 } 162 163 static void xprt_switch_free(struct kref *kref) 164 { 165 struct rpc_xprt_switch *xps = container_of(kref, 166 struct rpc_xprt_switch, xps_kref); 167 168 xprt_switch_free_entries(xps); 169 rpc_sysfs_xprt_switch_destroy(xps); 170 xprt_switch_free_id(xps); 171 kfree_rcu(xps, xps_rcu); 172 } 173 174 /** 175 * xprt_switch_get - Return a reference to a rpc_xprt_switch 176 * @xps: pointer to struct rpc_xprt_switch 177 * 178 * Returns a reference to xps unless the refcount is already zero. 179 */ 180 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 181 { 182 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 183 return xps; 184 return NULL; 185 } 186 187 /** 188 * xprt_switch_put - Release a reference to a rpc_xprt_switch 189 * @xps: pointer to struct rpc_xprt_switch 190 * 191 * Release the reference to xps, and free it once the refcount is zero. 192 */ 193 void xprt_switch_put(struct rpc_xprt_switch *xps) 194 { 195 if (xps != NULL) 196 kref_put(&xps->xps_kref, xprt_switch_free); 197 } 198 199 /** 200 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 201 * @xps: pointer to struct rpc_xprt_switch 202 * 203 * Sets a round-robin default policy for iterators acting on xps. 204 */ 205 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 206 { 207 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 208 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 209 } 210 211 static 212 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 213 { 214 if (xpi->xpi_ops != NULL) 215 return xpi->xpi_ops; 216 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 217 } 218 219 static 220 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 221 { 222 } 223 224 static 225 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 226 { 227 WRITE_ONCE(xpi->xpi_cursor, NULL); 228 } 229 230 static 231 bool xprt_is_active(const struct rpc_xprt *xprt) 232 { 233 return kref_read(&xprt->kref) != 0; 234 } 235 236 static 237 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 238 { 239 struct rpc_xprt *pos; 240 241 list_for_each_entry_rcu(pos, head, xprt_switch) { 242 if (xprt_is_active(pos)) 243 return pos; 244 } 245 return NULL; 246 } 247 248 static 249 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 250 { 251 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 252 253 if (xps == NULL) 254 return NULL; 255 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 256 } 257 258 static 259 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 260 const struct rpc_xprt *cur) 261 { 262 struct rpc_xprt *pos; 263 bool found = false; 264 265 list_for_each_entry_rcu(pos, head, xprt_switch) { 266 if (cur == pos) 267 found = true; 268 if (found && xprt_is_active(pos)) 269 return pos; 270 } 271 return NULL; 272 } 273 274 static 275 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 276 { 277 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 278 struct list_head *head; 279 280 if (xps == NULL) 281 return NULL; 282 head = &xps->xps_xprt_list; 283 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 284 return xprt_switch_find_first_entry(head); 285 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 286 } 287 288 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 289 const struct sockaddr *sap) 290 { 291 struct list_head *head; 292 struct rpc_xprt *pos; 293 294 if (xps == NULL || sap == NULL) 295 return false; 296 297 head = &xps->xps_xprt_list; 298 list_for_each_entry_rcu(pos, head, xprt_switch) { 299 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 300 pr_info("RPC: addr %s already in xprt switch\n", 301 pos->address_strings[RPC_DISPLAY_ADDR]); 302 return true; 303 } 304 } 305 return false; 306 } 307 308 static 309 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 310 const struct rpc_xprt *cur) 311 { 312 struct rpc_xprt *pos, *prev = NULL; 313 bool found = false; 314 315 list_for_each_entry_rcu(pos, head, xprt_switch) { 316 if (cur == prev) 317 found = true; 318 if (found && xprt_is_active(pos)) 319 return pos; 320 prev = pos; 321 } 322 return NULL; 323 } 324 325 static 326 struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps, 327 struct rpc_xprt **cursor, 328 xprt_switch_find_xprt_t find_next) 329 { 330 struct rpc_xprt *pos, *old; 331 332 old = smp_load_acquire(cursor); 333 pos = find_next(xps, old); 334 smp_store_release(cursor, pos); 335 return pos; 336 } 337 338 static 339 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 340 xprt_switch_find_xprt_t find_next) 341 { 342 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 343 344 if (xps == NULL) 345 return NULL; 346 return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next); 347 } 348 349 static 350 struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head, 351 const struct rpc_xprt *cur) 352 { 353 struct rpc_xprt *ret; 354 355 ret = xprt_switch_find_next_entry(head, cur); 356 if (ret != NULL) 357 return ret; 358 return xprt_switch_find_first_entry(head); 359 } 360 361 static 362 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps, 363 const struct rpc_xprt *cur) 364 { 365 struct list_head *head = &xps->xps_xprt_list; 366 struct rpc_xprt *xprt; 367 unsigned int nactive; 368 369 for (;;) { 370 unsigned long xprt_queuelen, xps_queuelen; 371 372 xprt = __xprt_switch_find_next_entry_roundrobin(head, cur); 373 if (!xprt) 374 break; 375 xprt_queuelen = atomic_long_read(&xprt->queuelen); 376 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 377 nactive = READ_ONCE(xps->xps_nactive); 378 /* Exit loop if xprt_queuelen <= average queue length */ 379 if (xprt_queuelen * nactive <= xps_queuelen) 380 break; 381 cur = xprt; 382 } 383 return xprt; 384 } 385 386 static 387 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 388 { 389 return xprt_iter_next_entry_multiple(xpi, 390 xprt_switch_find_next_entry_roundrobin); 391 } 392 393 static 394 struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps, 395 const struct rpc_xprt *cur) 396 { 397 return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur); 398 } 399 400 static 401 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 402 { 403 return xprt_iter_next_entry_multiple(xpi, 404 xprt_switch_find_next_entry_all); 405 } 406 407 /* 408 * xprt_iter_rewind - Resets the xprt iterator 409 * @xpi: pointer to rpc_xprt_iter 410 * 411 * Resets xpi to ensure that it points to the first entry in the list 412 * of transports. 413 */ 414 static 415 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 416 { 417 rcu_read_lock(); 418 xprt_iter_ops(xpi)->xpi_rewind(xpi); 419 rcu_read_unlock(); 420 } 421 422 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 423 struct rpc_xprt_switch *xps, 424 const struct rpc_xprt_iter_ops *ops) 425 { 426 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 427 xpi->xpi_cursor = NULL; 428 xpi->xpi_ops = ops; 429 } 430 431 /** 432 * xprt_iter_init - Initialise an xprt iterator 433 * @xpi: pointer to rpc_xprt_iter 434 * @xps: pointer to rpc_xprt_switch 435 * 436 * Initialises the iterator to use the default iterator ops 437 * as set in xps. This function is mainly intended for internal 438 * use in the rpc_client. 439 */ 440 void xprt_iter_init(struct rpc_xprt_iter *xpi, 441 struct rpc_xprt_switch *xps) 442 { 443 __xprt_iter_init(xpi, xps, NULL); 444 } 445 446 /** 447 * xprt_iter_init_listall - Initialise an xprt iterator 448 * @xpi: pointer to rpc_xprt_iter 449 * @xps: pointer to rpc_xprt_switch 450 * 451 * Initialises the iterator to iterate once through the entire list 452 * of entries in xps. 453 */ 454 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 455 struct rpc_xprt_switch *xps) 456 { 457 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 458 } 459 460 /** 461 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 462 * @xpi: pointer to rpc_xprt_iter 463 * @newswitch: pointer to a new rpc_xprt_switch or NULL 464 * 465 * Swaps out the existing xpi->xpi_xpswitch with a new value. 466 */ 467 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 468 struct rpc_xprt_switch *newswitch) 469 { 470 struct rpc_xprt_switch __rcu *oldswitch; 471 472 /* Atomically swap out the old xpswitch */ 473 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 474 if (newswitch != NULL) 475 xprt_iter_rewind(xpi); 476 return rcu_dereference_protected(oldswitch, true); 477 } 478 479 /** 480 * xprt_iter_destroy - Destroys the xprt iterator 481 * @xpi: pointer to rpc_xprt_iter 482 */ 483 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 484 { 485 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 486 } 487 488 /** 489 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 490 * @xpi: pointer to rpc_xprt_iter 491 * 492 * Returns a pointer to the struct rpc_xprt that is currently 493 * pointed to by the cursor. 494 * Caller must be holding rcu_read_lock(). 495 */ 496 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 497 { 498 WARN_ON_ONCE(!rcu_read_lock_held()); 499 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 500 } 501 502 static 503 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 504 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 505 { 506 struct rpc_xprt *ret; 507 508 do { 509 ret = fn(xpi); 510 if (ret == NULL) 511 break; 512 ret = xprt_get(ret); 513 } while (ret == NULL); 514 return ret; 515 } 516 517 /** 518 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 519 * @xpi: pointer to rpc_xprt_iter 520 * 521 * Returns a reference to the struct rpc_xprt that is currently 522 * pointed to by the cursor. 523 */ 524 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 525 { 526 struct rpc_xprt *xprt; 527 528 rcu_read_lock(); 529 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 530 rcu_read_unlock(); 531 return xprt; 532 } 533 534 /** 535 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 536 * @xpi: pointer to rpc_xprt_iter 537 * 538 * Returns a reference to the struct rpc_xprt that immediately follows the 539 * entry pointed to by the cursor. 540 */ 541 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 542 { 543 struct rpc_xprt *xprt; 544 545 rcu_read_lock(); 546 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 547 rcu_read_unlock(); 548 return xprt; 549 } 550 551 /* Policy for always returning the first entry in the rpc_xprt_switch */ 552 static 553 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 554 .xpi_rewind = xprt_iter_no_rewind, 555 .xpi_xprt = xprt_iter_first_entry, 556 .xpi_next = xprt_iter_first_entry, 557 }; 558 559 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 560 static 561 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 562 .xpi_rewind = xprt_iter_default_rewind, 563 .xpi_xprt = xprt_iter_current_entry, 564 .xpi_next = xprt_iter_next_entry_roundrobin, 565 }; 566 567 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 568 static 569 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 570 .xpi_rewind = xprt_iter_default_rewind, 571 .xpi_xprt = xprt_iter_current_entry, 572 .xpi_next = xprt_iter_next_entry_all, 573 }; 574