name_distr.c (19415dbf3097d3eace4fc756cdf49cad0e697134) name_distr.c (37922ea4a3105176357c8d565a9d982c4a08714a)
1/*
2 * net/tipc/name_distr.c: TIPC name distribution code
3 *
4 * Copyright (c) 2000-2006, 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without

--- 190 unchanged lines hidden (view full) ---

199/**
200 * tipc_publ_purge - remove publication associated with a failed node
201 *
202 * Invoked for each publication issued by a newly failed node.
203 * Removes publication structure from name table & deletes it.
204 */
205static void tipc_publ_purge(struct net *net, struct publication *publ, u32 addr)
206{
1/*
2 * net/tipc/name_distr.c: TIPC name distribution code
3 *
4 * Copyright (c) 2000-2006, 2014, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without

--- 190 unchanged lines hidden (view full) ---

199/**
200 * tipc_publ_purge - remove publication associated with a failed node
201 *
202 * Invoked for each publication issued by a newly failed node.
203 * Removes publication structure from name table & deletes it.
204 */
205static void tipc_publ_purge(struct net *net, struct publication *publ, u32 addr)
206{
207 struct tipc_net *tn = net_generic(net, tipc_net_id);
207 struct tipc_net *tn = tipc_net(net);
208 struct publication *p;
209
210 spin_lock_bh(&tn->nametbl_lock);
208 struct publication *p;
209
210 spin_lock_bh(&tn->nametbl_lock);
211 p = tipc_nametbl_remove_publ(net, publ->type, publ->lower,
212 publ->node, publ->port, publ->key);
211 p = tipc_nametbl_remove_publ(net, publ->type, publ->lower, publ->upper,
212 publ->node, publ->key);
213 if (p)
214 tipc_node_unsubscribe(net, &p->binding_node, addr);
215 spin_unlock_bh(&tn->nametbl_lock);
216
217 if (p != publ) {
218 pr_err("Unable to remove publication from failed node\n"
219 " (type=%u, lower=%u, node=0x%x, port=%u, key=%u)\n",
220 publ->type, publ->lower, publ->node, publ->port,

--- 35 unchanged lines hidden (view full) ---

256 * subscribers
257 *
258 * tipc_nametbl_lock must be held.
259 * Returns the publication item if successful, otherwise NULL.
260 */
261static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
262 u32 node, u32 dtype)
263{
213 if (p)
214 tipc_node_unsubscribe(net, &p->binding_node, addr);
215 spin_unlock_bh(&tn->nametbl_lock);
216
217 if (p != publ) {
218 pr_err("Unable to remove publication from failed node\n"
219 " (type=%u, lower=%u, node=0x%x, port=%u, key=%u)\n",
220 publ->type, publ->lower, publ->node, publ->port,

--- 35 unchanged lines hidden (view full) ---

256 * subscribers
257 *
258 * tipc_nametbl_lock must be held.
259 * Returns the publication item if successful, otherwise NULL.
260 */
261static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
262 u32 node, u32 dtype)
263{
264 struct publication *publ = NULL;
264 struct publication *p = NULL;
265 u32 lower = ntohl(i->lower);
266 u32 upper = ntohl(i->upper);
267 u32 type = ntohl(i->type);
268 u32 port = ntohl(i->port);
269 u32 key = ntohl(i->key);
265
266 if (dtype == PUBLICATION) {
270
271 if (dtype == PUBLICATION) {
267 publ = tipc_nametbl_insert_publ(net, ntohl(i->type),
268 ntohl(i->lower),
269 ntohl(i->upper),
270 TIPC_CLUSTER_SCOPE, node,
271 ntohl(i->port), ntohl(i->key));
272 if (publ) {
273 tipc_node_subscribe(net, &publ->binding_node, node);
272 p = tipc_nametbl_insert_publ(net, type, lower, upper,
273 TIPC_CLUSTER_SCOPE, node,
274 port, key);
275 if (p) {
276 tipc_node_subscribe(net, &p->binding_node, node);
274 return true;
275 }
276 } else if (dtype == WITHDRAWAL) {
277 return true;
278 }
279 } else if (dtype == WITHDRAWAL) {
277 publ = tipc_nametbl_remove_publ(net, ntohl(i->type),
278 ntohl(i->lower),
279 node, ntohl(i->port),
280 ntohl(i->key));
281 if (publ) {
282 tipc_node_unsubscribe(net, &publ->binding_node, node);
283 kfree_rcu(publ, rcu);
280 p = tipc_nametbl_remove_publ(net, type, lower,
281 upper, node, key);
282 if (p) {
283 tipc_node_unsubscribe(net, &p->binding_node, node);
284 kfree_rcu(p, rcu);
284 return true;
285 }
285 return true;
286 }
287 pr_warn_ratelimited("Failed to remove binding %u,%u from %x\n",
288 type, lower, node);
286 } else {
287 pr_warn("Unrecognized name table message received\n");
288 }
289 return false;
290}
291
292/**
289 } else {
290 pr_warn("Unrecognized name table message received\n");
291 }
292 return false;
293}
294
295/**
293 * tipc_named_add_backlog - add a failed name table update to the backlog
294 *
295 */
296static void tipc_named_add_backlog(struct net *net, struct distr_item *i,
297 u32 type, u32 node)
298{
299 struct distr_queue_item *e;
300 struct tipc_net *tn = net_generic(net, tipc_net_id);
301 unsigned long now = get_jiffies_64();
302
303 e = kzalloc(sizeof(*e), GFP_ATOMIC);
304 if (!e)
305 return;
306 e->dtype = type;
307 e->node = node;
308 e->expires = now + msecs_to_jiffies(sysctl_tipc_named_timeout);
309 memcpy(e, i, sizeof(*i));
310 list_add_tail(&e->next, &tn->dist_queue);
311}
312
313/**
314 * tipc_named_process_backlog - try to process any pending name table updates
315 * from the network.
316 */
317void tipc_named_process_backlog(struct net *net)
318{
319 struct distr_queue_item *e, *tmp;
320 struct tipc_net *tn = net_generic(net, tipc_net_id);
321 unsigned long now = get_jiffies_64();
322
323 list_for_each_entry_safe(e, tmp, &tn->dist_queue, next) {
324 if (time_after(e->expires, now)) {
325 if (!tipc_update_nametbl(net, &e->i, e->node, e->dtype))
326 continue;
327 } else {
328 pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %x key=%u\n",
329 e->dtype, ntohl(e->i.type),
330 ntohl(e->i.lower),
331 ntohl(e->i.upper),
332 e->node, ntohl(e->i.key));
333 }
334 list_del(&e->next);
335 kfree(e);
336 }
337}
338
339/**
340 * tipc_named_rcv - process name table update messages sent by another node
341 */
342void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
343{
344 struct tipc_net *tn = net_generic(net, tipc_net_id);
345 struct tipc_msg *msg;
346 struct distr_item *item;
347 uint count;

--- 5 unchanged lines hidden (view full) ---

353 for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
354 skb_linearize(skb);
355 msg = buf_msg(skb);
356 mtype = msg_type(msg);
357 item = (struct distr_item *)msg_data(msg);
358 count = msg_data_sz(msg) / ITEM_SIZE;
359 node = msg_orignode(msg);
360 while (count--) {
296 * tipc_named_rcv - process name table update messages sent by another node
297 */
298void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
299{
300 struct tipc_net *tn = net_generic(net, tipc_net_id);
301 struct tipc_msg *msg;
302 struct distr_item *item;
303 uint count;

--- 5 unchanged lines hidden (view full) ---

309 for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
310 skb_linearize(skb);
311 msg = buf_msg(skb);
312 mtype = msg_type(msg);
313 item = (struct distr_item *)msg_data(msg);
314 count = msg_data_sz(msg) / ITEM_SIZE;
315 node = msg_orignode(msg);
316 while (count--) {
361 if (!tipc_update_nametbl(net, item, node, mtype))
362 tipc_named_add_backlog(net, item, mtype, node);
317 tipc_update_nametbl(net, item, node, mtype);
363 item++;
364 }
365 kfree_skb(skb);
318 item++;
319 }
320 kfree_skb(skb);
366 tipc_named_process_backlog(net);
367 }
368 spin_unlock_bh(&tn->nametbl_lock);
369}
370
371/**
372 * tipc_named_reinit - re-initialize local publications
373 *
374 * This routine is called whenever TIPC networking is enabled.

--- 19 unchanged lines hidden ---
321 }
322 spin_unlock_bh(&tn->nametbl_lock);
323}
324
325/**
326 * tipc_named_reinit - re-initialize local publications
327 *
328 * This routine is called whenever TIPC networking is enabled.

--- 19 unchanged lines hidden ---