xref: /openbmc/linux/drivers/md/md-cluster.c (revision 4b26a08a)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include "md.h"
16 #include "bitmap.h"
17 #include "md-cluster.h"
18 
/*
 * Size in bytes of every DLM lock value block (LVB) used by this file: it is
 * the LVB length requested for the lockspace and the size of each per-lock
 * LVB buffer.
 */
#define LVB_SIZE	64
20 
21 struct dlm_lock_resource {
22 	dlm_lockspace_t *ls;
23 	struct dlm_lksb lksb;
24 	char *name; /* lock name. */
25 	uint32_t flags; /* flags to pass to dlm_lock() */
26 	struct completion completion; /* completion for synchronized locking */
27 	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
28 	struct mddev *mddev; /* pointing back to mddev. */
29 };
30 
31 struct suspend_info {
32 	int slot;
33 	sector_t lo;
34 	sector_t hi;
35 	struct list_head list;
36 };
37 
38 struct resync_info {
39 	__le64 lo;
40 	__le64 hi;
41 };
42 
43 struct md_cluster_info {
44 	/* dlm lock space and resources for clustered raid. */
45 	dlm_lockspace_t *lockspace;
46 	int slot_number;
47 	struct completion completion;
48 	struct dlm_lock_resource *sb_lock;
49 	struct mutex sb_mutex;
50 	struct dlm_lock_resource *bitmap_lockres;
51 	struct list_head suspend_list;
52 	spinlock_t suspend_lock;
53 	struct md_thread *recovery_thread;
54 	unsigned long recovery_map;
55 };
56 
57 static void sync_ast(void *arg)
58 {
59 	struct dlm_lock_resource *res;
60 
61 	res = (struct dlm_lock_resource *) arg;
62 	complete(&res->completion);
63 }
64 
65 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
66 {
67 	int ret = 0;
68 
69 	init_completion(&res->completion);
70 	ret = dlm_lock(res->ls, mode, &res->lksb,
71 			res->flags, res->name, strlen(res->name),
72 			0, sync_ast, res, res->bast);
73 	if (ret)
74 		return ret;
75 	wait_for_completion(&res->completion);
76 	return res->lksb.sb_status;
77 }
78 
79 static int dlm_unlock_sync(struct dlm_lock_resource *res)
80 {
81 	return dlm_lock_sync(res, DLM_LOCK_NL);
82 }
83 
84 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
85 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
86 {
87 	struct dlm_lock_resource *res = NULL;
88 	int ret, namelen;
89 	struct md_cluster_info *cinfo = mddev->cluster_info;
90 
91 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
92 	if (!res)
93 		return NULL;
94 	res->ls = cinfo->lockspace;
95 	res->mddev = mddev;
96 	namelen = strlen(name);
97 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
98 	if (!res->name) {
99 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
100 		goto out_err;
101 	}
102 	strlcpy(res->name, name, namelen + 1);
103 	if (with_lvb) {
104 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
105 		if (!res->lksb.sb_lvbptr) {
106 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
107 			goto out_err;
108 		}
109 		res->flags = DLM_LKF_VALBLK;
110 	}
111 
112 	if (bastfn)
113 		res->bast = bastfn;
114 
115 	res->flags |= DLM_LKF_EXPEDITE;
116 
117 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
118 	if (ret) {
119 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
120 		goto out_err;
121 	}
122 	res->flags &= ~DLM_LKF_EXPEDITE;
123 	res->flags |= DLM_LKF_CONVERT;
124 
125 	return res;
126 out_err:
127 	kfree(res->lksb.sb_lvbptr);
128 	kfree(res->name);
129 	kfree(res);
130 	return NULL;
131 }
132 
133 static void lockres_free(struct dlm_lock_resource *res)
134 {
135 	if (!res)
136 		return;
137 
138 	init_completion(&res->completion);
139 	dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
140 	wait_for_completion(&res->completion);
141 
142 	kfree(res->name);
143 	kfree(res->lksb.sb_lvbptr);
144 	kfree(res);
145 }
146 
/*
 * Format the 16 raw bytes at @src as lowercase hex in the usual
 * 8-4-4-4-12 grouping and store the NUL-terminated result in @dest.
 * @dest must have room for 37 bytes. Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	char *out = dest;
	int idx;

	for (idx = 0; idx < 16; idx++) {
		switch (idx) {
		case 4:
		case 6:
		case 8:
		case 10:
			/* group boundary */
			out += sprintf(out, "-");
			break;
		default:
			break;
		}
		out += sprintf(out, "%02x", (unsigned char)src[idx]);
	}
	return dest;
}
158 
159 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
160 		sector_t lo, sector_t hi)
161 {
162 	struct resync_info *ri;
163 
164 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
165 	ri->lo = cpu_to_le64(lo);
166 	ri->hi = cpu_to_le64(hi);
167 }
168 
169 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
170 {
171 	struct resync_info ri;
172 	struct suspend_info *s = NULL;
173 	sector_t hi = 0;
174 
175 	dlm_lock_sync(lockres, DLM_LOCK_CR);
176 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
177 	hi = le64_to_cpu(ri.hi);
178 	if (ri.hi > 0) {
179 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
180 		if (!s)
181 			goto out;
182 		s->hi = hi;
183 		s->lo = le64_to_cpu(ri.lo);
184 	}
185 	dlm_unlock_sync(lockres);
186 out:
187 	return s;
188 }
189 
190 void recover_bitmaps(struct md_thread *thread)
191 {
192 	struct mddev *mddev = thread->mddev;
193 	struct md_cluster_info *cinfo = mddev->cluster_info;
194 	struct dlm_lock_resource *bm_lockres;
195 	char str[64];
196 	int slot, ret;
197 	struct suspend_info *s, *tmp;
198 	sector_t lo, hi;
199 
200 	while (cinfo->recovery_map) {
201 		slot = fls64((u64)cinfo->recovery_map) - 1;
202 
203 		/* Clear suspend_area associated with the bitmap */
204 		spin_lock_irq(&cinfo->suspend_lock);
205 		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
206 			if (slot == s->slot) {
207 				list_del(&s->list);
208 				kfree(s);
209 			}
210 		spin_unlock_irq(&cinfo->suspend_lock);
211 
212 		snprintf(str, 64, "bitmap%04d", slot);
213 		bm_lockres = lockres_init(mddev, str, NULL, 1);
214 		if (!bm_lockres) {
215 			pr_err("md-cluster: Cannot initialize bitmaps\n");
216 			goto clear_bit;
217 		}
218 
219 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
220 		if (ret) {
221 			pr_err("md-cluster: Could not DLM lock %s: %d\n",
222 					str, ret);
223 			goto clear_bit;
224 		}
225 		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi);
226 		if (ret) {
227 			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
228 			goto dlm_unlock;
229 		}
230 		if (hi > 0) {
231 			/* TODO:Wait for current resync to get over */
232 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
233 			if (lo < mddev->recovery_cp)
234 				mddev->recovery_cp = lo;
235 			md_check_recovery(mddev);
236 		}
237 dlm_unlock:
238 		dlm_unlock_sync(bm_lockres);
239 clear_bit:
240 		clear_bit(slot, &cinfo->recovery_map);
241 	}
242 }
243 
/* DLM recover_prep callback; nothing is done at this stage. */
static void recover_prep(void *arg)
{
}
247 
248 static void recover_slot(void *arg, struct dlm_slot *slot)
249 {
250 	struct mddev *mddev = arg;
251 	struct md_cluster_info *cinfo = mddev->cluster_info;
252 
253 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
254 			mddev->bitmap_info.cluster_name,
255 			slot->nodeid, slot->slot,
256 			cinfo->slot_number);
257 	set_bit(slot->slot - 1, &cinfo->recovery_map);
258 	if (!cinfo->recovery_thread) {
259 		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
260 				mddev, "recover");
261 		if (!cinfo->recovery_thread) {
262 			pr_warn("md-cluster: Could not create recovery thread\n");
263 			return;
264 		}
265 	}
266 	md_wakeup_thread(cinfo->recovery_thread);
267 }
268 
269 static void recover_done(void *arg, struct dlm_slot *slots,
270 		int num_slots, int our_slot,
271 		uint32_t generation)
272 {
273 	struct mddev *mddev = arg;
274 	struct md_cluster_info *cinfo = mddev->cluster_info;
275 
276 	cinfo->slot_number = our_slot;
277 	complete(&cinfo->completion);
278 }
279 
280 static const struct dlm_lockspace_ops md_ls_ops = {
281 	.recover_prep = recover_prep,
282 	.recover_slot = recover_slot,
283 	.recover_done = recover_done,
284 };
285 
286 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
287 {
288 	struct md_cluster_info *cinfo = mddev->cluster_info;
289 	int i, ret = 0;
290 	struct dlm_lock_resource *bm_lockres;
291 	struct suspend_info *s;
292 	char str[64];
293 
294 
295 	for (i = 0; i < total_slots; i++) {
296 		memset(str, '\0', 64);
297 		snprintf(str, 64, "bitmap%04d", i);
298 		bm_lockres = lockres_init(mddev, str, NULL, 1);
299 		if (!bm_lockres)
300 			return -ENOMEM;
301 		if (i == (cinfo->slot_number - 1))
302 			continue;
303 
304 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
305 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
306 		if (ret == -EAGAIN) {
307 			memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
308 			s = read_resync_info(mddev, bm_lockres);
309 			if (s) {
310 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
311 						__func__, __LINE__,
312 						(unsigned long long) s->lo,
313 						(unsigned long long) s->hi, i);
314 				spin_lock_irq(&cinfo->suspend_lock);
315 				s->slot = i;
316 				list_add(&s->list, &cinfo->suspend_list);
317 				spin_unlock_irq(&cinfo->suspend_lock);
318 			}
319 			ret = 0;
320 			lockres_free(bm_lockres);
321 			continue;
322 		}
323 		if (ret)
324 			goto out;
325 		/* TODO: Read the disk bitmap sb and check if it needs recovery */
326 		dlm_unlock_sync(bm_lockres);
327 		lockres_free(bm_lockres);
328 	}
329 out:
330 	return ret;
331 }
332 
333 static int join(struct mddev *mddev, int nodes)
334 {
335 	struct md_cluster_info *cinfo;
336 	int ret, ops_rv;
337 	char str[64];
338 
339 	if (!try_module_get(THIS_MODULE))
340 		return -ENOENT;
341 
342 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
343 	if (!cinfo)
344 		return -ENOMEM;
345 
346 	init_completion(&cinfo->completion);
347 
348 	mutex_init(&cinfo->sb_mutex);
349 	mddev->cluster_info = cinfo;
350 
351 	memset(str, 0, 64);
352 	pretty_uuid(str, mddev->uuid);
353 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
354 				DLM_LSFL_FS, LVB_SIZE,
355 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
356 	if (ret)
357 		goto err;
358 	wait_for_completion(&cinfo->completion);
359 	if (nodes <= cinfo->slot_number) {
360 		pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
361 			nodes);
362 		ret = -ERANGE;
363 		goto err;
364 	}
365 	cinfo->sb_lock = lockres_init(mddev, "cmd-super",
366 					NULL, 0);
367 	if (!cinfo->sb_lock) {
368 		ret = -ENOMEM;
369 		goto err;
370 	}
371 
372 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
373 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
374 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
375 	if (!cinfo->bitmap_lockres)
376 		goto err;
377 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
378 		pr_err("Failed to get bitmap lock\n");
379 		ret = -EINVAL;
380 		goto err;
381 	}
382 
383 	INIT_LIST_HEAD(&cinfo->suspend_list);
384 	spin_lock_init(&cinfo->suspend_lock);
385 
386 	ret = gather_all_resync_info(mddev, nodes);
387 	if (ret)
388 		goto err;
389 
390 	return 0;
391 err:
392 	lockres_free(cinfo->bitmap_lockres);
393 	lockres_free(cinfo->sb_lock);
394 	if (cinfo->lockspace)
395 		dlm_release_lockspace(cinfo->lockspace, 2);
396 	mddev->cluster_info = NULL;
397 	kfree(cinfo);
398 	module_put(THIS_MODULE);
399 	return ret;
400 }
401 
402 static int leave(struct mddev *mddev)
403 {
404 	struct md_cluster_info *cinfo = mddev->cluster_info;
405 
406 	if (!cinfo)
407 		return 0;
408 	md_unregister_thread(&cinfo->recovery_thread);
409 	lockres_free(cinfo->sb_lock);
410 	lockres_free(cinfo->bitmap_lockres);
411 	dlm_release_lockspace(cinfo->lockspace, 2);
412 	return 0;
413 }
414 
415 /* slot_number(): Returns the MD slot number to use
416  * DLM starts the slot numbers from 1, wheras cluster-md
417  * wants the number to be from zero, so we deduct one
418  */
419 static int slot_number(struct mddev *mddev)
420 {
421 	struct md_cluster_info *cinfo = mddev->cluster_info;
422 
423 	return cinfo->slot_number - 1;
424 }
425 
426 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
427 {
428 	struct md_cluster_info *cinfo = mddev->cluster_info;
429 
430 	add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
431 	/* Re-acquire the lock to refresh LVB */
432 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
433 }
434 
435 static struct md_cluster_operations cluster_ops = {
436 	.join   = join,
437 	.leave  = leave,
438 	.slot_number = slot_number,
439 	.resync_info_update = resync_info_update,
440 };
441 
442 static int __init cluster_init(void)
443 {
444 	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
445 	pr_info("Registering Cluster MD functions\n");
446 	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
447 	return 0;
448 }
449 
450 static void cluster_exit(void)
451 {
452 	unregister_md_cluster_operations();
453 }
454 
455 module_init(cluster_init);
456 module_exit(cluster_exit);
457 MODULE_LICENSE("GPL");
458 MODULE_DESCRIPTION("Clustering support for MD");
459