1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Multipath support for RPC 4 * 5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. 6 * 7 * Trond Myklebust <trond.myklebust@primarydata.com> 8 * 9 */ 10 #include <linux/types.h> 11 #include <linux/kref.h> 12 #include <linux/list.h> 13 #include <linux/rcupdate.h> 14 #include <linux/rculist.h> 15 #include <linux/slab.h> 16 #include <asm/cmpxchg.h> 17 #include <linux/spinlock.h> 18 #include <linux/sunrpc/xprt.h> 19 #include <linux/sunrpc/addr.h> 20 #include <linux/sunrpc/xprtmultipath.h> 21 22 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, 23 const struct rpc_xprt *cur); 24 25 static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; 26 static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; 27 static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; 28 29 static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, 30 struct rpc_xprt *xprt) 31 { 32 if (unlikely(xprt_get(xprt) == NULL)) 33 return; 34 list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); 35 smp_wmb(); 36 if (xps->xps_nxprts == 0) 37 xps->xps_net = xprt->xprt_net; 38 xps->xps_nxprts++; 39 xps->xps_nactive++; 40 } 41 42 /** 43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch 44 * @xps: pointer to struct rpc_xprt_switch 45 * @xprt: pointer to struct rpc_xprt 46 * 47 * Adds xprt to the end of the list of struct rpc_xprt in xps. 48 */ 49 void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, 50 struct rpc_xprt *xprt) 51 { 52 if (xprt == NULL) 53 return; 54 spin_lock(&xps->xps_lock); 55 if ((xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) && 56 !rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr)) 57 xprt_switch_add_xprt_locked(xps, xprt); 58 spin_unlock(&xps->xps_lock); 59 } 60 61 static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, 62 struct rpc_xprt *xprt) 63 { 64 if (unlikely(xprt == NULL)) 65 return; 66 xps->xps_nactive--; 67 xps->xps_nxprts--; 68 if (xps->xps_nxprts == 0) 69 xps->xps_net = NULL; 70 smp_wmb(); 71 list_del_rcu(&xprt->xprt_switch); 72 } 73 74 /** 75 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch 76 * @xps: pointer to struct rpc_xprt_switch 77 * @xprt: pointer to struct rpc_xprt 78 * 79 * Removes xprt from the list of struct rpc_xprt in xps. 80 */ 81 void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, 82 struct rpc_xprt *xprt) 83 { 84 spin_lock(&xps->xps_lock); 85 xprt_switch_remove_xprt_locked(xps, xprt); 86 spin_unlock(&xps->xps_lock); 87 xprt_put(xprt); 88 } 89 90 /** 91 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch 92 * @xprt: pointer to struct rpc_xprt 93 * @gfp_flags: allocation flags 94 * 95 * On success, returns an initialised struct rpc_xprt_switch, containing 96 * the entry xprt. Returns NULL on failure. 97 */ 98 struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, 99 gfp_t gfp_flags) 100 { 101 struct rpc_xprt_switch *xps; 102 103 xps = kmalloc(sizeof(*xps), gfp_flags); 104 if (xps != NULL) { 105 spin_lock_init(&xps->xps_lock); 106 kref_init(&xps->xps_kref); 107 xps->xps_nxprts = 0; 108 INIT_LIST_HEAD(&xps->xps_xprt_list); 109 xps->xps_iter_ops = &rpc_xprt_iter_singular; 110 xprt_switch_add_xprt_locked(xps, xprt); 111 } 112 113 return xps; 114 } 115 116 static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) 117 { 118 spin_lock(&xps->xps_lock); 119 while (!list_empty(&xps->xps_xprt_list)) { 120 struct rpc_xprt *xprt; 121 122 xprt = list_first_entry(&xps->xps_xprt_list, 123 struct rpc_xprt, xprt_switch); 124 xprt_switch_remove_xprt_locked(xps, xprt); 125 spin_unlock(&xps->xps_lock); 126 xprt_put(xprt); 127 spin_lock(&xps->xps_lock); 128 } 129 spin_unlock(&xps->xps_lock); 130 } 131 132 static void xprt_switch_free(struct kref *kref) 133 { 134 struct rpc_xprt_switch *xps = container_of(kref, 135 struct rpc_xprt_switch, xps_kref); 136 137 xprt_switch_free_entries(xps); 138 kfree_rcu(xps, xps_rcu); 139 } 140 141 /** 142 * xprt_switch_get - Return a reference to a rpc_xprt_switch 143 * @xps: pointer to struct rpc_xprt_switch 144 * 145 * Returns a reference to xps unless the refcount is already zero. 146 */ 147 struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) 148 { 149 if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) 150 return xps; 151 return NULL; 152 } 153 154 /** 155 * xprt_switch_put - Release a reference to a rpc_xprt_switch 156 * @xps: pointer to struct rpc_xprt_switch 157 * 158 * Release the reference to xps, and free it once the refcount is zero. 159 */ 160 void xprt_switch_put(struct rpc_xprt_switch *xps) 161 { 162 if (xps != NULL) 163 kref_put(&xps->xps_kref, xprt_switch_free); 164 } 165 166 /** 167 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch 168 * @xps: pointer to struct rpc_xprt_switch 169 * 170 * Sets a round-robin default policy for iterators acting on xps. 171 */ 172 void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) 173 { 174 if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) 175 WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); 176 } 177 178 static 179 const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) 180 { 181 if (xpi->xpi_ops != NULL) 182 return xpi->xpi_ops; 183 return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; 184 } 185 186 static 187 void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) 188 { 189 } 190 191 static 192 void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) 193 { 194 WRITE_ONCE(xpi->xpi_cursor, NULL); 195 } 196 197 static 198 struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) 199 { 200 return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); 201 } 202 203 static 204 struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) 205 { 206 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 207 208 if (xps == NULL) 209 return NULL; 210 return xprt_switch_find_first_entry(&xps->xps_xprt_list); 211 } 212 213 static 214 struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, 215 const struct rpc_xprt *cur) 216 { 217 struct rpc_xprt *pos; 218 219 list_for_each_entry_rcu(pos, head, xprt_switch) { 220 if (cur == pos) 221 return pos; 222 } 223 return NULL; 224 } 225 226 static 227 struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) 228 { 229 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 230 struct list_head *head; 231 232 if (xps == NULL) 233 return NULL; 234 head = &xps->xps_xprt_list; 235 if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) 236 return xprt_switch_find_first_entry(head); 237 return xprt_switch_find_current_entry(head, xpi->xpi_cursor); 238 } 239 240 bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps, 241 const struct sockaddr *sap) 242 { 243 struct list_head *head; 244 struct rpc_xprt *pos; 245 246 if (xps == NULL || sap == NULL) 247 return false; 248 249 head = &xps->xps_xprt_list; 250 list_for_each_entry_rcu(pos, head, xprt_switch) { 251 if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) { 252 pr_info("RPC: addr %s already in xprt switch\n", 253 pos->address_strings[RPC_DISPLAY_ADDR]); 254 return true; 255 } 256 } 257 return false; 258 } 259 260 static 261 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, 262 const struct rpc_xprt *cur) 263 { 264 struct rpc_xprt *pos, *prev = NULL; 265 266 list_for_each_entry_rcu(pos, head, xprt_switch) { 267 if (cur == prev) 268 return pos; 269 prev = pos; 270 } 271 return NULL; 272 } 273 274 static 275 struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, 276 struct rpc_xprt **cursor, 277 xprt_switch_find_xprt_t find_next) 278 { 279 struct rpc_xprt *cur, *pos, *old; 280 281 cur = READ_ONCE(*cursor); 282 for (;;) { 283 old = cur; 284 pos = find_next(head, old); 285 if (pos == NULL) 286 break; 287 cur = cmpxchg_relaxed(cursor, old, pos); 288 if (cur == old) 289 break; 290 } 291 return pos; 292 } 293 294 static 295 struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, 296 xprt_switch_find_xprt_t find_next) 297 { 298 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 299 300 if (xps == NULL) 301 return NULL; 302 return xprt_switch_set_next_cursor(&xps->xps_xprt_list, 303 &xpi->xpi_cursor, 304 find_next); 305 } 306 307 static 308 struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, 309 const struct rpc_xprt *cur) 310 { 311 struct rpc_xprt *ret; 312 313 ret = xprt_switch_find_next_entry(head, cur); 314 if (ret != NULL) 315 return ret; 316 return xprt_switch_find_first_entry(head); 317 } 318 319 static 320 struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) 321 { 322 struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); 323 struct rpc_xprt *xprt; 324 unsigned long xprt_queuelen; 325 unsigned long xps_queuelen; 326 unsigned long xps_avglen; 327 328 do { 329 xprt = xprt_iter_next_entry_multiple(xpi, 330 xprt_switch_find_next_entry_roundrobin); 331 if (xprt == NULL) 332 break; 333 xprt_queuelen = atomic_long_read(&xprt->queuelen); 334 if (xprt_queuelen <= 2) 335 break; 336 xps_queuelen = atomic_long_read(&xps->xps_queuelen); 337 xps_avglen = DIV_ROUND_UP(xps_queuelen, xps->xps_nactive); 338 } while (xprt_queuelen > xps_avglen); 339 return xprt; 340 } 341 342 static 343 struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) 344 { 345 return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); 346 } 347 348 /* 349 * xprt_iter_rewind - Resets the xprt iterator 350 * @xpi: pointer to rpc_xprt_iter 351 * 352 * Resets xpi to ensure that it points to the first entry in the list 353 * of transports. 354 */ 355 static 356 void xprt_iter_rewind(struct rpc_xprt_iter *xpi) 357 { 358 rcu_read_lock(); 359 xprt_iter_ops(xpi)->xpi_rewind(xpi); 360 rcu_read_unlock(); 361 } 362 363 static void __xprt_iter_init(struct rpc_xprt_iter *xpi, 364 struct rpc_xprt_switch *xps, 365 const struct rpc_xprt_iter_ops *ops) 366 { 367 rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); 368 xpi->xpi_cursor = NULL; 369 xpi->xpi_ops = ops; 370 } 371 372 /** 373 * xprt_iter_init - Initialise an xprt iterator 374 * @xpi: pointer to rpc_xprt_iter 375 * @xps: pointer to rpc_xprt_switch 376 * 377 * Initialises the iterator to use the default iterator ops 378 * as set in xps. This function is mainly intended for internal 379 * use in the rpc_client. 380 */ 381 void xprt_iter_init(struct rpc_xprt_iter *xpi, 382 struct rpc_xprt_switch *xps) 383 { 384 __xprt_iter_init(xpi, xps, NULL); 385 } 386 387 /** 388 * xprt_iter_init_listall - Initialise an xprt iterator 389 * @xpi: pointer to rpc_xprt_iter 390 * @xps: pointer to rpc_xprt_switch 391 * 392 * Initialises the iterator to iterate once through the entire list 393 * of entries in xps. 394 */ 395 void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, 396 struct rpc_xprt_switch *xps) 397 { 398 __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); 399 } 400 401 /** 402 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch 403 * @xpi: pointer to rpc_xprt_iter 404 * @newswitch: pointer to a new rpc_xprt_switch or NULL 405 * 406 * Swaps out the existing xpi->xpi_xpswitch with a new value. 407 */ 408 struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, 409 struct rpc_xprt_switch *newswitch) 410 { 411 struct rpc_xprt_switch __rcu *oldswitch; 412 413 /* Atomically swap out the old xpswitch */ 414 oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); 415 if (newswitch != NULL) 416 xprt_iter_rewind(xpi); 417 return rcu_dereference_protected(oldswitch, true); 418 } 419 420 /** 421 * xprt_iter_destroy - Destroys the xprt iterator 422 * @xpi: pointer to rpc_xprt_iter 423 */ 424 void xprt_iter_destroy(struct rpc_xprt_iter *xpi) 425 { 426 xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); 427 } 428 429 /** 430 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor 431 * @xpi: pointer to rpc_xprt_iter 432 * 433 * Returns a pointer to the struct rpc_xprt that is currently 434 * pointed to by the cursor. 435 * Caller must be holding rcu_read_lock(). 436 */ 437 struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) 438 { 439 WARN_ON_ONCE(!rcu_read_lock_held()); 440 return xprt_iter_ops(xpi)->xpi_xprt(xpi); 441 } 442 443 static 444 struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, 445 struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) 446 { 447 struct rpc_xprt *ret; 448 449 do { 450 ret = fn(xpi); 451 if (ret == NULL) 452 break; 453 ret = xprt_get(ret); 454 } while (ret == NULL); 455 return ret; 456 } 457 458 /** 459 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor 460 * @xpi: pointer to rpc_xprt_iter 461 * 462 * Returns a reference to the struct rpc_xprt that is currently 463 * pointed to by the cursor. 464 */ 465 struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) 466 { 467 struct rpc_xprt *xprt; 468 469 rcu_read_lock(); 470 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); 471 rcu_read_unlock(); 472 return xprt; 473 } 474 475 /** 476 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor 477 * @xpi: pointer to rpc_xprt_iter 478 * 479 * Returns a reference to the struct rpc_xprt that immediately follows the 480 * entry pointed to by the cursor. 481 */ 482 struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) 483 { 484 struct rpc_xprt *xprt; 485 486 rcu_read_lock(); 487 xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); 488 rcu_read_unlock(); 489 return xprt; 490 } 491 492 /* Policy for always returning the first entry in the rpc_xprt_switch */ 493 static 494 const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { 495 .xpi_rewind = xprt_iter_no_rewind, 496 .xpi_xprt = xprt_iter_first_entry, 497 .xpi_next = xprt_iter_first_entry, 498 }; 499 500 /* Policy for round-robin iteration of entries in the rpc_xprt_switch */ 501 static 502 const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { 503 .xpi_rewind = xprt_iter_default_rewind, 504 .xpi_xprt = xprt_iter_current_entry, 505 .xpi_next = xprt_iter_next_entry_roundrobin, 506 }; 507 508 /* Policy for once-through iteration of entries in the rpc_xprt_switch */ 509 static 510 const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { 511 .xpi_rewind = xprt_iter_default_rewind, 512 .xpi_xprt = xprt_iter_current_entry, 513 .xpi_next = xprt_iter_next_entry_all, 514 }; 515