xref: /openbmc/linux/drivers/md/md-cluster.c (revision 96ae923a)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include "md.h"
16 #include "md-cluster.h"
17 
/*
 * Size in bytes of the lock value block: used both as the lvblen passed to
 * dlm_new_lockspace() and as the allocation size of each lksb.sb_lvbptr.
 */
#define LVB_SIZE	64
19 
20 struct dlm_lock_resource {
21 	dlm_lockspace_t *ls;
22 	struct dlm_lksb lksb;
23 	char *name; /* lock name. */
24 	uint32_t flags; /* flags to pass to dlm_lock() */
25 	struct completion completion; /* completion for synchronized locking */
26 	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
27 	struct mddev *mddev; /* pointing back to mddev. */
28 };
29 
30 struct suspend_info {
31 	int slot;
32 	sector_t lo;
33 	sector_t hi;
34 	struct list_head list;
35 };
36 
37 struct resync_info {
38 	__le64 lo;
39 	__le64 hi;
40 };
41 
42 struct md_cluster_info {
43 	/* dlm lock space and resources for clustered raid. */
44 	dlm_lockspace_t *lockspace;
45 	int slot_number;
46 	struct completion completion;
47 	struct dlm_lock_resource *sb_lock;
48 	struct mutex sb_mutex;
49 	struct dlm_lock_resource *bitmap_lockres;
50 	struct list_head suspend_list;
51 	spinlock_t suspend_lock;
52 };
53 
54 static void sync_ast(void *arg)
55 {
56 	struct dlm_lock_resource *res;
57 
58 	res = (struct dlm_lock_resource *) arg;
59 	complete(&res->completion);
60 }
61 
62 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
63 {
64 	int ret = 0;
65 
66 	init_completion(&res->completion);
67 	ret = dlm_lock(res->ls, mode, &res->lksb,
68 			res->flags, res->name, strlen(res->name),
69 			0, sync_ast, res, res->bast);
70 	if (ret)
71 		return ret;
72 	wait_for_completion(&res->completion);
73 	return res->lksb.sb_status;
74 }
75 
76 static int dlm_unlock_sync(struct dlm_lock_resource *res)
77 {
78 	return dlm_lock_sync(res, DLM_LOCK_NL);
79 }
80 
81 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
82 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
83 {
84 	struct dlm_lock_resource *res = NULL;
85 	int ret, namelen;
86 	struct md_cluster_info *cinfo = mddev->cluster_info;
87 
88 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
89 	if (!res)
90 		return NULL;
91 	res->ls = cinfo->lockspace;
92 	res->mddev = mddev;
93 	namelen = strlen(name);
94 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
95 	if (!res->name) {
96 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
97 		goto out_err;
98 	}
99 	strlcpy(res->name, name, namelen + 1);
100 	if (with_lvb) {
101 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
102 		if (!res->lksb.sb_lvbptr) {
103 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
104 			goto out_err;
105 		}
106 		res->flags = DLM_LKF_VALBLK;
107 	}
108 
109 	if (bastfn)
110 		res->bast = bastfn;
111 
112 	res->flags |= DLM_LKF_EXPEDITE;
113 
114 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
115 	if (ret) {
116 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
117 		goto out_err;
118 	}
119 	res->flags &= ~DLM_LKF_EXPEDITE;
120 	res->flags |= DLM_LKF_CONVERT;
121 
122 	return res;
123 out_err:
124 	kfree(res->lksb.sb_lvbptr);
125 	kfree(res->name);
126 	kfree(res);
127 	return NULL;
128 }
129 
130 static void lockres_free(struct dlm_lock_resource *res)
131 {
132 	if (!res)
133 		return;
134 
135 	init_completion(&res->completion);
136 	dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
137 	wait_for_completion(&res->completion);
138 
139 	kfree(res->name);
140 	kfree(res->lksb.sb_lvbptr);
141 	kfree(res);
142 }
143 
/*
 * Format the 16 bytes at @src as lower-case hex in 8-4-4-4-12 grouping.
 * @dest: output buffer; receives 36 characters plus a terminating NUL.
 * @src:  the 16 raw UUID bytes.
 *
 * Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	char *out = dest;
	int idx;

	for (idx = 0; idx < 16; idx++) {
		/* A dash precedes bytes 4, 6, 8 and 10. */
		switch (idx) {
		case 4:
		case 6:
		case 8:
		case 10:
			*out++ = '-';
			break;
		default:
			break;
		}
		/* sprintf() NUL-terminates, so dest is a valid string throughout. */
		out += sprintf(out, "%02x", (unsigned char)src[idx]);
	}
	return dest;
}
155 
156 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
157 		sector_t lo, sector_t hi)
158 {
159 	struct resync_info *ri;
160 
161 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
162 	ri->lo = cpu_to_le64(lo);
163 	ri->hi = cpu_to_le64(hi);
164 }
165 
166 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
167 {
168 	struct resync_info ri;
169 	struct suspend_info *s = NULL;
170 	sector_t hi = 0;
171 
172 	dlm_lock_sync(lockres, DLM_LOCK_CR);
173 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
174 	hi = le64_to_cpu(ri.hi);
175 	if (ri.hi > 0) {
176 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
177 		if (!s)
178 			goto out;
179 		s->hi = hi;
180 		s->lo = le64_to_cpu(ri.lo);
181 	}
182 	dlm_unlock_sync(lockres);
183 out:
184 	return s;
185 }
186 
/* .recover_prep hook of md_ls_ops; intentionally does nothing. */
static void recover_prep(void *arg)
{
	/* nothing to prepare */
}
190 
191 static void recover_slot(void *arg, struct dlm_slot *slot)
192 {
193 	struct mddev *mddev = arg;
194 	struct md_cluster_info *cinfo = mddev->cluster_info;
195 
196 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
197 			mddev->bitmap_info.cluster_name,
198 			slot->nodeid, slot->slot,
199 			cinfo->slot_number);
200 }
201 
202 static void recover_done(void *arg, struct dlm_slot *slots,
203 		int num_slots, int our_slot,
204 		uint32_t generation)
205 {
206 	struct mddev *mddev = arg;
207 	struct md_cluster_info *cinfo = mddev->cluster_info;
208 
209 	cinfo->slot_number = our_slot;
210 	complete(&cinfo->completion);
211 }
212 
213 static const struct dlm_lockspace_ops md_ls_ops = {
214 	.recover_prep = recover_prep,
215 	.recover_slot = recover_slot,
216 	.recover_done = recover_done,
217 };
218 
219 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
220 {
221 	struct md_cluster_info *cinfo = mddev->cluster_info;
222 	int i, ret = 0;
223 	struct dlm_lock_resource *bm_lockres;
224 	struct suspend_info *s;
225 	char str[64];
226 
227 
228 	for (i = 0; i < total_slots; i++) {
229 		memset(str, '\0', 64);
230 		snprintf(str, 64, "bitmap%04d", i);
231 		bm_lockres = lockres_init(mddev, str, NULL, 1);
232 		if (!bm_lockres)
233 			return -ENOMEM;
234 		if (i == (cinfo->slot_number - 1))
235 			continue;
236 
237 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
238 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
239 		if (ret == -EAGAIN) {
240 			memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
241 			s = read_resync_info(mddev, bm_lockres);
242 			if (s) {
243 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
244 						__func__, __LINE__,
245 						(unsigned long long) s->lo,
246 						(unsigned long long) s->hi, i);
247 				spin_lock_irq(&cinfo->suspend_lock);
248 				s->slot = i;
249 				list_add(&s->list, &cinfo->suspend_list);
250 				spin_unlock_irq(&cinfo->suspend_lock);
251 			}
252 			ret = 0;
253 			lockres_free(bm_lockres);
254 			continue;
255 		}
256 		if (ret)
257 			goto out;
258 		/* TODO: Read the disk bitmap sb and check if it needs recovery */
259 		dlm_unlock_sync(bm_lockres);
260 		lockres_free(bm_lockres);
261 	}
262 out:
263 	return ret;
264 }
265 
266 static int join(struct mddev *mddev, int nodes)
267 {
268 	struct md_cluster_info *cinfo;
269 	int ret, ops_rv;
270 	char str[64];
271 
272 	if (!try_module_get(THIS_MODULE))
273 		return -ENOENT;
274 
275 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
276 	if (!cinfo)
277 		return -ENOMEM;
278 
279 	init_completion(&cinfo->completion);
280 
281 	mutex_init(&cinfo->sb_mutex);
282 	mddev->cluster_info = cinfo;
283 
284 	memset(str, 0, 64);
285 	pretty_uuid(str, mddev->uuid);
286 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
287 				DLM_LSFL_FS, LVB_SIZE,
288 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
289 	if (ret)
290 		goto err;
291 	wait_for_completion(&cinfo->completion);
292 	if (nodes <= cinfo->slot_number) {
293 		pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
294 			nodes);
295 		ret = -ERANGE;
296 		goto err;
297 	}
298 	cinfo->sb_lock = lockres_init(mddev, "cmd-super",
299 					NULL, 0);
300 	if (!cinfo->sb_lock) {
301 		ret = -ENOMEM;
302 		goto err;
303 	}
304 
305 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
306 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
307 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
308 	if (!cinfo->bitmap_lockres)
309 		goto err;
310 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
311 		pr_err("Failed to get bitmap lock\n");
312 		ret = -EINVAL;
313 		goto err;
314 	}
315 
316 	INIT_LIST_HEAD(&cinfo->suspend_list);
317 	spin_lock_init(&cinfo->suspend_lock);
318 
319 	ret = gather_all_resync_info(mddev, nodes);
320 	if (ret)
321 		goto err;
322 
323 	return 0;
324 err:
325 	lockres_free(cinfo->bitmap_lockres);
326 	lockres_free(cinfo->sb_lock);
327 	if (cinfo->lockspace)
328 		dlm_release_lockspace(cinfo->lockspace, 2);
329 	mddev->cluster_info = NULL;
330 	kfree(cinfo);
331 	module_put(THIS_MODULE);
332 	return ret;
333 }
334 
335 static int leave(struct mddev *mddev)
336 {
337 	struct md_cluster_info *cinfo = mddev->cluster_info;
338 
339 	if (!cinfo)
340 		return 0;
341 	lockres_free(cinfo->sb_lock);
342 	lockres_free(cinfo->bitmap_lockres);
343 	dlm_release_lockspace(cinfo->lockspace, 2);
344 	return 0;
345 }
346 
347 /* slot_number(): Returns the MD slot number to use
348  * DLM starts the slot numbers from 1, wheras cluster-md
349  * wants the number to be from zero, so we deduct one
350  */
351 static int slot_number(struct mddev *mddev)
352 {
353 	struct md_cluster_info *cinfo = mddev->cluster_info;
354 
355 	return cinfo->slot_number - 1;
356 }
357 
358 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
359 {
360 	struct md_cluster_info *cinfo = mddev->cluster_info;
361 
362 	add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
363 	/* Re-acquire the lock to refresh LVB */
364 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
365 }
366 
367 static struct md_cluster_operations cluster_ops = {
368 	.join   = join,
369 	.leave  = leave,
370 	.slot_number = slot_number,
371 	.resync_info_update = resync_info_update,
372 };
373 
374 static int __init cluster_init(void)
375 {
376 	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
377 	pr_info("Registering Cluster MD functions\n");
378 	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
379 	return 0;
380 }
381 
/* Module exit: withdraw the operations registered by cluster_init(). */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
386 
/* Module entry/exit hooks and metadata. */
module_init(cluster_init);
module_exit(cluster_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");
391