1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/kthread.h> 14 #include <linux/dlm.h> 15 #include <linux/sched.h> 16 #include <linux/raid/md_p.h> 17 #include "md.h" 18 #include "md-bitmap.h" 19 #include "md-cluster.h" 20 21 #define LVB_SIZE 64 22 #define NEW_DEV_TIMEOUT 5000 23 24 struct dlm_lock_resource { 25 dlm_lockspace_t *ls; 26 struct dlm_lksb lksb; 27 char *name; /* lock name. */ 28 uint32_t flags; /* flags to pass to dlm_lock() */ 29 wait_queue_head_t sync_locking; /* wait queue for synchronized locking */ 30 bool sync_locking_done; 31 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 32 struct mddev *mddev; /* pointing back to mddev. */ 33 int mode; 34 }; 35 36 struct suspend_info { 37 int slot; 38 sector_t lo; 39 sector_t hi; 40 struct list_head list; 41 }; 42 43 struct resync_info { 44 __le64 lo; 45 __le64 hi; 46 }; 47 48 /* md_cluster_info flags */ 49 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 50 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 51 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3 52 53 /* Lock the send communication. This is done through 54 * bit manipulation as opposed to a mutex in order to 55 * accomodate lock and hold. See next comment. 56 */ 57 #define MD_CLUSTER_SEND_LOCK 4 58 /* If cluster operations (such as adding a disk) must lock the 59 * communication channel, so as to perform extra operations 60 * (update metadata) and no other operation is allowed on the 61 * MD. Token needs to be locked and held until the operation 62 * completes witha md_update_sb(), which would eventually release 63 * the lock. 64 */ 65 #define MD_CLUSTER_SEND_LOCKED_ALREADY 5 66 /* We should receive message after node joined cluster and 67 * set up all the related infos such as bitmap and personality */ 68 #define MD_CLUSTER_ALREADY_IN_CLUSTER 6 69 #define MD_CLUSTER_PENDING_RECV_EVENT 7 70 #define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8 71 72 struct md_cluster_info { 73 struct mddev *mddev; /* the md device which md_cluster_info belongs to */ 74 /* dlm lock space and resources for clustered raid. */ 75 dlm_lockspace_t *lockspace; 76 int slot_number; 77 struct completion completion; 78 struct mutex recv_mutex; 79 struct dlm_lock_resource *bitmap_lockres; 80 struct dlm_lock_resource **other_bitmap_lockres; 81 struct dlm_lock_resource *resync_lockres; 82 struct list_head suspend_list; 83 spinlock_t suspend_lock; 84 struct md_thread *recovery_thread; 85 unsigned long recovery_map; 86 /* communication loc resources */ 87 struct dlm_lock_resource *ack_lockres; 88 struct dlm_lock_resource *message_lockres; 89 struct dlm_lock_resource *token_lockres; 90 struct dlm_lock_resource *no_new_dev_lockres; 91 struct md_thread *recv_thread; 92 struct completion newdisk_completion; 93 wait_queue_head_t wait; 94 unsigned long state; 95 /* record the region in RESYNCING message */ 96 sector_t sync_low; 97 sector_t sync_hi; 98 }; 99 100 enum msg_type { 101 METADATA_UPDATED = 0, 102 RESYNCING, 103 NEWDISK, 104 REMOVE, 105 RE_ADD, 106 BITMAP_NEEDS_SYNC, 107 CHANGE_CAPACITY, 108 BITMAP_RESIZE, 109 }; 110 111 struct cluster_msg { 112 __le32 type; 113 __le32 slot; 114 /* TODO: Unionize this for smaller footprint */ 115 __le64 low; 116 __le64 high; 117 char uuid[16]; 118 __le32 raid_slot; 119 }; 120 121 static void sync_ast(void *arg) 122 { 123 struct dlm_lock_resource *res; 124 125 res = arg; 126 res->sync_locking_done = true; 127 wake_up(&res->sync_locking); 128 } 129 130 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 131 { 132 int ret = 0; 133 134 ret = dlm_lock(res->ls, mode, &res->lksb, 135 res->flags, res->name, strlen(res->name), 136 0, sync_ast, res, res->bast); 137 if (ret) 138 return ret; 139 wait_event(res->sync_locking, res->sync_locking_done); 140 res->sync_locking_done = false; 141 if (res->lksb.sb_status == 0) 142 res->mode = mode; 143 return res->lksb.sb_status; 144 } 145 146 static int dlm_unlock_sync(struct dlm_lock_resource *res) 147 { 148 return dlm_lock_sync(res, DLM_LOCK_NL); 149 } 150 151 /* 152 * An variation of dlm_lock_sync, which make lock request could 153 * be interrupted 154 */ 155 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode, 156 struct mddev *mddev) 157 { 158 int ret = 0; 159 160 ret = dlm_lock(res->ls, mode, &res->lksb, 161 res->flags, res->name, strlen(res->name), 162 0, sync_ast, res, res->bast); 163 if (ret) 164 return ret; 165 166 wait_event(res->sync_locking, res->sync_locking_done 167 || kthread_should_stop() 168 || test_bit(MD_CLOSING, &mddev->flags)); 169 if (!res->sync_locking_done) { 170 /* 171 * the convert queue contains the lock request when request is 172 * interrupted, and sync_ast could still be run, so need to 173 * cancel the request and reset completion 174 */ 175 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL, 176 &res->lksb, res); 177 res->sync_locking_done = false; 178 if (unlikely(ret != 0)) 179 pr_info("failed to cancel previous lock request " 180 "%s return %d\n", res->name, ret); 181 return -EPERM; 182 } else 183 res->sync_locking_done = false; 184 if (res->lksb.sb_status == 0) 185 res->mode = mode; 186 return res->lksb.sb_status; 187 } 188 189 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 190 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 191 { 192 struct dlm_lock_resource *res = NULL; 193 int ret, namelen; 194 struct md_cluster_info *cinfo = mddev->cluster_info; 195 196 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 197 if (!res) 198 return NULL; 199 init_waitqueue_head(&res->sync_locking); 200 res->sync_locking_done = false; 201 res->ls = cinfo->lockspace; 202 res->mddev = mddev; 203 res->mode = DLM_LOCK_IV; 204 namelen = strlen(name); 205 res->name = kzalloc(namelen + 1, GFP_KERNEL); 206 if (!res->name) { 207 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 208 goto out_err; 209 } 210 strlcpy(res->name, name, namelen + 1); 211 if (with_lvb) { 212 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 213 if (!res->lksb.sb_lvbptr) { 214 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 215 goto out_err; 216 } 217 res->flags = DLM_LKF_VALBLK; 218 } 219 220 if (bastfn) 221 res->bast = bastfn; 222 223 res->flags |= DLM_LKF_EXPEDITE; 224 225 ret = dlm_lock_sync(res, DLM_LOCK_NL); 226 if (ret) { 227 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 228 goto out_err; 229 } 230 res->flags &= ~DLM_LKF_EXPEDITE; 231 res->flags |= DLM_LKF_CONVERT; 232 233 return res; 234 out_err: 235 kfree(res->lksb.sb_lvbptr); 236 kfree(res->name); 237 kfree(res); 238 return NULL; 239 } 240 241 static void lockres_free(struct dlm_lock_resource *res) 242 { 243 int ret = 0; 244 245 if (!res) 246 return; 247 248 /* 249 * use FORCEUNLOCK flag, so we can unlock even the lock is on the 250 * waiting or convert queue 251 */ 252 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK, 253 &res->lksb, res); 254 if (unlikely(ret != 0)) 255 pr_err("failed to unlock %s return %d\n", res->name, ret); 256 else 257 wait_event(res->sync_locking, res->sync_locking_done); 258 259 kfree(res->name); 260 kfree(res->lksb.sb_lvbptr); 261 kfree(res); 262 } 263 264 static void add_resync_info(struct dlm_lock_resource *lockres, 265 sector_t lo, sector_t hi) 266 { 267 struct resync_info *ri; 268 269 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 270 ri->lo = cpu_to_le64(lo); 271 ri->hi = cpu_to_le64(hi); 272 } 273 274 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 275 { 276 struct resync_info ri; 277 struct suspend_info *s = NULL; 278 sector_t hi = 0; 279 280 dlm_lock_sync(lockres, DLM_LOCK_CR); 281 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 282 hi = le64_to_cpu(ri.hi); 283 if (hi > 0) { 284 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 285 if (!s) 286 goto out; 287 s->hi = hi; 288 s->lo = le64_to_cpu(ri.lo); 289 } 290 dlm_unlock_sync(lockres); 291 out: 292 return s; 293 } 294 295 static void recover_bitmaps(struct md_thread *thread) 296 { 297 struct mddev *mddev = thread->mddev; 298 struct md_cluster_info *cinfo = mddev->cluster_info; 299 struct dlm_lock_resource *bm_lockres; 300 char str[64]; 301 int slot, ret; 302 struct suspend_info *s, *tmp; 303 sector_t lo, hi; 304 305 while (cinfo->recovery_map) { 306 slot = fls64((u64)cinfo->recovery_map) - 1; 307 308 snprintf(str, 64, "bitmap%04d", slot); 309 bm_lockres = lockres_init(mddev, str, NULL, 1); 310 if (!bm_lockres) { 311 pr_err("md-cluster: Cannot initialize bitmaps\n"); 312 goto clear_bit; 313 } 314 315 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev); 316 if (ret) { 317 pr_err("md-cluster: Could not DLM lock %s: %d\n", 318 str, ret); 319 goto clear_bit; 320 } 321 ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 322 if (ret) { 323 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 324 goto clear_bit; 325 } 326 327 /* Clear suspend_area associated with the bitmap */ 328 spin_lock_irq(&cinfo->suspend_lock); 329 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 330 if (slot == s->slot) { 331 list_del(&s->list); 332 kfree(s); 333 } 334 spin_unlock_irq(&cinfo->suspend_lock); 335 336 if (hi > 0) { 337 if (lo < mddev->recovery_cp) 338 mddev->recovery_cp = lo; 339 /* wake up thread to continue resync in case resync 340 * is not finished */ 341 if (mddev->recovery_cp != MaxSector) { 342 /* 343 * clear the REMOTE flag since we will launch 344 * resync thread in current node. 345 */ 346 clear_bit(MD_RESYNCING_REMOTE, 347 &mddev->recovery); 348 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 349 md_wakeup_thread(mddev->thread); 350 } 351 } 352 clear_bit: 353 lockres_free(bm_lockres); 354 clear_bit(slot, &cinfo->recovery_map); 355 } 356 } 357 358 static void recover_prep(void *arg) 359 { 360 struct mddev *mddev = arg; 361 struct md_cluster_info *cinfo = mddev->cluster_info; 362 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 363 } 364 365 static void __recover_slot(struct mddev *mddev, int slot) 366 { 367 struct md_cluster_info *cinfo = mddev->cluster_info; 368 369 set_bit(slot, &cinfo->recovery_map); 370 if (!cinfo->recovery_thread) { 371 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 372 mddev, "recover"); 373 if (!cinfo->recovery_thread) { 374 pr_warn("md-cluster: Could not create recovery thread\n"); 375 return; 376 } 377 } 378 md_wakeup_thread(cinfo->recovery_thread); 379 } 380 381 static void recover_slot(void *arg, struct dlm_slot *slot) 382 { 383 struct mddev *mddev = arg; 384 struct md_cluster_info *cinfo = mddev->cluster_info; 385 386 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 387 mddev->bitmap_info.cluster_name, 388 slot->nodeid, slot->slot, 389 cinfo->slot_number); 390 /* deduct one since dlm slot starts from one while the num of 391 * cluster-md begins with 0 */ 392 __recover_slot(mddev, slot->slot - 1); 393 } 394 395 static void recover_done(void *arg, struct dlm_slot *slots, 396 int num_slots, int our_slot, 397 uint32_t generation) 398 { 399 struct mddev *mddev = arg; 400 struct md_cluster_info *cinfo = mddev->cluster_info; 401 402 cinfo->slot_number = our_slot; 403 /* completion is only need to be complete when node join cluster, 404 * it doesn't need to run during another node's failure */ 405 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 406 complete(&cinfo->completion); 407 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 408 } 409 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 410 } 411 412 /* the ops is called when node join the cluster, and do lock recovery 413 * if node failure occurs */ 414 static const struct dlm_lockspace_ops md_ls_ops = { 415 .recover_prep = recover_prep, 416 .recover_slot = recover_slot, 417 .recover_done = recover_done, 418 }; 419 420 /* 421 * The BAST function for the ack lock resource 422 * This function wakes up the receive thread in 423 * order to receive and process the message. 424 */ 425 static void ack_bast(void *arg, int mode) 426 { 427 struct dlm_lock_resource *res = arg; 428 struct md_cluster_info *cinfo = res->mddev->cluster_info; 429 430 if (mode == DLM_LOCK_EX) { 431 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state)) 432 md_wakeup_thread(cinfo->recv_thread); 433 else 434 set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state); 435 } 436 } 437 438 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 439 { 440 struct suspend_info *s, *tmp; 441 442 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 443 if (slot == s->slot) { 444 list_del(&s->list); 445 kfree(s); 446 break; 447 } 448 } 449 450 static void remove_suspend_info(struct mddev *mddev, int slot) 451 { 452 struct md_cluster_info *cinfo = mddev->cluster_info; 453 mddev->pers->quiesce(mddev, 1); 454 spin_lock_irq(&cinfo->suspend_lock); 455 __remove_suspend_info(cinfo, slot); 456 spin_unlock_irq(&cinfo->suspend_lock); 457 mddev->pers->quiesce(mddev, 0); 458 } 459 460 461 static void process_suspend_info(struct mddev *mddev, 462 int slot, sector_t lo, sector_t hi) 463 { 464 struct md_cluster_info *cinfo = mddev->cluster_info; 465 struct suspend_info *s; 466 467 if (!hi) { 468 /* 469 * clear the REMOTE flag since resync or recovery is finished 470 * in remote node. 471 */ 472 clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery); 473 remove_suspend_info(mddev, slot); 474 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 475 md_wakeup_thread(mddev->thread); 476 return; 477 } 478 479 /* 480 * The bitmaps are not same for different nodes 481 * if RESYNCING is happening in one node, then 482 * the node which received the RESYNCING message 483 * probably will perform resync with the region 484 * [lo, hi] again, so we could reduce resync time 485 * a lot if we can ensure that the bitmaps among 486 * different nodes are match up well. 487 * 488 * sync_low/hi is used to record the region which 489 * arrived in the previous RESYNCING message, 490 * 491 * Call bitmap_sync_with_cluster to clear 492 * NEEDED_MASK and set RESYNC_MASK since 493 * resync thread is running in another node, 494 * so we don't need to do the resync again 495 * with the same section */ 496 md_bitmap_sync_with_cluster(mddev, cinfo->sync_low, cinfo->sync_hi, lo, hi); 497 cinfo->sync_low = lo; 498 cinfo->sync_hi = hi; 499 500 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 501 if (!s) 502 return; 503 s->slot = slot; 504 s->lo = lo; 505 s->hi = hi; 506 mddev->pers->quiesce(mddev, 1); 507 spin_lock_irq(&cinfo->suspend_lock); 508 /* Remove existing entry (if exists) before adding */ 509 __remove_suspend_info(cinfo, slot); 510 list_add(&s->list, &cinfo->suspend_list); 511 spin_unlock_irq(&cinfo->suspend_lock); 512 mddev->pers->quiesce(mddev, 0); 513 } 514 515 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 516 { 517 char disk_uuid[64]; 518 struct md_cluster_info *cinfo = mddev->cluster_info; 519 char event_name[] = "EVENT=ADD_DEVICE"; 520 char raid_slot[16]; 521 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 522 int len; 523 524 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 525 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 526 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 527 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 528 init_completion(&cinfo->newdisk_completion); 529 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 530 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 531 wait_for_completion_timeout(&cinfo->newdisk_completion, 532 NEW_DEV_TIMEOUT); 533 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 534 } 535 536 537 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 538 { 539 int got_lock = 0; 540 struct md_cluster_info *cinfo = mddev->cluster_info; 541 mddev->good_device_nr = le32_to_cpu(msg->raid_slot); 542 543 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 544 wait_event(mddev->thread->wqueue, 545 (got_lock = mddev_trylock(mddev)) || 546 test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state)); 547 md_reload_sb(mddev, mddev->good_device_nr); 548 if (got_lock) 549 mddev_unlock(mddev); 550 } 551 552 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 553 { 554 struct md_rdev *rdev; 555 556 rcu_read_lock(); 557 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 558 if (rdev) { 559 set_bit(ClusterRemove, &rdev->flags); 560 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 561 md_wakeup_thread(mddev->thread); 562 } 563 else 564 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 565 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 566 rcu_read_unlock(); 567 } 568 569 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 570 { 571 struct md_rdev *rdev; 572 573 rcu_read_lock(); 574 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 575 if (rdev && test_bit(Faulty, &rdev->flags)) 576 clear_bit(Faulty, &rdev->flags); 577 else 578 pr_warn("%s: %d Could not find disk(%d) which is faulty", 579 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 580 rcu_read_unlock(); 581 } 582 583 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 584 { 585 int ret = 0; 586 587 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 588 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 589 return -1; 590 switch (le32_to_cpu(msg->type)) { 591 case METADATA_UPDATED: 592 process_metadata_update(mddev, msg); 593 break; 594 case CHANGE_CAPACITY: 595 set_capacity(mddev->gendisk, mddev->array_sectors); 596 revalidate_disk(mddev->gendisk); 597 break; 598 case RESYNCING: 599 set_bit(MD_RESYNCING_REMOTE, &mddev->recovery); 600 process_suspend_info(mddev, le32_to_cpu(msg->slot), 601 le64_to_cpu(msg->low), 602 le64_to_cpu(msg->high)); 603 break; 604 case NEWDISK: 605 process_add_new_disk(mddev, msg); 606 break; 607 case REMOVE: 608 process_remove_disk(mddev, msg); 609 break; 610 case RE_ADD: 611 process_readd_disk(mddev, msg); 612 break; 613 case BITMAP_NEEDS_SYNC: 614 __recover_slot(mddev, le32_to_cpu(msg->slot)); 615 break; 616 case BITMAP_RESIZE: 617 if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0)) 618 ret = md_bitmap_resize(mddev->bitmap, 619 le64_to_cpu(msg->high), 0, 0); 620 break; 621 default: 622 ret = -1; 623 pr_warn("%s:%d Received unknown message from %d\n", 624 __func__, __LINE__, msg->slot); 625 } 626 return ret; 627 } 628 629 /* 630 * thread for receiving message 631 */ 632 static void recv_daemon(struct md_thread *thread) 633 { 634 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 635 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 636 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 637 struct cluster_msg msg; 638 int ret; 639 640 mutex_lock(&cinfo->recv_mutex); 641 /*get CR on Message*/ 642 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 643 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 644 mutex_unlock(&cinfo->recv_mutex); 645 return; 646 } 647 648 /* read lvb and wake up thread to process this message_lockres */ 649 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 650 ret = process_recvd_msg(thread->mddev, &msg); 651 if (ret) 652 goto out; 653 654 /*release CR on ack_lockres*/ 655 ret = dlm_unlock_sync(ack_lockres); 656 if (unlikely(ret != 0)) 657 pr_info("unlock ack failed return %d\n", ret); 658 /*up-convert to PR on message_lockres*/ 659 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 660 if (unlikely(ret != 0)) 661 pr_info("lock PR on msg failed return %d\n", ret); 662 /*get CR on ack_lockres again*/ 663 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 664 if (unlikely(ret != 0)) 665 pr_info("lock CR on ack failed return %d\n", ret); 666 out: 667 /*release CR on message_lockres*/ 668 ret = dlm_unlock_sync(message_lockres); 669 if (unlikely(ret != 0)) 670 pr_info("unlock msg failed return %d\n", ret); 671 mutex_unlock(&cinfo->recv_mutex); 672 } 673 674 /* lock_token() 675 * Takes the lock on the TOKEN lock resource so no other 676 * node can communicate while the operation is underway. 677 */ 678 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked) 679 { 680 int error, set_bit = 0; 681 struct mddev *mddev = cinfo->mddev; 682 683 /* 684 * If resync thread run after raid1d thread, then process_metadata_update 685 * could not continue if raid1d held reconfig_mutex (and raid1d is blocked 686 * since another node already got EX on Token and waitting the EX of Ack), 687 * so let resync wake up thread in case flag is set. 688 */ 689 if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 690 &cinfo->state)) { 691 error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 692 &cinfo->state); 693 WARN_ON_ONCE(error); 694 md_wakeup_thread(mddev->thread); 695 set_bit = 1; 696 } 697 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 698 if (set_bit) 699 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 700 701 if (error) 702 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 703 __func__, __LINE__, error); 704 705 /* Lock the receive sequence */ 706 mutex_lock(&cinfo->recv_mutex); 707 return error; 708 } 709 710 /* lock_comm() 711 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel. 712 */ 713 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked) 714 { 715 wait_event(cinfo->wait, 716 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state)); 717 718 return lock_token(cinfo, mddev_locked); 719 } 720 721 static void unlock_comm(struct md_cluster_info *cinfo) 722 { 723 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 724 mutex_unlock(&cinfo->recv_mutex); 725 dlm_unlock_sync(cinfo->token_lockres); 726 clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state); 727 wake_up(&cinfo->wait); 728 } 729 730 /* __sendmsg() 731 * This function performs the actual sending of the message. This function is 732 * usually called after performing the encompassing operation 733 * The function: 734 * 1. Grabs the message lockresource in EX mode 735 * 2. Copies the message to the message LVB 736 * 3. Downconverts message lockresource to CW 737 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 738 * and the other nodes read the message. The thread will wait here until all other 739 * nodes have released ack lock resource. 740 * 5. Downconvert ack lockresource to CR 741 */ 742 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 743 { 744 int error; 745 int slot = cinfo->slot_number - 1; 746 747 cmsg->slot = cpu_to_le32(slot); 748 /*get EX on Message*/ 749 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 750 if (error) { 751 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 752 goto failed_message; 753 } 754 755 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 756 sizeof(struct cluster_msg)); 757 /*down-convert EX to CW on Message*/ 758 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 759 if (error) { 760 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 761 error); 762 goto failed_ack; 763 } 764 765 /*up-convert CR to EX on Ack*/ 766 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 767 if (error) { 768 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 769 error); 770 goto failed_ack; 771 } 772 773 /*down-convert EX to CR on Ack*/ 774 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 775 if (error) { 776 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 777 error); 778 goto failed_ack; 779 } 780 781 failed_ack: 782 error = dlm_unlock_sync(cinfo->message_lockres); 783 if (unlikely(error != 0)) { 784 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 785 error); 786 /* in case the message can't be released due to some reason */ 787 goto failed_ack; 788 } 789 failed_message: 790 return error; 791 } 792 793 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg, 794 bool mddev_locked) 795 { 796 int ret; 797 798 lock_comm(cinfo, mddev_locked); 799 ret = __sendmsg(cinfo, cmsg); 800 unlock_comm(cinfo); 801 return ret; 802 } 803 804 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 805 { 806 struct md_cluster_info *cinfo = mddev->cluster_info; 807 int i, ret = 0; 808 struct dlm_lock_resource *bm_lockres; 809 struct suspend_info *s; 810 char str[64]; 811 sector_t lo, hi; 812 813 814 for (i = 0; i < total_slots; i++) { 815 memset(str, '\0', 64); 816 snprintf(str, 64, "bitmap%04d", i); 817 bm_lockres = lockres_init(mddev, str, NULL, 1); 818 if (!bm_lockres) 819 return -ENOMEM; 820 if (i == (cinfo->slot_number - 1)) { 821 lockres_free(bm_lockres); 822 continue; 823 } 824 825 bm_lockres->flags |= DLM_LKF_NOQUEUE; 826 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 827 if (ret == -EAGAIN) { 828 s = read_resync_info(mddev, bm_lockres); 829 if (s) { 830 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 831 __func__, __LINE__, 832 (unsigned long long) s->lo, 833 (unsigned long long) s->hi, i); 834 spin_lock_irq(&cinfo->suspend_lock); 835 s->slot = i; 836 list_add(&s->list, &cinfo->suspend_list); 837 spin_unlock_irq(&cinfo->suspend_lock); 838 } 839 ret = 0; 840 lockres_free(bm_lockres); 841 continue; 842 } 843 if (ret) { 844 lockres_free(bm_lockres); 845 goto out; 846 } 847 848 /* Read the disk bitmap sb and check if it needs recovery */ 849 ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 850 if (ret) { 851 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 852 lockres_free(bm_lockres); 853 continue; 854 } 855 if ((hi > 0) && (lo < mddev->recovery_cp)) { 856 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 857 mddev->recovery_cp = lo; 858 md_check_recovery(mddev); 859 } 860 861 lockres_free(bm_lockres); 862 } 863 out: 864 return ret; 865 } 866 867 static int join(struct mddev *mddev, int nodes) 868 { 869 struct md_cluster_info *cinfo; 870 int ret, ops_rv; 871 char str[64]; 872 873 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 874 if (!cinfo) 875 return -ENOMEM; 876 877 INIT_LIST_HEAD(&cinfo->suspend_list); 878 spin_lock_init(&cinfo->suspend_lock); 879 init_completion(&cinfo->completion); 880 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 881 init_waitqueue_head(&cinfo->wait); 882 mutex_init(&cinfo->recv_mutex); 883 884 mddev->cluster_info = cinfo; 885 cinfo->mddev = mddev; 886 887 memset(str, 0, 64); 888 sprintf(str, "%pU", mddev->uuid); 889 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 890 DLM_LSFL_FS, LVB_SIZE, 891 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 892 if (ret) 893 goto err; 894 wait_for_completion(&cinfo->completion); 895 if (nodes < cinfo->slot_number) { 896 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 897 cinfo->slot_number, nodes); 898 ret = -ERANGE; 899 goto err; 900 } 901 /* Initiate the communication resources */ 902 ret = -ENOMEM; 903 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 904 if (!cinfo->recv_thread) { 905 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 906 goto err; 907 } 908 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 909 if (!cinfo->message_lockres) 910 goto err; 911 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 912 if (!cinfo->token_lockres) 913 goto err; 914 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 915 if (!cinfo->no_new_dev_lockres) 916 goto err; 917 918 ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 919 if (ret) { 920 ret = -EAGAIN; 921 pr_err("md-cluster: can't join cluster to avoid lock issue\n"); 922 goto err; 923 } 924 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 925 if (!cinfo->ack_lockres) { 926 ret = -ENOMEM; 927 goto err; 928 } 929 /* get sync CR lock on ACK. */ 930 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 931 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 932 ret); 933 dlm_unlock_sync(cinfo->token_lockres); 934 /* get sync CR lock on no-new-dev. */ 935 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 936 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 937 938 939 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 940 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 941 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 942 if (!cinfo->bitmap_lockres) { 943 ret = -ENOMEM; 944 goto err; 945 } 946 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 947 pr_err("Failed to get bitmap lock\n"); 948 ret = -EINVAL; 949 goto err; 950 } 951 952 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 953 if (!cinfo->resync_lockres) { 954 ret = -ENOMEM; 955 goto err; 956 } 957 958 return 0; 959 err: 960 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 961 md_unregister_thread(&cinfo->recovery_thread); 962 md_unregister_thread(&cinfo->recv_thread); 963 lockres_free(cinfo->message_lockres); 964 lockres_free(cinfo->token_lockres); 965 lockres_free(cinfo->ack_lockres); 966 lockres_free(cinfo->no_new_dev_lockres); 967 lockres_free(cinfo->resync_lockres); 968 lockres_free(cinfo->bitmap_lockres); 969 if (cinfo->lockspace) 970 dlm_release_lockspace(cinfo->lockspace, 2); 971 mddev->cluster_info = NULL; 972 kfree(cinfo); 973 return ret; 974 } 975 976 static void load_bitmaps(struct mddev *mddev, int total_slots) 977 { 978 struct md_cluster_info *cinfo = mddev->cluster_info; 979 980 /* load all the node's bitmap info for resync */ 981 if (gather_all_resync_info(mddev, total_slots)) 982 pr_err("md-cluster: failed to gather all resyn infos\n"); 983 set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state); 984 /* wake up recv thread in case something need to be handled */ 985 if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state)) 986 md_wakeup_thread(cinfo->recv_thread); 987 } 988 989 static void resync_bitmap(struct mddev *mddev) 990 { 991 struct md_cluster_info *cinfo = mddev->cluster_info; 992 struct cluster_msg cmsg = {0}; 993 int err; 994 995 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC); 996 err = sendmsg(cinfo, &cmsg, 1); 997 if (err) 998 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 999 __func__, __LINE__, err); 1000 } 1001 1002 static void unlock_all_bitmaps(struct mddev *mddev); 1003 static int leave(struct mddev *mddev) 1004 { 1005 struct md_cluster_info *cinfo = mddev->cluster_info; 1006 1007 if (!cinfo) 1008 return 0; 1009 1010 /* BITMAP_NEEDS_SYNC message should be sent when node 1011 * is leaving the cluster with dirty bitmap, also we 1012 * can only deliver it when dlm connection is available */ 1013 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 1014 resync_bitmap(mddev); 1015 1016 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1017 md_unregister_thread(&cinfo->recovery_thread); 1018 md_unregister_thread(&cinfo->recv_thread); 1019 lockres_free(cinfo->message_lockres); 1020 lockres_free(cinfo->token_lockres); 1021 lockres_free(cinfo->ack_lockres); 1022 lockres_free(cinfo->no_new_dev_lockres); 1023 lockres_free(cinfo->resync_lockres); 1024 lockres_free(cinfo->bitmap_lockres); 1025 unlock_all_bitmaps(mddev); 1026 dlm_release_lockspace(cinfo->lockspace, 2); 1027 kfree(cinfo); 1028 return 0; 1029 } 1030 1031 /* slot_number(): Returns the MD slot number to use 1032 * DLM starts the slot numbers from 1, wheras cluster-md 1033 * wants the number to be from zero, so we deduct one 1034 */ 1035 static int slot_number(struct mddev *mddev) 1036 { 1037 struct md_cluster_info *cinfo = mddev->cluster_info; 1038 1039 return cinfo->slot_number - 1; 1040 } 1041 1042 /* 1043 * Check if the communication is already locked, else lock the communication 1044 * channel. 1045 * If it is already locked, token is in EX mode, and hence lock_token() 1046 * should not be called. 1047 */ 1048 static int metadata_update_start(struct mddev *mddev) 1049 { 1050 struct md_cluster_info *cinfo = mddev->cluster_info; 1051 int ret; 1052 1053 /* 1054 * metadata_update_start is always called with the protection of 1055 * reconfig_mutex, so set WAITING_FOR_TOKEN here. 1056 */ 1057 ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 1058 &cinfo->state); 1059 WARN_ON_ONCE(ret); 1060 md_wakeup_thread(mddev->thread); 1061 1062 wait_event(cinfo->wait, 1063 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) || 1064 test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state)); 1065 1066 /* If token is already locked, return 0 */ 1067 if (cinfo->token_lockres->mode == DLM_LOCK_EX) { 1068 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1069 return 0; 1070 } 1071 1072 ret = lock_token(cinfo, 1); 1073 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1074 return ret; 1075 } 1076 1077 static int metadata_update_finish(struct mddev *mddev) 1078 { 1079 struct md_cluster_info *cinfo = mddev->cluster_info; 1080 struct cluster_msg cmsg; 1081 struct md_rdev *rdev; 1082 int ret = 0; 1083 int raid_slot = -1; 1084 1085 memset(&cmsg, 0, sizeof(cmsg)); 1086 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1087 /* Pick up a good active device number to send. 1088 */ 1089 rdev_for_each(rdev, mddev) 1090 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) { 1091 raid_slot = rdev->desc_nr; 1092 break; 1093 } 1094 if (raid_slot >= 0) { 1095 cmsg.raid_slot = cpu_to_le32(raid_slot); 1096 ret = __sendmsg(cinfo, &cmsg); 1097 } else 1098 pr_warn("md-cluster: No good device id found to send\n"); 1099 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1100 unlock_comm(cinfo); 1101 return ret; 1102 } 1103 1104 static void metadata_update_cancel(struct mddev *mddev) 1105 { 1106 struct md_cluster_info *cinfo = mddev->cluster_info; 1107 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1108 unlock_comm(cinfo); 1109 } 1110 1111 static int update_bitmap_size(struct mddev *mddev, sector_t size) 1112 { 1113 struct md_cluster_info *cinfo = mddev->cluster_info; 1114 struct cluster_msg cmsg = {0}; 1115 int ret; 1116 1117 cmsg.type = cpu_to_le32(BITMAP_RESIZE); 1118 cmsg.high = cpu_to_le64(size); 1119 ret = sendmsg(cinfo, &cmsg, 0); 1120 if (ret) 1121 pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n", 1122 __func__, __LINE__, ret); 1123 return ret; 1124 } 1125 1126 static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize) 1127 { 1128 struct bitmap_counts *counts; 1129 char str[64]; 1130 struct dlm_lock_resource *bm_lockres; 1131 struct bitmap *bitmap = mddev->bitmap; 1132 unsigned long my_pages = bitmap->counts.pages; 1133 int i, rv; 1134 1135 /* 1136 * We need to ensure all the nodes can grow to a larger 1137 * bitmap size before make the reshaping. 1138 */ 1139 rv = update_bitmap_size(mddev, newsize); 1140 if (rv) 1141 return rv; 1142 1143 for (i = 0; i < mddev->bitmap_info.nodes; i++) { 1144 if (i == md_cluster_ops->slot_number(mddev)) 1145 continue; 1146 1147 bitmap = get_bitmap_from_slot(mddev, i); 1148 if (IS_ERR(bitmap)) { 1149 pr_err("can't get bitmap from slot %d\n", i); 1150 goto out; 1151 } 1152 counts = &bitmap->counts; 1153 1154 /* 1155 * If we can hold the bitmap lock of one node then 1156 * the slot is not occupied, update the pages. 1157 */ 1158 snprintf(str, 64, "bitmap%04d", i); 1159 bm_lockres = lockres_init(mddev, str, NULL, 1); 1160 if (!bm_lockres) { 1161 pr_err("Cannot initialize %s lock\n", str); 1162 goto out; 1163 } 1164 bm_lockres->flags |= DLM_LKF_NOQUEUE; 1165 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 1166 if (!rv) 1167 counts->pages = my_pages; 1168 lockres_free(bm_lockres); 1169 1170 if (my_pages != counts->pages) 1171 /* 1172 * Let's revert the bitmap size if one node 1173 * can't resize bitmap 1174 */ 1175 goto out; 1176 } 1177 1178 return 0; 1179 out: 1180 md_bitmap_free(bitmap); 1181 update_bitmap_size(mddev, oldsize); 1182 return -1; 1183 } 1184 1185 /* 1186 * return 0 if all the bitmaps have the same sync_size 1187 */ 1188 static int cluster_check_sync_size(struct mddev *mddev) 1189 { 1190 int i, rv; 1191 bitmap_super_t *sb; 1192 unsigned long my_sync_size, sync_size = 0; 1193 int node_num = mddev->bitmap_info.nodes; 1194 int current_slot = md_cluster_ops->slot_number(mddev); 1195 struct bitmap *bitmap = mddev->bitmap; 1196 char str[64]; 1197 struct dlm_lock_resource *bm_lockres; 1198 1199 sb = kmap_atomic(bitmap->storage.sb_page); 1200 my_sync_size = sb->sync_size; 1201 kunmap_atomic(sb); 1202 1203 for (i = 0; i < node_num; i++) { 1204 if (i == current_slot) 1205 continue; 1206 1207 bitmap = get_bitmap_from_slot(mddev, i); 1208 if (IS_ERR(bitmap)) { 1209 pr_err("can't get bitmap from slot %d\n", i); 1210 return -1; 1211 } 1212 1213 /* 1214 * If we can hold the bitmap lock of one node then 1215 * the slot is not occupied, update the sb. 1216 */ 1217 snprintf(str, 64, "bitmap%04d", i); 1218 bm_lockres = lockres_init(mddev, str, NULL, 1); 1219 if (!bm_lockres) { 1220 pr_err("md-cluster: Cannot initialize %s\n", str); 1221 md_bitmap_free(bitmap); 1222 return -1; 1223 } 1224 bm_lockres->flags |= DLM_LKF_NOQUEUE; 1225 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 1226 if (!rv) 1227 md_bitmap_update_sb(bitmap); 1228 lockres_free(bm_lockres); 1229 1230 sb = kmap_atomic(bitmap->storage.sb_page); 1231 if (sync_size == 0) 1232 sync_size = sb->sync_size; 1233 else if (sync_size != sb->sync_size) { 1234 kunmap_atomic(sb); 1235 md_bitmap_free(bitmap); 1236 return -1; 1237 } 1238 kunmap_atomic(sb); 1239 md_bitmap_free(bitmap); 1240 } 1241 1242 return (my_sync_size == sync_size) ? 0 : -1; 1243 } 1244 1245 /* 1246 * Update the size for cluster raid is a little more complex, we perform it 1247 * by the steps: 1248 * 1. hold token lock and update superblock in initiator node. 1249 * 2. send METADATA_UPDATED msg to other nodes. 1250 * 3. The initiator node continues to check each bitmap's sync_size, if all 1251 * bitmaps have the same value of sync_size, then we can set capacity and 1252 * let other nodes to perform it. If one node can't update sync_size 1253 * accordingly, we need to revert to previous value. 1254 */ 1255 static void update_size(struct mddev *mddev, sector_t old_dev_sectors) 1256 { 1257 struct md_cluster_info *cinfo = mddev->cluster_info; 1258 struct cluster_msg cmsg; 1259 struct md_rdev *rdev; 1260 int ret = 0; 1261 int raid_slot = -1; 1262 1263 md_update_sb(mddev, 1); 1264 lock_comm(cinfo, 1); 1265 1266 memset(&cmsg, 0, sizeof(cmsg)); 1267 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1268 rdev_for_each(rdev, mddev) 1269 if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) { 1270 raid_slot = rdev->desc_nr; 1271 break; 1272 } 1273 if (raid_slot >= 0) { 1274 cmsg.raid_slot = cpu_to_le32(raid_slot); 1275 /* 1276 * We can only change capiticy after all the nodes can do it, 1277 * so need to wait after other nodes already received the msg 1278 * and handled the change 1279 */ 1280 ret = __sendmsg(cinfo, &cmsg); 1281 if (ret) { 1282 pr_err("%s:%d: failed to send METADATA_UPDATED msg\n", 1283 __func__, __LINE__); 1284 unlock_comm(cinfo); 1285 return; 1286 } 1287 } else { 1288 pr_err("md-cluster: No good device id found to send\n"); 1289 unlock_comm(cinfo); 1290 return; 1291 } 1292 1293 /* 1294 * check the sync_size from other node's bitmap, if sync_size 1295 * have already updated in other nodes as expected, send an 1296 * empty metadata msg to permit the change of capacity 1297 */ 1298 if (cluster_check_sync_size(mddev) == 0) { 1299 memset(&cmsg, 0, sizeof(cmsg)); 1300 cmsg.type = cpu_to_le32(CHANGE_CAPACITY); 1301 ret = __sendmsg(cinfo, &cmsg); 1302 if (ret) 1303 pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n", 1304 __func__, __LINE__); 1305 set_capacity(mddev->gendisk, mddev->array_sectors); 1306 revalidate_disk(mddev->gendisk); 1307 } else { 1308 /* revert to previous sectors */ 1309 ret = mddev->pers->resize(mddev, old_dev_sectors); 1310 if (!ret) 1311 revalidate_disk(mddev->gendisk); 1312 ret = __sendmsg(cinfo, &cmsg); 1313 if (ret) 1314 pr_err("%s:%d: failed to send METADATA_UPDATED msg\n", 1315 __func__, __LINE__); 1316 } 1317 unlock_comm(cinfo); 1318 } 1319 1320 static int resync_start(struct mddev *mddev) 1321 { 1322 struct md_cluster_info *cinfo = mddev->cluster_info; 1323 return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev); 1324 } 1325 1326 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 1327 { 1328 struct md_cluster_info *cinfo = mddev->cluster_info; 1329 struct resync_info ri; 1330 struct cluster_msg cmsg = {0}; 1331 1332 /* do not send zero again, if we have sent before */ 1333 if (hi == 0) { 1334 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 1335 if (le64_to_cpu(ri.hi) == 0) 1336 return 0; 1337 } 1338 1339 add_resync_info(cinfo->bitmap_lockres, lo, hi); 1340 /* Re-acquire the lock to refresh LVB */ 1341 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 1342 cmsg.type = cpu_to_le32(RESYNCING); 1343 cmsg.low = cpu_to_le64(lo); 1344 cmsg.high = cpu_to_le64(hi); 1345 1346 /* 1347 * mddev_lock is held if resync_info_update is called from 1348 * resync_finish (md_reap_sync_thread -> resync_finish) 1349 */ 1350 if (lo == 0 && hi == 0) 1351 return sendmsg(cinfo, &cmsg, 1); 1352 else 1353 return sendmsg(cinfo, &cmsg, 0); 1354 } 1355 1356 static int resync_finish(struct mddev *mddev) 1357 { 1358 struct md_cluster_info *cinfo = mddev->cluster_info; 1359 int ret = 0; 1360 1361 clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery); 1362 1363 /* 1364 * If resync thread is interrupted so we can't say resync is finished, 1365 * another node will launch resync thread to continue. 1366 */ 1367 if (!test_bit(MD_CLOSING, &mddev->flags)) 1368 ret = resync_info_update(mddev, 0, 0); 1369 dlm_unlock_sync(cinfo->resync_lockres); 1370 return ret; 1371 } 1372 1373 static int area_resyncing(struct mddev *mddev, int direction, 1374 sector_t lo, sector_t hi) 1375 { 1376 struct md_cluster_info *cinfo = mddev->cluster_info; 1377 int ret = 0; 1378 struct suspend_info *s; 1379 1380 if ((direction == READ) && 1381 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 1382 return 1; 1383 1384 spin_lock_irq(&cinfo->suspend_lock); 1385 if (list_empty(&cinfo->suspend_list)) 1386 goto out; 1387 list_for_each_entry(s, &cinfo->suspend_list, list) 1388 if (hi > s->lo && lo < s->hi) { 1389 ret = 1; 1390 break; 1391 } 1392 out: 1393 spin_unlock_irq(&cinfo->suspend_lock); 1394 return ret; 1395 } 1396 1397 /* add_new_disk() - initiates a disk add 1398 * However, if this fails before writing md_update_sb(), 1399 * add_new_disk_cancel() must be called to release token lock 1400 */ 1401 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 1402 { 1403 struct md_cluster_info *cinfo = mddev->cluster_info; 1404 struct cluster_msg cmsg; 1405 int ret = 0; 1406 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 1407 char *uuid = sb->device_uuid; 1408 1409 memset(&cmsg, 0, sizeof(cmsg)); 1410 cmsg.type = cpu_to_le32(NEWDISK); 1411 memcpy(cmsg.uuid, uuid, 16); 1412 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1413 lock_comm(cinfo, 1); 1414 ret = __sendmsg(cinfo, &cmsg); 1415 if (ret) { 1416 unlock_comm(cinfo); 1417 return ret; 1418 } 1419 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 1420 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 1421 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 1422 /* Some node does not "see" the device */ 1423 if (ret == -EAGAIN) 1424 ret = -ENOENT; 1425 if (ret) 1426 unlock_comm(cinfo); 1427 else { 1428 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 1429 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which 1430 * will run soon after add_new_disk, the below path will be 1431 * invoked: 1432 * md_wakeup_thread(mddev->thread) 1433 * -> conf->thread (raid1d) 1434 * -> md_check_recovery -> md_update_sb 1435 * -> metadata_update_start/finish 1436 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually. 1437 * 1438 * For other failure cases, metadata_update_cancel and 1439 * add_new_disk_cancel also clear below bit as well. 1440 * */ 1441 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1442 wake_up(&cinfo->wait); 1443 } 1444 return ret; 1445 } 1446 1447 static void add_new_disk_cancel(struct mddev *mddev) 1448 { 1449 struct md_cluster_info *cinfo = mddev->cluster_info; 1450 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1451 unlock_comm(cinfo); 1452 } 1453 1454 static int new_disk_ack(struct mddev *mddev, bool ack) 1455 { 1456 struct md_cluster_info *cinfo = mddev->cluster_info; 1457 1458 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 1459 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 1460 return -EINVAL; 1461 } 1462 1463 if (ack) 1464 dlm_unlock_sync(cinfo->no_new_dev_lockres); 1465 complete(&cinfo->newdisk_completion); 1466 return 0; 1467 } 1468 1469 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 1470 { 1471 struct cluster_msg cmsg = {0}; 1472 struct md_cluster_info *cinfo = mddev->cluster_info; 1473 cmsg.type = cpu_to_le32(REMOVE); 1474 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1475 return sendmsg(cinfo, &cmsg, 1); 1476 } 1477 1478 static int lock_all_bitmaps(struct mddev *mddev) 1479 { 1480 int slot, my_slot, ret, held = 1, i = 0; 1481 char str[64]; 1482 struct md_cluster_info *cinfo = mddev->cluster_info; 1483 1484 cinfo->other_bitmap_lockres = 1485 kcalloc(mddev->bitmap_info.nodes - 1, 1486 sizeof(struct dlm_lock_resource *), GFP_KERNEL); 1487 if (!cinfo->other_bitmap_lockres) { 1488 pr_err("md: can't alloc mem for other bitmap locks\n"); 1489 return 0; 1490 } 1491 1492 my_slot = slot_number(mddev); 1493 for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) { 1494 if (slot == my_slot) 1495 continue; 1496 1497 memset(str, '\0', 64); 1498 snprintf(str, 64, "bitmap%04d", slot); 1499 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1500 if (!cinfo->other_bitmap_lockres[i]) 1501 return -ENOMEM; 1502 1503 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1504 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1505 if (ret) 1506 held = -1; 1507 i++; 1508 } 1509 1510 return held; 1511 } 1512 1513 static void unlock_all_bitmaps(struct mddev *mddev) 1514 { 1515 struct md_cluster_info *cinfo = mddev->cluster_info; 1516 int i; 1517 1518 /* release other node's bitmap lock if they are existed */ 1519 if (cinfo->other_bitmap_lockres) { 1520 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1521 if (cinfo->other_bitmap_lockres[i]) { 1522 lockres_free(cinfo->other_bitmap_lockres[i]); 1523 } 1524 } 1525 kfree(cinfo->other_bitmap_lockres); 1526 } 1527 } 1528 1529 static int gather_bitmaps(struct md_rdev *rdev) 1530 { 1531 int sn, err; 1532 sector_t lo, hi; 1533 struct cluster_msg cmsg = {0}; 1534 struct mddev *mddev = rdev->mddev; 1535 struct md_cluster_info *cinfo = mddev->cluster_info; 1536 1537 cmsg.type = cpu_to_le32(RE_ADD); 1538 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1539 err = sendmsg(cinfo, &cmsg, 1); 1540 if (err) 1541 goto out; 1542 1543 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1544 if (sn == (cinfo->slot_number - 1)) 1545 continue; 1546 err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1547 if (err) { 1548 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1549 goto out; 1550 } 1551 if ((hi > 0) && (lo < mddev->recovery_cp)) 1552 mddev->recovery_cp = lo; 1553 } 1554 out: 1555 return err; 1556 } 1557 1558 static struct md_cluster_operations cluster_ops = { 1559 .join = join, 1560 .leave = leave, 1561 .slot_number = slot_number, 1562 .resync_start = resync_start, 1563 .resync_finish = resync_finish, 1564 .resync_info_update = resync_info_update, 1565 .metadata_update_start = metadata_update_start, 1566 .metadata_update_finish = metadata_update_finish, 1567 .metadata_update_cancel = metadata_update_cancel, 1568 .area_resyncing = area_resyncing, 1569 .add_new_disk = add_new_disk, 1570 .add_new_disk_cancel = add_new_disk_cancel, 1571 .new_disk_ack = new_disk_ack, 1572 .remove_disk = remove_disk, 1573 .load_bitmaps = load_bitmaps, 1574 .gather_bitmaps = gather_bitmaps, 1575 .resize_bitmaps = resize_bitmaps, 1576 .lock_all_bitmaps = lock_all_bitmaps, 1577 .unlock_all_bitmaps = unlock_all_bitmaps, 1578 .update_size = update_size, 1579 }; 1580 1581 static int __init cluster_init(void) 1582 { 1583 pr_warn("md-cluster: support raid1 and raid10 (limited support)\n"); 1584 pr_info("Registering Cluster MD functions\n"); 1585 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1586 return 0; 1587 } 1588 1589 static void cluster_exit(void) 1590 { 1591 unregister_md_cluster_operations(); 1592 } 1593 1594 module_init(cluster_init); 1595 module_exit(cluster_exit); 1596 MODULE_AUTHOR("SUSE"); 1597 MODULE_LICENSE("GPL"); 1598 MODULE_DESCRIPTION("Clustering support for MD"); 1599