xref: /openbmc/linux/drivers/md/md-cluster.c (revision 293467aa)
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10 
11 
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include "md.h"
16 #include "bitmap.h"
17 #include "md-cluster.h"
18 
/*
 * Size in bytes of the lock value block (LVB) buffer allocated for lock
 * resources created with an LVB; also the LVB length passed to
 * dlm_new_lockspace().
 */
#define LVB_SIZE	64
20 
/*
 * One DLM lock plus the state needed to request it synchronously
 * through dlm_lock_sync().
 */
struct dlm_lock_resource {
	dlm_lockspace_t *ls; /* lockspace the lock lives in */
	struct dlm_lksb lksb; /* lock status block; sb_lvbptr is set when an LVB is used */
	char *name; /* lock name (heap copy, freed by lockres_free()). */
	uint32_t flags; /* flags to pass to dlm_lock() */
	struct completion completion; /* completion for synchronized locking */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer, may be NULL */
	struct mddev *mddev; /* pointing back to mddev. */
};
30 
/*
 * A resync range [lo, hi] read from another slot's bitmap lock LVB.
 * Entries are linked on md_cluster_info.suspend_list.
 */
struct suspend_info {
	int slot; /* zero-based slot that reported the range */
	sector_t lo; /* low end of the range (CPU byte order) */
	sector_t hi; /* high end of the range (CPU byte order) */
	struct list_head list; /* link in md_cluster_info.suspend_list */
};
37 
/*
 * Resync range as stored in a bitmap lock's LVB. Both fields are
 * little-endian; see add_resync_info() and read_resync_info().
 */
struct resync_info {
	__le64 lo; /* low end of the range being resynced */
	__le64 hi; /* high end; read_resync_info() treats 0 as "no resync" */
};
42 
/* Per-array cluster state, allocated in join() and stored in mddev->cluster_info. */
struct md_cluster_info {
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number; /* our DLM slot (1-based), set by recover_done() */
	struct completion completion; /* completed by recover_done(), awaited in join() */
	struct dlm_lock_resource *sb_lock; /* the "cmd-super" lock resource */
	struct mutex sb_mutex;
	struct dlm_lock_resource *bitmap_lockres; /* our own "bitmapNNNN" lock, held PW after join() */
	struct list_head suspend_list; /* list of struct suspend_info */
	spinlock_t suspend_lock; /* protects suspend_list */
	struct md_thread *recovery_thread; /* runs recover_bitmaps(), created on demand */
	unsigned long recovery_map; /* one bit per zero-based slot awaiting bitmap recovery */
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct md_thread *recv_thread; /* runs recv_daemon(), woken by ack_bast() */
};
61 
/* Message types carried in cluster_msg.type. */
enum msg_type {
	METADATA_UPDATED = 0, /* sent by metadata_update_finish() */
	RESYNCING,
};
66 
/*
 * Message exchanged between nodes by copying it through the LVB of the
 * "message" lock resource (see __sendmsg() and recv_daemon()).
 */
struct cluster_msg {
	int type; /* enum msg_type; senders store it with cpu_to_le32() */
	int slot; /* sender's zero-based slot; stored with cpu_to_le32() */
	sector_t low;
	sector_t high;
};
73 
74 static void sync_ast(void *arg)
75 {
76 	struct dlm_lock_resource *res;
77 
78 	res = (struct dlm_lock_resource *) arg;
79 	complete(&res->completion);
80 }
81 
82 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
83 {
84 	int ret = 0;
85 
86 	init_completion(&res->completion);
87 	ret = dlm_lock(res->ls, mode, &res->lksb,
88 			res->flags, res->name, strlen(res->name),
89 			0, sync_ast, res, res->bast);
90 	if (ret)
91 		return ret;
92 	wait_for_completion(&res->completion);
93 	return res->lksb.sb_status;
94 }
95 
96 static int dlm_unlock_sync(struct dlm_lock_resource *res)
97 {
98 	return dlm_lock_sync(res, DLM_LOCK_NL);
99 }
100 
101 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
102 		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
103 {
104 	struct dlm_lock_resource *res = NULL;
105 	int ret, namelen;
106 	struct md_cluster_info *cinfo = mddev->cluster_info;
107 
108 	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
109 	if (!res)
110 		return NULL;
111 	res->ls = cinfo->lockspace;
112 	res->mddev = mddev;
113 	namelen = strlen(name);
114 	res->name = kzalloc(namelen + 1, GFP_KERNEL);
115 	if (!res->name) {
116 		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
117 		goto out_err;
118 	}
119 	strlcpy(res->name, name, namelen + 1);
120 	if (with_lvb) {
121 		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
122 		if (!res->lksb.sb_lvbptr) {
123 			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
124 			goto out_err;
125 		}
126 		res->flags = DLM_LKF_VALBLK;
127 	}
128 
129 	if (bastfn)
130 		res->bast = bastfn;
131 
132 	res->flags |= DLM_LKF_EXPEDITE;
133 
134 	ret = dlm_lock_sync(res, DLM_LOCK_NL);
135 	if (ret) {
136 		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
137 		goto out_err;
138 	}
139 	res->flags &= ~DLM_LKF_EXPEDITE;
140 	res->flags |= DLM_LKF_CONVERT;
141 
142 	return res;
143 out_err:
144 	kfree(res->lksb.sb_lvbptr);
145 	kfree(res->name);
146 	kfree(res);
147 	return NULL;
148 }
149 
150 static void lockres_free(struct dlm_lock_resource *res)
151 {
152 	if (!res)
153 		return;
154 
155 	init_completion(&res->completion);
156 	dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
157 	wait_for_completion(&res->completion);
158 
159 	kfree(res->name);
160 	kfree(res->lksb.sb_lvbptr);
161 	kfree(res);
162 }
163 
/*
 * Format the 16 bytes at @src as lowercase hex in the usual
 * 8-4-4-4-12 UUID grouping and store the NUL-terminated text in @dest.
 *
 * @dest must have room for 37 bytes. Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	char *out = dest;
	int idx;

	for (idx = 0; idx < 16; idx++) {
		switch (idx) {
		case 4:
		case 6:
		case 8:
		case 10:
			/* group separator before bytes 4, 6, 8 and 10 */
			out += sprintf(out, "-");
			break;
		default:
			break;
		}
		out += sprintf(out, "%02x", (unsigned char)src[idx]);
	}
	return dest;
}
175 
176 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
177 		sector_t lo, sector_t hi)
178 {
179 	struct resync_info *ri;
180 
181 	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
182 	ri->lo = cpu_to_le64(lo);
183 	ri->hi = cpu_to_le64(hi);
184 }
185 
186 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
187 {
188 	struct resync_info ri;
189 	struct suspend_info *s = NULL;
190 	sector_t hi = 0;
191 
192 	dlm_lock_sync(lockres, DLM_LOCK_CR);
193 	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
194 	hi = le64_to_cpu(ri.hi);
195 	if (ri.hi > 0) {
196 		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
197 		if (!s)
198 			goto out;
199 		s->hi = hi;
200 		s->lo = le64_to_cpu(ri.lo);
201 	}
202 	dlm_unlock_sync(lockres);
203 out:
204 	return s;
205 }
206 
207 void recover_bitmaps(struct md_thread *thread)
208 {
209 	struct mddev *mddev = thread->mddev;
210 	struct md_cluster_info *cinfo = mddev->cluster_info;
211 	struct dlm_lock_resource *bm_lockres;
212 	char str[64];
213 	int slot, ret;
214 	struct suspend_info *s, *tmp;
215 	sector_t lo, hi;
216 
217 	while (cinfo->recovery_map) {
218 		slot = fls64((u64)cinfo->recovery_map) - 1;
219 
220 		/* Clear suspend_area associated with the bitmap */
221 		spin_lock_irq(&cinfo->suspend_lock);
222 		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
223 			if (slot == s->slot) {
224 				list_del(&s->list);
225 				kfree(s);
226 			}
227 		spin_unlock_irq(&cinfo->suspend_lock);
228 
229 		snprintf(str, 64, "bitmap%04d", slot);
230 		bm_lockres = lockres_init(mddev, str, NULL, 1);
231 		if (!bm_lockres) {
232 			pr_err("md-cluster: Cannot initialize bitmaps\n");
233 			goto clear_bit;
234 		}
235 
236 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
237 		if (ret) {
238 			pr_err("md-cluster: Could not DLM lock %s: %d\n",
239 					str, ret);
240 			goto clear_bit;
241 		}
242 		ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi);
243 		if (ret) {
244 			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
245 			goto dlm_unlock;
246 		}
247 		if (hi > 0) {
248 			/* TODO:Wait for current resync to get over */
249 			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
250 			if (lo < mddev->recovery_cp)
251 				mddev->recovery_cp = lo;
252 			md_check_recovery(mddev);
253 		}
254 dlm_unlock:
255 		dlm_unlock_sync(bm_lockres);
256 clear_bit:
257 		clear_bit(slot, &cinfo->recovery_map);
258 	}
259 }
260 
/* Lockspace .recover_prep callback (see md_ls_ops); intentionally does nothing. */
static void recover_prep(void *arg)
{
}
264 
265 static void recover_slot(void *arg, struct dlm_slot *slot)
266 {
267 	struct mddev *mddev = arg;
268 	struct md_cluster_info *cinfo = mddev->cluster_info;
269 
270 	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
271 			mddev->bitmap_info.cluster_name,
272 			slot->nodeid, slot->slot,
273 			cinfo->slot_number);
274 	set_bit(slot->slot - 1, &cinfo->recovery_map);
275 	if (!cinfo->recovery_thread) {
276 		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
277 				mddev, "recover");
278 		if (!cinfo->recovery_thread) {
279 			pr_warn("md-cluster: Could not create recovery thread\n");
280 			return;
281 		}
282 	}
283 	md_wakeup_thread(cinfo->recovery_thread);
284 }
285 
286 static void recover_done(void *arg, struct dlm_slot *slots,
287 		int num_slots, int our_slot,
288 		uint32_t generation)
289 {
290 	struct mddev *mddev = arg;
291 	struct md_cluster_info *cinfo = mddev->cluster_info;
292 
293 	cinfo->slot_number = our_slot;
294 	complete(&cinfo->completion);
295 }
296 
/* Recovery callbacks handed to dlm_new_lockspace() in join(). */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
302 
303 /*
304  * The BAST function for the ack lock resource
305  * This function wakes up the receive thread in
306  * order to receive and process the message.
307  */
308 static void ack_bast(void *arg, int mode)
309 {
310 	struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
311 	struct md_cluster_info *cinfo = res->mddev->cluster_info;
312 
313 	if (mode == DLM_LOCK_EX)
314 		md_wakeup_thread(cinfo->recv_thread);
315 }
316 
317 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
318 {
319 	switch (msg->type) {
320 	case METADATA_UPDATED:
321 		pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
322 			__func__, __LINE__, msg->slot);
323 		break;
324 	case RESYNCING:
325 		pr_info("%s: %d Received message: RESYNCING from %d\n",
326 			__func__, __LINE__, msg->slot);
327 		break;
328 	};
329 }
330 
331 /*
332  * thread for receiving message
333  */
334 static void recv_daemon(struct md_thread *thread)
335 {
336 	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
337 	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
338 	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
339 	struct cluster_msg msg;
340 
341 	/*get CR on Message*/
342 	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
343 		pr_err("md/raid1:failed to get CR on MESSAGE\n");
344 		return;
345 	}
346 
347 	/* read lvb and wake up thread to process this message_lockres */
348 	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
349 	process_recvd_msg(thread->mddev, &msg);
350 
351 	/*release CR on ack_lockres*/
352 	dlm_unlock_sync(ack_lockres);
353 	/*up-convert to EX on message_lockres*/
354 	dlm_lock_sync(message_lockres, DLM_LOCK_EX);
355 	/*get CR on ack_lockres again*/
356 	dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
357 	/*release CR on message_lockres*/
358 	dlm_unlock_sync(message_lockres);
359 }
360 
361 /* lock_comm()
362  * Takes the lock on the TOKEN lock resource so no other
363  * node can communicate while the operation is underway.
364  */
365 static int lock_comm(struct md_cluster_info *cinfo)
366 {
367 	int error;
368 
369 	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
370 	if (error)
371 		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
372 				__func__, __LINE__, error);
373 	return error;
374 }
375 
376 static void unlock_comm(struct md_cluster_info *cinfo)
377 {
378 	dlm_unlock_sync(cinfo->token_lockres);
379 }
380 
381 /* __sendmsg()
382  * This function performs the actual sending of the message. This function is
383  * usually called after performing the encompassing operation
384  * The function:
385  * 1. Grabs the message lockresource in EX mode
386  * 2. Copies the message to the message LVB
387  * 3. Downconverts message lockresource to CR
388  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
389  *    and the other nodes read the message. The thread will wait here until all other
390  *    nodes have released ack lock resource.
391  * 5. Downconvert ack lockresource to CR
392  */
393 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
394 {
395 	int error;
396 	int slot = cinfo->slot_number - 1;
397 
398 	cmsg->slot = cpu_to_le32(slot);
399 	/*get EX on Message*/
400 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
401 	if (error) {
402 		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
403 		goto failed_message;
404 	}
405 
406 	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
407 			sizeof(struct cluster_msg));
408 	/*down-convert EX to CR on Message*/
409 	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR);
410 	if (error) {
411 		pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
412 				error);
413 		goto failed_message;
414 	}
415 
416 	/*up-convert CR to EX on Ack*/
417 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
418 	if (error) {
419 		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
420 				error);
421 		goto failed_ack;
422 	}
423 
424 	/*down-convert EX to CR on Ack*/
425 	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
426 	if (error) {
427 		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
428 				error);
429 		goto failed_ack;
430 	}
431 
432 failed_ack:
433 	dlm_unlock_sync(cinfo->message_lockres);
434 failed_message:
435 	return error;
436 }
437 
/*
 * Send @cmsg to the other nodes while holding the TOKEN lock.
 *
 * Returns 0 on success, the lock_comm() error if the TOKEN lock could
 * not be taken (nothing is sent in that case), or the __sendmsg() error.
 */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	ret = lock_comm(cinfo);
	if (ret)
		return ret;

	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
447 
448 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
449 {
450 	struct md_cluster_info *cinfo = mddev->cluster_info;
451 	int i, ret = 0;
452 	struct dlm_lock_resource *bm_lockres;
453 	struct suspend_info *s;
454 	char str[64];
455 
456 
457 	for (i = 0; i < total_slots; i++) {
458 		memset(str, '\0', 64);
459 		snprintf(str, 64, "bitmap%04d", i);
460 		bm_lockres = lockres_init(mddev, str, NULL, 1);
461 		if (!bm_lockres)
462 			return -ENOMEM;
463 		if (i == (cinfo->slot_number - 1))
464 			continue;
465 
466 		bm_lockres->flags |= DLM_LKF_NOQUEUE;
467 		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
468 		if (ret == -EAGAIN) {
469 			memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
470 			s = read_resync_info(mddev, bm_lockres);
471 			if (s) {
472 				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
473 						__func__, __LINE__,
474 						(unsigned long long) s->lo,
475 						(unsigned long long) s->hi, i);
476 				spin_lock_irq(&cinfo->suspend_lock);
477 				s->slot = i;
478 				list_add(&s->list, &cinfo->suspend_list);
479 				spin_unlock_irq(&cinfo->suspend_lock);
480 			}
481 			ret = 0;
482 			lockres_free(bm_lockres);
483 			continue;
484 		}
485 		if (ret)
486 			goto out;
487 		/* TODO: Read the disk bitmap sb and check if it needs recovery */
488 		dlm_unlock_sync(bm_lockres);
489 		lockres_free(bm_lockres);
490 	}
491 out:
492 	return ret;
493 }
494 
495 static int join(struct mddev *mddev, int nodes)
496 {
497 	struct md_cluster_info *cinfo;
498 	int ret, ops_rv;
499 	char str[64];
500 
501 	if (!try_module_get(THIS_MODULE))
502 		return -ENOENT;
503 
504 	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
505 	if (!cinfo)
506 		return -ENOMEM;
507 
508 	init_completion(&cinfo->completion);
509 
510 	mutex_init(&cinfo->sb_mutex);
511 	mddev->cluster_info = cinfo;
512 
513 	memset(str, 0, 64);
514 	pretty_uuid(str, mddev->uuid);
515 	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
516 				DLM_LSFL_FS, LVB_SIZE,
517 				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
518 	if (ret)
519 		goto err;
520 	wait_for_completion(&cinfo->completion);
521 	if (nodes <= cinfo->slot_number) {
522 		pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
523 			nodes);
524 		ret = -ERANGE;
525 		goto err;
526 	}
527 	cinfo->sb_lock = lockres_init(mddev, "cmd-super",
528 					NULL, 0);
529 	if (!cinfo->sb_lock) {
530 		ret = -ENOMEM;
531 		goto err;
532 	}
533 	/* Initiate the communication resources */
534 	ret = -ENOMEM;
535 	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
536 	if (!cinfo->recv_thread) {
537 		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
538 		goto err;
539 	}
540 	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
541 	if (!cinfo->message_lockres)
542 		goto err;
543 	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
544 	if (!cinfo->token_lockres)
545 		goto err;
546 	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
547 	if (!cinfo->ack_lockres)
548 		goto err;
549 	/* get sync CR lock on ACK. */
550 	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
551 		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
552 				ret);
553 
554 	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
555 	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
556 	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
557 	if (!cinfo->bitmap_lockres)
558 		goto err;
559 	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
560 		pr_err("Failed to get bitmap lock\n");
561 		ret = -EINVAL;
562 		goto err;
563 	}
564 
565 	INIT_LIST_HEAD(&cinfo->suspend_list);
566 	spin_lock_init(&cinfo->suspend_lock);
567 
568 	ret = gather_all_resync_info(mddev, nodes);
569 	if (ret)
570 		goto err;
571 
572 	return 0;
573 err:
574 	lockres_free(cinfo->message_lockres);
575 	lockres_free(cinfo->token_lockres);
576 	lockres_free(cinfo->ack_lockres);
577 	lockres_free(cinfo->bitmap_lockres);
578 	lockres_free(cinfo->sb_lock);
579 	if (cinfo->lockspace)
580 		dlm_release_lockspace(cinfo->lockspace, 2);
581 	mddev->cluster_info = NULL;
582 	kfree(cinfo);
583 	module_put(THIS_MODULE);
584 	return ret;
585 }
586 
587 static int leave(struct mddev *mddev)
588 {
589 	struct md_cluster_info *cinfo = mddev->cluster_info;
590 
591 	if (!cinfo)
592 		return 0;
593 	md_unregister_thread(&cinfo->recovery_thread);
594 	md_unregister_thread(&cinfo->recv_thread);
595 	lockres_free(cinfo->message_lockres);
596 	lockres_free(cinfo->token_lockres);
597 	lockres_free(cinfo->ack_lockres);
598 	lockres_free(cinfo->sb_lock);
599 	lockres_free(cinfo->bitmap_lockres);
600 	dlm_release_lockspace(cinfo->lockspace, 2);
601 	return 0;
602 }
603 
604 /* slot_number(): Returns the MD slot number to use
605  * DLM starts the slot numbers from 1, wheras cluster-md
606  * wants the number to be from zero, so we deduct one
607  */
608 static int slot_number(struct mddev *mddev)
609 {
610 	struct md_cluster_info *cinfo = mddev->cluster_info;
611 
612 	return cinfo->slot_number - 1;
613 }
614 
615 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
616 {
617 	struct md_cluster_info *cinfo = mddev->cluster_info;
618 
619 	add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
620 	/* Re-acquire the lock to refresh LVB */
621 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
622 }
623 
624 static int metadata_update_start(struct mddev *mddev)
625 {
626 	return lock_comm(mddev->cluster_info);
627 }
628 
629 static int metadata_update_finish(struct mddev *mddev)
630 {
631 	struct md_cluster_info *cinfo = mddev->cluster_info;
632 	struct cluster_msg cmsg;
633 	int ret;
634 
635 	memset(&cmsg, 0, sizeof(cmsg));
636 	cmsg.type = cpu_to_le32(METADATA_UPDATED);
637 	ret = __sendmsg(cinfo, &cmsg);
638 	unlock_comm(cinfo);
639 	return ret;
640 }
641 
642 static int metadata_update_cancel(struct mddev *mddev)
643 {
644 	struct md_cluster_info *cinfo = mddev->cluster_info;
645 
646 	return dlm_unlock_sync(cinfo->token_lockres);
647 }
648 
/* Operations table registered with the md core in cluster_init(). */
static struct md_cluster_operations cluster_ops = {
	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_info_update = resync_info_update,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
};
658 
/*
 * Module init: announce the experimental status and register
 * cluster_ops with the md core. Always returns 0.
 */
static int __init cluster_init(void)
{
	pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}
666 
/* Module exit: unregister the cluster operations registered in cluster_init(). */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
671 
/* Module entry points and metadata. */
module_init(cluster_init);
module_exit(cluster_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");
676