1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps, 23 const struct rpc_xprt *cur); 24 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 28 29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 30 struct rpc_xprt *xprt) 31 { 32 if (unlikely(xprt_get(xprt) == NULL)) 33 return; 34 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 35 smp_wmb(); 36 if (xps->xps_nxprts == 0) 37 xps->xps_net = xprt->xprt_net; 38 xps->xps_nxprts++; 39 xps->xps_nactive++; 40 } 41 42 /** 43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 44 * @xps: pointer to struct rpc_xprt_switch 45 * @xprt: pointer to struct rpc_xprt 46 * 47 * Adds xprt to the end of the list of struct rpc_xprt in xps. 48 */ 49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 50 struct rpc_xprt *xprt) 51 { 52 if (xprt == NULL) 53 return; 54 spin_lock(&xps->xps_lock); 55 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 56 xprt_switch_add_xprt_locked(xps, xprt); 57 spin_unlock(&xps->xps_lock); 58 } 59 60 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 61 struct rpc_xprt *xprt) 62 { 63 if (unlikely(xprt == NULL)) 64 return; 65 xps->xps_nactive--; 66 xps->xps_nxprts--; 67 if (xps->xps_nxprts == 0) 68 xps->xps_net = NULL; 69 smp_wmb(); 70 list_del_rcu(&xprt->xprt_switch); 71 } 72 73 /** 74 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 75 * @xps: pointer to struct rpc_xprt_switch 76 * @xprt: pointer to struct rpc_xprt 77 * 78 * Removes xprt from the list of struct rpc_xprt in xps. 79 */ 80 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 81 struct rpc_xprt *xprt) 82 { 83 spin_lock(&xps->xps_lock); 84 xprt_switch_remove_xprt_locked(xps, xprt); 85 spin_unlock(&xps->xps_lock); 86 xprt_put(xprt); 87 } 88 89 /** 90 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 91 * @xprt: pointer to struct rpc_xprt 92 * @gfp_flags: allocation flags 93 * 94 * On success, returns an initialised struct rpc_xprt_switch, containing 95 * the entry xprt. Returns NULL on failure. 96 */ 97 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 98 gfp_t gfp_flags) 99 { 100 struct rpc_xprt_switch *xps; 101 102 xps = kmalloc(sizeof(*xps), gfp_flags); 103 if (xps != NULL) { 104 spin_lock_init(&xps->xps_lock); 105 kref_init(&xps->xps_kref); 106 xps->xps_nxprts = xps->xps_nactive = 0; 107 atomic_long_set(&xps->xps_queuelen, 0); 108 xps->xps_net = NULL; 109 INIT_LIST_HEAD(&xps->xps_xprt_list); 110 xps->xps_iter_ops = &rpc_xprt_iter_singular; 111 xprt_switch_add_xprt_locked(xps, xprt); 112 } 113 114 return xps; 115 } 116 117 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 118 { 119 spin_lock(&xps->xps_lock); 120 while (!list_empty(&xps->xps_xprt_list)) { 121 struct rpc_xprt *xprt; 122 123 xprt = list_first_entry(&xps->xps_xprt_list, 124 struct rpc_xprt, xprt_switch); 125 xprt_switch_remove_xprt_locked(xps, xprt); 126 spin_unlock(&xps->xps_lock); 127 xprt_put(xprt); 128 spin_lock(&xps->xps_lock); 129 } 130 spin_unlock(&xps->xps_lock); 131 } 132 133 static void xprt_switch_free(struct kref *kref) 134 { 135 struct rpc_xprt_switch *xps = container_of(kref, 136 struct rpc_xprt_switch, xps_kref); 137 138 xprt_switch_free_entries(xps); 139 kfree_rcu(xps, xps_rcu); 140 } 141 142 /** 143 * xprt_switch_get - Return a reference to a rpc_xprt_switch 144 * @xps: pointer to struct rpc_xprt_switch 145 * 146 * Returns a reference to xps unless the refcount is already zero. 147 */ 148 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 149 { 150 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 151 return xps; 152 return NULL; 153 } 154 155 /** 156 * xprt_switch_put - Release a reference to a rpc_xprt_switch 157 * @xps: pointer to struct rpc_xprt_switch 158 * 159 * Release the reference to xps, and free it once the refcount is zero. 160 */ 161 void xprt_switch_put(struct rpc_xprt_switch *xps) 162 { 163 if (xps != NULL) 164 kref_put(&xps->xps_kref, xprt_switch_free); 165 } 166 167 /** 168 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 169 * @xps: pointer to struct rpc_xprt_switch 170 * 171 * Sets a round-robin default policy for iterators acting on xps. 172 */ 173 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 174 { 175 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 176 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 177 } 178 179 static 180 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 181 { 182 if (xpi->xpi_ops != NULL) 183 return xpi->xpi_ops; 184 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 185 } 186 187 static 188 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 189 { 190 } 191 192 static 193 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 194 { 195 WRITE_ONCE(xpi->xpi_cursor, NULL); 196 } 197 198 static 199 bool xprt_is_active(const struct rpc_xprt *xprt) 200 { 201 return kref_read(&xprt->kref) != 0; 202 } 203 204 static 205 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 206 { 207 struct rpc_xprt *pos; 208 209 list_for_each_entry_rcu(pos, head, xprt_switch) { 210 if (xprt_is_active(pos)) 211 return pos; 212 } 213 return NULL; 214 } 215 216 static 217 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 218 { 219 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 220 221 if (xps == NULL) 222 return NULL; 223 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 224 } 225 226 static 227 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 228 const struct rpc_xprt *cur) 229 { 230 struct rpc_xprt *pos; 231 bool found = false; 232 233 list_for_each_entry_rcu(pos, head, xprt_switch) { 234 if (cur == pos) 235 found = true; 236 if (found && xprt_is_active(pos)) 237 return pos; 238 } 239 return NULL; 240 } 241 242 static 243 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 244 { 245 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 246 struct list_head *head; 247 248 if (xps == NULL) 249 return NULL; 250 head = &xps->xps_xprt_list; 251 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 252 return xprt_switch_find_first_entry(head); 253 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 254 } 255 256 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 257 const struct sockaddr *sap) 258 { 259 struct list_head *head; 260 struct rpc_xprt *pos; 261 262 if (xps == NULL || sap == NULL) 263 return false; 264 265 head = &xps->xps_xprt_list; 266 list_for_each_entry_rcu(pos, head, xprt_switch) { 267 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 268 pr_info("RPC: addr %s already in xprt switch\n", 269 pos->address_strings[RPC_DISPLAY_ADDR]); 270 return true; 271 } 272 } 273 return false; 274 } 275 276 static 277 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 278 const struct rpc_xprt *cur) 279 { 280 struct rpc_xprt *pos, *prev = NULL; 281 bool found = false; 282 283 list_for_each_entry_rcu(pos, head, xprt_switch) { 284 if (cur == prev) 285 found = true; 286 if (found && xprt_is_active(pos)) 287 return pos; 288 prev = pos; 289 } 290 return NULL; 291 } 292 293 static 294 struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps, 295 struct rpc_xprt **cursor, 296 xprt_switch_find_xprt_t find_next) 297 { 298 struct rpc_xprt *pos, *old; 299 300 old = smp_load_acquire(cursor); 301 pos = find_next(xps, old); 302 smp_store_release(cursor, pos); 303 return pos; 304 } 305 306 static 307 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 308 xprt_switch_find_xprt_t find_next) 309 { 310 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 311 312 if (xps == NULL) 313 return NULL; 314 return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next); 315 } 316 317 static 318 struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head, 319 const struct rpc_xprt *cur) 320 { 321 struct rpc_xprt *ret; 322 323 ret = xprt_switch_find_next_entry(head, cur); 324 if (ret != NULL) 325 return ret; 326 return xprt_switch_find_first_entry(head); 327 } 328 329 static 330 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps, 331 const struct rpc_xprt *cur) 332 { 333 struct list_head *head = &xps->xps_xprt_list; 334 struct rpc_xprt *xprt; 335 unsigned int nactive; 336 337 for (;;) { 338 unsigned long xprt_queuelen, xps_queuelen; 339 340 xprt = __xprt_switch_find_next_entry_roundrobin(head, cur); 341 if (!xprt) 342 break; 343 xprt_queuelen = atomic_long_read(&xprt->queuelen); 344 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 345 nactive = READ_ONCE(xps->xps_nactive); 346 /* Exit loop if xprt_queuelen <= average queue length */ 347 if (xprt_queuelen * nactive <= xps_queuelen) 348 break; 349 cur = xprt; 350 } 351 return xprt; 352 } 353 354 static 355 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 356 { 357 return xprt_iter_next_entry_multiple(xpi, 358 xprt_switch_find_next_entry_roundrobin); 359 } 360 361 static 362 struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps, 363 const struct rpc_xprt *cur) 364 { 365 return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur); 366 } 367 368 static 369 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 370 { 371 return xprt_iter_next_entry_multiple(xpi, 372 xprt_switch_find_next_entry_all); 373 } 374 375 /* 376 * xprt_iter_rewind - Resets the xprt iterator 377 * @xpi: pointer to rpc_xprt_iter 378 * 379 * Resets xpi to ensure that it points to the first entry in the list 380 * of transports. 381 */ 382 static 383 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 384 { 385 rcu_read_lock(); 386 xprt_iter_ops(xpi)->xpi_rewind(xpi); 387 rcu_read_unlock(); 388 } 389 390 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 391 struct rpc_xprt_switch *xps, 392 const struct rpc_xprt_iter_ops *ops) 393 { 394 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 395 xpi->xpi_cursor = NULL; 396 xpi->xpi_ops = ops; 397 } 398 399 /** 400 * xprt_iter_init - Initialise an xprt iterator 401 * @xpi: pointer to rpc_xprt_iter 402 * @xps: pointer to rpc_xprt_switch 403 * 404 * Initialises the iterator to use the default iterator ops 405 * as set in xps. This function is mainly intended for internal 406 * use in the rpc_client. 407 */ 408 void xprt_iter_init(struct rpc_xprt_iter *xpi, 409 struct rpc_xprt_switch *xps) 410 { 411 __xprt_iter_init(xpi, xps, NULL); 412 } 413 414 /** 415 * xprt_iter_init_listall - Initialise an xprt iterator 416 * @xpi: pointer to rpc_xprt_iter 417 * @xps: pointer to rpc_xprt_switch 418 * 419 * Initialises the iterator to iterate once through the entire list 420 * of entries in xps. 421 */ 422 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 423 struct rpc_xprt_switch *xps) 424 { 425 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 426 } 427 428 /** 429 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 430 * @xpi: pointer to rpc_xprt_iter 431 * @newswitch: pointer to a new rpc_xprt_switch or NULL 432 * 433 * Swaps out the existing xpi->xpi_xpswitch with a new value. 434 */ 435 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 436 struct rpc_xprt_switch *newswitch) 437 { 438 struct rpc_xprt_switch __rcu *oldswitch; 439 440 /* Atomically swap out the old xpswitch */ 441 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 442 if (newswitch != NULL) 443 xprt_iter_rewind(xpi); 444 return rcu_dereference_protected(oldswitch, true); 445 } 446 447 /** 448 * xprt_iter_destroy - Destroys the xprt iterator 449 * @xpi: pointer to rpc_xprt_iter 450 */ 451 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 452 { 453 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 454 } 455 456 /** 457 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 458 * @xpi: pointer to rpc_xprt_iter 459 * 460 * Returns a pointer to the struct rpc_xprt that is currently 461 * pointed to by the cursor. 462 * Caller must be holding rcu_read_lock(). 463 */ 464 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 465 { 466 WARN_ON_ONCE(!rcu_read_lock_held()); 467 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 468 } 469 470 static 471 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 472 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 473 { 474 struct rpc_xprt *ret; 475 476 do { 477 ret = fn(xpi); 478 if (ret == NULL) 479 break; 480 ret = xprt_get(ret); 481 } while (ret == NULL); 482 return ret; 483 } 484 485 /** 486 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 487 * @xpi: pointer to rpc_xprt_iter 488 * 489 * Returns a reference to the struct rpc_xprt that is currently 490 * pointed to by the cursor. 491 */ 492 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 493 { 494 struct rpc_xprt *xprt; 495 496 rcu_read_lock(); 497 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 498 rcu_read_unlock(); 499 return xprt; 500 } 501 502 /** 503 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 504 * @xpi: pointer to rpc_xprt_iter 505 * 506 * Returns a reference to the struct rpc_xprt that immediately follows the 507 * entry pointed to by the cursor. 508 */ 509 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 510 { 511 struct rpc_xprt *xprt; 512 513 rcu_read_lock(); 514 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 515 rcu_read_unlock(); 516 return xprt; 517 } 518 519 /* Policy for always returning the first entry in the rpc_xprt_switch */ 520 static 521 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 522 .xpi_rewind = xprt_iter_no_rewind, 523 .xpi_xprt = xprt_iter_first_entry, 524 .xpi_next = xprt_iter_first_entry, 525 }; 526 527 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 528 static 529 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 530 .xpi_rewind = xprt_iter_default_rewind, 531 .xpi_xprt = xprt_iter_current_entry, 532 .xpi_next = xprt_iter_next_entry_roundrobin, 533 }; 534 535 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 536 static 537 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 538 .xpi_rewind = xprt_iter_default_rewind, 539 .xpi_xprt = xprt_iter_current_entry, 540 .xpi_next = xprt_iter_next_entry_all, 541 }; 542