1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps, 23 const struct rpc_xprt *cur); 24 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 28 29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 30 struct rpc_xprt *xprt) 31 { 32 if (unlikely(xprt_get(xprt) == NULL)) 33 return; 34 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 35 smp_wmb(); 36 if (xps->xps_nxprts == 0) 37 xps->xps_net = xprt->xprt_net; 38 xps->xps_nxprts++; 39 xps->xps_nactive++; 40 } 41 42 /** 43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 44 * @xps: pointer to struct rpc_xprt_switch 45 * @xprt: pointer to struct rpc_xprt 46 * 47 * Adds xprt to the end of the list of struct rpc_xprt in xps. 48 */ 49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 50 struct rpc_xprt *xprt) 51 { 52 if (xprt == NULL) 53 return; 54 spin_lock(&xps->xps_lock); 55 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 56 xprt_switch_add_xprt_locked(xps, xprt); 57 spin_unlock(&xps->xps_lock); 58 } 59 60 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 61 struct rpc_xprt *xprt) 62 { 63 if (unlikely(xprt == NULL)) 64 return; 65 xps->xps_nactive--; 66 xps->xps_nxprts--; 67 if (xps->xps_nxprts == 0) 68 xps->xps_net = NULL; 69 smp_wmb(); 70 list_del_rcu(&xprt->xprt_switch); 71 } 72 73 /** 74 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 75 * @xps: pointer to struct rpc_xprt_switch 76 * @xprt: pointer to struct rpc_xprt 77 * 78 * Removes xprt from the list of struct rpc_xprt in xps. 79 */ 80 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 81 struct rpc_xprt *xprt) 82 { 83 spin_lock(&xps->xps_lock); 84 xprt_switch_remove_xprt_locked(xps, xprt); 85 spin_unlock(&xps->xps_lock); 86 xprt_put(xprt); 87 } 88 89 static DEFINE_IDA(rpc_xprtswitch_ids); 90 91 void xprt_multipath_cleanup_ids(void) 92 { 93 ida_destroy(&rpc_xprtswitch_ids); 94 } 95 96 static int xprt_switch_alloc_id(struct rpc_xprt_switch *xps, gfp_t gfp_flags) 97 { 98 int id; 99 100 id = ida_simple_get(&rpc_xprtswitch_ids, 0, 0, gfp_flags); 101 if (id < 0) 102 return id; 103 104 xps->xps_id = id; 105 return 0; 106 } 107 108 static void xprt_switch_free_id(struct rpc_xprt_switch *xps) 109 { 110 ida_simple_remove(&rpc_xprtswitch_ids, xps->xps_id); 111 } 112 113 /** 114 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 115 * @xprt: pointer to struct rpc_xprt 116 * @gfp_flags: allocation flags 117 * 118 * On success, returns an initialised struct rpc_xprt_switch, containing 119 * the entry xprt. Returns NULL on failure. 120 */ 121 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 122 gfp_t gfp_flags) 123 { 124 struct rpc_xprt_switch *xps; 125 126 xps = kmalloc(sizeof(*xps), gfp_flags); 127 if (xps != NULL) { 128 spin_lock_init(&xps->xps_lock); 129 kref_init(&xps->xps_kref); 130 xprt_switch_alloc_id(xps, gfp_flags); 131 xps->xps_nxprts = xps->xps_nactive = 0; 132 atomic_long_set(&xps->xps_queuelen, 0); 133 xps->xps_net = NULL; 134 INIT_LIST_HEAD(&xps->xps_xprt_list); 135 xps->xps_iter_ops = &rpc_xprt_iter_singular; 136 xprt_switch_add_xprt_locked(xps, xprt); 137 } 138 139 return xps; 140 } 141 142 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 143 { 144 spin_lock(&xps->xps_lock); 145 while (!list_empty(&xps->xps_xprt_list)) { 146 struct rpc_xprt *xprt; 147 148 xprt = list_first_entry(&xps->xps_xprt_list, 149 struct rpc_xprt, xprt_switch); 150 xprt_switch_remove_xprt_locked(xps, xprt); 151 spin_unlock(&xps->xps_lock); 152 xprt_put(xprt); 153 spin_lock(&xps->xps_lock); 154 } 155 spin_unlock(&xps->xps_lock); 156 } 157 158 static void xprt_switch_free(struct kref *kref) 159 { 160 struct rpc_xprt_switch *xps = container_of(kref, 161 struct rpc_xprt_switch, xps_kref); 162 163 xprt_switch_free_entries(xps); 164 xprt_switch_free_id(xps); 165 kfree_rcu(xps, xps_rcu); 166 } 167 168 /** 169 * xprt_switch_get - Return a reference to a rpc_xprt_switch 170 * @xps: pointer to struct rpc_xprt_switch 171 * 172 * Returns a reference to xps unless the refcount is already zero. 173 */ 174 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 175 { 176 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 177 return xps; 178 return NULL; 179 } 180 181 /** 182 * xprt_switch_put - Release a reference to a rpc_xprt_switch 183 * @xps: pointer to struct rpc_xprt_switch 184 * 185 * Release the reference to xps, and free it once the refcount is zero. 186 */ 187 void xprt_switch_put(struct rpc_xprt_switch *xps) 188 { 189 if (xps != NULL) 190 kref_put(&xps->xps_kref, xprt_switch_free); 191 } 192 193 /** 194 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 195 * @xps: pointer to struct rpc_xprt_switch 196 * 197 * Sets a round-robin default policy for iterators acting on xps. 198 */ 199 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 200 { 201 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 202 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 203 } 204 205 static 206 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 207 { 208 if (xpi->xpi_ops != NULL) 209 return xpi->xpi_ops; 210 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 211 } 212 213 static 214 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 215 { 216 } 217 218 static 219 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 220 { 221 WRITE_ONCE(xpi->xpi_cursor, NULL); 222 } 223 224 static 225 bool xprt_is_active(const struct rpc_xprt *xprt) 226 { 227 return kref_read(&xprt->kref) != 0; 228 } 229 230 static 231 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 232 { 233 struct rpc_xprt *pos; 234 235 list_for_each_entry_rcu(pos, head, xprt_switch) { 236 if (xprt_is_active(pos)) 237 return pos; 238 } 239 return NULL; 240 } 241 242 static 243 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 244 { 245 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 246 247 if (xps == NULL) 248 return NULL; 249 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 250 } 251 252 static 253 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 254 const struct rpc_xprt *cur) 255 { 256 struct rpc_xprt *pos; 257 bool found = false; 258 259 list_for_each_entry_rcu(pos, head, xprt_switch) { 260 if (cur == pos) 261 found = true; 262 if (found && xprt_is_active(pos)) 263 return pos; 264 } 265 return NULL; 266 } 267 268 static 269 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 270 { 271 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 272 struct list_head *head; 273 274 if (xps == NULL) 275 return NULL; 276 head = &xps->xps_xprt_list; 277 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 278 return xprt_switch_find_first_entry(head); 279 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 280 } 281 282 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 283 const struct sockaddr *sap) 284 { 285 struct list_head *head; 286 struct rpc_xprt *pos; 287 288 if (xps == NULL || sap == NULL) 289 return false; 290 291 head = &xps->xps_xprt_list; 292 list_for_each_entry_rcu(pos, head, xprt_switch) { 293 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 294 pr_info("RPC: addr %s already in xprt switch\n", 295 pos->address_strings[RPC_DISPLAY_ADDR]); 296 return true; 297 } 298 } 299 return false; 300 } 301 302 static 303 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 304 const struct rpc_xprt *cur) 305 { 306 struct rpc_xprt *pos, *prev = NULL; 307 bool found = false; 308 309 list_for_each_entry_rcu(pos, head, xprt_switch) { 310 if (cur == prev) 311 found = true; 312 if (found && xprt_is_active(pos)) 313 return pos; 314 prev = pos; 315 } 316 return NULL; 317 } 318 319 static 320 struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps, 321 struct rpc_xprt **cursor, 322 xprt_switch_find_xprt_t find_next) 323 { 324 struct rpc_xprt *pos, *old; 325 326 old = smp_load_acquire(cursor); 327 pos = find_next(xps, old); 328 smp_store_release(cursor, pos); 329 return pos; 330 } 331 332 static 333 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 334 xprt_switch_find_xprt_t find_next) 335 { 336 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 337 338 if (xps == NULL) 339 return NULL; 340 return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next); 341 } 342 343 static 344 struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head, 345 const struct rpc_xprt *cur) 346 { 347 struct rpc_xprt *ret; 348 349 ret = xprt_switch_find_next_entry(head, cur); 350 if (ret != NULL) 351 return ret; 352 return xprt_switch_find_first_entry(head); 353 } 354 355 static 356 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps, 357 const struct rpc_xprt *cur) 358 { 359 struct list_head *head = &xps->xps_xprt_list; 360 struct rpc_xprt *xprt; 361 unsigned int nactive; 362 363 for (;;) { 364 unsigned long xprt_queuelen, xps_queuelen; 365 366 xprt = __xprt_switch_find_next_entry_roundrobin(head, cur); 367 if (!xprt) 368 break; 369 xprt_queuelen = atomic_long_read(&xprt->queuelen); 370 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 371 nactive = READ_ONCE(xps->xps_nactive); 372 /* Exit loop if xprt_queuelen <= average queue length */ 373 if (xprt_queuelen * nactive <= xps_queuelen) 374 break; 375 cur = xprt; 376 } 377 return xprt; 378 } 379 380 static 381 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 382 { 383 return xprt_iter_next_entry_multiple(xpi, 384 xprt_switch_find_next_entry_roundrobin); 385 } 386 387 static 388 struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps, 389 const struct rpc_xprt *cur) 390 { 391 return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur); 392 } 393 394 static 395 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 396 { 397 return xprt_iter_next_entry_multiple(xpi, 398 xprt_switch_find_next_entry_all); 399 } 400 401 /* 402 * xprt_iter_rewind - Resets the xprt iterator 403 * @xpi: pointer to rpc_xprt_iter 404 * 405 * Resets xpi to ensure that it points to the first entry in the list 406 * of transports. 407 */ 408 static 409 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 410 { 411 rcu_read_lock(); 412 xprt_iter_ops(xpi)->xpi_rewind(xpi); 413 rcu_read_unlock(); 414 } 415 416 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 417 struct rpc_xprt_switch *xps, 418 const struct rpc_xprt_iter_ops *ops) 419 { 420 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 421 xpi->xpi_cursor = NULL; 422 xpi->xpi_ops = ops; 423 } 424 425 /** 426 * xprt_iter_init - Initialise an xprt iterator 427 * @xpi: pointer to rpc_xprt_iter 428 * @xps: pointer to rpc_xprt_switch 429 * 430 * Initialises the iterator to use the default iterator ops 431 * as set in xps. This function is mainly intended for internal 432 * use in the rpc_client. 433 */ 434 void xprt_iter_init(struct rpc_xprt_iter *xpi, 435 struct rpc_xprt_switch *xps) 436 { 437 __xprt_iter_init(xpi, xps, NULL); 438 } 439 440 /** 441 * xprt_iter_init_listall - Initialise an xprt iterator 442 * @xpi: pointer to rpc_xprt_iter 443 * @xps: pointer to rpc_xprt_switch 444 * 445 * Initialises the iterator to iterate once through the entire list 446 * of entries in xps. 447 */ 448 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 449 struct rpc_xprt_switch *xps) 450 { 451 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 452 } 453 454 /** 455 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 456 * @xpi: pointer to rpc_xprt_iter 457 * @newswitch: pointer to a new rpc_xprt_switch or NULL 458 * 459 * Swaps out the existing xpi->xpi_xpswitch with a new value. 460 */ 461 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 462 struct rpc_xprt_switch *newswitch) 463 { 464 struct rpc_xprt_switch __rcu *oldswitch; 465 466 /* Atomically swap out the old xpswitch */ 467 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 468 if (newswitch != NULL) 469 xprt_iter_rewind(xpi); 470 return rcu_dereference_protected(oldswitch, true); 471 } 472 473 /** 474 * xprt_iter_destroy - Destroys the xprt iterator 475 * @xpi: pointer to rpc_xprt_iter 476 */ 477 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 478 { 479 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 480 } 481 482 /** 483 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 484 * @xpi: pointer to rpc_xprt_iter 485 * 486 * Returns a pointer to the struct rpc_xprt that is currently 487 * pointed to by the cursor. 488 * Caller must be holding rcu_read_lock(). 489 */ 490 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 491 { 492 WARN_ON_ONCE(!rcu_read_lock_held()); 493 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 494 } 495 496 static 497 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 498 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 499 { 500 struct rpc_xprt *ret; 501 502 do { 503 ret = fn(xpi); 504 if (ret == NULL) 505 break; 506 ret = xprt_get(ret); 507 } while (ret == NULL); 508 return ret; 509 } 510 511 /** 512 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 513 * @xpi: pointer to rpc_xprt_iter 514 * 515 * Returns a reference to the struct rpc_xprt that is currently 516 * pointed to by the cursor. 517 */ 518 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 519 { 520 struct rpc_xprt *xprt; 521 522 rcu_read_lock(); 523 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 524 rcu_read_unlock(); 525 return xprt; 526 } 527 528 /** 529 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 530 * @xpi: pointer to rpc_xprt_iter 531 * 532 * Returns a reference to the struct rpc_xprt that immediately follows the 533 * entry pointed to by the cursor. 534 */ 535 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 536 { 537 struct rpc_xprt *xprt; 538 539 rcu_read_lock(); 540 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 541 rcu_read_unlock(); 542 return xprt; 543 } 544 545 /* Policy for always returning the first entry in the rpc_xprt_switch */ 546 static 547 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 548 .xpi_rewind = xprt_iter_no_rewind, 549 .xpi_xprt = xprt_iter_first_entry, 550 .xpi_next = xprt_iter_first_entry, 551 }; 552 553 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 554 static 555 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 556 .xpi_rewind = xprt_iter_default_rewind, 557 .xpi_xprt = xprt_iter_current_entry, 558 .xpi_next = xprt_iter_next_entry_roundrobin, 559 }; 560 561 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 562 static 563 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 564 .xpi_rewind = xprt_iter_default_rewind, 565 .xpi_xprt = xprt_iter_current_entry, 566 .xpi_next = xprt_iter_next_entry_all, 567 }; 568