1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/kthread.h> 14 #include <linux/dlm.h> 15 #include <linux/sched.h> 16 #include <linux/raid/md_p.h> 17 #include "md.h" 18 #include "bitmap.h" 19 #include "md-cluster.h" 20 21 #define LVB_SIZE 64 22 #define NEW_DEV_TIMEOUT 5000 23 24 struct dlm_lock_resource { 25 dlm_lockspace_t *ls; 26 struct dlm_lksb lksb; 27 char *name; /* lock name. */ 28 uint32_t flags; /* flags to pass to dlm_lock() */ 29 wait_queue_head_t sync_locking; /* wait queue for synchronized locking */ 30 bool sync_locking_done; 31 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 32 struct mddev *mddev; /* pointing back to mddev. */ 33 int mode; 34 }; 35 36 struct suspend_info { 37 int slot; 38 sector_t lo; 39 sector_t hi; 40 struct list_head list; 41 }; 42 43 struct resync_info { 44 __le64 lo; 45 __le64 hi; 46 }; 47 48 /* md_cluster_info flags */ 49 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 50 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 51 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3 52 53 /* Lock the send communication. This is done through 54 * bit manipulation as opposed to a mutex in order to 55 * accomodate lock and hold. See next comment. 56 */ 57 #define MD_CLUSTER_SEND_LOCK 4 58 /* If cluster operations (such as adding a disk) must lock the 59 * communication channel, so as to perform extra operations 60 * (update metadata) and no other operation is allowed on the 61 * MD. Token needs to be locked and held until the operation 62 * completes witha md_update_sb(), which would eventually release 63 * the lock. 
64 */ 65 #define MD_CLUSTER_SEND_LOCKED_ALREADY 5 66 /* We should receive message after node joined cluster and 67 * set up all the related infos such as bitmap and personality */ 68 #define MD_CLUSTER_ALREADY_IN_CLUSTER 6 69 #define MD_CLUSTER_PENDING_RECV_EVENT 7 70 #define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8 71 72 struct md_cluster_info { 73 struct mddev *mddev; /* the md device which md_cluster_info belongs to */ 74 /* dlm lock space and resources for clustered raid. */ 75 dlm_lockspace_t *lockspace; 76 int slot_number; 77 struct completion completion; 78 struct mutex recv_mutex; 79 struct dlm_lock_resource *bitmap_lockres; 80 struct dlm_lock_resource **other_bitmap_lockres; 81 struct dlm_lock_resource *resync_lockres; 82 struct list_head suspend_list; 83 spinlock_t suspend_lock; 84 struct md_thread *recovery_thread; 85 unsigned long recovery_map; 86 /* communication loc resources */ 87 struct dlm_lock_resource *ack_lockres; 88 struct dlm_lock_resource *message_lockres; 89 struct dlm_lock_resource *token_lockres; 90 struct dlm_lock_resource *no_new_dev_lockres; 91 struct md_thread *recv_thread; 92 struct completion newdisk_completion; 93 wait_queue_head_t wait; 94 unsigned long state; 95 /* record the region in RESYNCING message */ 96 sector_t sync_low; 97 sector_t sync_hi; 98 }; 99 100 enum msg_type { 101 METADATA_UPDATED = 0, 102 RESYNCING, 103 NEWDISK, 104 REMOVE, 105 RE_ADD, 106 BITMAP_NEEDS_SYNC, 107 }; 108 109 struct cluster_msg { 110 __le32 type; 111 __le32 slot; 112 /* TODO: Unionize this for smaller footprint */ 113 __le64 low; 114 __le64 high; 115 char uuid[16]; 116 __le32 raid_slot; 117 }; 118 119 static void sync_ast(void *arg) 120 { 121 struct dlm_lock_resource *res; 122 123 res = arg; 124 res->sync_locking_done = true; 125 wake_up(&res->sync_locking); 126 } 127 128 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 129 { 130 int ret = 0; 131 132 ret = dlm_lock(res->ls, mode, &res->lksb, 133 res->flags, res->name, strlen(res->name), 
134 0, sync_ast, res, res->bast); 135 if (ret) 136 return ret; 137 wait_event(res->sync_locking, res->sync_locking_done); 138 res->sync_locking_done = false; 139 if (res->lksb.sb_status == 0) 140 res->mode = mode; 141 return res->lksb.sb_status; 142 } 143 144 static int dlm_unlock_sync(struct dlm_lock_resource *res) 145 { 146 return dlm_lock_sync(res, DLM_LOCK_NL); 147 } 148 149 /* 150 * An variation of dlm_lock_sync, which make lock request could 151 * be interrupted 152 */ 153 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode, 154 struct mddev *mddev) 155 { 156 int ret = 0; 157 158 ret = dlm_lock(res->ls, mode, &res->lksb, 159 res->flags, res->name, strlen(res->name), 160 0, sync_ast, res, res->bast); 161 if (ret) 162 return ret; 163 164 wait_event(res->sync_locking, res->sync_locking_done 165 || kthread_should_stop() 166 || test_bit(MD_CLOSING, &mddev->flags)); 167 if (!res->sync_locking_done) { 168 /* 169 * the convert queue contains the lock request when request is 170 * interrupted, and sync_ast could still be run, so need to 171 * cancel the request and reset completion 172 */ 173 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL, 174 &res->lksb, res); 175 res->sync_locking_done = false; 176 if (unlikely(ret != 0)) 177 pr_info("failed to cancel previous lock request " 178 "%s return %d\n", res->name, ret); 179 return -EPERM; 180 } else 181 res->sync_locking_done = false; 182 if (res->lksb.sb_status == 0) 183 res->mode = mode; 184 return res->lksb.sb_status; 185 } 186 187 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 188 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 189 { 190 struct dlm_lock_resource *res = NULL; 191 int ret, namelen; 192 struct md_cluster_info *cinfo = mddev->cluster_info; 193 194 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 195 if (!res) 196 return NULL; 197 init_waitqueue_head(&res->sync_locking); 198 res->sync_locking_done = false; 199 res->ls = 
cinfo->lockspace; 200 res->mddev = mddev; 201 res->mode = DLM_LOCK_IV; 202 namelen = strlen(name); 203 res->name = kzalloc(namelen + 1, GFP_KERNEL); 204 if (!res->name) { 205 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 206 goto out_err; 207 } 208 strlcpy(res->name, name, namelen + 1); 209 if (with_lvb) { 210 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 211 if (!res->lksb.sb_lvbptr) { 212 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 213 goto out_err; 214 } 215 res->flags = DLM_LKF_VALBLK; 216 } 217 218 if (bastfn) 219 res->bast = bastfn; 220 221 res->flags |= DLM_LKF_EXPEDITE; 222 223 ret = dlm_lock_sync(res, DLM_LOCK_NL); 224 if (ret) { 225 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 226 goto out_err; 227 } 228 res->flags &= ~DLM_LKF_EXPEDITE; 229 res->flags |= DLM_LKF_CONVERT; 230 231 return res; 232 out_err: 233 kfree(res->lksb.sb_lvbptr); 234 kfree(res->name); 235 kfree(res); 236 return NULL; 237 } 238 239 static void lockres_free(struct dlm_lock_resource *res) 240 { 241 int ret = 0; 242 243 if (!res) 244 return; 245 246 /* 247 * use FORCEUNLOCK flag, so we can unlock even the lock is on the 248 * waiting or convert queue 249 */ 250 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK, 251 &res->lksb, res); 252 if (unlikely(ret != 0)) 253 pr_err("failed to unlock %s return %d\n", res->name, ret); 254 else 255 wait_event(res->sync_locking, res->sync_locking_done); 256 257 kfree(res->name); 258 kfree(res->lksb.sb_lvbptr); 259 kfree(res); 260 } 261 262 static void add_resync_info(struct dlm_lock_resource *lockres, 263 sector_t lo, sector_t hi) 264 { 265 struct resync_info *ri; 266 267 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 268 ri->lo = cpu_to_le64(lo); 269 ri->hi = cpu_to_le64(hi); 270 } 271 272 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 273 { 274 struct resync_info ri; 275 struct 
suspend_info *s = NULL; 276 sector_t hi = 0; 277 278 dlm_lock_sync(lockres, DLM_LOCK_CR); 279 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 280 hi = le64_to_cpu(ri.hi); 281 if (hi > 0) { 282 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 283 if (!s) 284 goto out; 285 s->hi = hi; 286 s->lo = le64_to_cpu(ri.lo); 287 } 288 dlm_unlock_sync(lockres); 289 out: 290 return s; 291 } 292 293 static void recover_bitmaps(struct md_thread *thread) 294 { 295 struct mddev *mddev = thread->mddev; 296 struct md_cluster_info *cinfo = mddev->cluster_info; 297 struct dlm_lock_resource *bm_lockres; 298 char str[64]; 299 int slot, ret; 300 struct suspend_info *s, *tmp; 301 sector_t lo, hi; 302 303 while (cinfo->recovery_map) { 304 slot = fls64((u64)cinfo->recovery_map) - 1; 305 306 /* Clear suspend_area associated with the bitmap */ 307 spin_lock_irq(&cinfo->suspend_lock); 308 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 309 if (slot == s->slot) { 310 list_del(&s->list); 311 kfree(s); 312 } 313 spin_unlock_irq(&cinfo->suspend_lock); 314 315 snprintf(str, 64, "bitmap%04d", slot); 316 bm_lockres = lockres_init(mddev, str, NULL, 1); 317 if (!bm_lockres) { 318 pr_err("md-cluster: Cannot initialize bitmaps\n"); 319 goto clear_bit; 320 } 321 322 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev); 323 if (ret) { 324 pr_err("md-cluster: Could not DLM lock %s: %d\n", 325 str, ret); 326 goto clear_bit; 327 } 328 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 329 if (ret) { 330 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 331 goto clear_bit; 332 } 333 if (hi > 0) { 334 if (lo < mddev->recovery_cp) 335 mddev->recovery_cp = lo; 336 /* wake up thread to continue resync in case resync 337 * is not finished */ 338 if (mddev->recovery_cp != MaxSector) { 339 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 340 md_wakeup_thread(mddev->thread); 341 } 342 } 343 clear_bit: 344 lockres_free(bm_lockres); 345 clear_bit(slot, 
&cinfo->recovery_map); 346 } 347 } 348 349 static void recover_prep(void *arg) 350 { 351 struct mddev *mddev = arg; 352 struct md_cluster_info *cinfo = mddev->cluster_info; 353 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 354 } 355 356 static void __recover_slot(struct mddev *mddev, int slot) 357 { 358 struct md_cluster_info *cinfo = mddev->cluster_info; 359 360 set_bit(slot, &cinfo->recovery_map); 361 if (!cinfo->recovery_thread) { 362 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 363 mddev, "recover"); 364 if (!cinfo->recovery_thread) { 365 pr_warn("md-cluster: Could not create recovery thread\n"); 366 return; 367 } 368 } 369 md_wakeup_thread(cinfo->recovery_thread); 370 } 371 372 static void recover_slot(void *arg, struct dlm_slot *slot) 373 { 374 struct mddev *mddev = arg; 375 struct md_cluster_info *cinfo = mddev->cluster_info; 376 377 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 378 mddev->bitmap_info.cluster_name, 379 slot->nodeid, slot->slot, 380 cinfo->slot_number); 381 /* deduct one since dlm slot starts from one while the num of 382 * cluster-md begins with 0 */ 383 __recover_slot(mddev, slot->slot - 1); 384 } 385 386 static void recover_done(void *arg, struct dlm_slot *slots, 387 int num_slots, int our_slot, 388 uint32_t generation) 389 { 390 struct mddev *mddev = arg; 391 struct md_cluster_info *cinfo = mddev->cluster_info; 392 393 cinfo->slot_number = our_slot; 394 /* completion is only need to be complete when node join cluster, 395 * it doesn't need to run during another node's failure */ 396 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 397 complete(&cinfo->completion); 398 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 399 } 400 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 401 } 402 403 /* the ops is called when node join the cluster, and do lock recovery 404 * if node failure occurs */ 405 static const struct dlm_lockspace_ops md_ls_ops = { 
406 .recover_prep = recover_prep, 407 .recover_slot = recover_slot, 408 .recover_done = recover_done, 409 }; 410 411 /* 412 * The BAST function for the ack lock resource 413 * This function wakes up the receive thread in 414 * order to receive and process the message. 415 */ 416 static void ack_bast(void *arg, int mode) 417 { 418 struct dlm_lock_resource *res = arg; 419 struct md_cluster_info *cinfo = res->mddev->cluster_info; 420 421 if (mode == DLM_LOCK_EX) { 422 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state)) 423 md_wakeup_thread(cinfo->recv_thread); 424 else 425 set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state); 426 } 427 } 428 429 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 430 { 431 struct suspend_info *s, *tmp; 432 433 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 434 if (slot == s->slot) { 435 list_del(&s->list); 436 kfree(s); 437 break; 438 } 439 } 440 441 static void remove_suspend_info(struct mddev *mddev, int slot) 442 { 443 struct md_cluster_info *cinfo = mddev->cluster_info; 444 spin_lock_irq(&cinfo->suspend_lock); 445 __remove_suspend_info(cinfo, slot); 446 spin_unlock_irq(&cinfo->suspend_lock); 447 mddev->pers->quiesce(mddev, 2); 448 } 449 450 451 static void process_suspend_info(struct mddev *mddev, 452 int slot, sector_t lo, sector_t hi) 453 { 454 struct md_cluster_info *cinfo = mddev->cluster_info; 455 struct suspend_info *s; 456 457 if (!hi) { 458 remove_suspend_info(mddev, slot); 459 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 460 md_wakeup_thread(mddev->thread); 461 return; 462 } 463 464 /* 465 * The bitmaps are not same for different nodes 466 * if RESYNCING is happening in one node, then 467 * the node which received the RESYNCING message 468 * probably will perform resync with the region 469 * [lo, hi] again, so we could reduce resync time 470 * a lot if we can ensure that the bitmaps among 471 * different nodes are match up well. 
472 * 473 * sync_low/hi is used to record the region which 474 * arrived in the previous RESYNCING message, 475 * 476 * Call bitmap_sync_with_cluster to clear 477 * NEEDED_MASK and set RESYNC_MASK since 478 * resync thread is running in another node, 479 * so we don't need to do the resync again 480 * with the same section */ 481 bitmap_sync_with_cluster(mddev, cinfo->sync_low, 482 cinfo->sync_hi, 483 lo, hi); 484 cinfo->sync_low = lo; 485 cinfo->sync_hi = hi; 486 487 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 488 if (!s) 489 return; 490 s->slot = slot; 491 s->lo = lo; 492 s->hi = hi; 493 mddev->pers->quiesce(mddev, 1); 494 mddev->pers->quiesce(mddev, 0); 495 spin_lock_irq(&cinfo->suspend_lock); 496 /* Remove existing entry (if exists) before adding */ 497 __remove_suspend_info(cinfo, slot); 498 list_add(&s->list, &cinfo->suspend_list); 499 spin_unlock_irq(&cinfo->suspend_lock); 500 mddev->pers->quiesce(mddev, 2); 501 } 502 503 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 504 { 505 char disk_uuid[64]; 506 struct md_cluster_info *cinfo = mddev->cluster_info; 507 char event_name[] = "EVENT=ADD_DEVICE"; 508 char raid_slot[16]; 509 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 510 int len; 511 512 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 513 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 514 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 515 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 516 init_completion(&cinfo->newdisk_completion); 517 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 518 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 519 wait_for_completion_timeout(&cinfo->newdisk_completion, 520 NEW_DEV_TIMEOUT); 521 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 522 } 523 524 525 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 526 { 527 int got_lock 
= 0; 528 struct md_cluster_info *cinfo = mddev->cluster_info; 529 mddev->good_device_nr = le32_to_cpu(msg->raid_slot); 530 531 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 532 wait_event(mddev->thread->wqueue, 533 (got_lock = mddev_trylock(mddev)) || 534 test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state)); 535 md_reload_sb(mddev, mddev->good_device_nr); 536 if (got_lock) 537 mddev_unlock(mddev); 538 } 539 540 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 541 { 542 struct md_rdev *rdev; 543 544 rcu_read_lock(); 545 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 546 if (rdev) { 547 set_bit(ClusterRemove, &rdev->flags); 548 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 549 md_wakeup_thread(mddev->thread); 550 } 551 else 552 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 553 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 554 rcu_read_unlock(); 555 } 556 557 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 558 { 559 struct md_rdev *rdev; 560 561 rcu_read_lock(); 562 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 563 if (rdev && test_bit(Faulty, &rdev->flags)) 564 clear_bit(Faulty, &rdev->flags); 565 else 566 pr_warn("%s: %d Could not find disk(%d) which is faulty", 567 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 568 rcu_read_unlock(); 569 } 570 571 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 572 { 573 int ret = 0; 574 575 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 576 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 577 return -1; 578 switch (le32_to_cpu(msg->type)) { 579 case METADATA_UPDATED: 580 process_metadata_update(mddev, msg); 581 break; 582 case RESYNCING: 583 process_suspend_info(mddev, le32_to_cpu(msg->slot), 584 le64_to_cpu(msg->low), 585 le64_to_cpu(msg->high)); 586 break; 587 case NEWDISK: 588 process_add_new_disk(mddev, msg); 589 break; 590 case REMOVE: 591 
process_remove_disk(mddev, msg); 592 break; 593 case RE_ADD: 594 process_readd_disk(mddev, msg); 595 break; 596 case BITMAP_NEEDS_SYNC: 597 __recover_slot(mddev, le32_to_cpu(msg->slot)); 598 break; 599 default: 600 ret = -1; 601 pr_warn("%s:%d Received unknown message from %d\n", 602 __func__, __LINE__, msg->slot); 603 } 604 return ret; 605 } 606 607 /* 608 * thread for receiving message 609 */ 610 static void recv_daemon(struct md_thread *thread) 611 { 612 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 613 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 614 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 615 struct cluster_msg msg; 616 int ret; 617 618 mutex_lock(&cinfo->recv_mutex); 619 /*get CR on Message*/ 620 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 621 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 622 mutex_unlock(&cinfo->recv_mutex); 623 return; 624 } 625 626 /* read lvb and wake up thread to process this message_lockres */ 627 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 628 ret = process_recvd_msg(thread->mddev, &msg); 629 if (ret) 630 goto out; 631 632 /*release CR on ack_lockres*/ 633 ret = dlm_unlock_sync(ack_lockres); 634 if (unlikely(ret != 0)) 635 pr_info("unlock ack failed return %d\n", ret); 636 /*up-convert to PR on message_lockres*/ 637 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 638 if (unlikely(ret != 0)) 639 pr_info("lock PR on msg failed return %d\n", ret); 640 /*get CR on ack_lockres again*/ 641 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 642 if (unlikely(ret != 0)) 643 pr_info("lock CR on ack failed return %d\n", ret); 644 out: 645 /*release CR on message_lockres*/ 646 ret = dlm_unlock_sync(message_lockres); 647 if (unlikely(ret != 0)) 648 pr_info("unlock msg failed return %d\n", ret); 649 mutex_unlock(&cinfo->recv_mutex); 650 } 651 652 /* lock_token() 653 * Takes the lock on the TOKEN lock resource so no other 654 * node can communicate 
while the operation is underway. 655 */ 656 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked) 657 { 658 int error, set_bit = 0; 659 struct mddev *mddev = cinfo->mddev; 660 661 /* 662 * If resync thread run after raid1d thread, then process_metadata_update 663 * could not continue if raid1d held reconfig_mutex (and raid1d is blocked 664 * since another node already got EX on Token and waitting the EX of Ack), 665 * so let resync wake up thread in case flag is set. 666 */ 667 if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 668 &cinfo->state)) { 669 error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 670 &cinfo->state); 671 WARN_ON_ONCE(error); 672 md_wakeup_thread(mddev->thread); 673 set_bit = 1; 674 } 675 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 676 if (set_bit) 677 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 678 679 if (error) 680 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 681 __func__, __LINE__, error); 682 683 /* Lock the receive sequence */ 684 mutex_lock(&cinfo->recv_mutex); 685 return error; 686 } 687 688 /* lock_comm() 689 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel. 690 */ 691 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked) 692 { 693 wait_event(cinfo->wait, 694 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state)); 695 696 return lock_token(cinfo, mddev_locked); 697 } 698 699 static void unlock_comm(struct md_cluster_info *cinfo) 700 { 701 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 702 mutex_unlock(&cinfo->recv_mutex); 703 dlm_unlock_sync(cinfo->token_lockres); 704 clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state); 705 wake_up(&cinfo->wait); 706 } 707 708 /* __sendmsg() 709 * This function performs the actual sending of the message. This function is 710 * usually called after performing the encompassing operation 711 * The function: 712 * 1. Grabs the message lockresource in EX mode 713 * 2. 
Copies the message to the message LVB 714 * 3. Downconverts message lockresource to CW 715 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 716 * and the other nodes read the message. The thread will wait here until all other 717 * nodes have released ack lock resource. 718 * 5. Downconvert ack lockresource to CR 719 */ 720 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 721 { 722 int error; 723 int slot = cinfo->slot_number - 1; 724 725 cmsg->slot = cpu_to_le32(slot); 726 /*get EX on Message*/ 727 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 728 if (error) { 729 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 730 goto failed_message; 731 } 732 733 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 734 sizeof(struct cluster_msg)); 735 /*down-convert EX to CW on Message*/ 736 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 737 if (error) { 738 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 739 error); 740 goto failed_ack; 741 } 742 743 /*up-convert CR to EX on Ack*/ 744 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 745 if (error) { 746 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 747 error); 748 goto failed_ack; 749 } 750 751 /*down-convert EX to CR on Ack*/ 752 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 753 if (error) { 754 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 755 error); 756 goto failed_ack; 757 } 758 759 failed_ack: 760 error = dlm_unlock_sync(cinfo->message_lockres); 761 if (unlikely(error != 0)) { 762 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 763 error); 764 /* in case the message can't be released due to some reason */ 765 goto failed_ack; 766 } 767 failed_message: 768 return error; 769 } 770 771 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg, 772 bool mddev_locked) 773 { 774 int ret; 775 776 lock_comm(cinfo, 
mddev_locked); 777 ret = __sendmsg(cinfo, cmsg); 778 unlock_comm(cinfo); 779 return ret; 780 } 781 782 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 783 { 784 struct md_cluster_info *cinfo = mddev->cluster_info; 785 int i, ret = 0; 786 struct dlm_lock_resource *bm_lockres; 787 struct suspend_info *s; 788 char str[64]; 789 sector_t lo, hi; 790 791 792 for (i = 0; i < total_slots; i++) { 793 memset(str, '\0', 64); 794 snprintf(str, 64, "bitmap%04d", i); 795 bm_lockres = lockres_init(mddev, str, NULL, 1); 796 if (!bm_lockres) 797 return -ENOMEM; 798 if (i == (cinfo->slot_number - 1)) { 799 lockres_free(bm_lockres); 800 continue; 801 } 802 803 bm_lockres->flags |= DLM_LKF_NOQUEUE; 804 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 805 if (ret == -EAGAIN) { 806 s = read_resync_info(mddev, bm_lockres); 807 if (s) { 808 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 809 __func__, __LINE__, 810 (unsigned long long) s->lo, 811 (unsigned long long) s->hi, i); 812 spin_lock_irq(&cinfo->suspend_lock); 813 s->slot = i; 814 list_add(&s->list, &cinfo->suspend_list); 815 spin_unlock_irq(&cinfo->suspend_lock); 816 } 817 ret = 0; 818 lockres_free(bm_lockres); 819 continue; 820 } 821 if (ret) { 822 lockres_free(bm_lockres); 823 goto out; 824 } 825 826 /* Read the disk bitmap sb and check if it needs recovery */ 827 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 828 if (ret) { 829 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 830 lockres_free(bm_lockres); 831 continue; 832 } 833 if ((hi > 0) && (lo < mddev->recovery_cp)) { 834 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 835 mddev->recovery_cp = lo; 836 md_check_recovery(mddev); 837 } 838 839 lockres_free(bm_lockres); 840 } 841 out: 842 return ret; 843 } 844 845 static int join(struct mddev *mddev, int nodes) 846 { 847 struct md_cluster_info *cinfo; 848 int ret, ops_rv; 849 char str[64]; 850 851 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 852 if (!cinfo) 
853 return -ENOMEM; 854 855 INIT_LIST_HEAD(&cinfo->suspend_list); 856 spin_lock_init(&cinfo->suspend_lock); 857 init_completion(&cinfo->completion); 858 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 859 init_waitqueue_head(&cinfo->wait); 860 mutex_init(&cinfo->recv_mutex); 861 862 mddev->cluster_info = cinfo; 863 cinfo->mddev = mddev; 864 865 memset(str, 0, 64); 866 sprintf(str, "%pU", mddev->uuid); 867 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 868 DLM_LSFL_FS, LVB_SIZE, 869 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 870 if (ret) 871 goto err; 872 wait_for_completion(&cinfo->completion); 873 if (nodes < cinfo->slot_number) { 874 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 875 cinfo->slot_number, nodes); 876 ret = -ERANGE; 877 goto err; 878 } 879 /* Initiate the communication resources */ 880 ret = -ENOMEM; 881 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 882 if (!cinfo->recv_thread) { 883 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 884 goto err; 885 } 886 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 887 if (!cinfo->message_lockres) 888 goto err; 889 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 890 if (!cinfo->token_lockres) 891 goto err; 892 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 893 if (!cinfo->no_new_dev_lockres) 894 goto err; 895 896 ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 897 if (ret) { 898 ret = -EAGAIN; 899 pr_err("md-cluster: can't join cluster to avoid lock issue\n"); 900 goto err; 901 } 902 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 903 if (!cinfo->ack_lockres) { 904 ret = -ENOMEM; 905 goto err; 906 } 907 /* get sync CR lock on ACK. 
*/ 908 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 909 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 910 ret); 911 dlm_unlock_sync(cinfo->token_lockres); 912 /* get sync CR lock on no-new-dev. */ 913 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 914 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 915 916 917 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 918 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 919 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 920 if (!cinfo->bitmap_lockres) { 921 ret = -ENOMEM; 922 goto err; 923 } 924 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 925 pr_err("Failed to get bitmap lock\n"); 926 ret = -EINVAL; 927 goto err; 928 } 929 930 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 931 if (!cinfo->resync_lockres) { 932 ret = -ENOMEM; 933 goto err; 934 } 935 936 return 0; 937 err: 938 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 939 md_unregister_thread(&cinfo->recovery_thread); 940 md_unregister_thread(&cinfo->recv_thread); 941 lockres_free(cinfo->message_lockres); 942 lockres_free(cinfo->token_lockres); 943 lockres_free(cinfo->ack_lockres); 944 lockres_free(cinfo->no_new_dev_lockres); 945 lockres_free(cinfo->resync_lockres); 946 lockres_free(cinfo->bitmap_lockres); 947 if (cinfo->lockspace) 948 dlm_release_lockspace(cinfo->lockspace, 2); 949 mddev->cluster_info = NULL; 950 kfree(cinfo); 951 return ret; 952 } 953 954 static void load_bitmaps(struct mddev *mddev, int total_slots) 955 { 956 struct md_cluster_info *cinfo = mddev->cluster_info; 957 958 /* load all the node's bitmap info for resync */ 959 if (gather_all_resync_info(mddev, total_slots)) 960 pr_err("md-cluster: failed to gather all resyn infos\n"); 961 set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state); 962 /* wake up recv thread in case something need to be handled */ 963 if 
(test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state)) 964 md_wakeup_thread(cinfo->recv_thread); 965 } 966 967 static void resync_bitmap(struct mddev *mddev) 968 { 969 struct md_cluster_info *cinfo = mddev->cluster_info; 970 struct cluster_msg cmsg = {0}; 971 int err; 972 973 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC); 974 err = sendmsg(cinfo, &cmsg, 1); 975 if (err) 976 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 977 __func__, __LINE__, err); 978 } 979 980 static void unlock_all_bitmaps(struct mddev *mddev); 981 static int leave(struct mddev *mddev) 982 { 983 struct md_cluster_info *cinfo = mddev->cluster_info; 984 985 if (!cinfo) 986 return 0; 987 988 /* BITMAP_NEEDS_SYNC message should be sent when node 989 * is leaving the cluster with dirty bitmap, also we 990 * can only deliver it when dlm connection is available */ 991 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 992 resync_bitmap(mddev); 993 994 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 995 md_unregister_thread(&cinfo->recovery_thread); 996 md_unregister_thread(&cinfo->recv_thread); 997 lockres_free(cinfo->message_lockres); 998 lockres_free(cinfo->token_lockres); 999 lockres_free(cinfo->ack_lockres); 1000 lockres_free(cinfo->no_new_dev_lockres); 1001 lockres_free(cinfo->resync_lockres); 1002 lockres_free(cinfo->bitmap_lockres); 1003 unlock_all_bitmaps(mddev); 1004 dlm_release_lockspace(cinfo->lockspace, 2); 1005 kfree(cinfo); 1006 return 0; 1007 } 1008 1009 /* slot_number(): Returns the MD slot number to use 1010 * DLM starts the slot numbers from 1, wheras cluster-md 1011 * wants the number to be from zero, so we deduct one 1012 */ 1013 static int slot_number(struct mddev *mddev) 1014 { 1015 struct md_cluster_info *cinfo = mddev->cluster_info; 1016 1017 return cinfo->slot_number - 1; 1018 } 1019 1020 /* 1021 * Check if the communication is already locked, else lock the communication 1022 * channel. 
1023 * If it is already locked, token is in EX mode, and hence lock_token() 1024 * should not be called. 1025 */ 1026 static int metadata_update_start(struct mddev *mddev) 1027 { 1028 struct md_cluster_info *cinfo = mddev->cluster_info; 1029 int ret; 1030 1031 /* 1032 * metadata_update_start is always called with the protection of 1033 * reconfig_mutex, so set WAITING_FOR_TOKEN here. 1034 */ 1035 ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 1036 &cinfo->state); 1037 WARN_ON_ONCE(ret); 1038 md_wakeup_thread(mddev->thread); 1039 1040 wait_event(cinfo->wait, 1041 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) || 1042 test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state)); 1043 1044 /* If token is already locked, return 0 */ 1045 if (cinfo->token_lockres->mode == DLM_LOCK_EX) { 1046 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1047 return 0; 1048 } 1049 1050 ret = lock_token(cinfo, 1); 1051 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1052 return ret; 1053 } 1054 1055 static int metadata_update_finish(struct mddev *mddev) 1056 { 1057 struct md_cluster_info *cinfo = mddev->cluster_info; 1058 struct cluster_msg cmsg; 1059 struct md_rdev *rdev; 1060 int ret = 0; 1061 int raid_slot = -1; 1062 1063 memset(&cmsg, 0, sizeof(cmsg)); 1064 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1065 /* Pick up a good active device number to send. 
 */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

/* Abort a metadata update: drop the held communication channel without
 * sending METADATA_UPDATED. */
static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

/* Take the resync lock in EX mode (interruptible variant) before this
 * node starts a resync. */
static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}

/* Publish the region [lo, hi] this node is resyncing: stash it in the
 * bitmap lockres LVB and broadcast a RESYNCING message. lo == hi == 0
 * signals that resync is finished. */
static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again, if we have sent before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}

/* Drop the resync lock and tell peers resync is done (lo = hi = 0). */
static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
dlm_unlock_sync(cinfo->resync_lockres); 1129 return resync_info_update(mddev, 0, 0); 1130 } 1131 1132 static int area_resyncing(struct mddev *mddev, int direction, 1133 sector_t lo, sector_t hi) 1134 { 1135 struct md_cluster_info *cinfo = mddev->cluster_info; 1136 int ret = 0; 1137 struct suspend_info *s; 1138 1139 if ((direction == READ) && 1140 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 1141 return 1; 1142 1143 spin_lock_irq(&cinfo->suspend_lock); 1144 if (list_empty(&cinfo->suspend_list)) 1145 goto out; 1146 list_for_each_entry(s, &cinfo->suspend_list, list) 1147 if (hi > s->lo && lo < s->hi) { 1148 ret = 1; 1149 break; 1150 } 1151 out: 1152 spin_unlock_irq(&cinfo->suspend_lock); 1153 return ret; 1154 } 1155 1156 /* add_new_disk() - initiates a disk add 1157 * However, if this fails before writing md_update_sb(), 1158 * add_new_disk_cancel() must be called to release token lock 1159 */ 1160 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 1161 { 1162 struct md_cluster_info *cinfo = mddev->cluster_info; 1163 struct cluster_msg cmsg; 1164 int ret = 0; 1165 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 1166 char *uuid = sb->device_uuid; 1167 1168 memset(&cmsg, 0, sizeof(cmsg)); 1169 cmsg.type = cpu_to_le32(NEWDISK); 1170 memcpy(cmsg.uuid, uuid, 16); 1171 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1172 lock_comm(cinfo, 1); 1173 ret = __sendmsg(cinfo, &cmsg); 1174 if (ret) 1175 return ret; 1176 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 1177 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 1178 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 1179 /* Some node does not "see" the device */ 1180 if (ret == -EAGAIN) 1181 ret = -ENOENT; 1182 if (ret) 1183 unlock_comm(cinfo); 1184 else { 1185 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 1186 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which 1187 * will run soon after add_new_disk, the below path will be 1188 * invoked: 1189 * 
md_wakeup_thread(mddev->thread)
		 *	-> conf->thread (raid1d)
		 *	-> md_check_recovery -> md_update_sb
		 *	-> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel also clear below bit as well.
		 * */
		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
		wake_up(&cinfo->wait);
	}
	return ret;
}

/* Undo add_new_disk(): clear the held-channel marker and release the
 * communication channel / token lock. */
static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

/* Acknowledge (ack) or reject a NEWDISK event: on ack, drop our hold on
 * the no-new-dev lockres (letting the initiating node take it in EX),
 * then complete newdisk_completion to release whoever is waiting on it.
 * Returns -EINVAL if no NEWDISK event is actually pending. */
static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

/* Broadcast REMOVE for @rdev so peer nodes drop the device too. */
static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;
	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return sendmsg(cinfo, &cmsg, 1);
}

/* Try to take every other node's bitmap lock in PW mode (NOQUEUE, so a
 * lock held by a live node fails immediately). Returns 1 if all were
 * acquired, -1 if at least one was busy.
 * NOTE(review): returns 0 (not -ENOMEM) on allocation failure — confirm
 * callers treat 0 as failure. */
static int lock_all_bitmaps(struct mddev *mddev)
{
	int slot, my_slot, ret, held = 1, i = 0;
	char str[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
					     sizeof(struct dlm_lock_resource *),
					     GFP_KERNEL);
	if (!cinfo->other_bitmap_lockres) {
		pr_err("md: can't alloc mem for other bitmap locks\n");
		return 0;
	}

	my_slot = slot_number(mddev);
	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1251 if (slot == my_slot) 1252 continue; 1253 1254 memset(str, '\0', 64); 1255 snprintf(str, 64, "bitmap%04d", slot); 1256 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1257 if (!cinfo->other_bitmap_lockres[i]) 1258 return -ENOMEM; 1259 1260 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1261 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1262 if (ret) 1263 held = -1; 1264 i++; 1265 } 1266 1267 return held; 1268 } 1269 1270 static void unlock_all_bitmaps(struct mddev *mddev) 1271 { 1272 struct md_cluster_info *cinfo = mddev->cluster_info; 1273 int i; 1274 1275 /* release other node's bitmap lock if they are existed */ 1276 if (cinfo->other_bitmap_lockres) { 1277 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1278 if (cinfo->other_bitmap_lockres[i]) { 1279 lockres_free(cinfo->other_bitmap_lockres[i]); 1280 } 1281 } 1282 kfree(cinfo->other_bitmap_lockres); 1283 } 1284 } 1285 1286 static int gather_bitmaps(struct md_rdev *rdev) 1287 { 1288 int sn, err; 1289 sector_t lo, hi; 1290 struct cluster_msg cmsg = {0}; 1291 struct mddev *mddev = rdev->mddev; 1292 struct md_cluster_info *cinfo = mddev->cluster_info; 1293 1294 cmsg.type = cpu_to_le32(RE_ADD); 1295 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1296 err = sendmsg(cinfo, &cmsg, 1); 1297 if (err) 1298 goto out; 1299 1300 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1301 if (sn == (cinfo->slot_number - 1)) 1302 continue; 1303 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1304 if (err) { 1305 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1306 goto out; 1307 } 1308 if ((hi > 0) && (lo < mddev->recovery_cp)) 1309 mddev->recovery_cp = lo; 1310 } 1311 out: 1312 return err; 1313 } 1314 1315 static struct md_cluster_operations cluster_ops = { 1316 .join = join, 1317 .leave = leave, 1318 .slot_number = slot_number, 1319 .resync_start = resync_start, 1320 .resync_finish = resync_finish, 1321 .resync_info_update = resync_info_update, 
1322 .metadata_update_start = metadata_update_start, 1323 .metadata_update_finish = metadata_update_finish, 1324 .metadata_update_cancel = metadata_update_cancel, 1325 .area_resyncing = area_resyncing, 1326 .add_new_disk = add_new_disk, 1327 .add_new_disk_cancel = add_new_disk_cancel, 1328 .new_disk_ack = new_disk_ack, 1329 .remove_disk = remove_disk, 1330 .load_bitmaps = load_bitmaps, 1331 .gather_bitmaps = gather_bitmaps, 1332 .lock_all_bitmaps = lock_all_bitmaps, 1333 .unlock_all_bitmaps = unlock_all_bitmaps, 1334 }; 1335 1336 static int __init cluster_init(void) 1337 { 1338 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 1339 pr_info("Registering Cluster MD functions\n"); 1340 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1341 return 0; 1342 } 1343 1344 static void cluster_exit(void) 1345 { 1346 unregister_md_cluster_operations(); 1347 } 1348 1349 module_init(cluster_init); 1350 module_exit(cluster_exit); 1351 MODULE_AUTHOR("SUSE"); 1352 MODULE_LICENSE("GPL"); 1353 MODULE_DESCRIPTION("Clustering support for MD"); 1354