xref: /openbmc/linux/drivers/md/md-cluster.c (revision 0ba95977)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/kthread.h>
14 #include <linux/dlm.h>
15 #include <linux/sched.h>
16 #include <linux/raid/md_p.h>
17 #include "md.h"
18 #include "bitmap.h"
19 #include "md-cluster.h"
20 
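/*
 * LVB_SIZE is the size (in bytes) of the DLM lock value block used to carry
 * resync ranges and cluster messages between nodes; NEW_DEV_TIMEOUT is how
 * long (in jiffies) to wait for userspace to acknowledge a NEWDISK event.
 */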
21 #define LVB_SIZE	64
22 #define NEW_DEV_TIMEOUT 5000
23 
24 struct dlm_lock_resource {
25 	dlm_lockspace_t *ls;
26 	struct dlm_lksb lksb;
27 	char *name; /* lock name. */
28 	uint32_t flags; /* flags to pass to dlm_lock() */
29 	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
30 	bool sync_locking_done;
31 	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
32 	struct mddev *mddev; /* pointing back to mddev. */
33 	int mode;
34 };
35 
36 struct suspend_info {
37 	int slot;
38 	sector_t lo;
39 	sector_t hi;
40 	struct list_head list;
41 };
42 
43 struct resync_info {
44 	__le64 lo;
45 	__le64 hi;
46 };
47 
48 /* md_cluster_info flags */
49 #define		MD_CLUSTER_WAITING_FOR_NEWDISK		1
50 #define		MD_CLUSTER_SUSPEND_READ_BALANCING	2
51 #define		MD_CLUSTER_BEGIN_JOIN_CLUSTER		3
52 
53 /* Lock the send communication. This is done through
54  * bit manipulation as opposed to a mutex in order to
55  * accommodate lock and hold. See next comment.
56  */
57 #define		MD_CLUSTER_SEND_LOCK			4
58 /* Cluster operations (such as adding a disk) may need to lock the
59  * communication channel in order to perform extra operations
60  * (a metadata update) while no other operation is allowed on the
61  * MD. The token needs to be locked and held until the operation
62  * completes with a md_update_sb(), which would eventually release
63  * the lock.
64  */
65 #define		MD_CLUSTER_SEND_LOCKED_ALREADY		5
66 /* We should only receive messages after the node has joined the cluster and
67  * set up all the related info such as bitmap and personality */
68 #define		MD_CLUSTER_ALREADY_IN_CLUSTER		6
69 #define		MD_CLUSTER_PENDING_RECV_EVENT		7
70 #define 	MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD		8
71 
72 struct md_cluster_info {
73 	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
74 	/* dlm lock space and resources for clustered raid. */
75 	dlm_lockspace_t *lockspace;
76 	int slot_number;
77 	struct completion completion;
78 	struct mutex recv_mutex;
79 	struct dlm_lock_resource *bitmap_lockres;
80 	struct dlm_lock_resource **other_bitmap_lockres;
81 	struct dlm_lock_resource *resync_lockres;
82 	struct list_head suspend_list;
83 	spinlock_t suspend_lock;
84 	struct md_thread *recovery_thread;
85 	unsigned long recovery_map;
86 	/* communication lock resources */
87 	struct dlm_lock_resource *ack_lockres;
88 	struct dlm_lock_resource *message_lockres;
89 	struct dlm_lock_resource *token_lockres;
90 	struct dlm_lock_resource *no_new_dev_lockres;
91 	struct md_thread *recv_thread;
92 	struct completion newdisk_completion;
93 	wait_queue_head_t wait;
94 	unsigned long state;
95 	/* record the region in RESYNCING message */
96 	sector_t sync_low;
97 	sector_t sync_hi;
98 };
99 
100 enum msg_type {
101 	METADATA_UPDATED = 0,
102 	RESYNCING,
103 	NEWDISK,
104 	REMOVE,
105 	RE_ADD,
106 	BITMAP_NEEDS_SYNC,
107 };
108 
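/*
 * Messages are passed between nodes through a pair of DLM lock resources:
 * the sender writes a struct cluster_msg into the MESSAGE resource's LVB
 * and then cycles the ACK resource (CR -> EX -> CR), which fires ack_bast()
 * on the other nodes and makes their recv_daemon() read the LVB.  The TOKEN
 * resource serializes senders so only one message is in flight at a time.
 */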
109 struct cluster_msg {
110 	__le32 type;
111 	__le32 slot;
112 	/* TODO: Unionize this for smaller footprint */
113 	__le64 low;
114 	__le64 high;
115 	char uuid[16];
116 	__le32 raid_slot;
117 };
118 
119 static void sync_ast(void *arg)
120 {
121 	struct dlm_lock_resource *res;
122 
123 	res = arg;
124 	res->sync_locking_done = true;
125 	wake_up(&res->sync_locking);
126 }
127 
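/*
 * dlm_lock_sync() turns the asynchronous dlm_lock() call into a blocking
 * one: dlm_lock() queues the request and the DLM invokes sync_ast() when it
 * completes, so we wait for sync_locking_done and then return the status
 * from the lock status block.
 */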
128 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
129 {
130 	int ret = 0;
131 
132 	ret = dlm_lock(res->ls, mode, &res->lksb,
133 			res->flags, res->name, strlen(res->name),
134 			0, sync_ast, res, res->bast);
135 	if (ret)
136 		return ret;
137 	wait_event(res->sync_locking, res->sync_locking_done);
138 	res->sync_locking_done = false;
139 	if (res->lksb.sb_status == 0)
140 		res->mode = mode;
141 	return res->lksb.sb_status;
142 }
143 
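/* "unlocking" a resource here just converts it back down to NL mode */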
144 static int dlm_unlock_sync(struct dlm_lock_resource *res)
145 {
146 	return dlm_lock_sync(res, DLM_LOCK_NL);
147 }
148 
149 /*
150  * A variation of dlm_lock_sync() which makes the lock request
151  * interruptible
152  */
153 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
154 				       struct mddev *mddev)
155 {
156 	int ret = 0;
157 
158 	ret = dlm_lock(res->ls, mode, &res->lksb,
159 			res->flags, res->name, strlen(res->name),
160 			0, sync_ast, res, res->bast);
161 	if (ret)
162 		return ret;
163 
164 	wait_event(res->sync_locking, res->sync_locking_done
165 				      || kthread_should_stop()
166 				      || test_bit(MD_CLOSING, &mddev->flags));
167 	if (!res->sync_locking_done) {
168 		/*
169 		 * the convert queue still contains the lock request when the
170 		 * wait is interrupted, and sync_ast could still run, so we
171 		 * need to cancel the request and reset the completion flag
172 		 */
173 		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
174 			&res->lksb, res);
175 		res->sync_locking_done = false;
176 		if (unlikely(ret != 0))
177 			pr_info("failed to cancel previous lock request %s return %d\n",
178 				 res->name, ret);
179 		return -EPERM;
180 	} else
181 		res->sync_locking_done = false;
182 	if (res->lksb.sb_status == 0)
183 		res->mode = mode;
184 	return res->lksb.sb_status;
185 }
186 
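/*
 * lockres_init() allocates a dlm_lock_resource, optionally attaches a lock
 * value block, and immediately acquires the resource in NL mode (with
 * DLM_LKF_EXPEDITE so that the initial request cannot block).  Callers then
 * convert that NL lock to whatever mode they need (hence DLM_LKF_CONVERT).
 */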
187 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
188 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
189 {
190 	struct dlm_lock_resource *res = NULL;
191 	int ret, namelen;
192 	struct md_cluster_info *cinfo = mddev->cluster_info;
193 
194 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
195 	if (!res)
196 		return NULL;
197 	init_waitqueue_head(&res->sync_locking);
198 	res->sync_locking_done = false;
199 	res->ls = cinfo->lockspace;
200 	res->mddev = mddev;
201 	res->mode = DLM_LOCK_IV;
202 	namelen = strlen(name);
203 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
204 	if (!res->name) {
205 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
206 		goto out_err;
207 	}
208 	strlcpy(res->name, name, namelen + 1);
209 	if (with_lvb) {
210 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
211 		if (!res->lksb.sb_lvbptr) {
212 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
213 			goto out_err;
214 		}
215 		res->flags = DLM_LKF_VALBLK;
216 	}
217 
218 	if (bastfn)
219 		res->bast = bastfn;
220 
221 	res->flags |= DLM_LKF_EXPEDITE;
222 
223 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
224 	if (ret) {
225 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
226 		goto out_err;
227 	}
228 	res->flags &= ~DLM_LKF_EXPEDITE;
229 	res->flags |= DLM_LKF_CONVERT;
230 
231 	return res;
232 out_err:
233 	kfree(res->lksb.sb_lvbptr);
234 	kfree(res->name);
235 	kfree(res);
236 	return NULL;
237 }
238 
239 static void lockres_free(struct dlm_lock_resource *res)
240 {
241 	int ret = 0;
242 
243 	if (!res)
244 		return;
245 
246 	/*
247 	 * use the FORCEUNLOCK flag, so we can unlock even if the lock is on the
248 	 * waiting or convert queue
249 	 */
250 	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
251 		&res->lksb, res);
252 	if (unlikely(ret != 0))
253 		pr_err("failed to unlock %s return %d\n", res->name, ret);
254 	else
255 		wait_event(res->sync_locking, res->sync_locking_done);
256 
257 	kfree(res->name);
258 	kfree(res->lksb.sb_lvbptr);
259 	kfree(res);
260 }
261 
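/*
 * The start of a bitmap resource's LVB holds a struct resync_info: the
 * little-endian lo/hi sectors of the region the owning node is currently
 * resyncing, with hi == 0 meaning no resync is in progress.
 */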
262 static void add_resync_info(struct dlm_lock_resource *lockres,
263 			    sector_t lo, sector_t hi)
264 {
265 	struct resync_info *ri;
266 
267 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
268 	ri->lo = cpu_to_le64(lo);
269 	ri->hi = cpu_to_le64(hi);
270 }
271 
272 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
273 {
274 	struct resync_info ri;
275 	struct suspend_info *s = NULL;
276 	sector_t hi = 0;
277 
278 	dlm_lock_sync(lockres, DLM_LOCK_CR);
279 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
280 	hi = le64_to_cpu(ri.hi);
281 	if (hi > 0) {
282 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
283 		if (!s)
284 			goto out;
285 		s->hi = hi;
286 		s->lo = le64_to_cpu(ri.lo);
287 	}
288 	dlm_unlock_sync(lockres);
289 out:
290 	return s;
291 }
292 
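/*
 * recover_bitmaps() runs in the "recover" thread.  For every slot set in
 * recovery_map it drops any suspend range recorded for that slot, takes the
 * failed node's bitmap lock in PW mode, copies that bitmap into our own and
 * wakes the md thread so resync can cover the copied region.
 */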
293 static void recover_bitmaps(struct md_thread *thread)
294 {
295 	struct mddev *mddev = thread->mddev;
296 	struct md_cluster_info *cinfo = mddev->cluster_info;
297 	struct dlm_lock_resource *bm_lockres;
298 	char str[64];
299 	int slot, ret;
300 	struct suspend_info *s, *tmp;
301 	sector_t lo, hi;
302 
303 	while (cinfo->recovery_map) {
304 		slot = fls64((u64)cinfo->recovery_map) - 1;
305 
306 		/* Clear suspend_area associated with the bitmap */
307 		spin_lock_irq(&cinfo->suspend_lock);
308 		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
309 			if (slot == s->slot) {
310 				list_del(&s->list);
311 				kfree(s);
312 			}
313 		spin_unlock_irq(&cinfo->suspend_lock);
314 
315 		snprintf(str, 64, "bitmap%04d", slot);
316 		bm_lockres = lockres_init(mddev, str, NULL, 1);
317 		if (!bm_lockres) {
318 			pr_err("md-cluster: Cannot initialize bitmaps\n");
319 			goto clear_bit;
320 		}
321 
322 		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
323 		if (ret) {
324 			pr_err("md-cluster: Could not DLM lock %s: %d\n",
325 					str, ret);
326 			goto clear_bit;
327 		}
328 		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
329 		if (ret) {
330 			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
331 			goto clear_bit;
332 		}
333 		if (hi > 0) {
334 			if (lo < mddev->recovery_cp)
335 				mddev->recovery_cp = lo;
336 			/* wake up thread to continue resync in case resync
337 			 * is not finished */
338 			if (mddev->recovery_cp != MaxSector) {
339 			    set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
340 			    md_wakeup_thread(mddev->thread);
341 			}
342 		}
343 clear_bit:
344 		lockres_free(bm_lockres);
345 		clear_bit(slot, &cinfo->recovery_map);
346 	}
347 }
348 
349 static void recover_prep(void *arg)
350 {
351 	struct mddev *mddev = arg;
352 	struct md_cluster_info *cinfo = mddev->cluster_info;
353 	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
354 }
355 
356 static void __recover_slot(struct mddev *mddev, int slot)
357 {
358 	struct md_cluster_info *cinfo = mddev->cluster_info;
359 
360 	set_bit(slot, &cinfo->recovery_map);
361 	if (!cinfo->recovery_thread) {
362 		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
363 				mddev, "recover");
364 		if (!cinfo->recovery_thread) {
365 			pr_warn("md-cluster: Could not create recovery thread\n");
366 			return;
367 		}
368 	}
369 	md_wakeup_thread(cinfo->recovery_thread);
370 }
371 
372 static void recover_slot(void *arg, struct dlm_slot *slot)
373 {
374 	struct mddev *mddev = arg;
375 	struct md_cluster_info *cinfo = mddev->cluster_info;
376 
377 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
378 			mddev->bitmap_info.cluster_name,
379 			slot->nodeid, slot->slot,
380 			cinfo->slot_number);
381 	/* subtract one since dlm slot numbers start from one while the
382 	 * cluster-md slot numbers begin with 0 */
383 	__recover_slot(mddev, slot->slot - 1);
384 }
385 
386 static void recover_done(void *arg, struct dlm_slot *slots,
387 		int num_slots, int our_slot,
388 		uint32_t generation)
389 {
390 	struct mddev *mddev = arg;
391 	struct md_cluster_info *cinfo = mddev->cluster_info;
392 
393 	cinfo->slot_number = our_slot;
394 	/* the completion only needs to be completed when a node joins the
395 	 * cluster; it doesn't need to run during another node's failure */
396 	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
397 		complete(&cinfo->completion);
398 		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
399 	}
400 	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
401 }
402 
403 /* these ops are called when a node joins the cluster, and perform lock
404  * recovery if a node failure occurs */
405 static const struct dlm_lockspace_ops md_ls_ops = {
406 	.recover_prep = recover_prep,
407 	.recover_slot = recover_slot,
408 	.recover_done = recover_done,
409 };
410 
411 /*
412  * The BAST function for the ack lock resource
413  * This function wakes up the receive thread in
414  * order to receive and process the message.
415  */
416 static void ack_bast(void *arg, int mode)
417 {
418 	struct dlm_lock_resource *res = arg;
419 	struct md_cluster_info *cinfo = res->mddev->cluster_info;
420 
421 	if (mode == DLM_LOCK_EX) {
422 		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
423 			md_wakeup_thread(cinfo->recv_thread);
424 		else
425 			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
426 	}
427 }
428 
429 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
430 {
431 	struct suspend_info *s, *tmp;
432 
433 	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
434 		if (slot == s->slot) {
435 			list_del(&s->list);
436 			kfree(s);
437 			break;
438 		}
439 }
440 
441 static void remove_suspend_info(struct mddev *mddev, int slot)
442 {
443 	struct md_cluster_info *cinfo = mddev->cluster_info;
444 	spin_lock_irq(&cinfo->suspend_lock);
445 	__remove_suspend_info(cinfo, slot);
446 	spin_unlock_irq(&cinfo->suspend_lock);
447 	mddev->pers->quiesce(mddev, 2);
448 }
449 
450 
451 static void process_suspend_info(struct mddev *mddev,
452 		int slot, sector_t lo, sector_t hi)
453 {
454 	struct md_cluster_info *cinfo = mddev->cluster_info;
455 	struct suspend_info *s;
456 
457 	if (!hi) {
458 		remove_suspend_info(mddev, slot);
459 		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
460 		md_wakeup_thread(mddev->thread);
461 		return;
462 	}
463 
464 	/*
465 	 * The bitmaps are not the same for different nodes.
466 	 * If RESYNCING is happening in one node, then the
467 	 * node which received the RESYNCING message will
468 	 * probably perform resync over the region
469 	 * [lo, hi] again, so we can reduce resync time
470 	 * a lot if we can ensure that the bitmaps among
471 	 * the different nodes match up well.
472 	 *
473 	 * sync_low/hi records the region which arrived
474 	 * in the previous RESYNCING message.
475 	 *
476 	 * Call bitmap_sync_with_cluster to clear
477 	 * NEEDED_MASK and set RESYNC_MASK since the
478 	 * resync thread is running on another node,
479 	 * so we don't need to do the resync again
480 	 * for the same section */
481 	bitmap_sync_with_cluster(mddev, cinfo->sync_low,
482 					cinfo->sync_hi,
483 					lo, hi);
484 	cinfo->sync_low = lo;
485 	cinfo->sync_hi = hi;
486 
487 	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
488 	if (!s)
489 		return;
490 	s->slot = slot;
491 	s->lo = lo;
492 	s->hi = hi;
493 	mddev->pers->quiesce(mddev, 1);
494 	mddev->pers->quiesce(mddev, 0);
495 	spin_lock_irq(&cinfo->suspend_lock);
496 	/* Remove the existing entry (if it exists) before adding */
497 	__remove_suspend_info(cinfo, slot);
498 	list_add(&s->list, &cinfo->suspend_list);
499 	spin_unlock_irq(&cinfo->suspend_lock);
500 	mddev->pers->quiesce(mddev, 2);
501 }
502 
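/*
 * process_add_new_disk() handles a NEWDISK message from another node: it
 * reports the device uuid and raid slot to userspace via a KOBJ_CHANGE
 * uevent and then waits up to NEW_DEV_TIMEOUT for userspace to confirm the
 * disk through new_disk_ack().
 */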
503 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
504 {
505 	char disk_uuid[64];
506 	struct md_cluster_info *cinfo = mddev->cluster_info;
507 	char event_name[] = "EVENT=ADD_DEVICE";
508 	char raid_slot[16];
509 	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
510 	int len;
511 
512 	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
513 	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
514 	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
515 	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
516 	init_completion(&cinfo->newdisk_completion);
517 	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
518 	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
519 	wait_for_completion_timeout(&cinfo->newdisk_completion,
520 			NEW_DEV_TIMEOUT);
521 	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
522 }
523 
524 
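/*
 * process_metadata_update() reloads the superblock either after taking
 * reconfig_mutex itself, or without it when the local holder of the mutex
 * has set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD (see lock_token()), so that
 * the receive path cannot deadlock against a local metadata update.
 */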
525 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
526 {
527 	int got_lock = 0;
528 	struct md_cluster_info *cinfo = mddev->cluster_info;
529 	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
530 
531 	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
532 	wait_event(mddev->thread->wqueue,
533 		   (got_lock = mddev_trylock(mddev)) ||
534 		    test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
535 	md_reload_sb(mddev, mddev->good_device_nr);
536 	if (got_lock)
537 		mddev_unlock(mddev);
538 }
539 
540 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
541 {
542 	struct md_rdev *rdev;
543 
544 	rcu_read_lock();
545 	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
546 	if (rdev) {
547 		set_bit(ClusterRemove, &rdev->flags);
548 		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
549 		md_wakeup_thread(mddev->thread);
550 	}
551 	else
552 		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
553 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
554 	rcu_read_unlock();
555 }
556 
557 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
558 {
559 	struct md_rdev *rdev;
560 
561 	rcu_read_lock();
562 	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
563 	if (rdev && test_bit(Faulty, &rdev->flags))
564 		clear_bit(Faulty, &rdev->flags);
565 	else
566 		pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
567 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
568 	rcu_read_unlock();
569 }
570 
571 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
572 {
573 	int ret = 0;
574 
575 	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
576 		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
577 		return -1;
578 	switch (le32_to_cpu(msg->type)) {
579 	case METADATA_UPDATED:
580 		process_metadata_update(mddev, msg);
581 		break;
582 	case RESYNCING:
583 		process_suspend_info(mddev, le32_to_cpu(msg->slot),
584 				     le64_to_cpu(msg->low),
585 				     le64_to_cpu(msg->high));
586 		break;
587 	case NEWDISK:
588 		process_add_new_disk(mddev, msg);
589 		break;
590 	case REMOVE:
591 		process_remove_disk(mddev, msg);
592 		break;
593 	case RE_ADD:
594 		process_readd_disk(mddev, msg);
595 		break;
596 	case BITMAP_NEEDS_SYNC:
597 		__recover_slot(mddev, le32_to_cpu(msg->slot));
598 		break;
599 	default:
600 		ret = -1;
601 		pr_warn("%s:%d Received unknown message from %d\n",
602 			__func__, __LINE__, le32_to_cpu(msg->slot));
603 	}
604 	return ret;
605 }
606 
607 /*
608  * thread for receiving message
609  */
610 static void recv_daemon(struct md_thread *thread)
611 {
612 	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
613 	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
614 	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
615 	struct cluster_msg msg;
616 	int ret;
617 
618 	mutex_lock(&cinfo->recv_mutex);
619 	/*get CR on Message*/
620 	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
621 		pr_err("md-cluster: failed to get CR on MESSAGE\n");
622 		mutex_unlock(&cinfo->recv_mutex);
623 		return;
624 	}
625 
626 	/* read lvb and wake up thread to process this message_lockres */
627 	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
628 	ret = process_recvd_msg(thread->mddev, &msg);
629 	if (ret)
630 		goto out;
631 
632 	/*release CR on ack_lockres*/
633 	ret = dlm_unlock_sync(ack_lockres);
634 	if (unlikely(ret != 0))
635 		pr_info("unlock ack failed return %d\n", ret);
636 	/*up-convert to PR on message_lockres*/
637 	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
638 	if (unlikely(ret != 0))
639 		pr_info("lock PR on msg failed return %d\n", ret);
640 	/*get CR on ack_lockres again*/
641 	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
642 	if (unlikely(ret != 0))
643 		pr_info("lock CR on ack failed return %d\n", ret);
644 out:
645 	/*release CR on message_lockres*/
646 	ret = dlm_unlock_sync(message_lockres);
647 	if (unlikely(ret != 0))
648 		pr_info("unlock msg failed return %d\n", ret);
649 	mutex_unlock(&cinfo->recv_mutex);
650 }
651 
652 /* lock_token()
653  * Takes the lock on the TOKEN lock resource so no other
654  * node can communicate while the operation is underway.
655  */
656 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked)
657 {
658 	int error, set_bit = 0;
659 	struct mddev *mddev = cinfo->mddev;
660 
661 	/*
662 	 * If the resync thread runs after the raid1d thread, then process_metadata_update
663 	 * could not continue while raid1d holds reconfig_mutex (and raid1d is blocked
664 	 * because another node already got EX on Token and is waiting for EX on Ack),
665 	 * so let resync wake up the thread in case the flag is set.
666 	 */
667 	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
668 				      &cinfo->state)) {
669 		error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
670 					      &cinfo->state);
671 		WARN_ON_ONCE(error);
672 		md_wakeup_thread(mddev->thread);
673 		set_bit = 1;
674 	}
675 	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
676 	if (set_bit)
677 		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
678 
679 	if (error)
680 		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
681 				__func__, __LINE__, error);
682 
683 	/* Lock the receive sequence */
684 	mutex_lock(&cinfo->recv_mutex);
685 	return error;
686 }
687 
688 /* lock_comm()
689  * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
690  */
691 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
692 {
693 	wait_event(cinfo->wait,
694 		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
695 
696 	return lock_token(cinfo, mddev_locked);
697 }
698 
699 static void unlock_comm(struct md_cluster_info *cinfo)
700 {
701 	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
702 	mutex_unlock(&cinfo->recv_mutex);
703 	dlm_unlock_sync(cinfo->token_lockres);
704 	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
705 	wake_up(&cinfo->wait);
706 }
707 
708 /* __sendmsg()
709  * This function performs the actual sending of the message. This function is
710  * usually called after performing the encompassing operation
711  * The function:
712  * 1. Grabs the message lockresource in EX mode
713  * 2. Copies the message to the message LVB
714  * 3. Downconverts message lockresource to CW
715  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
716  *    and the other nodes read the message. The thread will wait here until all other
717  *    nodes have released ack lock resource.
718  * 5. Downconvert ack lockresource to CR
719  */
720 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
721 {
722 	int error;
723 	int slot = cinfo->slot_number - 1;
724 
725 	cmsg->slot = cpu_to_le32(slot);
726 	/*get EX on Message*/
727 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
728 	if (error) {
729 		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
730 		goto failed_message;
731 	}
732 
733 	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
734 			sizeof(struct cluster_msg));
735 	/*down-convert EX to CW on Message*/
736 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
737 	if (error) {
738 		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
739 				error);
740 		goto failed_ack;
741 	}
742 
743 	/*up-convert CR to EX on Ack*/
744 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
745 	if (error) {
746 		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
747 				error);
748 		goto failed_ack;
749 	}
750 
751 	/*down-convert EX to CR on Ack*/
752 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
753 	if (error) {
754 		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
755 				error);
756 		goto failed_ack;
757 	}
758 
759 failed_ack:
760 	error = dlm_unlock_sync(cinfo->message_lockres);
761 	if (unlikely(error != 0)) {
762 		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
763 			error);
764 		/* in case the message can't be released due to some reason */
765 		goto failed_ack;
766 	}
767 failed_message:
768 	return error;
769 }
770 
771 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
772 		   bool mddev_locked)
773 {
774 	int ret;
775 
776 	lock_comm(cinfo, mddev_locked);
777 	ret = __sendmsg(cinfo, cmsg);
778 	unlock_comm(cinfo);
779 	return ret;
780 }
781 
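/*
 * gather_all_resync_info() runs while joining: for every other slot it
 * tries to grab the bitmap lock with NOQUEUE.  -EAGAIN means that node is
 * alive and may be resyncing, so the range advertised in its LVB is added
 * to suspend_list; success means the bitmap is unowned, so we check whether
 * it requires recovery from this node.
 */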
782 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
783 {
784 	struct md_cluster_info *cinfo = mddev->cluster_info;
785 	int i, ret = 0;
786 	struct dlm_lock_resource *bm_lockres;
787 	struct suspend_info *s;
788 	char str[64];
789 	sector_t lo, hi;
790 
791 
792 	for (i = 0; i < total_slots; i++) {
793 		memset(str, '\0', 64);
794 		snprintf(str, 64, "bitmap%04d", i);
795 		bm_lockres = lockres_init(mddev, str, NULL, 1);
796 		if (!bm_lockres)
797 			return -ENOMEM;
798 		if (i == (cinfo->slot_number - 1)) {
799 			lockres_free(bm_lockres);
800 			continue;
801 		}
802 
803 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
804 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
805 		if (ret == -EAGAIN) {
806 			s = read_resync_info(mddev, bm_lockres);
807 			if (s) {
808 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
809 						__func__, __LINE__,
810 						(unsigned long long) s->lo,
811 						(unsigned long long) s->hi, i);
812 				spin_lock_irq(&cinfo->suspend_lock);
813 				s->slot = i;
814 				list_add(&s->list, &cinfo->suspend_list);
815 				spin_unlock_irq(&cinfo->suspend_lock);
816 			}
817 			ret = 0;
818 			lockres_free(bm_lockres);
819 			continue;
820 		}
821 		if (ret) {
822 			lockres_free(bm_lockres);
823 			goto out;
824 		}
825 
826 		/* Read the disk bitmap sb and check if it needs recovery */
827 		ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
828 		if (ret) {
829 			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
830 			lockres_free(bm_lockres);
831 			continue;
832 		}
833 		if ((hi > 0) && (lo < mddev->recovery_cp)) {
834 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
835 			mddev->recovery_cp = lo;
836 			md_check_recovery(mddev);
837 		}
838 
839 		lockres_free(bm_lockres);
840 	}
841 out:
842 	return ret;
843 }
844 
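/*
 * join() is called when a clustered array is assembled: it creates the
 * per-array DLM lockspace (named after the array uuid), waits for
 * recover_done() to report our slot number and then sets up the
 * communication and bitmap lock resources used by the rest of this file.
 */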
845 static int join(struct mddev *mddev, int nodes)
846 {
847 	struct md_cluster_info *cinfo;
848 	int ret, ops_rv;
849 	char str[64];
850 
851 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
852 	if (!cinfo)
853 		return -ENOMEM;
854 
855 	INIT_LIST_HEAD(&cinfo->suspend_list);
856 	spin_lock_init(&cinfo->suspend_lock);
857 	init_completion(&cinfo->completion);
858 	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
859 	init_waitqueue_head(&cinfo->wait);
860 	mutex_init(&cinfo->recv_mutex);
861 
862 	mddev->cluster_info = cinfo;
863 	cinfo->mddev = mddev;
864 
865 	memset(str, 0, 64);
866 	sprintf(str, "%pU", mddev->uuid);
867 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
868 				DLM_LSFL_FS, LVB_SIZE,
869 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
870 	if (ret)
871 		goto err;
872 	wait_for_completion(&cinfo->completion);
873 	if (nodes < cinfo->slot_number) {
874 		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).\n",
875 			cinfo->slot_number, nodes);
876 		ret = -ERANGE;
877 		goto err;
878 	}
879 	/* Initiate the communication resources */
880 	ret = -ENOMEM;
881 	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
882 	if (!cinfo->recv_thread) {
883 		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
884 		goto err;
885 	}
886 	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
887 	if (!cinfo->message_lockres)
888 		goto err;
889 	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
890 	if (!cinfo->token_lockres)
891 		goto err;
892 	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
893 	if (!cinfo->no_new_dev_lockres)
894 		goto err;
895 
896 	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
897 	if (ret) {
898 		ret = -EAGAIN;
899 		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
900 		goto err;
901 	}
902 	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
903 	if (!cinfo->ack_lockres) {
904 		ret = -ENOMEM;
905 		goto err;
906 	}
907 	/* get sync CR lock on ACK. */
908 	ret = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
909 	if (ret)
910 		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", ret);
911 	dlm_unlock_sync(cinfo->token_lockres);
912 	/* get sync CR lock on no-new-dev. */
913 	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
914 	if (ret)
915 		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
916 
917 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
918 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
919 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
920 	if (!cinfo->bitmap_lockres) {
921 		ret = -ENOMEM;
922 		goto err;
923 	}
924 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
925 		pr_err("Failed to get bitmap lock\n");
926 		ret = -EINVAL;
927 		goto err;
928 	}
929 
930 	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
931 	if (!cinfo->resync_lockres) {
932 		ret = -ENOMEM;
933 		goto err;
934 	}
935 
936 	return 0;
937 err:
938 	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
939 	md_unregister_thread(&cinfo->recovery_thread);
940 	md_unregister_thread(&cinfo->recv_thread);
941 	lockres_free(cinfo->message_lockres);
942 	lockres_free(cinfo->token_lockres);
943 	lockres_free(cinfo->ack_lockres);
944 	lockres_free(cinfo->no_new_dev_lockres);
945 	lockres_free(cinfo->resync_lockres);
946 	lockres_free(cinfo->bitmap_lockres);
947 	if (cinfo->lockspace)
948 		dlm_release_lockspace(cinfo->lockspace, 2);
949 	mddev->cluster_info = NULL;
950 	kfree(cinfo);
951 	return ret;
952 }
953 
954 static void load_bitmaps(struct mddev *mddev, int total_slots)
955 {
956 	struct md_cluster_info *cinfo = mddev->cluster_info;
957 
958 	/* load all the node's bitmap info for resync */
959 	if (gather_all_resync_info(mddev, total_slots))
960 		pr_err("md-cluster: failed to gather all resync infos\n");
961 	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
962 	/* wake up recv thread in case something needs to be handled */
963 	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
964 		md_wakeup_thread(cinfo->recv_thread);
965 }
966 
967 static void resync_bitmap(struct mddev *mddev)
968 {
969 	struct md_cluster_info *cinfo = mddev->cluster_info;
970 	struct cluster_msg cmsg = {0};
971 	int err;
972 
973 	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
974 	err = sendmsg(cinfo, &cmsg, 1);
975 	if (err)
976 		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
977 			__func__, __LINE__, err);
978 }
979 
980 static void unlock_all_bitmaps(struct mddev *mddev);
981 static int leave(struct mddev *mddev)
982 {
983 	struct md_cluster_info *cinfo = mddev->cluster_info;
984 
985 	if (!cinfo)
986 		return 0;
987 
988 	/* a BITMAP_NEEDS_SYNC message should be sent when a node
989 	 * is leaving the cluster with a dirty bitmap; also we can
990 	 * only deliver it while the dlm connection is available */
991 	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
992 		resync_bitmap(mddev);
993 
994 	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
995 	md_unregister_thread(&cinfo->recovery_thread);
996 	md_unregister_thread(&cinfo->recv_thread);
997 	lockres_free(cinfo->message_lockres);
998 	lockres_free(cinfo->token_lockres);
999 	lockres_free(cinfo->ack_lockres);
1000 	lockres_free(cinfo->no_new_dev_lockres);
1001 	lockres_free(cinfo->resync_lockres);
1002 	lockres_free(cinfo->bitmap_lockres);
1003 	unlock_all_bitmaps(mddev);
1004 	dlm_release_lockspace(cinfo->lockspace, 2);
1005 	kfree(cinfo);
1006 	return 0;
1007 }
1008 
1009 /* slot_number(): Returns the MD slot number to use
1010  * DLM starts the slot numbers from 1, whereas cluster-md
1011  * wants the number to be from zero, so we deduct one
1012  */
1013 static int slot_number(struct mddev *mddev)
1014 {
1015 	struct md_cluster_info *cinfo = mddev->cluster_info;
1016 
1017 	return cinfo->slot_number - 1;
1018 }
1019 
1020 /*
1021  * Check if the communication is already locked, else lock the communication
1022  * channel.
1023  * If it is already locked, token is in EX mode, and hence lock_token()
1024  * should not be called.
1025  */
1026 static int metadata_update_start(struct mddev *mddev)
1027 {
1028 	struct md_cluster_info *cinfo = mddev->cluster_info;
1029 	int ret;
1030 
1031 	/*
1032 	 * metadata_update_start is always called with the protection of
1033 	 * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
1034 	 */
1035 	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1036 				    &cinfo->state);
1037 	WARN_ON_ONCE(ret);
1038 	md_wakeup_thread(mddev->thread);
1039 
1040 	wait_event(cinfo->wait,
1041 		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1042 		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1043 
1044 	/* If token is already locked, return 0 */
1045 	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1046 		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1047 		return 0;
1048 	}
1049 
1050 	ret = lock_token(cinfo, 1);
1051 	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1052 	return ret;
1053 }
1054 
1055 static int metadata_update_finish(struct mddev *mddev)
1056 {
1057 	struct md_cluster_info *cinfo = mddev->cluster_info;
1058 	struct cluster_msg cmsg;
1059 	struct md_rdev *rdev;
1060 	int ret = 0;
1061 	int raid_slot = -1;
1062 
1063 	memset(&cmsg, 0, sizeof(cmsg));
1064 	cmsg.type = cpu_to_le32(METADATA_UPDATED);
1065 	/* Pick up a good active device number to send.
1066 	 */
1067 	rdev_for_each(rdev, mddev)
1068 		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1069 			raid_slot = rdev->desc_nr;
1070 			break;
1071 		}
1072 	if (raid_slot >= 0) {
1073 		cmsg.raid_slot = cpu_to_le32(raid_slot);
1074 		ret = __sendmsg(cinfo, &cmsg);
1075 	} else
1076 		pr_warn("md-cluster: No good device id found to send\n");
1077 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1078 	unlock_comm(cinfo);
1079 	return ret;
1080 }
1081 
1082 static void metadata_update_cancel(struct mddev *mddev)
1083 {
1084 	struct md_cluster_info *cinfo = mddev->cluster_info;
1085 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1086 	unlock_comm(cinfo);
1087 }
1088 
1089 static int resync_start(struct mddev *mddev)
1090 {
1091 	struct md_cluster_info *cinfo = mddev->cluster_info;
1092 	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1093 }
1094 
1095 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1096 {
1097 	struct md_cluster_info *cinfo = mddev->cluster_info;
1098 	struct resync_info ri;
1099 	struct cluster_msg cmsg = {0};
1100 
1101 	/* do not send zero again if we have sent it before */
1102 	if (hi == 0) {
1103 		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1104 		if (le64_to_cpu(ri.hi) == 0)
1105 			return 0;
1106 	}
1107 
1108 	add_resync_info(cinfo->bitmap_lockres, lo, hi);
1109 	/* Re-acquire the lock to refresh LVB */
1110 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1111 	cmsg.type = cpu_to_le32(RESYNCING);
1112 	cmsg.low = cpu_to_le64(lo);
1113 	cmsg.high = cpu_to_le64(hi);
1114 
1115 	/*
1116 	 * mddev_lock is held if resync_info_update is called from
1117 	 * resync_finish (md_reap_sync_thread -> resync_finish)
1118 	 */
1119 	if (lo == 0 && hi == 0)
1120 		return sendmsg(cinfo, &cmsg, 1);
1121 	else
1122 		return sendmsg(cinfo, &cmsg, 0);
1123 }
1124 
1125 static int resync_finish(struct mddev *mddev)
1126 {
1127 	struct md_cluster_info *cinfo = mddev->cluster_info;
1128 	dlm_unlock_sync(cinfo->resync_lockres);
1129 	return resync_info_update(mddev, 0, 0);
1130 }
1131 
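/*
 * area_resyncing() tells the personality whether I/O to [lo, hi] must be
 * held off: reads are blocked while another node is being recovered, and
 * any I/O is blocked if the range overlaps a suspend_info entry received
 * in a RESYNCING message.
 */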
1132 static int area_resyncing(struct mddev *mddev, int direction,
1133 		sector_t lo, sector_t hi)
1134 {
1135 	struct md_cluster_info *cinfo = mddev->cluster_info;
1136 	int ret = 0;
1137 	struct suspend_info *s;
1138 
1139 	if ((direction == READ) &&
1140 		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1141 		return 1;
1142 
1143 	spin_lock_irq(&cinfo->suspend_lock);
1144 	if (list_empty(&cinfo->suspend_list))
1145 		goto out;
1146 	list_for_each_entry(s, &cinfo->suspend_list, list)
1147 		if (hi > s->lo && lo < s->hi) {
1148 			ret = 1;
1149 			break;
1150 		}
1151 out:
1152 	spin_unlock_irq(&cinfo->suspend_lock);
1153 	return ret;
1154 }
1155 
1156 /* add_new_disk() - initiates a disk add
1157  * However, if this fails before writing md_update_sb(),
1158  * add_new_disk_cancel() must be called to release token lock
1159  */
1160 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1161 {
1162 	struct md_cluster_info *cinfo = mddev->cluster_info;
1163 	struct cluster_msg cmsg;
1164 	int ret = 0;
1165 	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1166 	char *uuid = sb->device_uuid;
1167 
1168 	memset(&cmsg, 0, sizeof(cmsg));
1169 	cmsg.type = cpu_to_le32(NEWDISK);
1170 	memcpy(cmsg.uuid, uuid, 16);
1171 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1172 	lock_comm(cinfo, 1);
1173 	ret = __sendmsg(cinfo, &cmsg);
1174 	if (ret)
1175 		return ret;
1176 	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1177 	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1178 	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1179 	/* Some node does not "see" the device */
1180 	if (ret == -EAGAIN)
1181 		ret = -ENOENT;
1182 	if (ret)
1183 		unlock_comm(cinfo);
1184 	else {
1185 		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1186 		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1187 		 * will run soon after add_new_disk, the below path will be
1188 		 * invoked:
1189 		 *   md_wakeup_thread(mddev->thread)
1190 		 *	-> conf->thread (raid1d)
1191 		 *	-> md_check_recovery -> md_update_sb
1192 		 *	-> metadata_update_start/finish
1193 		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1194 		 *
1195 		 * For other failure cases, metadata_update_cancel and
1196 		 * add_new_disk_cancel also clear the bit below.
1197 		 * */
1198 		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1199 		wake_up(&cinfo->wait);
1200 	}
1201 	return ret;
1202 }
1203 
1204 static void add_new_disk_cancel(struct mddev *mddev)
1205 {
1206 	struct md_cluster_info *cinfo = mddev->cluster_info;
1207 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1208 	unlock_comm(cinfo);
1209 }
1210 
1211 static int new_disk_ack(struct mddev *mddev, bool ack)
1212 {
1213 	struct md_cluster_info *cinfo = mddev->cluster_info;
1214 
1215 	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1216 		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1217 		return -EINVAL;
1218 	}
1219 
1220 	if (ack)
1221 		dlm_unlock_sync(cinfo->no_new_dev_lockres);
1222 	complete(&cinfo->newdisk_completion);
1223 	return 0;
1224 }
1225 
1226 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1227 {
1228 	struct cluster_msg cmsg = {0};
1229 	struct md_cluster_info *cinfo = mddev->cluster_info;
1230 	cmsg.type = cpu_to_le32(REMOVE);
1231 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1232 	return sendmsg(cinfo, &cmsg, 1);
1233 }
1234 
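/*
 * lock_all_bitmaps()/unlock_all_bitmaps() take and release PW locks on
 * every other node's bitmap resource, so the bitmaps cannot change
 * underneath the caller, typically while the array is being taken out of
 * clustered mode.  Returns 1 when all locks were obtained, -1 otherwise.
 */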
1235 static int lock_all_bitmaps(struct mddev *mddev)
1236 {
1237 	int slot, my_slot, ret, held = 1, i = 0;
1238 	char str[64];
1239 	struct md_cluster_info *cinfo = mddev->cluster_info;
1240 
1241 	cinfo->other_bitmap_lockres = kcalloc(mddev->bitmap_info.nodes - 1,
1242 					     sizeof(struct dlm_lock_resource *),
1243 					     GFP_KERNEL);
1244 	if (!cinfo->other_bitmap_lockres) {
1245 		pr_err("md: can't alloc mem for other bitmap locks\n");
1246 		return 0;
1247 	}
1248 
1249 	my_slot = slot_number(mddev);
1250 	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1251 		if (slot == my_slot)
1252 			continue;
1253 
1254 		memset(str, '\0', 64);
1255 		snprintf(str, 64, "bitmap%04d", slot);
1256 		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1257 		if (!cinfo->other_bitmap_lockres[i])
1258 			return -ENOMEM;
1259 
1260 		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1261 		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1262 		if (ret)
1263 			held = -1;
1264 		i++;
1265 	}
1266 
1267 	return held;
1268 }
1269 
1270 static void unlock_all_bitmaps(struct mddev *mddev)
1271 {
1272 	struct md_cluster_info *cinfo = mddev->cluster_info;
1273 	int i;
1274 
1275 	/* release the other nodes' bitmap locks if they exist */
1276 	if (cinfo->other_bitmap_lockres) {
1277 		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1278 			if (cinfo->other_bitmap_lockres[i]) {
1279 				lockres_free(cinfo->other_bitmap_lockres[i]);
1280 			}
1281 		}
1282 		kfree(cinfo->other_bitmap_lockres);
1283 	}
1284 }
1285 
1286 static int gather_bitmaps(struct md_rdev *rdev)
1287 {
1288 	int sn, err;
1289 	sector_t lo, hi;
1290 	struct cluster_msg cmsg = {0};
1291 	struct mddev *mddev = rdev->mddev;
1292 	struct md_cluster_info *cinfo = mddev->cluster_info;
1293 
1294 	cmsg.type = cpu_to_le32(RE_ADD);
1295 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1296 	err = sendmsg(cinfo, &cmsg, 1);
1297 	if (err)
1298 		goto out;
1299 
1300 	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1301 		if (sn == (cinfo->slot_number - 1))
1302 			continue;
1303 		err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1304 		if (err) {
1305 			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
1306 			goto out;
1307 		}
1308 		if ((hi > 0) && (lo < mddev->recovery_cp))
1309 			mddev->recovery_cp = lo;
1310 	}
1311 out:
1312 	return err;
1313 }
1314 
1315 static struct md_cluster_operations cluster_ops = {
1316 	.join   = join,
1317 	.leave  = leave,
1318 	.slot_number = slot_number,
1319 	.resync_start = resync_start,
1320 	.resync_finish = resync_finish,
1321 	.resync_info_update = resync_info_update,
1322 	.metadata_update_start = metadata_update_start,
1323 	.metadata_update_finish = metadata_update_finish,
1324 	.metadata_update_cancel = metadata_update_cancel,
1325 	.area_resyncing = area_resyncing,
1326 	.add_new_disk = add_new_disk,
1327 	.add_new_disk_cancel = add_new_disk_cancel,
1328 	.new_disk_ack = new_disk_ack,
1329 	.remove_disk = remove_disk,
1330 	.load_bitmaps = load_bitmaps,
1331 	.gather_bitmaps = gather_bitmaps,
1332 	.lock_all_bitmaps = lock_all_bitmaps,
1333 	.unlock_all_bitmaps = unlock_all_bitmaps,
1334 };
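/*
 * A rough usage sketch (not part of this file), assuming the md core's
 * md_cluster_ops pointer has been wired up to the table above:
 *
 *	err = md_cluster_ops->join(mddev, mddev->bitmap_info.nodes);
 *	if (!err)
 *		slot = md_cluster_ops->slot_number(mddev);
 *	...
 *	md_cluster_ops->leave(mddev);
 */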
1335 
1336 static int __init cluster_init(void)
1337 {
1338 	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1339 	pr_info("Registering Cluster MD functions\n");
1340 	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1341 	return 0;
1342 }
1343 
1344 static void cluster_exit(void)
1345 {
1346 	unregister_md_cluster_operations();
1347 }
1348 
1349 module_init(cluster_init);
1350 module_exit(cluster_exit);
1351 MODULE_AUTHOR("SUSE");
1352 MODULE_LICENSE("GPL");
1353 MODULE_DESCRIPTION("Clustering support for MD");
1354