/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */


#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "bitmap.h"
#include "md-cluster.h"

/* Size in bytes of the lock value block allocated for each LVB-carrying lock */
#define LVB_SIZE	64
/* Passed unscaled to wait_for_completion_timeout() when a NEWDISK arrives */
#define NEW_DEV_TIMEOUT 5000

/*
 * One DLM lock together with the state needed to wait synchronously for
 * the completion AST of a request issued on it.
 */
struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done; /* set by the completion AST, reset by the waiter */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
	struct mddev *mddev; /* pointing back to mddev. */
	int mode; /* last mode granted; DLM_LOCK_IV until the first grant */
};

/* A sector range [lo, hi) that the node occupying @slot is resyncing. */
struct suspend_info {
	int slot;
	sector_t lo;
	sector_t hi;
	struct list_head list; /* linked on md_cluster_info.suspend_list */
};

/* Little-endian resync range as stored in a bitmap lock's LVB. */
struct resync_info {
	__le64 lo;
	__le64 hi;
};

/* md_cluster_info flags */
#define		MD_CLUSTER_WAITING_FOR_NEWDISK		1
#define		MD_CLUSTER_SUSPEND_READ_BALANCING	2
#define		MD_CLUSTER_BEGIN_JOIN_CLUSTER		3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define		MD_CLUSTER_SEND_LOCK			4
/* If cluster operations (such as adding a disk) must lock the
 * communication channel, so as to perform extra operations
 * (update metadata) and no other operation is allowed on the
 * MD. Token needs to be locked and held until the operation
 * completes with a md_update_sb(), which would eventually release
 * the lock.
64 */ 65 #define MD_CLUSTER_SEND_LOCKED_ALREADY 5 66 /* We should receive message after node joined cluster and 67 * set up all the related infos such as bitmap and personality */ 68 #define MD_CLUSTER_ALREADY_IN_CLUSTER 6 69 #define MD_CLUSTER_PENDING_RECV_EVENT 7 70 71 72 struct md_cluster_info { 73 /* dlm lock space and resources for clustered raid. */ 74 dlm_lockspace_t *lockspace; 75 int slot_number; 76 struct completion completion; 77 struct mutex recv_mutex; 78 struct dlm_lock_resource *bitmap_lockres; 79 struct dlm_lock_resource **other_bitmap_lockres; 80 struct dlm_lock_resource *resync_lockres; 81 struct list_head suspend_list; 82 spinlock_t suspend_lock; 83 struct md_thread *recovery_thread; 84 unsigned long recovery_map; 85 /* communication loc resources */ 86 struct dlm_lock_resource *ack_lockres; 87 struct dlm_lock_resource *message_lockres; 88 struct dlm_lock_resource *token_lockres; 89 struct dlm_lock_resource *no_new_dev_lockres; 90 struct md_thread *recv_thread; 91 struct completion newdisk_completion; 92 wait_queue_head_t wait; 93 unsigned long state; 94 /* record the region in RESYNCING message */ 95 sector_t sync_low; 96 sector_t sync_hi; 97 }; 98 99 enum msg_type { 100 METADATA_UPDATED = 0, 101 RESYNCING, 102 NEWDISK, 103 REMOVE, 104 RE_ADD, 105 BITMAP_NEEDS_SYNC, 106 }; 107 108 struct cluster_msg { 109 __le32 type; 110 __le32 slot; 111 /* TODO: Unionize this for smaller footprint */ 112 __le64 low; 113 __le64 high; 114 char uuid[16]; 115 __le32 raid_slot; 116 }; 117 118 static void sync_ast(void *arg) 119 { 120 struct dlm_lock_resource *res; 121 122 res = arg; 123 res->sync_locking_done = true; 124 wake_up(&res->sync_locking); 125 } 126 127 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 128 { 129 int ret = 0; 130 131 ret = dlm_lock(res->ls, mode, &res->lksb, 132 res->flags, res->name, strlen(res->name), 133 0, sync_ast, res, res->bast); 134 if (ret) 135 return ret; 136 wait_event(res->sync_locking, res->sync_locking_done); 
137 res->sync_locking_done = false; 138 if (res->lksb.sb_status == 0) 139 res->mode = mode; 140 return res->lksb.sb_status; 141 } 142 143 static int dlm_unlock_sync(struct dlm_lock_resource *res) 144 { 145 return dlm_lock_sync(res, DLM_LOCK_NL); 146 } 147 148 /* 149 * An variation of dlm_lock_sync, which make lock request could 150 * be interrupted 151 */ 152 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode, 153 struct mddev *mddev) 154 { 155 int ret = 0; 156 157 ret = dlm_lock(res->ls, mode, &res->lksb, 158 res->flags, res->name, strlen(res->name), 159 0, sync_ast, res, res->bast); 160 if (ret) 161 return ret; 162 163 wait_event(res->sync_locking, res->sync_locking_done 164 || kthread_should_stop()); 165 if (!res->sync_locking_done) { 166 /* 167 * the convert queue contains the lock request when request is 168 * interrupted, and sync_ast could still be run, so need to 169 * cancel the request and reset completion 170 */ 171 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL, 172 &res->lksb, res); 173 res->sync_locking_done = false; 174 if (unlikely(ret != 0)) 175 pr_info("failed to cancel previous lock request " 176 "%s return %d\n", res->name, ret); 177 return -EPERM; 178 } else 179 res->sync_locking_done = false; 180 if (res->lksb.sb_status == 0) 181 res->mode = mode; 182 return res->lksb.sb_status; 183 } 184 185 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 186 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 187 { 188 struct dlm_lock_resource *res = NULL; 189 int ret, namelen; 190 struct md_cluster_info *cinfo = mddev->cluster_info; 191 192 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 193 if (!res) 194 return NULL; 195 init_waitqueue_head(&res->sync_locking); 196 res->sync_locking_done = false; 197 res->ls = cinfo->lockspace; 198 res->mddev = mddev; 199 res->mode = DLM_LOCK_IV; 200 namelen = strlen(name); 201 res->name = kzalloc(namelen + 1, GFP_KERNEL); 202 if 
(!res->name) { 203 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 204 goto out_err; 205 } 206 strlcpy(res->name, name, namelen + 1); 207 if (with_lvb) { 208 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 209 if (!res->lksb.sb_lvbptr) { 210 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 211 goto out_err; 212 } 213 res->flags = DLM_LKF_VALBLK; 214 } 215 216 if (bastfn) 217 res->bast = bastfn; 218 219 res->flags |= DLM_LKF_EXPEDITE; 220 221 ret = dlm_lock_sync(res, DLM_LOCK_NL); 222 if (ret) { 223 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 224 goto out_err; 225 } 226 res->flags &= ~DLM_LKF_EXPEDITE; 227 res->flags |= DLM_LKF_CONVERT; 228 229 return res; 230 out_err: 231 kfree(res->lksb.sb_lvbptr); 232 kfree(res->name); 233 kfree(res); 234 return NULL; 235 } 236 237 static void lockres_free(struct dlm_lock_resource *res) 238 { 239 int ret = 0; 240 241 if (!res) 242 return; 243 244 /* 245 * use FORCEUNLOCK flag, so we can unlock even the lock is on the 246 * waiting or convert queue 247 */ 248 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK, 249 &res->lksb, res); 250 if (unlikely(ret != 0)) 251 pr_err("failed to unlock %s return %d\n", res->name, ret); 252 else 253 wait_event(res->sync_locking, res->sync_locking_done); 254 255 kfree(res->name); 256 kfree(res->lksb.sb_lvbptr); 257 kfree(res); 258 } 259 260 static void add_resync_info(struct dlm_lock_resource *lockres, 261 sector_t lo, sector_t hi) 262 { 263 struct resync_info *ri; 264 265 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 266 ri->lo = cpu_to_le64(lo); 267 ri->hi = cpu_to_le64(hi); 268 } 269 270 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 271 { 272 struct resync_info ri; 273 struct suspend_info *s = NULL; 274 sector_t hi = 0; 275 276 dlm_lock_sync(lockres, DLM_LOCK_CR); 277 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 
278 hi = le64_to_cpu(ri.hi); 279 if (hi > 0) { 280 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 281 if (!s) 282 goto out; 283 s->hi = hi; 284 s->lo = le64_to_cpu(ri.lo); 285 } 286 dlm_unlock_sync(lockres); 287 out: 288 return s; 289 } 290 291 static void recover_bitmaps(struct md_thread *thread) 292 { 293 struct mddev *mddev = thread->mddev; 294 struct md_cluster_info *cinfo = mddev->cluster_info; 295 struct dlm_lock_resource *bm_lockres; 296 char str[64]; 297 int slot, ret; 298 struct suspend_info *s, *tmp; 299 sector_t lo, hi; 300 301 while (cinfo->recovery_map) { 302 slot = fls64((u64)cinfo->recovery_map) - 1; 303 304 /* Clear suspend_area associated with the bitmap */ 305 spin_lock_irq(&cinfo->suspend_lock); 306 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 307 if (slot == s->slot) { 308 list_del(&s->list); 309 kfree(s); 310 } 311 spin_unlock_irq(&cinfo->suspend_lock); 312 313 snprintf(str, 64, "bitmap%04d", slot); 314 bm_lockres = lockres_init(mddev, str, NULL, 1); 315 if (!bm_lockres) { 316 pr_err("md-cluster: Cannot initialize bitmaps\n"); 317 goto clear_bit; 318 } 319 320 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev); 321 if (ret) { 322 pr_err("md-cluster: Could not DLM lock %s: %d\n", 323 str, ret); 324 goto clear_bit; 325 } 326 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 327 if (ret) { 328 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 329 goto clear_bit; 330 } 331 if (hi > 0) { 332 if (lo < mddev->recovery_cp) 333 mddev->recovery_cp = lo; 334 /* wake up thread to continue resync in case resync 335 * is not finished */ 336 if (mddev->recovery_cp != MaxSector) { 337 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 338 md_wakeup_thread(mddev->thread); 339 } 340 } 341 clear_bit: 342 lockres_free(bm_lockres); 343 clear_bit(slot, &cinfo->recovery_map); 344 } 345 } 346 347 static void recover_prep(void *arg) 348 { 349 struct mddev *mddev = arg; 350 struct md_cluster_info *cinfo = 
mddev->cluster_info; 351 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 352 } 353 354 static void __recover_slot(struct mddev *mddev, int slot) 355 { 356 struct md_cluster_info *cinfo = mddev->cluster_info; 357 358 set_bit(slot, &cinfo->recovery_map); 359 if (!cinfo->recovery_thread) { 360 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 361 mddev, "recover"); 362 if (!cinfo->recovery_thread) { 363 pr_warn("md-cluster: Could not create recovery thread\n"); 364 return; 365 } 366 } 367 md_wakeup_thread(cinfo->recovery_thread); 368 } 369 370 static void recover_slot(void *arg, struct dlm_slot *slot) 371 { 372 struct mddev *mddev = arg; 373 struct md_cluster_info *cinfo = mddev->cluster_info; 374 375 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 376 mddev->bitmap_info.cluster_name, 377 slot->nodeid, slot->slot, 378 cinfo->slot_number); 379 /* deduct one since dlm slot starts from one while the num of 380 * cluster-md begins with 0 */ 381 __recover_slot(mddev, slot->slot - 1); 382 } 383 384 static void recover_done(void *arg, struct dlm_slot *slots, 385 int num_slots, int our_slot, 386 uint32_t generation) 387 { 388 struct mddev *mddev = arg; 389 struct md_cluster_info *cinfo = mddev->cluster_info; 390 391 cinfo->slot_number = our_slot; 392 /* completion is only need to be complete when node join cluster, 393 * it doesn't need to run during another node's failure */ 394 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 395 complete(&cinfo->completion); 396 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 397 } 398 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 399 } 400 401 /* the ops is called when node join the cluster, and do lock recovery 402 * if node failure occurs */ 403 static const struct dlm_lockspace_ops md_ls_ops = { 404 .recover_prep = recover_prep, 405 .recover_slot = recover_slot, 406 .recover_done = recover_done, 407 }; 408 409 /* 410 * The BAST function for the 
ack lock resource 411 * This function wakes up the receive thread in 412 * order to receive and process the message. 413 */ 414 static void ack_bast(void *arg, int mode) 415 { 416 struct dlm_lock_resource *res = arg; 417 struct md_cluster_info *cinfo = res->mddev->cluster_info; 418 419 if (mode == DLM_LOCK_EX) { 420 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state)) 421 md_wakeup_thread(cinfo->recv_thread); 422 else 423 set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state); 424 } 425 } 426 427 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 428 { 429 struct suspend_info *s, *tmp; 430 431 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 432 if (slot == s->slot) { 433 list_del(&s->list); 434 kfree(s); 435 break; 436 } 437 } 438 439 static void remove_suspend_info(struct mddev *mddev, int slot) 440 { 441 struct md_cluster_info *cinfo = mddev->cluster_info; 442 spin_lock_irq(&cinfo->suspend_lock); 443 __remove_suspend_info(cinfo, slot); 444 spin_unlock_irq(&cinfo->suspend_lock); 445 mddev->pers->quiesce(mddev, 2); 446 } 447 448 449 static void process_suspend_info(struct mddev *mddev, 450 int slot, sector_t lo, sector_t hi) 451 { 452 struct md_cluster_info *cinfo = mddev->cluster_info; 453 struct suspend_info *s; 454 455 if (!hi) { 456 remove_suspend_info(mddev, slot); 457 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 458 md_wakeup_thread(mddev->thread); 459 return; 460 } 461 462 /* 463 * The bitmaps are not same for different nodes 464 * if RESYNCING is happening in one node, then 465 * the node which received the RESYNCING message 466 * probably will perform resync with the region 467 * [lo, hi] again, so we could reduce resync time 468 * a lot if we can ensure that the bitmaps among 469 * different nodes are match up well. 
470 * 471 * sync_low/hi is used to record the region which 472 * arrived in the previous RESYNCING message, 473 * 474 * Call bitmap_sync_with_cluster to clear 475 * NEEDED_MASK and set RESYNC_MASK since 476 * resync thread is running in another node, 477 * so we don't need to do the resync again 478 * with the same section */ 479 bitmap_sync_with_cluster(mddev, cinfo->sync_low, 480 cinfo->sync_hi, 481 lo, hi); 482 cinfo->sync_low = lo; 483 cinfo->sync_hi = hi; 484 485 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 486 if (!s) 487 return; 488 s->slot = slot; 489 s->lo = lo; 490 s->hi = hi; 491 mddev->pers->quiesce(mddev, 1); 492 mddev->pers->quiesce(mddev, 0); 493 spin_lock_irq(&cinfo->suspend_lock); 494 /* Remove existing entry (if exists) before adding */ 495 __remove_suspend_info(cinfo, slot); 496 list_add(&s->list, &cinfo->suspend_list); 497 spin_unlock_irq(&cinfo->suspend_lock); 498 mddev->pers->quiesce(mddev, 2); 499 } 500 501 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 502 { 503 char disk_uuid[64]; 504 struct md_cluster_info *cinfo = mddev->cluster_info; 505 char event_name[] = "EVENT=ADD_DEVICE"; 506 char raid_slot[16]; 507 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 508 int len; 509 510 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 511 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 512 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 513 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 514 init_completion(&cinfo->newdisk_completion); 515 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 516 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 517 wait_for_completion_timeout(&cinfo->newdisk_completion, 518 NEW_DEV_TIMEOUT); 519 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 520 } 521 522 523 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 524 { 525 struct 
md_cluster_info *cinfo = mddev->cluster_info; 526 mddev->good_device_nr = le32_to_cpu(msg->raid_slot); 527 set_bit(MD_RELOAD_SB, &mddev->flags); 528 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 529 md_wakeup_thread(mddev->thread); 530 } 531 532 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 533 { 534 struct md_rdev *rdev; 535 536 rcu_read_lock(); 537 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 538 if (rdev) { 539 set_bit(ClusterRemove, &rdev->flags); 540 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 541 md_wakeup_thread(mddev->thread); 542 } 543 else 544 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 545 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 546 rcu_read_unlock(); 547 } 548 549 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 550 { 551 struct md_rdev *rdev; 552 553 rcu_read_lock(); 554 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 555 if (rdev && test_bit(Faulty, &rdev->flags)) 556 clear_bit(Faulty, &rdev->flags); 557 else 558 pr_warn("%s: %d Could not find disk(%d) which is faulty", 559 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 560 rcu_read_unlock(); 561 } 562 563 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 564 { 565 int ret = 0; 566 567 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 568 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 569 return -1; 570 switch (le32_to_cpu(msg->type)) { 571 case METADATA_UPDATED: 572 process_metadata_update(mddev, msg); 573 break; 574 case RESYNCING: 575 process_suspend_info(mddev, le32_to_cpu(msg->slot), 576 le64_to_cpu(msg->low), 577 le64_to_cpu(msg->high)); 578 break; 579 case NEWDISK: 580 process_add_new_disk(mddev, msg); 581 break; 582 case REMOVE: 583 process_remove_disk(mddev, msg); 584 break; 585 case RE_ADD: 586 process_readd_disk(mddev, msg); 587 break; 588 case BITMAP_NEEDS_SYNC: 589 __recover_slot(mddev, 
le32_to_cpu(msg->slot)); 590 break; 591 default: 592 ret = -1; 593 pr_warn("%s:%d Received unknown message from %d\n", 594 __func__, __LINE__, msg->slot); 595 } 596 return ret; 597 } 598 599 /* 600 * thread for receiving message 601 */ 602 static void recv_daemon(struct md_thread *thread) 603 { 604 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 605 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 606 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 607 struct cluster_msg msg; 608 int ret; 609 610 mutex_lock(&cinfo->recv_mutex); 611 /*get CR on Message*/ 612 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 613 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 614 mutex_unlock(&cinfo->recv_mutex); 615 return; 616 } 617 618 /* read lvb and wake up thread to process this message_lockres */ 619 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 620 ret = process_recvd_msg(thread->mddev, &msg); 621 if (ret) 622 goto out; 623 624 /*release CR on ack_lockres*/ 625 ret = dlm_unlock_sync(ack_lockres); 626 if (unlikely(ret != 0)) 627 pr_info("unlock ack failed return %d\n", ret); 628 /*up-convert to PR on message_lockres*/ 629 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 630 if (unlikely(ret != 0)) 631 pr_info("lock PR on msg failed return %d\n", ret); 632 /*get CR on ack_lockres again*/ 633 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 634 if (unlikely(ret != 0)) 635 pr_info("lock CR on ack failed return %d\n", ret); 636 out: 637 /*release CR on message_lockres*/ 638 ret = dlm_unlock_sync(message_lockres); 639 if (unlikely(ret != 0)) 640 pr_info("unlock msg failed return %d\n", ret); 641 mutex_unlock(&cinfo->recv_mutex); 642 } 643 644 /* lock_token() 645 * Takes the lock on the TOKEN lock resource so no other 646 * node can communicate while the operation is underway. 
647 */ 648 static int lock_token(struct md_cluster_info *cinfo) 649 { 650 int error; 651 652 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 653 if (error) 654 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 655 __func__, __LINE__, error); 656 657 /* Lock the receive sequence */ 658 mutex_lock(&cinfo->recv_mutex); 659 return error; 660 } 661 662 /* lock_comm() 663 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel. 664 */ 665 static int lock_comm(struct md_cluster_info *cinfo) 666 { 667 wait_event(cinfo->wait, 668 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state)); 669 670 return lock_token(cinfo); 671 } 672 673 static void unlock_comm(struct md_cluster_info *cinfo) 674 { 675 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 676 mutex_unlock(&cinfo->recv_mutex); 677 dlm_unlock_sync(cinfo->token_lockres); 678 clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state); 679 wake_up(&cinfo->wait); 680 } 681 682 /* __sendmsg() 683 * This function performs the actual sending of the message. This function is 684 * usually called after performing the encompassing operation 685 * The function: 686 * 1. Grabs the message lockresource in EX mode 687 * 2. Copies the message to the message LVB 688 * 3. Downconverts message lockresource to CW 689 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 690 * and the other nodes read the message. The thread will wait here until all other 691 * nodes have released ack lock resource. 692 * 5. 
Downconvert ack lockresource to CR 693 */ 694 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 695 { 696 int error; 697 int slot = cinfo->slot_number - 1; 698 699 cmsg->slot = cpu_to_le32(slot); 700 /*get EX on Message*/ 701 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 702 if (error) { 703 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 704 goto failed_message; 705 } 706 707 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 708 sizeof(struct cluster_msg)); 709 /*down-convert EX to CW on Message*/ 710 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 711 if (error) { 712 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 713 error); 714 goto failed_ack; 715 } 716 717 /*up-convert CR to EX on Ack*/ 718 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 719 if (error) { 720 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 721 error); 722 goto failed_ack; 723 } 724 725 /*down-convert EX to CR on Ack*/ 726 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 727 if (error) { 728 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 729 error); 730 goto failed_ack; 731 } 732 733 failed_ack: 734 error = dlm_unlock_sync(cinfo->message_lockres); 735 if (unlikely(error != 0)) { 736 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 737 error); 738 /* in case the message can't be released due to some reason */ 739 goto failed_ack; 740 } 741 failed_message: 742 return error; 743 } 744 745 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 746 { 747 int ret; 748 749 lock_comm(cinfo); 750 ret = __sendmsg(cinfo, cmsg); 751 unlock_comm(cinfo); 752 return ret; 753 } 754 755 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 756 { 757 struct md_cluster_info *cinfo = mddev->cluster_info; 758 int i, ret = 0; 759 struct dlm_lock_resource *bm_lockres; 760 struct suspend_info *s; 761 char str[64]; 762 sector_t 
lo, hi; 763 764 765 for (i = 0; i < total_slots; i++) { 766 memset(str, '\0', 64); 767 snprintf(str, 64, "bitmap%04d", i); 768 bm_lockres = lockres_init(mddev, str, NULL, 1); 769 if (!bm_lockres) 770 return -ENOMEM; 771 if (i == (cinfo->slot_number - 1)) { 772 lockres_free(bm_lockres); 773 continue; 774 } 775 776 bm_lockres->flags |= DLM_LKF_NOQUEUE; 777 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 778 if (ret == -EAGAIN) { 779 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 780 s = read_resync_info(mddev, bm_lockres); 781 if (s) { 782 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 783 __func__, __LINE__, 784 (unsigned long long) s->lo, 785 (unsigned long long) s->hi, i); 786 spin_lock_irq(&cinfo->suspend_lock); 787 s->slot = i; 788 list_add(&s->list, &cinfo->suspend_list); 789 spin_unlock_irq(&cinfo->suspend_lock); 790 } 791 ret = 0; 792 lockres_free(bm_lockres); 793 continue; 794 } 795 if (ret) { 796 lockres_free(bm_lockres); 797 goto out; 798 } 799 800 /* Read the disk bitmap sb and check if it needs recovery */ 801 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 802 if (ret) { 803 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 804 lockres_free(bm_lockres); 805 continue; 806 } 807 if ((hi > 0) && (lo < mddev->recovery_cp)) { 808 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 809 mddev->recovery_cp = lo; 810 md_check_recovery(mddev); 811 } 812 813 lockres_free(bm_lockres); 814 } 815 out: 816 return ret; 817 } 818 819 static int join(struct mddev *mddev, int nodes) 820 { 821 struct md_cluster_info *cinfo; 822 int ret, ops_rv; 823 char str[64]; 824 825 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 826 if (!cinfo) 827 return -ENOMEM; 828 829 INIT_LIST_HEAD(&cinfo->suspend_list); 830 spin_lock_init(&cinfo->suspend_lock); 831 init_completion(&cinfo->completion); 832 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 833 init_waitqueue_head(&cinfo->wait); 834 mutex_init(&cinfo->recv_mutex); 835 836 
mddev->cluster_info = cinfo; 837 838 memset(str, 0, 64); 839 sprintf(str, "%pU", mddev->uuid); 840 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 841 DLM_LSFL_FS, LVB_SIZE, 842 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 843 if (ret) 844 goto err; 845 wait_for_completion(&cinfo->completion); 846 if (nodes < cinfo->slot_number) { 847 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 848 cinfo->slot_number, nodes); 849 ret = -ERANGE; 850 goto err; 851 } 852 /* Initiate the communication resources */ 853 ret = -ENOMEM; 854 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 855 if (!cinfo->recv_thread) { 856 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 857 goto err; 858 } 859 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 860 if (!cinfo->message_lockres) 861 goto err; 862 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 863 if (!cinfo->token_lockres) 864 goto err; 865 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 866 if (!cinfo->no_new_dev_lockres) 867 goto err; 868 869 ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 870 if (ret) { 871 ret = -EAGAIN; 872 pr_err("md-cluster: can't join cluster to avoid lock issue\n"); 873 goto err; 874 } 875 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 876 if (!cinfo->ack_lockres) { 877 ret = -ENOMEM; 878 goto err; 879 } 880 /* get sync CR lock on ACK. */ 881 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 882 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 883 ret); 884 dlm_unlock_sync(cinfo->token_lockres); 885 /* get sync CR lock on no-new-dev. 
*/ 886 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 887 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 888 889 890 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 891 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 892 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 893 if (!cinfo->bitmap_lockres) { 894 ret = -ENOMEM; 895 goto err; 896 } 897 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 898 pr_err("Failed to get bitmap lock\n"); 899 ret = -EINVAL; 900 goto err; 901 } 902 903 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 904 if (!cinfo->resync_lockres) { 905 ret = -ENOMEM; 906 goto err; 907 } 908 909 return 0; 910 err: 911 md_unregister_thread(&cinfo->recovery_thread); 912 md_unregister_thread(&cinfo->recv_thread); 913 lockres_free(cinfo->message_lockres); 914 lockres_free(cinfo->token_lockres); 915 lockres_free(cinfo->ack_lockres); 916 lockres_free(cinfo->no_new_dev_lockres); 917 lockres_free(cinfo->resync_lockres); 918 lockres_free(cinfo->bitmap_lockres); 919 if (cinfo->lockspace) 920 dlm_release_lockspace(cinfo->lockspace, 2); 921 mddev->cluster_info = NULL; 922 kfree(cinfo); 923 return ret; 924 } 925 926 static void load_bitmaps(struct mddev *mddev, int total_slots) 927 { 928 struct md_cluster_info *cinfo = mddev->cluster_info; 929 930 /* load all the node's bitmap info for resync */ 931 if (gather_all_resync_info(mddev, total_slots)) 932 pr_err("md-cluster: failed to gather all resyn infos\n"); 933 set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state); 934 /* wake up recv thread in case something need to be handled */ 935 if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state)) 936 md_wakeup_thread(cinfo->recv_thread); 937 } 938 939 static void resync_bitmap(struct mddev *mddev) 940 { 941 struct md_cluster_info *cinfo = mddev->cluster_info; 942 struct cluster_msg cmsg = {0}; 943 int err; 944 945 cmsg.type = 
cpu_to_le32(BITMAP_NEEDS_SYNC); 946 err = sendmsg(cinfo, &cmsg); 947 if (err) 948 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 949 __func__, __LINE__, err); 950 } 951 952 static void unlock_all_bitmaps(struct mddev *mddev); 953 static int leave(struct mddev *mddev) 954 { 955 struct md_cluster_info *cinfo = mddev->cluster_info; 956 957 if (!cinfo) 958 return 0; 959 960 /* BITMAP_NEEDS_SYNC message should be sent when node 961 * is leaving the cluster with dirty bitmap, also we 962 * can only deliver it when dlm connection is available */ 963 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 964 resync_bitmap(mddev); 965 966 md_unregister_thread(&cinfo->recovery_thread); 967 md_unregister_thread(&cinfo->recv_thread); 968 lockres_free(cinfo->message_lockres); 969 lockres_free(cinfo->token_lockres); 970 lockres_free(cinfo->ack_lockres); 971 lockres_free(cinfo->no_new_dev_lockres); 972 lockres_free(cinfo->resync_lockres); 973 lockres_free(cinfo->bitmap_lockres); 974 unlock_all_bitmaps(mddev); 975 dlm_release_lockspace(cinfo->lockspace, 2); 976 return 0; 977 } 978 979 /* slot_number(): Returns the MD slot number to use 980 * DLM starts the slot numbers from 1, wheras cluster-md 981 * wants the number to be from zero, so we deduct one 982 */ 983 static int slot_number(struct mddev *mddev) 984 { 985 struct md_cluster_info *cinfo = mddev->cluster_info; 986 987 return cinfo->slot_number - 1; 988 } 989 990 /* 991 * Check if the communication is already locked, else lock the communication 992 * channel. 993 * If it is already locked, token is in EX mode, and hence lock_token() 994 * should not be called. 
995 */ 996 static int metadata_update_start(struct mddev *mddev) 997 { 998 struct md_cluster_info *cinfo = mddev->cluster_info; 999 1000 wait_event(cinfo->wait, 1001 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) || 1002 test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state)); 1003 1004 /* If token is already locked, return 0 */ 1005 if (cinfo->token_lockres->mode == DLM_LOCK_EX) 1006 return 0; 1007 1008 return lock_token(cinfo); 1009 } 1010 1011 static int metadata_update_finish(struct mddev *mddev) 1012 { 1013 struct md_cluster_info *cinfo = mddev->cluster_info; 1014 struct cluster_msg cmsg; 1015 struct md_rdev *rdev; 1016 int ret = 0; 1017 int raid_slot = -1; 1018 1019 memset(&cmsg, 0, sizeof(cmsg)); 1020 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1021 /* Pick up a good active device number to send. 1022 */ 1023 rdev_for_each(rdev, mddev) 1024 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) { 1025 raid_slot = rdev->desc_nr; 1026 break; 1027 } 1028 if (raid_slot >= 0) { 1029 cmsg.raid_slot = cpu_to_le32(raid_slot); 1030 ret = __sendmsg(cinfo, &cmsg); 1031 } else 1032 pr_warn("md-cluster: No good device id found to send\n"); 1033 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1034 unlock_comm(cinfo); 1035 return ret; 1036 } 1037 1038 static void metadata_update_cancel(struct mddev *mddev) 1039 { 1040 struct md_cluster_info *cinfo = mddev->cluster_info; 1041 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1042 unlock_comm(cinfo); 1043 } 1044 1045 static int resync_start(struct mddev *mddev) 1046 { 1047 struct md_cluster_info *cinfo = mddev->cluster_info; 1048 return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX); 1049 } 1050 1051 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 1052 { 1053 struct md_cluster_info *cinfo = mddev->cluster_info; 1054 struct resync_info ri; 1055 struct cluster_msg cmsg = {0}; 1056 1057 /* do not send zero again, if we have sent before */ 1058 if (hi == 
0) { 1059 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 1060 if (le64_to_cpu(ri.hi) == 0) 1061 return 0; 1062 } 1063 1064 add_resync_info(cinfo->bitmap_lockres, lo, hi); 1065 /* Re-acquire the lock to refresh LVB */ 1066 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 1067 cmsg.type = cpu_to_le32(RESYNCING); 1068 cmsg.low = cpu_to_le64(lo); 1069 cmsg.high = cpu_to_le64(hi); 1070 1071 return sendmsg(cinfo, &cmsg); 1072 } 1073 1074 static int resync_finish(struct mddev *mddev) 1075 { 1076 struct md_cluster_info *cinfo = mddev->cluster_info; 1077 dlm_unlock_sync(cinfo->resync_lockres); 1078 return resync_info_update(mddev, 0, 0); 1079 } 1080 1081 static int area_resyncing(struct mddev *mddev, int direction, 1082 sector_t lo, sector_t hi) 1083 { 1084 struct md_cluster_info *cinfo = mddev->cluster_info; 1085 int ret = 0; 1086 struct suspend_info *s; 1087 1088 if ((direction == READ) && 1089 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 1090 return 1; 1091 1092 spin_lock_irq(&cinfo->suspend_lock); 1093 if (list_empty(&cinfo->suspend_list)) 1094 goto out; 1095 list_for_each_entry(s, &cinfo->suspend_list, list) 1096 if (hi > s->lo && lo < s->hi) { 1097 ret = 1; 1098 break; 1099 } 1100 out: 1101 spin_unlock_irq(&cinfo->suspend_lock); 1102 return ret; 1103 } 1104 1105 /* add_new_disk() - initiates a disk add 1106 * However, if this fails before writing md_update_sb(), 1107 * add_new_disk_cancel() must be called to release token lock 1108 */ 1109 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 1110 { 1111 struct md_cluster_info *cinfo = mddev->cluster_info; 1112 struct cluster_msg cmsg; 1113 int ret = 0; 1114 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 1115 char *uuid = sb->device_uuid; 1116 1117 memset(&cmsg, 0, sizeof(cmsg)); 1118 cmsg.type = cpu_to_le32(NEWDISK); 1119 memcpy(cmsg.uuid, uuid, 16); 1120 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1121 lock_comm(cinfo); 1122 ret = 
__sendmsg(cinfo, &cmsg); 1123 if (ret) 1124 return ret; 1125 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 1126 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 1127 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 1128 /* Some node does not "see" the device */ 1129 if (ret == -EAGAIN) 1130 ret = -ENOENT; 1131 if (ret) 1132 unlock_comm(cinfo); 1133 else { 1134 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 1135 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which 1136 * will run soon after add_new_disk, the below path will be 1137 * invoked: 1138 * md_wakeup_thread(mddev->thread) 1139 * -> conf->thread (raid1d) 1140 * -> md_check_recovery -> md_update_sb 1141 * -> metadata_update_start/finish 1142 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually. 1143 * 1144 * For other failure cases, metadata_update_cancel and 1145 * add_new_disk_cancel also clear below bit as well. 1146 * */ 1147 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1148 wake_up(&cinfo->wait); 1149 } 1150 return ret; 1151 } 1152 1153 static void add_new_disk_cancel(struct mddev *mddev) 1154 { 1155 struct md_cluster_info *cinfo = mddev->cluster_info; 1156 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1157 unlock_comm(cinfo); 1158 } 1159 1160 static int new_disk_ack(struct mddev *mddev, bool ack) 1161 { 1162 struct md_cluster_info *cinfo = mddev->cluster_info; 1163 1164 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 1165 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 1166 return -EINVAL; 1167 } 1168 1169 if (ack) 1170 dlm_unlock_sync(cinfo->no_new_dev_lockres); 1171 complete(&cinfo->newdisk_completion); 1172 return 0; 1173 } 1174 1175 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 1176 { 1177 struct cluster_msg cmsg = {0}; 1178 struct md_cluster_info *cinfo = mddev->cluster_info; 1179 cmsg.type = cpu_to_le32(REMOVE); 1180 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1181 
return sendmsg(cinfo, &cmsg); 1182 } 1183 1184 static int lock_all_bitmaps(struct mddev *mddev) 1185 { 1186 int slot, my_slot, ret, held = 1, i = 0; 1187 char str[64]; 1188 struct md_cluster_info *cinfo = mddev->cluster_info; 1189 1190 cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) * 1191 sizeof(struct dlm_lock_resource *), 1192 GFP_KERNEL); 1193 if (!cinfo->other_bitmap_lockres) { 1194 pr_err("md: can't alloc mem for other bitmap locks\n"); 1195 return 0; 1196 } 1197 1198 my_slot = slot_number(mddev); 1199 for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) { 1200 if (slot == my_slot) 1201 continue; 1202 1203 memset(str, '\0', 64); 1204 snprintf(str, 64, "bitmap%04d", slot); 1205 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1206 if (!cinfo->other_bitmap_lockres[i]) 1207 return -ENOMEM; 1208 1209 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1210 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1211 if (ret) 1212 held = -1; 1213 i++; 1214 } 1215 1216 return held; 1217 } 1218 1219 static void unlock_all_bitmaps(struct mddev *mddev) 1220 { 1221 struct md_cluster_info *cinfo = mddev->cluster_info; 1222 int i; 1223 1224 /* release other node's bitmap lock if they are existed */ 1225 if (cinfo->other_bitmap_lockres) { 1226 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1227 if (cinfo->other_bitmap_lockres[i]) { 1228 lockres_free(cinfo->other_bitmap_lockres[i]); 1229 } 1230 } 1231 kfree(cinfo->other_bitmap_lockres); 1232 } 1233 } 1234 1235 static int gather_bitmaps(struct md_rdev *rdev) 1236 { 1237 int sn, err; 1238 sector_t lo, hi; 1239 struct cluster_msg cmsg = {0}; 1240 struct mddev *mddev = rdev->mddev; 1241 struct md_cluster_info *cinfo = mddev->cluster_info; 1242 1243 cmsg.type = cpu_to_le32(RE_ADD); 1244 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1245 err = sendmsg(cinfo, &cmsg); 1246 if (err) 1247 goto out; 1248 1249 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1250 if 
(sn == (cinfo->slot_number - 1)) 1251 continue; 1252 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1253 if (err) { 1254 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1255 goto out; 1256 } 1257 if ((hi > 0) && (lo < mddev->recovery_cp)) 1258 mddev->recovery_cp = lo; 1259 } 1260 out: 1261 return err; 1262 } 1263 1264 static struct md_cluster_operations cluster_ops = { 1265 .join = join, 1266 .leave = leave, 1267 .slot_number = slot_number, 1268 .resync_start = resync_start, 1269 .resync_finish = resync_finish, 1270 .resync_info_update = resync_info_update, 1271 .metadata_update_start = metadata_update_start, 1272 .metadata_update_finish = metadata_update_finish, 1273 .metadata_update_cancel = metadata_update_cancel, 1274 .area_resyncing = area_resyncing, 1275 .add_new_disk = add_new_disk, 1276 .add_new_disk_cancel = add_new_disk_cancel, 1277 .new_disk_ack = new_disk_ack, 1278 .remove_disk = remove_disk, 1279 .load_bitmaps = load_bitmaps, 1280 .gather_bitmaps = gather_bitmaps, 1281 .lock_all_bitmaps = lock_all_bitmaps, 1282 .unlock_all_bitmaps = unlock_all_bitmaps, 1283 }; 1284 1285 static int __init cluster_init(void) 1286 { 1287 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 1288 pr_info("Registering Cluster MD functions\n"); 1289 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1290 return 0; 1291 } 1292 1293 static void cluster_exit(void) 1294 { 1295 unregister_md_cluster_operations(); 1296 } 1297 1298 module_init(cluster_init); 1299 module_exit(cluster_exit); 1300 MODULE_AUTHOR("SUSE"); 1301 MODULE_LICENSE("GPL"); 1302 MODULE_DESCRIPTION("Clustering support for MD"); 1303