1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/dlm.h> 14 #include <linux/sched.h> 15 #include "md.h" 16 #include "md-cluster.h" 17 18 #define LVB_SIZE 64 19 20 struct dlm_lock_resource { 21 dlm_lockspace_t *ls; 22 struct dlm_lksb lksb; 23 char *name; /* lock name. */ 24 uint32_t flags; /* flags to pass to dlm_lock() */ 25 struct completion completion; /* completion for synchronized locking */ 26 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 27 struct mddev *mddev; /* pointing back to mddev. */ 28 }; 29 30 struct suspend_info { 31 int slot; 32 sector_t lo; 33 sector_t hi; 34 struct list_head list; 35 }; 36 37 struct resync_info { 38 __le64 lo; 39 __le64 hi; 40 }; 41 42 struct md_cluster_info { 43 /* dlm lock space and resources for clustered raid. */ 44 dlm_lockspace_t *lockspace; 45 int slot_number; 46 struct completion completion; 47 struct dlm_lock_resource *sb_lock; 48 struct mutex sb_mutex; 49 struct dlm_lock_resource *bitmap_lockres; 50 struct list_head suspend_list; 51 spinlock_t suspend_lock; 52 }; 53 54 static void sync_ast(void *arg) 55 { 56 struct dlm_lock_resource *res; 57 58 res = (struct dlm_lock_resource *) arg; 59 complete(&res->completion); 60 } 61 62 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 63 { 64 int ret = 0; 65 66 init_completion(&res->completion); 67 ret = dlm_lock(res->ls, mode, &res->lksb, 68 res->flags, res->name, strlen(res->name), 69 0, sync_ast, res, res->bast); 70 if (ret) 71 return ret; 72 wait_for_completion(&res->completion); 73 return res->lksb.sb_status; 74 } 75 76 static int dlm_unlock_sync(struct dlm_lock_resource *res) 77 { 78 return dlm_lock_sync(res, DLM_LOCK_NL); 79 } 80 81 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 82 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 83 { 84 struct dlm_lock_resource *res = NULL; 85 int ret, namelen; 86 struct md_cluster_info *cinfo = mddev->cluster_info; 87 88 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 89 if (!res) 90 return NULL; 91 res->ls = cinfo->lockspace; 92 res->mddev = mddev; 93 namelen = strlen(name); 94 res->name = kzalloc(namelen + 1, GFP_KERNEL); 95 if (!res->name) { 96 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 97 goto out_err; 98 } 99 strlcpy(res->name, name, namelen + 1); 100 if (with_lvb) { 101 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 102 if (!res->lksb.sb_lvbptr) { 103 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 104 goto out_err; 105 } 106 res->flags = DLM_LKF_VALBLK; 107 } 108 109 if (bastfn) 110 res->bast = bastfn; 111 112 res->flags |= DLM_LKF_EXPEDITE; 113 114 ret = dlm_lock_sync(res, DLM_LOCK_NL); 115 if (ret) { 116 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 117 goto out_err; 118 } 119 res->flags &= ~DLM_LKF_EXPEDITE; 120 res->flags |= DLM_LKF_CONVERT; 121 122 return res; 123 out_err: 124 kfree(res->lksb.sb_lvbptr); 125 kfree(res->name); 126 kfree(res); 127 return NULL; 128 } 129 130 static void lockres_free(struct dlm_lock_resource *res) 131 { 132 if (!res) 133 return; 134 135 init_completion(&res->completion); 136 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res); 137 wait_for_completion(&res->completion); 138 139 kfree(res->name); 140 kfree(res->lksb.sb_lvbptr); 141 kfree(res); 142 } 143 144 static char *pretty_uuid(char *dest, char *src) 145 { 146 int i, len = 0; 147 148 for (i = 0; i < 16; i++) { 149 if (i == 4 || i == 6 || i == 8 || i == 10) 150 len += sprintf(dest + len, "-"); 151 len += sprintf(dest + len, "%02x", (__u8)src[i]); 152 } 153 return dest; 154 } 155 156 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres, 157 sector_t lo, sector_t hi) 158 { 159 struct resync_info *ri; 160 161 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 162 ri->lo = cpu_to_le64(lo); 163 ri->hi = cpu_to_le64(hi); 164 } 165 166 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 167 { 168 struct resync_info ri; 169 struct suspend_info *s = NULL; 170 sector_t hi = 0; 171 172 dlm_lock_sync(lockres, DLM_LOCK_CR); 173 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 174 hi = le64_to_cpu(ri.hi); 175 if (ri.hi > 0) { 176 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 177 if (!s) 178 goto out; 179 s->hi = hi; 180 s->lo = le64_to_cpu(ri.lo); 181 } 182 dlm_unlock_sync(lockres); 183 out: 184 return s; 185 } 186 187 static void recover_prep(void *arg) 188 { 189 } 190 191 static void recover_slot(void *arg, struct dlm_slot *slot) 192 { 193 struct mddev *mddev = arg; 194 struct md_cluster_info *cinfo = mddev->cluster_info; 195 196 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 197 mddev->bitmap_info.cluster_name, 198 slot->nodeid, slot->slot, 199 cinfo->slot_number); 200 } 201 202 static void recover_done(void *arg, struct dlm_slot *slots, 203 int num_slots, int our_slot, 204 uint32_t generation) 205 { 206 struct mddev *mddev = arg; 207 struct md_cluster_info *cinfo = mddev->cluster_info; 208 209 cinfo->slot_number = our_slot; 210 complete(&cinfo->completion); 211 } 212 213 static const struct dlm_lockspace_ops md_ls_ops = { 214 .recover_prep = recover_prep, 215 .recover_slot = recover_slot, 216 .recover_done = recover_done, 217 }; 218 219 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 220 { 221 struct md_cluster_info *cinfo = mddev->cluster_info; 222 int i, ret = 0; 223 struct dlm_lock_resource *bm_lockres; 224 struct suspend_info *s; 225 char str[64]; 226 227 228 for (i = 0; i < total_slots; i++) { 229 memset(str, '\0', 64); 230 snprintf(str, 64, "bitmap%04d", i); 231 bm_lockres = lockres_init(mddev, str, NULL, 1); 232 if (!bm_lockres) 233 return -ENOMEM; 234 if (i == (cinfo->slot_number - 1)) 235 continue; 236 237 bm_lockres->flags |= DLM_LKF_NOQUEUE; 238 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 239 if (ret == -EAGAIN) { 240 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 241 s = read_resync_info(mddev, bm_lockres); 242 if (s) { 243 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 244 __func__, __LINE__, 245 (unsigned long long) s->lo, 246 (unsigned long long) s->hi, i); 247 spin_lock_irq(&cinfo->suspend_lock); 248 s->slot = i; 249 list_add(&s->list, &cinfo->suspend_list); 250 spin_unlock_irq(&cinfo->suspend_lock); 251 } 252 ret = 0; 253 lockres_free(bm_lockres); 254 continue; 255 } 256 if (ret) 257 goto out; 258 /* TODO: Read the disk bitmap sb and check if it needs recovery */ 259 dlm_unlock_sync(bm_lockres); 260 lockres_free(bm_lockres); 261 } 262 out: 263 return ret; 264 } 265 266 static int join(struct mddev *mddev, int nodes) 267 { 268 struct md_cluster_info *cinfo; 269 int ret, ops_rv; 270 char str[64]; 271 272 if (!try_module_get(THIS_MODULE)) 273 return -ENOENT; 274 275 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 276 if (!cinfo) 277 return -ENOMEM; 278 279 init_completion(&cinfo->completion); 280 281 mutex_init(&cinfo->sb_mutex); 282 mddev->cluster_info = cinfo; 283 284 memset(str, 0, 64); 285 pretty_uuid(str, mddev->uuid); 286 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 287 DLM_LSFL_FS, LVB_SIZE, 288 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 289 if (ret) 290 goto err; 291 wait_for_completion(&cinfo->completion); 292 if (nodes <= cinfo->slot_number) { 293 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1, 294 nodes); 295 ret = -ERANGE; 296 goto err; 297 } 298 cinfo->sb_lock = lockres_init(mddev, "cmd-super", 299 NULL, 0); 300 if (!cinfo->sb_lock) { 301 ret = -ENOMEM; 302 goto err; 303 } 304 305 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 306 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 307 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 308 if (!cinfo->bitmap_lockres) 309 goto err; 310 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 311 pr_err("Failed to get bitmap lock\n"); 312 ret = -EINVAL; 313 goto err; 314 } 315 316 INIT_LIST_HEAD(&cinfo->suspend_list); 317 spin_lock_init(&cinfo->suspend_lock); 318 319 ret = gather_all_resync_info(mddev, nodes); 320 if (ret) 321 goto err; 322 323 return 0; 324 err: 325 lockres_free(cinfo->bitmap_lockres); 326 lockres_free(cinfo->sb_lock); 327 if (cinfo->lockspace) 328 dlm_release_lockspace(cinfo->lockspace, 2); 329 mddev->cluster_info = NULL; 330 kfree(cinfo); 331 module_put(THIS_MODULE); 332 return ret; 333 } 334 335 static int leave(struct mddev *mddev) 336 { 337 struct md_cluster_info *cinfo = mddev->cluster_info; 338 339 if (!cinfo) 340 return 0; 341 lockres_free(cinfo->sb_lock); 342 lockres_free(cinfo->bitmap_lockres); 343 dlm_release_lockspace(cinfo->lockspace, 2); 344 return 0; 345 } 346 347 /* slot_number(): Returns the MD slot number to use 348 * DLM starts the slot numbers from 1, wheras cluster-md 349 * wants the number to be from zero, so we deduct one 350 */ 351 static int slot_number(struct mddev *mddev) 352 { 353 struct md_cluster_info *cinfo = mddev->cluster_info; 354 355 return cinfo->slot_number - 1; 356 } 357 358 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 359 { 360 struct md_cluster_info *cinfo = mddev->cluster_info; 361 362 add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi); 363 /* Re-acquire the lock to refresh LVB */ 364 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 365 } 366 367 static struct md_cluster_operations cluster_ops = { 368 .join = join, 369 .leave = leave, 370 .slot_number = slot_number, 371 .resync_info_update = resync_info_update, 372 }; 373 374 static int __init cluster_init(void) 375 { 376 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 377 pr_info("Registering Cluster MD functions\n"); 378 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 379 return 0; 380 } 381 382 static void cluster_exit(void) 383 { 384 unregister_md_cluster_operations(); 385 } 386 387 module_init(cluster_init); 388 module_exit(cluster_exit); 389 MODULE_LICENSE("GPL"); 390 MODULE_DESCRIPTION("Clustering support for MD"); 391