xref: /openbmc/linux/fs/ocfs2/cluster/nodemanager.c (revision b04b4f78)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
20  */
21 
22 #include <linux/kernel.h>
23 #include <linux/module.h>
24 #include <linux/configfs.h>
25 
26 #include "tcp.h"
27 #include "nodemanager.h"
28 #include "heartbeat.h"
29 #include "masklog.h"
30 #include "sys.h"
31 #include "ver.h"
32 
33 /* for now we operate under the assertion that there can be only one
34  * cluster active at a time.  Changing this will require trickling
35  * cluster references throughout where nodes are looked up */
36 struct o2nm_cluster *o2nm_single_cluster = NULL;
37 
38 
39 struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
40 {
41 	struct o2nm_node *node = NULL;
42 
43 	if (node_num >= O2NM_MAX_NODES || o2nm_single_cluster == NULL)
44 		goto out;
45 
46 	read_lock(&o2nm_single_cluster->cl_nodes_lock);
47 	node = o2nm_single_cluster->cl_nodes[node_num];
48 	if (node)
49 		config_item_get(&node->nd_item);
50 	read_unlock(&o2nm_single_cluster->cl_nodes_lock);
51 out:
52 	return node;
53 }
54 EXPORT_SYMBOL_GPL(o2nm_get_node_by_num);
55 
56 int o2nm_configured_node_map(unsigned long *map, unsigned bytes)
57 {
58 	struct o2nm_cluster *cluster = o2nm_single_cluster;
59 
60 	BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
61 
62 	if (cluster == NULL)
63 		return -EINVAL;
64 
65 	read_lock(&cluster->cl_nodes_lock);
66 	memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
67 	read_unlock(&cluster->cl_nodes_lock);
68 
69 	return 0;
70 }
71 EXPORT_SYMBOL_GPL(o2nm_configured_node_map);
72 
73 static struct o2nm_node *o2nm_node_ip_tree_lookup(struct o2nm_cluster *cluster,
74 						  __be32 ip_needle,
75 						  struct rb_node ***ret_p,
76 						  struct rb_node **ret_parent)
77 {
78 	struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
79 	struct rb_node *parent = NULL;
80 	struct o2nm_node *node, *ret = NULL;
81 
82 	while (*p) {
83 		int cmp;
84 
85 		parent = *p;
86 		node = rb_entry(parent, struct o2nm_node, nd_ip_node);
87 
88 		cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
89 				sizeof(ip_needle));
90 		if (cmp < 0)
91 			p = &(*p)->rb_left;
92 		else if (cmp > 0)
93 			p = &(*p)->rb_right;
94 		else {
95 			ret = node;
96 			break;
97 		}
98 	}
99 
100 	if (ret_p != NULL)
101 		*ret_p = p;
102 	if (ret_parent != NULL)
103 		*ret_parent = parent;
104 
105 	return ret;
106 }
107 
108 struct o2nm_node *o2nm_get_node_by_ip(__be32 addr)
109 {
110 	struct o2nm_node *node = NULL;
111 	struct o2nm_cluster *cluster = o2nm_single_cluster;
112 
113 	if (cluster == NULL)
114 		goto out;
115 
116 	read_lock(&cluster->cl_nodes_lock);
117 	node = o2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
118 	if (node)
119 		config_item_get(&node->nd_item);
120 	read_unlock(&cluster->cl_nodes_lock);
121 
122 out:
123 	return node;
124 }
125 EXPORT_SYMBOL_GPL(o2nm_get_node_by_ip);
126 
/* Drop a node reference taken by o2nm_get_node_by_num/ip() or
 * o2nm_node_get(); the node is freed via o2nm_node_release() when the
 * underlying config_item refcount hits zero. */
void o2nm_node_put(struct o2nm_node *node)
{
	config_item_put(&node->nd_item);
}
EXPORT_SYMBOL_GPL(o2nm_node_put);

/* Take an additional reference on a node already held by the caller. */
void o2nm_node_get(struct o2nm_node *node)
{
	config_item_get(&node->nd_item);
}
EXPORT_SYMBOL_GPL(o2nm_node_get);
138 
139 u8 o2nm_this_node(void)
140 {
141 	u8 node_num = O2NM_MAX_NODES;
142 
143 	if (o2nm_single_cluster && o2nm_single_cluster->cl_has_local)
144 		node_num = o2nm_single_cluster->cl_local_node;
145 
146 	return node_num;
147 }
148 EXPORT_SYMBOL_GPL(o2nm_this_node);
149 
150 /* node configfs bits */
151 
152 static struct o2nm_cluster *to_o2nm_cluster(struct config_item *item)
153 {
154 	return item ?
155 		container_of(to_config_group(item), struct o2nm_cluster,
156 			     cl_group)
157 		: NULL;
158 }
159 
160 static struct o2nm_node *to_o2nm_node(struct config_item *item)
161 {
162 	return item ? container_of(item, struct o2nm_node, nd_item) : NULL;
163 }
164 
/* configfs release callback: final put on the node's item frees it. */
static void o2nm_node_release(struct config_item *item)
{
	struct o2nm_node *node = to_o2nm_node(item);
	kfree(node);
}

/* "num" attribute show: the node's configured node number. */
static ssize_t o2nm_node_num_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%d\n", node->nd_num);
}
175 
/* Walk two configfs parents up from a node to its owning cluster. */
static struct o2nm_cluster *to_o2nm_cluster_from_node(struct o2nm_node *node)
{
	/* through the first node_set .parent
	 * mycluster/nodes/mynode == o2nm_cluster->o2nm_node_group->o2nm_node */
	return to_o2nm_cluster(node->nd_item.ci_parent->ci_parent);
}
182 
/* Bit indices into node->nd_set_attributes recording which attributes
 * have been written; also index into the o2nm_node_attrs[] array below. */
enum {
	O2NM_NODE_ATTR_NUM = 0,
	O2NM_NODE_ATTR_PORT,
	O2NM_NODE_ATTR_ADDRESS,
	O2NM_NODE_ATTR_LOCAL,
};
189 
190 static ssize_t o2nm_node_num_write(struct o2nm_node *node, const char *page,
191 				   size_t count)
192 {
193 	struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
194 	unsigned long tmp;
195 	char *p = (char *)page;
196 
197 	tmp = simple_strtoul(p, &p, 0);
198 	if (!p || (*p && (*p != '\n')))
199 		return -EINVAL;
200 
201 	if (tmp >= O2NM_MAX_NODES)
202 		return -ERANGE;
203 
204 	/* once we're in the cl_nodes tree networking can look us up by
205 	 * node number and try to use our address and port attributes
206 	 * to connect to this node.. make sure that they've been set
207 	 * before writing the node attribute? */
208 	if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
209 	    !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
210 		return -EINVAL; /* XXX */
211 
212 	write_lock(&cluster->cl_nodes_lock);
213 	if (cluster->cl_nodes[tmp])
214 		p = NULL;
215 	else  {
216 		cluster->cl_nodes[tmp] = node;
217 		node->nd_num = tmp;
218 		set_bit(tmp, cluster->cl_nodes_bitmap);
219 	}
220 	write_unlock(&cluster->cl_nodes_lock);
221 	if (p == NULL)
222 		return -EEXIST;
223 
224 	return count;
225 }
/* "ipv4_port" attribute show: port is stored big-endian, print host order. */
static ssize_t o2nm_node_ipv4_port_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%u\n", ntohs(node->nd_ipv4_port));
}
230 
231 static ssize_t o2nm_node_ipv4_port_write(struct o2nm_node *node,
232 					 const char *page, size_t count)
233 {
234 	unsigned long tmp;
235 	char *p = (char *)page;
236 
237 	tmp = simple_strtoul(p, &p, 0);
238 	if (!p || (*p && (*p != '\n')))
239 		return -EINVAL;
240 
241 	if (tmp == 0)
242 		return -EINVAL;
243 	if (tmp >= (u16)-1)
244 		return -ERANGE;
245 
246 	node->nd_ipv4_port = htons(tmp);
247 
248 	return count;
249 }
250 
/* "ipv4_address" attribute show: print the stored address dotted-quad. */
static ssize_t o2nm_node_ipv4_address_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%pI4\n", &node->nd_ipv4_address);
}
255 
256 static ssize_t o2nm_node_ipv4_address_write(struct o2nm_node *node,
257 					    const char *page,
258 					    size_t count)
259 {
260 	struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
261 	int ret, i;
262 	struct rb_node **p, *parent;
263 	unsigned int octets[4];
264 	__be32 ipv4_addr = 0;
265 
266 	ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
267 		     &octets[1], &octets[0]);
268 	if (ret != 4)
269 		return -EINVAL;
270 
271 	for (i = 0; i < ARRAY_SIZE(octets); i++) {
272 		if (octets[i] > 255)
273 			return -ERANGE;
274 		be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
275 	}
276 
277 	ret = 0;
278 	write_lock(&cluster->cl_nodes_lock);
279 	if (o2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
280 		ret = -EEXIST;
281 	else {
282 		rb_link_node(&node->nd_ip_node, parent, p);
283 		rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
284 	}
285 	write_unlock(&cluster->cl_nodes_lock);
286 	if (ret)
287 		return ret;
288 
289 	memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
290 
291 	return count;
292 }
293 
/* "local" attribute show: 1 if this node is the local node, else 0. */
static ssize_t o2nm_node_local_read(struct o2nm_node *node, char *page)
{
	return sprintf(page, "%d\n", node->nd_local);
}
298 
/*
 * "local" attribute store: mark (or unmark) this node as the node this
 * machine runs as.  Setting local starts the o2net listening socket;
 * clearing it on the current local node stops listening.  Only one node
 * per cluster may be local at a time.
 */
static ssize_t o2nm_node_local_write(struct o2nm_node *node, const char *page,
				     size_t count)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster_from_node(node);
	unsigned long tmp;
	char *p = (char *)page;
	ssize_t ret;

	tmp = simple_strtoul(p, &p, 0);
	if (!p || (*p && (*p != '\n')))
		return -EINVAL;

	tmp = !!tmp; /* boolean of whether this node wants to be local */

	/* setting local turns on networking rx for now so we require having
	 * set everything else first */
	if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
	    !test_bit(O2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
	    !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
		return -EINVAL; /* XXX */

	/* the only failure case is trying to set a new local node
	 * when a different one is already set */
	if (tmp && tmp == cluster->cl_has_local &&
	    cluster->cl_local_node != node->nd_num)
		return -EBUSY;

	/* bring up the rx thread if we're setting the new local node. */
	if (tmp && !cluster->cl_has_local) {
		ret = o2net_start_listening(node);
		if (ret)
			return ret;
	}

	/* clearing local on the current local node: shut the rx side down
	 * and invalidate the cached local node number */
	if (!tmp && cluster->cl_has_local &&
	    cluster->cl_local_node == node->nd_num) {
		o2net_stop_listening(node);
		cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
	}

	node->nd_local = tmp;
	if (node->nd_local) {
		cluster->cl_has_local = tmp;
		cluster->cl_local_node = node->nd_num;
	}

	return count;
}
347 
/* Node configfs attribute: embeds the generic attribute plus typed
 * show/store callbacks dispatched by o2nm_node_show/store(). */
struct o2nm_node_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct o2nm_node *, char *);
	ssize_t (*store)(struct o2nm_node *, const char *, size_t);
};
353 
/* Per-node configfs attributes; the array below is indexed by the
 * O2NM_NODE_ATTR_* enum so attribute order must match it. */
static struct o2nm_node_attribute o2nm_node_attr_num = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "num",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_num_read,
	.store	= o2nm_node_num_write,
};

static struct o2nm_node_attribute o2nm_node_attr_ipv4_port = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "ipv4_port",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_ipv4_port_read,
	.store	= o2nm_node_ipv4_port_write,
};

static struct o2nm_node_attribute o2nm_node_attr_ipv4_address = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "ipv4_address",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_ipv4_address_read,
	.store	= o2nm_node_ipv4_address_write,
};

static struct o2nm_node_attribute o2nm_node_attr_local = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "local",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_node_local_read,
	.store	= o2nm_node_local_write,
};

static struct configfs_attribute *o2nm_node_attrs[] = {
	[O2NM_NODE_ATTR_NUM] = &o2nm_node_attr_num.attr,
	[O2NM_NODE_ATTR_PORT] = &o2nm_node_attr_ipv4_port.attr,
	[O2NM_NODE_ATTR_ADDRESS] = &o2nm_node_attr_ipv4_address.attr,
	[O2NM_NODE_ATTR_LOCAL] = &o2nm_node_attr_local.attr,
	NULL,
};
393 
394 static int o2nm_attr_index(struct configfs_attribute *attr)
395 {
396 	int i;
397 	for (i = 0; i < ARRAY_SIZE(o2nm_node_attrs); i++) {
398 		if (attr == o2nm_node_attrs[i])
399 			return i;
400 	}
401 	BUG();
402 	return 0;
403 }
404 
405 static ssize_t o2nm_node_show(struct config_item *item,
406 			      struct configfs_attribute *attr,
407 			      char *page)
408 {
409 	struct o2nm_node *node = to_o2nm_node(item);
410 	struct o2nm_node_attribute *o2nm_node_attr =
411 		container_of(attr, struct o2nm_node_attribute, attr);
412 	ssize_t ret = 0;
413 
414 	if (o2nm_node_attr->show)
415 		ret = o2nm_node_attr->show(node, page);
416 	return ret;
417 }
418 
419 static ssize_t o2nm_node_store(struct config_item *item,
420 			       struct configfs_attribute *attr,
421 			       const char *page, size_t count)
422 {
423 	struct o2nm_node *node = to_o2nm_node(item);
424 	struct o2nm_node_attribute *o2nm_node_attr =
425 		container_of(attr, struct o2nm_node_attribute, attr);
426 	ssize_t ret;
427 	int attr_index = o2nm_attr_index(attr);
428 
429 	if (o2nm_node_attr->store == NULL) {
430 		ret = -EINVAL;
431 		goto out;
432 	}
433 
434 	if (test_bit(attr_index, &node->nd_set_attributes))
435 		return -EBUSY;
436 
437 	ret = o2nm_node_attr->store(node, page, count);
438 	if (ret < count)
439 		goto out;
440 
441 	set_bit(attr_index, &node->nd_set_attributes);
442 out:
443 	return ret;
444 }
445 
/* configfs plumbing for a node item: lifetime + attribute dispatch. */
static struct configfs_item_operations o2nm_node_item_ops = {
	.release		= o2nm_node_release,
	.show_attribute		= o2nm_node_show,
	.store_attribute	= o2nm_node_store,
};

static struct config_item_type o2nm_node_type = {
	.ct_item_ops	= &o2nm_node_item_ops,
	.ct_attrs	= o2nm_node_attrs,
	.ct_owner	= THIS_MODULE,
};
457 
458 /* node set */
459 
/* The "node" subdirectory of a cluster; node items are created inside it. */
struct o2nm_node_group {
	struct config_group ns_group;
	/* some stuff? */
};

/* currently unused helper, kept for symmetry with the other to_*() casts */
#if 0
static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
{
	return group ?
		container_of(group, struct o2nm_node_group, ns_group)
		: NULL;
}
#endif
473 
/* Cluster configfs attribute: generic attribute plus typed show/store
 * callbacks dispatched by o2nm_cluster_show/store(). */
struct o2nm_cluster_attribute {
	struct configfs_attribute attr;
	ssize_t (*show)(struct o2nm_cluster *, char *);
	ssize_t (*store)(struct o2nm_cluster *, const char *, size_t);
};
479 
480 static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
481                                        unsigned int *val)
482 {
483 	unsigned long tmp;
484 	char *p = (char *)page;
485 
486 	tmp = simple_strtoul(p, &p, 0);
487 	if (!p || (*p && (*p != '\n')))
488 		return -EINVAL;
489 
490 	if (tmp == 0)
491 		return -EINVAL;
492 	if (tmp >= (u32)-1)
493 		return -ERANGE;
494 
495 	*val = tmp;
496 
497 	return count;
498 }
499 
/* "idle_timeout_ms" show. */
static ssize_t o2nm_cluster_attr_idle_timeout_ms_read(
	struct o2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
}

/* "idle_timeout_ms" store: the timeout is part of the wire handshake,
 * so it cannot change while any peers are connected, and it must stay
 * strictly greater than the keepalive delay. */
static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	ssize_t ret;
	unsigned int val;

	ret =  o2nm_cluster_attr_write(page, count, &val);

	if (ret > 0) {
		if (cluster->cl_idle_timeout_ms != val
			&& o2net_num_connected_peers()) {
			mlog(ML_NOTICE,
			     "o2net: cannot change idle timeout after "
			     "the first peer has agreed to it."
			     "  %d connected peers\n",
			     o2net_num_connected_peers());
			ret = -EINVAL;
		} else if (val <= cluster->cl_keepalive_delay_ms) {
			mlog(ML_NOTICE, "o2net: idle timeout must be larger "
			     "than keepalive delay\n");
			ret = -EINVAL;
		} else {
			cluster->cl_idle_timeout_ms = val;
		}
	}

	return ret;
}
534 
/* "keepalive_delay_ms" show. */
static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read(
	struct o2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
}

/* "keepalive_delay_ms" store: mirrors the idle-timeout rules — frozen
 * once peers are connected, and must stay strictly below the idle
 * timeout. */
static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	ssize_t ret;
	unsigned int val;

	ret =  o2nm_cluster_attr_write(page, count, &val);

	if (ret > 0) {
		if (cluster->cl_keepalive_delay_ms != val
		    && o2net_num_connected_peers()) {
			mlog(ML_NOTICE,
			     "o2net: cannot change keepalive delay after"
			     " the first peer has agreed to it."
			     "  %d connected peers\n",
			     o2net_num_connected_peers());
			ret = -EINVAL;
		} else if (val >= cluster->cl_idle_timeout_ms) {
			mlog(ML_NOTICE, "o2net: keepalive delay must be "
			     "smaller than idle timeout\n");
			ret = -EINVAL;
		} else {
			cluster->cl_keepalive_delay_ms = val;
		}
	}

	return ret;
}
569 
/* "reconnect_delay_ms" show. */
static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read(
	struct o2nm_cluster *cluster, char *page)
{
	return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
}

/* "reconnect_delay_ms" store: unlike the timeouts above this may change
 * at any time; it only affects future reconnect attempts. */
static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write(
	struct o2nm_cluster *cluster, const char *page, size_t count)
{
	return o2nm_cluster_attr_write(page, count,
	                               &cluster->cl_reconnect_delay_ms);
}
/* Cluster-level configfs attributes (network timing knobs). */
static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "idle_timeout_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_idle_timeout_ms_read,
	.store	= o2nm_cluster_attr_idle_timeout_ms_write,
};

static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "keepalive_delay_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_keepalive_delay_ms_read,
	.store	= o2nm_cluster_attr_keepalive_delay_ms_write,
};

static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = {
	.attr	= { .ca_owner = THIS_MODULE,
		    .ca_name = "reconnect_delay_ms",
		    .ca_mode = S_IRUGO | S_IWUSR },
	.show	= o2nm_cluster_attr_reconnect_delay_ms_read,
	.store	= o2nm_cluster_attr_reconnect_delay_ms_write,
};

static struct configfs_attribute *o2nm_cluster_attrs[] = {
	&o2nm_cluster_attr_idle_timeout_ms.attr,
	&o2nm_cluster_attr_keepalive_delay_ms.attr,
	&o2nm_cluster_attr_reconnect_delay_ms.attr,
	NULL,
};
612 static ssize_t o2nm_cluster_show(struct config_item *item,
613                                  struct configfs_attribute *attr,
614                                  char *page)
615 {
616 	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
617 	struct o2nm_cluster_attribute *o2nm_cluster_attr =
618 		container_of(attr, struct o2nm_cluster_attribute, attr);
619 	ssize_t ret = 0;
620 
621 	if (o2nm_cluster_attr->show)
622 		ret = o2nm_cluster_attr->show(cluster, page);
623 	return ret;
624 }
625 
626 static ssize_t o2nm_cluster_store(struct config_item *item,
627                                   struct configfs_attribute *attr,
628                                   const char *page, size_t count)
629 {
630 	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
631 	struct o2nm_cluster_attribute *o2nm_cluster_attr =
632 		container_of(attr, struct o2nm_cluster_attribute, attr);
633 	ssize_t ret;
634 
635 	if (o2nm_cluster_attr->store == NULL) {
636 		ret = -EINVAL;
637 		goto out;
638 	}
639 
640 	ret = o2nm_cluster_attr->store(cluster, page, count);
641 	if (ret < count)
642 		goto out;
643 out:
644 	return ret;
645 }
646 
647 static struct config_item *o2nm_node_group_make_item(struct config_group *group,
648 						     const char *name)
649 {
650 	struct o2nm_node *node = NULL;
651 
652 	if (strlen(name) > O2NM_MAX_NAME_LEN)
653 		return ERR_PTR(-ENAMETOOLONG);
654 
655 	node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
656 	if (node == NULL)
657 		return ERR_PTR(-ENOMEM);
658 
659 	strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
660 	config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
661 	spin_lock_init(&node->nd_lock);
662 
663 	return &node->nd_item;
664 }
665 
/*
 * configfs drop_item: rmdir of a node.  Tears down networking to the
 * node, stops listening if it was the local node, and unpublishes it
 * from the address tree and node table before dropping the final ref.
 */
static void o2nm_node_group_drop_item(struct config_group *group,
				      struct config_item *item)
{
	struct o2nm_node *node = to_o2nm_node(item);
	struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);

	o2net_disconnect_node(node);

	/* removing the local node also takes the rx side down */
	if (cluster->cl_has_local &&
	    (cluster->cl_local_node == node->nd_num)) {
		cluster->cl_has_local = 0;
		cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
		o2net_stop_listening(node);
	}

	/* XXX call into net to stop this node from trading messages */

	write_lock(&cluster->cl_nodes_lock);

	/* XXX sloppy */
	if (node->nd_ipv4_address)
		rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);

	/* nd_num might be 0 if the node number hasn't been set.. */
	if (cluster->cl_nodes[node->nd_num] == node) {
		cluster->cl_nodes[node->nd_num] = NULL;
		clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
	}
	write_unlock(&cluster->cl_nodes_lock);

	config_item_put(item);
}
698 
/* configfs plumbing for the "node" directory: mkdir/rmdir of nodes. */
static struct configfs_group_operations o2nm_node_group_group_ops = {
	.make_item	= o2nm_node_group_make_item,
	.drop_item	= o2nm_node_group_drop_item,
};

static struct config_item_type o2nm_node_group_type = {
	.ct_group_ops	= &o2nm_node_group_group_ops,
	.ct_owner	= THIS_MODULE,
};
708 
709 /* cluster */
710 
/* configfs release: free the default-groups array allocated in
 * o2nm_cluster_group_make_group() and then the cluster itself. */
static void o2nm_cluster_release(struct config_item *item)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster(item);

	kfree(cluster->cl_group.default_groups);
	kfree(cluster);
}

static struct configfs_item_operations o2nm_cluster_item_ops = {
	.release	= o2nm_cluster_release,
	.show_attribute		= o2nm_cluster_show,
	.store_attribute	= o2nm_cluster_store,
};

static struct config_item_type o2nm_cluster_type = {
	.ct_item_ops	= &o2nm_cluster_item_ops,
	.ct_attrs	= o2nm_cluster_attrs,
	.ct_owner	= THIS_MODULE,
};
730 
731 /* cluster set */
732 
/* The top-level "cluster" configfs subsystem; clusters are made inside it. */
struct o2nm_cluster_group {
	struct configfs_subsystem cs_subsys;
	/* some stuff? */
};

/* currently unused helper, kept for symmetry with the other to_*() casts */
#if 0
static struct o2nm_cluster_group *to_o2nm_cluster_group(struct config_group *group)
{
	return group ?
		container_of(to_configfs_subsystem(group), struct o2nm_cluster_group, cs_subsys)
	       : NULL;
}
#endif
746 
/*
 * configfs make_group: mkdir under /config/cluster creates the (single)
 * o2nm_cluster along with its default children: the "node" group and
 * the heartbeat group.  All allocations are attempted up front and
 * everything is unwound together on any failure.
 */
static struct config_group *o2nm_cluster_group_make_group(struct config_group *group,
							  const char *name)
{
	struct o2nm_cluster *cluster = NULL;
	struct o2nm_node_group *ns = NULL;
	struct config_group *o2hb_group = NULL, *ret = NULL;
	void *defs = NULL;

	/* this runs under the parent dir's i_mutex; there can be only
	 * one caller in here at a time */
	if (o2nm_single_cluster)
		return ERR_PTR(-ENOSPC);

	cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
	ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
	defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
	o2hb_group = o2hb_alloc_hb_set();
	if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL)
		goto out;

	config_group_init_type_name(&cluster->cl_group, name,
				    &o2nm_cluster_type);
	config_group_init_type_name(&ns->ns_group, "node",
				    &o2nm_node_group_type);

	/* defs is freed by o2nm_cluster_release() once adopted here */
	cluster->cl_group.default_groups = defs;
	cluster->cl_group.default_groups[0] = &ns->ns_group;
	cluster->cl_group.default_groups[1] = o2hb_group;
	cluster->cl_group.default_groups[2] = NULL;
	rwlock_init(&cluster->cl_nodes_lock);
	cluster->cl_node_ip_tree = RB_ROOT;
	cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
	cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
	cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;

	ret = &cluster->cl_group;
	o2nm_single_cluster = cluster;

out:
	if (ret == NULL) {
		kfree(cluster);
		kfree(ns);
		o2hb_free_hb_set(o2hb_group);
		kfree(defs);
		ret = ERR_PTR(-ENOMEM);
	}

	return ret;
}
796 
/*
 * configfs drop_item: rmdir of the cluster.  Drops the default child
 * groups (node set, heartbeat set) and then the cluster item itself;
 * o2nm_cluster_release() runs on the final put.
 */
static void o2nm_cluster_group_drop_item(struct config_group *group, struct config_item *item)
{
	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
	int i;
	struct config_item *killme;

	BUG_ON(o2nm_single_cluster != cluster);
	o2nm_single_cluster = NULL;

	for (i = 0; cluster->cl_group.default_groups[i]; i++) {
		killme = &cluster->cl_group.default_groups[i]->cg_item;
		cluster->cl_group.default_groups[i] = NULL;
		config_item_put(killme);
	}

	config_item_put(item);
}
814 
/* configfs plumbing for the "cluster" subsystem root. */
static struct configfs_group_operations o2nm_cluster_group_group_ops = {
	.make_group	= o2nm_cluster_group_make_group,
	.drop_item	= o2nm_cluster_group_drop_item,
};

static struct config_item_type o2nm_cluster_group_type = {
	.ct_group_ops	= &o2nm_cluster_group_group_ops,
	.ct_owner	= THIS_MODULE,
};

/* the subsystem itself, registered/unregistered in init_o2nm/exit_o2nm */
static struct o2nm_cluster_group o2nm_cluster_group = {
	.cs_subsys = {
		.su_group = {
			.cg_item = {
				.ci_namebuf = "cluster",
				.ci_type = &o2nm_cluster_group_type,
			},
		},
	},
};
835 
/* Pin a configfs item so userspace cannot rmdir it while in use. */
int o2nm_depend_item(struct config_item *item)
{
	return configfs_depend_item(&o2nm_cluster_group.cs_subsys, item);
}

/* Release a pin taken with o2nm_depend_item(). */
void o2nm_undepend_item(struct config_item *item)
{
	configfs_undepend_item(&o2nm_cluster_group.cs_subsys, item);
}
845 
846 int o2nm_depend_this_node(void)
847 {
848 	int ret = 0;
849 	struct o2nm_node *local_node;
850 
851 	local_node = o2nm_get_node_by_num(o2nm_this_node());
852 	if (!local_node) {
853 		ret = -EINVAL;
854 		goto out;
855 	}
856 
857 	ret = o2nm_depend_item(&local_node->nd_item);
858 	o2nm_node_put(local_node);
859 
860 out:
861 	return ret;
862 }
863 
/* Release the pin taken by o2nm_depend_this_node(); the local node must
 * still exist (the pin itself guarantees that), hence the BUG_ON. */
void o2nm_undepend_this_node(void)
{
	struct o2nm_node *local_node;

	local_node = o2nm_get_node_by_num(o2nm_this_node());
	BUG_ON(!local_node);

	o2nm_undepend_item(&local_node->nd_item);
	o2nm_node_put(local_node);
}
874 
875 
/* Module exit: unwind init_o2nm() in reverse order. */
static void __exit exit_o2nm(void)
{
	/* XXX sync with hb callbacks and shut down hb? */
	o2net_unregister_hb_callbacks();
	configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
	o2cb_sys_shutdown();

	o2net_exit();
	o2hb_exit();
}
886 
/*
 * Module init: bring up heartbeat, networking, hb callbacks, the
 * configfs "cluster" subsystem and the o2cb sysfs bits, unwinding in
 * reverse order via the goto ladder on any failure.
 */
static int __init init_o2nm(void)
{
	int ret = -1;

	cluster_print_version();

	ret = o2hb_init();
	if (ret)
		goto out;

	ret = o2net_init();
	if (ret)
		goto out_o2hb;

	ret = o2net_register_hb_callbacks();
	if (ret)
		goto out_o2net;

	config_group_init(&o2nm_cluster_group.cs_subsys.su_group);
	mutex_init(&o2nm_cluster_group.cs_subsys.su_mutex);
	ret = configfs_register_subsystem(&o2nm_cluster_group.cs_subsys);
	if (ret) {
		printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
		goto out_callbacks;
	}

	/* success: everything is up, skip the unwind below */
	ret = o2cb_sys_init();
	if (!ret)
		goto out;

	configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
out_callbacks:
	o2net_unregister_hb_callbacks();
out_o2net:
	o2net_exit();
out_o2hb:
	o2hb_exit();
out:
	return ret;
}
927 
928 MODULE_AUTHOR("Oracle");
929 MODULE_LICENSE("GPL");
930 
931 module_init(init_o2nm)
932 module_exit(exit_o2nm)
933