1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/dlm.h> 14 #include <linux/sched.h> 15 #include <linux/raid/md_p.h> 16 #include "md.h" 17 #include "bitmap.h" 18 #include "md-cluster.h" 19 20 #define LVB_SIZE 64 21 #define NEW_DEV_TIMEOUT 5000 22 23 struct dlm_lock_resource { 24 dlm_lockspace_t *ls; 25 struct dlm_lksb lksb; 26 char *name; /* lock name. */ 27 uint32_t flags; /* flags to pass to dlm_lock() */ 28 struct completion completion; /* completion for synchronized locking */ 29 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 30 struct mddev *mddev; /* pointing back to mddev. */ 31 }; 32 33 struct suspend_info { 34 int slot; 35 sector_t lo; 36 sector_t hi; 37 struct list_head list; 38 }; 39 40 struct resync_info { 41 __le64 lo; 42 __le64 hi; 43 }; 44 45 /* md_cluster_info flags */ 46 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 47 48 49 struct md_cluster_info { 50 /* dlm lock space and resources for clustered raid. */ 51 dlm_lockspace_t *lockspace; 52 int slot_number; 53 struct completion completion; 54 struct dlm_lock_resource *sb_lock; 55 struct mutex sb_mutex; 56 struct dlm_lock_resource *bitmap_lockres; 57 struct list_head suspend_list; 58 spinlock_t suspend_lock; 59 struct md_thread *recovery_thread; 60 unsigned long recovery_map; 61 /* communication loc resources */ 62 struct dlm_lock_resource *ack_lockres; 63 struct dlm_lock_resource *message_lockres; 64 struct dlm_lock_resource *token_lockres; 65 struct dlm_lock_resource *no_new_dev_lockres; 66 struct md_thread *recv_thread; 67 struct completion newdisk_completion; 68 unsigned long state; 69 }; 70 71 enum msg_type { 72 METADATA_UPDATED = 0, 73 RESYNCING, 74 NEWDISK, 75 REMOVE, 76 }; 77 78 struct cluster_msg { 79 int type; 80 int slot; 81 /* TODO: Unionize this for smaller footprint */ 82 sector_t low; 83 sector_t high; 84 char uuid[16]; 85 int raid_slot; 86 }; 87 88 static void sync_ast(void *arg) 89 { 90 struct dlm_lock_resource *res; 91 92 res = (struct dlm_lock_resource *) arg; 93 complete(&res->completion); 94 } 95 96 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 97 { 98 int ret = 0; 99 100 init_completion(&res->completion); 101 ret = dlm_lock(res->ls, mode, &res->lksb, 102 res->flags, res->name, strlen(res->name), 103 0, sync_ast, res, res->bast); 104 if (ret) 105 return ret; 106 wait_for_completion(&res->completion); 107 return res->lksb.sb_status; 108 } 109 110 static int dlm_unlock_sync(struct dlm_lock_resource *res) 111 { 112 return dlm_lock_sync(res, DLM_LOCK_NL); 113 } 114 115 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 116 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 117 { 118 struct dlm_lock_resource *res = NULL; 119 int ret, namelen; 120 struct md_cluster_info *cinfo = mddev->cluster_info; 121 122 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 123 if (!res) 124 return NULL; 125 res->ls = cinfo->lockspace; 126 res->mddev = mddev; 127 namelen = strlen(name); 128 res->name = kzalloc(namelen + 1, GFP_KERNEL); 129 if (!res->name) { 130 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 131 goto out_err; 132 } 133 strlcpy(res->name, name, namelen + 1); 134 if (with_lvb) { 135 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 136 if (!res->lksb.sb_lvbptr) { 137 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 138 goto out_err; 139 } 140 res->flags = DLM_LKF_VALBLK; 141 } 142 143 if (bastfn) 144 res->bast = bastfn; 145 146 res->flags |= DLM_LKF_EXPEDITE; 147 148 ret = dlm_lock_sync(res, DLM_LOCK_NL); 149 if (ret) { 150 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 151 goto out_err; 152 } 153 res->flags &= ~DLM_LKF_EXPEDITE; 154 res->flags |= DLM_LKF_CONVERT; 155 156 return res; 157 out_err: 158 kfree(res->lksb.sb_lvbptr); 159 kfree(res->name); 160 kfree(res); 161 return NULL; 162 } 163 164 static void lockres_free(struct dlm_lock_resource *res) 165 { 166 if (!res) 167 return; 168 169 init_completion(&res->completion); 170 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res); 171 wait_for_completion(&res->completion); 172 173 kfree(res->name); 174 kfree(res->lksb.sb_lvbptr); 175 kfree(res); 176 } 177 178 static char *pretty_uuid(char *dest, char *src) 179 { 180 int i, len = 0; 181 182 for (i = 0; i < 16; i++) { 183 if (i == 4 || i == 6 || i == 8 || i == 10) 184 len += sprintf(dest + len, "-"); 185 len += sprintf(dest + len, "%02x", (__u8)src[i]); 186 } 187 return dest; 188 } 189 190 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres, 191 sector_t lo, sector_t hi) 192 { 193 struct resync_info *ri; 194 195 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 196 ri->lo = cpu_to_le64(lo); 197 ri->hi = cpu_to_le64(hi); 198 } 199 200 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 201 { 202 struct resync_info ri; 203 struct suspend_info *s = NULL; 204 sector_t hi = 0; 205 206 dlm_lock_sync(lockres, DLM_LOCK_CR); 207 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 208 hi = le64_to_cpu(ri.hi); 209 if (ri.hi > 0) { 210 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 211 if (!s) 212 goto out; 213 s->hi = hi; 214 s->lo = le64_to_cpu(ri.lo); 215 } 216 dlm_unlock_sync(lockres); 217 out: 218 return s; 219 } 220 221 static void recover_bitmaps(struct md_thread *thread) 222 { 223 struct mddev *mddev = thread->mddev; 224 struct md_cluster_info *cinfo = mddev->cluster_info; 225 struct dlm_lock_resource *bm_lockres; 226 char str[64]; 227 int slot, ret; 228 struct suspend_info *s, *tmp; 229 sector_t lo, hi; 230 231 while (cinfo->recovery_map) { 232 slot = fls64((u64)cinfo->recovery_map) - 1; 233 234 /* Clear suspend_area associated with the bitmap */ 235 spin_lock_irq(&cinfo->suspend_lock); 236 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 237 if (slot == s->slot) { 238 list_del(&s->list); 239 kfree(s); 240 } 241 spin_unlock_irq(&cinfo->suspend_lock); 242 243 snprintf(str, 64, "bitmap%04d", slot); 244 bm_lockres = lockres_init(mddev, str, NULL, 1); 245 if (!bm_lockres) { 246 pr_err("md-cluster: Cannot initialize bitmaps\n"); 247 goto clear_bit; 248 } 249 250 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 251 if (ret) { 252 pr_err("md-cluster: Could not DLM lock %s: %d\n", 253 str, ret); 254 goto clear_bit; 255 } 256 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi); 257 if (ret) { 258 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 259 goto dlm_unlock; 260 } 261 if (hi > 0) { 262 /* TODO:Wait for current resync to get over */ 263 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 264 if (lo < mddev->recovery_cp) 265 mddev->recovery_cp = lo; 266 md_check_recovery(mddev); 267 } 268 dlm_unlock: 269 dlm_unlock_sync(bm_lockres); 270 clear_bit: 271 clear_bit(slot, &cinfo->recovery_map); 272 } 273 } 274 275 static void recover_prep(void *arg) 276 { 277 } 278 279 static void recover_slot(void *arg, struct dlm_slot *slot) 280 { 281 struct mddev *mddev = arg; 282 struct md_cluster_info *cinfo = mddev->cluster_info; 283 284 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 285 mddev->bitmap_info.cluster_name, 286 slot->nodeid, slot->slot, 287 cinfo->slot_number); 288 set_bit(slot->slot - 1, &cinfo->recovery_map); 289 if (!cinfo->recovery_thread) { 290 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 291 mddev, "recover"); 292 if (!cinfo->recovery_thread) { 293 pr_warn("md-cluster: Could not create recovery thread\n"); 294 return; 295 } 296 } 297 md_wakeup_thread(cinfo->recovery_thread); 298 } 299 300 static void recover_done(void *arg, struct dlm_slot *slots, 301 int num_slots, int our_slot, 302 uint32_t generation) 303 { 304 struct mddev *mddev = arg; 305 struct md_cluster_info *cinfo = mddev->cluster_info; 306 307 cinfo->slot_number = our_slot; 308 complete(&cinfo->completion); 309 } 310 311 static const struct dlm_lockspace_ops md_ls_ops = { 312 .recover_prep = recover_prep, 313 .recover_slot = recover_slot, 314 .recover_done = recover_done, 315 }; 316 317 /* 318 * The BAST function for the ack lock resource 319 * This function wakes up the receive thread in 320 * order to receive and process the message. 321 */ 322 static void ack_bast(void *arg, int mode) 323 { 324 struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg; 325 struct md_cluster_info *cinfo = res->mddev->cluster_info; 326 327 if (mode == DLM_LOCK_EX) 328 md_wakeup_thread(cinfo->recv_thread); 329 } 330 331 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 332 { 333 struct suspend_info *s, *tmp; 334 335 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 336 if (slot == s->slot) { 337 pr_info("%s:%d Deleting suspend_info: %d\n", 338 __func__, __LINE__, slot); 339 list_del(&s->list); 340 kfree(s); 341 break; 342 } 343 } 344 345 static void remove_suspend_info(struct md_cluster_info *cinfo, int slot) 346 { 347 spin_lock_irq(&cinfo->suspend_lock); 348 __remove_suspend_info(cinfo, slot); 349 spin_unlock_irq(&cinfo->suspend_lock); 350 } 351 352 353 static void process_suspend_info(struct md_cluster_info *cinfo, 354 int slot, sector_t lo, sector_t hi) 355 { 356 struct suspend_info *s; 357 358 if (!hi) { 359 remove_suspend_info(cinfo, slot); 360 return; 361 } 362 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 363 if (!s) 364 return; 365 s->slot = slot; 366 s->lo = lo; 367 s->hi = hi; 368 spin_lock_irq(&cinfo->suspend_lock); 369 /* Remove existing entry (if exists) before adding */ 370 __remove_suspend_info(cinfo, slot); 371 list_add(&s->list, &cinfo->suspend_list); 372 spin_unlock_irq(&cinfo->suspend_lock); 373 } 374 375 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 376 { 377 char disk_uuid[64]; 378 struct md_cluster_info *cinfo = mddev->cluster_info; 379 char event_name[] = "EVENT=ADD_DEVICE"; 380 char raid_slot[16]; 381 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 382 int len; 383 384 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 385 pretty_uuid(disk_uuid + len, cmsg->uuid); 386 snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot); 387 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 388 init_completion(&cinfo->newdisk_completion); 389 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 390 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 391 wait_for_completion_timeout(&cinfo->newdisk_completion, 392 NEW_DEV_TIMEOUT); 393 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 394 } 395 396 397 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 398 { 399 struct md_cluster_info *cinfo = mddev->cluster_info; 400 401 md_reload_sb(mddev); 402 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 403 } 404 405 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 406 { 407 struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot); 408 409 if (rdev) 410 md_kick_rdev_from_array(rdev); 411 else 412 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__, __LINE__, msg->raid_slot); 413 } 414 415 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 416 { 417 switch (msg->type) { 418 case METADATA_UPDATED: 419 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n", 420 __func__, __LINE__, msg->slot); 421 process_metadata_update(mddev, msg); 422 break; 423 case RESYNCING: 424 pr_info("%s: %d Received message: RESYNCING from %d\n", 425 __func__, __LINE__, msg->slot); 426 process_suspend_info(mddev->cluster_info, msg->slot, 427 msg->low, msg->high); 428 break; 429 case NEWDISK: 430 pr_info("%s: %d Received message: NEWDISK from %d\n", 431 __func__, __LINE__, msg->slot); 432 process_add_new_disk(mddev, msg); 433 break; 434 case REMOVE: 435 pr_info("%s: %d Received REMOVE from %d\n", 436 __func__, __LINE__, msg->slot); 437 process_remove_disk(mddev, msg); 438 break; 439 default: 440 pr_warn("%s:%d Received unknown message from %d\n", 441 __func__, __LINE__, msg->slot); 442 } 443 } 444 445 /* 446 * thread for receiving message 447 */ 448 static void recv_daemon(struct md_thread *thread) 449 { 450 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 451 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 452 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 453 struct cluster_msg msg; 454 455 /*get CR on Message*/ 456 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 457 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 458 return; 459 } 460 461 /* read lvb and wake up thread to process this message_lockres */ 462 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 463 process_recvd_msg(thread->mddev, &msg); 464 465 /*release CR on ack_lockres*/ 466 dlm_unlock_sync(ack_lockres); 467 /*up-convert to EX on message_lockres*/ 468 dlm_lock_sync(message_lockres, DLM_LOCK_EX); 469 /*get CR on ack_lockres again*/ 470 dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 471 /*release CR on message_lockres*/ 472 dlm_unlock_sync(message_lockres); 473 } 474 475 /* lock_comm() 476 * Takes the lock on the TOKEN lock resource so no other 477 * node can communicate while the operation is underway. 478 */ 479 static int lock_comm(struct md_cluster_info *cinfo) 480 { 481 int error; 482 483 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 484 if (error) 485 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 486 __func__, __LINE__, error); 487 return error; 488 } 489 490 static void unlock_comm(struct md_cluster_info *cinfo) 491 { 492 dlm_unlock_sync(cinfo->token_lockres); 493 } 494 495 /* __sendmsg() 496 * This function performs the actual sending of the message. This function is 497 * usually called after performing the encompassing operation 498 * The function: 499 * 1. Grabs the message lockresource in EX mode 500 * 2. Copies the message to the message LVB 501 * 3. Downconverts message lockresource to CR 502 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 503 * and the other nodes read the message. The thread will wait here until all other 504 * nodes have released ack lock resource. 505 * 5. Downconvert ack lockresource to CR 506 */ 507 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 508 { 509 int error; 510 int slot = cinfo->slot_number - 1; 511 512 cmsg->slot = cpu_to_le32(slot); 513 /*get EX on Message*/ 514 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 515 if (error) { 516 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 517 goto failed_message; 518 } 519 520 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 521 sizeof(struct cluster_msg)); 522 /*down-convert EX to CR on Message*/ 523 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR); 524 if (error) { 525 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n", 526 error); 527 goto failed_message; 528 } 529 530 /*up-convert CR to EX on Ack*/ 531 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 532 if (error) { 533 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 534 error); 535 goto failed_ack; 536 } 537 538 /*down-convert EX to CR on Ack*/ 539 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 540 if (error) { 541 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 542 error); 543 goto failed_ack; 544 } 545 546 failed_ack: 547 dlm_unlock_sync(cinfo->message_lockres); 548 failed_message: 549 return error; 550 } 551 552 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 553 { 554 int ret; 555 556 lock_comm(cinfo); 557 ret = __sendmsg(cinfo, cmsg); 558 unlock_comm(cinfo); 559 return ret; 560 } 561 562 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 563 { 564 struct md_cluster_info *cinfo = mddev->cluster_info; 565 int i, ret = 0; 566 struct dlm_lock_resource *bm_lockres; 567 struct suspend_info *s; 568 char str[64]; 569 570 571 for (i = 0; i < total_slots; i++) { 572 memset(str, '\0', 64); 573 snprintf(str, 64, "bitmap%04d", i); 574 bm_lockres = lockres_init(mddev, str, NULL, 1); 575 if (!bm_lockres) 576 return -ENOMEM; 577 if (i == (cinfo->slot_number - 1)) 578 continue; 579 580 bm_lockres->flags |= DLM_LKF_NOQUEUE; 581 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 582 if (ret == -EAGAIN) { 583 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 584 s = read_resync_info(mddev, bm_lockres); 585 if (s) { 586 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 587 __func__, __LINE__, 588 (unsigned long long) s->lo, 589 (unsigned long long) s->hi, i); 590 spin_lock_irq(&cinfo->suspend_lock); 591 s->slot = i; 592 list_add(&s->list, &cinfo->suspend_list); 593 spin_unlock_irq(&cinfo->suspend_lock); 594 } 595 ret = 0; 596 lockres_free(bm_lockres); 597 continue; 598 } 599 if (ret) 600 goto out; 601 /* TODO: Read the disk bitmap sb and check if it needs recovery */ 602 dlm_unlock_sync(bm_lockres); 603 lockres_free(bm_lockres); 604 } 605 out: 606 return ret; 607 } 608 609 static int join(struct mddev *mddev, int nodes) 610 { 611 struct md_cluster_info *cinfo; 612 int ret, ops_rv; 613 char str[64]; 614 615 if (!try_module_get(THIS_MODULE)) 616 return -ENOENT; 617 618 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 619 if (!cinfo) 620 return -ENOMEM; 621 622 init_completion(&cinfo->completion); 623 624 mutex_init(&cinfo->sb_mutex); 625 mddev->cluster_info = cinfo; 626 627 memset(str, 0, 64); 628 pretty_uuid(str, mddev->uuid); 629 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 630 DLM_LSFL_FS, LVB_SIZE, 631 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 632 if (ret) 633 goto err; 634 wait_for_completion(&cinfo->completion); 635 if (nodes < cinfo->slot_number) { 636 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 637 cinfo->slot_number, nodes); 638 ret = -ERANGE; 639 goto err; 640 } 641 cinfo->sb_lock = lockres_init(mddev, "cmd-super", 642 NULL, 0); 643 if (!cinfo->sb_lock) { 644 ret = -ENOMEM; 645 goto err; 646 } 647 /* Initiate the communication resources */ 648 ret = -ENOMEM; 649 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 650 if (!cinfo->recv_thread) { 651 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 652 goto err; 653 } 654 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 655 if (!cinfo->message_lockres) 656 goto err; 657 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 658 if (!cinfo->token_lockres) 659 goto err; 660 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 661 if (!cinfo->ack_lockres) 662 goto err; 663 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 664 if (!cinfo->no_new_dev_lockres) 665 goto err; 666 667 /* get sync CR lock on ACK. */ 668 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 669 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 670 ret); 671 /* get sync CR lock on no-new-dev. */ 672 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 673 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 674 675 676 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 677 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 678 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 679 if (!cinfo->bitmap_lockres) 680 goto err; 681 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 682 pr_err("Failed to get bitmap lock\n"); 683 ret = -EINVAL; 684 goto err; 685 } 686 687 INIT_LIST_HEAD(&cinfo->suspend_list); 688 spin_lock_init(&cinfo->suspend_lock); 689 690 ret = gather_all_resync_info(mddev, nodes); 691 if (ret) 692 goto err; 693 694 return 0; 695 err: 696 lockres_free(cinfo->message_lockres); 697 lockres_free(cinfo->token_lockres); 698 lockres_free(cinfo->ack_lockres); 699 lockres_free(cinfo->no_new_dev_lockres); 700 lockres_free(cinfo->bitmap_lockres); 701 lockres_free(cinfo->sb_lock); 702 if (cinfo->lockspace) 703 dlm_release_lockspace(cinfo->lockspace, 2); 704 mddev->cluster_info = NULL; 705 kfree(cinfo); 706 module_put(THIS_MODULE); 707 return ret; 708 } 709 710 static int leave(struct mddev *mddev) 711 { 712 struct md_cluster_info *cinfo = mddev->cluster_info; 713 714 if (!cinfo) 715 return 0; 716 md_unregister_thread(&cinfo->recovery_thread); 717 md_unregister_thread(&cinfo->recv_thread); 718 lockres_free(cinfo->message_lockres); 719 lockres_free(cinfo->token_lockres); 720 lockres_free(cinfo->ack_lockres); 721 lockres_free(cinfo->no_new_dev_lockres); 722 lockres_free(cinfo->sb_lock); 723 lockres_free(cinfo->bitmap_lockres); 724 dlm_release_lockspace(cinfo->lockspace, 2); 725 return 0; 726 } 727 728 /* slot_number(): Returns the MD slot number to use 729 * DLM starts the slot numbers from 1, wheras cluster-md 730 * wants the number to be from zero, so we deduct one 731 */ 732 static int slot_number(struct mddev *mddev) 733 { 734 struct md_cluster_info *cinfo = mddev->cluster_info; 735 736 return cinfo->slot_number - 1; 737 } 738 739 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 740 { 741 struct md_cluster_info *cinfo = mddev->cluster_info; 742 743 add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi); 744 /* Re-acquire the lock to refresh LVB */ 745 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 746 } 747 748 static int metadata_update_start(struct mddev *mddev) 749 { 750 return lock_comm(mddev->cluster_info); 751 } 752 753 static int metadata_update_finish(struct mddev *mddev) 754 { 755 struct md_cluster_info *cinfo = mddev->cluster_info; 756 struct cluster_msg cmsg; 757 int ret; 758 759 memset(&cmsg, 0, sizeof(cmsg)); 760 cmsg.type = cpu_to_le32(METADATA_UPDATED); 761 ret = __sendmsg(cinfo, &cmsg); 762 unlock_comm(cinfo); 763 return ret; 764 } 765 766 static int metadata_update_cancel(struct mddev *mddev) 767 { 768 struct md_cluster_info *cinfo = mddev->cluster_info; 769 770 return dlm_unlock_sync(cinfo->token_lockres); 771 } 772 773 static int resync_send(struct mddev *mddev, enum msg_type type, 774 sector_t lo, sector_t hi) 775 { 776 struct md_cluster_info *cinfo = mddev->cluster_info; 777 struct cluster_msg cmsg; 778 int slot = cinfo->slot_number - 1; 779 780 pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__, 781 (unsigned long long)lo, 782 (unsigned long long)hi); 783 resync_info_update(mddev, lo, hi); 784 cmsg.type = cpu_to_le32(type); 785 cmsg.slot = cpu_to_le32(slot); 786 cmsg.low = cpu_to_le64(lo); 787 cmsg.high = cpu_to_le64(hi); 788 return sendmsg(cinfo, &cmsg); 789 } 790 791 static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi) 792 { 793 pr_info("%s:%d\n", __func__, __LINE__); 794 return resync_send(mddev, RESYNCING, lo, hi); 795 } 796 797 static void resync_finish(struct mddev *mddev) 798 { 799 pr_info("%s:%d\n", __func__, __LINE__); 800 resync_send(mddev, RESYNCING, 0, 0); 801 } 802 803 static int area_resyncing(struct mddev *mddev, sector_t lo, sector_t hi) 804 { 805 struct md_cluster_info *cinfo = mddev->cluster_info; 806 int ret = 0; 807 struct suspend_info *s; 808 809 spin_lock_irq(&cinfo->suspend_lock); 810 if (list_empty(&cinfo->suspend_list)) 811 goto out; 812 list_for_each_entry(s, &cinfo->suspend_list, list) 813 if (hi > s->lo && lo < s->hi) { 814 ret = 1; 815 break; 816 } 817 out: 818 spin_unlock_irq(&cinfo->suspend_lock); 819 return ret; 820 } 821 822 static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev) 823 { 824 struct md_cluster_info *cinfo = mddev->cluster_info; 825 struct cluster_msg cmsg; 826 int ret = 0; 827 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 828 char *uuid = sb->device_uuid; 829 830 memset(&cmsg, 0, sizeof(cmsg)); 831 cmsg.type = cpu_to_le32(NEWDISK); 832 memcpy(cmsg.uuid, uuid, 16); 833 cmsg.raid_slot = rdev->desc_nr; 834 lock_comm(cinfo); 835 ret = __sendmsg(cinfo, &cmsg); 836 if (ret) 837 return ret; 838 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 839 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 840 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 841 /* Some node does not "see" the device */ 842 if (ret == -EAGAIN) 843 ret = -ENOENT; 844 else 845 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 846 return ret; 847 } 848 849 static int add_new_disk_finish(struct mddev *mddev) 850 { 851 struct cluster_msg cmsg; 852 struct md_cluster_info *cinfo = mddev->cluster_info; 853 int ret; 854 /* Write sb and inform others */ 855 md_update_sb(mddev, 1); 856 cmsg.type = METADATA_UPDATED; 857 ret = __sendmsg(cinfo, &cmsg); 858 unlock_comm(cinfo); 859 return ret; 860 } 861 862 static int new_disk_ack(struct mddev *mddev, bool ack) 863 { 864 struct md_cluster_info *cinfo = mddev->cluster_info; 865 866 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 867 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 868 return -EINVAL; 869 } 870 871 if (ack) 872 dlm_unlock_sync(cinfo->no_new_dev_lockres); 873 complete(&cinfo->newdisk_completion); 874 return 0; 875 } 876 877 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 878 { 879 struct cluster_msg cmsg; 880 struct md_cluster_info *cinfo = mddev->cluster_info; 881 cmsg.type = REMOVE; 882 cmsg.raid_slot = rdev->desc_nr; 883 return __sendmsg(cinfo, &cmsg); 884 } 885 886 static struct md_cluster_operations cluster_ops = { 887 .join = join, 888 .leave = leave, 889 .slot_number = slot_number, 890 .resync_info_update = resync_info_update, 891 .resync_start = resync_start, 892 .resync_finish = resync_finish, 893 .metadata_update_start = metadata_update_start, 894 .metadata_update_finish = metadata_update_finish, 895 .metadata_update_cancel = metadata_update_cancel, 896 .area_resyncing = area_resyncing, 897 .add_new_disk_start = add_new_disk_start, 898 .add_new_disk_finish = add_new_disk_finish, 899 .new_disk_ack = new_disk_ack, 900 .remove_disk = remove_disk, 901 }; 902 903 static int __init cluster_init(void) 904 { 905 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 906 pr_info("Registering Cluster MD functions\n"); 907 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 908 return 0; 909 } 910 911 static void cluster_exit(void) 912 { 913 unregister_md_cluster_operations(); 914 } 915 916 module_init(cluster_init); 917 module_exit(cluster_exit); 918 MODULE_LICENSE("GPL"); 919 MODULE_DESCRIPTION("Clustering support for MD"); 920