xref: /openbmc/linux/fs/ocfs2/cluster/nodemanager.c (revision 13a83fc9096dfaf2a7f4671b5777780bbe1d4a30)
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/slab.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/configfs.h>

#include "tcp.h"
#include "nodemanager.h"
#include "heartbeat.h"
#include "masklog.h"
#include "sys.h"

/* for now we operate under the assertion that there can be only one
 * cluster active at a time.  Changing this will require trickling
 * cluster references throughout where nodes are looked up */
struct o2nm_cluster *o2nm_single_cluster = NULL;

char *o2nm_fence_method_desc[O2NM_FENCE_METHODS] = {
		"reset",	/* O2NM_FENCE_RESET */
		"panic",	/* O2NM_FENCE_PANIC */
};

struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
{
	struct o2nm_node *node = NULL;

	if (node_num >= O2NM_MAX_NODES || o2nm_single_cluster == NULL)
		goto out;

	read_lock(&o2nm_single_cluster->cl_nodes_lock);
	node = o2nm_single_cluster->cl_nodes[node_num];
	if (node)
		config_item_get(&node->nd_item);
	read_unlock(&o2nm_single_cluster->cl_nodes_lock);
out:
	return node;
}
EXPORT_SYMBOL_GPL(o2nm_get_node_by_num);

int o2nm_configured_node_map(unsigned long *map, unsigned bytes)
{
	struct o2nm_cluster *cluster = o2nm_single_cluster;

	BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));

	if (cluster == NULL)
		return -EINVAL;

	read_lock(&cluster->cl_nodes_lock);
	memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
	read_unlock(&cluster->cl_nodes_lock);

	return 0;
}
EXPORT_SYMBOL_GPL(o2nm_configured_node_map);
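
/*
 * Usage sketch (illustrative only, not part of the original file): callers
 * are expected to pass a bitmap at least as large as cl_nodes_bitmap, i.e.
 * sized for O2NM_MAX_NODES bits.  The helper name below is hypothetical.
 */
#if 0
static void o2nm_example_print_configured_nodes(void)
{
	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int num;

	if (o2nm_configured_node_map(map, sizeof(map)))
		return;

	for (num = 0; num < O2NM_MAX_NODES; num++)
		if (test_bit(num, map))
			printk(KERN_INFO "o2nm: node %d is configured\n", num);
}
#endif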

static struct o2nm_node *o2nm_node_ip_tree_lookup(struct o2nm_cluster *cluster,
						  __be32 ip_needle,
						  struct rb_node ***ret_p,
						  struct rb_node **ret_parent)
{
	struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
	struct rb_node *parent = NULL;
	struct o2nm_node *node, *ret = NULL;

	while (*p) {
		int cmp;

		parent = *p;
		node = rb_entry(parent, struct o2nm_node, nd_ip_node);

		cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
			     sizeof(ip_needle));
		if (cmp < 0)
			p = &(*p)->rb_left;
		else if (cmp > 0)
			p = &(*p)->rb_right;
		else {
			ret = node;
			break;
		}
	}

	if (ret_p != NULL)
		*ret_p = p;
	if (ret_parent != NULL)
		*ret_parent = parent;

	return ret;
}
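
/*
 * Note that o2nm_node_ip_tree_lookup() doubles as the insertion probe for
 * the address rb-tree: when no node matches, *ret_p and *ret_parent are left
 * pointing at the link where a new nd_ip_node can be spliced in with
 * rb_link_node()/rb_insert_color(), which is how
 * o2nm_node_ipv4_address_write() below uses it.
 */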

struct o2nm_node *o2nm_get_node_by_ip(__be32 addr)
{
	struct o2nm_node *node = NULL;
	struct o2nm_cluster *cluster = o2nm_single_cluster;

	if (cluster == NULL)
		goto out;

	read_lock(&cluster->cl_nodes_lock);
	node = o2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
	if (node)
		config_item_get(&node->nd_item);
	read_unlock(&cluster->cl_nodes_lock);

out:
	return node;
}
EXPORT_SYMBOL_GPL(o2nm_get_node_by_ip);

void o2nm_node_put(struct o2nm_node *node)
{
	config_item_put(&node->nd_item);
}
EXPORT_SYMBOL_GPL(o2nm_node_put);

void o2nm_node_get(struct o2nm_node *node)
{
	config_item_get(&node->nd_item);
}
EXPORT_SYMBOL_GPL(o2nm_node_get);
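
/*
 * Reference-counting sketch (illustrative only, not part of the original
 * file): the lookup helpers return the node with its config_item reference
 * elevated, so every successful o2nm_get_node_by_num()/_by_ip() must be
 * paired with o2nm_node_put().  The function and variable names below are
 * hypothetical.
 */
#if 0
static int o2nm_example_use_node(u8 node_num)
{
	struct o2nm_node *node;

	node = o2nm_get_node_by_num(node_num);
	if (!node)
		return -ENOENT;

	/* safe to dereference node here; the get above pins it */
	printk(KERN_INFO "o2nm: node %u is %s\n", node_num, node->nd_name);

	o2nm_node_put(node);
	return 0;
}
#endif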

u8 o2nm_this_node(void)
{
	u8 node_num = O2NM_MAX_NODES;

	if (o2nm_single_cluster && o2nm_single_cluster->cl_has_local)
		node_num = o2nm_single_cluster->cl_local_node;

	return node_num;
}
EXPORT_SYMBOL_GPL(o2nm_this_node);

/* node configfs bits */

static struct o2nm_cluster *to_o2nm_cluster(struct config_item *item)
{
	return item ?
		container_of(to_config_group(item), struct o2nm_cluster,
			     cl_group)
		: NULL;
}

static struct o2nm_node *to_o2nm_node(struct config_item *item)
{
	return item ? container_of(item, struct o2nm_node, nd_item) : NULL;
}

static void o2nm_node_release(struct config_item *item)
{
	struct o2nm_node *node = to_o2nm_node(item);
	kfree(node);
}

static ssize_t o2nm_node_num_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%d\n", node->nd_num);
}

static struct o2nm_cluster *to_o2nm_cluster_from_node(struct o2nm_node *node)
{
	/* walk up two levels of ci_parent:
	 * mycluster/node/mynode == o2nm_cluster->o2nm_node_group->o2nm_node */
	return to_o2nm_cluster(node->nd_item.ci_parent->ci_parent);
}

enum {
	O2NM_NODE_ATTR_NUM = 0,
	O2NM_NODE_ATTR_PORT,
	O2NM_NODE_ATTR_ADDRESS,
};

static ssize_t o2nm_node_num_write(struct o2nm_node *node, const char *page,
				   size_t count)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
	unsigned long tmp;
	char *p = (char *)page;
	int ret = 0;

	tmp = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	if (tmp >= O2NM_MAX_NODES)
		return -ERANGE;

	/* once we're in the cl_nodes tree networking can look us up by
	 * node number and try to use our address and port attributes
	 * to connect to this node.. make sure that they've been set
	 * before writing the node attribute? */
	if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
	    !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
		return -EINVAL; /* XXX */

	write_lock(&cluster->cl_nodes_lock);
	if (cluster->cl_nodes[tmp])
		ret = -EEXIST;
	else if (test_and_set_bit(O2NM_NODE_ATTR_NUM,
			&node->nd_set_attributes))
		ret = -EBUSY;
	else {
		cluster->cl_nodes[tmp] = node;
		node->nd_num = tmp;
		set_bit(tmp, cluster->cl_nodes_bitmap);
	}
	write_unlock(&cluster->cl_nodes_lock);
	if (ret)
		return ret;

	return count;
}

static ssize_t o2nm_node_ipv4_port_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
}

static ssize_t o2nm_node_ipv4_port_write(struct o2nm_node *node,
					 const char *page, size_t count)
{
	unsigned long tmp;
	char *p = (char *)page;

	tmp = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	if (tmp == 0)
		return -EINVAL;
	if (tmp >= (u16)-1)
		return -ERANGE;

	if (test_and_set_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
		return -EBUSY;
	node->nd_ipv4_port = htons(tmp);

	return count;
}

static ssize_t o2nm_node_ipv4_address_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%pI4\n", &node->nd_ipv4_address);
}

static ssize_t o2nm_node_ipv4_address_write(struct o2nm_node *node,
					    const char *page,
					    size_t count)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
	int ret, i;
	struct rb_node **p, *parent;
	unsigned int octets[4];
	__be32 ipv4_addr = 0;

	ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
		     &octets[1], &octets[0]);
	if (ret != 4)
		return -EINVAL;

	for (i = 0; i < ARRAY_SIZE(octets); i++) {
		if (octets[i] > 255)
			return -ERANGE;
		be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
	}

	ret = 0;
	write_lock(&cluster->cl_nodes_lock);
	if (o2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
		ret = -EEXIST;
	else if (test_and_set_bit(O2NM_NODE_ATTR_ADDRESS,
			&node->nd_set_attributes))
		ret = -EBUSY;
	else {
		rb_link_node(&node->nd_ip_node, parent, p);
		rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
	}
	write_unlock(&cluster->cl_nodes_lock);
	if (ret)
		return ret;

	memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));

	return count;
}

static ssize_t o2nm_node_local_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%d\n", node->nd_local);
}

static ssize_t o2nm_node_local_write(struct o2nm_node *node, const char *page,
				     size_t count)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
	unsigned long tmp;
	char *p = (char *)page;
	ssize_t ret;

	tmp = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	tmp = !!tmp; /* boolean of whether this node wants to be local */

	/* setting local turns on networking rx for now so we require having
	 * set everything else first */
	if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
	    !test_bit(O2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
	    !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
		return -EINVAL; /* XXX */

	/* the only failure case is trying to set a new local node
	 * when a different one is already set */
	if (tmp && tmp == cluster->cl_has_local &&
	    cluster->cl_local_node != node->nd_num)
		return -EBUSY;

	/* bring up the rx thread if we're setting the new local node. */
	if (tmp && !cluster->cl_has_local) {
		ret = o2net_start_listening(node);
		if (ret)
			return ret;
	}

	if (!tmp && cluster->cl_has_local &&
	    cluster->cl_local_node == node->nd_num) {
		o2net_stop_listening(node);
		cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
	}

	node->nd_local = tmp;
	if (node->nd_local) {
		cluster->cl_has_local = tmp;
		cluster->cl_local_node = node->nd_num;
	}

	return count;
}

struct o2nm_node_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct o2nm_node *, char *);
	ssize_t (*store)(struct o2nm_node *, const char *, size_t);
};

static struct o2nm_node_attribute o2nm_node_attr_num = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "num",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_num_read,
	.store	= o2nm_node_num_write,
};

static struct o2nm_node_attribute o2nm_node_attr_ipv4_port = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "ipv4_port",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_ipv4_port_read,
	.store	= o2nm_node_ipv4_port_write,
};

static struct o2nm_node_attribute o2nm_node_attr_ipv4_address = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "ipv4_address",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_ipv4_address_read,
	.store	= o2nm_node_ipv4_address_write,
};

static struct o2nm_node_attribute o2nm_node_attr_local = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "local",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_local_read,
	.store	= o2nm_node_local_write,
};

static struct configfs_attribute *o2nm_node_attrs[] = {
	&o2nm_node_attr_num.attr,
	&o2nm_node_attr_ipv4_port.attr,
	&o2nm_node_attr_ipv4_address.attr,
	&o2nm_node_attr_local.attr,
	NULL,
};

static ssize_t o2nm_node_show(struct config_item *item,
			      struct configfs_attribute *attr,
			      char *page)
{
	struct o2nm_node *node = to_o2nm_node(item);
	struct o2nm_node_attribute *o2nm_node_attr =
		container_of(attr, struct o2nm_node_attribute, attr);
	ssize_t ret = 0;

	if (o2nm_node_attr->show)
		ret = o2nm_node_attr->show(node, page);
	return ret;
}

static ssize_t o2nm_node_store(struct config_item *item,
			       struct configfs_attribute *attr,
			       const char *page, size_t count)
{
	struct o2nm_node *node = to_o2nm_node(item);
	struct o2nm_node_attribute *o2nm_node_attr =
		container_of(attr, struct o2nm_node_attribute, attr);

	if (o2nm_node_attr->store == NULL)
		return -EINVAL;

	return o2nm_node_attr->store(node, page, count);
}

static struct configfs_item_operations o2nm_node_item_ops = {
	.release		= o2nm_node_release,
	.show_attribute		= o2nm_node_show,
	.store_attribute	= o2nm_node_store,
};

static struct config_item_type o2nm_node_type = {
	.ct_item_ops	= &o2nm_node_item_ops,
	.ct_attrs	= o2nm_node_attrs,
	.ct_owner	= THIS_MODULE,
};
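
/*
 * Taken together, the attributes above surface each node as a configfs
 * directory with four files: num, ipv4_port, ipv4_address and local
 * (conventionally under /sys/kernel/config/cluster/<cluster>/node/<node>/,
 * assuming configfs is mounted in its usual place).  The store handlers
 * enforce an ordering: ipv4_address and ipv4_port must be written before
 * num, and all three before local; num, ipv4_port and ipv4_address may each
 * only be set once.
 */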

/* node set */

struct o2nm_node_group {
	struct config_group ns_group;
	/* some stuff? */
};

#if 0
static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
{
	return group ?
		container_of(group, struct o2nm_node_group, ns_group)
		: NULL;
}
#endif

struct o2nm_cluster_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct o2nm_cluster *, char *);
	ssize_t (*store)(struct o2nm_cluster *, const char *, size_t);
};

static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
				       unsigned int *val)
{
	unsigned long tmp;
	char *p = (char *)page;

	tmp = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	if (tmp == 0)
		return -EINVAL;
	if (tmp >= (u32)-1)
		return -ERANGE;

	*val = tmp;

	return count;
}
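
/*
 * Illustrative sketch only (not how this revision is written): on kernels
 * that provide kstrtoul(), the parse-and-range-check above could be
 * expressed as below.  kstrtoul() tolerates a single trailing newline,
 * mirroring the explicit '\n' check used here.
 */
#if 0
static ssize_t o2nm_cluster_attr_write_alt(const char *page, ssize_t count,
					   unsigned int *val)
{
	unsigned long tmp;

	if (kstrtoul(page, 0, &tmp))
		return -EINVAL;
	if (tmp == 0)
		return -EINVAL;
	if (tmp >= (u32)-1)
		return -ERANGE;

	*val = tmp;
	return count;
}
#endif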

static ssize_t o2nm_cluster_attr_idle_timeout_ms_read(
	struct o2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
}

static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	ssize_t ret;
	unsigned int val;

	ret = o2nm_cluster_attr_write(page, count, &val);

	if (ret > 0) {
		if (cluster->cl_idle_timeout_ms != val
			&& o2net_num_connected_peers()) {
			mlog(ML_NOTICE,
			     "o2net: cannot change idle timeout after "
			     "the first peer has agreed to it."
			     "  %d connected peers\n",
			     o2net_num_connected_peers());
			ret = -EINVAL;
		} else if (val <= cluster->cl_keepalive_delay_ms) {
			mlog(ML_NOTICE, "o2net: idle timeout must be larger "
			     "than keepalive delay\n");
			ret = -EINVAL;
		} else {
			cluster->cl_idle_timeout_ms = val;
		}
	}

	return ret;
}

static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read(
	struct o2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
}

static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	ssize_t ret;
	unsigned int val;

	ret = o2nm_cluster_attr_write(page, count, &val);

	if (ret > 0) {
		if (cluster->cl_keepalive_delay_ms != val
		    && o2net_num_connected_peers()) {
			mlog(ML_NOTICE,
			     "o2net: cannot change keepalive delay after"
			     " the first peer has agreed to it."
			     "  %d connected peers\n",
			     o2net_num_connected_peers());
			ret = -EINVAL;
		} else if (val >= cluster->cl_idle_timeout_ms) {
			mlog(ML_NOTICE, "o2net: keepalive delay must be "
			     "smaller than idle timeout\n");
			ret = -EINVAL;
		} else {
			cluster->cl_keepalive_delay_ms = val;
		}
	}

	return ret;
}

static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read(
	struct o2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
}

static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	return o2nm_cluster_attr_write(page, count,
				       &cluster->cl_reconnect_delay_ms);
}

static ssize_t o2nm_cluster_attr_fence_method_read(
	struct o2nm_cluster *cluster, char *page)
{
	ssize_t ret = 0;

	if (cluster)
		ret = sprintf(page, "%s\n",
			      o2nm_fence_method_desc[cluster->cl_fence_method]);
	return ret;
}

static ssize_t o2nm_cluster_attr_fence_method_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	unsigned int i;

	if (page[count - 1] != '\n')
		goto bail;

	for (i = 0; i < O2NM_FENCE_METHODS; ++i) {
		if (count != strlen(o2nm_fence_method_desc[i]) + 1)
			continue;
		if (strncasecmp(page, o2nm_fence_method_desc[i], count - 1))
			continue;
		if (cluster->cl_fence_method != i) {
			printk(KERN_INFO "ocfs2: Changing fence method to %s\n",
			       o2nm_fence_method_desc[i]);
			cluster->cl_fence_method = i;
		}
		return count;
	}

bail:
	return -EINVAL;
}

static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "idle_timeout_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_idle_timeout_ms_read,
	.store	= o2nm_cluster_attr_idle_timeout_ms_write,
};

static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "keepalive_delay_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_keepalive_delay_ms_read,
	.store	= o2nm_cluster_attr_keepalive_delay_ms_write,
};

static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "reconnect_delay_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_reconnect_delay_ms_read,
	.store	= o2nm_cluster_attr_reconnect_delay_ms_write,
};

static struct o2nm_cluster_attribute o2nm_cluster_attr_fence_method = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "fence_method",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_fence_method_read,
	.store	= o2nm_cluster_attr_fence_method_write,
};

static struct configfs_attribute *o2nm_cluster_attrs[] = {
	&o2nm_cluster_attr_idle_timeout_ms.attr,
	&o2nm_cluster_attr_keepalive_delay_ms.attr,
	&o2nm_cluster_attr_reconnect_delay_ms.attr,
	&o2nm_cluster_attr_fence_method.attr,
	NULL,
};

static ssize_t o2nm_cluster_show(struct config_item *item,
				 struct configfs_attribute *attr,
				 char *page)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
	struct o2nm_cluster_attribute *o2nm_cluster_attr =
		container_of(attr, struct o2nm_cluster_attribute, attr);
	ssize_t ret = 0;

	if (o2nm_cluster_attr->show)
		ret = o2nm_cluster_attr->show(cluster, page);
	return ret;
}

static ssize_t o2nm_cluster_store(struct config_item *item,
				  struct configfs_attribute *attr,
				  const char *page, size_t count)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
	struct o2nm_cluster_attribute *o2nm_cluster_attr =
		container_of(attr, struct o2nm_cluster_attribute, attr);
	ssize_t ret;

	if (o2nm_cluster_attr->store == NULL) {
		ret = -EINVAL;
		goto out;
	}

	ret = o2nm_cluster_attr->store(cluster, page, count);
	if (ret < count)
		goto out;
out:
	return ret;
}

static struct config_item *o2nm_node_group_make_item(struct config_group *group,
						     const char *name)
{
	struct o2nm_node *node = NULL;

	if (strlen(name) > O2NM_MAX_NAME_LEN)
		return ERR_PTR(-ENAMETOOLONG);

	node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
	if (node == NULL)
		return ERR_PTR(-ENOMEM);

	strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
	config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
	spin_lock_init(&node->nd_lock);

	mlog(ML_CLUSTER, "o2nm: Registering node %s\n", name);

	return &node->nd_item;
}

static void o2nm_node_group_drop_item(struct config_group *group,
				      struct config_item *item)
{
	struct o2nm_node *node = to_o2nm_node(item);
	struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);

	o2net_disconnect_node(node);

	if (cluster->cl_has_local &&
	    (cluster->cl_local_node == node->nd_num)) {
		cluster->cl_has_local = 0;
		cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
		o2net_stop_listening(node);
	}

	/* XXX call into net to stop this node from trading messages */

	write_lock(&cluster->cl_nodes_lock);

	/* XXX sloppy */
	if (node->nd_ipv4_address)
		rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);

	/* nd_num might be 0 if the node number hasn't been set.. */
	if (cluster->cl_nodes[node->nd_num] == node) {
		cluster->cl_nodes[node->nd_num] = NULL;
		clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
	}
	write_unlock(&cluster->cl_nodes_lock);

	mlog(ML_CLUSTER, "o2nm: Unregistered node %s\n",
	     config_item_name(&node->nd_item));

	config_item_put(item);
}

static struct configfs_group_operations o2nm_node_group_group_ops = {
	.make_item	= o2nm_node_group_make_item,
	.drop_item	= o2nm_node_group_drop_item,
};

static struct config_item_type o2nm_node_group_type = {
	.ct_group_ops	= &o2nm_node_group_group_ops,
	.ct_owner	= THIS_MODULE,
};

/* cluster */

static void o2nm_cluster_release(struct config_item *item)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster(item);

	kfree(cluster->cl_group.default_groups);
	kfree(cluster);
}

static struct configfs_item_operations o2nm_cluster_item_ops = {
	.release		= o2nm_cluster_release,
	.show_attribute		= o2nm_cluster_show,
	.store_attribute	= o2nm_cluster_store,
};

static struct config_item_type o2nm_cluster_type = {
	.ct_item_ops	= &o2nm_cluster_item_ops,
	.ct_attrs	= o2nm_cluster_attrs,
	.ct_owner	= THIS_MODULE,
};

/* cluster set */

struct o2nm_cluster_group {
	struct configfs_subsystem cs_subsys;
	/* some stuff? */
};

#if 0
static struct o2nm_cluster_group *to_o2nm_cluster_group(struct config_group *group)
{
	return group ?
		container_of(to_configfs_subsystem(group), struct o2nm_cluster_group, cs_subsys)
	       : NULL;
}
#endif

static struct config_group *o2nm_cluster_group_make_group(struct config_group *group,
							  const char *name)
{
	struct o2nm_cluster *cluster = NULL;
	struct o2nm_node_group *ns = NULL;
	struct config_group *o2hb_group = NULL, *ret = NULL;
	void *defs = NULL;

	/* this runs under the parent dir's i_mutex; there can be only
	 * one caller in here at a time */
	if (o2nm_single_cluster)
		return ERR_PTR(-ENOSPC);

	cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
	ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
	defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
	o2hb_group = o2hb_alloc_hb_set();
	if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL)
		goto out;

	config_group_init_type_name(&cluster->cl_group, name,
				    &o2nm_cluster_type);
	config_group_init_type_name(&ns->ns_group, "node",
				    &o2nm_node_group_type);

	cluster->cl_group.default_groups = defs;
	cluster->cl_group.default_groups[0] = &ns->ns_group;
	cluster->cl_group.default_groups[1] = o2hb_group;
	cluster->cl_group.default_groups[2] = NULL;
	rwlock_init(&cluster->cl_nodes_lock);
	cluster->cl_node_ip_tree = RB_ROOT;
	cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
	cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
	cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
	cluster->cl_fence_method       = O2NM_FENCE_RESET;

	ret = &cluster->cl_group;
	o2nm_single_cluster = cluster;

out:
	if (ret == NULL) {
		kfree(cluster);
		kfree(ns);
		o2hb_free_hb_set(o2hb_group);
		kfree(defs);
		ret = ERR_PTR(-ENOMEM);
	}

	return ret;
}

static void o2nm_cluster_group_drop_item(struct config_group *group, struct config_item *item)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
	int i;
	struct config_item *killme;

	BUG_ON(o2nm_single_cluster != cluster);
	o2nm_single_cluster = NULL;

	for (i = 0; cluster->cl_group.default_groups[i]; i++) {
		killme = &cluster->cl_group.default_groups[i]->cg_item;
		cluster->cl_group.default_groups[i] = NULL;
		config_item_put(killme);
	}

	config_item_put(item);
}

static struct configfs_group_operations o2nm_cluster_group_group_ops = {
	.make_group	= o2nm_cluster_group_make_group,
	.drop_item	= o2nm_cluster_group_drop_item,
};

static struct config_item_type o2nm_cluster_group_type = {
	.ct_group_ops	= &o2nm_cluster_group_group_ops,
	.ct_owner	= THIS_MODULE,
};

static struct o2nm_cluster_group o2nm_cluster_group = {
	.cs_subsys = {
		.su_group = {
			.cg_item = {
				.ci_namebuf = "cluster",
				.ci_type = &o2nm_cluster_group_type,
			},
		},
	},
};
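
/*
 * With configfs mounted in its conventional location (/sys/kernel/config,
 * though the mount point is up to userspace), this subsystem shows up as the
 * "cluster" directory: mkdir cluster/<name> ends up in
 * o2nm_cluster_group_make_group() above, and mkdir cluster/<name>/node/<node>
 * in o2nm_node_group_make_item(), which is how the o2cb tools build up the
 * cluster description.
 */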

int o2nm_depend_item(struct config_item *item)
{
	return configfs_depend_item(&o2nm_cluster_group.cs_subsys, item);
}

void o2nm_undepend_item(struct config_item *item)
{
	configfs_undepend_item(&o2nm_cluster_group.cs_subsys, item);
}

int o2nm_depend_this_node(void)
{
	int ret = 0;
	struct o2nm_node *local_node;

	local_node = o2nm_get_node_by_num(o2nm_this_node());
	if (!local_node) {
		ret = -EINVAL;
		goto out;
	}

	ret = o2nm_depend_item(&local_node->nd_item);
	o2nm_node_put(local_node);

out:
	return ret;
}

void o2nm_undepend_this_node(void)
{
	struct o2nm_node *local_node;

	local_node = o2nm_get_node_by_num(o2nm_this_node());
	BUG_ON(!local_node);

	o2nm_undepend_item(&local_node->nd_item);
	o2nm_node_put(local_node);
}


static void __exit exit_o2nm(void)
{
	/* XXX sync with hb callbacks and shut down hb? */
	o2net_unregister_hb_callbacks();
	configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
	o2cb_sys_shutdown();

	o2net_exit();
	o2hb_exit();
}

static int __init init_o2nm(void)
{
	int ret = -1;

	ret = o2hb_init();
	if (ret)
		goto out;

	ret = o2net_init();
	if (ret)
		goto out_o2hb;

	ret = o2net_register_hb_callbacks();
	if (ret)
		goto out_o2net;

	config_group_init(&o2nm_cluster_group.cs_subsys.su_group);
	mutex_init(&o2nm_cluster_group.cs_subsys.su_mutex);
	ret = configfs_register_subsystem(&o2nm_cluster_group.cs_subsys);
	if (ret) {
		printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
		goto out_callbacks;
	}

	ret = o2cb_sys_init();
	if (!ret)
		goto out;

	configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
out_callbacks:
	o2net_unregister_hb_callbacks();
out_o2net:
	o2net_exit();
out_o2hb:
	o2hb_exit();
out:
	return ret;
}

MODULE_AUTHOR("Oracle");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("OCFS2 cluster management");

module_init(init_o2nm)
module_exit(exit_o2nm)