xref: /openbmc/linux/net/tipc/name_table.c (revision e3ec9c7d)
1 /*
2  * net/tipc/name_table.c: TIPC name table code
3  *
4  * Copyright (c) 2000-2006, Ericsson AB
5  * Copyright (c) 2004-2008, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "config.h"
39 #include "name_table.h"
40 #include "name_distr.h"
41 #include "subscr.h"
42 #include "port.h"
43 
44 static int tipc_nametbl_size = 1024;		/* must be a power of 2 */
45 
46 /**
47  * struct sub_seq - container for all published instances of a name sequence
48  * @lower: name sequence lower bound
49  * @upper: name sequence upper bound
50  * @node_list: circular list of publications made by own node
51  * @cluster_list: circular list of publications made by own cluster
52  * @zone_list: circular list of publications made by own zone
53  * @node_list_size: number of entries in "node_list"
54  * @cluster_list_size: number of entries in "cluster_list"
55  * @zone_list_size: number of entries in "zone_list"
56  *
57  * Note: The zone list always contains at least one entry, since all
58  *       publications of the associated name sequence belong to it.
59  *       (The cluster and node lists may be empty.)
60  */
61 
62 struct sub_seq {
63 	u32 lower;
64 	u32 upper;
65 	struct publication *node_list;
66 	struct publication *cluster_list;
67 	struct publication *zone_list;
68 	u32 node_list_size;
69 	u32 cluster_list_size;
70 	u32 zone_list_size;
71 };
72 
73 /**
74  * struct name_seq - container for all published instances of a name type
75  * @type: 32 bit 'type' value for name sequence
76  * @sseq: pointer to dynamically-sized array of sub-sequences of this 'type';
77  *        sub-sequences are sorted in ascending order
78  * @alloc: number of sub-sequences currently in array
79  * @first_free: array index of first unused sub-sequence entry
80  * @ns_list: links to adjacent name sequences in hash chain
81  * @subscriptions: list of subscriptions for this 'type'
82  * @lock: spinlock controlling access to publication lists of all sub-sequences
83  */
84 
85 struct name_seq {
86 	u32 type;
87 	struct sub_seq *sseqs;
88 	u32 alloc;
89 	u32 first_free;
90 	struct hlist_node ns_list;
91 	struct list_head subscriptions;
92 	spinlock_t lock;
93 };
94 
95 /**
96  * struct name_table - table containing all existing port name publications
97  * @types: pointer to fixed-sized array of name sequence lists,
98  *         accessed via hashing on 'type'; name sequence lists are *not* sorted
99  * @local_publ_count: number of publications issued by this node
100  */
101 
102 struct name_table {
103 	struct hlist_head *types;
104 	u32 local_publ_count;
105 };
106 
107 static struct name_table table;
108 static atomic_t rsv_publ_ok = ATOMIC_INIT(0);
109 DEFINE_RWLOCK(tipc_nametbl_lock);
110 
111 
112 static int hash(int x)
113 {
114 	return x & (tipc_nametbl_size - 1);
115 }
116 
117 /**
118  * publ_create - create a publication structure
119  */
120 
121 static struct publication *publ_create(u32 type, u32 lower, u32 upper,
122 				       u32 scope, u32 node, u32 port_ref,
123 				       u32 key)
124 {
125 	struct publication *publ = kzalloc(sizeof(*publ), GFP_ATOMIC);
126 	if (publ == NULL) {
127 		warn("Publication creation failure, no memory\n");
128 		return NULL;
129 	}
130 
131 	publ->type = type;
132 	publ->lower = lower;
133 	publ->upper = upper;
134 	publ->scope = scope;
135 	publ->node = node;
136 	publ->ref = port_ref;
137 	publ->key = key;
138 	INIT_LIST_HEAD(&publ->local_list);
139 	INIT_LIST_HEAD(&publ->pport_list);
140 	INIT_LIST_HEAD(&publ->subscr.nodesub_list);
141 	return publ;
142 }
143 
144 /**
145  * tipc_subseq_alloc - allocate a specified number of sub-sequence structures
146  */
147 
148 static struct sub_seq *tipc_subseq_alloc(u32 cnt)
149 {
150 	struct sub_seq *sseq = kcalloc(cnt, sizeof(struct sub_seq), GFP_ATOMIC);
151 	return sseq;
152 }
153 
154 /**
155  * tipc_nameseq_create - create a name sequence structure for the specified 'type'
156  *
157  * Allocates a single sub-sequence structure and sets it to all 0's.
158  */
159 
160 static struct name_seq *tipc_nameseq_create(u32 type, struct hlist_head *seq_head)
161 {
162 	struct name_seq *nseq = kzalloc(sizeof(*nseq), GFP_ATOMIC);
163 	struct sub_seq *sseq = tipc_subseq_alloc(1);
164 
165 	if (!nseq || !sseq) {
166 		warn("Name sequence creation failed, no memory\n");
167 		kfree(nseq);
168 		kfree(sseq);
169 		return NULL;
170 	}
171 
172 	spin_lock_init(&nseq->lock);
173 	nseq->type = type;
174 	nseq->sseqs = sseq;
175 	nseq->alloc = 1;
176 	INIT_HLIST_NODE(&nseq->ns_list);
177 	INIT_LIST_HEAD(&nseq->subscriptions);
178 	hlist_add_head(&nseq->ns_list, seq_head);
179 	return nseq;
180 }
181 
182 /**
183  * nameseq_find_subseq - find sub-sequence (if any) matching a name instance
184  *
185  * Very time-critical, so binary searches through sub-sequence array.
186  */
187 
188 static struct sub_seq *nameseq_find_subseq(struct name_seq *nseq,
189 					   u32 instance)
190 {
191 	struct sub_seq *sseqs = nseq->sseqs;
192 	int low = 0;
193 	int high = nseq->first_free - 1;
194 	int mid;
195 
196 	while (low <= high) {
197 		mid = (low + high) / 2;
198 		if (instance < sseqs[mid].lower)
199 			high = mid - 1;
200 		else if (instance > sseqs[mid].upper)
201 			low = mid + 1;
202 		else
203 			return &sseqs[mid];
204 	}
205 	return NULL;
206 }
207 
208 /**
209  * nameseq_locate_subseq - determine position of name instance in sub-sequence
210  *
211  * Returns index in sub-sequence array of the entry that contains the specified
212  * instance value; if no entry contains that value, returns the position
213  * where a new entry for it would be inserted in the array.
214  *
215  * Note: Similar to binary search code for locating a sub-sequence.
216  */
217 
218 static u32 nameseq_locate_subseq(struct name_seq *nseq, u32 instance)
219 {
220 	struct sub_seq *sseqs = nseq->sseqs;
221 	int low = 0;
222 	int high = nseq->first_free - 1;
223 	int mid;
224 
225 	while (low <= high) {
226 		mid = (low + high) / 2;
227 		if (instance < sseqs[mid].lower)
228 			high = mid - 1;
229 		else if (instance > sseqs[mid].upper)
230 			low = mid + 1;
231 		else
232 			return mid;
233 	}
234 	return low;
235 }
236 
237 /**
238  * tipc_nameseq_insert_publ -
239  */
240 
241 static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
242 						    u32 type, u32 lower, u32 upper,
243 						    u32 scope, u32 node, u32 port, u32 key)
244 {
245 	struct subscription *s;
246 	struct subscription *st;
247 	struct publication *publ;
248 	struct sub_seq *sseq;
249 	int created_subseq = 0;
250 
251 	sseq = nameseq_find_subseq(nseq, lower);
252 	if (sseq) {
253 
254 		/* Lower end overlaps existing entry => need an exact match */
255 
256 		if ((sseq->lower != lower) || (sseq->upper != upper)) {
257 			warn("Cannot publish {%u,%u,%u}, overlap error\n",
258 			     type, lower, upper);
259 			return NULL;
260 		}
261 	} else {
262 		u32 inspos;
263 		struct sub_seq *freesseq;
264 
265 		/* Find where lower end should be inserted */
266 
267 		inspos = nameseq_locate_subseq(nseq, lower);
268 
269 		/* Fail if upper end overlaps into an existing entry */
270 
271 		if ((inspos < nseq->first_free) &&
272 		    (upper >= nseq->sseqs[inspos].lower)) {
273 			warn("Cannot publish {%u,%u,%u}, overlap error\n",
274 			     type, lower, upper);
275 			return NULL;
276 		}
277 
278 		/* Ensure there is space for new sub-sequence */
279 
280 		if (nseq->first_free == nseq->alloc) {
281 			struct sub_seq *sseqs = tipc_subseq_alloc(nseq->alloc * 2);
282 
283 			if (!sseqs) {
284 				warn("Cannot publish {%u,%u,%u}, no memory\n",
285 				     type, lower, upper);
286 				return NULL;
287 			}
288 			memcpy(sseqs, nseq->sseqs,
289 			       nseq->alloc * sizeof(struct sub_seq));
290 			kfree(nseq->sseqs);
291 			nseq->sseqs = sseqs;
292 			nseq->alloc *= 2;
293 		}
294 
295 		/* Insert new sub-sequence */
296 
297 		sseq = &nseq->sseqs[inspos];
298 		freesseq = &nseq->sseqs[nseq->first_free];
299 		memmove(sseq + 1, sseq, (freesseq - sseq) * sizeof(*sseq));
300 		memset(sseq, 0, sizeof(*sseq));
301 		nseq->first_free++;
302 		sseq->lower = lower;
303 		sseq->upper = upper;
304 		created_subseq = 1;
305 	}
306 
307 	/* Insert a publication: */
308 
309 	publ = publ_create(type, lower, upper, scope, node, port, key);
310 	if (!publ)
311 		return NULL;
312 
313 	sseq->zone_list_size++;
314 	if (!sseq->zone_list)
315 		sseq->zone_list = publ->zone_list_next = publ;
316 	else {
317 		publ->zone_list_next = sseq->zone_list->zone_list_next;
318 		sseq->zone_list->zone_list_next = publ;
319 	}
320 
321 	if (in_own_cluster(node)) {
322 		sseq->cluster_list_size++;
323 		if (!sseq->cluster_list)
324 			sseq->cluster_list = publ->cluster_list_next = publ;
325 		else {
326 			publ->cluster_list_next =
327 			sseq->cluster_list->cluster_list_next;
328 			sseq->cluster_list->cluster_list_next = publ;
329 		}
330 	}
331 
332 	if (node == tipc_own_addr) {
333 		sseq->node_list_size++;
334 		if (!sseq->node_list)
335 			sseq->node_list = publ->node_list_next = publ;
336 		else {
337 			publ->node_list_next = sseq->node_list->node_list_next;
338 			sseq->node_list->node_list_next = publ;
339 		}
340 	}
341 
342 	/*
343 	 * Any subscriptions waiting for notification?
344 	 */
345 	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
346 		tipc_subscr_report_overlap(s,
347 					   publ->lower,
348 					   publ->upper,
349 					   TIPC_PUBLISHED,
350 					   publ->ref,
351 					   publ->node,
352 					   created_subseq);
353 	}
354 	return publ;
355 }
356 
357 /**
358  * tipc_nameseq_remove_publ -
359  *
360  * NOTE: There may be cases where TIPC is asked to remove a publication
361  * that is not in the name table.  For example, if another node issues a
362  * publication for a name sequence that overlaps an existing name sequence
363  * the publication will not be recorded, which means the publication won't
364  * be found when the name sequence is later withdrawn by that node.
365  * A failed withdraw request simply returns a failure indication and lets the
366  * caller issue any error or warning messages associated with such a problem.
367  */
368 
369 static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 inst,
370 						    u32 node, u32 ref, u32 key)
371 {
372 	struct publication *publ;
373 	struct publication *curr;
374 	struct publication *prev;
375 	struct sub_seq *sseq = nameseq_find_subseq(nseq, inst);
376 	struct sub_seq *free;
377 	struct subscription *s, *st;
378 	int removed_subseq = 0;
379 
380 	if (!sseq)
381 		return NULL;
382 
383 	/* Remove publication from zone scope list */
384 
385 	prev = sseq->zone_list;
386 	publ = sseq->zone_list->zone_list_next;
387 	while ((publ->key != key) || (publ->ref != ref) ||
388 	       (publ->node && (publ->node != node))) {
389 		prev = publ;
390 		publ = publ->zone_list_next;
391 		if (prev == sseq->zone_list) {
392 
393 			/* Prevent endless loop if publication not found */
394 
395 			return NULL;
396 		}
397 	}
398 	if (publ != sseq->zone_list)
399 		prev->zone_list_next = publ->zone_list_next;
400 	else if (publ->zone_list_next != publ) {
401 		prev->zone_list_next = publ->zone_list_next;
402 		sseq->zone_list = publ->zone_list_next;
403 	} else {
404 		sseq->zone_list = NULL;
405 	}
406 	sseq->zone_list_size--;
407 
408 	/* Remove publication from cluster scope list, if present */
409 
410 	if (in_own_cluster(node)) {
411 		prev = sseq->cluster_list;
412 		curr = sseq->cluster_list->cluster_list_next;
413 		while (curr != publ) {
414 			prev = curr;
415 			curr = curr->cluster_list_next;
416 			if (prev == sseq->cluster_list) {
417 
418 				/* Prevent endless loop for malformed list */
419 
420 				err("Unable to de-list cluster publication\n"
421 				    "{%u%u}, node=0x%x, ref=%u, key=%u)\n",
422 				    publ->type, publ->lower, publ->node,
423 				    publ->ref, publ->key);
424 				goto end_cluster;
425 			}
426 		}
427 		if (publ != sseq->cluster_list)
428 			prev->cluster_list_next = publ->cluster_list_next;
429 		else if (publ->cluster_list_next != publ) {
430 			prev->cluster_list_next = publ->cluster_list_next;
431 			sseq->cluster_list = publ->cluster_list_next;
432 		} else {
433 			sseq->cluster_list = NULL;
434 		}
435 		sseq->cluster_list_size--;
436 	}
437 end_cluster:
438 
439 	/* Remove publication from node scope list, if present */
440 
441 	if (node == tipc_own_addr) {
442 		prev = sseq->node_list;
443 		curr = sseq->node_list->node_list_next;
444 		while (curr != publ) {
445 			prev = curr;
446 			curr = curr->node_list_next;
447 			if (prev == sseq->node_list) {
448 
449 				/* Prevent endless loop for malformed list */
450 
451 				err("Unable to de-list node publication\n"
452 				    "{%u%u}, node=0x%x, ref=%u, key=%u)\n",
453 				    publ->type, publ->lower, publ->node,
454 				    publ->ref, publ->key);
455 				goto end_node;
456 			}
457 		}
458 		if (publ != sseq->node_list)
459 			prev->node_list_next = publ->node_list_next;
460 		else if (publ->node_list_next != publ) {
461 			prev->node_list_next = publ->node_list_next;
462 			sseq->node_list = publ->node_list_next;
463 		} else {
464 			sseq->node_list = NULL;
465 		}
466 		sseq->node_list_size--;
467 	}
468 end_node:
469 
470 	/* Contract subseq list if no more publications for that subseq */
471 
472 	if (!sseq->zone_list) {
473 		free = &nseq->sseqs[nseq->first_free--];
474 		memmove(sseq, sseq + 1, (free - (sseq + 1)) * sizeof(*sseq));
475 		removed_subseq = 1;
476 	}
477 
478 	/* Notify any waiting subscriptions */
479 
480 	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
481 		tipc_subscr_report_overlap(s,
482 					   publ->lower,
483 					   publ->upper,
484 					   TIPC_WITHDRAWN,
485 					   publ->ref,
486 					   publ->node,
487 					   removed_subseq);
488 	}
489 
490 	return publ;
491 }
492 
493 /**
494  * tipc_nameseq_subscribe: attach a subscription, and issue
495  * the prescribed number of events if there is any sub-
496  * sequence overlapping with the requested sequence
497  */
498 
499 static void tipc_nameseq_subscribe(struct name_seq *nseq, struct subscription *s)
500 {
501 	struct sub_seq *sseq = nseq->sseqs;
502 
503 	list_add(&s->nameseq_list, &nseq->subscriptions);
504 
505 	if (!sseq)
506 		return;
507 
508 	while (sseq != &nseq->sseqs[nseq->first_free]) {
509 		struct publication *zl = sseq->zone_list;
510 		if (zl && tipc_subscr_overlap(s, sseq->lower, sseq->upper)) {
511 			struct publication *crs = zl;
512 			int must_report = 1;
513 
514 			do {
515 				tipc_subscr_report_overlap(s,
516 							   sseq->lower,
517 							   sseq->upper,
518 							   TIPC_PUBLISHED,
519 							   crs->ref,
520 							   crs->node,
521 							   must_report);
522 				must_report = 0;
523 				crs = crs->zone_list_next;
524 			} while (crs != zl);
525 		}
526 		sseq++;
527 	}
528 }
529 
530 static struct name_seq *nametbl_find_seq(u32 type)
531 {
532 	struct hlist_head *seq_head;
533 	struct hlist_node *seq_node;
534 	struct name_seq *ns;
535 
536 	seq_head = &table.types[hash(type)];
537 	hlist_for_each_entry(ns, seq_node, seq_head, ns_list) {
538 		if (ns->type == type)
539 			return ns;
540 	}
541 
542 	return NULL;
543 };
544 
545 struct publication *tipc_nametbl_insert_publ(u32 type, u32 lower, u32 upper,
546 					     u32 scope, u32 node, u32 port, u32 key)
547 {
548 	struct name_seq *seq = nametbl_find_seq(type);
549 
550 	if (lower > upper) {
551 		warn("Failed to publish illegal {%u,%u,%u}\n",
552 		     type, lower, upper);
553 		return NULL;
554 	}
555 
556 	if (!seq)
557 		seq = tipc_nameseq_create(type, &table.types[hash(type)]);
558 	if (!seq)
559 		return NULL;
560 
561 	return tipc_nameseq_insert_publ(seq, type, lower, upper,
562 					scope, node, port, key);
563 }
564 
565 struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
566 					     u32 node, u32 ref, u32 key)
567 {
568 	struct publication *publ;
569 	struct name_seq *seq = nametbl_find_seq(type);
570 
571 	if (!seq)
572 		return NULL;
573 
574 	publ = tipc_nameseq_remove_publ(seq, lower, node, ref, key);
575 
576 	if (!seq->first_free && list_empty(&seq->subscriptions)) {
577 		hlist_del_init(&seq->ns_list);
578 		kfree(seq->sseqs);
579 		kfree(seq);
580 	}
581 	return publ;
582 }
583 
584 /*
585  * tipc_nametbl_translate - translate name to port id
586  *
587  * Note: on entry 'destnode' is the search domain used during translation;
588  *       on exit it passes back the node address of the matching port (if any)
589  */
590 
591 u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
592 {
593 	struct sub_seq *sseq;
594 	struct publication *publ = NULL;
595 	struct name_seq *seq;
596 	u32 ref;
597 
598 	if (!tipc_in_scope(*destnode, tipc_own_addr))
599 		return 0;
600 
601 	read_lock_bh(&tipc_nametbl_lock);
602 	seq = nametbl_find_seq(type);
603 	if (unlikely(!seq))
604 		goto not_found;
605 	sseq = nameseq_find_subseq(seq, instance);
606 	if (unlikely(!sseq))
607 		goto not_found;
608 	spin_lock_bh(&seq->lock);
609 
610 	/* Closest-First Algorithm: */
611 	if (likely(!*destnode)) {
612 		publ = sseq->node_list;
613 		if (publ) {
614 			sseq->node_list = publ->node_list_next;
615 found:
616 			ref = publ->ref;
617 			*destnode = publ->node;
618 			spin_unlock_bh(&seq->lock);
619 			read_unlock_bh(&tipc_nametbl_lock);
620 			return ref;
621 		}
622 		publ = sseq->cluster_list;
623 		if (publ) {
624 			sseq->cluster_list = publ->cluster_list_next;
625 			goto found;
626 		}
627 		publ = sseq->zone_list;
628 		if (publ) {
629 			sseq->zone_list = publ->zone_list_next;
630 			goto found;
631 		}
632 	}
633 
634 	/* Round-Robin Algorithm: */
635 	else if (*destnode == tipc_own_addr) {
636 		publ = sseq->node_list;
637 		if (publ) {
638 			sseq->node_list = publ->node_list_next;
639 			goto found;
640 		}
641 	} else if (in_own_cluster(*destnode)) {
642 		publ = sseq->cluster_list;
643 		if (publ) {
644 			sseq->cluster_list = publ->cluster_list_next;
645 			goto found;
646 		}
647 	} else {
648 		publ = sseq->zone_list;
649 		if (publ) {
650 			sseq->zone_list = publ->zone_list_next;
651 			goto found;
652 		}
653 	}
654 	spin_unlock_bh(&seq->lock);
655 not_found:
656 	read_unlock_bh(&tipc_nametbl_lock);
657 	return 0;
658 }
659 
660 /**
661  * tipc_nametbl_mc_translate - find multicast destinations
662  *
663  * Creates list of all local ports that overlap the given multicast address;
664  * also determines if any off-node ports overlap.
665  *
666  * Note: Publications with a scope narrower than 'limit' are ignored.
667  * (i.e. local node-scope publications mustn't receive messages arriving
668  * from another node, even if the multcast link brought it here)
669  *
670  * Returns non-zero if any off-node ports overlap
671  */
672 
673 int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
674 			      struct port_list *dports)
675 {
676 	struct name_seq *seq;
677 	struct sub_seq *sseq;
678 	struct sub_seq *sseq_stop;
679 	int res = 0;
680 
681 	read_lock_bh(&tipc_nametbl_lock);
682 	seq = nametbl_find_seq(type);
683 	if (!seq)
684 		goto exit;
685 
686 	spin_lock_bh(&seq->lock);
687 
688 	sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
689 	sseq_stop = seq->sseqs + seq->first_free;
690 	for (; sseq != sseq_stop; sseq++) {
691 		struct publication *publ;
692 
693 		if (sseq->lower > upper)
694 			break;
695 
696 		publ = sseq->node_list;
697 		if (publ) {
698 			do {
699 				if (publ->scope <= limit)
700 					tipc_port_list_add(dports, publ->ref);
701 				publ = publ->node_list_next;
702 			} while (publ != sseq->node_list);
703 		}
704 
705 		if (sseq->cluster_list_size != sseq->node_list_size)
706 			res = 1;
707 	}
708 
709 	spin_unlock_bh(&seq->lock);
710 exit:
711 	read_unlock_bh(&tipc_nametbl_lock);
712 	return res;
713 }
714 
715 /**
716  * tipc_nametbl_publish_rsv - publish port name using a reserved name type
717  */
718 
719 int tipc_nametbl_publish_rsv(u32 ref, unsigned int scope,
720 			struct tipc_name_seq const *seq)
721 {
722 	int res;
723 
724 	atomic_inc(&rsv_publ_ok);
725 	res = tipc_publish(ref, scope, seq);
726 	atomic_dec(&rsv_publ_ok);
727 	return res;
728 }
729 
730 /**
731  * tipc_nametbl_publish - add name publication to network name tables
732  */
733 
734 struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
735 				    u32 scope, u32 port_ref, u32 key)
736 {
737 	struct publication *publ;
738 
739 	if (table.local_publ_count >= tipc_max_publications) {
740 		warn("Publication failed, local publication limit reached (%u)\n",
741 		     tipc_max_publications);
742 		return NULL;
743 	}
744 	if ((type < TIPC_RESERVED_TYPES) && !atomic_read(&rsv_publ_ok)) {
745 		warn("Publication failed, reserved name {%u,%u,%u}\n",
746 		     type, lower, upper);
747 		return NULL;
748 	}
749 
750 	write_lock_bh(&tipc_nametbl_lock);
751 	table.local_publ_count++;
752 	publ = tipc_nametbl_insert_publ(type, lower, upper, scope,
753 				   tipc_own_addr, port_ref, key);
754 	if (publ && (scope != TIPC_NODE_SCOPE)) {
755 		tipc_named_publish(publ);
756 	}
757 	write_unlock_bh(&tipc_nametbl_lock);
758 	return publ;
759 }
760 
761 /**
762  * tipc_nametbl_withdraw - withdraw name publication from network name tables
763  */
764 
765 int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key)
766 {
767 	struct publication *publ;
768 
769 	write_lock_bh(&tipc_nametbl_lock);
770 	publ = tipc_nametbl_remove_publ(type, lower, tipc_own_addr, ref, key);
771 	if (likely(publ)) {
772 		table.local_publ_count--;
773 		if (publ->scope != TIPC_NODE_SCOPE)
774 			tipc_named_withdraw(publ);
775 		write_unlock_bh(&tipc_nametbl_lock);
776 		list_del_init(&publ->pport_list);
777 		kfree(publ);
778 		return 1;
779 	}
780 	write_unlock_bh(&tipc_nametbl_lock);
781 	err("Unable to remove local publication\n"
782 	    "(type=%u, lower=%u, ref=%u, key=%u)\n",
783 	    type, lower, ref, key);
784 	return 0;
785 }
786 
787 /**
788  * tipc_nametbl_subscribe - add a subscription object to the name table
789  */
790 
791 void tipc_nametbl_subscribe(struct subscription *s)
792 {
793 	u32 type = s->seq.type;
794 	struct name_seq *seq;
795 
796 	write_lock_bh(&tipc_nametbl_lock);
797 	seq = nametbl_find_seq(type);
798 	if (!seq) {
799 		seq = tipc_nameseq_create(type, &table.types[hash(type)]);
800 	}
801 	if (seq) {
802 		spin_lock_bh(&seq->lock);
803 		tipc_nameseq_subscribe(seq, s);
804 		spin_unlock_bh(&seq->lock);
805 	} else {
806 		warn("Failed to create subscription for {%u,%u,%u}\n",
807 		     s->seq.type, s->seq.lower, s->seq.upper);
808 	}
809 	write_unlock_bh(&tipc_nametbl_lock);
810 }
811 
812 /**
813  * tipc_nametbl_unsubscribe - remove a subscription object from name table
814  */
815 
816 void tipc_nametbl_unsubscribe(struct subscription *s)
817 {
818 	struct name_seq *seq;
819 
820 	write_lock_bh(&tipc_nametbl_lock);
821 	seq = nametbl_find_seq(s->seq.type);
822 	if (seq != NULL) {
823 		spin_lock_bh(&seq->lock);
824 		list_del_init(&s->nameseq_list);
825 		spin_unlock_bh(&seq->lock);
826 		if ((seq->first_free == 0) && list_empty(&seq->subscriptions)) {
827 			hlist_del_init(&seq->ns_list);
828 			kfree(seq->sseqs);
829 			kfree(seq);
830 		}
831 	}
832 	write_unlock_bh(&tipc_nametbl_lock);
833 }
834 
835 
836 /**
837  * subseq_list: print specified sub-sequence contents into the given buffer
838  */
839 
840 static void subseq_list(struct sub_seq *sseq, struct print_buf *buf, u32 depth,
841 			u32 index)
842 {
843 	char portIdStr[27];
844 	const char *scope_str[] = {"", " zone", " cluster", " node"};
845 	struct publication *publ = sseq->zone_list;
846 
847 	tipc_printf(buf, "%-10u %-10u ", sseq->lower, sseq->upper);
848 
849 	if (depth == 2 || !publ) {
850 		tipc_printf(buf, "\n");
851 		return;
852 	}
853 
854 	do {
855 		sprintf(portIdStr, "<%u.%u.%u:%u>",
856 			 tipc_zone(publ->node), tipc_cluster(publ->node),
857 			 tipc_node(publ->node), publ->ref);
858 		tipc_printf(buf, "%-26s ", portIdStr);
859 		if (depth > 3) {
860 			tipc_printf(buf, "%-10u %s", publ->key,
861 				    scope_str[publ->scope]);
862 		}
863 
864 		publ = publ->zone_list_next;
865 		if (publ == sseq->zone_list)
866 			break;
867 
868 		tipc_printf(buf, "\n%33s", " ");
869 	} while (1);
870 
871 	tipc_printf(buf, "\n");
872 }
873 
874 /**
875  * nameseq_list: print specified name sequence contents into the given buffer
876  */
877 
878 static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,
879 			 u32 type, u32 lowbound, u32 upbound, u32 index)
880 {
881 	struct sub_seq *sseq;
882 	char typearea[11];
883 
884 	if (seq->first_free == 0)
885 		return;
886 
887 	sprintf(typearea, "%-10u", seq->type);
888 
889 	if (depth == 1) {
890 		tipc_printf(buf, "%s\n", typearea);
891 		return;
892 	}
893 
894 	for (sseq = seq->sseqs; sseq != &seq->sseqs[seq->first_free]; sseq++) {
895 		if ((lowbound <= sseq->upper) && (upbound >= sseq->lower)) {
896 			tipc_printf(buf, "%s ", typearea);
897 			spin_lock_bh(&seq->lock);
898 			subseq_list(sseq, buf, depth, index);
899 			spin_unlock_bh(&seq->lock);
900 			sprintf(typearea, "%10s", " ");
901 		}
902 	}
903 }
904 
905 /**
906  * nametbl_header - print name table header into the given buffer
907  */
908 
909 static void nametbl_header(struct print_buf *buf, u32 depth)
910 {
911 	const char *header[] = {
912 		"Type       ",
913 		"Lower      Upper      ",
914 		"Port Identity              ",
915 		"Publication Scope"
916 	};
917 
918 	int i;
919 
920 	if (depth > 4)
921 		depth = 4;
922 	for (i = 0; i < depth; i++)
923 		tipc_printf(buf, header[i]);
924 	tipc_printf(buf, "\n");
925 }
926 
927 /**
928  * nametbl_list - print specified name table contents into the given buffer
929  */
930 
931 static void nametbl_list(struct print_buf *buf, u32 depth_info,
932 			 u32 type, u32 lowbound, u32 upbound)
933 {
934 	struct hlist_head *seq_head;
935 	struct hlist_node *seq_node;
936 	struct name_seq *seq;
937 	int all_types;
938 	u32 depth;
939 	u32 i;
940 
941 	all_types = (depth_info & TIPC_NTQ_ALLTYPES);
942 	depth = (depth_info & ~TIPC_NTQ_ALLTYPES);
943 
944 	if (depth == 0)
945 		return;
946 
947 	if (all_types) {
948 		/* display all entries in name table to specified depth */
949 		nametbl_header(buf, depth);
950 		lowbound = 0;
951 		upbound = ~0;
952 		for (i = 0; i < tipc_nametbl_size; i++) {
953 			seq_head = &table.types[i];
954 			hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
955 				nameseq_list(seq, buf, depth, seq->type,
956 					     lowbound, upbound, i);
957 			}
958 		}
959 	} else {
960 		/* display only the sequence that matches the specified type */
961 		if (upbound < lowbound) {
962 			tipc_printf(buf, "invalid name sequence specified\n");
963 			return;
964 		}
965 		nametbl_header(buf, depth);
966 		i = hash(type);
967 		seq_head = &table.types[i];
968 		hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
969 			if (seq->type == type) {
970 				nameseq_list(seq, buf, depth, type,
971 					     lowbound, upbound, i);
972 				break;
973 			}
974 		}
975 	}
976 }
977 
978 #define MAX_NAME_TBL_QUERY 32768
979 
980 struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space)
981 {
982 	struct sk_buff *buf;
983 	struct tipc_name_table_query *argv;
984 	struct tlv_desc *rep_tlv;
985 	struct print_buf b;
986 	int str_len;
987 
988 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NAME_TBL_QUERY))
989 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
990 
991 	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_NAME_TBL_QUERY));
992 	if (!buf)
993 		return NULL;
994 
995 	rep_tlv = (struct tlv_desc *)buf->data;
996 	tipc_printbuf_init(&b, TLV_DATA(rep_tlv), MAX_NAME_TBL_QUERY);
997 	argv = (struct tipc_name_table_query *)TLV_DATA(req_tlv_area);
998 	read_lock_bh(&tipc_nametbl_lock);
999 	nametbl_list(&b, ntohl(argv->depth), ntohl(argv->type),
1000 		     ntohl(argv->lowbound), ntohl(argv->upbound));
1001 	read_unlock_bh(&tipc_nametbl_lock);
1002 	str_len = tipc_printbuf_validate(&b);
1003 
1004 	skb_put(buf, TLV_SPACE(str_len));
1005 	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
1006 
1007 	return buf;
1008 }
1009 
1010 int tipc_nametbl_init(void)
1011 {
1012 	table.types = kcalloc(tipc_nametbl_size, sizeof(struct hlist_head),
1013 			      GFP_ATOMIC);
1014 	if (!table.types)
1015 		return -ENOMEM;
1016 
1017 	table.local_publ_count = 0;
1018 	return 0;
1019 }
1020 
1021 void tipc_nametbl_stop(void)
1022 {
1023 	u32 i;
1024 
1025 	if (!table.types)
1026 		return;
1027 
1028 	/* Verify name table is empty, then release it */
1029 
1030 	write_lock_bh(&tipc_nametbl_lock);
1031 	for (i = 0; i < tipc_nametbl_size; i++) {
1032 		if (!hlist_empty(&table.types[i]))
1033 			err("tipc_nametbl_stop(): hash chain %u is non-null\n", i);
1034 	}
1035 	kfree(table.types);
1036 	table.types = NULL;
1037 	write_unlock_bh(&tipc_nametbl_lock);
1038 }
1039 
1040