1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/kthread.h> 14 #include <linux/dlm.h> 15 #include <linux/sched.h> 16 #include <linux/raid/md_p.h> 17 #include "md.h" 18 #include "md-bitmap.h" 19 #include "md-cluster.h" 20 21 #define LVB_SIZE 64 22 #define NEW_DEV_TIMEOUT 5000 23 24 struct dlm_lock_resource { 25 dlm_lockspace_t *ls; 26 struct dlm_lksb lksb; 27 char *name; /* lock name. */ 28 uint32_t flags; /* flags to pass to dlm_lock() */ 29 wait_queue_head_t sync_locking; /* wait queue for synchronized locking */ 30 bool sync_locking_done; 31 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 32 struct mddev *mddev; /* pointing back to mddev. */ 33 int mode; 34 }; 35 36 struct suspend_info { 37 int slot; 38 sector_t lo; 39 sector_t hi; 40 struct list_head list; 41 }; 42 43 struct resync_info { 44 __le64 lo; 45 __le64 hi; 46 }; 47 48 /* md_cluster_info flags */ 49 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 50 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 51 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3 52 53 /* Lock the send communication. This is done through 54 * bit manipulation as opposed to a mutex in order to 55 * accomodate lock and hold. See next comment. 56 */ 57 #define MD_CLUSTER_SEND_LOCK 4 58 /* If cluster operations (such as adding a disk) must lock the 59 * communication channel, so as to perform extra operations 60 * (update metadata) and no other operation is allowed on the 61 * MD. Token needs to be locked and held until the operation 62 * completes witha md_update_sb(), which would eventually release 63 * the lock. 64 */ 65 #define MD_CLUSTER_SEND_LOCKED_ALREADY 5 66 /* We should receive message after node joined cluster and 67 * set up all the related infos such as bitmap and personality */ 68 #define MD_CLUSTER_ALREADY_IN_CLUSTER 6 69 #define MD_CLUSTER_PENDING_RECV_EVENT 7 70 #define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8 71 72 struct md_cluster_info { 73 struct mddev *mddev; /* the md device which md_cluster_info belongs to */ 74 /* dlm lock space and resources for clustered raid. */ 75 dlm_lockspace_t *lockspace; 76 int slot_number; 77 struct completion completion; 78 struct mutex recv_mutex; 79 struct dlm_lock_resource *bitmap_lockres; 80 struct dlm_lock_resource **other_bitmap_lockres; 81 struct dlm_lock_resource *resync_lockres; 82 struct list_head suspend_list; 83 spinlock_t suspend_lock; 84 struct md_thread *recovery_thread; 85 unsigned long recovery_map; 86 /* communication loc resources */ 87 struct dlm_lock_resource *ack_lockres; 88 struct dlm_lock_resource *message_lockres; 89 struct dlm_lock_resource *token_lockres; 90 struct dlm_lock_resource *no_new_dev_lockres; 91 struct md_thread *recv_thread; 92 struct completion newdisk_completion; 93 wait_queue_head_t wait; 94 unsigned long state; 95 /* record the region in RESYNCING message */ 96 sector_t sync_low; 97 sector_t sync_hi; 98 }; 99 100 enum msg_type { 101 METADATA_UPDATED = 0, 102 RESYNCING, 103 NEWDISK, 104 REMOVE, 105 RE_ADD, 106 BITMAP_NEEDS_SYNC, 107 CHANGE_CAPACITY, 108 }; 109 110 struct cluster_msg { 111 __le32 type; 112 __le32 slot; 113 /* TODO: Unionize this for smaller footprint */ 114 __le64 low; 115 __le64 high; 116 char uuid[16]; 117 __le32 raid_slot; 118 }; 119 120 static void sync_ast(void *arg) 121 { 122 struct dlm_lock_resource *res; 123 124 res = arg; 125 res->sync_locking_done = true; 126 wake_up(&res->sync_locking); 127 } 128 129 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 130 { 131 int ret = 0; 132 133 ret = dlm_lock(res->ls, mode, &res->lksb, 134 res->flags, res->name, strlen(res->name), 135 0, sync_ast, res, res->bast); 136 if (ret) 137 return ret; 138 wait_event(res->sync_locking, res->sync_locking_done); 139 res->sync_locking_done = false; 140 if (res->lksb.sb_status == 0) 141 res->mode = mode; 142 return res->lksb.sb_status; 143 } 144 145 static int dlm_unlock_sync(struct dlm_lock_resource *res) 146 { 147 return dlm_lock_sync(res, DLM_LOCK_NL); 148 } 149 150 /* 151 * An variation of dlm_lock_sync, which make lock request could 152 * be interrupted 153 */ 154 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode, 155 struct mddev *mddev) 156 { 157 int ret = 0; 158 159 ret = dlm_lock(res->ls, mode, &res->lksb, 160 res->flags, res->name, strlen(res->name), 161 0, sync_ast, res, res->bast); 162 if (ret) 163 return ret; 164 165 wait_event(res->sync_locking, res->sync_locking_done 166 || kthread_should_stop() 167 || test_bit(MD_CLOSING, &mddev->flags)); 168 if (!res->sync_locking_done) { 169 /* 170 * the convert queue contains the lock request when request is 171 * interrupted, and sync_ast could still be run, so need to 172 * cancel the request and reset completion 173 */ 174 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL, 175 &res->lksb, res); 176 res->sync_locking_done = false; 177 if (unlikely(ret != 0)) 178 pr_info("failed to cancel previous lock request " 179 "%s return %d\n", res->name, ret); 180 return -EPERM; 181 } else 182 res->sync_locking_done = false; 183 if (res->lksb.sb_status == 0) 184 res->mode = mode; 185 return res->lksb.sb_status; 186 } 187 188 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 189 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 190 { 191 struct dlm_lock_resource *res = NULL; 192 int ret, namelen; 193 struct md_cluster_info *cinfo = mddev->cluster_info; 194 195 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 196 if (!res) 197 return NULL; 198 init_waitqueue_head(&res->sync_locking); 199 res->sync_locking_done = false; 200 res->ls = cinfo->lockspace; 201 res->mddev = mddev; 202 res->mode = DLM_LOCK_IV; 203 namelen = strlen(name); 204 res->name = kzalloc(namelen + 1, GFP_KERNEL); 205 if (!res->name) { 206 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 207 goto out_err; 208 } 209 strlcpy(res->name, name, namelen + 1); 210 if (with_lvb) { 211 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 212 if (!res->lksb.sb_lvbptr) { 213 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 214 goto out_err; 215 } 216 res->flags = DLM_LKF_VALBLK; 217 } 218 219 if (bastfn) 220 res->bast = bastfn; 221 222 res->flags |= DLM_LKF_EXPEDITE; 223 224 ret = dlm_lock_sync(res, DLM_LOCK_NL); 225 if (ret) { 226 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 227 goto out_err; 228 } 229 res->flags &= ~DLM_LKF_EXPEDITE; 230 res->flags |= DLM_LKF_CONVERT; 231 232 return res; 233 out_err: 234 kfree(res->lksb.sb_lvbptr); 235 kfree(res->name); 236 kfree(res); 237 return NULL; 238 } 239 240 static void lockres_free(struct dlm_lock_resource *res) 241 { 242 int ret = 0; 243 244 if (!res) 245 return; 246 247 /* 248 * use FORCEUNLOCK flag, so we can unlock even the lock is on the 249 * waiting or convert queue 250 */ 251 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK, 252 &res->lksb, res); 253 if (unlikely(ret != 0)) 254 pr_err("failed to unlock %s return %d\n", res->name, ret); 255 else 256 wait_event(res->sync_locking, res->sync_locking_done); 257 258 kfree(res->name); 259 kfree(res->lksb.sb_lvbptr); 260 kfree(res); 261 } 262 263 static void add_resync_info(struct dlm_lock_resource *lockres, 264 sector_t lo, sector_t hi) 265 { 266 struct resync_info *ri; 267 268 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 269 ri->lo = cpu_to_le64(lo); 270 ri->hi = cpu_to_le64(hi); 271 } 272 273 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 274 { 275 struct resync_info ri; 276 struct suspend_info *s = NULL; 277 sector_t hi = 0; 278 279 dlm_lock_sync(lockres, DLM_LOCK_CR); 280 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 281 hi = le64_to_cpu(ri.hi); 282 if (hi > 0) { 283 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 284 if (!s) 285 goto out; 286 s->hi = hi; 287 s->lo = le64_to_cpu(ri.lo); 288 } 289 dlm_unlock_sync(lockres); 290 out: 291 return s; 292 } 293 294 static void recover_bitmaps(struct md_thread *thread) 295 { 296 struct mddev *mddev = thread->mddev; 297 struct md_cluster_info *cinfo = mddev->cluster_info; 298 struct dlm_lock_resource *bm_lockres; 299 char str[64]; 300 int slot, ret; 301 struct suspend_info *s, *tmp; 302 sector_t lo, hi; 303 304 while (cinfo->recovery_map) { 305 slot = fls64((u64)cinfo->recovery_map) - 1; 306 307 /* Clear suspend_area associated with the bitmap */ 308 spin_lock_irq(&cinfo->suspend_lock); 309 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 310 if (slot == s->slot) { 311 list_del(&s->list); 312 kfree(s); 313 } 314 spin_unlock_irq(&cinfo->suspend_lock); 315 316 snprintf(str, 64, "bitmap%04d", slot); 317 bm_lockres = lockres_init(mddev, str, NULL, 1); 318 if (!bm_lockres) { 319 pr_err("md-cluster: Cannot initialize bitmaps\n"); 320 goto clear_bit; 321 } 322 323 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev); 324 if (ret) { 325 pr_err("md-cluster: Could not DLM lock %s: %d\n", 326 str, ret); 327 goto clear_bit; 328 } 329 ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 330 if (ret) { 331 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 332 goto clear_bit; 333 } 334 if (hi > 0) { 335 if (lo < mddev->recovery_cp) 336 mddev->recovery_cp = lo; 337 /* wake up thread to continue resync in case resync 338 * is not finished */ 339 if (mddev->recovery_cp != MaxSector) { 340 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 341 md_wakeup_thread(mddev->thread); 342 } 343 } 344 clear_bit: 345 lockres_free(bm_lockres); 346 clear_bit(slot, &cinfo->recovery_map); 347 } 348 } 349 350 static void recover_prep(void *arg) 351 { 352 struct mddev *mddev = arg; 353 struct md_cluster_info *cinfo = mddev->cluster_info; 354 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 355 } 356 357 static void __recover_slot(struct mddev *mddev, int slot) 358 { 359 struct md_cluster_info *cinfo = mddev->cluster_info; 360 361 set_bit(slot, &cinfo->recovery_map); 362 if (!cinfo->recovery_thread) { 363 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 364 mddev, "recover"); 365 if (!cinfo->recovery_thread) { 366 pr_warn("md-cluster: Could not create recovery thread\n"); 367 return; 368 } 369 } 370 md_wakeup_thread(cinfo->recovery_thread); 371 } 372 373 static void recover_slot(void *arg, struct dlm_slot *slot) 374 { 375 struct mddev *mddev = arg; 376 struct md_cluster_info *cinfo = mddev->cluster_info; 377 378 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 379 mddev->bitmap_info.cluster_name, 380 slot->nodeid, slot->slot, 381 cinfo->slot_number); 382 /* deduct one since dlm slot starts from one while the num of 383 * cluster-md begins with 0 */ 384 __recover_slot(mddev, slot->slot - 1); 385 } 386 387 static void recover_done(void *arg, struct dlm_slot *slots, 388 int num_slots, int our_slot, 389 uint32_t generation) 390 { 391 struct mddev *mddev = arg; 392 struct md_cluster_info *cinfo = mddev->cluster_info; 393 394 cinfo->slot_number = our_slot; 395 /* completion is only need to be complete when node join cluster, 396 * it doesn't need to run during another node's failure */ 397 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 398 complete(&cinfo->completion); 399 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 400 } 401 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 402 } 403 404 /* the ops is called when node join the cluster, and do lock recovery 405 * if node failure occurs */ 406 static const struct dlm_lockspace_ops md_ls_ops = { 407 .recover_prep = recover_prep, 408 .recover_slot = recover_slot, 409 .recover_done = recover_done, 410 }; 411 412 /* 413 * The BAST function for the ack lock resource 414 * This function wakes up the receive thread in 415 * order to receive and process the message. 416 */ 417 static void ack_bast(void *arg, int mode) 418 { 419 struct dlm_lock_resource *res = arg; 420 struct md_cluster_info *cinfo = res->mddev->cluster_info; 421 422 if (mode == DLM_LOCK_EX) { 423 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state)) 424 md_wakeup_thread(cinfo->recv_thread); 425 else 426 set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state); 427 } 428 } 429 430 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 431 { 432 struct suspend_info *s, *tmp; 433 434 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 435 if (slot == s->slot) { 436 list_del(&s->list); 437 kfree(s); 438 break; 439 } 440 } 441 442 static void remove_suspend_info(struct mddev *mddev, int slot) 443 { 444 struct md_cluster_info *cinfo = mddev->cluster_info; 445 mddev->pers->quiesce(mddev, 1); 446 spin_lock_irq(&cinfo->suspend_lock); 447 __remove_suspend_info(cinfo, slot); 448 spin_unlock_irq(&cinfo->suspend_lock); 449 mddev->pers->quiesce(mddev, 0); 450 } 451 452 453 static void process_suspend_info(struct mddev *mddev, 454 int slot, sector_t lo, sector_t hi) 455 { 456 struct md_cluster_info *cinfo = mddev->cluster_info; 457 struct suspend_info *s; 458 459 if (!hi) { 460 remove_suspend_info(mddev, slot); 461 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 462 md_wakeup_thread(mddev->thread); 463 return; 464 } 465 466 /* 467 * The bitmaps are not same for different nodes 468 * if RESYNCING is happening in one node, then 469 * the node which received the RESYNCING message 470 * probably will perform resync with the region 471 * [lo, hi] again, so we could reduce resync time 472 * a lot if we can ensure that the bitmaps among 473 * different nodes are match up well. 474 * 475 * sync_low/hi is used to record the region which 476 * arrived in the previous RESYNCING message, 477 * 478 * Call bitmap_sync_with_cluster to clear 479 * NEEDED_MASK and set RESYNC_MASK since 480 * resync thread is running in another node, 481 * so we don't need to do the resync again 482 * with the same section */ 483 md_bitmap_sync_with_cluster(mddev, cinfo->sync_low, cinfo->sync_hi, lo, hi); 484 cinfo->sync_low = lo; 485 cinfo->sync_hi = hi; 486 487 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 488 if (!s) 489 return; 490 s->slot = slot; 491 s->lo = lo; 492 s->hi = hi; 493 mddev->pers->quiesce(mddev, 1); 494 spin_lock_irq(&cinfo->suspend_lock); 495 /* Remove existing entry (if exists) before adding */ 496 __remove_suspend_info(cinfo, slot); 497 list_add(&s->list, &cinfo->suspend_list); 498 spin_unlock_irq(&cinfo->suspend_lock); 499 mddev->pers->quiesce(mddev, 0); 500 } 501 502 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 503 { 504 char disk_uuid[64]; 505 struct md_cluster_info *cinfo = mddev->cluster_info; 506 char event_name[] = "EVENT=ADD_DEVICE"; 507 char raid_slot[16]; 508 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 509 int len; 510 511 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 512 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 513 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 514 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 515 init_completion(&cinfo->newdisk_completion); 516 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 517 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 518 wait_for_completion_timeout(&cinfo->newdisk_completion, 519 NEW_DEV_TIMEOUT); 520 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 521 } 522 523 524 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 525 { 526 int got_lock = 0; 527 struct md_cluster_info *cinfo = mddev->cluster_info; 528 mddev->good_device_nr = le32_to_cpu(msg->raid_slot); 529 530 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 531 wait_event(mddev->thread->wqueue, 532 (got_lock = mddev_trylock(mddev)) || 533 test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state)); 534 md_reload_sb(mddev, mddev->good_device_nr); 535 if (got_lock) 536 mddev_unlock(mddev); 537 } 538 539 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 540 { 541 struct md_rdev *rdev; 542 543 rcu_read_lock(); 544 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 545 if (rdev) { 546 set_bit(ClusterRemove, &rdev->flags); 547 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 548 md_wakeup_thread(mddev->thread); 549 } 550 else 551 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 552 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 553 rcu_read_unlock(); 554 } 555 556 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 557 { 558 struct md_rdev *rdev; 559 560 rcu_read_lock(); 561 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 562 if (rdev && test_bit(Faulty, &rdev->flags)) 563 clear_bit(Faulty, &rdev->flags); 564 else 565 pr_warn("%s: %d Could not find disk(%d) which is faulty", 566 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 567 rcu_read_unlock(); 568 } 569 570 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 571 { 572 int ret = 0; 573 574 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 575 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 576 return -1; 577 switch (le32_to_cpu(msg->type)) { 578 case METADATA_UPDATED: 579 process_metadata_update(mddev, msg); 580 break; 581 case CHANGE_CAPACITY: 582 set_capacity(mddev->gendisk, mddev->array_sectors); 583 revalidate_disk(mddev->gendisk); 584 break; 585 case RESYNCING: 586 process_suspend_info(mddev, le32_to_cpu(msg->slot), 587 le64_to_cpu(msg->low), 588 le64_to_cpu(msg->high)); 589 break; 590 case NEWDISK: 591 process_add_new_disk(mddev, msg); 592 break; 593 case REMOVE: 594 process_remove_disk(mddev, msg); 595 break; 596 case RE_ADD: 597 process_readd_disk(mddev, msg); 598 break; 599 case BITMAP_NEEDS_SYNC: 600 __recover_slot(mddev, le32_to_cpu(msg->slot)); 601 break; 602 default: 603 ret = -1; 604 pr_warn("%s:%d Received unknown message from %d\n", 605 __func__, __LINE__, msg->slot); 606 } 607 return ret; 608 } 609 610 /* 611 * thread for receiving message 612 */ 613 static void recv_daemon(struct md_thread *thread) 614 { 615 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 616 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 617 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 618 struct cluster_msg msg; 619 int ret; 620 621 mutex_lock(&cinfo->recv_mutex); 622 /*get CR on Message*/ 623 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 624 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 625 mutex_unlock(&cinfo->recv_mutex); 626 return; 627 } 628 629 /* read lvb and wake up thread to process this message_lockres */ 630 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 631 ret = process_recvd_msg(thread->mddev, &msg); 632 if (ret) 633 goto out; 634 635 /*release CR on ack_lockres*/ 636 ret = dlm_unlock_sync(ack_lockres); 637 if (unlikely(ret != 0)) 638 pr_info("unlock ack failed return %d\n", ret); 639 /*up-convert to PR on message_lockres*/ 640 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 641 if (unlikely(ret != 0)) 642 pr_info("lock PR on msg failed return %d\n", ret); 643 /*get CR on ack_lockres again*/ 644 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 645 if (unlikely(ret != 0)) 646 pr_info("lock CR on ack failed return %d\n", ret); 647 out: 648 /*release CR on message_lockres*/ 649 ret = dlm_unlock_sync(message_lockres); 650 if (unlikely(ret != 0)) 651 pr_info("unlock msg failed return %d\n", ret); 652 mutex_unlock(&cinfo->recv_mutex); 653 } 654 655 /* lock_token() 656 * Takes the lock on the TOKEN lock resource so no other 657 * node can communicate while the operation is underway. 658 */ 659 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked) 660 { 661 int error, set_bit = 0; 662 struct mddev *mddev = cinfo->mddev; 663 664 /* 665 * If resync thread run after raid1d thread, then process_metadata_update 666 * could not continue if raid1d held reconfig_mutex (and raid1d is blocked 667 * since another node already got EX on Token and waitting the EX of Ack), 668 * so let resync wake up thread in case flag is set. 669 */ 670 if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 671 &cinfo->state)) { 672 error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 673 &cinfo->state); 674 WARN_ON_ONCE(error); 675 md_wakeup_thread(mddev->thread); 676 set_bit = 1; 677 } 678 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 679 if (set_bit) 680 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 681 682 if (error) 683 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 684 __func__, __LINE__, error); 685 686 /* Lock the receive sequence */ 687 mutex_lock(&cinfo->recv_mutex); 688 return error; 689 } 690 691 /* lock_comm() 692 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel. 693 */ 694 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked) 695 { 696 wait_event(cinfo->wait, 697 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state)); 698 699 return lock_token(cinfo, mddev_locked); 700 } 701 702 static void unlock_comm(struct md_cluster_info *cinfo) 703 { 704 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 705 mutex_unlock(&cinfo->recv_mutex); 706 dlm_unlock_sync(cinfo->token_lockres); 707 clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state); 708 wake_up(&cinfo->wait); 709 } 710 711 /* __sendmsg() 712 * This function performs the actual sending of the message. This function is 713 * usually called after performing the encompassing operation 714 * The function: 715 * 1. Grabs the message lockresource in EX mode 716 * 2. Copies the message to the message LVB 717 * 3. Downconverts message lockresource to CW 718 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 719 * and the other nodes read the message. The thread will wait here until all other 720 * nodes have released ack lock resource. 721 * 5. Downconvert ack lockresource to CR 722 */ 723 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 724 { 725 int error; 726 int slot = cinfo->slot_number - 1; 727 728 cmsg->slot = cpu_to_le32(slot); 729 /*get EX on Message*/ 730 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 731 if (error) { 732 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 733 goto failed_message; 734 } 735 736 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 737 sizeof(struct cluster_msg)); 738 /*down-convert EX to CW on Message*/ 739 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 740 if (error) { 741 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 742 error); 743 goto failed_ack; 744 } 745 746 /*up-convert CR to EX on Ack*/ 747 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 748 if (error) { 749 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 750 error); 751 goto failed_ack; 752 } 753 754 /*down-convert EX to CR on Ack*/ 755 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 756 if (error) { 757 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 758 error); 759 goto failed_ack; 760 } 761 762 failed_ack: 763 error = dlm_unlock_sync(cinfo->message_lockres); 764 if (unlikely(error != 0)) { 765 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 766 error); 767 /* in case the message can't be released due to some reason */ 768 goto failed_ack; 769 } 770 failed_message: 771 return error; 772 } 773 774 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg, 775 bool mddev_locked) 776 { 777 int ret; 778 779 lock_comm(cinfo, mddev_locked); 780 ret = __sendmsg(cinfo, cmsg); 781 unlock_comm(cinfo); 782 return ret; 783 } 784 785 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 786 { 787 struct md_cluster_info *cinfo = mddev->cluster_info; 788 int i, ret = 0; 789 struct dlm_lock_resource *bm_lockres; 790 struct suspend_info *s; 791 char str[64]; 792 sector_t lo, hi; 793 794 795 for (i = 0; i < total_slots; i++) { 796 memset(str, '\0', 64); 797 snprintf(str, 64, "bitmap%04d", i); 798 bm_lockres = lockres_init(mddev, str, NULL, 1); 799 if (!bm_lockres) 800 return -ENOMEM; 801 if (i == (cinfo->slot_number - 1)) { 802 lockres_free(bm_lockres); 803 continue; 804 } 805 806 bm_lockres->flags |= DLM_LKF_NOQUEUE; 807 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 808 if (ret == -EAGAIN) { 809 s = read_resync_info(mddev, bm_lockres); 810 if (s) { 811 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 812 __func__, __LINE__, 813 (unsigned long long) s->lo, 814 (unsigned long long) s->hi, i); 815 spin_lock_irq(&cinfo->suspend_lock); 816 s->slot = i; 817 list_add(&s->list, &cinfo->suspend_list); 818 spin_unlock_irq(&cinfo->suspend_lock); 819 } 820 ret = 0; 821 lockres_free(bm_lockres); 822 continue; 823 } 824 if (ret) { 825 lockres_free(bm_lockres); 826 goto out; 827 } 828 829 /* Read the disk bitmap sb and check if it needs recovery */ 830 ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 831 if (ret) { 832 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 833 lockres_free(bm_lockres); 834 continue; 835 } 836 if ((hi > 0) && (lo < mddev->recovery_cp)) { 837 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 838 mddev->recovery_cp = lo; 839 md_check_recovery(mddev); 840 } 841 842 lockres_free(bm_lockres); 843 } 844 out: 845 return ret; 846 } 847 848 static int join(struct mddev *mddev, int nodes) 849 { 850 struct md_cluster_info *cinfo; 851 int ret, ops_rv; 852 char str[64]; 853 854 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 855 if (!cinfo) 856 return -ENOMEM; 857 858 INIT_LIST_HEAD(&cinfo->suspend_list); 859 spin_lock_init(&cinfo->suspend_lock); 860 init_completion(&cinfo->completion); 861 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 862 init_waitqueue_head(&cinfo->wait); 863 mutex_init(&cinfo->recv_mutex); 864 865 mddev->cluster_info = cinfo; 866 cinfo->mddev = mddev; 867 868 memset(str, 0, 64); 869 sprintf(str, "%pU", mddev->uuid); 870 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 871 DLM_LSFL_FS, LVB_SIZE, 872 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 873 if (ret) 874 goto err; 875 wait_for_completion(&cinfo->completion); 876 if (nodes < cinfo->slot_number) { 877 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 878 cinfo->slot_number, nodes); 879 ret = -ERANGE; 880 goto err; 881 } 882 /* Initiate the communication resources */ 883 ret = -ENOMEM; 884 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 885 if (!cinfo->recv_thread) { 886 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 887 goto err; 888 } 889 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 890 if (!cinfo->message_lockres) 891 goto err; 892 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 893 if (!cinfo->token_lockres) 894 goto err; 895 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 896 if (!cinfo->no_new_dev_lockres) 897 goto err; 898 899 ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 900 if (ret) { 901 ret = -EAGAIN; 902 pr_err("md-cluster: can't join cluster to avoid lock issue\n"); 903 goto err; 904 } 905 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 906 if (!cinfo->ack_lockres) { 907 ret = -ENOMEM; 908 goto err; 909 } 910 /* get sync CR lock on ACK. */ 911 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 912 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 913 ret); 914 dlm_unlock_sync(cinfo->token_lockres); 915 /* get sync CR lock on no-new-dev. */ 916 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 917 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 918 919 920 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 921 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 922 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 923 if (!cinfo->bitmap_lockres) { 924 ret = -ENOMEM; 925 goto err; 926 } 927 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 928 pr_err("Failed to get bitmap lock\n"); 929 ret = -EINVAL; 930 goto err; 931 } 932 933 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 934 if (!cinfo->resync_lockres) { 935 ret = -ENOMEM; 936 goto err; 937 } 938 939 return 0; 940 err: 941 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 942 md_unregister_thread(&cinfo->recovery_thread); 943 md_unregister_thread(&cinfo->recv_thread); 944 lockres_free(cinfo->message_lockres); 945 lockres_free(cinfo->token_lockres); 946 lockres_free(cinfo->ack_lockres); 947 lockres_free(cinfo->no_new_dev_lockres); 948 lockres_free(cinfo->resync_lockres); 949 lockres_free(cinfo->bitmap_lockres); 950 if (cinfo->lockspace) 951 dlm_release_lockspace(cinfo->lockspace, 2); 952 mddev->cluster_info = NULL; 953 kfree(cinfo); 954 return ret; 955 } 956 957 static void load_bitmaps(struct mddev *mddev, int total_slots) 958 { 959 struct md_cluster_info *cinfo = mddev->cluster_info; 960 961 /* load all the node's bitmap info for resync */ 962 if (gather_all_resync_info(mddev, total_slots)) 963 pr_err("md-cluster: failed to gather all resyn infos\n"); 964 set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state); 965 /* wake up recv thread in case something need to be handled */ 966 if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state)) 967 md_wakeup_thread(cinfo->recv_thread); 968 } 969 970 static void resync_bitmap(struct mddev *mddev) 971 { 972 struct md_cluster_info *cinfo = mddev->cluster_info; 973 struct cluster_msg cmsg = {0}; 974 int err; 975 976 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC); 977 err = sendmsg(cinfo, &cmsg, 1); 978 if (err) 979 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 980 __func__, __LINE__, err); 981 } 982 983 static void unlock_all_bitmaps(struct mddev *mddev); 984 static int leave(struct mddev *mddev) 985 { 986 struct md_cluster_info *cinfo = mddev->cluster_info; 987 988 if (!cinfo) 989 return 0; 990 991 /* BITMAP_NEEDS_SYNC message should be sent when node 992 * is leaving the cluster with dirty bitmap, also we 993 * can only deliver it when dlm connection is available */ 994 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 995 resync_bitmap(mddev); 996 997 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 998 md_unregister_thread(&cinfo->recovery_thread); 999 md_unregister_thread(&cinfo->recv_thread); 1000 lockres_free(cinfo->message_lockres); 1001 lockres_free(cinfo->token_lockres); 1002 lockres_free(cinfo->ack_lockres); 1003 lockres_free(cinfo->no_new_dev_lockres); 1004 lockres_free(cinfo->resync_lockres); 1005 lockres_free(cinfo->bitmap_lockres); 1006 unlock_all_bitmaps(mddev); 1007 dlm_release_lockspace(cinfo->lockspace, 2); 1008 kfree(cinfo); 1009 return 0; 1010 } 1011 1012 /* slot_number(): Returns the MD slot number to use 1013 * DLM starts the slot numbers from 1, wheras cluster-md 1014 * wants the number to be from zero, so we deduct one 1015 */ 1016 static int slot_number(struct mddev *mddev) 1017 { 1018 struct md_cluster_info *cinfo = mddev->cluster_info; 1019 1020 return cinfo->slot_number - 1; 1021 } 1022 1023 /* 1024 * Check if the communication is already locked, else lock the communication 1025 * channel. 1026 * If it is already locked, token is in EX mode, and hence lock_token() 1027 * should not be called. 1028 */ 1029 static int metadata_update_start(struct mddev *mddev) 1030 { 1031 struct md_cluster_info *cinfo = mddev->cluster_info; 1032 int ret; 1033 1034 /* 1035 * metadata_update_start is always called with the protection of 1036 * reconfig_mutex, so set WAITING_FOR_TOKEN here. 1037 */ 1038 ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 1039 &cinfo->state); 1040 WARN_ON_ONCE(ret); 1041 md_wakeup_thread(mddev->thread); 1042 1043 wait_event(cinfo->wait, 1044 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) || 1045 test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state)); 1046 1047 /* If token is already locked, return 0 */ 1048 if (cinfo->token_lockres->mode == DLM_LOCK_EX) { 1049 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1050 return 0; 1051 } 1052 1053 ret = lock_token(cinfo, 1); 1054 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1055 return ret; 1056 } 1057 1058 static int metadata_update_finish(struct mddev *mddev) 1059 { 1060 struct md_cluster_info *cinfo = mddev->cluster_info; 1061 struct cluster_msg cmsg; 1062 struct md_rdev *rdev; 1063 int ret = 0; 1064 int raid_slot = -1; 1065 1066 memset(&cmsg, 0, sizeof(cmsg)); 1067 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1068 /* Pick up a good active device number to send. 1069 */ 1070 rdev_for_each(rdev, mddev) 1071 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) { 1072 raid_slot = rdev->desc_nr; 1073 break; 1074 } 1075 if (raid_slot >= 0) { 1076 cmsg.raid_slot = cpu_to_le32(raid_slot); 1077 ret = __sendmsg(cinfo, &cmsg); 1078 } else 1079 pr_warn("md-cluster: No good device id found to send\n"); 1080 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1081 unlock_comm(cinfo); 1082 return ret; 1083 } 1084 1085 static void metadata_update_cancel(struct mddev *mddev) 1086 { 1087 struct md_cluster_info *cinfo = mddev->cluster_info; 1088 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1089 unlock_comm(cinfo); 1090 } 1091 1092 /* 1093 * return 0 if all the bitmaps have the same sync_size 1094 */ 1095 static int cluster_check_sync_size(struct mddev *mddev) 1096 { 1097 int i, rv; 1098 bitmap_super_t *sb; 1099 unsigned long my_sync_size, sync_size = 0; 1100 int node_num = mddev->bitmap_info.nodes; 1101 int current_slot = md_cluster_ops->slot_number(mddev); 1102 struct bitmap *bitmap = mddev->bitmap; 1103 char str[64]; 1104 struct dlm_lock_resource *bm_lockres; 1105 1106 sb = kmap_atomic(bitmap->storage.sb_page); 1107 my_sync_size = sb->sync_size; 1108 kunmap_atomic(sb); 1109 1110 for (i = 0; i < node_num; i++) { 1111 if (i == current_slot) 1112 continue; 1113 1114 bitmap = get_bitmap_from_slot(mddev, i); 1115 if (IS_ERR(bitmap)) { 1116 pr_err("can't get bitmap from slot %d\n", i); 1117 return -1; 1118 } 1119 1120 /* 1121 * If we can hold the bitmap lock of one node then 1122 * the slot is not occupied, update the sb. 1123 */ 1124 snprintf(str, 64, "bitmap%04d", i); 1125 bm_lockres = lockres_init(mddev, str, NULL, 1); 1126 if (!bm_lockres) { 1127 pr_err("md-cluster: Cannot initialize %s\n", str); 1128 md_bitmap_free(bitmap); 1129 return -1; 1130 } 1131 bm_lockres->flags |= DLM_LKF_NOQUEUE; 1132 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 1133 if (!rv) 1134 md_bitmap_update_sb(bitmap); 1135 lockres_free(bm_lockres); 1136 1137 sb = kmap_atomic(bitmap->storage.sb_page); 1138 if (sync_size == 0) 1139 sync_size = sb->sync_size; 1140 else if (sync_size != sb->sync_size) { 1141 kunmap_atomic(sb); 1142 md_bitmap_free(bitmap); 1143 return -1; 1144 } 1145 kunmap_atomic(sb); 1146 md_bitmap_free(bitmap); 1147 } 1148 1149 return (my_sync_size == sync_size) ? 0 : -1; 1150 } 1151 1152 /* 1153 * Update the size for cluster raid is a little more complex, we perform it 1154 * by the steps: 1155 * 1. hold token lock and update superblock in initiator node. 1156 * 2. send METADATA_UPDATED msg to other nodes. 1157 * 3. The initiator node continues to check each bitmap's sync_size, if all 1158 * bitmaps have the same value of sync_size, then we can set capacity and 1159 * let other nodes to perform it. If one node can't update sync_size 1160 * accordingly, we need to revert to previous value. 1161 */ 1162 static void update_size(struct mddev *mddev, sector_t old_dev_sectors) 1163 { 1164 struct md_cluster_info *cinfo = mddev->cluster_info; 1165 struct cluster_msg cmsg; 1166 struct md_rdev *rdev; 1167 int ret = 0; 1168 int raid_slot = -1; 1169 1170 md_update_sb(mddev, 1); 1171 lock_comm(cinfo, 1); 1172 1173 memset(&cmsg, 0, sizeof(cmsg)); 1174 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1175 rdev_for_each(rdev, mddev) 1176 if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) { 1177 raid_slot = rdev->desc_nr; 1178 break; 1179 } 1180 if (raid_slot >= 0) { 1181 cmsg.raid_slot = cpu_to_le32(raid_slot); 1182 /* 1183 * We can only change capiticy after all the nodes can do it, 1184 * so need to wait after other nodes already received the msg 1185 * and handled the change 1186 */ 1187 ret = __sendmsg(cinfo, &cmsg); 1188 if (ret) { 1189 pr_err("%s:%d: failed to send METADATA_UPDATED msg\n", 1190 __func__, __LINE__); 1191 unlock_comm(cinfo); 1192 return; 1193 } 1194 } else { 1195 pr_err("md-cluster: No good device id found to send\n"); 1196 unlock_comm(cinfo); 1197 return; 1198 } 1199 1200 /* 1201 * check the sync_size from other node's bitmap, if sync_size 1202 * have already updated in other nodes as expected, send an 1203 * empty metadata msg to permit the change of capacity 1204 */ 1205 if (cluster_check_sync_size(mddev) == 0) { 1206 memset(&cmsg, 0, sizeof(cmsg)); 1207 cmsg.type = cpu_to_le32(CHANGE_CAPACITY); 1208 ret = __sendmsg(cinfo, &cmsg); 1209 if (ret) 1210 pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n", 1211 __func__, __LINE__); 1212 set_capacity(mddev->gendisk, mddev->array_sectors); 1213 revalidate_disk(mddev->gendisk); 1214 } else { 1215 /* revert to previous sectors */ 1216 ret = mddev->pers->resize(mddev, old_dev_sectors); 1217 if (!ret) 1218 revalidate_disk(mddev->gendisk); 1219 ret = __sendmsg(cinfo, &cmsg); 1220 if (ret) 1221 pr_err("%s:%d: failed to send METADATA_UPDATED msg\n", 1222 __func__, __LINE__); 1223 } 1224 unlock_comm(cinfo); 1225 } 1226 1227 static int resync_start(struct mddev *mddev) 1228 { 1229 struct md_cluster_info *cinfo = mddev->cluster_info; 1230 return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev); 1231 } 1232 1233 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 1234 { 1235 struct md_cluster_info *cinfo = mddev->cluster_info; 1236 struct resync_info ri; 1237 struct cluster_msg cmsg = {0}; 1238 1239 /* do not send zero again, if we have sent before */ 1240 if (hi == 0) { 1241 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 1242 if (le64_to_cpu(ri.hi) == 0) 1243 return 0; 1244 } 1245 1246 add_resync_info(cinfo->bitmap_lockres, lo, hi); 1247 /* Re-acquire the lock to refresh LVB */ 1248 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 1249 cmsg.type = cpu_to_le32(RESYNCING); 1250 cmsg.low = cpu_to_le64(lo); 1251 cmsg.high = cpu_to_le64(hi); 1252 1253 /* 1254 * mddev_lock is held if resync_info_update is called from 1255 * resync_finish (md_reap_sync_thread -> resync_finish) 1256 */ 1257 if (lo == 0 && hi == 0) 1258 return sendmsg(cinfo, &cmsg, 1); 1259 else 1260 return sendmsg(cinfo, &cmsg, 0); 1261 } 1262 1263 static int resync_finish(struct mddev *mddev) 1264 { 1265 struct md_cluster_info *cinfo = mddev->cluster_info; 1266 dlm_unlock_sync(cinfo->resync_lockres); 1267 return resync_info_update(mddev, 0, 0); 1268 } 1269 1270 static int area_resyncing(struct mddev *mddev, int direction, 1271 sector_t lo, sector_t hi) 1272 { 1273 struct md_cluster_info *cinfo = mddev->cluster_info; 1274 int ret = 0; 1275 struct suspend_info *s; 1276 1277 if ((direction == READ) && 1278 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 1279 return 1; 1280 1281 spin_lock_irq(&cinfo->suspend_lock); 1282 if (list_empty(&cinfo->suspend_list)) 1283 goto out; 1284 list_for_each_entry(s, &cinfo->suspend_list, list) 1285 if (hi > s->lo && lo < s->hi) { 1286 ret = 1; 1287 break; 1288 } 1289 out: 1290 spin_unlock_irq(&cinfo->suspend_lock); 1291 return ret; 1292 } 1293 1294 /* add_new_disk() - initiates a disk add 1295 * However, if this fails before writing md_update_sb(), 1296 * add_new_disk_cancel() must be called to release token lock 1297 */ 1298 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 1299 { 1300 struct md_cluster_info *cinfo = mddev->cluster_info; 1301 struct cluster_msg cmsg; 1302 int ret = 0; 1303 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 1304 char *uuid = sb->device_uuid; 1305 1306 memset(&cmsg, 0, sizeof(cmsg)); 1307 cmsg.type = cpu_to_le32(NEWDISK); 1308 memcpy(cmsg.uuid, uuid, 16); 1309 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1310 lock_comm(cinfo, 1); 1311 ret = __sendmsg(cinfo, &cmsg); 1312 if (ret) { 1313 unlock_comm(cinfo); 1314 return ret; 1315 } 1316 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 1317 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 1318 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 1319 /* Some node does not "see" the device */ 1320 if (ret == -EAGAIN) 1321 ret = -ENOENT; 1322 if (ret) 1323 unlock_comm(cinfo); 1324 else { 1325 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 1326 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which 1327 * will run soon after add_new_disk, the below path will be 1328 * invoked: 1329 * md_wakeup_thread(mddev->thread) 1330 * -> conf->thread (raid1d) 1331 * -> md_check_recovery -> md_update_sb 1332 * -> metadata_update_start/finish 1333 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually. 1334 * 1335 * For other failure cases, metadata_update_cancel and 1336 * add_new_disk_cancel also clear below bit as well. 1337 * */ 1338 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1339 wake_up(&cinfo->wait); 1340 } 1341 return ret; 1342 } 1343 1344 static void add_new_disk_cancel(struct mddev *mddev) 1345 { 1346 struct md_cluster_info *cinfo = mddev->cluster_info; 1347 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1348 unlock_comm(cinfo); 1349 } 1350 1351 static int new_disk_ack(struct mddev *mddev, bool ack) 1352 { 1353 struct md_cluster_info *cinfo = mddev->cluster_info; 1354 1355 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 1356 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 1357 return -EINVAL; 1358 } 1359 1360 if (ack) 1361 dlm_unlock_sync(cinfo->no_new_dev_lockres); 1362 complete(&cinfo->newdisk_completion); 1363 return 0; 1364 } 1365 1366 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 1367 { 1368 struct cluster_msg cmsg = {0}; 1369 struct md_cluster_info *cinfo = mddev->cluster_info; 1370 cmsg.type = cpu_to_le32(REMOVE); 1371 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1372 return sendmsg(cinfo, &cmsg, 1); 1373 } 1374 1375 static int lock_all_bitmaps(struct mddev *mddev) 1376 { 1377 int slot, my_slot, ret, held = 1, i = 0; 1378 char str[64]; 1379 struct md_cluster_info *cinfo = mddev->cluster_info; 1380 1381 cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) * 1382 sizeof(struct dlm_lock_resource *), 1383 GFP_KERNEL); 1384 if (!cinfo->other_bitmap_lockres) { 1385 pr_err("md: can't alloc mem for other bitmap locks\n"); 1386 return 0; 1387 } 1388 1389 my_slot = slot_number(mddev); 1390 for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) { 1391 if (slot == my_slot) 1392 continue; 1393 1394 memset(str, '\0', 64); 1395 snprintf(str, 64, "bitmap%04d", slot); 1396 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1397 if (!cinfo->other_bitmap_lockres[i]) 1398 return -ENOMEM; 1399 1400 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1401 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1402 if (ret) 1403 held = -1; 1404 i++; 1405 } 1406 1407 return held; 1408 } 1409 1410 static void unlock_all_bitmaps(struct mddev *mddev) 1411 { 1412 struct md_cluster_info *cinfo = mddev->cluster_info; 1413 int i; 1414 1415 /* release other node's bitmap lock if they are existed */ 1416 if (cinfo->other_bitmap_lockres) { 1417 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1418 if (cinfo->other_bitmap_lockres[i]) { 1419 lockres_free(cinfo->other_bitmap_lockres[i]); 1420 } 1421 } 1422 kfree(cinfo->other_bitmap_lockres); 1423 } 1424 } 1425 1426 static int gather_bitmaps(struct md_rdev *rdev) 1427 { 1428 int sn, err; 1429 sector_t lo, hi; 1430 struct cluster_msg cmsg = {0}; 1431 struct mddev *mddev = rdev->mddev; 1432 struct md_cluster_info *cinfo = mddev->cluster_info; 1433 1434 cmsg.type = cpu_to_le32(RE_ADD); 1435 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1436 err = sendmsg(cinfo, &cmsg, 1); 1437 if (err) 1438 goto out; 1439 1440 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1441 if (sn == (cinfo->slot_number - 1)) 1442 continue; 1443 err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1444 if (err) { 1445 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1446 goto out; 1447 } 1448 if ((hi > 0) && (lo < mddev->recovery_cp)) 1449 mddev->recovery_cp = lo; 1450 } 1451 out: 1452 return err; 1453 } 1454 1455 static struct md_cluster_operations cluster_ops = { 1456 .join = join, 1457 .leave = leave, 1458 .slot_number = slot_number, 1459 .resync_start = resync_start, 1460 .resync_finish = resync_finish, 1461 .resync_info_update = resync_info_update, 1462 .metadata_update_start = metadata_update_start, 1463 .metadata_update_finish = metadata_update_finish, 1464 .metadata_update_cancel = metadata_update_cancel, 1465 .area_resyncing = area_resyncing, 1466 .add_new_disk = add_new_disk, 1467 .add_new_disk_cancel = add_new_disk_cancel, 1468 .new_disk_ack = new_disk_ack, 1469 .remove_disk = remove_disk, 1470 .load_bitmaps = load_bitmaps, 1471 .gather_bitmaps = gather_bitmaps, 1472 .lock_all_bitmaps = lock_all_bitmaps, 1473 .unlock_all_bitmaps = unlock_all_bitmaps, 1474 .update_size = update_size, 1475 }; 1476 1477 static int __init cluster_init(void) 1478 { 1479 pr_warn("md-cluster: support raid1 and raid10 (limited support)\n"); 1480 pr_info("Registering Cluster MD functions\n"); 1481 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1482 return 0; 1483 } 1484 1485 static void cluster_exit(void) 1486 { 1487 unregister_md_cluster_operations(); 1488 } 1489 1490 module_init(cluster_init); 1491 module_exit(cluster_exit); 1492 MODULE_AUTHOR("SUSE"); 1493 MODULE_LICENSE("GPL"); 1494 MODULE_DESCRIPTION("Clustering support for MD"); 1495