1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, 23 const struct rpc_xprt *cur); 24 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 28 29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 30 struct rpc_xprt *xprt) 31 { 32 if (unlikely(xprt_get(xprt) == NULL)) 33 return; 34 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 35 smp_wmb(); 36 if (xps->xps_nxprts == 0) 37 xps->xps_net = xprt->xprt_net; 38 xps->xps_nxprts++; 39 xps->xps_nactive++; 40 } 41 42 /** 43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 44 * @xps: pointer to struct rpc_xprt_switch 45 * @xprt: pointer to struct rpc_xprt 46 * 47 * Adds xprt to the end of the list of struct rpc_xprt in xps. 48 */ 49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 50 struct rpc_xprt *xprt) 51 { 52 if (xprt == NULL) 53 return; 54 spin_lock(&xps->xps_lock); 55 if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) 56 xprt_switch_add_xprt_locked(xps, xprt); 57 spin_unlock(&xps->xps_lock); 58 } 59 60 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 61 struct rpc_xprt *xprt) 62 { 63 if (unlikely(xprt == NULL)) 64 return; 65 xps->xps_nactive--; 66 xps->xps_nxprts--; 67 if (xps->xps_nxprts == 0) 68 xps->xps_net = NULL; 69 smp_wmb(); 70 list_del_rcu(&xprt->xprt_switch); 71 } 72 73 /** 74 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 75 * @xps: pointer to struct rpc_xprt_switch 76 * @xprt: pointer to struct rpc_xprt 77 * 78 * Removes xprt from the list of struct rpc_xprt in xps. 79 */ 80 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 81 struct rpc_xprt *xprt) 82 { 83 spin_lock(&xps->xps_lock); 84 xprt_switch_remove_xprt_locked(xps, xprt); 85 spin_unlock(&xps->xps_lock); 86 xprt_put(xprt); 87 } 88 89 /** 90 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 91 * @xprt: pointer to struct rpc_xprt 92 * @gfp_flags: allocation flags 93 * 94 * On success, returns an initialised struct rpc_xprt_switch, containing 95 * the entry xprt. Returns NULL on failure. 96 */ 97 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 98 gfp_t gfp_flags) 99 { 100 struct rpc_xprt_switch *xps; 101 102 xps = kmalloc(sizeof(*xps), gfp_flags); 103 if (xps != NULL) { 104 spin_lock_init(&xps->xps_lock); 105 kref_init(&xps->xps_kref); 106 xps->xps_nxprts = 0; 107 INIT_LIST_HEAD(&xps->xps_xprt_list); 108 xps->xps_iter_ops = &rpc_xprt_iter_singular; 109 xprt_switch_add_xprt_locked(xps, xprt); 110 } 111 112 return xps; 113 } 114 115 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 116 { 117 spin_lock(&xps->xps_lock); 118 while (!list_empty(&xps->xps_xprt_list)) { 119 struct rpc_xprt *xprt; 120 121 xprt = list_first_entry(&xps->xps_xprt_list, 122 struct rpc_xprt, xprt_switch); 123 xprt_switch_remove_xprt_locked(xps, xprt); 124 spin_unlock(&xps->xps_lock); 125 xprt_put(xprt); 126 spin_lock(&xps->xps_lock); 127 } 128 spin_unlock(&xps->xps_lock); 129 } 130 131 static void xprt_switch_free(struct kref *kref) 132 { 133 struct rpc_xprt_switch *xps = container_of(kref, 134 struct rpc_xprt_switch, xps_kref); 135 136 xprt_switch_free_entries(xps); 137 kfree_rcu(xps, xps_rcu); 138 } 139 140 /** 141 * xprt_switch_get - Return a reference to a rpc_xprt_switch 142 * @xps: pointer to struct rpc_xprt_switch 143 * 144 * Returns a reference to xps unless the refcount is already zero. 145 */ 146 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 147 { 148 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 149 return xps; 150 return NULL; 151 } 152 153 /** 154 * xprt_switch_put - Release a reference to a rpc_xprt_switch 155 * @xps: pointer to struct rpc_xprt_switch 156 * 157 * Release the reference to xps, and free it once the refcount is zero. 158 */ 159 void xprt_switch_put(struct rpc_xprt_switch *xps) 160 { 161 if (xps != NULL) 162 kref_put(&xps->xps_kref, xprt_switch_free); 163 } 164 165 /** 166 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 167 * @xps: pointer to struct rpc_xprt_switch 168 * 169 * Sets a round-robin default policy for iterators acting on xps. 170 */ 171 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 172 { 173 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 174 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 175 } 176 177 static 178 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 179 { 180 if (xpi->xpi_ops != NULL) 181 return xpi->xpi_ops; 182 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 183 } 184 185 static 186 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 187 { 188 } 189 190 static 191 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 192 { 193 WRITE_ONCE(xpi->xpi_cursor, NULL); 194 } 195 196 static 197 bool xprt_is_active(const struct rpc_xprt *xprt) 198 { 199 return kref_read(&xprt->kref) != 0; 200 } 201 202 static 203 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 204 { 205 struct rpc_xprt *pos; 206 207 list_for_each_entry_rcu(pos, head, xprt_switch) { 208 if (xprt_is_active(pos)) 209 return pos; 210 } 211 return NULL; 212 } 213 214 static 215 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 216 { 217 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 218 219 if (xps == NULL) 220 return NULL; 221 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 222 } 223 224 static 225 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 226 const struct rpc_xprt *cur) 227 { 228 struct rpc_xprt *pos; 229 bool found = false; 230 231 list_for_each_entry_rcu(pos, head, xprt_switch) { 232 if (cur == pos) 233 found = true; 234 if (found && xprt_is_active(pos)) 235 return pos; 236 } 237 return NULL; 238 } 239 240 static 241 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 242 { 243 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 244 struct list_head *head; 245 246 if (xps == NULL) 247 return NULL; 248 head = &xps->xps_xprt_list; 249 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 250 return xprt_switch_find_first_entry(head); 251 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 252 } 253 254 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 255 const struct sockaddr *sap) 256 { 257 struct list_head *head; 258 struct rpc_xprt *pos; 259 260 if (xps == NULL || sap == NULL) 261 return false; 262 263 head = &xps->xps_xprt_list; 264 list_for_each_entry_rcu(pos, head, xprt_switch) { 265 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 266 pr_info("RPC: addr %s already in xprt switch\n", 267 pos->address_strings[RPC_DISPLAY_ADDR]); 268 return true; 269 } 270 } 271 return false; 272 } 273 274 static 275 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 276 const struct rpc_xprt *cur) 277 { 278 struct rpc_xprt *pos, *prev = NULL; 279 bool found = false; 280 281 list_for_each_entry_rcu(pos, head, xprt_switch) { 282 if (cur == prev) 283 found = true; 284 if (found && xprt_is_active(pos)) 285 return pos; 286 prev = pos; 287 } 288 return NULL; 289 } 290 291 static 292 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, 293 struct rpc_xprt **cursor, 294 xprt_switch_find_xprt_t find_next) 295 { 296 struct rpc_xprt *cur, *pos, *old; 297 298 cur = READ_ONCE(*cursor); 299 for (;;) { 300 old = cur; 301 pos = find_next(head, old); 302 if (pos == NULL) 303 break; 304 cur = cmpxchg_relaxed(cursor, old, pos); 305 if (cur == old) 306 break; 307 } 308 return pos; 309 } 310 311 static 312 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 313 xprt_switch_find_xprt_t find_next) 314 { 315 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 316 317 if (xps == NULL) 318 return NULL; 319 return xprt_switch_set_next_cursor(&xps->xps_xprt_list, 320 &xpi->xpi_cursor, 321 find_next); 322 } 323 324 static 325 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, 326 const struct rpc_xprt *cur) 327 { 328 struct rpc_xprt *ret; 329 330 ret = xprt_switch_find_next_entry(head, cur); 331 if (ret != NULL) 332 return ret; 333 return xprt_switch_find_first_entry(head); 334 } 335 336 static 337 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 338 { 339 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 340 struct rpc_xprt *xprt; 341 unsigned long xprt_queuelen; 342 unsigned long xps_queuelen; 343 344 do { 345 xprt = xprt_iter_next_entry_multiple(xpi, 346 xprt_switch_find_next_entry_roundrobin); 347 if (xprt == NULL) 348 break; 349 xprt_queuelen = atomic_long_read(&xprt->queuelen); 350 if (xprt_queuelen <= 2) 351 break; 352 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 353 /* Exit loop if xprt_queuelen <= average queue length */ 354 } while (xprt_queuelen * READ_ONCE(xps->xps_nactive) > xps_queuelen); 355 return xprt; 356 } 357 358 static 359 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 360 { 361 return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); 362 } 363 364 /* 365 * xprt_iter_rewind - Resets the xprt iterator 366 * @xpi: pointer to rpc_xprt_iter 367 * 368 * Resets xpi to ensure that it points to the first entry in the list 369 * of transports. 370 */ 371 static 372 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 373 { 374 rcu_read_lock(); 375 xprt_iter_ops(xpi)->xpi_rewind(xpi); 376 rcu_read_unlock(); 377 } 378 379 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 380 struct rpc_xprt_switch *xps, 381 const struct rpc_xprt_iter_ops *ops) 382 { 383 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 384 xpi->xpi_cursor = NULL; 385 xpi->xpi_ops = ops; 386 } 387 388 /** 389 * xprt_iter_init - Initialise an xprt iterator 390 * @xpi: pointer to rpc_xprt_iter 391 * @xps: pointer to rpc_xprt_switch 392 * 393 * Initialises the iterator to use the default iterator ops 394 * as set in xps. This function is mainly intended for internal 395 * use in the rpc_client. 396 */ 397 void xprt_iter_init(struct rpc_xprt_iter *xpi, 398 struct rpc_xprt_switch *xps) 399 { 400 __xprt_iter_init(xpi, xps, NULL); 401 } 402 403 /** 404 * xprt_iter_init_listall - Initialise an xprt iterator 405 * @xpi: pointer to rpc_xprt_iter 406 * @xps: pointer to rpc_xprt_switch 407 * 408 * Initialises the iterator to iterate once through the entire list 409 * of entries in xps. 410 */ 411 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 412 struct rpc_xprt_switch *xps) 413 { 414 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 415 } 416 417 /** 418 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 419 * @xpi: pointer to rpc_xprt_iter 420 * @newswitch: pointer to a new rpc_xprt_switch or NULL 421 * 422 * Swaps out the existing xpi->xpi_xpswitch with a new value. 423 */ 424 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 425 struct rpc_xprt_switch *newswitch) 426 { 427 struct rpc_xprt_switch __rcu *oldswitch; 428 429 /* Atomically swap out the old xpswitch */ 430 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 431 if (newswitch != NULL) 432 xprt_iter_rewind(xpi); 433 return rcu_dereference_protected(oldswitch, true); 434 } 435 436 /** 437 * xprt_iter_destroy - Destroys the xprt iterator 438 * @xpi: pointer to rpc_xprt_iter 439 */ 440 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 441 { 442 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 443 } 444 445 /** 446 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 447 * @xpi: pointer to rpc_xprt_iter 448 * 449 * Returns a pointer to the struct rpc_xprt that is currently 450 * pointed to by the cursor. 451 * Caller must be holding rcu_read_lock(). 452 */ 453 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 454 { 455 WARN_ON_ONCE(!rcu_read_lock_held()); 456 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 457 } 458 459 static 460 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 461 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 462 { 463 struct rpc_xprt *ret; 464 465 do { 466 ret = fn(xpi); 467 if (ret == NULL) 468 break; 469 ret = xprt_get(ret); 470 } while (ret == NULL); 471 return ret; 472 } 473 474 /** 475 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 476 * @xpi: pointer to rpc_xprt_iter 477 * 478 * Returns a reference to the struct rpc_xprt that is currently 479 * pointed to by the cursor. 480 */ 481 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 482 { 483 struct rpc_xprt *xprt; 484 485 rcu_read_lock(); 486 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 487 rcu_read_unlock(); 488 return xprt; 489 } 490 491 /** 492 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 493 * @xpi: pointer to rpc_xprt_iter 494 * 495 * Returns a reference to the struct rpc_xprt that immediately follows the 496 * entry pointed to by the cursor. 497 */ 498 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 499 { 500 struct rpc_xprt *xprt; 501 502 rcu_read_lock(); 503 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 504 rcu_read_unlock(); 505 return xprt; 506 } 507 508 /* Policy for always returning the first entry in the rpc_xprt_switch */ 509 static 510 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 511 .xpi_rewind = xprt_iter_no_rewind, 512 .xpi_xprt = xprt_iter_first_entry, 513 .xpi_next = xprt_iter_first_entry, 514 }; 515 516 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 517 static 518 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 519 .xpi_rewind = xprt_iter_default_rewind, 520 .xpi_xprt = xprt_iter_current_entry, 521 .xpi_next = xprt_iter_next_entry_roundrobin, 522 }; 523 524 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 525 static 526 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 527 .xpi_rewind = xprt_iter_default_rewind, 528 .xpi_xprt = xprt_iter_current_entry, 529 .xpi_next = xprt_iter_next_entry_all, 530 }; 531