1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <linux/ceph/ceph_debug.h> 4 5 #include <linux/module.h> 6 #include <linux/slab.h> 7 8 #include <linux/ceph/libceph.h> 9 #include <linux/ceph/osdmap.h> 10 #include <linux/ceph/decode.h> 11 #include <linux/crush/hash.h> 12 #include <linux/crush/mapper.h> 13 14 char *ceph_osdmap_state_str(char *str, int len, u32 state) 15 { 16 if (!len) 17 return str; 18 19 if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) 20 snprintf(str, len, "exists, up"); 21 else if (state & CEPH_OSD_EXISTS) 22 snprintf(str, len, "exists"); 23 else if (state & CEPH_OSD_UP) 24 snprintf(str, len, "up"); 25 else 26 snprintf(str, len, "doesn't exist"); 27 28 return str; 29 } 30 31 /* maps */ 32 33 static int calc_bits_of(unsigned int t) 34 { 35 int b = 0; 36 while (t) { 37 t = t >> 1; 38 b++; 39 } 40 return b; 41 } 42 43 /* 44 * the foo_mask is the smallest value 2^n-1 that is >= foo. 45 */ 46 static void calc_pg_masks(struct ceph_pg_pool_info *pi) 47 { 48 pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; 49 pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; 50 } 51 52 /* 53 * decode crush map 54 */ 55 static int crush_decode_uniform_bucket(void **p, void *end, 56 struct crush_bucket_uniform *b) 57 { 58 dout("crush_decode_uniform_bucket %p to %p\n", *p, end); 59 ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad); 60 b->item_weight = ceph_decode_32(p); 61 return 0; 62 bad: 63 return -EINVAL; 64 } 65 66 static int crush_decode_list_bucket(void **p, void *end, 67 struct crush_bucket_list *b) 68 { 69 int j; 70 dout("crush_decode_list_bucket %p to %p\n", *p, end); 71 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 72 if (b->item_weights == NULL) 73 return -ENOMEM; 74 b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 75 if (b->sum_weights == NULL) 76 return -ENOMEM; 77 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 78 for (j = 0; j < b->h.size; j++) { 79 b->item_weights[j] = ceph_decode_32(p); 80 b->sum_weights[j] = ceph_decode_32(p); 81 } 82 return 0; 83 bad: 84 return -EINVAL; 85 } 86 87 static int crush_decode_tree_bucket(void **p, void *end, 88 struct crush_bucket_tree *b) 89 { 90 int j; 91 dout("crush_decode_tree_bucket %p to %p\n", *p, end); 92 ceph_decode_8_safe(p, end, b->num_nodes, bad); 93 b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS); 94 if (b->node_weights == NULL) 95 return -ENOMEM; 96 ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad); 97 for (j = 0; j < b->num_nodes; j++) 98 b->node_weights[j] = ceph_decode_32(p); 99 return 0; 100 bad: 101 return -EINVAL; 102 } 103 104 static int crush_decode_straw_bucket(void **p, void *end, 105 struct crush_bucket_straw *b) 106 { 107 int j; 108 dout("crush_decode_straw_bucket %p to %p\n", *p, end); 109 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 110 if (b->item_weights == NULL) 111 return -ENOMEM; 112 b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 113 if (b->straws == NULL) 114 return -ENOMEM; 115 ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad); 116 for (j = 0; j < b->h.size; j++) { 117 b->item_weights[j] = ceph_decode_32(p); 118 b->straws[j] = ceph_decode_32(p); 119 } 120 return 0; 121 bad: 122 return -EINVAL; 123 } 124 125 static int crush_decode_straw2_bucket(void **p, void *end, 126 struct crush_bucket_straw2 *b) 127 { 128 int j; 129 dout("crush_decode_straw2_bucket %p to %p\n", *p, end); 130 b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS); 131 if (b->item_weights == NULL) 132 return -ENOMEM; 133 ceph_decode_need(p, end, b->h.size * sizeof(u32), bad); 134 for (j = 0; j < b->h.size; j++) 135 b->item_weights[j] = ceph_decode_32(p); 136 return 0; 137 bad: 138 return -EINVAL; 139 } 140 141 struct crush_name_node { 142 struct rb_node cn_node; 143 int cn_id; 144 char cn_name[]; 145 }; 146 147 static struct crush_name_node *alloc_crush_name(size_t name_len) 148 { 149 struct crush_name_node *cn; 150 151 cn = kmalloc(sizeof(*cn) + name_len + 1, GFP_NOIO); 152 if (!cn) 153 return NULL; 154 155 RB_CLEAR_NODE(&cn->cn_node); 156 return cn; 157 } 158 159 static void free_crush_name(struct crush_name_node *cn) 160 { 161 WARN_ON(!RB_EMPTY_NODE(&cn->cn_node)); 162 163 kfree(cn); 164 } 165 166 DEFINE_RB_FUNCS(crush_name, struct crush_name_node, cn_id, cn_node) 167 168 static int decode_crush_names(void **p, void *end, struct rb_root *root) 169 { 170 u32 n; 171 172 ceph_decode_32_safe(p, end, n, e_inval); 173 while (n--) { 174 struct crush_name_node *cn; 175 int id; 176 u32 name_len; 177 178 ceph_decode_32_safe(p, end, id, e_inval); 179 ceph_decode_32_safe(p, end, name_len, e_inval); 180 ceph_decode_need(p, end, name_len, e_inval); 181 182 cn = alloc_crush_name(name_len); 183 if (!cn) 184 return -ENOMEM; 185 186 cn->cn_id = id; 187 memcpy(cn->cn_name, *p, name_len); 188 cn->cn_name[name_len] = '\0'; 189 *p += name_len; 190 191 if (!__insert_crush_name(root, cn)) { 192 free_crush_name(cn); 193 return -EEXIST; 194 } 195 } 196 197 return 0; 198 199 e_inval: 200 return -EINVAL; 201 } 202 203 void clear_crush_names(struct rb_root *root) 204 { 205 while (!RB_EMPTY_ROOT(root)) { 206 struct crush_name_node *cn = 207 rb_entry(rb_first(root), struct crush_name_node, cn_node); 208 209 erase_crush_name(root, cn); 210 free_crush_name(cn); 211 } 212 } 213 214 static struct crush_choose_arg_map *alloc_choose_arg_map(void) 215 { 216 struct crush_choose_arg_map *arg_map; 217 218 arg_map = kzalloc(sizeof(*arg_map), GFP_NOIO); 219 if (!arg_map) 220 return NULL; 221 222 RB_CLEAR_NODE(&arg_map->node); 223 return arg_map; 224 } 225 226 static void free_choose_arg_map(struct crush_choose_arg_map *arg_map) 227 { 228 if (arg_map) { 229 int i, j; 230 231 WARN_ON(!RB_EMPTY_NODE(&arg_map->node)); 232 233 for (i = 0; i < arg_map->size; i++) { 234 struct crush_choose_arg *arg = &arg_map->args[i]; 235 236 for (j = 0; j < arg->weight_set_size; j++) 237 kfree(arg->weight_set[j].weights); 238 kfree(arg->weight_set); 239 kfree(arg->ids); 240 } 241 kfree(arg_map->args); 242 kfree(arg_map); 243 } 244 } 245 246 DEFINE_RB_FUNCS(choose_arg_map, struct crush_choose_arg_map, choose_args_index, 247 node); 248 249 void clear_choose_args(struct crush_map *c) 250 { 251 while (!RB_EMPTY_ROOT(&c->choose_args)) { 252 struct crush_choose_arg_map *arg_map = 253 rb_entry(rb_first(&c->choose_args), 254 struct crush_choose_arg_map, node); 255 256 erase_choose_arg_map(&c->choose_args, arg_map); 257 free_choose_arg_map(arg_map); 258 } 259 } 260 261 static u32 *decode_array_32_alloc(void **p, void *end, u32 *plen) 262 { 263 u32 *a = NULL; 264 u32 len; 265 int ret; 266 267 ceph_decode_32_safe(p, end, len, e_inval); 268 if (len) { 269 u32 i; 270 271 a = kmalloc_array(len, sizeof(u32), GFP_NOIO); 272 if (!a) { 273 ret = -ENOMEM; 274 goto fail; 275 } 276 277 ceph_decode_need(p, end, len * sizeof(u32), e_inval); 278 for (i = 0; i < len; i++) 279 a[i] = ceph_decode_32(p); 280 } 281 282 *plen = len; 283 return a; 284 285 e_inval: 286 ret = -EINVAL; 287 fail: 288 kfree(a); 289 return ERR_PTR(ret); 290 } 291 292 /* 293 * Assumes @arg is zero-initialized. 294 */ 295 static int decode_choose_arg(void **p, void *end, struct crush_choose_arg *arg) 296 { 297 int ret; 298 299 ceph_decode_32_safe(p, end, arg->weight_set_size, e_inval); 300 if (arg->weight_set_size) { 301 u32 i; 302 303 arg->weight_set = kmalloc_array(arg->weight_set_size, 304 sizeof(*arg->weight_set), 305 GFP_NOIO); 306 if (!arg->weight_set) 307 return -ENOMEM; 308 309 for (i = 0; i < arg->weight_set_size; i++) { 310 struct crush_weight_set *w = &arg->weight_set[i]; 311 312 w->weights = decode_array_32_alloc(p, end, &w->size); 313 if (IS_ERR(w->weights)) { 314 ret = PTR_ERR(w->weights); 315 w->weights = NULL; 316 return ret; 317 } 318 } 319 } 320 321 arg->ids = decode_array_32_alloc(p, end, &arg->ids_size); 322 if (IS_ERR(arg->ids)) { 323 ret = PTR_ERR(arg->ids); 324 arg->ids = NULL; 325 return ret; 326 } 327 328 return 0; 329 330 e_inval: 331 return -EINVAL; 332 } 333 334 static int decode_choose_args(void **p, void *end, struct crush_map *c) 335 { 336 struct crush_choose_arg_map *arg_map = NULL; 337 u32 num_choose_arg_maps, num_buckets; 338 int ret; 339 340 ceph_decode_32_safe(p, end, num_choose_arg_maps, e_inval); 341 while (num_choose_arg_maps--) { 342 arg_map = alloc_choose_arg_map(); 343 if (!arg_map) { 344 ret = -ENOMEM; 345 goto fail; 346 } 347 348 ceph_decode_64_safe(p, end, arg_map->choose_args_index, 349 e_inval); 350 arg_map->size = c->max_buckets; 351 arg_map->args = kcalloc(arg_map->size, sizeof(*arg_map->args), 352 GFP_NOIO); 353 if (!arg_map->args) { 354 ret = -ENOMEM; 355 goto fail; 356 } 357 358 ceph_decode_32_safe(p, end, num_buckets, e_inval); 359 while (num_buckets--) { 360 struct crush_choose_arg *arg; 361 u32 bucket_index; 362 363 ceph_decode_32_safe(p, end, bucket_index, e_inval); 364 if (bucket_index >= arg_map->size) 365 goto e_inval; 366 367 arg = &arg_map->args[bucket_index]; 368 ret = decode_choose_arg(p, end, arg); 369 if (ret) 370 goto fail; 371 372 if (arg->ids_size && 373 arg->ids_size != c->buckets[bucket_index]->size) 374 goto e_inval; 375 } 376 377 insert_choose_arg_map(&c->choose_args, arg_map); 378 } 379 380 return 0; 381 382 e_inval: 383 ret = -EINVAL; 384 fail: 385 free_choose_arg_map(arg_map); 386 return ret; 387 } 388 389 static void crush_finalize(struct crush_map *c) 390 { 391 __s32 b; 392 393 /* Space for the array of pointers to per-bucket workspace */ 394 c->working_size = sizeof(struct crush_work) + 395 c->max_buckets * sizeof(struct crush_work_bucket *); 396 397 for (b = 0; b < c->max_buckets; b++) { 398 if (!c->buckets[b]) 399 continue; 400 401 switch (c->buckets[b]->alg) { 402 default: 403 /* 404 * The base case, permutation variables and 405 * the pointer to the permutation array. 406 */ 407 c->working_size += sizeof(struct crush_work_bucket); 408 break; 409 } 410 /* Every bucket has a permutation array. */ 411 c->working_size += c->buckets[b]->size * sizeof(__u32); 412 } 413 } 414 415 static struct crush_map *crush_decode(void *pbyval, void *end) 416 { 417 struct crush_map *c; 418 int err; 419 int i, j; 420 void **p = &pbyval; 421 void *start = pbyval; 422 u32 magic; 423 424 dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p)); 425 426 c = kzalloc(sizeof(*c), GFP_NOFS); 427 if (c == NULL) 428 return ERR_PTR(-ENOMEM); 429 430 c->type_names = RB_ROOT; 431 c->names = RB_ROOT; 432 c->choose_args = RB_ROOT; 433 434 /* set tunables to default values */ 435 c->choose_local_tries = 2; 436 c->choose_local_fallback_tries = 5; 437 c->choose_total_tries = 19; 438 c->chooseleaf_descend_once = 0; 439 440 ceph_decode_need(p, end, 4*sizeof(u32), bad); 441 magic = ceph_decode_32(p); 442 if (magic != CRUSH_MAGIC) { 443 pr_err("crush_decode magic %x != current %x\n", 444 (unsigned int)magic, (unsigned int)CRUSH_MAGIC); 445 goto bad; 446 } 447 c->max_buckets = ceph_decode_32(p); 448 c->max_rules = ceph_decode_32(p); 449 c->max_devices = ceph_decode_32(p); 450 451 c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS); 452 if (c->buckets == NULL) 453 goto badmem; 454 c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS); 455 if (c->rules == NULL) 456 goto badmem; 457 458 /* buckets */ 459 for (i = 0; i < c->max_buckets; i++) { 460 int size = 0; 461 u32 alg; 462 struct crush_bucket *b; 463 464 ceph_decode_32_safe(p, end, alg, bad); 465 if (alg == 0) { 466 c->buckets[i] = NULL; 467 continue; 468 } 469 dout("crush_decode bucket %d off %x %p to %p\n", 470 i, (int)(*p-start), *p, end); 471 472 switch (alg) { 473 case CRUSH_BUCKET_UNIFORM: 474 size = sizeof(struct crush_bucket_uniform); 475 break; 476 case CRUSH_BUCKET_LIST: 477 size = sizeof(struct crush_bucket_list); 478 break; 479 case CRUSH_BUCKET_TREE: 480 size = sizeof(struct crush_bucket_tree); 481 break; 482 case CRUSH_BUCKET_STRAW: 483 size = sizeof(struct crush_bucket_straw); 484 break; 485 case CRUSH_BUCKET_STRAW2: 486 size = sizeof(struct crush_bucket_straw2); 487 break; 488 default: 489 goto bad; 490 } 491 BUG_ON(size == 0); 492 b = c->buckets[i] = kzalloc(size, GFP_NOFS); 493 if (b == NULL) 494 goto badmem; 495 496 ceph_decode_need(p, end, 4*sizeof(u32), bad); 497 b->id = ceph_decode_32(p); 498 b->type = ceph_decode_16(p); 499 b->alg = ceph_decode_8(p); 500 b->hash = ceph_decode_8(p); 501 b->weight = ceph_decode_32(p); 502 b->size = ceph_decode_32(p); 503 504 dout("crush_decode bucket size %d off %x %p to %p\n", 505 b->size, (int)(*p-start), *p, end); 506 507 b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS); 508 if (b->items == NULL) 509 goto badmem; 510 511 ceph_decode_need(p, end, b->size*sizeof(u32), bad); 512 for (j = 0; j < b->size; j++) 513 b->items[j] = ceph_decode_32(p); 514 515 switch (b->alg) { 516 case CRUSH_BUCKET_UNIFORM: 517 err = crush_decode_uniform_bucket(p, end, 518 (struct crush_bucket_uniform *)b); 519 if (err < 0) 520 goto fail; 521 break; 522 case CRUSH_BUCKET_LIST: 523 err = crush_decode_list_bucket(p, end, 524 (struct crush_bucket_list *)b); 525 if (err < 0) 526 goto fail; 527 break; 528 case CRUSH_BUCKET_TREE: 529 err = crush_decode_tree_bucket(p, end, 530 (struct crush_bucket_tree *)b); 531 if (err < 0) 532 goto fail; 533 break; 534 case CRUSH_BUCKET_STRAW: 535 err = crush_decode_straw_bucket(p, end, 536 (struct crush_bucket_straw *)b); 537 if (err < 0) 538 goto fail; 539 break; 540 case CRUSH_BUCKET_STRAW2: 541 err = crush_decode_straw2_bucket(p, end, 542 (struct crush_bucket_straw2 *)b); 543 if (err < 0) 544 goto fail; 545 break; 546 } 547 } 548 549 /* rules */ 550 dout("rule vec is %p\n", c->rules); 551 for (i = 0; i < c->max_rules; i++) { 552 u32 yes; 553 struct crush_rule *r; 554 555 ceph_decode_32_safe(p, end, yes, bad); 556 if (!yes) { 557 dout("crush_decode NO rule %d off %x %p to %p\n", 558 i, (int)(*p-start), *p, end); 559 c->rules[i] = NULL; 560 continue; 561 } 562 563 dout("crush_decode rule %d off %x %p to %p\n", 564 i, (int)(*p-start), *p, end); 565 566 /* len */ 567 ceph_decode_32_safe(p, end, yes, bad); 568 #if BITS_PER_LONG == 32 569 if (yes > (ULONG_MAX - sizeof(*r)) 570 / sizeof(struct crush_rule_step)) 571 goto bad; 572 #endif 573 r = kmalloc(struct_size(r, steps, yes), GFP_NOFS); 574 c->rules[i] = r; 575 if (r == NULL) 576 goto badmem; 577 dout(" rule %d is at %p\n", i, r); 578 r->len = yes; 579 ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */ 580 ceph_decode_need(p, end, r->len*3*sizeof(u32), bad); 581 for (j = 0; j < r->len; j++) { 582 r->steps[j].op = ceph_decode_32(p); 583 r->steps[j].arg1 = ceph_decode_32(p); 584 r->steps[j].arg2 = ceph_decode_32(p); 585 } 586 } 587 588 err = decode_crush_names(p, end, &c->type_names); 589 if (err) 590 goto fail; 591 592 err = decode_crush_names(p, end, &c->names); 593 if (err) 594 goto fail; 595 596 ceph_decode_skip_map(p, end, 32, string, bad); /* rule_name_map */ 597 598 /* tunables */ 599 ceph_decode_need(p, end, 3*sizeof(u32), done); 600 c->choose_local_tries = ceph_decode_32(p); 601 c->choose_local_fallback_tries = ceph_decode_32(p); 602 c->choose_total_tries = ceph_decode_32(p); 603 dout("crush decode tunable choose_local_tries = %d\n", 604 c->choose_local_tries); 605 dout("crush decode tunable choose_local_fallback_tries = %d\n", 606 c->choose_local_fallback_tries); 607 dout("crush decode tunable choose_total_tries = %d\n", 608 c->choose_total_tries); 609 610 ceph_decode_need(p, end, sizeof(u32), done); 611 c->chooseleaf_descend_once = ceph_decode_32(p); 612 dout("crush decode tunable chooseleaf_descend_once = %d\n", 613 c->chooseleaf_descend_once); 614 615 ceph_decode_need(p, end, sizeof(u8), done); 616 c->chooseleaf_vary_r = ceph_decode_8(p); 617 dout("crush decode tunable chooseleaf_vary_r = %d\n", 618 c->chooseleaf_vary_r); 619 620 /* skip straw_calc_version, allowed_bucket_algs */ 621 ceph_decode_need(p, end, sizeof(u8) + sizeof(u32), done); 622 *p += sizeof(u8) + sizeof(u32); 623 624 ceph_decode_need(p, end, sizeof(u8), done); 625 c->chooseleaf_stable = ceph_decode_8(p); 626 dout("crush decode tunable chooseleaf_stable = %d\n", 627 c->chooseleaf_stable); 628 629 if (*p != end) { 630 /* class_map */ 631 ceph_decode_skip_map(p, end, 32, 32, bad); 632 /* class_name */ 633 ceph_decode_skip_map(p, end, 32, string, bad); 634 /* class_bucket */ 635 ceph_decode_skip_map_of_map(p, end, 32, 32, 32, bad); 636 } 637 638 if (*p != end) { 639 err = decode_choose_args(p, end, c); 640 if (err) 641 goto fail; 642 } 643 644 done: 645 crush_finalize(c); 646 dout("crush_decode success\n"); 647 return c; 648 649 badmem: 650 err = -ENOMEM; 651 fail: 652 dout("crush_decode fail %d\n", err); 653 crush_destroy(c); 654 return ERR_PTR(err); 655 656 bad: 657 err = -EINVAL; 658 goto fail; 659 } 660 661 int ceph_pg_compare(const struct ceph_pg *lhs, const struct ceph_pg *rhs) 662 { 663 if (lhs->pool < rhs->pool) 664 return -1; 665 if (lhs->pool > rhs->pool) 666 return 1; 667 if (lhs->seed < rhs->seed) 668 return -1; 669 if (lhs->seed > rhs->seed) 670 return 1; 671 672 return 0; 673 } 674 675 int ceph_spg_compare(const struct ceph_spg *lhs, const struct ceph_spg *rhs) 676 { 677 int ret; 678 679 ret = ceph_pg_compare(&lhs->pgid, &rhs->pgid); 680 if (ret) 681 return ret; 682 683 if (lhs->shard < rhs->shard) 684 return -1; 685 if (lhs->shard > rhs->shard) 686 return 1; 687 688 return 0; 689 } 690 691 static struct ceph_pg_mapping *alloc_pg_mapping(size_t payload_len) 692 { 693 struct ceph_pg_mapping *pg; 694 695 pg = kmalloc(sizeof(*pg) + payload_len, GFP_NOIO); 696 if (!pg) 697 return NULL; 698 699 RB_CLEAR_NODE(&pg->node); 700 return pg; 701 } 702 703 static void free_pg_mapping(struct ceph_pg_mapping *pg) 704 { 705 WARN_ON(!RB_EMPTY_NODE(&pg->node)); 706 707 kfree(pg); 708 } 709 710 /* 711 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid 712 * to a set of osds) and primary_temp (explicit primary setting) 713 */ 714 DEFINE_RB_FUNCS2(pg_mapping, struct ceph_pg_mapping, pgid, ceph_pg_compare, 715 RB_BYPTR, const struct ceph_pg *, node) 716 717 /* 718 * rbtree of pg pool info 719 */ 720 DEFINE_RB_FUNCS(pg_pool, struct ceph_pg_pool_info, id, node) 721 722 struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map, u64 id) 723 { 724 return lookup_pg_pool(&map->pg_pools, id); 725 } 726 727 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id) 728 { 729 struct ceph_pg_pool_info *pi; 730 731 if (id == CEPH_NOPOOL) 732 return NULL; 733 734 if (WARN_ON_ONCE(id > (u64) INT_MAX)) 735 return NULL; 736 737 pi = lookup_pg_pool(&map->pg_pools, id); 738 return pi ? pi->name : NULL; 739 } 740 EXPORT_SYMBOL(ceph_pg_pool_name_by_id); 741 742 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name) 743 { 744 struct rb_node *rbp; 745 746 for (rbp = rb_first(&map->pg_pools); rbp; rbp = rb_next(rbp)) { 747 struct ceph_pg_pool_info *pi = 748 rb_entry(rbp, struct ceph_pg_pool_info, node); 749 if (pi->name && strcmp(pi->name, name) == 0) 750 return pi->id; 751 } 752 return -ENOENT; 753 } 754 EXPORT_SYMBOL(ceph_pg_poolid_by_name); 755 756 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id) 757 { 758 struct ceph_pg_pool_info *pi; 759 760 pi = lookup_pg_pool(&map->pg_pools, id); 761 return pi ? pi->flags : 0; 762 } 763 EXPORT_SYMBOL(ceph_pg_pool_flags); 764 765 static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) 766 { 767 erase_pg_pool(root, pi); 768 kfree(pi->name); 769 kfree(pi); 770 } 771 772 static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) 773 { 774 u8 ev, cv; 775 unsigned len, num; 776 void *pool_end; 777 778 ceph_decode_need(p, end, 2 + 4, bad); 779 ev = ceph_decode_8(p); /* encoding version */ 780 cv = ceph_decode_8(p); /* compat version */ 781 if (ev < 5) { 782 pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); 783 return -EINVAL; 784 } 785 if (cv > 9) { 786 pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv); 787 return -EINVAL; 788 } 789 len = ceph_decode_32(p); 790 ceph_decode_need(p, end, len, bad); 791 pool_end = *p + len; 792 793 pi->type = ceph_decode_8(p); 794 pi->size = ceph_decode_8(p); 795 pi->crush_ruleset = ceph_decode_8(p); 796 pi->object_hash = ceph_decode_8(p); 797 798 pi->pg_num = ceph_decode_32(p); 799 pi->pgp_num = ceph_decode_32(p); 800 801 *p += 4 + 4; /* skip lpg* */ 802 *p += 4; /* skip last_change */ 803 *p += 8 + 4; /* skip snap_seq, snap_epoch */ 804 805 /* skip snaps */ 806 num = ceph_decode_32(p); 807 while (num--) { 808 *p += 8; /* snapid key */ 809 *p += 1 + 1; /* versions */ 810 len = ceph_decode_32(p); 811 *p += len; 812 } 813 814 /* skip removed_snaps */ 815 num = ceph_decode_32(p); 816 *p += num * (8 + 8); 817 818 *p += 8; /* skip auid */ 819 pi->flags = ceph_decode_64(p); 820 *p += 4; /* skip crash_replay_interval */ 821 822 if (ev >= 7) 823 pi->min_size = ceph_decode_8(p); 824 else 825 pi->min_size = pi->size - pi->size / 2; 826 827 if (ev >= 8) 828 *p += 8 + 8; /* skip quota_max_* */ 829 830 if (ev >= 9) { 831 /* skip tiers */ 832 num = ceph_decode_32(p); 833 *p += num * 8; 834 835 *p += 8; /* skip tier_of */ 836 *p += 1; /* skip cache_mode */ 837 838 pi->read_tier = ceph_decode_64(p); 839 pi->write_tier = ceph_decode_64(p); 840 } else { 841 pi->read_tier = -1; 842 pi->write_tier = -1; 843 } 844 845 if (ev >= 10) { 846 /* skip properties */ 847 num = ceph_decode_32(p); 848 while (num--) { 849 len = ceph_decode_32(p); 850 *p += len; /* key */ 851 len = ceph_decode_32(p); 852 *p += len; /* val */ 853 } 854 } 855 856 if (ev >= 11) { 857 /* skip hit_set_params */ 858 *p += 1 + 1; /* versions */ 859 len = ceph_decode_32(p); 860 *p += len; 861 862 *p += 4; /* skip hit_set_period */ 863 *p += 4; /* skip hit_set_count */ 864 } 865 866 if (ev >= 12) 867 *p += 4; /* skip stripe_width */ 868 869 if (ev >= 13) { 870 *p += 8; /* skip target_max_bytes */ 871 *p += 8; /* skip target_max_objects */ 872 *p += 4; /* skip cache_target_dirty_ratio_micro */ 873 *p += 4; /* skip cache_target_full_ratio_micro */ 874 *p += 4; /* skip cache_min_flush_age */ 875 *p += 4; /* skip cache_min_evict_age */ 876 } 877 878 if (ev >= 14) { 879 /* skip erasure_code_profile */ 880 len = ceph_decode_32(p); 881 *p += len; 882 } 883 884 /* 885 * last_force_op_resend_preluminous, will be overridden if the 886 * map was encoded with RESEND_ON_SPLIT 887 */ 888 if (ev >= 15) 889 pi->last_force_request_resend = ceph_decode_32(p); 890 else 891 pi->last_force_request_resend = 0; 892 893 if (ev >= 16) 894 *p += 4; /* skip min_read_recency_for_promote */ 895 896 if (ev >= 17) 897 *p += 8; /* skip expected_num_objects */ 898 899 if (ev >= 19) 900 *p += 4; /* skip cache_target_dirty_high_ratio_micro */ 901 902 if (ev >= 20) 903 *p += 4; /* skip min_write_recency_for_promote */ 904 905 if (ev >= 21) 906 *p += 1; /* skip use_gmt_hitset */ 907 908 if (ev >= 22) 909 *p += 1; /* skip fast_read */ 910 911 if (ev >= 23) { 912 *p += 4; /* skip hit_set_grade_decay_rate */ 913 *p += 4; /* skip hit_set_search_last_n */ 914 } 915 916 if (ev >= 24) { 917 /* skip opts */ 918 *p += 1 + 1; /* versions */ 919 len = ceph_decode_32(p); 920 *p += len; 921 } 922 923 if (ev >= 25) 924 pi->last_force_request_resend = ceph_decode_32(p); 925 926 /* ignore the rest */ 927 928 *p = pool_end; 929 calc_pg_masks(pi); 930 return 0; 931 932 bad: 933 return -EINVAL; 934 } 935 936 static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map) 937 { 938 struct ceph_pg_pool_info *pi; 939 u32 num, len; 940 u64 pool; 941 942 ceph_decode_32_safe(p, end, num, bad); 943 dout(" %d pool names\n", num); 944 while (num--) { 945 ceph_decode_64_safe(p, end, pool, bad); 946 ceph_decode_32_safe(p, end, len, bad); 947 dout(" pool %llu len %d\n", pool, len); 948 ceph_decode_need(p, end, len, bad); 949 pi = lookup_pg_pool(&map->pg_pools, pool); 950 if (pi) { 951 char *name = kstrndup(*p, len, GFP_NOFS); 952 953 if (!name) 954 return -ENOMEM; 955 kfree(pi->name); 956 pi->name = name; 957 dout(" name is %s\n", pi->name); 958 } 959 *p += len; 960 } 961 return 0; 962 963 bad: 964 return -EINVAL; 965 } 966 967 /* 968 * CRUSH workspaces 969 * 970 * workspace_manager framework borrowed from fs/btrfs/compression.c. 971 * Two simplifications: there is only one type of workspace and there 972 * is always at least one workspace. 973 */ 974 static struct crush_work *alloc_workspace(const struct crush_map *c) 975 { 976 struct crush_work *work; 977 size_t work_size; 978 979 WARN_ON(!c->working_size); 980 work_size = crush_work_size(c, CEPH_PG_MAX_SIZE); 981 dout("%s work_size %zu bytes\n", __func__, work_size); 982 983 work = ceph_kvmalloc(work_size, GFP_NOIO); 984 if (!work) 985 return NULL; 986 987 INIT_LIST_HEAD(&work->item); 988 crush_init_workspace(c, work); 989 return work; 990 } 991 992 static void free_workspace(struct crush_work *work) 993 { 994 WARN_ON(!list_empty(&work->item)); 995 kvfree(work); 996 } 997 998 static void init_workspace_manager(struct workspace_manager *wsm) 999 { 1000 INIT_LIST_HEAD(&wsm->idle_ws); 1001 spin_lock_init(&wsm->ws_lock); 1002 atomic_set(&wsm->total_ws, 0); 1003 wsm->free_ws = 0; 1004 init_waitqueue_head(&wsm->ws_wait); 1005 } 1006 1007 static void add_initial_workspace(struct workspace_manager *wsm, 1008 struct crush_work *work) 1009 { 1010 WARN_ON(!list_empty(&wsm->idle_ws)); 1011 1012 list_add(&work->item, &wsm->idle_ws); 1013 atomic_set(&wsm->total_ws, 1); 1014 wsm->free_ws = 1; 1015 } 1016 1017 static void cleanup_workspace_manager(struct workspace_manager *wsm) 1018 { 1019 struct crush_work *work; 1020 1021 while (!list_empty(&wsm->idle_ws)) { 1022 work = list_first_entry(&wsm->idle_ws, struct crush_work, 1023 item); 1024 list_del_init(&work->item); 1025 free_workspace(work); 1026 } 1027 atomic_set(&wsm->total_ws, 0); 1028 wsm->free_ws = 0; 1029 } 1030 1031 /* 1032 * Finds an available workspace or allocates a new one. If it's not 1033 * possible to allocate a new one, waits until there is one. 1034 */ 1035 static struct crush_work *get_workspace(struct workspace_manager *wsm, 1036 const struct crush_map *c) 1037 { 1038 struct crush_work *work; 1039 int cpus = num_online_cpus(); 1040 1041 again: 1042 spin_lock(&wsm->ws_lock); 1043 if (!list_empty(&wsm->idle_ws)) { 1044 work = list_first_entry(&wsm->idle_ws, struct crush_work, 1045 item); 1046 list_del_init(&work->item); 1047 wsm->free_ws--; 1048 spin_unlock(&wsm->ws_lock); 1049 return work; 1050 1051 } 1052 if (atomic_read(&wsm->total_ws) > cpus) { 1053 DEFINE_WAIT(wait); 1054 1055 spin_unlock(&wsm->ws_lock); 1056 prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE); 1057 if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws) 1058 schedule(); 1059 finish_wait(&wsm->ws_wait, &wait); 1060 goto again; 1061 } 1062 atomic_inc(&wsm->total_ws); 1063 spin_unlock(&wsm->ws_lock); 1064 1065 work = alloc_workspace(c); 1066 if (!work) { 1067 atomic_dec(&wsm->total_ws); 1068 wake_up(&wsm->ws_wait); 1069 1070 /* 1071 * Do not return the error but go back to waiting. We 1072 * have the inital workspace and the CRUSH computation 1073 * time is bounded so we will get it eventually. 1074 */ 1075 WARN_ON(atomic_read(&wsm->total_ws) < 1); 1076 goto again; 1077 } 1078 return work; 1079 } 1080 1081 /* 1082 * Puts a workspace back on the list or frees it if we have enough 1083 * idle ones sitting around. 1084 */ 1085 static void put_workspace(struct workspace_manager *wsm, 1086 struct crush_work *work) 1087 { 1088 spin_lock(&wsm->ws_lock); 1089 if (wsm->free_ws <= num_online_cpus()) { 1090 list_add(&work->item, &wsm->idle_ws); 1091 wsm->free_ws++; 1092 spin_unlock(&wsm->ws_lock); 1093 goto wake; 1094 } 1095 spin_unlock(&wsm->ws_lock); 1096 1097 free_workspace(work); 1098 atomic_dec(&wsm->total_ws); 1099 wake: 1100 if (wq_has_sleeper(&wsm->ws_wait)) 1101 wake_up(&wsm->ws_wait); 1102 } 1103 1104 /* 1105 * osd map 1106 */ 1107 struct ceph_osdmap *ceph_osdmap_alloc(void) 1108 { 1109 struct ceph_osdmap *map; 1110 1111 map = kzalloc(sizeof(*map), GFP_NOIO); 1112 if (!map) 1113 return NULL; 1114 1115 map->pg_pools = RB_ROOT; 1116 map->pool_max = -1; 1117 map->pg_temp = RB_ROOT; 1118 map->primary_temp = RB_ROOT; 1119 map->pg_upmap = RB_ROOT; 1120 map->pg_upmap_items = RB_ROOT; 1121 1122 init_workspace_manager(&map->crush_wsm); 1123 1124 return map; 1125 } 1126 1127 void ceph_osdmap_destroy(struct ceph_osdmap *map) 1128 { 1129 dout("osdmap_destroy %p\n", map); 1130 1131 if (map->crush) 1132 crush_destroy(map->crush); 1133 cleanup_workspace_manager(&map->crush_wsm); 1134 1135 while (!RB_EMPTY_ROOT(&map->pg_temp)) { 1136 struct ceph_pg_mapping *pg = 1137 rb_entry(rb_first(&map->pg_temp), 1138 struct ceph_pg_mapping, node); 1139 erase_pg_mapping(&map->pg_temp, pg); 1140 free_pg_mapping(pg); 1141 } 1142 while (!RB_EMPTY_ROOT(&map->primary_temp)) { 1143 struct ceph_pg_mapping *pg = 1144 rb_entry(rb_first(&map->primary_temp), 1145 struct ceph_pg_mapping, node); 1146 erase_pg_mapping(&map->primary_temp, pg); 1147 free_pg_mapping(pg); 1148 } 1149 while (!RB_EMPTY_ROOT(&map->pg_upmap)) { 1150 struct ceph_pg_mapping *pg = 1151 rb_entry(rb_first(&map->pg_upmap), 1152 struct ceph_pg_mapping, node); 1153 rb_erase(&pg->node, &map->pg_upmap); 1154 kfree(pg); 1155 } 1156 while (!RB_EMPTY_ROOT(&map->pg_upmap_items)) { 1157 struct ceph_pg_mapping *pg = 1158 rb_entry(rb_first(&map->pg_upmap_items), 1159 struct ceph_pg_mapping, node); 1160 rb_erase(&pg->node, &map->pg_upmap_items); 1161 kfree(pg); 1162 } 1163 while (!RB_EMPTY_ROOT(&map->pg_pools)) { 1164 struct ceph_pg_pool_info *pi = 1165 rb_entry(rb_first(&map->pg_pools), 1166 struct ceph_pg_pool_info, node); 1167 __remove_pg_pool(&map->pg_pools, pi); 1168 } 1169 kvfree(map->osd_state); 1170 kvfree(map->osd_weight); 1171 kvfree(map->osd_addr); 1172 kvfree(map->osd_primary_affinity); 1173 kfree(map); 1174 } 1175 1176 /* 1177 * Adjust max_osd value, (re)allocate arrays. 1178 * 1179 * The new elements are properly initialized. 1180 */ 1181 static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max) 1182 { 1183 u32 *state; 1184 u32 *weight; 1185 struct ceph_entity_addr *addr; 1186 u32 to_copy; 1187 int i; 1188 1189 dout("%s old %u new %u\n", __func__, map->max_osd, max); 1190 if (max == map->max_osd) 1191 return 0; 1192 1193 state = ceph_kvmalloc(array_size(max, sizeof(*state)), GFP_NOFS); 1194 weight = ceph_kvmalloc(array_size(max, sizeof(*weight)), GFP_NOFS); 1195 addr = ceph_kvmalloc(array_size(max, sizeof(*addr)), GFP_NOFS); 1196 if (!state || !weight || !addr) { 1197 kvfree(state); 1198 kvfree(weight); 1199 kvfree(addr); 1200 return -ENOMEM; 1201 } 1202 1203 to_copy = min(map->max_osd, max); 1204 if (map->osd_state) { 1205 memcpy(state, map->osd_state, to_copy * sizeof(*state)); 1206 memcpy(weight, map->osd_weight, to_copy * sizeof(*weight)); 1207 memcpy(addr, map->osd_addr, to_copy * sizeof(*addr)); 1208 kvfree(map->osd_state); 1209 kvfree(map->osd_weight); 1210 kvfree(map->osd_addr); 1211 } 1212 1213 map->osd_state = state; 1214 map->osd_weight = weight; 1215 map->osd_addr = addr; 1216 for (i = map->max_osd; i < max; i++) { 1217 map->osd_state[i] = 0; 1218 map->osd_weight[i] = CEPH_OSD_OUT; 1219 memset(map->osd_addr + i, 0, sizeof(*map->osd_addr)); 1220 } 1221 1222 if (map->osd_primary_affinity) { 1223 u32 *affinity; 1224 1225 affinity = ceph_kvmalloc(array_size(max, sizeof(*affinity)), 1226 GFP_NOFS); 1227 if (!affinity) 1228 return -ENOMEM; 1229 1230 memcpy(affinity, map->osd_primary_affinity, 1231 to_copy * sizeof(*affinity)); 1232 kvfree(map->osd_primary_affinity); 1233 1234 map->osd_primary_affinity = affinity; 1235 for (i = map->max_osd; i < max; i++) 1236 map->osd_primary_affinity[i] = 1237 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1238 } 1239 1240 map->max_osd = max; 1241 1242 return 0; 1243 } 1244 1245 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) 1246 { 1247 struct crush_work *work; 1248 1249 if (IS_ERR(crush)) 1250 return PTR_ERR(crush); 1251 1252 work = alloc_workspace(crush); 1253 if (!work) { 1254 crush_destroy(crush); 1255 return -ENOMEM; 1256 } 1257 1258 if (map->crush) 1259 crush_destroy(map->crush); 1260 cleanup_workspace_manager(&map->crush_wsm); 1261 map->crush = crush; 1262 add_initial_workspace(&map->crush_wsm, work); 1263 return 0; 1264 } 1265 1266 #define OSDMAP_WRAPPER_COMPAT_VER 7 1267 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1 1268 1269 /* 1270 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps, 1271 * to struct_v of the client_data section for new (v7 and above) 1272 * osdmaps. 1273 */ 1274 static int get_osdmap_client_data_v(void **p, void *end, 1275 const char *prefix, u8 *v) 1276 { 1277 u8 struct_v; 1278 1279 ceph_decode_8_safe(p, end, struct_v, e_inval); 1280 if (struct_v >= 7) { 1281 u8 struct_compat; 1282 1283 ceph_decode_8_safe(p, end, struct_compat, e_inval); 1284 if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) { 1285 pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n", 1286 struct_v, struct_compat, 1287 OSDMAP_WRAPPER_COMPAT_VER, prefix); 1288 return -EINVAL; 1289 } 1290 *p += 4; /* ignore wrapper struct_len */ 1291 1292 ceph_decode_8_safe(p, end, struct_v, e_inval); 1293 ceph_decode_8_safe(p, end, struct_compat, e_inval); 1294 if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) { 1295 pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n", 1296 struct_v, struct_compat, 1297 OSDMAP_CLIENT_DATA_COMPAT_VER, prefix); 1298 return -EINVAL; 1299 } 1300 *p += 4; /* ignore client data struct_len */ 1301 } else { 1302 u16 version; 1303 1304 *p -= 1; 1305 ceph_decode_16_safe(p, end, version, e_inval); 1306 if (version < 6) { 1307 pr_warn("got v %d < 6 of %s ceph_osdmap\n", 1308 version, prefix); 1309 return -EINVAL; 1310 } 1311 1312 /* old osdmap enconding */ 1313 struct_v = 0; 1314 } 1315 1316 *v = struct_v; 1317 return 0; 1318 1319 e_inval: 1320 return -EINVAL; 1321 } 1322 1323 static int __decode_pools(void **p, void *end, struct ceph_osdmap *map, 1324 bool incremental) 1325 { 1326 u32 n; 1327 1328 ceph_decode_32_safe(p, end, n, e_inval); 1329 while (n--) { 1330 struct ceph_pg_pool_info *pi; 1331 u64 pool; 1332 int ret; 1333 1334 ceph_decode_64_safe(p, end, pool, e_inval); 1335 1336 pi = lookup_pg_pool(&map->pg_pools, pool); 1337 if (!incremental || !pi) { 1338 pi = kzalloc(sizeof(*pi), GFP_NOFS); 1339 if (!pi) 1340 return -ENOMEM; 1341 1342 RB_CLEAR_NODE(&pi->node); 1343 pi->id = pool; 1344 1345 if (!__insert_pg_pool(&map->pg_pools, pi)) { 1346 kfree(pi); 1347 return -EEXIST; 1348 } 1349 } 1350 1351 ret = decode_pool(p, end, pi); 1352 if (ret) 1353 return ret; 1354 } 1355 1356 return 0; 1357 1358 e_inval: 1359 return -EINVAL; 1360 } 1361 1362 static int decode_pools(void **p, void *end, struct ceph_osdmap *map) 1363 { 1364 return __decode_pools(p, end, map, false); 1365 } 1366 1367 static int decode_new_pools(void **p, void *end, struct ceph_osdmap *map) 1368 { 1369 return __decode_pools(p, end, map, true); 1370 } 1371 1372 typedef struct ceph_pg_mapping *(*decode_mapping_fn_t)(void **, void *, bool); 1373 1374 static int decode_pg_mapping(void **p, void *end, struct rb_root *mapping_root, 1375 decode_mapping_fn_t fn, bool incremental) 1376 { 1377 u32 n; 1378 1379 WARN_ON(!incremental && !fn); 1380 1381 ceph_decode_32_safe(p, end, n, e_inval); 1382 while (n--) { 1383 struct ceph_pg_mapping *pg; 1384 struct ceph_pg pgid; 1385 int ret; 1386 1387 ret = ceph_decode_pgid(p, end, &pgid); 1388 if (ret) 1389 return ret; 1390 1391 pg = lookup_pg_mapping(mapping_root, &pgid); 1392 if (pg) { 1393 WARN_ON(!incremental); 1394 erase_pg_mapping(mapping_root, pg); 1395 free_pg_mapping(pg); 1396 } 1397 1398 if (fn) { 1399 pg = fn(p, end, incremental); 1400 if (IS_ERR(pg)) 1401 return PTR_ERR(pg); 1402 1403 if (pg) { 1404 pg->pgid = pgid; /* struct */ 1405 insert_pg_mapping(mapping_root, pg); 1406 } 1407 } 1408 } 1409 1410 return 0; 1411 1412 e_inval: 1413 return -EINVAL; 1414 } 1415 1416 static struct ceph_pg_mapping *__decode_pg_temp(void **p, void *end, 1417 bool incremental) 1418 { 1419 struct ceph_pg_mapping *pg; 1420 u32 len, i; 1421 1422 ceph_decode_32_safe(p, end, len, e_inval); 1423 if (len == 0 && incremental) 1424 return NULL; /* new_pg_temp: [] to remove */ 1425 if (len > (SIZE_MAX - sizeof(*pg)) / sizeof(u32)) 1426 return ERR_PTR(-EINVAL); 1427 1428 ceph_decode_need(p, end, len * sizeof(u32), e_inval); 1429 pg = alloc_pg_mapping(len * sizeof(u32)); 1430 if (!pg) 1431 return ERR_PTR(-ENOMEM); 1432 1433 pg->pg_temp.len = len; 1434 for (i = 0; i < len; i++) 1435 pg->pg_temp.osds[i] = ceph_decode_32(p); 1436 1437 return pg; 1438 1439 e_inval: 1440 return ERR_PTR(-EINVAL); 1441 } 1442 1443 static int decode_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1444 { 1445 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp, 1446 false); 1447 } 1448 1449 static int decode_new_pg_temp(void **p, void *end, struct ceph_osdmap *map) 1450 { 1451 return decode_pg_mapping(p, end, &map->pg_temp, __decode_pg_temp, 1452 true); 1453 } 1454 1455 static struct ceph_pg_mapping *__decode_primary_temp(void **p, void *end, 1456 bool incremental) 1457 { 1458 struct ceph_pg_mapping *pg; 1459 u32 osd; 1460 1461 ceph_decode_32_safe(p, end, osd, e_inval); 1462 if (osd == (u32)-1 && incremental) 1463 return NULL; /* new_primary_temp: -1 to remove */ 1464 1465 pg = alloc_pg_mapping(0); 1466 if (!pg) 1467 return ERR_PTR(-ENOMEM); 1468 1469 pg->primary_temp.osd = osd; 1470 return pg; 1471 1472 e_inval: 1473 return ERR_PTR(-EINVAL); 1474 } 1475 1476 static int decode_primary_temp(void **p, void *end, struct ceph_osdmap *map) 1477 { 1478 return decode_pg_mapping(p, end, &map->primary_temp, 1479 __decode_primary_temp, false); 1480 } 1481 1482 static int decode_new_primary_temp(void **p, void *end, 1483 struct ceph_osdmap *map) 1484 { 1485 return decode_pg_mapping(p, end, &map->primary_temp, 1486 __decode_primary_temp, true); 1487 } 1488 1489 u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd) 1490 { 1491 BUG_ON(osd >= map->max_osd); 1492 1493 if (!map->osd_primary_affinity) 1494 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1495 1496 return map->osd_primary_affinity[osd]; 1497 } 1498 1499 static int set_primary_affinity(struct ceph_osdmap *map, int osd, u32 aff) 1500 { 1501 BUG_ON(osd >= map->max_osd); 1502 1503 if (!map->osd_primary_affinity) { 1504 int i; 1505 1506 map->osd_primary_affinity = ceph_kvmalloc( 1507 array_size(map->max_osd, sizeof(*map->osd_primary_affinity)), 1508 GFP_NOFS); 1509 if (!map->osd_primary_affinity) 1510 return -ENOMEM; 1511 1512 for (i = 0; i < map->max_osd; i++) 1513 map->osd_primary_affinity[i] = 1514 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY; 1515 } 1516 1517 map->osd_primary_affinity[osd] = aff; 1518 1519 return 0; 1520 } 1521 1522 static int decode_primary_affinity(void **p, void *end, 1523 struct ceph_osdmap *map) 1524 { 1525 u32 len, i; 1526 1527 ceph_decode_32_safe(p, end, len, e_inval); 1528 if (len == 0) { 1529 kvfree(map->osd_primary_affinity); 1530 map->osd_primary_affinity = NULL; 1531 return 0; 1532 } 1533 if (len != map->max_osd) 1534 goto e_inval; 1535 1536 ceph_decode_need(p, end, map->max_osd*sizeof(u32), e_inval); 1537 1538 for (i = 0; i < map->max_osd; i++) { 1539 int ret; 1540 1541 ret = set_primary_affinity(map, i, ceph_decode_32(p)); 1542 if (ret) 1543 return ret; 1544 } 1545 1546 return 0; 1547 1548 e_inval: 1549 return -EINVAL; 1550 } 1551 1552 static int decode_new_primary_affinity(void **p, void *end, 1553 struct ceph_osdmap *map) 1554 { 1555 u32 n; 1556 1557 ceph_decode_32_safe(p, end, n, e_inval); 1558 while (n--) { 1559 u32 osd, aff; 1560 int ret; 1561 1562 ceph_decode_32_safe(p, end, osd, e_inval); 1563 ceph_decode_32_safe(p, end, aff, e_inval); 1564 1565 ret = set_primary_affinity(map, osd, aff); 1566 if (ret) 1567 return ret; 1568 1569 pr_info("osd%d primary-affinity 0x%x\n", osd, aff); 1570 } 1571 1572 return 0; 1573 1574 e_inval: 1575 return -EINVAL; 1576 } 1577 1578 static struct ceph_pg_mapping *__decode_pg_upmap(void **p, void *end, 1579 bool __unused) 1580 { 1581 return __decode_pg_temp(p, end, false); 1582 } 1583 1584 static int decode_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1585 { 1586 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap, 1587 false); 1588 } 1589 1590 static int decode_new_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1591 { 1592 return decode_pg_mapping(p, end, &map->pg_upmap, __decode_pg_upmap, 1593 true); 1594 } 1595 1596 static int decode_old_pg_upmap(void **p, void *end, struct ceph_osdmap *map) 1597 { 1598 return decode_pg_mapping(p, end, &map->pg_upmap, NULL, true); 1599 } 1600 1601 static struct ceph_pg_mapping *__decode_pg_upmap_items(void **p, void *end, 1602 bool __unused) 1603 { 1604 struct ceph_pg_mapping *pg; 1605 u32 len, i; 1606 1607 ceph_decode_32_safe(p, end, len, e_inval); 1608 if (len > (SIZE_MAX - sizeof(*pg)) / (2 * sizeof(u32))) 1609 return ERR_PTR(-EINVAL); 1610 1611 ceph_decode_need(p, end, 2 * len * sizeof(u32), e_inval); 1612 pg = alloc_pg_mapping(2 * len * sizeof(u32)); 1613 if (!pg) 1614 return ERR_PTR(-ENOMEM); 1615 1616 pg->pg_upmap_items.len = len; 1617 for (i = 0; i < len; i++) { 1618 pg->pg_upmap_items.from_to[i][0] = ceph_decode_32(p); 1619 pg->pg_upmap_items.from_to[i][1] = ceph_decode_32(p); 1620 } 1621 1622 return pg; 1623 1624 e_inval: 1625 return ERR_PTR(-EINVAL); 1626 } 1627 1628 static int decode_pg_upmap_items(void **p, void *end, struct ceph_osdmap *map) 1629 { 1630 return decode_pg_mapping(p, end, &map->pg_upmap_items, 1631 __decode_pg_upmap_items, false); 1632 } 1633 1634 static int decode_new_pg_upmap_items(void **p, void *end, 1635 struct ceph_osdmap *map) 1636 { 1637 return decode_pg_mapping(p, end, &map->pg_upmap_items, 1638 __decode_pg_upmap_items, true); 1639 } 1640 1641 static int decode_old_pg_upmap_items(void **p, void *end, 1642 struct ceph_osdmap *map) 1643 { 1644 return decode_pg_mapping(p, end, &map->pg_upmap_items, NULL, true); 1645 } 1646 1647 /* 1648 * decode a full map. 1649 */ 1650 static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) 1651 { 1652 u8 struct_v; 1653 u32 epoch = 0; 1654 void *start = *p; 1655 u32 max; 1656 u32 len, i; 1657 int err; 1658 1659 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); 1660 1661 err = get_osdmap_client_data_v(p, end, "full", &struct_v); 1662 if (err) 1663 goto bad; 1664 1665 /* fsid, epoch, created, modified */ 1666 ceph_decode_need(p, end, sizeof(map->fsid) + sizeof(u32) + 1667 sizeof(map->created) + sizeof(map->modified), e_inval); 1668 ceph_decode_copy(p, &map->fsid, sizeof(map->fsid)); 1669 epoch = map->epoch = ceph_decode_32(p); 1670 ceph_decode_copy(p, &map->created, sizeof(map->created)); 1671 ceph_decode_copy(p, &map->modified, sizeof(map->modified)); 1672 1673 /* pools */ 1674 err = decode_pools(p, end, map); 1675 if (err) 1676 goto bad; 1677 1678 /* pool_name */ 1679 err = decode_pool_names(p, end, map); 1680 if (err) 1681 goto bad; 1682 1683 ceph_decode_32_safe(p, end, map->pool_max, e_inval); 1684 1685 ceph_decode_32_safe(p, end, map->flags, e_inval); 1686 1687 /* max_osd */ 1688 ceph_decode_32_safe(p, end, max, e_inval); 1689 1690 /* (re)alloc osd arrays */ 1691 err = osdmap_set_max_osd(map, max); 1692 if (err) 1693 goto bad; 1694 1695 /* osd_state, osd_weight, osd_addrs->client_addr */ 1696 ceph_decode_need(p, end, 3*sizeof(u32) + 1697 map->max_osd*(struct_v >= 5 ? sizeof(u32) : 1698 sizeof(u8)) + 1699 sizeof(*map->osd_weight), e_inval); 1700 if (ceph_decode_32(p) != map->max_osd) 1701 goto e_inval; 1702 1703 if (struct_v >= 5) { 1704 for (i = 0; i < map->max_osd; i++) 1705 map->osd_state[i] = ceph_decode_32(p); 1706 } else { 1707 for (i = 0; i < map->max_osd; i++) 1708 map->osd_state[i] = ceph_decode_8(p); 1709 } 1710 1711 if (ceph_decode_32(p) != map->max_osd) 1712 goto e_inval; 1713 1714 for (i = 0; i < map->max_osd; i++) 1715 map->osd_weight[i] = ceph_decode_32(p); 1716 1717 if (ceph_decode_32(p) != map->max_osd) 1718 goto e_inval; 1719 1720 for (i = 0; i < map->max_osd; i++) { 1721 err = ceph_decode_entity_addr(p, end, &map->osd_addr[i]); 1722 if (err) 1723 goto bad; 1724 } 1725 1726 /* pg_temp */ 1727 err = decode_pg_temp(p, end, map); 1728 if (err) 1729 goto bad; 1730 1731 /* primary_temp */ 1732 if (struct_v >= 1) { 1733 err = decode_primary_temp(p, end, map); 1734 if (err) 1735 goto bad; 1736 } 1737 1738 /* primary_affinity */ 1739 if (struct_v >= 2) { 1740 err = decode_primary_affinity(p, end, map); 1741 if (err) 1742 goto bad; 1743 } else { 1744 WARN_ON(map->osd_primary_affinity); 1745 } 1746 1747 /* crush */ 1748 ceph_decode_32_safe(p, end, len, e_inval); 1749 err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end))); 1750 if (err) 1751 goto bad; 1752 1753 *p += len; 1754 if (struct_v >= 3) { 1755 /* erasure_code_profiles */ 1756 ceph_decode_skip_map_of_map(p, end, string, string, string, 1757 e_inval); 1758 } 1759 1760 if (struct_v >= 4) { 1761 err = decode_pg_upmap(p, end, map); 1762 if (err) 1763 goto bad; 1764 1765 err = decode_pg_upmap_items(p, end, map); 1766 if (err) 1767 goto bad; 1768 } else { 1769 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap)); 1770 WARN_ON(!RB_EMPTY_ROOT(&map->pg_upmap_items)); 1771 } 1772 1773 /* ignore the rest */ 1774 *p = end; 1775 1776 dout("full osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); 1777 return 0; 1778 1779 e_inval: 1780 err = -EINVAL; 1781 bad: 1782 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n", 1783 err, epoch, (int)(*p - start), *p, start, end); 1784 print_hex_dump(KERN_DEBUG, "osdmap: ", 1785 DUMP_PREFIX_OFFSET, 16, 1, 1786 start, end - start, true); 1787 return err; 1788 } 1789 1790 /* 1791 * Allocate and decode a full map. 1792 */ 1793 struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end) 1794 { 1795 struct ceph_osdmap *map; 1796 int ret; 1797 1798 map = ceph_osdmap_alloc(); 1799 if (!map) 1800 return ERR_PTR(-ENOMEM); 1801 1802 ret = osdmap_decode(p, end, map); 1803 if (ret) { 1804 ceph_osdmap_destroy(map); 1805 return ERR_PTR(ret); 1806 } 1807 1808 return map; 1809 } 1810 1811 /* 1812 * Encoding order is (new_up_client, new_state, new_weight). Need to 1813 * apply in the (new_weight, new_state, new_up_client) order, because 1814 * an incremental map may look like e.g. 1815 * 1816 * new_up_client: { osd=6, addr=... } # set osd_state and addr 1817 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state 1818 */ 1819 static int decode_new_up_state_weight(void **p, void *end, u8 struct_v, 1820 struct ceph_osdmap *map) 1821 { 1822 void *new_up_client; 1823 void *new_state; 1824 void *new_weight_end; 1825 u32 len; 1826 int i; 1827 1828 new_up_client = *p; 1829 ceph_decode_32_safe(p, end, len, e_inval); 1830 for (i = 0; i < len; ++i) { 1831 struct ceph_entity_addr addr; 1832 1833 ceph_decode_skip_32(p, end, e_inval); 1834 if (ceph_decode_entity_addr(p, end, &addr)) 1835 goto e_inval; 1836 } 1837 1838 new_state = *p; 1839 ceph_decode_32_safe(p, end, len, e_inval); 1840 len *= sizeof(u32) + (struct_v >= 5 ? sizeof(u32) : sizeof(u8)); 1841 ceph_decode_need(p, end, len, e_inval); 1842 *p += len; 1843 1844 /* new_weight */ 1845 ceph_decode_32_safe(p, end, len, e_inval); 1846 while (len--) { 1847 s32 osd; 1848 u32 w; 1849 1850 ceph_decode_need(p, end, 2*sizeof(u32), e_inval); 1851 osd = ceph_decode_32(p); 1852 w = ceph_decode_32(p); 1853 BUG_ON(osd >= map->max_osd); 1854 pr_info("osd%d weight 0x%x %s\n", osd, w, 1855 w == CEPH_OSD_IN ? "(in)" : 1856 (w == CEPH_OSD_OUT ? "(out)" : "")); 1857 map->osd_weight[osd] = w; 1858 1859 /* 1860 * If we are marking in, set the EXISTS, and clear the 1861 * AUTOOUT and NEW bits. 1862 */ 1863 if (w) { 1864 map->osd_state[osd] |= CEPH_OSD_EXISTS; 1865 map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT | 1866 CEPH_OSD_NEW); 1867 } 1868 } 1869 new_weight_end = *p; 1870 1871 /* new_state (up/down) */ 1872 *p = new_state; 1873 len = ceph_decode_32(p); 1874 while (len--) { 1875 s32 osd; 1876 u32 xorstate; 1877 int ret; 1878 1879 osd = ceph_decode_32(p); 1880 if (struct_v >= 5) 1881 xorstate = ceph_decode_32(p); 1882 else 1883 xorstate = ceph_decode_8(p); 1884 if (xorstate == 0) 1885 xorstate = CEPH_OSD_UP; 1886 BUG_ON(osd >= map->max_osd); 1887 if ((map->osd_state[osd] & CEPH_OSD_UP) && 1888 (xorstate & CEPH_OSD_UP)) 1889 pr_info("osd%d down\n", osd); 1890 if ((map->osd_state[osd] & CEPH_OSD_EXISTS) && 1891 (xorstate & CEPH_OSD_EXISTS)) { 1892 pr_info("osd%d does not exist\n", osd); 1893 ret = set_primary_affinity(map, osd, 1894 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY); 1895 if (ret) 1896 return ret; 1897 memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr)); 1898 map->osd_state[osd] = 0; 1899 } else { 1900 map->osd_state[osd] ^= xorstate; 1901 } 1902 } 1903 1904 /* new_up_client */ 1905 *p = new_up_client; 1906 len = ceph_decode_32(p); 1907 while (len--) { 1908 s32 osd; 1909 struct ceph_entity_addr addr; 1910 1911 osd = ceph_decode_32(p); 1912 BUG_ON(osd >= map->max_osd); 1913 if (ceph_decode_entity_addr(p, end, &addr)) 1914 goto e_inval; 1915 pr_info("osd%d up\n", osd); 1916 map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP; 1917 map->osd_addr[osd] = addr; 1918 } 1919 1920 *p = new_weight_end; 1921 return 0; 1922 1923 e_inval: 1924 return -EINVAL; 1925 } 1926 1927 /* 1928 * decode and apply an incremental map update. 1929 */ 1930 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, 1931 struct ceph_osdmap *map) 1932 { 1933 struct ceph_fsid fsid; 1934 u32 epoch = 0; 1935 struct ceph_timespec modified; 1936 s32 len; 1937 u64 pool; 1938 __s64 new_pool_max; 1939 __s32 new_flags, max; 1940 void *start = *p; 1941 int err; 1942 u8 struct_v; 1943 1944 dout("%s %p to %p len %d\n", __func__, *p, end, (int)(end - *p)); 1945 1946 err = get_osdmap_client_data_v(p, end, "inc", &struct_v); 1947 if (err) 1948 goto bad; 1949 1950 /* fsid, epoch, modified, new_pool_max, new_flags */ 1951 ceph_decode_need(p, end, sizeof(fsid) + sizeof(u32) + sizeof(modified) + 1952 sizeof(u64) + sizeof(u32), e_inval); 1953 ceph_decode_copy(p, &fsid, sizeof(fsid)); 1954 epoch = ceph_decode_32(p); 1955 BUG_ON(epoch != map->epoch+1); 1956 ceph_decode_copy(p, &modified, sizeof(modified)); 1957 new_pool_max = ceph_decode_64(p); 1958 new_flags = ceph_decode_32(p); 1959 1960 /* full map? */ 1961 ceph_decode_32_safe(p, end, len, e_inval); 1962 if (len > 0) { 1963 dout("apply_incremental full map len %d, %p to %p\n", 1964 len, *p, end); 1965 return ceph_osdmap_decode(p, min(*p+len, end)); 1966 } 1967 1968 /* new crush? */ 1969 ceph_decode_32_safe(p, end, len, e_inval); 1970 if (len > 0) { 1971 err = osdmap_set_crush(map, 1972 crush_decode(*p, min(*p + len, end))); 1973 if (err) 1974 goto bad; 1975 *p += len; 1976 } 1977 1978 /* new flags? */ 1979 if (new_flags >= 0) 1980 map->flags = new_flags; 1981 if (new_pool_max >= 0) 1982 map->pool_max = new_pool_max; 1983 1984 /* new max? */ 1985 ceph_decode_32_safe(p, end, max, e_inval); 1986 if (max >= 0) { 1987 err = osdmap_set_max_osd(map, max); 1988 if (err) 1989 goto bad; 1990 } 1991 1992 map->epoch++; 1993 map->modified = modified; 1994 1995 /* new_pools */ 1996 err = decode_new_pools(p, end, map); 1997 if (err) 1998 goto bad; 1999 2000 /* new_pool_names */ 2001 err = decode_pool_names(p, end, map); 2002 if (err) 2003 goto bad; 2004 2005 /* old_pool */ 2006 ceph_decode_32_safe(p, end, len, e_inval); 2007 while (len--) { 2008 struct ceph_pg_pool_info *pi; 2009 2010 ceph_decode_64_safe(p, end, pool, e_inval); 2011 pi = lookup_pg_pool(&map->pg_pools, pool); 2012 if (pi) 2013 __remove_pg_pool(&map->pg_pools, pi); 2014 } 2015 2016 /* new_up_client, new_state, new_weight */ 2017 err = decode_new_up_state_weight(p, end, struct_v, map); 2018 if (err) 2019 goto bad; 2020 2021 /* new_pg_temp */ 2022 err = decode_new_pg_temp(p, end, map); 2023 if (err) 2024 goto bad; 2025 2026 /* new_primary_temp */ 2027 if (struct_v >= 1) { 2028 err = decode_new_primary_temp(p, end, map); 2029 if (err) 2030 goto bad; 2031 } 2032 2033 /* new_primary_affinity */ 2034 if (struct_v >= 2) { 2035 err = decode_new_primary_affinity(p, end, map); 2036 if (err) 2037 goto bad; 2038 } 2039 2040 if (struct_v >= 3) { 2041 /* new_erasure_code_profiles */ 2042 ceph_decode_skip_map_of_map(p, end, string, string, string, 2043 e_inval); 2044 /* old_erasure_code_profiles */ 2045 ceph_decode_skip_set(p, end, string, e_inval); 2046 } 2047 2048 if (struct_v >= 4) { 2049 err = decode_new_pg_upmap(p, end, map); 2050 if (err) 2051 goto bad; 2052 2053 err = decode_old_pg_upmap(p, end, map); 2054 if (err) 2055 goto bad; 2056 2057 err = decode_new_pg_upmap_items(p, end, map); 2058 if (err) 2059 goto bad; 2060 2061 err = decode_old_pg_upmap_items(p, end, map); 2062 if (err) 2063 goto bad; 2064 } 2065 2066 /* ignore the rest */ 2067 *p = end; 2068 2069 dout("inc osdmap epoch %d max_osd %d\n", map->epoch, map->max_osd); 2070 return map; 2071 2072 e_inval: 2073 err = -EINVAL; 2074 bad: 2075 pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n", 2076 err, epoch, (int)(*p - start), *p, start, end); 2077 print_hex_dump(KERN_DEBUG, "osdmap: ", 2078 DUMP_PREFIX_OFFSET, 16, 1, 2079 start, end - start, true); 2080 return ERR_PTR(err); 2081 } 2082 2083 void ceph_oloc_copy(struct ceph_object_locator *dest, 2084 const struct ceph_object_locator *src) 2085 { 2086 ceph_oloc_destroy(dest); 2087 2088 dest->pool = src->pool; 2089 if (src->pool_ns) 2090 dest->pool_ns = ceph_get_string(src->pool_ns); 2091 else 2092 dest->pool_ns = NULL; 2093 } 2094 EXPORT_SYMBOL(ceph_oloc_copy); 2095 2096 void ceph_oloc_destroy(struct ceph_object_locator *oloc) 2097 { 2098 ceph_put_string(oloc->pool_ns); 2099 } 2100 EXPORT_SYMBOL(ceph_oloc_destroy); 2101 2102 void ceph_oid_copy(struct ceph_object_id *dest, 2103 const struct ceph_object_id *src) 2104 { 2105 ceph_oid_destroy(dest); 2106 2107 if (src->name != src->inline_name) { 2108 /* very rare, see ceph_object_id definition */ 2109 dest->name = kmalloc(src->name_len + 1, 2110 GFP_NOIO | __GFP_NOFAIL); 2111 } else { 2112 dest->name = dest->inline_name; 2113 } 2114 memcpy(dest->name, src->name, src->name_len + 1); 2115 dest->name_len = src->name_len; 2116 } 2117 EXPORT_SYMBOL(ceph_oid_copy); 2118 2119 static __printf(2, 0) 2120 int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap) 2121 { 2122 int len; 2123 2124 WARN_ON(!ceph_oid_empty(oid)); 2125 2126 len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap); 2127 if (len >= sizeof(oid->inline_name)) 2128 return len; 2129 2130 oid->name_len = len; 2131 return 0; 2132 } 2133 2134 /* 2135 * If oid doesn't fit into inline buffer, BUG. 2136 */ 2137 void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...) 2138 { 2139 va_list ap; 2140 2141 va_start(ap, fmt); 2142 BUG_ON(oid_printf_vargs(oid, fmt, ap)); 2143 va_end(ap); 2144 } 2145 EXPORT_SYMBOL(ceph_oid_printf); 2146 2147 static __printf(3, 0) 2148 int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp, 2149 const char *fmt, va_list ap) 2150 { 2151 va_list aq; 2152 int len; 2153 2154 va_copy(aq, ap); 2155 len = oid_printf_vargs(oid, fmt, aq); 2156 va_end(aq); 2157 2158 if (len) { 2159 char *external_name; 2160 2161 external_name = kmalloc(len + 1, gfp); 2162 if (!external_name) 2163 return -ENOMEM; 2164 2165 oid->name = external_name; 2166 WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len); 2167 oid->name_len = len; 2168 } 2169 2170 return 0; 2171 } 2172 2173 /* 2174 * If oid doesn't fit into inline buffer, allocate. 2175 */ 2176 int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp, 2177 const char *fmt, ...) 2178 { 2179 va_list ap; 2180 int ret; 2181 2182 va_start(ap, fmt); 2183 ret = oid_aprintf_vargs(oid, gfp, fmt, ap); 2184 va_end(ap); 2185 2186 return ret; 2187 } 2188 EXPORT_SYMBOL(ceph_oid_aprintf); 2189 2190 void ceph_oid_destroy(struct ceph_object_id *oid) 2191 { 2192 if (oid->name != oid->inline_name) 2193 kfree(oid->name); 2194 } 2195 EXPORT_SYMBOL(ceph_oid_destroy); 2196 2197 /* 2198 * osds only 2199 */ 2200 static bool __osds_equal(const struct ceph_osds *lhs, 2201 const struct ceph_osds *rhs) 2202 { 2203 if (lhs->size == rhs->size && 2204 !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0]))) 2205 return true; 2206 2207 return false; 2208 } 2209 2210 /* 2211 * osds + primary 2212 */ 2213 static bool osds_equal(const struct ceph_osds *lhs, 2214 const struct ceph_osds *rhs) 2215 { 2216 if (__osds_equal(lhs, rhs) && 2217 lhs->primary == rhs->primary) 2218 return true; 2219 2220 return false; 2221 } 2222 2223 static bool osds_valid(const struct ceph_osds *set) 2224 { 2225 /* non-empty set */ 2226 if (set->size > 0 && set->primary >= 0) 2227 return true; 2228 2229 /* empty can_shift_osds set */ 2230 if (!set->size && set->primary == -1) 2231 return true; 2232 2233 /* empty !can_shift_osds set - all NONE */ 2234 if (set->size > 0 && set->primary == -1) { 2235 int i; 2236 2237 for (i = 0; i < set->size; i++) { 2238 if (set->osds[i] != CRUSH_ITEM_NONE) 2239 break; 2240 } 2241 if (i == set->size) 2242 return true; 2243 } 2244 2245 return false; 2246 } 2247 2248 void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src) 2249 { 2250 memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0])); 2251 dest->size = src->size; 2252 dest->primary = src->primary; 2253 } 2254 2255 bool ceph_pg_is_split(const struct ceph_pg *pgid, u32 old_pg_num, 2256 u32 new_pg_num) 2257 { 2258 int old_bits = calc_bits_of(old_pg_num); 2259 int old_mask = (1 << old_bits) - 1; 2260 int n; 2261 2262 WARN_ON(pgid->seed >= old_pg_num); 2263 if (new_pg_num <= old_pg_num) 2264 return false; 2265 2266 for (n = 1; ; n++) { 2267 int next_bit = n << (old_bits - 1); 2268 u32 s = next_bit | pgid->seed; 2269 2270 if (s < old_pg_num || s == pgid->seed) 2271 continue; 2272 if (s >= new_pg_num) 2273 break; 2274 2275 s = ceph_stable_mod(s, old_pg_num, old_mask); 2276 if (s == pgid->seed) 2277 return true; 2278 } 2279 2280 return false; 2281 } 2282 2283 bool ceph_is_new_interval(const struct ceph_osds *old_acting, 2284 const struct ceph_osds *new_acting, 2285 const struct ceph_osds *old_up, 2286 const struct ceph_osds *new_up, 2287 int old_size, 2288 int new_size, 2289 int old_min_size, 2290 int new_min_size, 2291 u32 old_pg_num, 2292 u32 new_pg_num, 2293 bool old_sort_bitwise, 2294 bool new_sort_bitwise, 2295 bool old_recovery_deletes, 2296 bool new_recovery_deletes, 2297 const struct ceph_pg *pgid) 2298 { 2299 return !osds_equal(old_acting, new_acting) || 2300 !osds_equal(old_up, new_up) || 2301 old_size != new_size || 2302 old_min_size != new_min_size || 2303 ceph_pg_is_split(pgid, old_pg_num, new_pg_num) || 2304 old_sort_bitwise != new_sort_bitwise || 2305 old_recovery_deletes != new_recovery_deletes; 2306 } 2307 2308 static int calc_pg_rank(int osd, const struct ceph_osds *acting) 2309 { 2310 int i; 2311 2312 for (i = 0; i < acting->size; i++) { 2313 if (acting->osds[i] == osd) 2314 return i; 2315 } 2316 2317 return -1; 2318 } 2319 2320 static bool primary_changed(const struct ceph_osds *old_acting, 2321 const struct ceph_osds *new_acting) 2322 { 2323 if (!old_acting->size && !new_acting->size) 2324 return false; /* both still empty */ 2325 2326 if (!old_acting->size ^ !new_acting->size) 2327 return true; /* was empty, now not, or vice versa */ 2328 2329 if (old_acting->primary != new_acting->primary) 2330 return true; /* primary changed */ 2331 2332 if (calc_pg_rank(old_acting->primary, old_acting) != 2333 calc_pg_rank(new_acting->primary, new_acting)) 2334 return true; 2335 2336 return false; /* same primary (tho replicas may have changed) */ 2337 } 2338 2339 bool ceph_osds_changed(const struct ceph_osds *old_acting, 2340 const struct ceph_osds *new_acting, 2341 bool any_change) 2342 { 2343 if (primary_changed(old_acting, new_acting)) 2344 return true; 2345 2346 if (any_change && !__osds_equal(old_acting, new_acting)) 2347 return true; 2348 2349 return false; 2350 } 2351 2352 /* 2353 * Map an object into a PG. 2354 * 2355 * Should only be called with target_oid and target_oloc (as opposed to 2356 * base_oid and base_oloc), since tiering isn't taken into account. 2357 */ 2358 void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, 2359 const struct ceph_object_id *oid, 2360 const struct ceph_object_locator *oloc, 2361 struct ceph_pg *raw_pgid) 2362 { 2363 WARN_ON(pi->id != oloc->pool); 2364 2365 if (!oloc->pool_ns) { 2366 raw_pgid->pool = oloc->pool; 2367 raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name, 2368 oid->name_len); 2369 dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name, 2370 raw_pgid->pool, raw_pgid->seed); 2371 } else { 2372 char stack_buf[256]; 2373 char *buf = stack_buf; 2374 int nsl = oloc->pool_ns->len; 2375 size_t total = nsl + 1 + oid->name_len; 2376 2377 if (total > sizeof(stack_buf)) 2378 buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL); 2379 memcpy(buf, oloc->pool_ns->str, nsl); 2380 buf[nsl] = '\037'; 2381 memcpy(buf + nsl + 1, oid->name, oid->name_len); 2382 raw_pgid->pool = oloc->pool; 2383 raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total); 2384 if (buf != stack_buf) 2385 kfree(buf); 2386 dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__, 2387 oid->name, nsl, oloc->pool_ns->str, 2388 raw_pgid->pool, raw_pgid->seed); 2389 } 2390 } 2391 2392 int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, 2393 const struct ceph_object_id *oid, 2394 const struct ceph_object_locator *oloc, 2395 struct ceph_pg *raw_pgid) 2396 { 2397 struct ceph_pg_pool_info *pi; 2398 2399 pi = ceph_pg_pool_by_id(osdmap, oloc->pool); 2400 if (!pi) 2401 return -ENOENT; 2402 2403 __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); 2404 return 0; 2405 } 2406 EXPORT_SYMBOL(ceph_object_locator_to_pg); 2407 2408 /* 2409 * Map a raw PG (full precision ps) into an actual PG. 2410 */ 2411 static void raw_pg_to_pg(struct ceph_pg_pool_info *pi, 2412 const struct ceph_pg *raw_pgid, 2413 struct ceph_pg *pgid) 2414 { 2415 pgid->pool = raw_pgid->pool; 2416 pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num, 2417 pi->pg_num_mask); 2418 } 2419 2420 /* 2421 * Map a raw PG (full precision ps) into a placement ps (placement 2422 * seed). Include pool id in that value so that different pools don't 2423 * use the same seeds. 2424 */ 2425 static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi, 2426 const struct ceph_pg *raw_pgid) 2427 { 2428 if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) { 2429 /* hash pool id and seed so that pool PGs do not overlap */ 2430 return crush_hash32_2(CRUSH_HASH_RJENKINS1, 2431 ceph_stable_mod(raw_pgid->seed, 2432 pi->pgp_num, 2433 pi->pgp_num_mask), 2434 raw_pgid->pool); 2435 } else { 2436 /* 2437 * legacy behavior: add ps and pool together. this is 2438 * not a great approach because the PGs from each pool 2439 * will overlap on top of each other: 0.5 == 1.4 == 2440 * 2.3 == ... 2441 */ 2442 return ceph_stable_mod(raw_pgid->seed, pi->pgp_num, 2443 pi->pgp_num_mask) + 2444 (unsigned)raw_pgid->pool; 2445 } 2446 } 2447 2448 /* 2449 * Magic value used for a "default" fallback choose_args, used if the 2450 * crush_choose_arg_map passed to do_crush() does not exist. If this 2451 * also doesn't exist, fall back to canonical weights. 2452 */ 2453 #define CEPH_DEFAULT_CHOOSE_ARGS -1 2454 2455 static int do_crush(struct ceph_osdmap *map, int ruleno, int x, 2456 int *result, int result_max, 2457 const __u32 *weight, int weight_max, 2458 s64 choose_args_index) 2459 { 2460 struct crush_choose_arg_map *arg_map; 2461 struct crush_work *work; 2462 int r; 2463 2464 BUG_ON(result_max > CEPH_PG_MAX_SIZE); 2465 2466 arg_map = lookup_choose_arg_map(&map->crush->choose_args, 2467 choose_args_index); 2468 if (!arg_map) 2469 arg_map = lookup_choose_arg_map(&map->crush->choose_args, 2470 CEPH_DEFAULT_CHOOSE_ARGS); 2471 2472 work = get_workspace(&map->crush_wsm, map->crush); 2473 r = crush_do_rule(map->crush, ruleno, x, result, result_max, 2474 weight, weight_max, work, 2475 arg_map ? arg_map->args : NULL); 2476 put_workspace(&map->crush_wsm, work); 2477 return r; 2478 } 2479 2480 static void remove_nonexistent_osds(struct ceph_osdmap *osdmap, 2481 struct ceph_pg_pool_info *pi, 2482 struct ceph_osds *set) 2483 { 2484 int i; 2485 2486 if (ceph_can_shift_osds(pi)) { 2487 int removed = 0; 2488 2489 /* shift left */ 2490 for (i = 0; i < set->size; i++) { 2491 if (!ceph_osd_exists(osdmap, set->osds[i])) { 2492 removed++; 2493 continue; 2494 } 2495 if (removed) 2496 set->osds[i - removed] = set->osds[i]; 2497 } 2498 set->size -= removed; 2499 } else { 2500 /* set dne devices to NONE */ 2501 for (i = 0; i < set->size; i++) { 2502 if (!ceph_osd_exists(osdmap, set->osds[i])) 2503 set->osds[i] = CRUSH_ITEM_NONE; 2504 } 2505 } 2506 } 2507 2508 /* 2509 * Calculate raw set (CRUSH output) for given PG and filter out 2510 * nonexistent OSDs. ->primary is undefined for a raw set. 2511 * 2512 * Placement seed (CRUSH input) is returned through @ppps. 2513 */ 2514 static void pg_to_raw_osds(struct ceph_osdmap *osdmap, 2515 struct ceph_pg_pool_info *pi, 2516 const struct ceph_pg *raw_pgid, 2517 struct ceph_osds *raw, 2518 u32 *ppps) 2519 { 2520 u32 pps = raw_pg_to_pps(pi, raw_pgid); 2521 int ruleno; 2522 int len; 2523 2524 ceph_osds_init(raw); 2525 if (ppps) 2526 *ppps = pps; 2527 2528 ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type, 2529 pi->size); 2530 if (ruleno < 0) { 2531 pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n", 2532 pi->id, pi->crush_ruleset, pi->type, pi->size); 2533 return; 2534 } 2535 2536 if (pi->size > ARRAY_SIZE(raw->osds)) { 2537 pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n", 2538 pi->id, pi->crush_ruleset, pi->type, pi->size, 2539 ARRAY_SIZE(raw->osds)); 2540 return; 2541 } 2542 2543 len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size, 2544 osdmap->osd_weight, osdmap->max_osd, pi->id); 2545 if (len < 0) { 2546 pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", 2547 len, ruleno, pi->id, pi->crush_ruleset, pi->type, 2548 pi->size); 2549 return; 2550 } 2551 2552 raw->size = len; 2553 remove_nonexistent_osds(osdmap, pi, raw); 2554 } 2555 2556 /* apply pg_upmap[_items] mappings */ 2557 static void apply_upmap(struct ceph_osdmap *osdmap, 2558 const struct ceph_pg *pgid, 2559 struct ceph_osds *raw) 2560 { 2561 struct ceph_pg_mapping *pg; 2562 int i, j; 2563 2564 pg = lookup_pg_mapping(&osdmap->pg_upmap, pgid); 2565 if (pg) { 2566 /* make sure targets aren't marked out */ 2567 for (i = 0; i < pg->pg_upmap.len; i++) { 2568 int osd = pg->pg_upmap.osds[i]; 2569 2570 if (osd != CRUSH_ITEM_NONE && 2571 osd < osdmap->max_osd && 2572 osdmap->osd_weight[osd] == 0) { 2573 /* reject/ignore explicit mapping */ 2574 return; 2575 } 2576 } 2577 for (i = 0; i < pg->pg_upmap.len; i++) 2578 raw->osds[i] = pg->pg_upmap.osds[i]; 2579 raw->size = pg->pg_upmap.len; 2580 /* check and apply pg_upmap_items, if any */ 2581 } 2582 2583 pg = lookup_pg_mapping(&osdmap->pg_upmap_items, pgid); 2584 if (pg) { 2585 /* 2586 * Note: this approach does not allow a bidirectional swap, 2587 * e.g., [[1,2],[2,1]] applied to [0,1,2] -> [0,2,1]. 2588 */ 2589 for (i = 0; i < pg->pg_upmap_items.len; i++) { 2590 int from = pg->pg_upmap_items.from_to[i][0]; 2591 int to = pg->pg_upmap_items.from_to[i][1]; 2592 int pos = -1; 2593 bool exists = false; 2594 2595 /* make sure replacement doesn't already appear */ 2596 for (j = 0; j < raw->size; j++) { 2597 int osd = raw->osds[j]; 2598 2599 if (osd == to) { 2600 exists = true; 2601 break; 2602 } 2603 /* ignore mapping if target is marked out */ 2604 if (osd == from && pos < 0 && 2605 !(to != CRUSH_ITEM_NONE && 2606 to < osdmap->max_osd && 2607 osdmap->osd_weight[to] == 0)) { 2608 pos = j; 2609 } 2610 } 2611 if (!exists && pos >= 0) 2612 raw->osds[pos] = to; 2613 } 2614 } 2615 } 2616 2617 /* 2618 * Given raw set, calculate up set and up primary. By definition of an 2619 * up set, the result won't contain nonexistent or down OSDs. 2620 * 2621 * This is done in-place - on return @set is the up set. If it's 2622 * empty, ->primary will remain undefined. 2623 */ 2624 static void raw_to_up_osds(struct ceph_osdmap *osdmap, 2625 struct ceph_pg_pool_info *pi, 2626 struct ceph_osds *set) 2627 { 2628 int i; 2629 2630 /* ->primary is undefined for a raw set */ 2631 BUG_ON(set->primary != -1); 2632 2633 if (ceph_can_shift_osds(pi)) { 2634 int removed = 0; 2635 2636 /* shift left */ 2637 for (i = 0; i < set->size; i++) { 2638 if (ceph_osd_is_down(osdmap, set->osds[i])) { 2639 removed++; 2640 continue; 2641 } 2642 if (removed) 2643 set->osds[i - removed] = set->osds[i]; 2644 } 2645 set->size -= removed; 2646 if (set->size > 0) 2647 set->primary = set->osds[0]; 2648 } else { 2649 /* set down/dne devices to NONE */ 2650 for (i = set->size - 1; i >= 0; i--) { 2651 if (ceph_osd_is_down(osdmap, set->osds[i])) 2652 set->osds[i] = CRUSH_ITEM_NONE; 2653 else 2654 set->primary = set->osds[i]; 2655 } 2656 } 2657 } 2658 2659 static void apply_primary_affinity(struct ceph_osdmap *osdmap, 2660 struct ceph_pg_pool_info *pi, 2661 u32 pps, 2662 struct ceph_osds *up) 2663 { 2664 int i; 2665 int pos = -1; 2666 2667 /* 2668 * Do we have any non-default primary_affinity values for these 2669 * osds? 2670 */ 2671 if (!osdmap->osd_primary_affinity) 2672 return; 2673 2674 for (i = 0; i < up->size; i++) { 2675 int osd = up->osds[i]; 2676 2677 if (osd != CRUSH_ITEM_NONE && 2678 osdmap->osd_primary_affinity[osd] != 2679 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) { 2680 break; 2681 } 2682 } 2683 if (i == up->size) 2684 return; 2685 2686 /* 2687 * Pick the primary. Feed both the seed (for the pg) and the 2688 * osd into the hash/rng so that a proportional fraction of an 2689 * osd's pgs get rejected as primary. 2690 */ 2691 for (i = 0; i < up->size; i++) { 2692 int osd = up->osds[i]; 2693 u32 aff; 2694 2695 if (osd == CRUSH_ITEM_NONE) 2696 continue; 2697 2698 aff = osdmap->osd_primary_affinity[osd]; 2699 if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY && 2700 (crush_hash32_2(CRUSH_HASH_RJENKINS1, 2701 pps, osd) >> 16) >= aff) { 2702 /* 2703 * We chose not to use this primary. Note it 2704 * anyway as a fallback in case we don't pick 2705 * anyone else, but keep looking. 2706 */ 2707 if (pos < 0) 2708 pos = i; 2709 } else { 2710 pos = i; 2711 break; 2712 } 2713 } 2714 if (pos < 0) 2715 return; 2716 2717 up->primary = up->osds[pos]; 2718 2719 if (ceph_can_shift_osds(pi) && pos > 0) { 2720 /* move the new primary to the front */ 2721 for (i = pos; i > 0; i--) 2722 up->osds[i] = up->osds[i - 1]; 2723 up->osds[0] = up->primary; 2724 } 2725 } 2726 2727 /* 2728 * Get pg_temp and primary_temp mappings for given PG. 2729 * 2730 * Note that a PG may have none, only pg_temp, only primary_temp or 2731 * both pg_temp and primary_temp mappings. This means @temp isn't 2732 * always a valid OSD set on return: in the "only primary_temp" case, 2733 * @temp will have its ->primary >= 0 but ->size == 0. 2734 */ 2735 static void get_temp_osds(struct ceph_osdmap *osdmap, 2736 struct ceph_pg_pool_info *pi, 2737 const struct ceph_pg *pgid, 2738 struct ceph_osds *temp) 2739 { 2740 struct ceph_pg_mapping *pg; 2741 int i; 2742 2743 ceph_osds_init(temp); 2744 2745 /* pg_temp? */ 2746 pg = lookup_pg_mapping(&osdmap->pg_temp, pgid); 2747 if (pg) { 2748 for (i = 0; i < pg->pg_temp.len; i++) { 2749 if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) { 2750 if (ceph_can_shift_osds(pi)) 2751 continue; 2752 2753 temp->osds[temp->size++] = CRUSH_ITEM_NONE; 2754 } else { 2755 temp->osds[temp->size++] = pg->pg_temp.osds[i]; 2756 } 2757 } 2758 2759 /* apply pg_temp's primary */ 2760 for (i = 0; i < temp->size; i++) { 2761 if (temp->osds[i] != CRUSH_ITEM_NONE) { 2762 temp->primary = temp->osds[i]; 2763 break; 2764 } 2765 } 2766 } 2767 2768 /* primary_temp? */ 2769 pg = lookup_pg_mapping(&osdmap->primary_temp, pgid); 2770 if (pg) 2771 temp->primary = pg->primary_temp.osd; 2772 } 2773 2774 /* 2775 * Map a PG to its acting set as well as its up set. 2776 * 2777 * Acting set is used for data mapping purposes, while up set can be 2778 * recorded for detecting interval changes and deciding whether to 2779 * resend a request. 2780 */ 2781 void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap, 2782 struct ceph_pg_pool_info *pi, 2783 const struct ceph_pg *raw_pgid, 2784 struct ceph_osds *up, 2785 struct ceph_osds *acting) 2786 { 2787 struct ceph_pg pgid; 2788 u32 pps; 2789 2790 WARN_ON(pi->id != raw_pgid->pool); 2791 raw_pg_to_pg(pi, raw_pgid, &pgid); 2792 2793 pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps); 2794 apply_upmap(osdmap, &pgid, up); 2795 raw_to_up_osds(osdmap, pi, up); 2796 apply_primary_affinity(osdmap, pi, pps, up); 2797 get_temp_osds(osdmap, pi, &pgid, acting); 2798 if (!acting->size) { 2799 memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0])); 2800 acting->size = up->size; 2801 if (acting->primary == -1) 2802 acting->primary = up->primary; 2803 } 2804 WARN_ON(!osds_valid(up) || !osds_valid(acting)); 2805 } 2806 2807 bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap, 2808 struct ceph_pg_pool_info *pi, 2809 const struct ceph_pg *raw_pgid, 2810 struct ceph_spg *spgid) 2811 { 2812 struct ceph_pg pgid; 2813 struct ceph_osds up, acting; 2814 int i; 2815 2816 WARN_ON(pi->id != raw_pgid->pool); 2817 raw_pg_to_pg(pi, raw_pgid, &pgid); 2818 2819 if (ceph_can_shift_osds(pi)) { 2820 spgid->pgid = pgid; /* struct */ 2821 spgid->shard = CEPH_SPG_NOSHARD; 2822 return true; 2823 } 2824 2825 ceph_pg_to_up_acting_osds(osdmap, pi, &pgid, &up, &acting); 2826 for (i = 0; i < acting.size; i++) { 2827 if (acting.osds[i] == acting.primary) { 2828 spgid->pgid = pgid; /* struct */ 2829 spgid->shard = i; 2830 return true; 2831 } 2832 } 2833 2834 return false; 2835 } 2836 2837 /* 2838 * Return acting primary for given PG, or -1 if none. 2839 */ 2840 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap, 2841 const struct ceph_pg *raw_pgid) 2842 { 2843 struct ceph_pg_pool_info *pi; 2844 struct ceph_osds up, acting; 2845 2846 pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool); 2847 if (!pi) 2848 return -1; 2849 2850 ceph_pg_to_up_acting_osds(osdmap, pi, raw_pgid, &up, &acting); 2851 return acting.primary; 2852 } 2853 EXPORT_SYMBOL(ceph_pg_to_acting_primary); 2854 2855 static struct crush_loc_node *alloc_crush_loc(size_t type_name_len, 2856 size_t name_len) 2857 { 2858 struct crush_loc_node *loc; 2859 2860 loc = kmalloc(sizeof(*loc) + type_name_len + name_len + 2, GFP_NOIO); 2861 if (!loc) 2862 return NULL; 2863 2864 RB_CLEAR_NODE(&loc->cl_node); 2865 return loc; 2866 } 2867 2868 static void free_crush_loc(struct crush_loc_node *loc) 2869 { 2870 WARN_ON(!RB_EMPTY_NODE(&loc->cl_node)); 2871 2872 kfree(loc); 2873 } 2874 2875 static int crush_loc_compare(const struct crush_loc *loc1, 2876 const struct crush_loc *loc2) 2877 { 2878 return strcmp(loc1->cl_type_name, loc2->cl_type_name) ?: 2879 strcmp(loc1->cl_name, loc2->cl_name); 2880 } 2881 2882 DEFINE_RB_FUNCS2(crush_loc, struct crush_loc_node, cl_loc, crush_loc_compare, 2883 RB_BYPTR, const struct crush_loc *, cl_node) 2884 2885 /* 2886 * Parses a set of <bucket type name>':'<bucket name> pairs separated 2887 * by '|', e.g. "rack:foo1|rack:foo2|datacenter:bar". 2888 * 2889 * Note that @crush_location is modified by strsep(). 2890 */ 2891 int ceph_parse_crush_location(char *crush_location, struct rb_root *locs) 2892 { 2893 struct crush_loc_node *loc; 2894 const char *type_name, *name, *colon; 2895 size_t type_name_len, name_len; 2896 2897 dout("%s '%s'\n", __func__, crush_location); 2898 while ((type_name = strsep(&crush_location, "|"))) { 2899 colon = strchr(type_name, ':'); 2900 if (!colon) 2901 return -EINVAL; 2902 2903 type_name_len = colon - type_name; 2904 if (type_name_len == 0) 2905 return -EINVAL; 2906 2907 name = colon + 1; 2908 name_len = strlen(name); 2909 if (name_len == 0) 2910 return -EINVAL; 2911 2912 loc = alloc_crush_loc(type_name_len, name_len); 2913 if (!loc) 2914 return -ENOMEM; 2915 2916 loc->cl_loc.cl_type_name = loc->cl_data; 2917 memcpy(loc->cl_loc.cl_type_name, type_name, type_name_len); 2918 loc->cl_loc.cl_type_name[type_name_len] = '\0'; 2919 2920 loc->cl_loc.cl_name = loc->cl_data + type_name_len + 1; 2921 memcpy(loc->cl_loc.cl_name, name, name_len); 2922 loc->cl_loc.cl_name[name_len] = '\0'; 2923 2924 if (!__insert_crush_loc(locs, loc)) { 2925 free_crush_loc(loc); 2926 return -EEXIST; 2927 } 2928 2929 dout("%s type_name '%s' name '%s'\n", __func__, 2930 loc->cl_loc.cl_type_name, loc->cl_loc.cl_name); 2931 } 2932 2933 return 0; 2934 } 2935 2936 int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2) 2937 { 2938 struct rb_node *n1 = rb_first(locs1); 2939 struct rb_node *n2 = rb_first(locs2); 2940 int ret; 2941 2942 for ( ; n1 && n2; n1 = rb_next(n1), n2 = rb_next(n2)) { 2943 struct crush_loc_node *loc1 = 2944 rb_entry(n1, struct crush_loc_node, cl_node); 2945 struct crush_loc_node *loc2 = 2946 rb_entry(n2, struct crush_loc_node, cl_node); 2947 2948 ret = crush_loc_compare(&loc1->cl_loc, &loc2->cl_loc); 2949 if (ret) 2950 return ret; 2951 } 2952 2953 if (!n1 && n2) 2954 return -1; 2955 if (n1 && !n2) 2956 return 1; 2957 return 0; 2958 } 2959 2960 void ceph_clear_crush_locs(struct rb_root *locs) 2961 { 2962 while (!RB_EMPTY_ROOT(locs)) { 2963 struct crush_loc_node *loc = 2964 rb_entry(rb_first(locs), struct crush_loc_node, cl_node); 2965 2966 erase_crush_loc(locs, loc); 2967 free_crush_loc(loc); 2968 } 2969 } 2970 2971 /* 2972 * [a-zA-Z0-9-_.]+ 2973 */ 2974 static bool is_valid_crush_name(const char *name) 2975 { 2976 do { 2977 if (!('a' <= *name && *name <= 'z') && 2978 !('A' <= *name && *name <= 'Z') && 2979 !('0' <= *name && *name <= '9') && 2980 *name != '-' && *name != '_' && *name != '.') 2981 return false; 2982 } while (*++name != '\0'); 2983 2984 return true; 2985 } 2986 2987 /* 2988 * Gets the parent of an item. Returns its id (<0 because the 2989 * parent is always a bucket), type id (>0 for the same reason, 2990 * via @parent_type_id) and location (via @parent_loc). If no 2991 * parent, returns 0. 2992 * 2993 * Does a linear search, as there are no parent pointers of any 2994 * kind. Note that the result is ambigous for items that occur 2995 * multiple times in the map. 2996 */ 2997 static int get_immediate_parent(struct crush_map *c, int id, 2998 u16 *parent_type_id, 2999 struct crush_loc *parent_loc) 3000 { 3001 struct crush_bucket *b; 3002 struct crush_name_node *type_cn, *cn; 3003 int i, j; 3004 3005 for (i = 0; i < c->max_buckets; i++) { 3006 b = c->buckets[i]; 3007 if (!b) 3008 continue; 3009 3010 /* ignore per-class shadow hierarchy */ 3011 cn = lookup_crush_name(&c->names, b->id); 3012 if (!cn || !is_valid_crush_name(cn->cn_name)) 3013 continue; 3014 3015 for (j = 0; j < b->size; j++) { 3016 if (b->items[j] != id) 3017 continue; 3018 3019 *parent_type_id = b->type; 3020 type_cn = lookup_crush_name(&c->type_names, b->type); 3021 parent_loc->cl_type_name = type_cn->cn_name; 3022 parent_loc->cl_name = cn->cn_name; 3023 return b->id; 3024 } 3025 } 3026 3027 return 0; /* no parent */ 3028 } 3029 3030 /* 3031 * Calculates the locality/distance from an item to a client 3032 * location expressed in terms of CRUSH hierarchy as a set of 3033 * (bucket type name, bucket name) pairs. Specifically, looks 3034 * for the lowest-valued bucket type for which the location of 3035 * @id matches one of the locations in @locs, so for standard 3036 * bucket types (host = 1, rack = 3, datacenter = 8, zone = 9) 3037 * a matching host is closer than a matching rack and a matching 3038 * data center is closer than a matching zone. 3039 * 3040 * Specifying multiple locations (a "multipath" location) such 3041 * as "rack=foo1 rack=foo2 datacenter=bar" is allowed -- @locs 3042 * is a multimap. The locality will be: 3043 * 3044 * - 3 for OSDs in racks foo1 and foo2 3045 * - 8 for OSDs in data center bar 3046 * - -1 for all other OSDs 3047 * 3048 * The lowest possible bucket type is 1, so the best locality 3049 * for an OSD is 1 (i.e. a matching host). Locality 0 would be 3050 * the OSD itself. 3051 */ 3052 int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id, 3053 struct rb_root *locs) 3054 { 3055 struct crush_loc loc; 3056 u16 type_id; 3057 3058 /* 3059 * Instead of repeated get_immediate_parent() calls, 3060 * the location of @id could be obtained with a single 3061 * depth-first traversal. 3062 */ 3063 for (;;) { 3064 id = get_immediate_parent(osdmap->crush, id, &type_id, &loc); 3065 if (id >= 0) 3066 return -1; /* not local */ 3067 3068 if (lookup_crush_loc(locs, &loc)) 3069 return type_id; 3070 } 3071 } 3072