xref: /openbmc/linux/fs/ocfs2/stack_user.c (revision f42b3800)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * stack_user.c
5  *
6  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
7  *
8  * Copyright (C) 2007 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation, version 2.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  */
19 
20 #include <linux/module.h>
21 #include <linux/fs.h>
22 #include <linux/miscdevice.h>
23 #include <linux/mutex.h>
24 #include <linux/reboot.h>
25 #include <asm/uaccess.h>
26 
27 #include "ocfs2.h"  /* For struct ocfs2_lock_res */
28 #include "stackglue.h"
29 
30 
31 /*
32  * The control protocol starts with a handshake.  Until the handshake
33  * is complete, the control device will fail all write(2)s.
34  *
35  * The handshake is simple.  First, the client reads until EOF.  Each line
36  * of output is a supported protocol tag.  All protocol tags are a single
37  * character followed by a two hex digit version number.  Currently the
 * only thing supported is T01, for "Text-based version 0x01".  Next, the
39  * client writes the version they would like to use, including the newline.
40  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
41  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
42  * client can start sending messages.
43  *
44  * The T01 protocol has three messages.  First is the "SETN" message.
45  * It has the following syntax:
46  *
47  *  SETN<space><8-char-hex-nodenum><newline>
48  *
49  * This is 14 characters.
50  *
51  * The "SETN" message must be the first message following the protocol.
52  * It tells ocfs2_control the local node number.
53  *
54  * Next comes the "SETV" message.  It has the following syntax:
55  *
56  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
57  *
58  * This is 11 characters.
59  *
60  * The "SETV" message sets the filesystem locking protocol version as
61  * negotiated by the client.  The client negotiates based on the maximum
62  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
63  * number from the "SETV" message must match
64  * user_stack.sp_proto->lp_max_version.pv_major, and the minor number
65  * must be less than or equal to ...->lp_max_version.pv_minor.
66  *
67  * Once this information has been set, mounts will be allowed.  From this
68  * point on, the "DOWN" message can be sent for node down notification.
69  * It has the following syntax:
70  *
71  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
72  *
73  * eg:
74  *
75  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
76  *
77  * This is 47 characters.
78  */
79 
/*
 * The protocol tag offered to the client by read(2) and expected back
 * from the client's first write(2).  For now, we have just one protocol
 * version.  OCFS2_CONTROL_PROTO_LEN counts the tag characters including
 * the newline but not the terminating NUL.
 */
#define OCFS2_CONTROL_PROTO			"T01\n"
#define OCFS2_CONTROL_PROTO_LEN			4

/*
 * Handshake states, kept per open file in ocfs2_control_private.op_state:
 *   INVALID  - initial (zeroed) state; every write(2) fails
 *   READ     - the client has read the whole protocol list
 *   PROTOCOL - the client wrote a known protocol tag; SETN/SETV accepted
 *   VALID    - node number and version were installed; DOWN accepted
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)

/*
 * Messages: the four-character opcodes, the exact length in bytes of
 * each complete message, and the widths of the fixed-size hex fields.
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
#define OCFS2_TEXT_UUID_LEN			32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
104 
/*
 * An ocfs2_live_connection links one cluster connection onto
 * ocfs2_live_connection_list so that DOWN messages written to the
 * control device can be routed to it.
 *
 * NOTE(review): the original comment said this structure is refcounted
 * because the filesystem and miscdevice sides can detach in different
 * order.  No reference count is present in this definition; the object
 * is allocated by ocfs2_live_connection_new() and freed directly by
 * ocfs2_live_connection_drop().
 */
struct ocfs2_live_connection {
	/* Entry on ocfs2_live_connection_list */
	struct list_head		oc_list;
	/* The cluster connection this entry stands for */
	struct ocfs2_cluster_connection	*oc_conn;
};
113 
/* Per-open state of the control device, stored in file->private_data. */
struct ocfs2_control_private {
	/* Entry on ocfs2_control_private_list */
	struct list_head op_list;
	/* One of the OCFS2_CONTROL_HANDSHAKE_* states */
	int op_state;
	/* Node number received via SETN; -1 until one has been received */
	int op_this_node;
	/* Version received via SETV; pv_major stays 0 until then */
	struct ocfs2_protocol_version op_proto;
};
120 
/*
 * SETN<space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte overlay of the message as written by userspace.  The
 * parser overwrites 'space' and 'newline' with NUL so that 'nodestr'
 * can be handed to simple_strtol() as a terminated string.
 */
struct ocfs2_control_message_setn {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
128 
/*
 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * Byte-for-byte overlay of the message as written by userspace.  The
 * parser overwrites both spaces and the newline with NUL so that
 * 'major' and 'minor' each become a terminated string.
 */
struct ocfs2_control_message_setv {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	space2;
	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	newline;
};
138 
/*
 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte overlay of the message as written by userspace.  The
 * parser overwrites both spaces and the newline with NUL so that 'uuid'
 * and 'nodestr' each become a terminated string.
 */
struct ocfs2_control_message_down {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	uuid[OCFS2_TEXT_UUID_LEN];
	char	space2;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
148 
/*
 * Receive buffer for any T01 message.  'tag' aliases the opcode at the
 * start of every message layout and is used to pick the handler; the
 * union's size is also the upper bound on an accepted write(2).
 */
union ocfs2_control_message {
	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	struct ocfs2_control_message_setn	u_setn;
	struct ocfs2_control_message_setv	u_setv;
	struct ocfs2_control_message_down	u_down;
};
155 
/* Tentative definition; the initialized plugin appears later in the file */
static struct ocfs2_stack_plugin user_stack;

/* Count of control-device opens that reached HANDSHAKE_VALID */
static atomic_t ocfs2_control_opened;
/* Local node number installed from a SETN message; -1 while unset */
static int ocfs2_control_this_node = -1;
/* Locking protocol version installed from a SETV message */
static struct ocfs2_protocol_version running_proto;

/* All ocfs2_live_connection objects (one per connected filesystem) */
static LIST_HEAD(ocfs2_live_connection_list);
/* All ocfs2_control_private objects (one per open of the device) */
static LIST_HEAD(ocfs2_control_private_list);
/*
 * Held whenever the two lists above, ocfs2_control_this_node or
 * running_proto are modified.
 */
static DEFINE_MUTEX(ocfs2_control_lock);
165 
166 static inline void ocfs2_control_set_handshake_state(struct file *file,
167 						     int state)
168 {
169 	struct ocfs2_control_private *p = file->private_data;
170 	p->op_state = state;
171 }
172 
173 static inline int ocfs2_control_get_handshake_state(struct file *file)
174 {
175 	struct ocfs2_control_private *p = file->private_data;
176 	return p->op_state;
177 }
178 
179 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
180 {
181 	size_t len = strlen(name);
182 	struct ocfs2_live_connection *c;
183 
184 	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
185 
186 	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
187 		if ((c->oc_conn->cc_namelen == len) &&
188 		    !strncmp(c->oc_conn->cc_name, name, len))
189 			return c;
190 	}
191 
192 	return c;
193 }
194 
195 /*
196  * ocfs2_live_connection structures are created underneath the ocfs2
197  * mount path.  Since the VFS prevents multiple calls to
198  * fill_super(), we can't get dupes here.
199  */
200 static int ocfs2_live_connection_new(struct ocfs2_cluster_connection *conn,
201 				     struct ocfs2_live_connection **c_ret)
202 {
203 	int rc = 0;
204 	struct ocfs2_live_connection *c;
205 
206 	c = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
207 	if (!c)
208 		return -ENOMEM;
209 
210 	mutex_lock(&ocfs2_control_lock);
211 	c->oc_conn = conn;
212 
213 	if (atomic_read(&ocfs2_control_opened))
214 		list_add(&c->oc_list, &ocfs2_live_connection_list);
215 	else {
216 		printk(KERN_ERR
217 		       "ocfs2: Userspace control daemon is not present\n");
218 		rc = -ESRCH;
219 	}
220 
221 	mutex_unlock(&ocfs2_control_lock);
222 
223 	if (!rc)
224 		*c_ret = c;
225 	else
226 		kfree(c);
227 
228 	return rc;
229 }
230 
231 /*
232  * This function disconnects the cluster connection from ocfs2_control.
233  * Afterwards, userspace can't affect the cluster connection.
234  */
235 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
236 {
237 	mutex_lock(&ocfs2_control_lock);
238 	list_del_init(&c->oc_list);
239 	c->oc_conn = NULL;
240 	mutex_unlock(&ocfs2_control_lock);
241 
242 	kfree(c);
243 }
244 
245 static int ocfs2_control_cfu(void *target, size_t target_len,
246 			     const char __user *buf, size_t count)
247 {
248 	/* The T01 expects write(2) calls to have exactly one command */
249 	if ((count != target_len) ||
250 	    (count > sizeof(union ocfs2_control_message)))
251 		return -EINVAL;
252 
253 	if (copy_from_user(target, buf, target_len))
254 		return -EFAULT;
255 
256 	return 0;
257 }
258 
259 static ssize_t ocfs2_control_validate_protocol(struct file *file,
260 					       const char __user *buf,
261 					       size_t count)
262 {
263 	ssize_t ret;
264 	char kbuf[OCFS2_CONTROL_PROTO_LEN];
265 
266 	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
267 				buf, count);
268 	if (ret)
269 		return ret;
270 
271 	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
272 		return -EINVAL;
273 
274 	ocfs2_control_set_handshake_state(file,
275 					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
276 
277 	return count;
278 }
279 
280 static void ocfs2_control_send_down(const char *uuid,
281 				    int nodenum)
282 {
283 	struct ocfs2_live_connection *c;
284 
285 	mutex_lock(&ocfs2_control_lock);
286 
287 	c = ocfs2_connection_find(uuid);
288 	if (c) {
289 		BUG_ON(c->oc_conn == NULL);
290 		c->oc_conn->cc_recovery_handler(nodenum,
291 						c->oc_conn->cc_recovery_data);
292 	}
293 
294 	mutex_unlock(&ocfs2_control_lock);
295 }
296 
297 /*
298  * Called whenever configuration elements are sent to /dev/ocfs2_control.
299  * If all configuration elements are present, try to set the global
300  * values.  If there is a problem, return an error.  Skip any missing
301  * elements, and only bump ocfs2_control_opened when we have all elements
302  * and are successful.
303  */
304 static int ocfs2_control_install_private(struct file *file)
305 {
306 	int rc = 0;
307 	int set_p = 1;
308 	struct ocfs2_control_private *p = file->private_data;
309 
310 	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
311 
312 	mutex_lock(&ocfs2_control_lock);
313 
314 	if (p->op_this_node < 0) {
315 		set_p = 0;
316 	} else if ((ocfs2_control_this_node >= 0) &&
317 		   (ocfs2_control_this_node != p->op_this_node)) {
318 		rc = -EINVAL;
319 		goto out_unlock;
320 	}
321 
322 	if (!p->op_proto.pv_major) {
323 		set_p = 0;
324 	} else if (!list_empty(&ocfs2_live_connection_list) &&
325 		   ((running_proto.pv_major != p->op_proto.pv_major) ||
326 		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
327 		rc = -EINVAL;
328 		goto out_unlock;
329 	}
330 
331 	if (set_p) {
332 		ocfs2_control_this_node = p->op_this_node;
333 		running_proto.pv_major = p->op_proto.pv_major;
334 		running_proto.pv_minor = p->op_proto.pv_minor;
335 	}
336 
337 out_unlock:
338 	mutex_unlock(&ocfs2_control_lock);
339 
340 	if (!rc && set_p) {
341 		/* We set the global values successfully */
342 		atomic_inc(&ocfs2_control_opened);
343 		ocfs2_control_set_handshake_state(file,
344 					OCFS2_CONTROL_HANDSHAKE_VALID);
345 	}
346 
347 	return rc;
348 }
349 
350 static int ocfs2_control_get_this_node(void)
351 {
352 	int rc;
353 
354 	mutex_lock(&ocfs2_control_lock);
355 	if (ocfs2_control_this_node < 0)
356 		rc = -EINVAL;
357 	else
358 		rc = ocfs2_control_this_node;
359 	mutex_unlock(&ocfs2_control_lock);
360 
361 	return rc;
362 }
363 
364 static int ocfs2_control_do_setnode_msg(struct file *file,
365 					struct ocfs2_control_message_setn *msg)
366 {
367 	long nodenum;
368 	char *ptr = NULL;
369 	struct ocfs2_control_private *p = file->private_data;
370 
371 	if (ocfs2_control_get_handshake_state(file) !=
372 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
373 		return -EINVAL;
374 
375 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
376 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
377 		return -EINVAL;
378 
379 	if ((msg->space != ' ') || (msg->newline != '\n'))
380 		return -EINVAL;
381 	msg->space = msg->newline = '\0';
382 
383 	nodenum = simple_strtol(msg->nodestr, &ptr, 16);
384 	if (!ptr || *ptr)
385 		return -EINVAL;
386 
387 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
388 	    (nodenum > INT_MAX) || (nodenum < 0))
389 		return -ERANGE;
390 	p->op_this_node = nodenum;
391 
392 	return ocfs2_control_install_private(file);
393 }
394 
395 static int ocfs2_control_do_setversion_msg(struct file *file,
396 					   struct ocfs2_control_message_setv *msg)
397  {
398 	long major, minor;
399 	char *ptr = NULL;
400 	struct ocfs2_control_private *p = file->private_data;
401 	struct ocfs2_protocol_version *max =
402 		&user_stack.sp_proto->lp_max_version;
403 
404 	if (ocfs2_control_get_handshake_state(file) !=
405 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
406 		return -EINVAL;
407 
408 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
409 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
410 		return -EINVAL;
411 
412 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
413 	    (msg->newline != '\n'))
414 		return -EINVAL;
415 	msg->space1 = msg->space2 = msg->newline = '\0';
416 
417 	major = simple_strtol(msg->major, &ptr, 16);
418 	if (!ptr || *ptr)
419 		return -EINVAL;
420 	minor = simple_strtol(msg->minor, &ptr, 16);
421 	if (!ptr || *ptr)
422 		return -EINVAL;
423 
424 	/*
425 	 * The major must be between 1 and 255, inclusive.  The minor
426 	 * must be between 0 and 255, inclusive.  The version passed in
427 	 * must be within the maximum version supported by the filesystem.
428 	 */
429 	if ((major == LONG_MIN) || (major == LONG_MAX) ||
430 	    (major > (u8)-1) || (major < 1))
431 		return -ERANGE;
432 	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
433 	    (minor > (u8)-1) || (minor < 0))
434 		return -ERANGE;
435 	if ((major != max->pv_major) ||
436 	    (minor > max->pv_minor))
437 		return -EINVAL;
438 
439 	p->op_proto.pv_major = major;
440 	p->op_proto.pv_minor = minor;
441 
442 	return ocfs2_control_install_private(file);
443 }
444 
445 static int ocfs2_control_do_down_msg(struct file *file,
446 				     struct ocfs2_control_message_down *msg)
447 {
448 	long nodenum;
449 	char *p = NULL;
450 
451 	if (ocfs2_control_get_handshake_state(file) !=
452 	    OCFS2_CONTROL_HANDSHAKE_VALID)
453 		return -EINVAL;
454 
455 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
456 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
457 		return -EINVAL;
458 
459 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
460 	    (msg->newline != '\n'))
461 		return -EINVAL;
462 	msg->space1 = msg->space2 = msg->newline = '\0';
463 
464 	nodenum = simple_strtol(msg->nodestr, &p, 16);
465 	if (!p || *p)
466 		return -EINVAL;
467 
468 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
469 	    (nodenum > INT_MAX) || (nodenum < 0))
470 		return -ERANGE;
471 
472 	ocfs2_control_send_down(msg->uuid, nodenum);
473 
474 	return 0;
475 }
476 
477 static ssize_t ocfs2_control_message(struct file *file,
478 				     const char __user *buf,
479 				     size_t count)
480 {
481 	ssize_t ret;
482 	union ocfs2_control_message msg;
483 
484 	/* Try to catch padding issues */
485 	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
486 		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
487 
488 	memset(&msg, 0, sizeof(union ocfs2_control_message));
489 	ret = ocfs2_control_cfu(&msg, count, buf, count);
490 	if (ret)
491 		goto out;
492 
493 	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
494 	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
495 		     OCFS2_CONTROL_MESSAGE_OP_LEN))
496 		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
497 	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
498 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
499 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
500 		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
501 	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
502 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
503 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
504 		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
505 	else
506 		ret = -EINVAL;
507 
508 out:
509 	return ret ? ret : count;
510 }
511 
512 static ssize_t ocfs2_control_write(struct file *file,
513 				   const char __user *buf,
514 				   size_t count,
515 				   loff_t *ppos)
516 {
517 	ssize_t ret;
518 
519 	switch (ocfs2_control_get_handshake_state(file)) {
520 		case OCFS2_CONTROL_HANDSHAKE_INVALID:
521 			ret = -EINVAL;
522 			break;
523 
524 		case OCFS2_CONTROL_HANDSHAKE_READ:
525 			ret = ocfs2_control_validate_protocol(file, buf,
526 							      count);
527 			break;
528 
529 		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
530 		case OCFS2_CONTROL_HANDSHAKE_VALID:
531 			ret = ocfs2_control_message(file, buf, count);
532 			break;
533 
534 		default:
535 			BUG();
536 			ret = -EIO;
537 			break;
538 	}
539 
540 	return ret;
541 }
542 
543 /*
544  * This is a naive version.  If we ever have a new protocol, we'll expand
545  * it.  Probably using seq_file.
546  */
547 static ssize_t ocfs2_control_read(struct file *file,
548 				  char __user *buf,
549 				  size_t count,
550 				  loff_t *ppos)
551 {
552 	char *proto_string = OCFS2_CONTROL_PROTO;
553 	size_t to_write = 0;
554 
555 	if (*ppos >= OCFS2_CONTROL_PROTO_LEN)
556 		return 0;
557 
558 	to_write = OCFS2_CONTROL_PROTO_LEN - *ppos;
559 	if (to_write > count)
560 		to_write = count;
561 	if (copy_to_user(buf, proto_string + *ppos, to_write))
562 		return -EFAULT;
563 
564 	*ppos += to_write;
565 
566 	/* Have we read the whole protocol list? */
567 	if (*ppos >= OCFS2_CONTROL_PROTO_LEN)
568 		ocfs2_control_set_handshake_state(file,
569 						  OCFS2_CONTROL_HANDSHAKE_READ);
570 
571 	return to_write;
572 }
573 
574 static int ocfs2_control_release(struct inode *inode, struct file *file)
575 {
576 	struct ocfs2_control_private *p = file->private_data;
577 
578 	mutex_lock(&ocfs2_control_lock);
579 
580 	if (ocfs2_control_get_handshake_state(file) !=
581 	    OCFS2_CONTROL_HANDSHAKE_VALID)
582 		goto out;
583 
584 	if (atomic_dec_and_test(&ocfs2_control_opened)) {
585 		if (!list_empty(&ocfs2_live_connection_list)) {
586 			/* XXX: Do bad things! */
587 			printk(KERN_ERR
588 			       "ocfs2: Unexpected release of ocfs2_control!\n"
589 			       "       Loss of cluster connection requires "
590 			       "an emergency restart!\n");
591 			emergency_restart();
592 		}
593 		/*
594 		 * Last valid close clears the node number and resets
595 		 * the locking protocol version
596 		 */
597 		ocfs2_control_this_node = -1;
598 		running_proto.pv_major = 0;
599 		running_proto.pv_major = 0;
600 	}
601 
602 out:
603 	list_del_init(&p->op_list);
604 	file->private_data = NULL;
605 
606 	mutex_unlock(&ocfs2_control_lock);
607 
608 	kfree(p);
609 
610 	return 0;
611 }
612 
613 static int ocfs2_control_open(struct inode *inode, struct file *file)
614 {
615 	struct ocfs2_control_private *p;
616 
617 	p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
618 	if (!p)
619 		return -ENOMEM;
620 	p->op_this_node = -1;
621 
622 	mutex_lock(&ocfs2_control_lock);
623 	file->private_data = p;
624 	list_add(&p->op_list, &ocfs2_control_private_list);
625 	mutex_unlock(&ocfs2_control_lock);
626 
627 	return 0;
628 }
629 
630 static const struct file_operations ocfs2_control_fops = {
631 	.open    = ocfs2_control_open,
632 	.release = ocfs2_control_release,
633 	.read    = ocfs2_control_read,
634 	.write   = ocfs2_control_write,
635 	.owner   = THIS_MODULE,
636 };
637 
638 struct miscdevice ocfs2_control_device = {
639 	.minor		= MISC_DYNAMIC_MINOR,
640 	.name		= "ocfs2_control",
641 	.fops		= &ocfs2_control_fops,
642 };
643 
644 static int ocfs2_control_init(void)
645 {
646 	int rc;
647 
648 	atomic_set(&ocfs2_control_opened, 0);
649 
650 	rc = misc_register(&ocfs2_control_device);
651 	if (rc)
652 		printk(KERN_ERR
653 		       "ocfs2: Unable to register ocfs2_control device "
654 		       "(errno %d)\n",
655 		       -rc);
656 
657 	return rc;
658 }
659 
660 static void ocfs2_control_exit(void)
661 {
662 	int rc;
663 
664 	rc = misc_deregister(&ocfs2_control_device);
665 	if (rc)
666 		printk(KERN_ERR
667 		       "ocfs2: Unable to deregister ocfs2_control device "
668 		       "(errno %d)\n",
669 		       -rc);
670 }
671 
672 static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg)
673 {
674 	struct ocfs2_lock_res *res = astarg;
675 	return &res->l_lksb.lksb_fsdlm;
676 }
677 
678 static void fsdlm_lock_ast_wrapper(void *astarg)
679 {
680 	struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg);
681 	int status = lksb->sb_status;
682 
683 	BUG_ON(user_stack.sp_proto == NULL);
684 
685 	/*
686 	 * For now we're punting on the issue of other non-standard errors
687 	 * where we can't tell if the unlock_ast or lock_ast should be called.
688 	 * The main "other error" that's possible is EINVAL which means the
689 	 * function was called with invalid args, which shouldn't be possible
690 	 * since the caller here is under our control.  Other non-standard
691 	 * errors probably fall into the same category, or otherwise are fatal
692 	 * which means we can't carry on anyway.
693 	 */
694 
695 	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
696 		user_stack.sp_proto->lp_unlock_ast(astarg, 0);
697 	else
698 		user_stack.sp_proto->lp_lock_ast(astarg);
699 }
700 
701 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
702 {
703 	BUG_ON(user_stack.sp_proto == NULL);
704 
705 	user_stack.sp_proto->lp_blocking_ast(astarg, level);
706 }
707 
708 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
709 			 int mode,
710 			 union ocfs2_dlm_lksb *lksb,
711 			 u32 flags,
712 			 void *name,
713 			 unsigned int namelen,
714 			 void *astarg)
715 {
716 	int ret;
717 
718 	if (!lksb->lksb_fsdlm.sb_lvbptr)
719 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
720 					     sizeof(struct dlm_lksb);
721 
722 	ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
723 		       flags|DLM_LKF_NODLCKWT, name, namelen, 0,
724 		       fsdlm_lock_ast_wrapper, astarg,
725 		       fsdlm_blocking_ast_wrapper);
726 	return ret;
727 }
728 
729 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
730 			   union ocfs2_dlm_lksb *lksb,
731 			   u32 flags,
732 			   void *astarg)
733 {
734 	int ret;
735 
736 	ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
737 			 flags, &lksb->lksb_fsdlm, astarg);
738 	return ret;
739 }
740 
741 static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb)
742 {
743 	return lksb->lksb_fsdlm.sb_status;
744 }
745 
746 static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb)
747 {
748 	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
749 }
750 
/*
 * Stack operation: dump an lksb for debugging.  This stack prints
 * nothing; the function exists to fill the dump_lksb slot of
 * user_stack_ops.
 */
static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb)
{
	/* Intentionally empty */
}
754 
755 /*
756  * Compare a requested locking protocol version against the current one.
757  *
758  * If the major numbers are different, they are incompatible.
759  * If the current minor is greater than the request, they are incompatible.
760  * If the current minor is less than or equal to the request, they are
761  * compatible, and the requester should run at the current minor version.
762  */
763 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
764 			       struct ocfs2_protocol_version *request)
765 {
766 	if (existing->pv_major != request->pv_major)
767 		return 1;
768 
769 	if (existing->pv_minor > request->pv_minor)
770 		return 1;
771 
772 	if (existing->pv_minor < request->pv_minor)
773 		request->pv_minor = existing->pv_minor;
774 
775 	return 0;
776 }
777 
778 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
779 {
780 	dlm_lockspace_t *fsdlm;
781 	struct ocfs2_live_connection *control;
782 	int rc = 0;
783 
784 	BUG_ON(conn == NULL);
785 
786 	rc = ocfs2_live_connection_new(conn, &control);
787 	if (rc)
788 		goto out;
789 
790 	/*
791 	 * running_proto must have been set before we allowed any mounts
792 	 * to proceed.
793 	 */
794 	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
795 		printk(KERN_ERR
796 		       "Unable to mount with fs locking protocol version "
797 		       "%u.%u because the userspace control daemon has "
798 		       "negotiated %u.%u\n",
799 		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
800 		       running_proto.pv_major, running_proto.pv_minor);
801 		rc = -EPROTO;
802 		ocfs2_live_connection_drop(control);
803 		goto out;
804 	}
805 
806 	rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name),
807 			       &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN);
808 	if (rc) {
809 		ocfs2_live_connection_drop(control);
810 		goto out;
811 	}
812 
813 	conn->cc_private = control;
814 	conn->cc_lockspace = fsdlm;
815 out:
816 	return rc;
817 }
818 
819 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn,
820 				   int hangup_pending)
821 {
822 	dlm_release_lockspace(conn->cc_lockspace, 2);
823 	conn->cc_lockspace = NULL;
824 	ocfs2_live_connection_drop(conn->cc_private);
825 	conn->cc_private = NULL;
826 	return 0;
827 }
828 
/*
 * Stack operation: report the local node number.
 *
 * On success the number is stored in *@this_node and 0 is returned.
 * If no node number is installed, the negative errno from
 * ocfs2_control_get_this_node() is returned and *@this_node is left
 * untouched.
 */
static int user_cluster_this_node(unsigned int *this_node)
{
	int node = ocfs2_control_get_this_node();

	if (node >= 0) {
		*this_node = node;
		return 0;
	}

	return node;
}
840 
841 static struct ocfs2_stack_operations user_stack_ops = {
842 	.connect	= user_cluster_connect,
843 	.disconnect	= user_cluster_disconnect,
844 	.this_node	= user_cluster_this_node,
845 	.dlm_lock	= user_dlm_lock,
846 	.dlm_unlock	= user_dlm_unlock,
847 	.lock_status	= user_dlm_lock_status,
848 	.lock_lvb	= user_dlm_lvb,
849 	.dump_lksb	= user_dlm_dump_lksb,
850 };
851 
852 static struct ocfs2_stack_plugin user_stack = {
853 	.sp_name	= "user",
854 	.sp_ops		= &user_stack_ops,
855 	.sp_owner	= THIS_MODULE,
856 };
857 
858 
859 static int __init user_stack_init(void)
860 {
861 	int rc;
862 
863 	rc = ocfs2_control_init();
864 	if (!rc) {
865 		rc = ocfs2_stack_glue_register(&user_stack);
866 		if (rc)
867 			ocfs2_control_exit();
868 	}
869 
870 	return rc;
871 }
872 
873 static void __exit user_stack_exit(void)
874 {
875 	ocfs2_stack_glue_unregister(&user_stack);
876 	ocfs2_control_exit();
877 }
878 
/* Module metadata and registration of the init/exit entry points */
MODULE_AUTHOR("Oracle");
MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
MODULE_LICENSE("GPL");
module_init(user_stack_init);
module_exit(user_stack_exit);
884