1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, 23 const struct rpc_xprt *cur); 24 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 28 29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 30 struct rpc_xprt *xprt) 31 { 32 if (unlikely(xprt_get(xprt) == NULL)) 33 return; 34 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 35 smp_wmb(); 36 if (xps->xps_nxprts == 0) 37 xps->xps_net = xprt->xprt_net; 38 xps->xps_nxprts++; 39 xps->xps_nactive++; 40 } 41 42 /** 43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 44 * @xps: pointer to struct rpc_xprt_switch 45 * @xprt: pointer to struct rpc_xprt 46 * 47 * Adds xprt to the end of the list of struct rpc_xprt in xps. 48 */ 49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 50 struct rpc_xprt *xprt) 51 { 52 if (xprt == NULL) 53 return; 54 spin_lock(&xps->xps_lock); 55 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 56 xprt_switch_add_xprt_locked(xps, xprt); 57 spin_unlock(&xps->xps_lock); 58 } 59 60 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 61 struct rpc_xprt *xprt) 62 { 63 if (unlikely(xprt == NULL)) 64 return; 65 xps->xps_nactive--; 66 xps->xps_nxprts--; 67 if (xps->xps_nxprts == 0) 68 xps->xps_net = NULL; 69 smp_wmb(); 70 list_del_rcu(&xprt->xprt_switch); 71 } 72 73 /** 74 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 75 * @xps: pointer to struct rpc_xprt_switch 76 * @xprt: pointer to struct rpc_xprt 77 * 78 * Removes xprt from the list of struct rpc_xprt in xps. 79 */ 80 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 81 struct rpc_xprt *xprt) 82 { 83 spin_lock(&xps->xps_lock); 84 xprt_switch_remove_xprt_locked(xps, xprt); 85 spin_unlock(&xps->xps_lock); 86 xprt_put(xprt); 87 } 88 89 /** 90 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 91 * @xprt: pointer to struct rpc_xprt 92 * @gfp_flags: allocation flags 93 * 94 * On success, returns an initialised struct rpc_xprt_switch, containing 95 * the entry xprt. Returns NULL on failure. 96 */ 97 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 98 gfp_t gfp_flags) 99 { 100 struct rpc_xprt_switch *xps; 101 102 xps = kmalloc(sizeof(*xps), gfp_flags); 103 if (xps != NULL) { 104 spin_lock_init(&xps->xps_lock); 105 kref_init(&xps->xps_kref); 106 xps->xps_nxprts = xps->xps_nactive = 0; 107 atomic_long_set(&xps->xps_queuelen, 0); 108 xps->xps_net = NULL; 109 INIT_LIST_HEAD(&xps->xps_xprt_list); 110 xps->xps_iter_ops = &rpc_xprt_iter_singular; 111 xprt_switch_add_xprt_locked(xps, xprt); 112 } 113 114 return xps; 115 } 116 117 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 118 { 119 spin_lock(&xps->xps_lock); 120 while (!list_empty(&xps->xps_xprt_list)) { 121 struct rpc_xprt *xprt; 122 123 xprt = list_first_entry(&xps->xps_xprt_list, 124 struct rpc_xprt, xprt_switch); 125 xprt_switch_remove_xprt_locked(xps, xprt); 126 spin_unlock(&xps->xps_lock); 127 xprt_put(xprt); 128 spin_lock(&xps->xps_lock); 129 } 130 spin_unlock(&xps->xps_lock); 131 } 132 133 static void xprt_switch_free(struct kref *kref) 134 { 135 struct rpc_xprt_switch *xps = container_of(kref, 136 struct rpc_xprt_switch, xps_kref); 137 138 xprt_switch_free_entries(xps); 139 kfree_rcu(xps, xps_rcu); 140 } 141 142 /** 143 * xprt_switch_get - Return a reference to a rpc_xprt_switch 144 * @xps: pointer to struct rpc_xprt_switch 145 * 146 * Returns a reference to xps unless the refcount is already zero. 147 */ 148 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 149 { 150 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 151 return xps; 152 return NULL; 153 } 154 155 /** 156 * xprt_switch_put - Release a reference to a rpc_xprt_switch 157 * @xps: pointer to struct rpc_xprt_switch 158 * 159 * Release the reference to xps, and free it once the refcount is zero. 160 */ 161 void xprt_switch_put(struct rpc_xprt_switch *xps) 162 { 163 if (xps != NULL) 164 kref_put(&xps->xps_kref, xprt_switch_free); 165 } 166 167 /** 168 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 169 * @xps: pointer to struct rpc_xprt_switch 170 * 171 * Sets a round-robin default policy for iterators acting on xps. 172 */ 173 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 174 { 175 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 176 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 177 } 178 179 static 180 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 181 { 182 if (xpi->xpi_ops != NULL) 183 return xpi->xpi_ops; 184 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 185 } 186 187 static 188 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 189 { 190 } 191 192 static 193 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 194 { 195 WRITE_ONCE(xpi->xpi_cursor, NULL); 196 } 197 198 static 199 bool xprt_is_active(const struct rpc_xprt *xprt) 200 { 201 return kref_read(&xprt->kref) != 0; 202 } 203 204 static 205 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 206 { 207 struct rpc_xprt *pos; 208 209 list_for_each_entry_rcu(pos, head, xprt_switch) { 210 if (xprt_is_active(pos)) 211 return pos; 212 } 213 return NULL; 214 } 215 216 static 217 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 218 { 219 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 220 221 if (xps == NULL) 222 return NULL; 223 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 224 } 225 226 static 227 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 228 const struct rpc_xprt *cur) 229 { 230 struct rpc_xprt *pos; 231 bool found = false; 232 233 list_for_each_entry_rcu(pos, head, xprt_switch) { 234 if (cur == pos) 235 found = true; 236 if (found && xprt_is_active(pos)) 237 return pos; 238 } 239 return NULL; 240 } 241 242 static 243 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 244 { 245 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 246 struct list_head *head; 247 248 if (xps == NULL) 249 return NULL; 250 head = &xps->xps_xprt_list; 251 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 252 return xprt_switch_find_first_entry(head); 253 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 254 } 255 256 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 257 const struct sockaddr *sap) 258 { 259 struct list_head *head; 260 struct rpc_xprt *pos; 261 262 if (xps == NULL || sap == NULL) 263 return false; 264 265 head = &xps->xps_xprt_list; 266 list_for_each_entry_rcu(pos, head, xprt_switch) { 267 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 268 pr_info("RPC: addr %s already in xprt switch\n", 269 pos->address_strings[RPC_DISPLAY_ADDR]); 270 return true; 271 } 272 } 273 return false; 274 } 275 276 static 277 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 278 const struct rpc_xprt *cur) 279 { 280 struct rpc_xprt *pos, *prev = NULL; 281 bool found = false; 282 283 list_for_each_entry_rcu(pos, head, xprt_switch) { 284 if (cur == prev) 285 found = true; 286 if (found && xprt_is_active(pos)) 287 return pos; 288 prev = pos; 289 } 290 return NULL; 291 } 292 293 static 294 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, 295 struct rpc_xprt **cursor, 296 xprt_switch_find_xprt_t find_next) 297 { 298 struct rpc_xprt *cur, *pos, *old; 299 300 cur = READ_ONCE(*cursor); 301 for (;;) { 302 old = cur; 303 pos = find_next(head, old); 304 if (pos == NULL) 305 break; 306 cur = cmpxchg_relaxed(cursor, old, pos); 307 if (cur == old) 308 break; 309 } 310 return pos; 311 } 312 313 static 314 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 315 xprt_switch_find_xprt_t find_next) 316 { 317 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 318 319 if (xps == NULL) 320 return NULL; 321 return xprt_switch_set_next_cursor(&xps->xps_xprt_list, 322 &xpi->xpi_cursor, 323 find_next); 324 } 325 326 static 327 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, 328 const struct rpc_xprt *cur) 329 { 330 struct rpc_xprt *ret; 331 332 ret = xprt_switch_find_next_entry(head, cur); 333 if (ret != NULL) 334 return ret; 335 return xprt_switch_find_first_entry(head); 336 } 337 338 static 339 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 340 { 341 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 342 struct rpc_xprt *xprt; 343 unsigned long xprt_queuelen; 344 unsigned long xps_queuelen; 345 346 do { 347 xprt = xprt_iter_next_entry_multiple(xpi, 348 xprt_switch_find_next_entry_roundrobin); 349 if (xprt == NULL) 350 break; 351 xprt_queuelen = atomic_long_read(&xprt->queuelen); 352 if (xprt_queuelen <= 2) 353 break; 354 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 355 /* Exit loop if xprt_queuelen <= average queue length */ 356 } while (xprt_queuelen * READ_ONCE(xps->xps_nactive) > xps_queuelen); 357 return xprt; 358 } 359 360 static 361 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 362 { 363 return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); 364 } 365 366 /* 367 * xprt_iter_rewind - Resets the xprt iterator 368 * @xpi: pointer to rpc_xprt_iter 369 * 370 * Resets xpi to ensure that it points to the first entry in the list 371 * of transports. 372 */ 373 static 374 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 375 { 376 rcu_read_lock(); 377 xprt_iter_ops(xpi)->xpi_rewind(xpi); 378 rcu_read_unlock(); 379 } 380 381 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 382 struct rpc_xprt_switch *xps, 383 const struct rpc_xprt_iter_ops *ops) 384 { 385 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 386 xpi->xpi_cursor = NULL; 387 xpi->xpi_ops = ops; 388 } 389 390 /** 391 * xprt_iter_init - Initialise an xprt iterator 392 * @xpi: pointer to rpc_xprt_iter 393 * @xps: pointer to rpc_xprt_switch 394 * 395 * Initialises the iterator to use the default iterator ops 396 * as set in xps. This function is mainly intended for internal 397 * use in the rpc_client. 398 */ 399 void xprt_iter_init(struct rpc_xprt_iter *xpi, 400 struct rpc_xprt_switch *xps) 401 { 402 __xprt_iter_init(xpi, xps, NULL); 403 } 404 405 /** 406 * xprt_iter_init_listall - Initialise an xprt iterator 407 * @xpi: pointer to rpc_xprt_iter 408 * @xps: pointer to rpc_xprt_switch 409 * 410 * Initialises the iterator to iterate once through the entire list 411 * of entries in xps. 412 */ 413 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 414 struct rpc_xprt_switch *xps) 415 { 416 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 417 } 418 419 /** 420 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 421 * @xpi: pointer to rpc_xprt_iter 422 * @newswitch: pointer to a new rpc_xprt_switch or NULL 423 * 424 * Swaps out the existing xpi->xpi_xpswitch with a new value. 425 */ 426 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 427 struct rpc_xprt_switch *newswitch) 428 { 429 struct rpc_xprt_switch __rcu *oldswitch; 430 431 /* Atomically swap out the old xpswitch */ 432 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 433 if (newswitch != NULL) 434 xprt_iter_rewind(xpi); 435 return rcu_dereference_protected(oldswitch, true); 436 } 437 438 /** 439 * xprt_iter_destroy - Destroys the xprt iterator 440 * @xpi: pointer to rpc_xprt_iter 441 */ 442 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 443 { 444 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 445 } 446 447 /** 448 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 449 * @xpi: pointer to rpc_xprt_iter 450 * 451 * Returns a pointer to the struct rpc_xprt that is currently 452 * pointed to by the cursor. 453 * Caller must be holding rcu_read_lock(). 454 */ 455 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 456 { 457 WARN_ON_ONCE(!rcu_read_lock_held()); 458 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 459 } 460 461 static 462 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 463 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 464 { 465 struct rpc_xprt *ret; 466 467 do { 468 ret = fn(xpi); 469 if (ret == NULL) 470 break; 471 ret = xprt_get(ret); 472 } while (ret == NULL); 473 return ret; 474 } 475 476 /** 477 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 478 * @xpi: pointer to rpc_xprt_iter 479 * 480 * Returns a reference to the struct rpc_xprt that is currently 481 * pointed to by the cursor. 482 */ 483 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 484 { 485 struct rpc_xprt *xprt; 486 487 rcu_read_lock(); 488 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 489 rcu_read_unlock(); 490 return xprt; 491 } 492 493 /** 494 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 495 * @xpi: pointer to rpc_xprt_iter 496 * 497 * Returns a reference to the struct rpc_xprt that immediately follows the 498 * entry pointed to by the cursor. 499 */ 500 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 501 { 502 struct rpc_xprt *xprt; 503 504 rcu_read_lock(); 505 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 506 rcu_read_unlock(); 507 return xprt; 508 } 509 510 /* Policy for always returning the first entry in the rpc_xprt_switch */ 511 static 512 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 513 .xpi_rewind = xprt_iter_no_rewind, 514 .xpi_xprt = xprt_iter_first_entry, 515 .xpi_next = xprt_iter_first_entry, 516 }; 517 518 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 519 static 520 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 521 .xpi_rewind = xprt_iter_default_rewind, 522 .xpi_xprt = xprt_iter_current_entry, 523 .xpi_next = xprt_iter_next_entry_roundrobin, 524 }; 525 526 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 527 static 528 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 529 .xpi_rewind = xprt_iter_default_rewind, 530 .xpi_xprt = xprt_iter_current_entry, 531 .xpi_next = xprt_iter_next_entry_all, 532 }; 533