xref: /openbmc/linux/drivers/md/md-cluster.c (revision c186b128)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
16 #include "md.h"
17 #include "bitmap.h"
18 #include "md-cluster.h"
19 
20 #define LVB_SIZE	64
21 #define NEW_DEV_TIMEOUT 5000
22 
/* Per-resource DLM state; one is allocated per named lock by lockres_init(). */
struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	struct completion completion; /* completion for synchronized locking */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
	struct mddev *mddev; /* pointing back to mddev. */
};
32 
/* A sector range some node is resyncing; local I/O must avoid it. */
struct suspend_info {
	int slot;	/* 0-based slot of the resyncing node */
	sector_t lo;	/* first suspended sector */
	sector_t hi;	/* last suspended sector */
	struct list_head list;	/* linked on md_cluster_info.suspend_list */
};
39 
/* Little-endian resync range stored in the bitmap lock's LVB. */
struct resync_info {
	__le64 lo;
	__le64 hi;
};
44 
45 /* md_cluster_info flags */
46 #define		MD_CLUSTER_WAITING_FOR_NEWDISK		1
47 #define		MD_CLUSTER_SUSPEND_READ_BALANCING	2
48 #define		MD_CLUSTER_BEGIN_JOIN_CLUSTER		3
49 
50 
struct md_cluster_info {
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;	/* our 1-based DLM slot, set in recover_done() */
	struct completion completion;	/* completed once the DLM has assigned our slot */
	struct mutex sb_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;	/* ranges peers are resyncing */
	spinlock_t suspend_lock;	/* protects suspend_list */
	struct md_thread *recovery_thread;
	unsigned long recovery_map;	/* bitmask of slots awaiting bitmap recovery */
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;	/* completed by new_disk_ack() */
	unsigned long state;	/* MD_CLUSTER_* flag bits */
};
72 
/* Message types carried in cluster_msg.type. */
enum msg_type {
	METADATA_UPDATED = 0,	/* superblock changed, peers should reload it */
	RESYNCING,		/* sender's resync window changed (low/high) */
	NEWDISK,		/* a device was added; peers must confirm they see it */
	REMOVE,			/* kick device raid_slot from the array */
	RE_ADD,			/* clear Faulty on device raid_slot */
	BITMAP_NEEDS_SYNC,	/* leaving node has a dirty bitmap; recover it */
};
81 
/*
 * On-wire message exchanged via the "message" lock resource's LVB.
 * NOTE(review): most senders store fields with cpu_to_le32()/cpu_to_le64()
 * (see __sendmsg(), resync_info_update()), so the wire format is effectively
 * little-endian even though the fields are declared as native types.
 * Consider declaring them __le32/__le64 and auditing every sender/receiver.
 */
struct cluster_msg {
	int type;	/* enum msg_type */
	int slot;	/* sender's 0-based slot, filled in by __sendmsg() */
	/* TODO: Unionize this for smaller footprint */
	sector_t low;	/* RESYNCING: start of suspended range */
	sector_t high;	/* RESYNCING: end of range; 0 means resync finished */
	char uuid[16];	/* NEWDISK: device uuid */
	int raid_slot;	/* device number (desc_nr) the message refers to */
};
91 
92 static void sync_ast(void *arg)
93 {
94 	struct dlm_lock_resource *res;
95 
96 	res = (struct dlm_lock_resource *) arg;
97 	complete(&res->completion);
98 }
99 
100 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
101 {
102 	int ret = 0;
103 
104 	ret = dlm_lock(res->ls, mode, &res->lksb,
105 			res->flags, res->name, strlen(res->name),
106 			0, sync_ast, res, res->bast);
107 	if (ret)
108 		return ret;
109 	wait_for_completion(&res->completion);
110 	return res->lksb.sb_status;
111 }
112 
113 static int dlm_unlock_sync(struct dlm_lock_resource *res)
114 {
115 	return dlm_lock_sync(res, DLM_LOCK_NL);
116 }
117 
118 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
119 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
120 {
121 	struct dlm_lock_resource *res = NULL;
122 	int ret, namelen;
123 	struct md_cluster_info *cinfo = mddev->cluster_info;
124 
125 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
126 	if (!res)
127 		return NULL;
128 	init_completion(&res->completion);
129 	res->ls = cinfo->lockspace;
130 	res->mddev = mddev;
131 	namelen = strlen(name);
132 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
133 	if (!res->name) {
134 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
135 		goto out_err;
136 	}
137 	strlcpy(res->name, name, namelen + 1);
138 	if (with_lvb) {
139 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
140 		if (!res->lksb.sb_lvbptr) {
141 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
142 			goto out_err;
143 		}
144 		res->flags = DLM_LKF_VALBLK;
145 	}
146 
147 	if (bastfn)
148 		res->bast = bastfn;
149 
150 	res->flags |= DLM_LKF_EXPEDITE;
151 
152 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
153 	if (ret) {
154 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
155 		goto out_err;
156 	}
157 	res->flags &= ~DLM_LKF_EXPEDITE;
158 	res->flags |= DLM_LKF_CONVERT;
159 
160 	return res;
161 out_err:
162 	kfree(res->lksb.sb_lvbptr);
163 	kfree(res->name);
164 	kfree(res);
165 	return NULL;
166 }
167 
/*
 * Tear down a resource created by lockres_init(): force an unlock
 * (cancelling any blocked request or conversion first), wait for the
 * unlock AST, then free all memory.  Safe to call with NULL.
 */
static void lockres_free(struct dlm_lock_resource *res)
{
	int ret;

	if (!res)
		return;

	/* cancel a lock request or a conversion request that is blocked */
	res->flags |= DLM_LKF_CANCEL;
retry:
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
	if (unlikely(ret != 0)) {
		pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret);

		/* if a lock conversion is cancelled, then the lock is put
		 * back to grant queue, need to ensure it is unlocked */
		if (ret == -DLM_ECANCEL)
			goto retry;
	}
	res->flags &= ~DLM_LKF_CANCEL;
	/* sync_ast() fires when the unlock completes */
	wait_for_completion(&res->completion);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}
194 
195 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
196 		sector_t lo, sector_t hi)
197 {
198 	struct resync_info *ri;
199 
200 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
201 	ri->lo = cpu_to_le64(lo);
202 	ri->hi = cpu_to_le64(hi);
203 }
204 
205 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
206 {
207 	struct resync_info ri;
208 	struct suspend_info *s = NULL;
209 	sector_t hi = 0;
210 
211 	dlm_lock_sync(lockres, DLM_LOCK_CR);
212 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
213 	hi = le64_to_cpu(ri.hi);
214 	if (ri.hi > 0) {
215 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
216 		if (!s)
217 			goto out;
218 		s->hi = hi;
219 		s->lo = le64_to_cpu(ri.lo);
220 	}
221 	dlm_unlock_sync(lockres);
222 out:
223 	return s;
224 }
225 
226 static void recover_bitmaps(struct md_thread *thread)
227 {
228 	struct mddev *mddev = thread->mddev;
229 	struct md_cluster_info *cinfo = mddev->cluster_info;
230 	struct dlm_lock_resource *bm_lockres;
231 	char str[64];
232 	int slot, ret;
233 	struct suspend_info *s, *tmp;
234 	sector_t lo, hi;
235 
236 	while (cinfo->recovery_map) {
237 		slot = fls64((u64)cinfo->recovery_map) - 1;
238 
239 		/* Clear suspend_area associated with the bitmap */
240 		spin_lock_irq(&cinfo->suspend_lock);
241 		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
242 			if (slot == s->slot) {
243 				list_del(&s->list);
244 				kfree(s);
245 			}
246 		spin_unlock_irq(&cinfo->suspend_lock);
247 
248 		snprintf(str, 64, "bitmap%04d", slot);
249 		bm_lockres = lockres_init(mddev, str, NULL, 1);
250 		if (!bm_lockres) {
251 			pr_err("md-cluster: Cannot initialize bitmaps\n");
252 			goto clear_bit;
253 		}
254 
255 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
256 		if (ret) {
257 			pr_err("md-cluster: Could not DLM lock %s: %d\n",
258 					str, ret);
259 			goto clear_bit;
260 		}
261 		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
262 		if (ret) {
263 			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
264 			goto dlm_unlock;
265 		}
266 		if (hi > 0) {
267 			/* TODO:Wait for current resync to get over */
268 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
269 			if (lo < mddev->recovery_cp)
270 				mddev->recovery_cp = lo;
271 			md_check_recovery(mddev);
272 		}
273 dlm_unlock:
274 		dlm_unlock_sync(bm_lockres);
275 clear_bit:
276 		clear_bit(slot, &cinfo->recovery_map);
277 	}
278 }
279 
280 static void recover_prep(void *arg)
281 {
282 	struct mddev *mddev = arg;
283 	struct md_cluster_info *cinfo = mddev->cluster_info;
284 	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
285 }
286 
287 static void __recover_slot(struct mddev *mddev, int slot)
288 {
289 	struct md_cluster_info *cinfo = mddev->cluster_info;
290 
291 	set_bit(slot, &cinfo->recovery_map);
292 	if (!cinfo->recovery_thread) {
293 		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
294 				mddev, "recover");
295 		if (!cinfo->recovery_thread) {
296 			pr_warn("md-cluster: Could not create recovery thread\n");
297 			return;
298 		}
299 	}
300 	md_wakeup_thread(cinfo->recovery_thread);
301 }
302 
303 static void recover_slot(void *arg, struct dlm_slot *slot)
304 {
305 	struct mddev *mddev = arg;
306 	struct md_cluster_info *cinfo = mddev->cluster_info;
307 
308 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
309 			mddev->bitmap_info.cluster_name,
310 			slot->nodeid, slot->slot,
311 			cinfo->slot_number);
312 	/* deduct one since dlm slot starts from one while the num of
313 	 * cluster-md begins with 0 */
314 	__recover_slot(mddev, slot->slot - 1);
315 }
316 
317 static void recover_done(void *arg, struct dlm_slot *slots,
318 		int num_slots, int our_slot,
319 		uint32_t generation)
320 {
321 	struct mddev *mddev = arg;
322 	struct md_cluster_info *cinfo = mddev->cluster_info;
323 
324 	cinfo->slot_number = our_slot;
325 	/* completion is only need to be complete when node join cluster,
326 	 * it doesn't need to run during another node's failure */
327 	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
328 		complete(&cinfo->completion);
329 		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
330 	}
331 	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
332 }
333 
/* Lockspace callbacks: invoked when a node joins the cluster and to
 * perform lock recovery when a node failure occurs. */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
341 
342 /*
343  * The BAST function for the ack lock resource
344  * This function wakes up the receive thread in
345  * order to receive and process the message.
346  */
347 static void ack_bast(void *arg, int mode)
348 {
349 	struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
350 	struct md_cluster_info *cinfo = res->mddev->cluster_info;
351 
352 	if (mode == DLM_LOCK_EX)
353 		md_wakeup_thread(cinfo->recv_thread);
354 }
355 
356 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
357 {
358 	struct suspend_info *s, *tmp;
359 
360 	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
361 		if (slot == s->slot) {
362 			pr_info("%s:%d Deleting suspend_info: %d\n",
363 					__func__, __LINE__, slot);
364 			list_del(&s->list);
365 			kfree(s);
366 			break;
367 		}
368 }
369 
370 static void remove_suspend_info(struct mddev *mddev, int slot)
371 {
372 	struct md_cluster_info *cinfo = mddev->cluster_info;
373 	spin_lock_irq(&cinfo->suspend_lock);
374 	__remove_suspend_info(cinfo, slot);
375 	spin_unlock_irq(&cinfo->suspend_lock);
376 	mddev->pers->quiesce(mddev, 2);
377 }
378 
379 
/*
 * Handle a RESYNCING message: hi == 0 clears the sender's suspend
 * range; otherwise record [lo, hi] so local I/O stays away from it.
 */
static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct suspend_info *s;

	if (!hi) {
		/* sender finished its resync; restart any pending recovery */
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}
	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	/* quiesce(1)/quiesce(0) drains in-flight I/O before the new
	 * suspend range takes effect */
	mddev->pers->quiesce(mddev, 1);
	mddev->pers->quiesce(mddev, 0);
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 2);
}
407 
408 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
409 {
410 	char disk_uuid[64];
411 	struct md_cluster_info *cinfo = mddev->cluster_info;
412 	char event_name[] = "EVENT=ADD_DEVICE";
413 	char raid_slot[16];
414 	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
415 	int len;
416 
417 	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
418 	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
419 	snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot);
420 	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
421 	init_completion(&cinfo->newdisk_completion);
422 	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
423 	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
424 	wait_for_completion_timeout(&cinfo->newdisk_completion,
425 			NEW_DEV_TIMEOUT);
426 	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
427 }
428 
429 
430 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
431 {
432 	struct md_cluster_info *cinfo = mddev->cluster_info;
433 	md_reload_sb(mddev, le32_to_cpu(msg->raid_slot));
434 	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
435 }
436 
437 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
438 {
439 	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);
440 
441 	if (rdev)
442 		md_kick_rdev_from_array(rdev);
443 	else
444 		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__, __LINE__, msg->raid_slot);
445 }
446 
447 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
448 {
449 	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot);
450 
451 	if (rdev && test_bit(Faulty, &rdev->flags))
452 		clear_bit(Faulty, &rdev->flags);
453 	else
454 		pr_warn("%s: %d Could not find disk(%d) which is faulty", __func__, __LINE__, msg->raid_slot);
455 }
456 
457 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
458 {
459 	switch (msg->type) {
460 	case METADATA_UPDATED:
461 		pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
462 			__func__, __LINE__, msg->slot);
463 		process_metadata_update(mddev, msg);
464 		break;
465 	case RESYNCING:
466 		pr_info("%s: %d Received message: RESYNCING from %d\n",
467 			__func__, __LINE__, msg->slot);
468 		process_suspend_info(mddev, msg->slot,
469 				msg->low, msg->high);
470 		break;
471 	case NEWDISK:
472 		pr_info("%s: %d Received message: NEWDISK from %d\n",
473 			__func__, __LINE__, msg->slot);
474 		process_add_new_disk(mddev, msg);
475 		break;
476 	case REMOVE:
477 		pr_info("%s: %d Received REMOVE from %d\n",
478 			__func__, __LINE__, msg->slot);
479 		process_remove_disk(mddev, msg);
480 		break;
481 	case RE_ADD:
482 		pr_info("%s: %d Received RE_ADD from %d\n",
483 			__func__, __LINE__, msg->slot);
484 		process_readd_disk(mddev, msg);
485 		break;
486 	case BITMAP_NEEDS_SYNC:
487 		pr_info("%s: %d Received BITMAP_NEEDS_SYNC from %d\n",
488 			__func__, __LINE__, msg->slot);
489 		__recover_slot(mddev, msg->slot);
490 		break;
491 	default:
492 		pr_warn("%s:%d Received unknown message from %d\n",
493 			__func__, __LINE__, msg->slot);
494 	}
495 }
496 
/*
 * Thread for receiving messages.  Woken by ack_bast() when a sender
 * requests EX on ACK.  The lock choreography mirrors __sendmsg():
 * take CR on MESSAGE to read the LVB, release our CR on ACK (letting
 * the sender's EX request complete), then restore the idle state
 * (PR->CR dance on MESSAGE, CR on ACK).
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	/*get CR on Message*/
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1:failed to get CR on MESSAGE\n");
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	process_recvd_msg(thread->mddev, &msg);

	/*release CR on ack_lockres*/
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/*up-convert to PR on message_lockres*/
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/*get CR on ack_lockres again*/
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
	/*release CR on message_lockres*/
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
}
535 
536 /* lock_comm()
537  * Takes the lock on the TOKEN lock resource so no other
538  * node can communicate while the operation is underway.
539  */
540 static int lock_comm(struct md_cluster_info *cinfo)
541 {
542 	int error;
543 
544 	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
545 	if (error)
546 		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
547 				__func__, __LINE__, error);
548 	return error;
549 }
550 
551 static void unlock_comm(struct md_cluster_info *cinfo)
552 {
553 	dlm_unlock_sync(cinfo->token_lockres);
554 }
555 
/* __sendmsg()
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconvert ack lockresource to CR
 *
 * Caller must hold the TOKEN lock (lock_comm()).
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	/* sender slot is always stored little-endian */
	cmsg->slot = cpu_to_le32(slot);
	/*get EX on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/*down-convert EX to CW on Message*/
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/*up-convert CR to EX on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/*down-convert EX to CR on Ack*/
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		/* NOTE(review): this retries forever if the unlock keeps
		 * failing with a non-zero status — confirm that is intended */
		goto failed_ack;
	}
failed_message:
	return error;
}
618 
/* Send one cluster message under the TOKEN lock. */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	/* the old code ignored lock_comm() failure and sent anyway,
	 * i.e. without holding the TOKEN lock */
	ret = lock_comm(cinfo);
	if (ret)
		return ret;
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
628 
629 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
630 {
631 	struct md_cluster_info *cinfo = mddev->cluster_info;
632 	int i, ret = 0;
633 	struct dlm_lock_resource *bm_lockres;
634 	struct suspend_info *s;
635 	char str[64];
636 	sector_t lo, hi;
637 
638 
639 	for (i = 0; i < total_slots; i++) {
640 		memset(str, '\0', 64);
641 		snprintf(str, 64, "bitmap%04d", i);
642 		bm_lockres = lockres_init(mddev, str, NULL, 1);
643 		if (!bm_lockres)
644 			return -ENOMEM;
645 		if (i == (cinfo->slot_number - 1))
646 			continue;
647 
648 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
649 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
650 		if (ret == -EAGAIN) {
651 			memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
652 			s = read_resync_info(mddev, bm_lockres);
653 			if (s) {
654 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
655 						__func__, __LINE__,
656 						(unsigned long long) s->lo,
657 						(unsigned long long) s->hi, i);
658 				spin_lock_irq(&cinfo->suspend_lock);
659 				s->slot = i;
660 				list_add(&s->list, &cinfo->suspend_list);
661 				spin_unlock_irq(&cinfo->suspend_lock);
662 			}
663 			ret = 0;
664 			lockres_free(bm_lockres);
665 			continue;
666 		}
667 		if (ret) {
668 			lockres_free(bm_lockres);
669 			goto out;
670 		}
671 
672 		/* Read the disk bitmap sb and check if it needs recovery */
673 		ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
674 		if (ret) {
675 			pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
676 			lockres_free(bm_lockres);
677 			continue;
678 		}
679 		if ((hi > 0) && (lo < mddev->recovery_cp)) {
680 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
681 			mddev->recovery_cp = lo;
682 			md_check_recovery(mddev);
683 		}
684 
685 		dlm_unlock_sync(bm_lockres);
686 		lockres_free(bm_lockres);
687 	}
688 out:
689 	return ret;
690 }
691 
692 static int join(struct mddev *mddev, int nodes)
693 {
694 	struct md_cluster_info *cinfo;
695 	int ret, ops_rv;
696 	char str[64];
697 
698 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
699 	if (!cinfo)
700 		return -ENOMEM;
701 
702 	INIT_LIST_HEAD(&cinfo->suspend_list);
703 	spin_lock_init(&cinfo->suspend_lock);
704 	init_completion(&cinfo->completion);
705 	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
706 
707 	mutex_init(&cinfo->sb_mutex);
708 	mddev->cluster_info = cinfo;
709 
710 	memset(str, 0, 64);
711 	sprintf(str, "%pU", mddev->uuid);
712 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
713 				DLM_LSFL_FS, LVB_SIZE,
714 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
715 	if (ret)
716 		goto err;
717 	wait_for_completion(&cinfo->completion);
718 	if (nodes < cinfo->slot_number) {
719 		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
720 			cinfo->slot_number, nodes);
721 		ret = -ERANGE;
722 		goto err;
723 	}
724 	/* Initiate the communication resources */
725 	ret = -ENOMEM;
726 	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
727 	if (!cinfo->recv_thread) {
728 		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
729 		goto err;
730 	}
731 	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
732 	if (!cinfo->message_lockres)
733 		goto err;
734 	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
735 	if (!cinfo->token_lockres)
736 		goto err;
737 	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
738 	if (!cinfo->ack_lockres)
739 		goto err;
740 	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
741 	if (!cinfo->no_new_dev_lockres)
742 		goto err;
743 
744 	/* get sync CR lock on ACK. */
745 	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
746 		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
747 				ret);
748 	/* get sync CR lock on no-new-dev. */
749 	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
750 		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
751 
752 
753 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
754 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
755 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
756 	if (!cinfo->bitmap_lockres)
757 		goto err;
758 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
759 		pr_err("Failed to get bitmap lock\n");
760 		ret = -EINVAL;
761 		goto err;
762 	}
763 
764 	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
765 	if (!cinfo->resync_lockres)
766 		goto err;
767 
768 	ret = gather_all_resync_info(mddev, nodes);
769 	if (ret)
770 		goto err;
771 
772 	return 0;
773 err:
774 	lockres_free(cinfo->message_lockres);
775 	lockres_free(cinfo->token_lockres);
776 	lockres_free(cinfo->ack_lockres);
777 	lockres_free(cinfo->no_new_dev_lockres);
778 	lockres_free(cinfo->resync_lockres);
779 	lockres_free(cinfo->bitmap_lockres);
780 	if (cinfo->lockspace)
781 		dlm_release_lockspace(cinfo->lockspace, 2);
782 	mddev->cluster_info = NULL;
783 	kfree(cinfo);
784 	return ret;
785 }
786 
787 static void resync_bitmap(struct mddev *mddev)
788 {
789 	struct md_cluster_info *cinfo = mddev->cluster_info;
790 	struct cluster_msg cmsg = {0};
791 	int err;
792 
793 	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
794 	err = sendmsg(cinfo, &cmsg);
795 	if (err)
796 		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
797 			__func__, __LINE__, err);
798 }
799 
800 static int leave(struct mddev *mddev)
801 {
802 	struct md_cluster_info *cinfo = mddev->cluster_info;
803 
804 	if (!cinfo)
805 		return 0;
806 
807 	/* BITMAP_NEEDS_SYNC message should be sent when node
808 	 * is leaving the cluster with dirty bitmap, also we
809 	 * can only deliver it when dlm connection is available */
810 	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
811 		resync_bitmap(mddev);
812 
813 	md_unregister_thread(&cinfo->recovery_thread);
814 	md_unregister_thread(&cinfo->recv_thread);
815 	lockres_free(cinfo->message_lockres);
816 	lockres_free(cinfo->token_lockres);
817 	lockres_free(cinfo->ack_lockres);
818 	lockres_free(cinfo->no_new_dev_lockres);
819 	lockres_free(cinfo->bitmap_lockres);
820 	dlm_release_lockspace(cinfo->lockspace, 2);
821 	return 0;
822 }
823 
824 /* slot_number(): Returns the MD slot number to use
825  * DLM starts the slot numbers from 1, wheras cluster-md
826  * wants the number to be from zero, so we deduct one
827  */
828 static int slot_number(struct mddev *mddev)
829 {
830 	struct md_cluster_info *cinfo = mddev->cluster_info;
831 
832 	return cinfo->slot_number - 1;
833 }
834 
835 static int metadata_update_start(struct mddev *mddev)
836 {
837 	return lock_comm(mddev->cluster_info);
838 }
839 
840 static int metadata_update_finish(struct mddev *mddev)
841 {
842 	struct md_cluster_info *cinfo = mddev->cluster_info;
843 	struct cluster_msg cmsg;
844 	struct md_rdev *rdev;
845 	int ret = 0;
846 
847 	memset(&cmsg, 0, sizeof(cmsg));
848 	cmsg.type = cpu_to_le32(METADATA_UPDATED);
849 	cmsg.raid_slot = -1;
850 	/* Pick up a good active device number to send.
851 	 */
852 	rdev_for_each(rdev, mddev)
853 		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
854 			cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
855 			break;
856 		}
857 	if (cmsg.raid_slot >= 0)
858 		ret = __sendmsg(cinfo, &cmsg);
859 	else
860 		pr_warn("md-cluster: No good device id found to send\n");
861 	unlock_comm(cinfo);
862 	return ret;
863 }
864 
865 static int metadata_update_cancel(struct mddev *mddev)
866 {
867 	struct md_cluster_info *cinfo = mddev->cluster_info;
868 
869 	return dlm_unlock_sync(cinfo->token_lockres);
870 }
871 
872 static int resync_start(struct mddev *mddev)
873 {
874 	struct md_cluster_info *cinfo = mddev->cluster_info;
875 	cinfo->resync_lockres->flags |= DLM_LKF_NOQUEUE;
876 	return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX);
877 }
878 
879 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
880 {
881 	struct md_cluster_info *cinfo = mddev->cluster_info;
882 	struct cluster_msg cmsg;
883 	int slot = cinfo->slot_number - 1;
884 
885 	add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
886 	/* Re-acquire the lock to refresh LVB */
887 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
888 	cmsg.type = cpu_to_le32(RESYNCING);
889 	cmsg.slot = cpu_to_le32(slot);
890 	cmsg.low = cpu_to_le64(lo);
891 	cmsg.high = cpu_to_le64(hi);
892 
893 	return sendmsg(cinfo, &cmsg);
894 }
895 
896 static int resync_finish(struct mddev *mddev)
897 {
898 	struct md_cluster_info *cinfo = mddev->cluster_info;
899 	cinfo->resync_lockres->flags &= ~DLM_LKF_NOQUEUE;
900 	dlm_unlock_sync(cinfo->resync_lockres);
901 	return resync_info_update(mddev, 0, 0);
902 }
903 
904 static int area_resyncing(struct mddev *mddev, int direction,
905 		sector_t lo, sector_t hi)
906 {
907 	struct md_cluster_info *cinfo = mddev->cluster_info;
908 	int ret = 0;
909 	struct suspend_info *s;
910 
911 	if ((direction == READ) &&
912 		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
913 		return 1;
914 
915 	spin_lock_irq(&cinfo->suspend_lock);
916 	if (list_empty(&cinfo->suspend_list))
917 		goto out;
918 	list_for_each_entry(s, &cinfo->suspend_list, list)
919 		if (hi > s->lo && lo < s->hi) {
920 			ret = 1;
921 			break;
922 		}
923 out:
924 	spin_unlock_irq(&cinfo->suspend_lock);
925 	return ret;
926 }
927 
/*
 * First half of adding a device: broadcast NEWDISK under the TOKEN
 * lock, then wait for every node to confirm it sees the device (each
 * releases no-new-dev, letting our NOQUEUE EX request succeed).
 * -ENOENT means some node could not see it.
 * NOTE(review): on __sendmsg() failure this returns with the TOKEN
 * lock still held — verify the caller releases it on that path (e.g.
 * via metadata_update_cancel()).
 */
static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	/* NOTE(review): raid_slot is not byte-swapped here although type
	 * is — confirm against the NEWDISK receiver before changing */
	cmsg.raid_slot = rdev->desc_nr;
	lock_comm(cinfo);
	ret = __sendmsg(cinfo, &cmsg);
	if (ret)
		return ret;
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	else
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	return ret;
}
954 
/* Second half of adding a device: persist the superblock and notify
 * the peers (which also drops the TOKEN lock). */
static int add_new_disk_finish(struct mddev *mddev)
{
	/* Write sb and inform others */
	md_update_sb(mddev, 1);
	return metadata_update_finish(mddev);
}
961 
962 static int new_disk_ack(struct mddev *mddev, bool ack)
963 {
964 	struct md_cluster_info *cinfo = mddev->cluster_info;
965 
966 	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
967 		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
968 		return -EINVAL;
969 	}
970 
971 	if (ack)
972 		dlm_unlock_sync(cinfo->no_new_dev_lockres);
973 	complete(&cinfo->newdisk_completion);
974 	return 0;
975 }
976 
977 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
978 {
979 	struct cluster_msg cmsg;
980 	struct md_cluster_info *cinfo = mddev->cluster_info;
981 	cmsg.type = REMOVE;
982 	cmsg.raid_slot = rdev->desc_nr;
983 	return __sendmsg(cinfo, &cmsg);
984 }
985 
986 static int gather_bitmaps(struct md_rdev *rdev)
987 {
988 	int sn, err;
989 	sector_t lo, hi;
990 	struct cluster_msg cmsg;
991 	struct mddev *mddev = rdev->mddev;
992 	struct md_cluster_info *cinfo = mddev->cluster_info;
993 
994 	cmsg.type = RE_ADD;
995 	cmsg.raid_slot = rdev->desc_nr;
996 	err = sendmsg(cinfo, &cmsg);
997 	if (err)
998 		goto out;
999 
1000 	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1001 		if (sn == (cinfo->slot_number - 1))
1002 			continue;
1003 		err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1004 		if (err) {
1005 			pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1006 			goto out;
1007 		}
1008 		if ((hi > 0) && (lo < mddev->recovery_cp))
1009 			mddev->recovery_cp = lo;
1010 	}
1011 out:
1012 	return err;
1013 }
1014 
/* Operations table registered with the md core (md_cluster_ops). */
static struct md_cluster_operations cluster_ops = {
	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk_start = add_new_disk_start,
	.add_new_disk_finish = add_new_disk_finish,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.gather_bitmaps = gather_bitmaps,
};
1032 
1033 static int __init cluster_init(void)
1034 {
1035 	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1036 	pr_info("Registering Cluster MD functions\n");
1037 	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1038 	return 0;
1039 }
1040 
/* Module exit: unhook the cluster operations from the md core. */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
1045 
1046 module_init(cluster_init);
1047 module_exit(cluster_exit);
1048 MODULE_LICENSE("GPL");
1049 MODULE_DESCRIPTION("Clustering support for MD");
1050