/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */


#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "md-bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT 5000

struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done;
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK 1
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define MD_CLUSTER_SEND_LOCK 4
/* Cluster operations (such as adding a disk) must lock the
 * communication channel in order to perform extra operations
 * (update metadata) while no other operation is allowed on the
 * MD. The token needs to be locked and held until the operation
 * completes with md_update_sb(), which eventually releases
 * the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY 5
/* Messages should only be processed after the node has joined the cluster
 * and set up all related info such as the bitmap and personality */
#define MD_CLUSTER_ALREADY_IN_CLUSTER 6
#define MD_CLUSTER_PENDING_RECV_EVENT 7
#define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8

struct md_cluster_info {
	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
	/* dlm lock space and resources for clustered raid.
	 */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct mutex recv_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource **other_bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;

	spinlock_t suspend_lock;
	/* record the region in which writes should be suspended */
	sector_t suspend_lo;
	sector_t suspend_hi;
	int suspend_from; /* the slot which broadcast suspend_lo/hi */

	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	wait_queue_head_t wait;
	unsigned long state;
	/* record the region in RESYNCING message */
	sector_t sync_low;
	sector_t sync_hi;
};

enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
	CHANGE_CAPACITY,
	BITMAP_RESIZE,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};

static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	res->sync_locking_done = true;
	wake_up(&res->sync_locking);
}

static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_event(res->sync_locking, res->sync_locking_done);
	res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}

/*
 * A variation of dlm_lock_sync() that allows the lock request to be
 * interrupted
 */
static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
				       struct mddev *mddev)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;

	wait_event(res->sync_locking, res->sync_locking_done
				      || kthread_should_stop()
				      || test_bit(MD_CLOSING, &mddev->flags));
	if (!res->sync_locking_done) {
		/*
		 * The convert queue still contains the lock request when the
		 * wait is interrupted, and sync_ast could still run, so we
		 * need to cancel the request and reset the completion.
		 */
		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
			&res->lksb, res);
		res->sync_locking_done = false;
		if (unlikely(ret != 0))
			pr_info("failed to cancel previous lock request %s return %d\n",
				res->name, ret);
		return -EPERM;
	} else
		res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

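	/*
	 * Allocate the resource and take an initial NL lock on it so that
	 * later requests can be issued as in-place conversions
	 * (DLM_LKF_CONVERT).
	 */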
	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_waitqueue_head(&res->sync_locking);
	res->sync_locking_done = false;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}

static void lockres_free(struct dlm_lock_resource *res)
{
	int ret = 0;

	if (!res)
		return;

	/*
	 * Use the FORCEUNLOCK flag so we can unlock even if the lock is on
	 * the waiting or convert queue.
	 */
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
		&res->lksb, res);
	if (unlikely(ret != 0))
		pr_err("failed to unlock %s return %d\n", res->name, ret);
	else
		wait_event(res->sync_locking, res->sync_locking_done);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}

static void add_resync_info(struct dlm_lock_resource *lockres,
			    sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

static int read_resync_info(struct mddev *mddev,
			    struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	if (le64_to_cpu(ri.hi) > 0) {
		cinfo->suspend_hi = le64_to_cpu(ri.hi);
		cinfo->suspend_lo = le64_to_cpu(ri.lo);
		ret = 1;
	}
	dlm_unlock_sync(lockres);
	return ret;
}

static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto clear_bit;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		cinfo->suspend_hi = 0;
		cinfo->suspend_lo = 0;
		cinfo->suspend_from = -1;
		spin_unlock_irq(&cinfo->suspend_lock);

		/* Kick off a reshape if needed */
		if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
		    test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
		    mddev->reshape_position != MaxSector)
			md_wakeup_thread(mddev->sync_thread);

		if (hi > 0) {
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			/* wake up the thread to continue resync in case
			 * resync is not finished */
			if (mddev->recovery_cp != MaxSector) {
				/*
				 * clear the REMOTE flag since we will launch
				 * resync thread in current node.
				 */
				clear_bit(MD_RESYNCING_REMOTE,
					  &mddev->recovery);
				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
				md_wakeup_thread(mddev->thread);
			}
		}
clear_bit:
		lockres_free(bm_lockres);
		clear_bit(slot, &cinfo->recovery_map);
	}
}

static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* Subtract one since DLM slots start from one while cluster-md slot
	 * numbers begin with 0 */
	__recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* The completion only needs to be completed when a node joins the
	 * cluster; it doesn't need to run during another node's failure */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* These ops are called when a node joins the cluster, and they perform lock
 * recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};

/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
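 * Every node normally holds the ack lock in CR; the sending node
 * up-converts it to EX after writing the message LVB, which delivers
 * this BAST on all other nodes.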
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX) {
		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
			md_wakeup_thread(cinfo->recv_thread);
		else
			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
	}
}

static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	cinfo->suspend_hi = 0;
	cinfo->suspend_lo = 0;
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct mdp_superblock_1 *sb = NULL;
	struct md_rdev *rdev;

	if (!hi) {
		/*
		 * clear the REMOTE flag since resync or recovery is finished
		 * in remote node.
		 */
		clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}

	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			sb = page_address(rdev->sb_page);
			break;
		}

	/*
	 * The bitmaps are not the same on different nodes. If RESYNCING is
	 * happening on one node, the node which received the RESYNCING
	 * message will probably perform a resync of the region [lo, hi]
	 * again, so we can reduce the resync time a lot if we ensure that
	 * the bitmaps among different nodes match up well.
	 *
	 * sync_low/hi is used to record the region which arrived in the
	 * previous RESYNCING message.
	 *
	 * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK and set
	 * RESYNC_MASK, since the resync thread is running in another node,
	 * so we don't need to do the resync again for the same section.
	 *
	 * Skip md_bitmap_sync_with_cluster in case a reshape is happening,
	 * because the reshaping region is small and we don't want to
	 * trigger lots of WARNs.
	 */
	if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
		md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
					    cinfo->sync_hi, lo, hi);
	cinfo->sync_low = lo;
	cinfo->sync_hi = hi;

	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	cinfo->suspend_from = slot;
	cinfo->suspend_lo = lo;
	cinfo->suspend_hi = hi;
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}


static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	int got_lock = 0;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	wait_event(mddev->thread->wqueue,
		   (got_lock = mddev_trylock(mddev)) ||
		   test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
	md_reload_sb(mddev, mddev->good_device_nr);
	if (got_lock)
		mddev_unlock(mddev);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev) {
		set_bit(ClusterRemove, &rdev->flags);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
	}
	else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	int ret = 0;

	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return -1;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case CHANGE_CAPACITY:
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
		break;
	case RESYNCING:
		set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	case BITMAP_RESIZE:
		if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
			ret = md_bitmap_resize(mddev->bitmap,
					       le64_to_cpu(msg->high), 0, 0);
		break;
	default:
		ret = -1;
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, le32_to_cpu(msg->slot));
	}
	return ret;
}

/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	mutex_lock(&cinfo->recv_mutex);
	/* get CR on Message */
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1: failed to get CR on MESSAGE\n");
		mutex_unlock(&cinfo->recv_mutex);
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	ret = process_recvd_msg(thread->mddev, &msg);
	if (ret)
		goto out;

	/* release CR on ack_lockres */
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/* up-convert to PR on message_lockres */
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/* get CR on ack_lockres again */
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
out:
	/* release CR on message_lockres */
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
	mutex_unlock(&cinfo->recv_mutex);
}

/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked)
{
	int error, set_bit = 0;
	struct mddev *mddev = cinfo->mddev;

	/*
	 * If the resync thread runs after the raid1d thread, then
	 * process_metadata_update could not continue while raid1d holds
	 * reconfig_mutex (raid1d is blocked since another node already got
	 * EX on Token and is waiting for EX on Ack), so let resync wake up
	 * the thread in case the flag is set.
	 */
	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				      &cinfo->state)) {
		error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
					      &cinfo->state);
		WARN_ON_ONCE(error);
		md_wakeup_thread(mddev->thread);
		set_bit = 1;
	}
	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (set_bit)
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);

	if (error)
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);

	/* Lock the receive sequence */
	mutex_lock(&cinfo->recv_mutex);
	return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
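 * Once the bit is owned, the TOKEN lock is taken via lock_token(), so only
 * one node at a time can send cluster messages.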
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));

	return lock_token(cinfo, mddev_locked);
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	mutex_unlock(&cinfo->recv_mutex);
	dlm_unlock_sync(cinfo->token_lockres);
	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
	wake_up(&cinfo->wait);
}

/* __sendmsg()
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation.
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on
 *    other nodes and the other nodes read the message. The thread will wait
 *    here until all other nodes have released the ack lock resource.
 * 5. Downconverts ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/* get EX on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/* down-convert EX to CW on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/* up-convert CR to EX on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/* down-convert EX to CR on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		goto failed_ack;
	}
failed_message:
	return error;
}

static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
		   bool mddev_locked)
{
	int ret;

	lock_comm(cinfo, mddev_locked);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}

static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	sector_t lo, hi;

	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1)) {
			lockres_free(bm_lockres);
			continue;
		}

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			if (read_resync_info(mddev, bm_lockres)) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
					(unsigned long long) cinfo->suspend_lo,
					(unsigned long long) cinfo->suspend_hi,
					i);
				cinfo->suspend_from = i;
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		lockres_free(bm_lockres);
	}
out:
	return ret;
}

static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	init_waitqueue_head(&cinfo->wait);
	mutex_init(&cinfo->recv_mutex);

	mddev->cluster_info = cinfo;
	cinfo->mddev = mddev;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (ret) {
		ret = -EAGAIN;
		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
		goto err;
	}
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	/* get sync CR lock on ACK. */
	ret = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (ret)
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	dlm_unlock_sync(cinfo->token_lockres);
	/* get sync CR lock on no-new-dev.
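	 * It is dropped again by new_disk_ack() when this node confirms a
	 * newly added device, which allows the adding node to take EX.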
	 */
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	if (ret)
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}

static void load_bitmaps(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	/* load all the node's bitmap info for resync */
	if (gather_all_resync_info(mddev, total_slots))
		pr_err("md-cluster: failed to gather all resync infos\n");
	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
	/* wake up recv thread in case something needs to be handled */
	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
		md_wakeup_thread(cinfo->recv_thread);
}

static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}

static void unlock_all_bitmaps(struct mddev *mddev);
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/*
	 * A BITMAP_NEEDS_SYNC message should be sent when a node is leaving
	 * the cluster with a dirty bitmap; we can only deliver it while the
	 * dlm connection is available.
	 *
	 * Also, we should send BITMAP_NEEDS_SYNC in case reshaping is
	 * interrupted.
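	 * The node that receives the message will recover this node's
	 * bitmap and carry on the unfinished resync.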
	 */
	if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
	    (mddev->reshape_position != MaxSector &&
	     test_bit(MD_CLOSING, &mddev->flags)))
		resync_bitmap(mddev);

	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	unlock_all_bitmaps(mddev);
	dlm_release_lockspace(cinfo->lockspace, 2);
	kfree(cinfo);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}

/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, token is in EX mode, and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/*
	 * metadata_update_start is always called with the protection of
	 * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
	 */
	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				    &cinfo->state);
	WARN_ON_ONCE(ret);
	md_wakeup_thread(mddev->thread);

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

	/* If token is already locked, return 0 */
	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
		return 0;
	}

	ret = lock_token(cinfo, 1);
	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return ret;
}

static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send.
	 */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int update_bitmap_size(struct mddev *mddev, sector_t size)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int ret;

	cmsg.type = cpu_to_le32(BITMAP_RESIZE);
	cmsg.high = cpu_to_le64(size);
	ret = sendmsg(cinfo, &cmsg, 0);
	if (ret)
		pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
			__func__, __LINE__, ret);
	return ret;
}

static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
{
	struct bitmap_counts *counts;
	char str[64];
	struct dlm_lock_resource *bm_lockres;
	struct bitmap *bitmap = mddev->bitmap;
	unsigned long my_pages = bitmap->counts.pages;
	int i, rv;

	/*
	 * We need to ensure all the nodes can grow to a larger
	 * bitmap size before making the reshape.
	 */
	rv = update_bitmap_size(mddev, newsize);
	if (rv)
		return rv;

	for (i = 0; i < mddev->bitmap_info.nodes; i++) {
		if (i == md_cluster_ops->slot_number(mddev))
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			goto out;
		}
		counts = &bitmap->counts;

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the pages.
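		 * Otherwise the slot is in use and that node will resize its
		 * own bitmap when it handles the BITMAP_RESIZE message sent
		 * above.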
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("Cannot initialize %s lock\n", str);
			goto out;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			counts->pages = my_pages;
		lockres_free(bm_lockres);

		if (my_pages != counts->pages)
			/*
			 * Let's revert the bitmap size if one node
			 * can't resize its bitmap.
			 */
			goto out;
	}

	return 0;
out:
	md_bitmap_free(bitmap);
	update_bitmap_size(mddev, oldsize);
	return -1;
}

/*
 * return 0 if all the bitmaps have the same sync_size
 */
static int cluster_check_sync_size(struct mddev *mddev)
{
	int i, rv;
	bitmap_super_t *sb;
	unsigned long my_sync_size, sync_size = 0;
	int node_num = mddev->bitmap_info.nodes;
	int current_slot = md_cluster_ops->slot_number(mddev);
	struct bitmap *bitmap = mddev->bitmap;
	char str[64];
	struct dlm_lock_resource *bm_lockres;

	sb = kmap_atomic(bitmap->storage.sb_page);
	my_sync_size = sb->sync_size;
	kunmap_atomic(sb);

	for (i = 0; i < node_num; i++) {
		if (i == current_slot)
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			return -1;
		}

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the sb.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize %s\n", str);
			md_bitmap_free(bitmap);
			return -1;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			md_bitmap_update_sb(bitmap);
		lockres_free(bm_lockres);

		sb = kmap_atomic(bitmap->storage.sb_page);
		if (sync_size == 0)
			sync_size = sb->sync_size;
		else if (sync_size != sb->sync_size) {
			kunmap_atomic(sb);
			md_bitmap_free(bitmap);
			return -1;
		}
		kunmap_atomic(sb);
		md_bitmap_free(bitmap);
	}

	return (my_sync_size == sync_size) ? 0 : -1;
}

/*
 * Updating the size for a cluster raid is a little more complex; we perform
 * it in these steps:
 * 1. hold the token lock and update the superblock in the initiator node.
 * 2. send a METADATA_UPDATED msg to the other nodes.
 * 3. The initiator node continues to check each bitmap's sync_size; if all
 *    bitmaps have the same value of sync_size, then we can set the capacity
 *    and let the other nodes perform it too. If one node can't update
 *    sync_size accordingly, we need to revert to the previous value.
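 *    The CHANGE_CAPACITY message tells every other node to call
 *    set_capacity() with the new array size.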
 */
static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	md_update_sb(mddev, 1);
	lock_comm(cinfo, 1);

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		/*
		 * We can only change the capacity after all the nodes can do
		 * it, so we need to wait until the other nodes have received
		 * the msg and handled the change.
		 */
		ret = __sendmsg(cinfo, &cmsg);
		if (ret) {
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
			unlock_comm(cinfo);
			return;
		}
	} else {
		pr_err("md-cluster: No good device id found to send\n");
		unlock_comm(cinfo);
		return;
	}

	/*
	 * check the sync_size from the other nodes' bitmaps; if sync_size
	 * has already been updated in the other nodes as expected, send an
	 * empty metadata msg to permit the change of capacity
	 */
	if (cluster_check_sync_size(mddev) == 0) {
		memset(&cmsg, 0, sizeof(cmsg));
		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
			       __func__, __LINE__);
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
	} else {
		/* revert to previous sectors */
		ret = mddev->pers->resize(mddev, old_dev_sectors);
		if (!ret)
			revalidate_disk(mddev->gendisk);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
	}
	unlock_comm(cinfo);
}

static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}

static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	spin_lock_irq(&cinfo->suspend_lock);
	*lo = cinfo->suspend_lo;
	*hi = cinfo->suspend_hi;
	spin_unlock_irq(&cinfo->suspend_lock);
}

static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again, if we have sent before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}

static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);

	/*
	 * If the resync thread is interrupted so we can't say resync is
	 * finished, another node will launch its resync thread to continue.
	 */
	if (!test_bit(MD_CLOSING, &mddev->flags))
		ret = resync_info_update(mddev, 0, 0);
	dlm_unlock_sync(cinfo->resync_lockres);
	return ret;
}

static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
		ret = 1;
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}

/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release the token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	lock_comm(cinfo, 1);
	ret = __sendmsg(cinfo, &cmsg);
	if (ret) {
		unlock_comm(cinfo);
		return ret;
	}
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else {
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
		 * will run soon after add_new_disk, the below path will be
		 * invoked:
		 *   md_wakeup_thread(mddev->thread)
		 *   -> conf->thread (raid1d)
		 *   -> md_check_recovery -> md_update_sb
		 *   -> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel also clear the bit below as well.
1440 * */ 1441 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1442 wake_up(&cinfo->wait); 1443 } 1444 return ret; 1445 } 1446 1447 static void add_new_disk_cancel(struct mddev *mddev) 1448 { 1449 struct md_cluster_info *cinfo = mddev->cluster_info; 1450 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1451 unlock_comm(cinfo); 1452 } 1453 1454 static int new_disk_ack(struct mddev *mddev, bool ack) 1455 { 1456 struct md_cluster_info *cinfo = mddev->cluster_info; 1457 1458 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 1459 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 1460 return -EINVAL; 1461 } 1462 1463 if (ack) 1464 dlm_unlock_sync(cinfo->no_new_dev_lockres); 1465 complete(&cinfo->newdisk_completion); 1466 return 0; 1467 } 1468 1469 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 1470 { 1471 struct cluster_msg cmsg = {0}; 1472 struct md_cluster_info *cinfo = mddev->cluster_info; 1473 cmsg.type = cpu_to_le32(REMOVE); 1474 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1475 return sendmsg(cinfo, &cmsg, 1); 1476 } 1477 1478 static int lock_all_bitmaps(struct mddev *mddev) 1479 { 1480 int slot, my_slot, ret, held = 1, i = 0; 1481 char str[64]; 1482 struct md_cluster_info *cinfo = mddev->cluster_info; 1483 1484 cinfo->other_bitmap_lockres = 1485 kcalloc(mddev->bitmap_info.nodes - 1, 1486 sizeof(struct dlm_lock_resource *), GFP_KERNEL); 1487 if (!cinfo->other_bitmap_lockres) { 1488 pr_err("md: can't alloc mem for other bitmap locks\n"); 1489 return 0; 1490 } 1491 1492 my_slot = slot_number(mddev); 1493 for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) { 1494 if (slot == my_slot) 1495 continue; 1496 1497 memset(str, '\0', 64); 1498 snprintf(str, 64, "bitmap%04d", slot); 1499 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1500 if (!cinfo->other_bitmap_lockres[i]) 1501 return -ENOMEM; 1502 1503 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1504 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1505 if (ret) 1506 held = -1; 1507 i++; 1508 } 1509 1510 return held; 1511 } 1512 1513 static void unlock_all_bitmaps(struct mddev *mddev) 1514 { 1515 struct md_cluster_info *cinfo = mddev->cluster_info; 1516 int i; 1517 1518 /* release other node's bitmap lock if they are existed */ 1519 if (cinfo->other_bitmap_lockres) { 1520 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1521 if (cinfo->other_bitmap_lockres[i]) { 1522 lockres_free(cinfo->other_bitmap_lockres[i]); 1523 } 1524 } 1525 kfree(cinfo->other_bitmap_lockres); 1526 } 1527 } 1528 1529 static int gather_bitmaps(struct md_rdev *rdev) 1530 { 1531 int sn, err; 1532 sector_t lo, hi; 1533 struct cluster_msg cmsg = {0}; 1534 struct mddev *mddev = rdev->mddev; 1535 struct md_cluster_info *cinfo = mddev->cluster_info; 1536 1537 cmsg.type = cpu_to_le32(RE_ADD); 1538 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1539 err = sendmsg(cinfo, &cmsg, 1); 1540 if (err) 1541 goto out; 1542 1543 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1544 if (sn == (cinfo->slot_number - 1)) 1545 continue; 1546 err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1547 if (err) { 1548 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1549 goto out; 1550 } 1551 if ((hi > 0) && (lo < mddev->recovery_cp)) 1552 mddev->recovery_cp = lo; 1553 } 1554 out: 1555 return err; 1556 } 1557 1558 static struct md_cluster_operations cluster_ops = { 1559 .join = join, 1560 .leave = leave, 1561 .slot_number = slot_number, 1562 
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.resync_info_get = resync_info_get,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.load_bitmaps = load_bitmaps,
	.gather_bitmaps = gather_bitmaps,
	.resize_bitmaps = resize_bitmaps,
	.lock_all_bitmaps = lock_all_bitmaps,
	.unlock_all_bitmaps = unlock_all_bitmaps,
	.update_size = update_size,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");