1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/dlm.h> 14 #include <linux/sched.h> 15 #include <linux/raid/md_p.h> 16 #include "md.h" 17 #include "bitmap.h" 18 #include "md-cluster.h" 19 20 #define LVB_SIZE 64 21 #define NEW_DEV_TIMEOUT 5000 22 23 struct dlm_lock_resource { 24 dlm_lockspace_t *ls; 25 struct dlm_lksb lksb; 26 char *name; /* lock name. */ 27 uint32_t flags; /* flags to pass to dlm_lock() */ 28 struct completion completion; /* completion for synchronized locking */ 29 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 30 struct mddev *mddev; /* pointing back to mddev. */ 31 int mode; 32 }; 33 34 struct suspend_info { 35 int slot; 36 sector_t lo; 37 sector_t hi; 38 struct list_head list; 39 }; 40 41 struct resync_info { 42 __le64 lo; 43 __le64 hi; 44 }; 45 46 /* md_cluster_info flags */ 47 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 48 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 49 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3 50 51 /* Lock the send communication. This is done through 52 * bit manipulation as opposed to a mutex in order to 53 * accomodate lock and hold. See next comment. 54 */ 55 #define MD_CLUSTER_SEND_LOCK 4 56 /* If cluster operations (such as adding a disk) must lock the 57 * communication channel, so as to perform extra operations 58 * (update metadata) and no other operation is allowed on the 59 * MD. Token needs to be locked and held until the operation 60 * completes witha md_update_sb(), which would eventually release 61 * the lock. 62 */ 63 #define MD_CLUSTER_SEND_LOCKED_ALREADY 5 64 /* We should receive message after node joined cluster and 65 * set up all the related infos such as bitmap and personality */ 66 #define MD_CLUSTER_ALREADY_IN_CLUSTER 6 67 #define MD_CLUSTER_PENDING_RECV_EVENT 7 68 69 70 struct md_cluster_info { 71 /* dlm lock space and resources for clustered raid. */ 72 dlm_lockspace_t *lockspace; 73 int slot_number; 74 struct completion completion; 75 struct mutex recv_mutex; 76 struct dlm_lock_resource *bitmap_lockres; 77 struct dlm_lock_resource **other_bitmap_lockres; 78 struct dlm_lock_resource *resync_lockres; 79 struct list_head suspend_list; 80 spinlock_t suspend_lock; 81 struct md_thread *recovery_thread; 82 unsigned long recovery_map; 83 /* communication loc resources */ 84 struct dlm_lock_resource *ack_lockres; 85 struct dlm_lock_resource *message_lockres; 86 struct dlm_lock_resource *token_lockres; 87 struct dlm_lock_resource *no_new_dev_lockres; 88 struct md_thread *recv_thread; 89 struct completion newdisk_completion; 90 wait_queue_head_t wait; 91 unsigned long state; 92 /* record the region in RESYNCING message */ 93 sector_t sync_low; 94 sector_t sync_hi; 95 }; 96 97 enum msg_type { 98 METADATA_UPDATED = 0, 99 RESYNCING, 100 NEWDISK, 101 REMOVE, 102 RE_ADD, 103 BITMAP_NEEDS_SYNC, 104 }; 105 106 struct cluster_msg { 107 __le32 type; 108 __le32 slot; 109 /* TODO: Unionize this for smaller footprint */ 110 __le64 low; 111 __le64 high; 112 char uuid[16]; 113 __le32 raid_slot; 114 }; 115 116 static void sync_ast(void *arg) 117 { 118 struct dlm_lock_resource *res; 119 120 res = arg; 121 complete(&res->completion); 122 } 123 124 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 125 { 126 int ret = 0; 127 128 ret = dlm_lock(res->ls, mode, &res->lksb, 129 res->flags, res->name, strlen(res->name), 130 0, sync_ast, res, res->bast); 131 if (ret) 132 return ret; 133 wait_for_completion(&res->completion); 134 if (res->lksb.sb_status == 0) 135 res->mode = mode; 136 return res->lksb.sb_status; 137 } 138 139 static int dlm_unlock_sync(struct dlm_lock_resource *res) 140 { 141 return dlm_lock_sync(res, DLM_LOCK_NL); 142 } 143 144 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 145 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 146 { 147 struct dlm_lock_resource *res = NULL; 148 int ret, namelen; 149 struct md_cluster_info *cinfo = mddev->cluster_info; 150 151 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 152 if (!res) 153 return NULL; 154 init_completion(&res->completion); 155 res->ls = cinfo->lockspace; 156 res->mddev = mddev; 157 res->mode = DLM_LOCK_IV; 158 namelen = strlen(name); 159 res->name = kzalloc(namelen + 1, GFP_KERNEL); 160 if (!res->name) { 161 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 162 goto out_err; 163 } 164 strlcpy(res->name, name, namelen + 1); 165 if (with_lvb) { 166 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 167 if (!res->lksb.sb_lvbptr) { 168 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 169 goto out_err; 170 } 171 res->flags = DLM_LKF_VALBLK; 172 } 173 174 if (bastfn) 175 res->bast = bastfn; 176 177 res->flags |= DLM_LKF_EXPEDITE; 178 179 ret = dlm_lock_sync(res, DLM_LOCK_NL); 180 if (ret) { 181 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 182 goto out_err; 183 } 184 res->flags &= ~DLM_LKF_EXPEDITE; 185 res->flags |= DLM_LKF_CONVERT; 186 187 return res; 188 out_err: 189 kfree(res->lksb.sb_lvbptr); 190 kfree(res->name); 191 kfree(res); 192 return NULL; 193 } 194 195 static void lockres_free(struct dlm_lock_resource *res) 196 { 197 int ret = 0; 198 199 if (!res) 200 return; 201 202 /* 203 * use FORCEUNLOCK flag, so we can unlock even the lock is on the 204 * waiting or convert queue 205 */ 206 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK, 207 &res->lksb, res); 208 if (unlikely(ret != 0)) 209 pr_err("failed to unlock %s return %d\n", res->name, ret); 210 else 211 wait_for_completion(&res->completion); 212 213 kfree(res->name); 214 kfree(res->lksb.sb_lvbptr); 215 kfree(res); 216 } 217 218 static void add_resync_info(struct dlm_lock_resource *lockres, 219 sector_t lo, sector_t hi) 220 { 221 struct resync_info *ri; 222 223 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 224 ri->lo = cpu_to_le64(lo); 225 ri->hi = cpu_to_le64(hi); 226 } 227 228 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 229 { 230 struct resync_info ri; 231 struct suspend_info *s = NULL; 232 sector_t hi = 0; 233 234 dlm_lock_sync(lockres, DLM_LOCK_CR); 235 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 236 hi = le64_to_cpu(ri.hi); 237 if (hi > 0) { 238 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 239 if (!s) 240 goto out; 241 s->hi = hi; 242 s->lo = le64_to_cpu(ri.lo); 243 } 244 dlm_unlock_sync(lockres); 245 out: 246 return s; 247 } 248 249 static void recover_bitmaps(struct md_thread *thread) 250 { 251 struct mddev *mddev = thread->mddev; 252 struct md_cluster_info *cinfo = mddev->cluster_info; 253 struct dlm_lock_resource *bm_lockres; 254 char str[64]; 255 int slot, ret; 256 struct suspend_info *s, *tmp; 257 sector_t lo, hi; 258 259 while (cinfo->recovery_map) { 260 slot = fls64((u64)cinfo->recovery_map) - 1; 261 262 /* Clear suspend_area associated with the bitmap */ 263 spin_lock_irq(&cinfo->suspend_lock); 264 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 265 if (slot == s->slot) { 266 list_del(&s->list); 267 kfree(s); 268 } 269 spin_unlock_irq(&cinfo->suspend_lock); 270 271 snprintf(str, 64, "bitmap%04d", slot); 272 bm_lockres = lockres_init(mddev, str, NULL, 1); 273 if (!bm_lockres) { 274 pr_err("md-cluster: Cannot initialize bitmaps\n"); 275 goto clear_bit; 276 } 277 278 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 279 if (ret) { 280 pr_err("md-cluster: Could not DLM lock %s: %d\n", 281 str, ret); 282 goto clear_bit; 283 } 284 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 285 if (ret) { 286 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 287 goto clear_bit; 288 } 289 if (hi > 0) { 290 if (lo < mddev->recovery_cp) 291 mddev->recovery_cp = lo; 292 /* wake up thread to continue resync in case resync 293 * is not finished */ 294 if (mddev->recovery_cp != MaxSector) { 295 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 296 md_wakeup_thread(mddev->thread); 297 } 298 } 299 clear_bit: 300 lockres_free(bm_lockres); 301 clear_bit(slot, &cinfo->recovery_map); 302 } 303 } 304 305 static void recover_prep(void *arg) 306 { 307 struct mddev *mddev = arg; 308 struct md_cluster_info *cinfo = mddev->cluster_info; 309 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 310 } 311 312 static void __recover_slot(struct mddev *mddev, int slot) 313 { 314 struct md_cluster_info *cinfo = mddev->cluster_info; 315 316 set_bit(slot, &cinfo->recovery_map); 317 if (!cinfo->recovery_thread) { 318 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 319 mddev, "recover"); 320 if (!cinfo->recovery_thread) { 321 pr_warn("md-cluster: Could not create recovery thread\n"); 322 return; 323 } 324 } 325 md_wakeup_thread(cinfo->recovery_thread); 326 } 327 328 static void recover_slot(void *arg, struct dlm_slot *slot) 329 { 330 struct mddev *mddev = arg; 331 struct md_cluster_info *cinfo = mddev->cluster_info; 332 333 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 334 mddev->bitmap_info.cluster_name, 335 slot->nodeid, slot->slot, 336 cinfo->slot_number); 337 /* deduct one since dlm slot starts from one while the num of 338 * cluster-md begins with 0 */ 339 __recover_slot(mddev, slot->slot - 1); 340 } 341 342 static void recover_done(void *arg, struct dlm_slot *slots, 343 int num_slots, int our_slot, 344 uint32_t generation) 345 { 346 struct mddev *mddev = arg; 347 struct md_cluster_info *cinfo = mddev->cluster_info; 348 349 cinfo->slot_number = our_slot; 350 /* completion is only need to be complete when node join cluster, 351 * it doesn't need to run during another node's failure */ 352 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 353 complete(&cinfo->completion); 354 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 355 } 356 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 357 } 358 359 /* the ops is called when node join the cluster, and do lock recovery 360 * if node failure occurs */ 361 static const struct dlm_lockspace_ops md_ls_ops = { 362 .recover_prep = recover_prep, 363 .recover_slot = recover_slot, 364 .recover_done = recover_done, 365 }; 366 367 /* 368 * The BAST function for the ack lock resource 369 * This function wakes up the receive thread in 370 * order to receive and process the message. 371 */ 372 static void ack_bast(void *arg, int mode) 373 { 374 struct dlm_lock_resource *res = arg; 375 struct md_cluster_info *cinfo = res->mddev->cluster_info; 376 377 if (mode == DLM_LOCK_EX) { 378 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state)) 379 md_wakeup_thread(cinfo->recv_thread); 380 else 381 set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state); 382 } 383 } 384 385 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 386 { 387 struct suspend_info *s, *tmp; 388 389 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 390 if (slot == s->slot) { 391 list_del(&s->list); 392 kfree(s); 393 break; 394 } 395 } 396 397 static void remove_suspend_info(struct mddev *mddev, int slot) 398 { 399 struct md_cluster_info *cinfo = mddev->cluster_info; 400 spin_lock_irq(&cinfo->suspend_lock); 401 __remove_suspend_info(cinfo, slot); 402 spin_unlock_irq(&cinfo->suspend_lock); 403 mddev->pers->quiesce(mddev, 2); 404 } 405 406 407 static void process_suspend_info(struct mddev *mddev, 408 int slot, sector_t lo, sector_t hi) 409 { 410 struct md_cluster_info *cinfo = mddev->cluster_info; 411 struct suspend_info *s; 412 413 if (!hi) { 414 remove_suspend_info(mddev, slot); 415 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 416 md_wakeup_thread(mddev->thread); 417 return; 418 } 419 420 /* 421 * The bitmaps are not same for different nodes 422 * if RESYNCING is happening in one node, then 423 * the node which received the RESYNCING message 424 * probably will perform resync with the region 425 * [lo, hi] again, so we could reduce resync time 426 * a lot if we can ensure that the bitmaps among 427 * different nodes are match up well. 428 * 429 * sync_low/hi is used to record the region which 430 * arrived in the previous RESYNCING message, 431 * 432 * Call bitmap_sync_with_cluster to clear 433 * NEEDED_MASK and set RESYNC_MASK since 434 * resync thread is running in another node, 435 * so we don't need to do the resync again 436 * with the same section */ 437 bitmap_sync_with_cluster(mddev, cinfo->sync_low, 438 cinfo->sync_hi, 439 lo, hi); 440 cinfo->sync_low = lo; 441 cinfo->sync_hi = hi; 442 443 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 444 if (!s) 445 return; 446 s->slot = slot; 447 s->lo = lo; 448 s->hi = hi; 449 mddev->pers->quiesce(mddev, 1); 450 mddev->pers->quiesce(mddev, 0); 451 spin_lock_irq(&cinfo->suspend_lock); 452 /* Remove existing entry (if exists) before adding */ 453 __remove_suspend_info(cinfo, slot); 454 list_add(&s->list, &cinfo->suspend_list); 455 spin_unlock_irq(&cinfo->suspend_lock); 456 mddev->pers->quiesce(mddev, 2); 457 } 458 459 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 460 { 461 char disk_uuid[64]; 462 struct md_cluster_info *cinfo = mddev->cluster_info; 463 char event_name[] = "EVENT=ADD_DEVICE"; 464 char raid_slot[16]; 465 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 466 int len; 467 468 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 469 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 470 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 471 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 472 init_completion(&cinfo->newdisk_completion); 473 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 474 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 475 wait_for_completion_timeout(&cinfo->newdisk_completion, 476 NEW_DEV_TIMEOUT); 477 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 478 } 479 480 481 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 482 { 483 struct md_cluster_info *cinfo = mddev->cluster_info; 484 mddev->good_device_nr = le32_to_cpu(msg->raid_slot); 485 set_bit(MD_RELOAD_SB, &mddev->flags); 486 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 487 md_wakeup_thread(mddev->thread); 488 } 489 490 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 491 { 492 struct md_rdev *rdev; 493 494 rcu_read_lock(); 495 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 496 if (rdev) { 497 set_bit(ClusterRemove, &rdev->flags); 498 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 499 md_wakeup_thread(mddev->thread); 500 } 501 else 502 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 503 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 504 rcu_read_unlock(); 505 } 506 507 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 508 { 509 struct md_rdev *rdev; 510 511 rcu_read_lock(); 512 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 513 if (rdev && test_bit(Faulty, &rdev->flags)) 514 clear_bit(Faulty, &rdev->flags); 515 else 516 pr_warn("%s: %d Could not find disk(%d) which is faulty", 517 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 518 rcu_read_unlock(); 519 } 520 521 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 522 { 523 int ret = 0; 524 525 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 526 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 527 return -1; 528 switch (le32_to_cpu(msg->type)) { 529 case METADATA_UPDATED: 530 process_metadata_update(mddev, msg); 531 break; 532 case RESYNCING: 533 process_suspend_info(mddev, le32_to_cpu(msg->slot), 534 le64_to_cpu(msg->low), 535 le64_to_cpu(msg->high)); 536 break; 537 case NEWDISK: 538 process_add_new_disk(mddev, msg); 539 break; 540 case REMOVE: 541 process_remove_disk(mddev, msg); 542 break; 543 case RE_ADD: 544 process_readd_disk(mddev, msg); 545 break; 546 case BITMAP_NEEDS_SYNC: 547 __recover_slot(mddev, le32_to_cpu(msg->slot)); 548 break; 549 default: 550 ret = -1; 551 pr_warn("%s:%d Received unknown message from %d\n", 552 __func__, __LINE__, msg->slot); 553 } 554 return ret; 555 } 556 557 /* 558 * thread for receiving message 559 */ 560 static void recv_daemon(struct md_thread *thread) 561 { 562 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 563 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 564 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 565 struct cluster_msg msg; 566 int ret; 567 568 mutex_lock(&cinfo->recv_mutex); 569 /*get CR on Message*/ 570 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 571 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 572 mutex_unlock(&cinfo->recv_mutex); 573 return; 574 } 575 576 /* read lvb and wake up thread to process this message_lockres */ 577 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 578 ret = process_recvd_msg(thread->mddev, &msg); 579 if (ret) 580 goto out; 581 582 /*release CR on ack_lockres*/ 583 ret = dlm_unlock_sync(ack_lockres); 584 if (unlikely(ret != 0)) 585 pr_info("unlock ack failed return %d\n", ret); 586 /*up-convert to PR on message_lockres*/ 587 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 588 if (unlikely(ret != 0)) 589 pr_info("lock PR on msg failed return %d\n", ret); 590 /*get CR on ack_lockres again*/ 591 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 592 if (unlikely(ret != 0)) 593 pr_info("lock CR on ack failed return %d\n", ret); 594 out: 595 /*release CR on message_lockres*/ 596 ret = dlm_unlock_sync(message_lockres); 597 if (unlikely(ret != 0)) 598 pr_info("unlock msg failed return %d\n", ret); 599 mutex_unlock(&cinfo->recv_mutex); 600 } 601 602 /* lock_token() 603 * Takes the lock on the TOKEN lock resource so no other 604 * node can communicate while the operation is underway. 605 */ 606 static int lock_token(struct md_cluster_info *cinfo) 607 { 608 int error; 609 610 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 611 if (error) 612 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 613 __func__, __LINE__, error); 614 615 /* Lock the receive sequence */ 616 mutex_lock(&cinfo->recv_mutex); 617 return error; 618 } 619 620 /* lock_comm() 621 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel. 622 */ 623 static int lock_comm(struct md_cluster_info *cinfo) 624 { 625 wait_event(cinfo->wait, 626 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state)); 627 628 return lock_token(cinfo); 629 } 630 631 static void unlock_comm(struct md_cluster_info *cinfo) 632 { 633 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 634 mutex_unlock(&cinfo->recv_mutex); 635 dlm_unlock_sync(cinfo->token_lockres); 636 clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state); 637 wake_up(&cinfo->wait); 638 } 639 640 /* __sendmsg() 641 * This function performs the actual sending of the message. This function is 642 * usually called after performing the encompassing operation 643 * The function: 644 * 1. Grabs the message lockresource in EX mode 645 * 2. Copies the message to the message LVB 646 * 3. Downconverts message lockresource to CW 647 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 648 * and the other nodes read the message. The thread will wait here until all other 649 * nodes have released ack lock resource. 650 * 5. Downconvert ack lockresource to CR 651 */ 652 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 653 { 654 int error; 655 int slot = cinfo->slot_number - 1; 656 657 cmsg->slot = cpu_to_le32(slot); 658 /*get EX on Message*/ 659 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 660 if (error) { 661 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 662 goto failed_message; 663 } 664 665 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 666 sizeof(struct cluster_msg)); 667 /*down-convert EX to CW on Message*/ 668 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 669 if (error) { 670 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 671 error); 672 goto failed_ack; 673 } 674 675 /*up-convert CR to EX on Ack*/ 676 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 677 if (error) { 678 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 679 error); 680 goto failed_ack; 681 } 682 683 /*down-convert EX to CR on Ack*/ 684 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 685 if (error) { 686 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 687 error); 688 goto failed_ack; 689 } 690 691 failed_ack: 692 error = dlm_unlock_sync(cinfo->message_lockres); 693 if (unlikely(error != 0)) { 694 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 695 error); 696 /* in case the message can't be released due to some reason */ 697 goto failed_ack; 698 } 699 failed_message: 700 return error; 701 } 702 703 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 704 { 705 int ret; 706 707 lock_comm(cinfo); 708 ret = __sendmsg(cinfo, cmsg); 709 unlock_comm(cinfo); 710 return ret; 711 } 712 713 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 714 { 715 struct md_cluster_info *cinfo = mddev->cluster_info; 716 int i, ret = 0; 717 struct dlm_lock_resource *bm_lockres; 718 struct suspend_info *s; 719 char str[64]; 720 sector_t lo, hi; 721 722 723 for (i = 0; i < total_slots; i++) { 724 memset(str, '\0', 64); 725 snprintf(str, 64, "bitmap%04d", i); 726 bm_lockres = lockres_init(mddev, str, NULL, 1); 727 if (!bm_lockres) 728 return -ENOMEM; 729 if (i == (cinfo->slot_number - 1)) { 730 lockres_free(bm_lockres); 731 continue; 732 } 733 734 bm_lockres->flags |= DLM_LKF_NOQUEUE; 735 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 736 if (ret == -EAGAIN) { 737 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 738 s = read_resync_info(mddev, bm_lockres); 739 if (s) { 740 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 741 __func__, __LINE__, 742 (unsigned long long) s->lo, 743 (unsigned long long) s->hi, i); 744 spin_lock_irq(&cinfo->suspend_lock); 745 s->slot = i; 746 list_add(&s->list, &cinfo->suspend_list); 747 spin_unlock_irq(&cinfo->suspend_lock); 748 } 749 ret = 0; 750 lockres_free(bm_lockres); 751 continue; 752 } 753 if (ret) { 754 lockres_free(bm_lockres); 755 goto out; 756 } 757 758 /* Read the disk bitmap sb and check if it needs recovery */ 759 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 760 if (ret) { 761 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 762 lockres_free(bm_lockres); 763 continue; 764 } 765 if ((hi > 0) && (lo < mddev->recovery_cp)) { 766 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 767 mddev->recovery_cp = lo; 768 md_check_recovery(mddev); 769 } 770 771 lockres_free(bm_lockres); 772 } 773 out: 774 return ret; 775 } 776 777 static int join(struct mddev *mddev, int nodes) 778 { 779 struct md_cluster_info *cinfo; 780 int ret, ops_rv; 781 char str[64]; 782 783 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 784 if (!cinfo) 785 return -ENOMEM; 786 787 INIT_LIST_HEAD(&cinfo->suspend_list); 788 spin_lock_init(&cinfo->suspend_lock); 789 init_completion(&cinfo->completion); 790 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 791 init_waitqueue_head(&cinfo->wait); 792 mutex_init(&cinfo->recv_mutex); 793 794 mddev->cluster_info = cinfo; 795 796 memset(str, 0, 64); 797 sprintf(str, "%pU", mddev->uuid); 798 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 799 DLM_LSFL_FS, LVB_SIZE, 800 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 801 if (ret) 802 goto err; 803 wait_for_completion(&cinfo->completion); 804 if (nodes < cinfo->slot_number) { 805 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 806 cinfo->slot_number, nodes); 807 ret = -ERANGE; 808 goto err; 809 } 810 /* Initiate the communication resources */ 811 ret = -ENOMEM; 812 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 813 if (!cinfo->recv_thread) { 814 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 815 goto err; 816 } 817 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 818 if (!cinfo->message_lockres) 819 goto err; 820 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 821 if (!cinfo->token_lockres) 822 goto err; 823 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 824 if (!cinfo->no_new_dev_lockres) 825 goto err; 826 827 ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 828 if (ret) { 829 ret = -EAGAIN; 830 pr_err("md-cluster: can't join cluster to avoid lock issue\n"); 831 goto err; 832 } 833 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 834 if (!cinfo->ack_lockres) { 835 ret = -ENOMEM; 836 goto err; 837 } 838 /* get sync CR lock on ACK. */ 839 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 840 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 841 ret); 842 dlm_unlock_sync(cinfo->token_lockres); 843 /* get sync CR lock on no-new-dev. */ 844 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 845 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 846 847 848 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 849 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 850 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 851 if (!cinfo->bitmap_lockres) { 852 ret = -ENOMEM; 853 goto err; 854 } 855 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 856 pr_err("Failed to get bitmap lock\n"); 857 ret = -EINVAL; 858 goto err; 859 } 860 861 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 862 if (!cinfo->resync_lockres) { 863 ret = -ENOMEM; 864 goto err; 865 } 866 867 return 0; 868 err: 869 md_unregister_thread(&cinfo->recovery_thread); 870 md_unregister_thread(&cinfo->recv_thread); 871 lockres_free(cinfo->message_lockres); 872 lockres_free(cinfo->token_lockres); 873 lockres_free(cinfo->ack_lockres); 874 lockres_free(cinfo->no_new_dev_lockres); 875 lockres_free(cinfo->resync_lockres); 876 lockres_free(cinfo->bitmap_lockres); 877 if (cinfo->lockspace) 878 dlm_release_lockspace(cinfo->lockspace, 2); 879 mddev->cluster_info = NULL; 880 kfree(cinfo); 881 return ret; 882 } 883 884 static void load_bitmaps(struct mddev *mddev, int total_slots) 885 { 886 struct md_cluster_info *cinfo = mddev->cluster_info; 887 888 /* load all the node's bitmap info for resync */ 889 if (gather_all_resync_info(mddev, total_slots)) 890 pr_err("md-cluster: failed to gather all resyn infos\n"); 891 set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state); 892 /* wake up recv thread in case something need to be handled */ 893 if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state)) 894 md_wakeup_thread(cinfo->recv_thread); 895 } 896 897 static void resync_bitmap(struct mddev *mddev) 898 { 899 struct md_cluster_info *cinfo = mddev->cluster_info; 900 struct cluster_msg cmsg = {0}; 901 int err; 902 903 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC); 904 err = sendmsg(cinfo, &cmsg); 905 if (err) 906 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 907 __func__, __LINE__, err); 908 } 909 910 static void unlock_all_bitmaps(struct mddev *mddev); 911 static int leave(struct mddev *mddev) 912 { 913 struct md_cluster_info *cinfo = mddev->cluster_info; 914 915 if (!cinfo) 916 return 0; 917 918 /* BITMAP_NEEDS_SYNC message should be sent when node 919 * is leaving the cluster with dirty bitmap, also we 920 * can only deliver it when dlm connection is available */ 921 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 922 resync_bitmap(mddev); 923 924 md_unregister_thread(&cinfo->recovery_thread); 925 md_unregister_thread(&cinfo->recv_thread); 926 lockres_free(cinfo->message_lockres); 927 lockres_free(cinfo->token_lockres); 928 lockres_free(cinfo->ack_lockres); 929 lockres_free(cinfo->no_new_dev_lockres); 930 lockres_free(cinfo->resync_lockres); 931 lockres_free(cinfo->bitmap_lockres); 932 unlock_all_bitmaps(mddev); 933 dlm_release_lockspace(cinfo->lockspace, 2); 934 return 0; 935 } 936 937 /* slot_number(): Returns the MD slot number to use 938 * DLM starts the slot numbers from 1, wheras cluster-md 939 * wants the number to be from zero, so we deduct one 940 */ 941 static int slot_number(struct mddev *mddev) 942 { 943 struct md_cluster_info *cinfo = mddev->cluster_info; 944 945 return cinfo->slot_number - 1; 946 } 947 948 /* 949 * Check if the communication is already locked, else lock the communication 950 * channel. 951 * If it is already locked, token is in EX mode, and hence lock_token() 952 * should not be called. 953 */ 954 static int metadata_update_start(struct mddev *mddev) 955 { 956 struct md_cluster_info *cinfo = mddev->cluster_info; 957 958 wait_event(cinfo->wait, 959 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) || 960 test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state)); 961 962 /* If token is already locked, return 0 */ 963 if (cinfo->token_lockres->mode == DLM_LOCK_EX) 964 return 0; 965 966 return lock_token(cinfo); 967 } 968 969 static int metadata_update_finish(struct mddev *mddev) 970 { 971 struct md_cluster_info *cinfo = mddev->cluster_info; 972 struct cluster_msg cmsg; 973 struct md_rdev *rdev; 974 int ret = 0; 975 int raid_slot = -1; 976 977 memset(&cmsg, 0, sizeof(cmsg)); 978 cmsg.type = cpu_to_le32(METADATA_UPDATED); 979 /* Pick up a good active device number to send. 980 */ 981 rdev_for_each(rdev, mddev) 982 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) { 983 raid_slot = rdev->desc_nr; 984 break; 985 } 986 if (raid_slot >= 0) { 987 cmsg.raid_slot = cpu_to_le32(raid_slot); 988 ret = __sendmsg(cinfo, &cmsg); 989 } else 990 pr_warn("md-cluster: No good device id found to send\n"); 991 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 992 unlock_comm(cinfo); 993 return ret; 994 } 995 996 static void metadata_update_cancel(struct mddev *mddev) 997 { 998 struct md_cluster_info *cinfo = mddev->cluster_info; 999 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1000 unlock_comm(cinfo); 1001 } 1002 1003 static int resync_start(struct mddev *mddev) 1004 { 1005 struct md_cluster_info *cinfo = mddev->cluster_info; 1006 return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX); 1007 } 1008 1009 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 1010 { 1011 struct md_cluster_info *cinfo = mddev->cluster_info; 1012 struct resync_info ri; 1013 struct cluster_msg cmsg = {0}; 1014 1015 /* do not send zero again, if we have sent before */ 1016 if (hi == 0) { 1017 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 1018 if (le64_to_cpu(ri.hi) == 0) 1019 return 0; 1020 } 1021 1022 add_resync_info(cinfo->bitmap_lockres, lo, hi); 1023 /* Re-acquire the lock to refresh LVB */ 1024 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 1025 cmsg.type = cpu_to_le32(RESYNCING); 1026 cmsg.low = cpu_to_le64(lo); 1027 cmsg.high = cpu_to_le64(hi); 1028 1029 return sendmsg(cinfo, &cmsg); 1030 } 1031 1032 static int resync_finish(struct mddev *mddev) 1033 { 1034 struct md_cluster_info *cinfo = mddev->cluster_info; 1035 dlm_unlock_sync(cinfo->resync_lockres); 1036 return resync_info_update(mddev, 0, 0); 1037 } 1038 1039 static int area_resyncing(struct mddev *mddev, int direction, 1040 sector_t lo, sector_t hi) 1041 { 1042 struct md_cluster_info *cinfo = mddev->cluster_info; 1043 int ret = 0; 1044 struct suspend_info *s; 1045 1046 if ((direction == READ) && 1047 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 1048 return 1; 1049 1050 spin_lock_irq(&cinfo->suspend_lock); 1051 if (list_empty(&cinfo->suspend_list)) 1052 goto out; 1053 list_for_each_entry(s, &cinfo->suspend_list, list) 1054 if (hi > s->lo && lo < s->hi) { 1055 ret = 1; 1056 break; 1057 } 1058 out: 1059 spin_unlock_irq(&cinfo->suspend_lock); 1060 return ret; 1061 } 1062 1063 /* add_new_disk() - initiates a disk add 1064 * However, if this fails before writing md_update_sb(), 1065 * add_new_disk_cancel() must be called to release token lock 1066 */ 1067 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 1068 { 1069 struct md_cluster_info *cinfo = mddev->cluster_info; 1070 struct cluster_msg cmsg; 1071 int ret = 0; 1072 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 1073 char *uuid = sb->device_uuid; 1074 1075 memset(&cmsg, 0, sizeof(cmsg)); 1076 cmsg.type = cpu_to_le32(NEWDISK); 1077 memcpy(cmsg.uuid, uuid, 16); 1078 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1079 lock_comm(cinfo); 1080 ret = __sendmsg(cinfo, &cmsg); 1081 if (ret) 1082 return ret; 1083 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 1084 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 1085 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 1086 /* Some node does not "see" the device */ 1087 if (ret == -EAGAIN) 1088 ret = -ENOENT; 1089 if (ret) 1090 unlock_comm(cinfo); 1091 else { 1092 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 1093 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which 1094 * will run soon after add_new_disk, the below path will be 1095 * invoked: 1096 * md_wakeup_thread(mddev->thread) 1097 * -> conf->thread (raid1d) 1098 * -> md_check_recovery -> md_update_sb 1099 * -> metadata_update_start/finish 1100 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually. 1101 * 1102 * For other failure cases, metadata_update_cancel and 1103 * add_new_disk_cancel also clear below bit as well. 1104 * */ 1105 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1106 wake_up(&cinfo->wait); 1107 } 1108 return ret; 1109 } 1110 1111 static void add_new_disk_cancel(struct mddev *mddev) 1112 { 1113 struct md_cluster_info *cinfo = mddev->cluster_info; 1114 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1115 unlock_comm(cinfo); 1116 } 1117 1118 static int new_disk_ack(struct mddev *mddev, bool ack) 1119 { 1120 struct md_cluster_info *cinfo = mddev->cluster_info; 1121 1122 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 1123 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 1124 return -EINVAL; 1125 } 1126 1127 if (ack) 1128 dlm_unlock_sync(cinfo->no_new_dev_lockres); 1129 complete(&cinfo->newdisk_completion); 1130 return 0; 1131 } 1132 1133 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 1134 { 1135 struct cluster_msg cmsg = {0}; 1136 struct md_cluster_info *cinfo = mddev->cluster_info; 1137 cmsg.type = cpu_to_le32(REMOVE); 1138 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1139 return sendmsg(cinfo, &cmsg); 1140 } 1141 1142 static int lock_all_bitmaps(struct mddev *mddev) 1143 { 1144 int slot, my_slot, ret, held = 1, i = 0; 1145 char str[64]; 1146 struct md_cluster_info *cinfo = mddev->cluster_info; 1147 1148 cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) * 1149 sizeof(struct dlm_lock_resource *), 1150 GFP_KERNEL); 1151 if (!cinfo->other_bitmap_lockres) { 1152 pr_err("md: can't alloc mem for other bitmap locks\n"); 1153 return 0; 1154 } 1155 1156 my_slot = slot_number(mddev); 1157 for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) { 1158 if (slot == my_slot) 1159 continue; 1160 1161 memset(str, '\0', 64); 1162 snprintf(str, 64, "bitmap%04d", slot); 1163 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1164 if (!cinfo->other_bitmap_lockres[i]) 1165 return -ENOMEM; 1166 1167 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1168 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1169 if (ret) 1170 held = -1; 1171 i++; 1172 } 1173 1174 return held; 1175 } 1176 1177 static void unlock_all_bitmaps(struct mddev *mddev) 1178 { 1179 struct md_cluster_info *cinfo = mddev->cluster_info; 1180 int i; 1181 1182 /* release other node's bitmap lock if they are existed */ 1183 if (cinfo->other_bitmap_lockres) { 1184 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1185 if (cinfo->other_bitmap_lockres[i]) { 1186 lockres_free(cinfo->other_bitmap_lockres[i]); 1187 } 1188 } 1189 kfree(cinfo->other_bitmap_lockres); 1190 } 1191 } 1192 1193 static int gather_bitmaps(struct md_rdev *rdev) 1194 { 1195 int sn, err; 1196 sector_t lo, hi; 1197 struct cluster_msg cmsg = {0}; 1198 struct mddev *mddev = rdev->mddev; 1199 struct md_cluster_info *cinfo = mddev->cluster_info; 1200 1201 cmsg.type = cpu_to_le32(RE_ADD); 1202 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1203 err = sendmsg(cinfo, &cmsg); 1204 if (err) 1205 goto out; 1206 1207 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1208 if (sn == (cinfo->slot_number - 1)) 1209 continue; 1210 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1211 if (err) { 1212 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1213 goto out; 1214 } 1215 if ((hi > 0) && (lo < mddev->recovery_cp)) 1216 mddev->recovery_cp = lo; 1217 } 1218 out: 1219 return err; 1220 } 1221 1222 static struct md_cluster_operations cluster_ops = { 1223 .join = join, 1224 .leave = leave, 1225 .slot_number = slot_number, 1226 .resync_start = resync_start, 1227 .resync_finish = resync_finish, 1228 .resync_info_update = resync_info_update, 1229 .metadata_update_start = metadata_update_start, 1230 .metadata_update_finish = metadata_update_finish, 1231 .metadata_update_cancel = metadata_update_cancel, 1232 .area_resyncing = area_resyncing, 1233 .add_new_disk = add_new_disk, 1234 .add_new_disk_cancel = add_new_disk_cancel, 1235 .new_disk_ack = new_disk_ack, 1236 .remove_disk = remove_disk, 1237 .load_bitmaps = load_bitmaps, 1238 .gather_bitmaps = gather_bitmaps, 1239 .lock_all_bitmaps = lock_all_bitmaps, 1240 .unlock_all_bitmaps = unlock_all_bitmaps, 1241 }; 1242 1243 static int __init cluster_init(void) 1244 { 1245 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 1246 pr_info("Registering Cluster MD functions\n"); 1247 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1248 return 0; 1249 } 1250 1251 static void cluster_exit(void) 1252 { 1253 unregister_md_cluster_operations(); 1254 } 1255 1256 module_init(cluster_init); 1257 module_exit(cluster_exit); 1258 MODULE_AUTHOR("SUSE"); 1259 MODULE_LICENSE("GPL"); 1260 MODULE_DESCRIPTION("Clustering support for MD"); 1261