xref: /openbmc/linux/drivers/md/md-cluster.c (revision afd75628)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/kthread.h>
14 #include <linux/dlm.h>
15 #include <linux/sched.h>
16 #include <linux/raid/md_p.h>
17 #include "md.h"
18 #include "md-bitmap.h"
19 #include "md-cluster.h"
20 
21 #define LVB_SIZE	64
22 #define NEW_DEV_TIMEOUT 5000
23 
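/*
 * DLM lock modes used below (least to most restrictive):
 * NL (null), CR (concurrent read), CW (concurrent write),
 * PR (protected read), PW (protected write), EX (exclusive).
 */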
24 struct dlm_lock_resource {
25 	dlm_lockspace_t *ls;
26 	struct dlm_lksb lksb;
27 	char *name; /* lock name. */
28 	uint32_t flags; /* flags to pass to dlm_lock() */
29 	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
30 	bool sync_locking_done;
31 	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
32 	struct mddev *mddev; /* pointing back to mddev. */
33 	int mode;
34 };
35 
36 struct suspend_info {
37 	int slot;
38 	sector_t lo;
39 	sector_t hi;
40 	struct list_head list;
41 };
42 
43 struct resync_info {
44 	__le64 lo;
45 	__le64 hi;
46 };
47 
48 /* md_cluster_info flags */
49 #define		MD_CLUSTER_WAITING_FOR_NEWDISK		1
50 #define		MD_CLUSTER_SUSPEND_READ_BALANCING	2
51 #define		MD_CLUSTER_BEGIN_JOIN_CLUSTER		3
52 
53 /* Lock the send communication. This is done through
54  * bit manipulation as opposed to a mutex in order to
55  * accommodate lock and hold. See next comment.
56  */
57 #define		MD_CLUSTER_SEND_LOCK			4
58 /* Cluster operations (such as adding a disk) must lock the
59  * communication channel in order to perform extra operations
60  * (update metadata) while no other operation is allowed on the
61  * MD. The token needs to be locked and held until the operation
62  * completes with a md_update_sb(), which eventually releases
63  * the lock.
64  */
65 #define		MD_CLUSTER_SEND_LOCKED_ALREADY		5
66 /* We should receive messages after the node joined the cluster and
67  * set up all the related info such as the bitmap and personality */
68 #define		MD_CLUSTER_ALREADY_IN_CLUSTER		6
69 #define		MD_CLUSTER_PENDING_RECV_EVENT		7
70 #define 	MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD		8
71 
72 struct md_cluster_info {
73 	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
74 	/* dlm lock space and resources for clustered raid. */
75 	dlm_lockspace_t *lockspace;
76 	int slot_number;
77 	struct completion completion;
78 	struct mutex recv_mutex;
79 	struct dlm_lock_resource *bitmap_lockres;
80 	struct dlm_lock_resource **other_bitmap_lockres;
81 	struct dlm_lock_resource *resync_lockres;
82 	struct list_head suspend_list;
83 	spinlock_t suspend_lock;
84 	struct md_thread *recovery_thread;
85 	unsigned long recovery_map;
86 	/* communication lock resources */
87 	struct dlm_lock_resource *ack_lockres;
88 	struct dlm_lock_resource *message_lockres;
89 	struct dlm_lock_resource *token_lockres;
90 	struct dlm_lock_resource *no_new_dev_lockres;
91 	struct md_thread *recv_thread;
92 	struct completion newdisk_completion;
93 	wait_queue_head_t wait;
94 	unsigned long state;
95 	/* record the region in RESYNCING message */
96 	sector_t sync_low;
97 	sector_t sync_hi;
98 };
99 
100 enum msg_type {
101 	METADATA_UPDATED = 0,
102 	RESYNCING,
103 	NEWDISK,
104 	REMOVE,
105 	RE_ADD,
106 	BITMAP_NEEDS_SYNC,
107 	CHANGE_CAPACITY,
108 	BITMAP_RESIZE,
109 };
110 
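/*
 * On-wire message carried in the MESSAGE lock's LVB. 'slot' is the
 * sender's slot number, 'low'/'high' carry a sector range (RESYNCING)
 * or the new size (BITMAP_RESIZE), 'uuid' and 'raid_slot' identify the
 * device for NEWDISK/REMOVE/RE_ADD/METADATA_UPDATED. All fields are
 * little-endian.
 */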
111 struct cluster_msg {
112 	__le32 type;
113 	__le32 slot;
114 	/* TODO: Unionize this for smaller footprint */
115 	__le64 low;
116 	__le64 high;
117 	char uuid[16];
118 	__le32 raid_slot;
119 };
120 
121 static void sync_ast(void *arg)
122 {
123 	struct dlm_lock_resource *res;
124 
125 	res = arg;
126 	res->sync_locking_done = true;
127 	wake_up(&res->sync_locking);
128 }
129 
130 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
131 {
132 	int ret = 0;
133 
134 	ret = dlm_lock(res->ls, mode, &res->lksb,
135 			res->flags, res->name, strlen(res->name),
136 			0, sync_ast, res, res->bast);
137 	if (ret)
138 		return ret;
139 	wait_event(res->sync_locking, res->sync_locking_done);
140 	res->sync_locking_done = false;
141 	if (res->lksb.sb_status == 0)
142 		res->mode = mode;
143 	return res->lksb.sb_status;
144 }
145 
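/*
 * "Unlocking" a resource is a down-convert to NL mode: lockres_init()
 * leaves every resource granted (with DLM_LKF_CONVERT set), so the lock
 * is never fully released here, only dropped to the null mode. The full
 * release happens in lockres_free().
 */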
146 static int dlm_unlock_sync(struct dlm_lock_resource *res)
147 {
148 	return dlm_lock_sync(res, DLM_LOCK_NL);
149 }
150 
151 /*
152  * A variation of dlm_lock_sync() which allows the lock request
153  * to be interrupted
154  */
155 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
156 				       struct mddev *mddev)
157 {
158 	int ret = 0;
159 
160 	ret = dlm_lock(res->ls, mode, &res->lksb,
161 			res->flags, res->name, strlen(res->name),
162 			0, sync_ast, res, res->bast);
163 	if (ret)
164 		return ret;
165 
166 	wait_event(res->sync_locking, res->sync_locking_done
167 				      || kthread_should_stop()
168 				      || test_bit(MD_CLOSING, &mddev->flags));
169 	if (!res->sync_locking_done) {
170 		/*
171 		 * the convert queue still contains the lock request when the request
172 		 * is interrupted, and sync_ast could still be run, so we need to
173 		 * cancel the request and reset the completion
174 		 */
175 		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
176 			&res->lksb, res);
177 		res->sync_locking_done = false;
178 		if (unlikely(ret != 0))
179 			pr_info("failed to cancel previous lock request "
180 				 "%s return %d\n", res->name, ret);
181 		return -EPERM;
182 	} else
183 		res->sync_locking_done = false;
184 	if (res->lksb.sb_status == 0)
185 		res->mode = mode;
186 	return res->lksb.sb_status;
187 }
188 
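/*
 * Create a lock resource in the lockspace: allocate it, grab it once in
 * NL mode (DLM_LKF_EXPEDITE so the initial NL grant isn't queued behind
 * other requests), then switch to convert mode so later dlm_lock_sync()
 * calls only change the lock's mode. with_lvb allocates a lock value
 * block used to carry data between nodes.
 */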
189 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
190 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
191 {
192 	struct dlm_lock_resource *res = NULL;
193 	int ret, namelen;
194 	struct md_cluster_info *cinfo = mddev->cluster_info;
195 
196 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
197 	if (!res)
198 		return NULL;
199 	init_waitqueue_head(&res->sync_locking);
200 	res->sync_locking_done = false;
201 	res->ls = cinfo->lockspace;
202 	res->mddev = mddev;
203 	res->mode = DLM_LOCK_IV;
204 	namelen = strlen(name);
205 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
206 	if (!res->name) {
207 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
208 		goto out_err;
209 	}
210 	strlcpy(res->name, name, namelen + 1);
211 	if (with_lvb) {
212 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
213 		if (!res->lksb.sb_lvbptr) {
214 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
215 			goto out_err;
216 		}
217 		res->flags = DLM_LKF_VALBLK;
218 	}
219 
220 	if (bastfn)
221 		res->bast = bastfn;
222 
223 	res->flags |= DLM_LKF_EXPEDITE;
224 
225 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
226 	if (ret) {
227 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
228 		goto out_err;
229 	}
230 	res->flags &= ~DLM_LKF_EXPEDITE;
231 	res->flags |= DLM_LKF_CONVERT;
232 
233 	return res;
234 out_err:
235 	kfree(res->lksb.sb_lvbptr);
236 	kfree(res->name);
237 	kfree(res);
238 	return NULL;
239 }
240 
241 static void lockres_free(struct dlm_lock_resource *res)
242 {
243 	int ret = 0;
244 
245 	if (!res)
246 		return;
247 
248 	/*
249 	 * use the FORCEUNLOCK flag, so we can unlock even if the lock is on the
250 	 * waiting or convert queue
251 	 */
252 	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
253 		&res->lksb, res);
254 	if (unlikely(ret != 0))
255 		pr_err("failed to unlock %s return %d\n", res->name, ret);
256 	else
257 		wait_event(res->sync_locking, res->sync_locking_done);
258 
259 	kfree(res->name);
260 	kfree(res->lksb.sb_lvbptr);
261 	kfree(res);
262 }
263 
264 static void add_resync_info(struct dlm_lock_resource *lockres,
265 			    sector_t lo, sector_t hi)
266 {
267 	struct resync_info *ri;
268 
269 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
270 	ri->lo = cpu_to_le64(lo);
271 	ri->hi = cpu_to_le64(hi);
272 }
273 
274 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
275 {
276 	struct resync_info ri;
277 	struct suspend_info *s = NULL;
278 	sector_t hi = 0;
279 
280 	dlm_lock_sync(lockres, DLM_LOCK_CR);
281 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
282 	hi = le64_to_cpu(ri.hi);
283 	if (hi > 0) {
284 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
285 		if (!s)
286 			goto out;
287 		s->hi = hi;
288 		s->lo = le64_to_cpu(ri.lo);
289 	}
290 	dlm_unlock_sync(lockres);
291 out:
292 	return s;
293 }
294 
295 static void recover_bitmaps(struct md_thread *thread)
296 {
297 	struct mddev *mddev = thread->mddev;
298 	struct md_cluster_info *cinfo = mddev->cluster_info;
299 	struct dlm_lock_resource *bm_lockres;
300 	char str[64];
301 	int slot, ret;
302 	struct suspend_info *s, *tmp;
303 	sector_t lo, hi;
304 
305 	while (cinfo->recovery_map) {
306 		slot = fls64((u64)cinfo->recovery_map) - 1;
307 
308 		snprintf(str, 64, "bitmap%04d", slot);
309 		bm_lockres = lockres_init(mddev, str, NULL, 1);
310 		if (!bm_lockres) {
311 			pr_err("md-cluster: Cannot initialize bitmaps\n");
312 			goto clear_bit;
313 		}
314 
315 		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
316 		if (ret) {
317 			pr_err("md-cluster: Could not DLM lock %s: %d\n",
318 					str, ret);
319 			goto clear_bit;
320 		}
321 		ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
322 		if (ret) {
323 			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
324 			goto clear_bit;
325 		}
326 
327 		/* Clear suspend_area associated with the bitmap */
328 		spin_lock_irq(&cinfo->suspend_lock);
329 		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
330 			if (slot == s->slot) {
331 				list_del(&s->list);
332 				kfree(s);
333 			}
334 		spin_unlock_irq(&cinfo->suspend_lock);
335 
336 		if (hi > 0) {
337 			if (lo < mddev->recovery_cp)
338 				mddev->recovery_cp = lo;
339 			/* wake up thread to continue resync in case resync
340 			 * is not finished */
341 			if (mddev->recovery_cp != MaxSector) {
342 				/*
343 				 * clear the REMOTE flag since we will launch
344 				 * resync thread in current node.
345 				 */
346 				clear_bit(MD_RESYNCING_REMOTE,
347 					  &mddev->recovery);
348 				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
349 				md_wakeup_thread(mddev->thread);
350 			}
351 		}
352 clear_bit:
353 		lockres_free(bm_lockres);
354 		clear_bit(slot, &cinfo->recovery_map);
355 	}
356 }
357 
358 static void recover_prep(void *arg)
359 {
360 	struct mddev *mddev = arg;
361 	struct md_cluster_info *cinfo = mddev->cluster_info;
362 	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
363 }
364 
365 static void __recover_slot(struct mddev *mddev, int slot)
366 {
367 	struct md_cluster_info *cinfo = mddev->cluster_info;
368 
369 	set_bit(slot, &cinfo->recovery_map);
370 	if (!cinfo->recovery_thread) {
371 		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
372 				mddev, "recover");
373 		if (!cinfo->recovery_thread) {
374 			pr_warn("md-cluster: Could not create recovery thread\n");
375 			return;
376 		}
377 	}
378 	md_wakeup_thread(cinfo->recovery_thread);
379 }
380 
381 static void recover_slot(void *arg, struct dlm_slot *slot)
382 {
383 	struct mddev *mddev = arg;
384 	struct md_cluster_info *cinfo = mddev->cluster_info;
385 
386 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
387 			mddev->bitmap_info.cluster_name,
388 			slot->nodeid, slot->slot,
389 			cinfo->slot_number);
390 	/* subtract one since DLM slot numbers start from one while the
391 	 * numbering of cluster-md begins with 0 */
392 	__recover_slot(mddev, slot->slot - 1);
393 }
394 
395 static void recover_done(void *arg, struct dlm_slot *slots,
396 		int num_slots, int our_slot,
397 		uint32_t generation)
398 {
399 	struct mddev *mddev = arg;
400 	struct md_cluster_info *cinfo = mddev->cluster_info;
401 
402 	cinfo->slot_number = our_slot;
403 	/* the completion only needs to be completed when a node joins the cluster;
404 	 * it doesn't need to run during another node's failure */
405 	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
406 		complete(&cinfo->completion);
407 		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
408 	}
409 	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
410 }
411 
412 /* these ops are called when a node joins the cluster, and perform lock
413  * recovery if a node failure occurs */
414 static const struct dlm_lockspace_ops md_ls_ops = {
415 	.recover_prep = recover_prep,
416 	.recover_slot = recover_slot,
417 	.recover_done = recover_done,
418 };
419 
420 /*
421  * The BAST function for the ack lock resource
422  * This function wakes up the receive thread in
423  * order to receive and process the message.
424  */
425 static void ack_bast(void *arg, int mode)
426 {
427 	struct dlm_lock_resource *res = arg;
428 	struct md_cluster_info *cinfo = res->mddev->cluster_info;
429 
430 	if (mode == DLM_LOCK_EX) {
431 		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
432 			md_wakeup_thread(cinfo->recv_thread);
433 		else
434 			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
435 	}
436 }
437 
438 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
439 {
440 	struct suspend_info *s, *tmp;
441 
442 	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
443 		if (slot == s->slot) {
444 			list_del(&s->list);
445 			kfree(s);
446 			break;
447 		}
448 }
449 
450 static void remove_suspend_info(struct mddev *mddev, int slot)
451 {
452 	struct md_cluster_info *cinfo = mddev->cluster_info;
453 	mddev->pers->quiesce(mddev, 1);
454 	spin_lock_irq(&cinfo->suspend_lock);
455 	__remove_suspend_info(cinfo, slot);
456 	spin_unlock_irq(&cinfo->suspend_lock);
457 	mddev->pers->quiesce(mddev, 0);
458 }
459 
460 
461 static void process_suspend_info(struct mddev *mddev,
462 		int slot, sector_t lo, sector_t hi)
463 {
464 	struct md_cluster_info *cinfo = mddev->cluster_info;
465 	struct suspend_info *s;
466 
467 	if (!hi) {
468 		/*
469 		 * clear the REMOTE flag since resync or recovery is finished
470 		 * in remote node.
471 		 */
472 		clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
473 		remove_suspend_info(mddev, slot);
474 		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
475 		md_wakeup_thread(mddev->thread);
476 		return;
477 	}
478 
479 	/*
480 	 * The bitmaps are not the same on different nodes;
481 	 * if RESYNCING is happening on one node, then
482 	 * the node which received the RESYNCING message
483 	 * will probably perform resync of the region
484 	 * [lo, hi] again, so we can reduce resync time
485 	 * a lot if we can ensure that the bitmaps on
486 	 * different nodes match up well.
487 	 *
488 	 * sync_low/hi is used to record the region which
489 	 * arrived in the previous RESYNCING message.
490 	 *
491 	 * Call md_bitmap_sync_with_cluster to clear
492 	 * NEEDED_MASK and set RESYNC_MASK since the
493 	 * resync thread is running on another node,
494 	 * so we don't need to do the resync again
495 	 * for the same section */
496 	md_bitmap_sync_with_cluster(mddev, cinfo->sync_low, cinfo->sync_hi, lo, hi);
497 	cinfo->sync_low = lo;
498 	cinfo->sync_hi = hi;
499 
500 	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
501 	if (!s)
502 		return;
503 	s->slot = slot;
504 	s->lo = lo;
505 	s->hi = hi;
506 	mddev->pers->quiesce(mddev, 1);
507 	spin_lock_irq(&cinfo->suspend_lock);
508 	/* Remove the existing entry (if it exists) before adding */
509 	__remove_suspend_info(cinfo, slot);
510 	list_add(&s->list, &cinfo->suspend_list);
511 	spin_unlock_irq(&cinfo->suspend_lock);
512 	mddev->pers->quiesce(mddev, 0);
513 }
514 
515 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
516 {
517 	char disk_uuid[64];
518 	struct md_cluster_info *cinfo = mddev->cluster_info;
519 	char event_name[] = "EVENT=ADD_DEVICE";
520 	char raid_slot[16];
521 	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
522 	int len;
523 
524 	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
525 	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
526 	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
527 	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
528 	init_completion(&cinfo->newdisk_completion);
529 	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
530 	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
531 	wait_for_completion_timeout(&cinfo->newdisk_completion,
532 			NEW_DEV_TIMEOUT);
533 	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
534 }
535 
536 
537 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
538 {
539 	int got_lock = 0;
540 	struct md_cluster_info *cinfo = mddev->cluster_info;
541 	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
542 
543 	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
544 	wait_event(mddev->thread->wqueue,
545 		   (got_lock = mddev_trylock(mddev)) ||
546 		    test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
547 	md_reload_sb(mddev, mddev->good_device_nr);
548 	if (got_lock)
549 		mddev_unlock(mddev);
550 }
551 
552 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
553 {
554 	struct md_rdev *rdev;
555 
556 	rcu_read_lock();
557 	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
558 	if (rdev) {
559 		set_bit(ClusterRemove, &rdev->flags);
560 		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
561 		md_wakeup_thread(mddev->thread);
562 	}
563 	else
564 		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
565 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
566 	rcu_read_unlock();
567 }
568 
569 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
570 {
571 	struct md_rdev *rdev;
572 
573 	rcu_read_lock();
574 	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
575 	if (rdev && test_bit(Faulty, &rdev->flags))
576 		clear_bit(Faulty, &rdev->flags);
577 	else
578 		pr_warn("%s: %d Could not find disk(%d) which is faulty",
579 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
580 	rcu_read_unlock();
581 }
582 
583 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
584 {
585 	int ret = 0;
586 
587 	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
588 		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
589 		return -1;
590 	switch (le32_to_cpu(msg->type)) {
591 	case METADATA_UPDATED:
592 		process_metadata_update(mddev, msg);
593 		break;
594 	case CHANGE_CAPACITY:
595 		set_capacity(mddev->gendisk, mddev->array_sectors);
596 		revalidate_disk(mddev->gendisk);
597 		break;
598 	case RESYNCING:
599 		set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
600 		process_suspend_info(mddev, le32_to_cpu(msg->slot),
601 				     le64_to_cpu(msg->low),
602 				     le64_to_cpu(msg->high));
603 		break;
604 	case NEWDISK:
605 		process_add_new_disk(mddev, msg);
606 		break;
607 	case REMOVE:
608 		process_remove_disk(mddev, msg);
609 		break;
610 	case RE_ADD:
611 		process_readd_disk(mddev, msg);
612 		break;
613 	case BITMAP_NEEDS_SYNC:
614 		__recover_slot(mddev, le32_to_cpu(msg->slot));
615 		break;
616 	case BITMAP_RESIZE:
617 		if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
618 			ret = md_bitmap_resize(mddev->bitmap,
619 					    le64_to_cpu(msg->high), 0, 0);
620 		break;
621 	default:
622 		ret = -1;
623 		pr_warn("%s:%d Received unknown message from %d\n",
624 			__func__, __LINE__, le32_to_cpu(msg->slot));
625 	}
626 	return ret;
627 }
628 
629 /*
630  * thread for receiving messages
631  */
632 static void recv_daemon(struct md_thread *thread)
633 {
634 	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
635 	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
636 	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
637 	struct cluster_msg msg;
638 	int ret;
639 
640 	mutex_lock(&cinfo->recv_mutex);
641 	/*get CR on Message*/
642 	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
643 		pr_err("md-cluster: failed to get CR on MESSAGE\n");
644 		mutex_unlock(&cinfo->recv_mutex);
645 		return;
646 	}
647 
648 	/* read lvb and wake up thread to process this message_lockres */
649 	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
650 	ret = process_recvd_msg(thread->mddev, &msg);
651 	if (ret)
652 		goto out;
653 
654 	/*release CR on ack_lockres*/
655 	ret = dlm_unlock_sync(ack_lockres);
656 	if (unlikely(ret != 0))
657 		pr_info("unlock ack failed return %d\n", ret);
658 	/*up-convert to PR on message_lockres*/
659 	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
660 	if (unlikely(ret != 0))
661 		pr_info("lock PR on msg failed return %d\n", ret);
662 	/*get CR on ack_lockres again*/
663 	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
664 	if (unlikely(ret != 0))
665 		pr_info("lock CR on ack failed return %d\n", ret);
666 out:
667 	/*release CR on message_lockres*/
668 	ret = dlm_unlock_sync(message_lockres);
669 	if (unlikely(ret != 0))
670 		pr_info("unlock msg failed return %d\n", ret);
671 	mutex_unlock(&cinfo->recv_mutex);
672 }
673 
674 /* lock_token()
675  * Takes the lock on the TOKEN lock resource so no other
676  * node can communicate while the operation is underway.
677  */
678 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked)
679 {
680 	int error, set_bit = 0;
681 	struct mddev *mddev = cinfo->mddev;
682 
683 	/*
684 	 * If the resync thread runs after the raid1d thread, then process_metadata_update
685 	 * could not continue while raid1d holds reconfig_mutex (and raid1d is blocked
686 	 * since another node already got EX on Token and is waiting for EX on Ack),
687 	 * so let resync wake up the thread in case the flag is set.
688 	 */
689 	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
690 				      &cinfo->state)) {
691 		error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
692 					      &cinfo->state);
693 		WARN_ON_ONCE(error);
694 		md_wakeup_thread(mddev->thread);
695 		set_bit = 1;
696 	}
697 	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
698 	if (set_bit)
699 		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
700 
701 	if (error)
702 		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
703 				__func__, __LINE__, error);
704 
705 	/* Lock the receive sequence */
706 	mutex_lock(&cinfo->recv_mutex);
707 	return error;
708 }
709 
710 /* lock_comm()
711  * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
712  */
713 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
714 {
715 	wait_event(cinfo->wait,
716 		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
717 
718 	return lock_token(cinfo, mddev_locked);
719 }
720 
721 static void unlock_comm(struct md_cluster_info *cinfo)
722 {
723 	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
724 	mutex_unlock(&cinfo->recv_mutex);
725 	dlm_unlock_sync(cinfo->token_lockres);
726 	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
727 	wake_up(&cinfo->wait);
728 }
729 
730 /* __sendmsg()
731  * This function performs the actual sending of the message. This function is
732  * usually called after performing the encompassing operation
733  * The function:
734  * 1. Grabs the message lockresource in EX mode
735  * 2. Copies the message to the message LVB
736  * 3. Downconverts message lockresource to CW
737  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
738  *    and the other nodes read the message. The thread will wait here until all other
739  *    nodes have released ack lock resource.
740  * 5. Downconvert ack lockresource to CR
741  */
742 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
743 {
744 	int error;
745 	int slot = cinfo->slot_number - 1;
746 
747 	cmsg->slot = cpu_to_le32(slot);
748 	/*get EX on Message*/
749 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
750 	if (error) {
751 		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
752 		goto failed_message;
753 	}
754 
755 	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
756 			sizeof(struct cluster_msg));
757 	/*down-convert EX to CW on Message*/
758 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
759 	if (error) {
760 		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
761 				error);
762 		goto failed_ack;
763 	}
764 
765 	/*up-convert CR to EX on Ack*/
766 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
767 	if (error) {
768 		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
769 				error);
770 		goto failed_ack;
771 	}
772 
773 	/*down-convert EX to CR on Ack*/
774 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
775 	if (error) {
776 		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
777 				error);
778 		goto failed_ack;
779 	}
780 
781 failed_ack:
782 	error = dlm_unlock_sync(cinfo->message_lockres);
783 	if (unlikely(error != 0)) {
784 		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
785 			error);
786 		/* in case the message can't be released due to some reason */
787 		goto failed_ack;
788 	}
789 failed_message:
790 	return error;
791 }
792 
793 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
794 		   bool mddev_locked)
795 {
796 	int ret;
797 
798 	lock_comm(cinfo, mddev_locked);
799 	ret = __sendmsg(cinfo, cmsg);
800 	unlock_comm(cinfo);
801 	return ret;
802 }
803 
804 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
805 {
806 	struct md_cluster_info *cinfo = mddev->cluster_info;
807 	int i, ret = 0;
808 	struct dlm_lock_resource *bm_lockres;
809 	struct suspend_info *s;
810 	char str[64];
811 	sector_t lo, hi;
812 
813 
814 	for (i = 0; i < total_slots; i++) {
815 		memset(str, '\0', 64);
816 		snprintf(str, 64, "bitmap%04d", i);
817 		bm_lockres = lockres_init(mddev, str, NULL, 1);
818 		if (!bm_lockres)
819 			return -ENOMEM;
820 		if (i == (cinfo->slot_number - 1)) {
821 			lockres_free(bm_lockres);
822 			continue;
823 		}
824 
825 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
826 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
827 		if (ret == -EAGAIN) {
828 			s = read_resync_info(mddev, bm_lockres);
829 			if (s) {
830 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
831 						__func__, __LINE__,
832 						(unsigned long long) s->lo,
833 						(unsigned long long) s->hi, i);
834 				spin_lock_irq(&cinfo->suspend_lock);
835 				s->slot = i;
836 				list_add(&s->list, &cinfo->suspend_list);
837 				spin_unlock_irq(&cinfo->suspend_lock);
838 			}
839 			ret = 0;
840 			lockres_free(bm_lockres);
841 			continue;
842 		}
843 		if (ret) {
844 			lockres_free(bm_lockres);
845 			goto out;
846 		}
847 
848 		/* Read the disk bitmap sb and check if it needs recovery */
849 		ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
850 		if (ret) {
851 			pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
852 			lockres_free(bm_lockres);
853 			continue;
854 		}
855 		if ((hi > 0) && (lo < mddev->recovery_cp)) {
856 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
857 			mddev->recovery_cp = lo;
858 			md_check_recovery(mddev);
859 		}
860 
861 		lockres_free(bm_lockres);
862 	}
863 out:
864 	return ret;
865 }
866 
867 static int join(struct mddev *mddev, int nodes)
868 {
869 	struct md_cluster_info *cinfo;
870 	int ret, ops_rv;
871 	char str[64];
872 
873 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
874 	if (!cinfo)
875 		return -ENOMEM;
876 
877 	INIT_LIST_HEAD(&cinfo->suspend_list);
878 	spin_lock_init(&cinfo->suspend_lock);
879 	init_completion(&cinfo->completion);
880 	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
881 	init_waitqueue_head(&cinfo->wait);
882 	mutex_init(&cinfo->recv_mutex);
883 
884 	mddev->cluster_info = cinfo;
885 	cinfo->mddev = mddev;
886 
887 	memset(str, 0, 64);
888 	sprintf(str, "%pU", mddev->uuid);
889 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
890 				DLM_LSFL_FS, LVB_SIZE,
891 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
892 	if (ret)
893 		goto err;
894 	wait_for_completion(&cinfo->completion);
895 	if (nodes < cinfo->slot_number) {
896 		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
897 			cinfo->slot_number, nodes);
898 		ret = -ERANGE;
899 		goto err;
900 	}
901 	/* Initialize the communication resources */
902 	ret = -ENOMEM;
903 	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
904 	if (!cinfo->recv_thread) {
905 		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
906 		goto err;
907 	}
908 	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
909 	if (!cinfo->message_lockres)
910 		goto err;
911 	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
912 	if (!cinfo->token_lockres)
913 		goto err;
914 	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
915 	if (!cinfo->no_new_dev_lockres)
916 		goto err;
917 
918 	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
919 	if (ret) {
920 		ret = -EAGAIN;
921 		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
922 		goto err;
923 	}
924 	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
925 	if (!cinfo->ack_lockres) {
926 		ret = -ENOMEM;
927 		goto err;
928 	}
929 	/* get sync CR lock on ACK. */
930 	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
931 		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
932 				ret);
933 	dlm_unlock_sync(cinfo->token_lockres);
934 	/* get sync CR lock on no-new-dev. */
935 	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
936 		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
937 
938 
939 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
940 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
941 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
942 	if (!cinfo->bitmap_lockres) {
943 		ret = -ENOMEM;
944 		goto err;
945 	}
946 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
947 		pr_err("Failed to get bitmap lock\n");
948 		ret = -EINVAL;
949 		goto err;
950 	}
951 
952 	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
953 	if (!cinfo->resync_lockres) {
954 		ret = -ENOMEM;
955 		goto err;
956 	}
957 
958 	return 0;
959 err:
960 	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
961 	md_unregister_thread(&cinfo->recovery_thread);
962 	md_unregister_thread(&cinfo->recv_thread);
963 	lockres_free(cinfo->message_lockres);
964 	lockres_free(cinfo->token_lockres);
965 	lockres_free(cinfo->ack_lockres);
966 	lockres_free(cinfo->no_new_dev_lockres);
967 	lockres_free(cinfo->resync_lockres);
968 	lockres_free(cinfo->bitmap_lockres);
969 	if (cinfo->lockspace)
970 		dlm_release_lockspace(cinfo->lockspace, 2);
971 	mddev->cluster_info = NULL;
972 	kfree(cinfo);
973 	return ret;
974 }
975 
976 static void load_bitmaps(struct mddev *mddev, int total_slots)
977 {
978 	struct md_cluster_info *cinfo = mddev->cluster_info;
979 
980 	/* load all the nodes' bitmap info for resync */
981 	if (gather_all_resync_info(mddev, total_slots))
982 		pr_err("md-cluster: failed to gather all resync info\n");
983 	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
984 	/* wake up recv thread in case something needs to be handled */
985 	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
986 		md_wakeup_thread(cinfo->recv_thread);
987 }
988 
989 static void resync_bitmap(struct mddev *mddev)
990 {
991 	struct md_cluster_info *cinfo = mddev->cluster_info;
992 	struct cluster_msg cmsg = {0};
993 	int err;
994 
995 	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
996 	err = sendmsg(cinfo, &cmsg, 1);
997 	if (err)
998 		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
999 			__func__, __LINE__, err);
1000 }
1001 
1002 static void unlock_all_bitmaps(struct mddev *mddev);
1003 static int leave(struct mddev *mddev)
1004 {
1005 	struct md_cluster_info *cinfo = mddev->cluster_info;
1006 
1007 	if (!cinfo)
1008 		return 0;
1009 
1010 	/* A BITMAP_NEEDS_SYNC message should be sent when a node
1011 	 * is leaving the cluster with a dirty bitmap; also, we
1012 	 * can only deliver it while the dlm connection is available */
1013 	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
1014 		resync_bitmap(mddev);
1015 
1016 	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1017 	md_unregister_thread(&cinfo->recovery_thread);
1018 	md_unregister_thread(&cinfo->recv_thread);
1019 	lockres_free(cinfo->message_lockres);
1020 	lockres_free(cinfo->token_lockres);
1021 	lockres_free(cinfo->ack_lockres);
1022 	lockres_free(cinfo->no_new_dev_lockres);
1023 	lockres_free(cinfo->resync_lockres);
1024 	lockres_free(cinfo->bitmap_lockres);
1025 	unlock_all_bitmaps(mddev);
1026 	dlm_release_lockspace(cinfo->lockspace, 2);
1027 	kfree(cinfo);
1028 	return 0;
1029 }
1030 
1031 /* slot_number(): Returns the MD slot number to use
1032  * DLM starts the slot numbers from 1, whereas cluster-md
1033  * wants the number to start from zero, so we subtract one
1034  */
1035 static int slot_number(struct mddev *mddev)
1036 {
1037 	struct md_cluster_info *cinfo = mddev->cluster_info;
1038 
1039 	return cinfo->slot_number - 1;
1040 }
1041 
1042 /*
1043  * Check if the communication is already locked, else lock the communication
1044  * channel.
1045  * If it is already locked, token is in EX mode, and hence lock_token()
1046  * should not be called.
1047  */
1048 static int metadata_update_start(struct mddev *mddev)
1049 {
1050 	struct md_cluster_info *cinfo = mddev->cluster_info;
1051 	int ret;
1052 
1053 	/*
1054 	 * metadata_update_start is always called with the protection of
1055 	 * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
1056 	 */
1057 	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1058 				    &cinfo->state);
1059 	WARN_ON_ONCE(ret);
1060 	md_wakeup_thread(mddev->thread);
1061 
1062 	wait_event(cinfo->wait,
1063 		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1064 		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1065 
1066 	/* If token is already locked, return 0 */
1067 	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1068 		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1069 		return 0;
1070 	}
1071 
1072 	ret = lock_token(cinfo, 1);
1073 	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1074 	return ret;
1075 }
1076 
1077 static int metadata_update_finish(struct mddev *mddev)
1078 {
1079 	struct md_cluster_info *cinfo = mddev->cluster_info;
1080 	struct cluster_msg cmsg;
1081 	struct md_rdev *rdev;
1082 	int ret = 0;
1083 	int raid_slot = -1;
1084 
1085 	memset(&cmsg, 0, sizeof(cmsg));
1086 	cmsg.type = cpu_to_le32(METADATA_UPDATED);
1087 	/* Pick up a good active device number to send.
1088 	 */
1089 	rdev_for_each(rdev, mddev)
1090 		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1091 			raid_slot = rdev->desc_nr;
1092 			break;
1093 		}
1094 	if (raid_slot >= 0) {
1095 		cmsg.raid_slot = cpu_to_le32(raid_slot);
1096 		ret = __sendmsg(cinfo, &cmsg);
1097 	} else
1098 		pr_warn("md-cluster: No good device id found to send\n");
1099 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1100 	unlock_comm(cinfo);
1101 	return ret;
1102 }
1103 
1104 static void metadata_update_cancel(struct mddev *mddev)
1105 {
1106 	struct md_cluster_info *cinfo = mddev->cluster_info;
1107 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1108 	unlock_comm(cinfo);
1109 }
1110 
1111 static int update_bitmap_size(struct mddev *mddev, sector_t size)
1112 {
1113 	struct md_cluster_info *cinfo = mddev->cluster_info;
1114 	struct cluster_msg cmsg = {0};
1115 	int ret;
1116 
1117 	cmsg.type = cpu_to_le32(BITMAP_RESIZE);
1118 	cmsg.high = cpu_to_le64(size);
1119 	ret = sendmsg(cinfo, &cmsg, 0);
1120 	if (ret)
1121 		pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
1122 			__func__, __LINE__, ret);
1123 	return ret;
1124 }
1125 
1126 static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
1127 {
1128 	struct bitmap_counts *counts;
1129 	char str[64];
1130 	struct dlm_lock_resource *bm_lockres;
1131 	struct bitmap *bitmap = mddev->bitmap;
1132 	unsigned long my_pages = bitmap->counts.pages;
1133 	int i, rv;
1134 
1135 	/*
1136 	 * We need to ensure all the nodes can grow to a larger
1137 	 * bitmap size before doing the reshape.
1138 	 */
1139 	rv = update_bitmap_size(mddev, newsize);
1140 	if (rv)
1141 		return rv;
1142 
1143 	for (i = 0; i < mddev->bitmap_info.nodes; i++) {
1144 		if (i == md_cluster_ops->slot_number(mddev))
1145 			continue;
1146 
1147 		bitmap = get_bitmap_from_slot(mddev, i);
1148 		if (IS_ERR(bitmap)) {
1149 			pr_err("can't get bitmap from slot %d\n", i);
1150 			goto out;
1151 		}
1152 		counts = &bitmap->counts;
1153 
1154 		/*
1155 		 * If we can hold the bitmap lock of one node then
1156 		 * that slot is not occupied, so update the pages.
1157 		 */
1158 		snprintf(str, 64, "bitmap%04d", i);
1159 		bm_lockres = lockres_init(mddev, str, NULL, 1);
1160 		if (!bm_lockres) {
1161 			pr_err("Cannot initialize %s lock\n", str);
1162 			goto out;
1163 		}
1164 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
1165 		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1166 		if (!rv)
1167 			counts->pages = my_pages;
1168 		lockres_free(bm_lockres);
1169 
1170 		if (my_pages != counts->pages)
1171 			/*
1172 			 * Let's revert the bitmap size if one node
1173 			 * can't resize bitmap
1174 			 */
1175 			goto out;
1176 	}
1177 
1178 	return 0;
1179 out:
1180 	md_bitmap_free(bitmap);
1181 	update_bitmap_size(mddev, oldsize);
1182 	return -1;
1183 }
1184 
1185 /*
1186  * return 0 if all the bitmaps have the same sync_size
1187  */
1188 static int cluster_check_sync_size(struct mddev *mddev)
1189 {
1190 	int i, rv;
1191 	bitmap_super_t *sb;
1192 	unsigned long my_sync_size, sync_size = 0;
1193 	int node_num = mddev->bitmap_info.nodes;
1194 	int current_slot = md_cluster_ops->slot_number(mddev);
1195 	struct bitmap *bitmap = mddev->bitmap;
1196 	char str[64];
1197 	struct dlm_lock_resource *bm_lockres;
1198 
1199 	sb = kmap_atomic(bitmap->storage.sb_page);
1200 	my_sync_size = sb->sync_size;
1201 	kunmap_atomic(sb);
1202 
1203 	for (i = 0; i < node_num; i++) {
1204 		if (i == current_slot)
1205 			continue;
1206 
1207 		bitmap = get_bitmap_from_slot(mddev, i);
1208 		if (IS_ERR(bitmap)) {
1209 			pr_err("can't get bitmap from slot %d\n", i);
1210 			return -1;
1211 		}
1212 
1213 		/*
1214 		 * If we can hold the bitmap lock of one node then
1215 		 * that slot is not occupied, so update the sb.
1216 		 */
1217 		snprintf(str, 64, "bitmap%04d", i);
1218 		bm_lockres = lockres_init(mddev, str, NULL, 1);
1219 		if (!bm_lockres) {
1220 			pr_err("md-cluster: Cannot initialize %s\n", str);
1221 			md_bitmap_free(bitmap);
1222 			return -1;
1223 		}
1224 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
1225 		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1226 		if (!rv)
1227 			md_bitmap_update_sb(bitmap);
1228 		lockres_free(bm_lockres);
1229 
1230 		sb = kmap_atomic(bitmap->storage.sb_page);
1231 		if (sync_size == 0)
1232 			sync_size = sb->sync_size;
1233 		else if (sync_size != sb->sync_size) {
1234 			kunmap_atomic(sb);
1235 			md_bitmap_free(bitmap);
1236 			return -1;
1237 		}
1238 		kunmap_atomic(sb);
1239 		md_bitmap_free(bitmap);
1240 	}
1241 
1242 	return (my_sync_size == sync_size) ? 0 : -1;
1243 }
1244 
1245 /*
1246  * Updating the size for a cluster raid is a little more complex; we perform it
1247  * in these steps:
1248  * 1. hold the token lock and update the superblock on the initiator node.
1249  * 2. send a METADATA_UPDATED msg to the other nodes.
1250  * 3. The initiator node continues to check each bitmap's sync_size; if all
1251  *    bitmaps have the same value of sync_size, then we can set the capacity and
1252  *    let the other nodes perform it. If one node can't update sync_size
1253  *    accordingly, we need to revert to the previous value.
1254  */
1255 static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
1256 {
1257 	struct md_cluster_info *cinfo = mddev->cluster_info;
1258 	struct cluster_msg cmsg;
1259 	struct md_rdev *rdev;
1260 	int ret = 0;
1261 	int raid_slot = -1;
1262 
1263 	md_update_sb(mddev, 1);
1264 	lock_comm(cinfo, 1);
1265 
1266 	memset(&cmsg, 0, sizeof(cmsg));
1267 	cmsg.type = cpu_to_le32(METADATA_UPDATED);
1268 	rdev_for_each(rdev, mddev)
1269 		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
1270 			raid_slot = rdev->desc_nr;
1271 			break;
1272 		}
1273 	if (raid_slot >= 0) {
1274 		cmsg.raid_slot = cpu_to_le32(raid_slot);
1275 		/*
1276 		 * We can only change capacity after all the nodes can do it,
1277 		 * so we need to wait until the other nodes have received the msg
1278 		 * and handled the change
1279 		 */
1280 		ret = __sendmsg(cinfo, &cmsg);
1281 		if (ret) {
1282 			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1283 			       __func__, __LINE__);
1284 			unlock_comm(cinfo);
1285 			return;
1286 		}
1287 	} else {
1288 		pr_err("md-cluster: No good device id found to send\n");
1289 		unlock_comm(cinfo);
1290 		return;
1291 	}
1292 
1293 	/*
1294 	 * check the sync_size from the other nodes' bitmaps; if sync_size
1295 	 * has already been updated on the other nodes as expected, send a
1296 	 * CHANGE_CAPACITY msg to permit the change of capacity
1297 	 */
1298 	if (cluster_check_sync_size(mddev) == 0) {
1299 		memset(&cmsg, 0, sizeof(cmsg));
1300 		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
1301 		ret = __sendmsg(cinfo, &cmsg);
1302 		if (ret)
1303 			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
1304 			       __func__, __LINE__);
1305 		set_capacity(mddev->gendisk, mddev->array_sectors);
1306 		revalidate_disk(mddev->gendisk);
1307 	} else {
1308 		/* revert to previous sectors */
1309 		ret = mddev->pers->resize(mddev, old_dev_sectors);
1310 		if (!ret)
1311 			revalidate_disk(mddev->gendisk);
1312 		ret = __sendmsg(cinfo, &cmsg);
1313 		if (ret)
1314 			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1315 			       __func__, __LINE__);
1316 	}
1317 	unlock_comm(cinfo);
1318 }
1319 
1320 static int resync_start(struct mddev *mddev)
1321 {
1322 	struct md_cluster_info *cinfo = mddev->cluster_info;
1323 	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1324 }
1325 
1326 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1327 {
1328 	struct md_cluster_info *cinfo = mddev->cluster_info;
1329 	struct resync_info ri;
1330 	struct cluster_msg cmsg = {0};
1331 
1332 	/* do not send zero again if we have sent it before */
1333 	if (hi == 0) {
1334 		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1335 		if (le64_to_cpu(ri.hi) == 0)
1336 			return 0;
1337 	}
1338 
1339 	add_resync_info(cinfo->bitmap_lockres, lo, hi);
1340 	/* Re-acquire the lock to refresh LVB */
1341 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1342 	cmsg.type = cpu_to_le32(RESYNCING);
1343 	cmsg.low = cpu_to_le64(lo);
1344 	cmsg.high = cpu_to_le64(hi);
1345 
1346 	/*
1347 	 * mddev_lock is held if resync_info_update is called from
1348 	 * resync_finish (md_reap_sync_thread -> resync_finish)
1349 	 */
1350 	if (lo == 0 && hi == 0)
1351 		return sendmsg(cinfo, &cmsg, 1);
1352 	else
1353 		return sendmsg(cinfo, &cmsg, 0);
1354 }
1355 
1356 static int resync_finish(struct mddev *mddev)
1357 {
1358 	struct md_cluster_info *cinfo = mddev->cluster_info;
1359 	int ret = 0;
1360 
1361 	clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
1362 
1363 	/*
1364 	 * If the resync thread is interrupted, we can't say resync is finished;
1365 	 * another node will launch a resync thread to continue.
1366 	 */
1367 	if (!test_bit(MD_CLOSING, &mddev->flags))
1368 		ret = resync_info_update(mddev, 0, 0);
1369 	dlm_unlock_sync(cinfo->resync_lockres);
1370 	return ret;
1371 }
1372 
1373 static int area_resyncing(struct mddev *mddev, int direction,
1374 		sector_t lo, sector_t hi)
1375 {
1376 	struct md_cluster_info *cinfo = mddev->cluster_info;
1377 	int ret = 0;
1378 	struct suspend_info *s;
1379 
1380 	if ((direction == READ) &&
1381 		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1382 		return 1;
1383 
1384 	spin_lock_irq(&cinfo->suspend_lock);
1385 	if (list_empty(&cinfo->suspend_list))
1386 		goto out;
1387 	list_for_each_entry(s, &cinfo->suspend_list, list)
1388 		if (hi > s->lo && lo < s->hi) {
1389 			ret = 1;
1390 			break;
1391 		}
1392 out:
1393 	spin_unlock_irq(&cinfo->suspend_lock);
1394 	return ret;
1395 }
1396 
1397 /* add_new_disk() - initiates a disk add
1398  * However, if this fails before writing md_update_sb(),
1399  * add_new_disk_cancel() must be called to release the token lock
1400  */
1401 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1402 {
1403 	struct md_cluster_info *cinfo = mddev->cluster_info;
1404 	struct cluster_msg cmsg;
1405 	int ret = 0;
1406 	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1407 	char *uuid = sb->device_uuid;
1408 
1409 	memset(&cmsg, 0, sizeof(cmsg));
1410 	cmsg.type = cpu_to_le32(NEWDISK);
1411 	memcpy(cmsg.uuid, uuid, 16);
1412 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1413 	lock_comm(cinfo, 1);
1414 	ret = __sendmsg(cinfo, &cmsg);
1415 	if (ret) {
1416 		unlock_comm(cinfo);
1417 		return ret;
1418 	}
1419 	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1420 	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1421 	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1422 	/* Some node does not "see" the device */
1423 	if (ret == -EAGAIN)
1424 		ret = -ENOENT;
1425 	if (ret)
1426 		unlock_comm(cinfo);
1427 	else {
1428 		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1429 		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1430 		 * will run soon after add_new_disk, the below path will be
1431 		 * invoked:
1432 		 *   md_wakeup_thread(mddev->thread)
1433 		 *	-> conf->thread (raid1d)
1434 		 *	-> md_check_recovery -> md_update_sb
1435 		 *	-> metadata_update_start/finish
1436 		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1437 		 *
1438 		 * For other failure cases, metadata_update_cancel and
1439 		 * add_new_disk_cancel also clear below bit as well.
1440 		 * */
1441 		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1442 		wake_up(&cinfo->wait);
1443 	}
1444 	return ret;
1445 }
1446 
1447 static void add_new_disk_cancel(struct mddev *mddev)
1448 {
1449 	struct md_cluster_info *cinfo = mddev->cluster_info;
1450 	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1451 	unlock_comm(cinfo);
1452 }
1453 
1454 static int new_disk_ack(struct mddev *mddev, bool ack)
1455 {
1456 	struct md_cluster_info *cinfo = mddev->cluster_info;
1457 
1458 	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1459 		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1460 		return -EINVAL;
1461 	}
1462 
1463 	if (ack)
1464 		dlm_unlock_sync(cinfo->no_new_dev_lockres);
1465 	complete(&cinfo->newdisk_completion);
1466 	return 0;
1467 }
1468 
1469 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1470 {
1471 	struct cluster_msg cmsg = {0};
1472 	struct md_cluster_info *cinfo = mddev->cluster_info;
1473 	cmsg.type = cpu_to_le32(REMOVE);
1474 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1475 	return sendmsg(cinfo, &cmsg, 1);
1476 }
1477 
1478 static int lock_all_bitmaps(struct mddev *mddev)
1479 {
1480 	int slot, my_slot, ret, held = 1, i = 0;
1481 	char str[64];
1482 	struct md_cluster_info *cinfo = mddev->cluster_info;
1483 
1484 	cinfo->other_bitmap_lockres =
1485 		kcalloc(mddev->bitmap_info.nodes - 1,
1486 			sizeof(struct dlm_lock_resource *), GFP_KERNEL);
1487 	if (!cinfo->other_bitmap_lockres) {
1488 		pr_err("md: can't alloc mem for other bitmap locks\n");
1489 		return 0;
1490 	}
1491 
1492 	my_slot = slot_number(mddev);
1493 	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1494 		if (slot == my_slot)
1495 			continue;
1496 
1497 		memset(str, '\0', 64);
1498 		snprintf(str, 64, "bitmap%04d", slot);
1499 		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1500 		if (!cinfo->other_bitmap_lockres[i])
1501 			return -ENOMEM;
1502 
1503 		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1504 		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1505 		if (ret)
1506 			held = -1;
1507 		i++;
1508 	}
1509 
1510 	return held;
1511 }
1512 
1513 static void unlock_all_bitmaps(struct mddev *mddev)
1514 {
1515 	struct md_cluster_info *cinfo = mddev->cluster_info;
1516 	int i;
1517 
1518 	/* release the other nodes' bitmap locks if they exist */
1519 	if (cinfo->other_bitmap_lockres) {
1520 		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1521 			if (cinfo->other_bitmap_lockres[i]) {
1522 				lockres_free(cinfo->other_bitmap_lockres[i]);
1523 			}
1524 		}
1525 		kfree(cinfo->other_bitmap_lockres);
1526 	}
1527 }
1528 
1529 static int gather_bitmaps(struct md_rdev *rdev)
1530 {
1531 	int sn, err;
1532 	sector_t lo, hi;
1533 	struct cluster_msg cmsg = {0};
1534 	struct mddev *mddev = rdev->mddev;
1535 	struct md_cluster_info *cinfo = mddev->cluster_info;
1536 
1537 	cmsg.type = cpu_to_le32(RE_ADD);
1538 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1539 	err = sendmsg(cinfo, &cmsg, 1);
1540 	if (err)
1541 		goto out;
1542 
1543 	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1544 		if (sn == (cinfo->slot_number - 1))
1545 			continue;
1546 		err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1547 		if (err) {
1548 			pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1549 			goto out;
1550 		}
1551 		if ((hi > 0) && (lo < mddev->recovery_cp))
1552 			mddev->recovery_cp = lo;
1553 	}
1554 out:
1555 	return err;
1556 }
1557 
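/*
 * Operations exported to the md core via register_md_cluster_operations();
 * personalities and md.c reach them through the md_cluster_ops pointer
 * (e.g. md_cluster_ops->slot_number(mddev), as used above in resize_bitmaps()).
 */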
1558 static struct md_cluster_operations cluster_ops = {
1559 	.join   = join,
1560 	.leave  = leave,
1561 	.slot_number = slot_number,
1562 	.resync_start = resync_start,
1563 	.resync_finish = resync_finish,
1564 	.resync_info_update = resync_info_update,
1565 	.metadata_update_start = metadata_update_start,
1566 	.metadata_update_finish = metadata_update_finish,
1567 	.metadata_update_cancel = metadata_update_cancel,
1568 	.area_resyncing = area_resyncing,
1569 	.add_new_disk = add_new_disk,
1570 	.add_new_disk_cancel = add_new_disk_cancel,
1571 	.new_disk_ack = new_disk_ack,
1572 	.remove_disk = remove_disk,
1573 	.load_bitmaps = load_bitmaps,
1574 	.gather_bitmaps = gather_bitmaps,
1575 	.resize_bitmaps = resize_bitmaps,
1576 	.lock_all_bitmaps = lock_all_bitmaps,
1577 	.unlock_all_bitmaps = unlock_all_bitmaps,
1578 	.update_size = update_size,
1579 };
1580 
1581 static int __init cluster_init(void)
1582 {
1583 	pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
1584 	pr_info("Registering Cluster MD functions\n");
1585 	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1586 	return 0;
1587 }
1588 
1589 static void cluster_exit(void)
1590 {
1591 	unregister_md_cluster_operations();
1592 }
1593 
1594 module_init(cluster_init);
1595 module_exit(cluster_exit);
1596 MODULE_AUTHOR("SUSE");
1597 MODULE_LICENSE("GPL");
1598 MODULE_DESCRIPTION("Clustering support for MD");
1599