1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/dlm.h> 14 #include <linux/sched.h> 15 #include "md.h" 16 #include "bitmap.h" 17 #include "md-cluster.h" 18 19 #define LVB_SIZE 64 20 21 struct dlm_lock_resource { 22 dlm_lockspace_t *ls; 23 struct dlm_lksb lksb; 24 char *name; /* lock name. */ 25 uint32_t flags; /* flags to pass to dlm_lock() */ 26 struct completion completion; /* completion for synchronized locking */ 27 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 28 struct mddev *mddev; /* pointing back to mddev. */ 29 }; 30 31 struct suspend_info { 32 int slot; 33 sector_t lo; 34 sector_t hi; 35 struct list_head list; 36 }; 37 38 struct resync_info { 39 __le64 lo; 40 __le64 hi; 41 }; 42 43 struct md_cluster_info { 44 /* dlm lock space and resources for clustered raid. */ 45 dlm_lockspace_t *lockspace; 46 int slot_number; 47 struct completion completion; 48 struct dlm_lock_resource *sb_lock; 49 struct mutex sb_mutex; 50 struct dlm_lock_resource *bitmap_lockres; 51 struct list_head suspend_list; 52 spinlock_t suspend_lock; 53 struct md_thread *recovery_thread; 54 unsigned long recovery_map; 55 }; 56 57 static void sync_ast(void *arg) 58 { 59 struct dlm_lock_resource *res; 60 61 res = (struct dlm_lock_resource *) arg; 62 complete(&res->completion); 63 } 64 65 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 66 { 67 int ret = 0; 68 69 init_completion(&res->completion); 70 ret = dlm_lock(res->ls, mode, &res->lksb, 71 res->flags, res->name, strlen(res->name), 72 0, sync_ast, res, res->bast); 73 if (ret) 74 return ret; 75 wait_for_completion(&res->completion); 76 return res->lksb.sb_status; 77 } 78 79 static int dlm_unlock_sync(struct dlm_lock_resource *res) 80 { 81 return dlm_lock_sync(res, DLM_LOCK_NL); 82 } 83 84 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 85 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 86 { 87 struct dlm_lock_resource *res = NULL; 88 int ret, namelen; 89 struct md_cluster_info *cinfo = mddev->cluster_info; 90 91 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 92 if (!res) 93 return NULL; 94 res->ls = cinfo->lockspace; 95 res->mddev = mddev; 96 namelen = strlen(name); 97 res->name = kzalloc(namelen + 1, GFP_KERNEL); 98 if (!res->name) { 99 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 100 goto out_err; 101 } 102 strlcpy(res->name, name, namelen + 1); 103 if (with_lvb) { 104 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 105 if (!res->lksb.sb_lvbptr) { 106 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 107 goto out_err; 108 } 109 res->flags = DLM_LKF_VALBLK; 110 } 111 112 if (bastfn) 113 res->bast = bastfn; 114 115 res->flags |= DLM_LKF_EXPEDITE; 116 117 ret = dlm_lock_sync(res, DLM_LOCK_NL); 118 if (ret) { 119 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 120 goto out_err; 121 } 122 res->flags &= ~DLM_LKF_EXPEDITE; 123 res->flags |= DLM_LKF_CONVERT; 124 125 return res; 126 out_err: 127 kfree(res->lksb.sb_lvbptr); 128 kfree(res->name); 129 kfree(res); 130 return NULL; 131 } 132 133 static void lockres_free(struct dlm_lock_resource *res) 134 { 135 if (!res) 136 return; 137 138 init_completion(&res->completion); 139 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res); 140 wait_for_completion(&res->completion); 141 142 kfree(res->name); 143 kfree(res->lksb.sb_lvbptr); 144 kfree(res); 145 } 146 147 static char *pretty_uuid(char *dest, char *src) 148 { 149 int i, len = 0; 150 151 for (i = 0; i < 16; i++) { 152 if (i == 4 || i == 6 || i == 8 || i == 10) 153 len += sprintf(dest + len, "-"); 154 len += sprintf(dest + len, "%02x", (__u8)src[i]); 155 } 156 return dest; 157 } 158 159 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres, 160 sector_t lo, sector_t hi) 161 { 162 struct resync_info *ri; 163 164 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 165 ri->lo = cpu_to_le64(lo); 166 ri->hi = cpu_to_le64(hi); 167 } 168 169 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 170 { 171 struct resync_info ri; 172 struct suspend_info *s = NULL; 173 sector_t hi = 0; 174 175 dlm_lock_sync(lockres, DLM_LOCK_CR); 176 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 177 hi = le64_to_cpu(ri.hi); 178 if (ri.hi > 0) { 179 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 180 if (!s) 181 goto out; 182 s->hi = hi; 183 s->lo = le64_to_cpu(ri.lo); 184 } 185 dlm_unlock_sync(lockres); 186 out: 187 return s; 188 } 189 190 void recover_bitmaps(struct md_thread *thread) 191 { 192 struct mddev *mddev = thread->mddev; 193 struct md_cluster_info *cinfo = mddev->cluster_info; 194 struct dlm_lock_resource *bm_lockres; 195 char str[64]; 196 int slot, ret; 197 struct suspend_info *s, *tmp; 198 sector_t lo, hi; 199 200 while (cinfo->recovery_map) { 201 slot = fls64((u64)cinfo->recovery_map) - 1; 202 203 /* Clear suspend_area associated with the bitmap */ 204 spin_lock_irq(&cinfo->suspend_lock); 205 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 206 if (slot == s->slot) { 207 list_del(&s->list); 208 kfree(s); 209 } 210 spin_unlock_irq(&cinfo->suspend_lock); 211 212 snprintf(str, 64, "bitmap%04d", slot); 213 bm_lockres = lockres_init(mddev, str, NULL, 1); 214 if (!bm_lockres) { 215 pr_err("md-cluster: Cannot initialize bitmaps\n"); 216 goto clear_bit; 217 } 218 219 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 220 if (ret) { 221 pr_err("md-cluster: Could not DLM lock %s: %d\n", 222 str, ret); 223 goto clear_bit; 224 } 225 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi); 226 if (ret) { 227 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 228 goto dlm_unlock; 229 } 230 if (hi > 0) { 231 /* TODO:Wait for current resync to get over */ 232 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 233 if (lo < mddev->recovery_cp) 234 mddev->recovery_cp = lo; 235 md_check_recovery(mddev); 236 } 237 dlm_unlock: 238 dlm_unlock_sync(bm_lockres); 239 clear_bit: 240 clear_bit(slot, &cinfo->recovery_map); 241 } 242 } 243 244 static void recover_prep(void *arg) 245 { 246 } 247 248 static void recover_slot(void *arg, struct dlm_slot *slot) 249 { 250 struct mddev *mddev = arg; 251 struct md_cluster_info *cinfo = mddev->cluster_info; 252 253 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 254 mddev->bitmap_info.cluster_name, 255 slot->nodeid, slot->slot, 256 cinfo->slot_number); 257 set_bit(slot->slot - 1, &cinfo->recovery_map); 258 if (!cinfo->recovery_thread) { 259 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 260 mddev, "recover"); 261 if (!cinfo->recovery_thread) { 262 pr_warn("md-cluster: Could not create recovery thread\n"); 263 return; 264 } 265 } 266 md_wakeup_thread(cinfo->recovery_thread); 267 } 268 269 static void recover_done(void *arg, struct dlm_slot *slots, 270 int num_slots, int our_slot, 271 uint32_t generation) 272 { 273 struct mddev *mddev = arg; 274 struct md_cluster_info *cinfo = mddev->cluster_info; 275 276 cinfo->slot_number = our_slot; 277 complete(&cinfo->completion); 278 } 279 280 static const struct dlm_lockspace_ops md_ls_ops = { 281 .recover_prep = recover_prep, 282 .recover_slot = recover_slot, 283 .recover_done = recover_done, 284 }; 285 286 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 287 { 288 struct md_cluster_info *cinfo = mddev->cluster_info; 289 int i, ret = 0; 290 struct dlm_lock_resource *bm_lockres; 291 struct suspend_info *s; 292 char str[64]; 293 294 295 for (i = 0; i < total_slots; i++) { 296 memset(str, '\0', 64); 297 snprintf(str, 64, "bitmap%04d", i); 298 bm_lockres = lockres_init(mddev, str, NULL, 1); 299 if (!bm_lockres) 300 return -ENOMEM; 301 if (i == (cinfo->slot_number - 1)) 302 continue; 303 304 bm_lockres->flags |= DLM_LKF_NOQUEUE; 305 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 306 if (ret == -EAGAIN) { 307 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 308 s = read_resync_info(mddev, bm_lockres); 309 if (s) { 310 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 311 __func__, __LINE__, 312 (unsigned long long) s->lo, 313 (unsigned long long) s->hi, i); 314 spin_lock_irq(&cinfo->suspend_lock); 315 s->slot = i; 316 list_add(&s->list, &cinfo->suspend_list); 317 spin_unlock_irq(&cinfo->suspend_lock); 318 } 319 ret = 0; 320 lockres_free(bm_lockres); 321 continue; 322 } 323 if (ret) 324 goto out; 325 /* TODO: Read the disk bitmap sb and check if it needs recovery */ 326 dlm_unlock_sync(bm_lockres); 327 lockres_free(bm_lockres); 328 } 329 out: 330 return ret; 331 } 332 333 static int join(struct mddev *mddev, int nodes) 334 { 335 struct md_cluster_info *cinfo; 336 int ret, ops_rv; 337 char str[64]; 338 339 if (!try_module_get(THIS_MODULE)) 340 return -ENOENT; 341 342 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 343 if (!cinfo) 344 return -ENOMEM; 345 346 init_completion(&cinfo->completion); 347 348 mutex_init(&cinfo->sb_mutex); 349 mddev->cluster_info = cinfo; 350 351 memset(str, 0, 64); 352 pretty_uuid(str, mddev->uuid); 353 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 354 DLM_LSFL_FS, LVB_SIZE, 355 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 356 if (ret) 357 goto err; 358 wait_for_completion(&cinfo->completion); 359 if (nodes <= cinfo->slot_number) { 360 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1, 361 nodes); 362 ret = -ERANGE; 363 goto err; 364 } 365 cinfo->sb_lock = lockres_init(mddev, "cmd-super", 366 NULL, 0); 367 if (!cinfo->sb_lock) { 368 ret = -ENOMEM; 369 goto err; 370 } 371 372 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 373 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 374 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 375 if (!cinfo->bitmap_lockres) 376 goto err; 377 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 378 pr_err("Failed to get bitmap lock\n"); 379 ret = -EINVAL; 380 goto err; 381 } 382 383 INIT_LIST_HEAD(&cinfo->suspend_list); 384 spin_lock_init(&cinfo->suspend_lock); 385 386 ret = gather_all_resync_info(mddev, nodes); 387 if (ret) 388 goto err; 389 390 return 0; 391 err: 392 lockres_free(cinfo->bitmap_lockres); 393 lockres_free(cinfo->sb_lock); 394 if (cinfo->lockspace) 395 dlm_release_lockspace(cinfo->lockspace, 2); 396 mddev->cluster_info = NULL; 397 kfree(cinfo); 398 module_put(THIS_MODULE); 399 return ret; 400 } 401 402 static int leave(struct mddev *mddev) 403 { 404 struct md_cluster_info *cinfo = mddev->cluster_info; 405 406 if (!cinfo) 407 return 0; 408 md_unregister_thread(&cinfo->recovery_thread); 409 lockres_free(cinfo->sb_lock); 410 lockres_free(cinfo->bitmap_lockres); 411 dlm_release_lockspace(cinfo->lockspace, 2); 412 return 0; 413 } 414 415 /* slot_number(): Returns the MD slot number to use 416 * DLM starts the slot numbers from 1, wheras cluster-md 417 * wants the number to be from zero, so we deduct one 418 */ 419 static int slot_number(struct mddev *mddev) 420 { 421 struct md_cluster_info *cinfo = mddev->cluster_info; 422 423 return cinfo->slot_number - 1; 424 } 425 426 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 427 { 428 struct md_cluster_info *cinfo = mddev->cluster_info; 429 430 add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi); 431 /* Re-acquire the lock to refresh LVB */ 432 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 433 } 434 435 static struct md_cluster_operations cluster_ops = { 436 .join = join, 437 .leave = leave, 438 .slot_number = slot_number, 439 .resync_info_update = resync_info_update, 440 }; 441 442 static int __init cluster_init(void) 443 { 444 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 445 pr_info("Registering Cluster MD functions\n"); 446 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 447 return 0; 448 } 449 450 static void cluster_exit(void) 451 { 452 unregister_md_cluster_operations(); 453 } 454 455 module_init(cluster_init); 456 module_exit(cluster_exit); 457 MODULE_LICENSE("GPL"); 458 MODULE_DESCRIPTION("Clustering support for MD"); 459