1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/dlm.h> 14 #include <linux/sched.h> 15 #include <linux/raid/md_p.h> 16 #include "md.h" 17 #include "bitmap.h" 18 #include "md-cluster.h" 19 20 #define LVB_SIZE 64 21 #define NEW_DEV_TIMEOUT 5000 22 23 struct dlm_lock_resource { 24 dlm_lockspace_t *ls; 25 struct dlm_lksb lksb; 26 char *name; /* lock name. */ 27 uint32_t flags; /* flags to pass to dlm_lock() */ 28 struct completion completion; /* completion for synchronized locking */ 29 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 30 struct mddev *mddev; /* pointing back to mddev. */ 31 int mode; 32 }; 33 34 struct suspend_info { 35 int slot; 36 sector_t lo; 37 sector_t hi; 38 struct list_head list; 39 }; 40 41 struct resync_info { 42 __le64 lo; 43 __le64 hi; 44 }; 45 46 /* md_cluster_info flags */ 47 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 48 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 49 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3 50 51 52 struct md_cluster_info { 53 /* dlm lock space and resources for clustered raid. */ 54 dlm_lockspace_t *lockspace; 55 int slot_number; 56 struct completion completion; 57 struct dlm_lock_resource *bitmap_lockres; 58 struct dlm_lock_resource *resync_lockres; 59 struct list_head suspend_list; 60 spinlock_t suspend_lock; 61 struct md_thread *recovery_thread; 62 unsigned long recovery_map; 63 /* communication loc resources */ 64 struct dlm_lock_resource *ack_lockres; 65 struct dlm_lock_resource *message_lockres; 66 struct dlm_lock_resource *token_lockres; 67 struct dlm_lock_resource *no_new_dev_lockres; 68 struct md_thread *recv_thread; 69 struct completion newdisk_completion; 70 unsigned long state; 71 }; 72 73 enum msg_type { 74 METADATA_UPDATED = 0, 75 RESYNCING, 76 NEWDISK, 77 REMOVE, 78 RE_ADD, 79 BITMAP_NEEDS_SYNC, 80 }; 81 82 struct cluster_msg { 83 __le32 type; 84 __le32 slot; 85 /* TODO: Unionize this for smaller footprint */ 86 __le64 low; 87 __le64 high; 88 char uuid[16]; 89 __le32 raid_slot; 90 }; 91 92 static void sync_ast(void *arg) 93 { 94 struct dlm_lock_resource *res; 95 96 res = arg; 97 complete(&res->completion); 98 } 99 100 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 101 { 102 int ret = 0; 103 104 ret = dlm_lock(res->ls, mode, &res->lksb, 105 res->flags, res->name, strlen(res->name), 106 0, sync_ast, res, res->bast); 107 if (ret) 108 return ret; 109 wait_for_completion(&res->completion); 110 if (res->lksb.sb_status == 0) 111 res->mode = mode; 112 return res->lksb.sb_status; 113 } 114 115 static int dlm_unlock_sync(struct dlm_lock_resource *res) 116 { 117 return dlm_lock_sync(res, DLM_LOCK_NL); 118 } 119 120 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 121 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 122 { 123 struct dlm_lock_resource *res = NULL; 124 int ret, namelen; 125 struct md_cluster_info *cinfo = mddev->cluster_info; 126 127 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 128 if (!res) 129 return NULL; 130 init_completion(&res->completion); 131 res->ls = cinfo->lockspace; 132 res->mddev = mddev; 133 res->mode = DLM_LOCK_IV; 134 namelen = strlen(name); 135 res->name = kzalloc(namelen + 1, GFP_KERNEL); 136 if (!res->name) { 137 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 138 goto out_err; 139 } 140 strlcpy(res->name, name, namelen + 1); 141 if (with_lvb) { 142 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 143 if (!res->lksb.sb_lvbptr) { 144 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 145 goto out_err; 146 } 147 res->flags = DLM_LKF_VALBLK; 148 } 149 150 if (bastfn) 151 res->bast = bastfn; 152 153 res->flags |= DLM_LKF_EXPEDITE; 154 155 ret = dlm_lock_sync(res, DLM_LOCK_NL); 156 if (ret) { 157 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 158 goto out_err; 159 } 160 res->flags &= ~DLM_LKF_EXPEDITE; 161 res->flags |= DLM_LKF_CONVERT; 162 163 return res; 164 out_err: 165 kfree(res->lksb.sb_lvbptr); 166 kfree(res->name); 167 kfree(res); 168 return NULL; 169 } 170 171 static void lockres_free(struct dlm_lock_resource *res) 172 { 173 int ret; 174 175 if (!res) 176 return; 177 178 /* cancel a lock request or a conversion request that is blocked */ 179 res->flags |= DLM_LKF_CANCEL; 180 retry: 181 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res); 182 if (unlikely(ret != 0)) { 183 pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret); 184 185 /* if a lock conversion is cancelled, then the lock is put 186 * back to grant queue, need to ensure it is unlocked */ 187 if (ret == -DLM_ECANCEL) 188 goto retry; 189 } 190 res->flags &= ~DLM_LKF_CANCEL; 191 wait_for_completion(&res->completion); 192 193 kfree(res->name); 194 kfree(res->lksb.sb_lvbptr); 195 kfree(res); 196 } 197 198 static void add_resync_info(struct dlm_lock_resource *lockres, 199 sector_t lo, sector_t hi) 200 { 201 struct resync_info *ri; 202 203 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 204 ri->lo = cpu_to_le64(lo); 205 ri->hi = cpu_to_le64(hi); 206 } 207 208 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 209 { 210 struct resync_info ri; 211 struct suspend_info *s = NULL; 212 sector_t hi = 0; 213 214 dlm_lock_sync(lockres, DLM_LOCK_CR); 215 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 216 hi = le64_to_cpu(ri.hi); 217 if (hi > 0) { 218 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 219 if (!s) 220 goto out; 221 s->hi = hi; 222 s->lo = le64_to_cpu(ri.lo); 223 } 224 dlm_unlock_sync(lockres); 225 out: 226 return s; 227 } 228 229 static void recover_bitmaps(struct md_thread *thread) 230 { 231 struct mddev *mddev = thread->mddev; 232 struct md_cluster_info *cinfo = mddev->cluster_info; 233 struct dlm_lock_resource *bm_lockres; 234 char str[64]; 235 int slot, ret; 236 struct suspend_info *s, *tmp; 237 sector_t lo, hi; 238 239 while (cinfo->recovery_map) { 240 slot = fls64((u64)cinfo->recovery_map) - 1; 241 242 /* Clear suspend_area associated with the bitmap */ 243 spin_lock_irq(&cinfo->suspend_lock); 244 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 245 if (slot == s->slot) { 246 list_del(&s->list); 247 kfree(s); 248 } 249 spin_unlock_irq(&cinfo->suspend_lock); 250 251 snprintf(str, 64, "bitmap%04d", slot); 252 bm_lockres = lockres_init(mddev, str, NULL, 1); 253 if (!bm_lockres) { 254 pr_err("md-cluster: Cannot initialize bitmaps\n"); 255 goto clear_bit; 256 } 257 258 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 259 if (ret) { 260 pr_err("md-cluster: Could not DLM lock %s: %d\n", 261 str, ret); 262 goto clear_bit; 263 } 264 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 265 if (ret) { 266 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 267 goto dlm_unlock; 268 } 269 if (hi > 0) { 270 /* TODO:Wait for current resync to get over */ 271 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 272 if (lo < mddev->recovery_cp) 273 mddev->recovery_cp = lo; 274 md_check_recovery(mddev); 275 } 276 dlm_unlock: 277 dlm_unlock_sync(bm_lockres); 278 clear_bit: 279 clear_bit(slot, &cinfo->recovery_map); 280 } 281 } 282 283 static void recover_prep(void *arg) 284 { 285 struct mddev *mddev = arg; 286 struct md_cluster_info *cinfo = mddev->cluster_info; 287 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 288 } 289 290 static void __recover_slot(struct mddev *mddev, int slot) 291 { 292 struct md_cluster_info *cinfo = mddev->cluster_info; 293 294 set_bit(slot, &cinfo->recovery_map); 295 if (!cinfo->recovery_thread) { 296 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 297 mddev, "recover"); 298 if (!cinfo->recovery_thread) { 299 pr_warn("md-cluster: Could not create recovery thread\n"); 300 return; 301 } 302 } 303 md_wakeup_thread(cinfo->recovery_thread); 304 } 305 306 static void recover_slot(void *arg, struct dlm_slot *slot) 307 { 308 struct mddev *mddev = arg; 309 struct md_cluster_info *cinfo = mddev->cluster_info; 310 311 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 312 mddev->bitmap_info.cluster_name, 313 slot->nodeid, slot->slot, 314 cinfo->slot_number); 315 /* deduct one since dlm slot starts from one while the num of 316 * cluster-md begins with 0 */ 317 __recover_slot(mddev, slot->slot - 1); 318 } 319 320 static void recover_done(void *arg, struct dlm_slot *slots, 321 int num_slots, int our_slot, 322 uint32_t generation) 323 { 324 struct mddev *mddev = arg; 325 struct md_cluster_info *cinfo = mddev->cluster_info; 326 327 cinfo->slot_number = our_slot; 328 /* completion is only need to be complete when node join cluster, 329 * it doesn't need to run during another node's failure */ 330 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 331 complete(&cinfo->completion); 332 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 333 } 334 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 335 } 336 337 /* the ops is called when node join the cluster, and do lock recovery 338 * if node failure occurs */ 339 static const struct dlm_lockspace_ops md_ls_ops = { 340 .recover_prep = recover_prep, 341 .recover_slot = recover_slot, 342 .recover_done = recover_done, 343 }; 344 345 /* 346 * The BAST function for the ack lock resource 347 * This function wakes up the receive thread in 348 * order to receive and process the message. 349 */ 350 static void ack_bast(void *arg, int mode) 351 { 352 struct dlm_lock_resource *res = arg; 353 struct md_cluster_info *cinfo = res->mddev->cluster_info; 354 355 if (mode == DLM_LOCK_EX) 356 md_wakeup_thread(cinfo->recv_thread); 357 } 358 359 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 360 { 361 struct suspend_info *s, *tmp; 362 363 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 364 if (slot == s->slot) { 365 list_del(&s->list); 366 kfree(s); 367 break; 368 } 369 } 370 371 static void remove_suspend_info(struct mddev *mddev, int slot) 372 { 373 struct md_cluster_info *cinfo = mddev->cluster_info; 374 spin_lock_irq(&cinfo->suspend_lock); 375 __remove_suspend_info(cinfo, slot); 376 spin_unlock_irq(&cinfo->suspend_lock); 377 mddev->pers->quiesce(mddev, 2); 378 } 379 380 381 static void process_suspend_info(struct mddev *mddev, 382 int slot, sector_t lo, sector_t hi) 383 { 384 struct md_cluster_info *cinfo = mddev->cluster_info; 385 struct suspend_info *s; 386 387 if (!hi) { 388 remove_suspend_info(mddev, slot); 389 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 390 md_wakeup_thread(mddev->thread); 391 return; 392 } 393 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 394 if (!s) 395 return; 396 s->slot = slot; 397 s->lo = lo; 398 s->hi = hi; 399 mddev->pers->quiesce(mddev, 1); 400 mddev->pers->quiesce(mddev, 0); 401 spin_lock_irq(&cinfo->suspend_lock); 402 /* Remove existing entry (if exists) before adding */ 403 __remove_suspend_info(cinfo, slot); 404 list_add(&s->list, &cinfo->suspend_list); 405 spin_unlock_irq(&cinfo->suspend_lock); 406 mddev->pers->quiesce(mddev, 2); 407 } 408 409 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 410 { 411 char disk_uuid[64]; 412 struct md_cluster_info *cinfo = mddev->cluster_info; 413 char event_name[] = "EVENT=ADD_DEVICE"; 414 char raid_slot[16]; 415 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 416 int len; 417 418 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 419 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 420 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 421 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 422 init_completion(&cinfo->newdisk_completion); 423 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 424 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 425 wait_for_completion_timeout(&cinfo->newdisk_completion, 426 NEW_DEV_TIMEOUT); 427 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 428 } 429 430 431 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 432 { 433 struct md_cluster_info *cinfo = mddev->cluster_info; 434 md_reload_sb(mddev, le32_to_cpu(msg->raid_slot)); 435 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 436 } 437 438 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 439 { 440 struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, 441 le32_to_cpu(msg->raid_slot)); 442 443 if (rdev) 444 md_kick_rdev_from_array(rdev); 445 else 446 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 447 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 448 } 449 450 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 451 { 452 struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, 453 le32_to_cpu(msg->raid_slot)); 454 455 if (rdev && test_bit(Faulty, &rdev->flags)) 456 clear_bit(Faulty, &rdev->flags); 457 else 458 pr_warn("%s: %d Could not find disk(%d) which is faulty", 459 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 460 } 461 462 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 463 { 464 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 465 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 466 return; 467 switch (le32_to_cpu(msg->type)) { 468 case METADATA_UPDATED: 469 process_metadata_update(mddev, msg); 470 break; 471 case RESYNCING: 472 process_suspend_info(mddev, le32_to_cpu(msg->slot), 473 le64_to_cpu(msg->low), 474 le64_to_cpu(msg->high)); 475 break; 476 case NEWDISK: 477 process_add_new_disk(mddev, msg); 478 break; 479 case REMOVE: 480 process_remove_disk(mddev, msg); 481 break; 482 case RE_ADD: 483 process_readd_disk(mddev, msg); 484 break; 485 case BITMAP_NEEDS_SYNC: 486 __recover_slot(mddev, le32_to_cpu(msg->slot)); 487 break; 488 default: 489 pr_warn("%s:%d Received unknown message from %d\n", 490 __func__, __LINE__, msg->slot); 491 } 492 } 493 494 /* 495 * thread for receiving message 496 */ 497 static void recv_daemon(struct md_thread *thread) 498 { 499 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 500 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 501 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 502 struct cluster_msg msg; 503 int ret; 504 505 /*get CR on Message*/ 506 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 507 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 508 return; 509 } 510 511 /* read lvb and wake up thread to process this message_lockres */ 512 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 513 process_recvd_msg(thread->mddev, &msg); 514 515 /*release CR on ack_lockres*/ 516 ret = dlm_unlock_sync(ack_lockres); 517 if (unlikely(ret != 0)) 518 pr_info("unlock ack failed return %d\n", ret); 519 /*up-convert to PR on message_lockres*/ 520 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 521 if (unlikely(ret != 0)) 522 pr_info("lock PR on msg failed return %d\n", ret); 523 /*get CR on ack_lockres again*/ 524 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 525 if (unlikely(ret != 0)) 526 pr_info("lock CR on ack failed return %d\n", ret); 527 /*release CR on message_lockres*/ 528 ret = dlm_unlock_sync(message_lockres); 529 if (unlikely(ret != 0)) 530 pr_info("unlock msg failed return %d\n", ret); 531 } 532 533 /* lock_comm() 534 * Takes the lock on the TOKEN lock resource so no other 535 * node can communicate while the operation is underway. 536 * If called again, and the TOKEN lock is alread in EX mode 537 * return success. However, care must be taken that unlock_comm() 538 * is called only once. 539 */ 540 static int lock_comm(struct md_cluster_info *cinfo) 541 { 542 int error; 543 544 if (cinfo->token_lockres->mode == DLM_LOCK_EX) 545 return 0; 546 547 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 548 if (error) 549 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 550 __func__, __LINE__, error); 551 return error; 552 } 553 554 static void unlock_comm(struct md_cluster_info *cinfo) 555 { 556 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 557 dlm_unlock_sync(cinfo->token_lockres); 558 } 559 560 /* __sendmsg() 561 * This function performs the actual sending of the message. This function is 562 * usually called after performing the encompassing operation 563 * The function: 564 * 1. Grabs the message lockresource in EX mode 565 * 2. Copies the message to the message LVB 566 * 3. Downconverts message lockresource to CW 567 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 568 * and the other nodes read the message. The thread will wait here until all other 569 * nodes have released ack lock resource. 570 * 5. Downconvert ack lockresource to CR 571 */ 572 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 573 { 574 int error; 575 int slot = cinfo->slot_number - 1; 576 577 cmsg->slot = cpu_to_le32(slot); 578 /*get EX on Message*/ 579 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 580 if (error) { 581 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 582 goto failed_message; 583 } 584 585 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 586 sizeof(struct cluster_msg)); 587 /*down-convert EX to CW on Message*/ 588 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 589 if (error) { 590 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 591 error); 592 goto failed_ack; 593 } 594 595 /*up-convert CR to EX on Ack*/ 596 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 597 if (error) { 598 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 599 error); 600 goto failed_ack; 601 } 602 603 /*down-convert EX to CR on Ack*/ 604 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 605 if (error) { 606 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 607 error); 608 goto failed_ack; 609 } 610 611 failed_ack: 612 error = dlm_unlock_sync(cinfo->message_lockres); 613 if (unlikely(error != 0)) { 614 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 615 error); 616 /* in case the message can't be released due to some reason */ 617 goto failed_ack; 618 } 619 failed_message: 620 return error; 621 } 622 623 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 624 { 625 int ret; 626 627 lock_comm(cinfo); 628 ret = __sendmsg(cinfo, cmsg); 629 unlock_comm(cinfo); 630 return ret; 631 } 632 633 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 634 { 635 struct md_cluster_info *cinfo = mddev->cluster_info; 636 int i, ret = 0; 637 struct dlm_lock_resource *bm_lockres; 638 struct suspend_info *s; 639 char str[64]; 640 sector_t lo, hi; 641 642 643 for (i = 0; i < total_slots; i++) { 644 memset(str, '\0', 64); 645 snprintf(str, 64, "bitmap%04d", i); 646 bm_lockres = lockres_init(mddev, str, NULL, 1); 647 if (!bm_lockres) 648 return -ENOMEM; 649 if (i == (cinfo->slot_number - 1)) 650 continue; 651 652 bm_lockres->flags |= DLM_LKF_NOQUEUE; 653 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 654 if (ret == -EAGAIN) { 655 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 656 s = read_resync_info(mddev, bm_lockres); 657 if (s) { 658 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 659 __func__, __LINE__, 660 (unsigned long long) s->lo, 661 (unsigned long long) s->hi, i); 662 spin_lock_irq(&cinfo->suspend_lock); 663 s->slot = i; 664 list_add(&s->list, &cinfo->suspend_list); 665 spin_unlock_irq(&cinfo->suspend_lock); 666 } 667 ret = 0; 668 lockres_free(bm_lockres); 669 continue; 670 } 671 if (ret) { 672 lockres_free(bm_lockres); 673 goto out; 674 } 675 676 /* Read the disk bitmap sb and check if it needs recovery */ 677 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 678 if (ret) { 679 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 680 lockres_free(bm_lockres); 681 continue; 682 } 683 if ((hi > 0) && (lo < mddev->recovery_cp)) { 684 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 685 mddev->recovery_cp = lo; 686 md_check_recovery(mddev); 687 } 688 689 dlm_unlock_sync(bm_lockres); 690 lockres_free(bm_lockres); 691 } 692 out: 693 return ret; 694 } 695 696 static int join(struct mddev *mddev, int nodes) 697 { 698 struct md_cluster_info *cinfo; 699 int ret, ops_rv; 700 char str[64]; 701 702 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 703 if (!cinfo) 704 return -ENOMEM; 705 706 INIT_LIST_HEAD(&cinfo->suspend_list); 707 spin_lock_init(&cinfo->suspend_lock); 708 init_completion(&cinfo->completion); 709 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 710 711 mddev->cluster_info = cinfo; 712 713 memset(str, 0, 64); 714 sprintf(str, "%pU", mddev->uuid); 715 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 716 DLM_LSFL_FS, LVB_SIZE, 717 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 718 if (ret) 719 goto err; 720 wait_for_completion(&cinfo->completion); 721 if (nodes < cinfo->slot_number) { 722 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 723 cinfo->slot_number, nodes); 724 ret = -ERANGE; 725 goto err; 726 } 727 /* Initiate the communication resources */ 728 ret = -ENOMEM; 729 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 730 if (!cinfo->recv_thread) { 731 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 732 goto err; 733 } 734 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 735 if (!cinfo->message_lockres) 736 goto err; 737 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 738 if (!cinfo->token_lockres) 739 goto err; 740 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 741 if (!cinfo->ack_lockres) 742 goto err; 743 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 744 if (!cinfo->no_new_dev_lockres) 745 goto err; 746 747 /* get sync CR lock on ACK. */ 748 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 749 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 750 ret); 751 /* get sync CR lock on no-new-dev. */ 752 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 753 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 754 755 756 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 757 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 758 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 759 if (!cinfo->bitmap_lockres) 760 goto err; 761 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 762 pr_err("Failed to get bitmap lock\n"); 763 ret = -EINVAL; 764 goto err; 765 } 766 767 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 768 if (!cinfo->resync_lockres) 769 goto err; 770 771 ret = gather_all_resync_info(mddev, nodes); 772 if (ret) 773 goto err; 774 775 return 0; 776 err: 777 lockres_free(cinfo->message_lockres); 778 lockres_free(cinfo->token_lockres); 779 lockres_free(cinfo->ack_lockres); 780 lockres_free(cinfo->no_new_dev_lockres); 781 lockres_free(cinfo->resync_lockres); 782 lockres_free(cinfo->bitmap_lockres); 783 if (cinfo->lockspace) 784 dlm_release_lockspace(cinfo->lockspace, 2); 785 mddev->cluster_info = NULL; 786 kfree(cinfo); 787 return ret; 788 } 789 790 static void resync_bitmap(struct mddev *mddev) 791 { 792 struct md_cluster_info *cinfo = mddev->cluster_info; 793 struct cluster_msg cmsg = {0}; 794 int err; 795 796 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC); 797 err = sendmsg(cinfo, &cmsg); 798 if (err) 799 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 800 __func__, __LINE__, err); 801 } 802 803 static int leave(struct mddev *mddev) 804 { 805 struct md_cluster_info *cinfo = mddev->cluster_info; 806 807 if (!cinfo) 808 return 0; 809 810 /* BITMAP_NEEDS_SYNC message should be sent when node 811 * is leaving the cluster with dirty bitmap, also we 812 * can only deliver it when dlm connection is available */ 813 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 814 resync_bitmap(mddev); 815 816 md_unregister_thread(&cinfo->recovery_thread); 817 md_unregister_thread(&cinfo->recv_thread); 818 lockres_free(cinfo->message_lockres); 819 lockres_free(cinfo->token_lockres); 820 lockres_free(cinfo->ack_lockres); 821 lockres_free(cinfo->no_new_dev_lockres); 822 lockres_free(cinfo->bitmap_lockres); 823 dlm_release_lockspace(cinfo->lockspace, 2); 824 return 0; 825 } 826 827 /* slot_number(): Returns the MD slot number to use 828 * DLM starts the slot numbers from 1, wheras cluster-md 829 * wants the number to be from zero, so we deduct one 830 */ 831 static int slot_number(struct mddev *mddev) 832 { 833 struct md_cluster_info *cinfo = mddev->cluster_info; 834 835 return cinfo->slot_number - 1; 836 } 837 838 static int metadata_update_start(struct mddev *mddev) 839 { 840 return lock_comm(mddev->cluster_info); 841 } 842 843 static int metadata_update_finish(struct mddev *mddev) 844 { 845 struct md_cluster_info *cinfo = mddev->cluster_info; 846 struct cluster_msg cmsg; 847 struct md_rdev *rdev; 848 int ret = 0; 849 int raid_slot = -1; 850 851 memset(&cmsg, 0, sizeof(cmsg)); 852 cmsg.type = cpu_to_le32(METADATA_UPDATED); 853 /* Pick up a good active device number to send. 854 */ 855 rdev_for_each(rdev, mddev) 856 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) { 857 raid_slot = rdev->desc_nr; 858 break; 859 } 860 if (raid_slot >= 0) { 861 cmsg.raid_slot = cpu_to_le32(raid_slot); 862 ret = __sendmsg(cinfo, &cmsg); 863 } else 864 pr_warn("md-cluster: No good device id found to send\n"); 865 unlock_comm(cinfo); 866 return ret; 867 } 868 869 static void metadata_update_cancel(struct mddev *mddev) 870 { 871 struct md_cluster_info *cinfo = mddev->cluster_info; 872 unlock_comm(cinfo); 873 } 874 875 static int resync_start(struct mddev *mddev) 876 { 877 struct md_cluster_info *cinfo = mddev->cluster_info; 878 cinfo->resync_lockres->flags |= DLM_LKF_NOQUEUE; 879 return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX); 880 } 881 882 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 883 { 884 struct md_cluster_info *cinfo = mddev->cluster_info; 885 struct cluster_msg cmsg = {0}; 886 887 add_resync_info(cinfo->bitmap_lockres, lo, hi); 888 /* Re-acquire the lock to refresh LVB */ 889 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 890 cmsg.type = cpu_to_le32(RESYNCING); 891 cmsg.low = cpu_to_le64(lo); 892 cmsg.high = cpu_to_le64(hi); 893 894 return sendmsg(cinfo, &cmsg); 895 } 896 897 static int resync_finish(struct mddev *mddev) 898 { 899 struct md_cluster_info *cinfo = mddev->cluster_info; 900 cinfo->resync_lockres->flags &= ~DLM_LKF_NOQUEUE; 901 dlm_unlock_sync(cinfo->resync_lockres); 902 return resync_info_update(mddev, 0, 0); 903 } 904 905 static int area_resyncing(struct mddev *mddev, int direction, 906 sector_t lo, sector_t hi) 907 { 908 struct md_cluster_info *cinfo = mddev->cluster_info; 909 int ret = 0; 910 struct suspend_info *s; 911 912 if ((direction == READ) && 913 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 914 return 1; 915 916 spin_lock_irq(&cinfo->suspend_lock); 917 if (list_empty(&cinfo->suspend_list)) 918 goto out; 919 list_for_each_entry(s, &cinfo->suspend_list, list) 920 if (hi > s->lo && lo < s->hi) { 921 ret = 1; 922 break; 923 } 924 out: 925 spin_unlock_irq(&cinfo->suspend_lock); 926 return ret; 927 } 928 929 /* add_new_disk() - initiates a disk add 930 * However, if this fails before writing md_update_sb(), 931 * add_new_disk_cancel() must be called to release token lock 932 */ 933 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 934 { 935 struct md_cluster_info *cinfo = mddev->cluster_info; 936 struct cluster_msg cmsg; 937 int ret = 0; 938 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 939 char *uuid = sb->device_uuid; 940 941 memset(&cmsg, 0, sizeof(cmsg)); 942 cmsg.type = cpu_to_le32(NEWDISK); 943 memcpy(cmsg.uuid, uuid, 16); 944 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 945 lock_comm(cinfo); 946 ret = __sendmsg(cinfo, &cmsg); 947 if (ret) 948 return ret; 949 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 950 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 951 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 952 /* Some node does not "see" the device */ 953 if (ret == -EAGAIN) 954 ret = -ENOENT; 955 if (ret) 956 unlock_comm(cinfo); 957 else 958 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 959 return ret; 960 } 961 962 static void add_new_disk_cancel(struct mddev *mddev) 963 { 964 struct md_cluster_info *cinfo = mddev->cluster_info; 965 unlock_comm(cinfo); 966 } 967 968 static int new_disk_ack(struct mddev *mddev, bool ack) 969 { 970 struct md_cluster_info *cinfo = mddev->cluster_info; 971 972 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 973 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 974 return -EINVAL; 975 } 976 977 if (ack) 978 dlm_unlock_sync(cinfo->no_new_dev_lockres); 979 complete(&cinfo->newdisk_completion); 980 return 0; 981 } 982 983 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 984 { 985 struct cluster_msg cmsg = {0}; 986 struct md_cluster_info *cinfo = mddev->cluster_info; 987 cmsg.type = cpu_to_le32(REMOVE); 988 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 989 return __sendmsg(cinfo, &cmsg); 990 } 991 992 static int gather_bitmaps(struct md_rdev *rdev) 993 { 994 int sn, err; 995 sector_t lo, hi; 996 struct cluster_msg cmsg = {0}; 997 struct mddev *mddev = rdev->mddev; 998 struct md_cluster_info *cinfo = mddev->cluster_info; 999 1000 cmsg.type = cpu_to_le32(RE_ADD); 1001 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1002 err = sendmsg(cinfo, &cmsg); 1003 if (err) 1004 goto out; 1005 1006 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1007 if (sn == (cinfo->slot_number - 1)) 1008 continue; 1009 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1010 if (err) { 1011 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1012 goto out; 1013 } 1014 if ((hi > 0) && (lo < mddev->recovery_cp)) 1015 mddev->recovery_cp = lo; 1016 } 1017 out: 1018 return err; 1019 } 1020 1021 static struct md_cluster_operations cluster_ops = { 1022 .join = join, 1023 .leave = leave, 1024 .slot_number = slot_number, 1025 .resync_start = resync_start, 1026 .resync_finish = resync_finish, 1027 .resync_info_update = resync_info_update, 1028 .metadata_update_start = metadata_update_start, 1029 .metadata_update_finish = metadata_update_finish, 1030 .metadata_update_cancel = metadata_update_cancel, 1031 .area_resyncing = area_resyncing, 1032 .add_new_disk = add_new_disk, 1033 .add_new_disk_cancel = add_new_disk_cancel, 1034 .new_disk_ack = new_disk_ack, 1035 .remove_disk = remove_disk, 1036 .gather_bitmaps = gather_bitmaps, 1037 }; 1038 1039 static int __init cluster_init(void) 1040 { 1041 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 1042 pr_info("Registering Cluster MD functions\n"); 1043 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1044 return 0; 1045 } 1046 1047 static void cluster_exit(void) 1048 { 1049 unregister_md_cluster_operations(); 1050 } 1051 1052 module_init(cluster_init); 1053 module_exit(cluster_exit); 1054 MODULE_AUTHOR("SUSE"); 1055 MODULE_LICENSE("GPL"); 1056 MODULE_DESCRIPTION("Clustering support for MD"); 1057