Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ca0c4273 authored by Ying Xue's avatar Ying Xue Committed by David S. Miller
Browse files

tipc: avoid to asynchronously deliver name tables to peer node



Postpone the actions of delivering name tables until after node
lock is released, avoiding to do it under asynchronous context.

Signed-off-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarErik Hugne <erik.hugne@ericsson.com>
Reviewed-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 9d561949
Loading
Loading
Loading
Loading
+2 −50
Original line number Diff line number Diff line
@@ -38,34 +38,6 @@
#include "link.h"
#include "name_distr.h"

#define ITEM_SIZE sizeof(struct distr_item)

/**
 * struct distr_item - publication info distributed to other nodes
 * @type: name sequence type
 * @lower: name sequence lower bound
 * @upper: name sequence upper bound
 * @ref: publishing port reference
 * @key: publication key
 *
 * ===> All fields are stored in network byte order. <===
 *
 * First 3 fields identify (name or) name sequence being published.
 * Reference field uniquely identifies port that published name sequence.
 * Key field uniquely identifies publication, in the event a port has
 * multiple publications of the same name sequence.
 *
 * Note: There is no field that identifies the publishing node because it is
 * the same for all items contained within a publication message.
 */
struct distr_item {
	__be32 type;
	__be32 lower;
	__be32 upper;
	__be32 ref;
	__be32 key;
};

/**
 * struct publ_list - list of publications made by this node
 * @list: circular list of publications
@@ -239,29 +211,9 @@ static void named_distribute(struct list_head *message_list, u32 node,
/**
 * tipc_named_node_up - tell specified node about all publications by this node
 */
void tipc_named_node_up(unsigned long nodearg)
void tipc_named_node_up(u32 max_item_buf, u32 node)
{
	struct tipc_node *n_ptr;
	struct tipc_link *l_ptr;
	struct list_head message_list;
	u32 node = (u32)nodearg;
	u32 max_item_buf = 0;

	/* compute maximum amount of publication data to send per message */
	n_ptr = tipc_node_find(node);
	if (n_ptr) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->active_links[0];
		if (l_ptr)
			max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
				ITEM_SIZE) * ITEM_SIZE;
		tipc_node_unlock(n_ptr);
	}
	if (!max_item_buf)
		return;

	/* create list of publication messages, then send them as a unit */
	INIT_LIST_HEAD(&message_list);
	LIST_HEAD(message_list);

	read_lock_bh(&tipc_nametbl_lock);
	named_distribute(&message_list, node, &publ_cluster, max_item_buf);
+29 −1
Original line number Diff line number Diff line
@@ -39,10 +39,38 @@

#include "name_table.h"

#define ITEM_SIZE sizeof(struct distr_item)

/**
 * struct distr_item - publication info distributed to other nodes
 * @type: name sequence type
 * @lower: name sequence lower bound
 * @upper: name sequence upper bound
 * @ref: publishing port reference
 * @key: publication key
 *
 * ===> All fields are stored in network byte order. <===
 *
 * First 3 fields identify (name or) name sequence being published.
 * Reference field uniquely identifies port that published name sequence.
 * Key field uniquely identifies publication, in the event a port has
 * multiple publications of the same name sequence.
 *
 * Note: There is no field that identifies the publishing node because it is
 * the same for all items contained within a publication message.
 */
struct distr_item {
	__be32 type;
	__be32 lower;
	__be32 upper;
	__be32 ref;
	__be32 key;
};

struct sk_buff *tipc_named_publish(struct publication *publ);
struct sk_buff *tipc_named_withdraw(struct publication *publ);
void named_cluster_distribute(struct sk_buff *buf);
void tipc_named_node_up(unsigned long node);
void tipc_named_node_up(u32 max_item_buf, u32 node);
void tipc_named_rcv(struct sk_buff *buf);
void tipc_named_reinit(void);

+15 −1
Original line number Diff line number Diff line
@@ -267,7 +267,7 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)

static void node_established_contact(struct tipc_node *n_ptr)
{
	tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
	n_ptr->flags |= TIPC_NODE_UP;
	n_ptr->bclink.oos_state = 0;
	n_ptr->bclink.acked = tipc_bclink_get_last_sent();
	tipc_bclink_add_node(n_ptr->addr);
@@ -455,6 +455,9 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
void tipc_node_unlock(struct tipc_node *node)
{
	LIST_HEAD(nsub_list);
	struct tipc_link *link;
	int pkt_sz = 0;
	u32 addr = 0;

	if (likely(!node->flags)) {
		spin_unlock_bh(&node->lock);
@@ -465,8 +468,19 @@ void tipc_node_unlock(struct tipc_node *node)
		list_replace_init(&node->nsub, &nsub_list);
		node->flags &= ~TIPC_NODE_LOST;
	}
	if (node->flags & TIPC_NODE_UP) {
		link = node->active_links[0];
		node->flags &= ~TIPC_NODE_UP;
		if (link) {
			pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
				  ITEM_SIZE;
			addr = node->addr;
		}
	}
	spin_unlock_bh(&node->lock);

	if (!list_empty(&nsub_list))
		tipc_nodesub_notify(&nsub_list);
	if (pkt_sz)
		tipc_named_node_up(pkt_sz, addr);
}
+6 −2
Original line number Diff line number Diff line
@@ -48,15 +48,19 @@
#define INVALID_NODE_SIG 0x10000

/* Flags used to block (re)establishment of contact with a neighboring node
 * TIPC_NODE_DOWN: indicate node is down
 * TIPC_NODE_DOWN: indicate node is down and it's used to block the node's
 *                 links until RESET or ACTIVE message arrives
 * TIPC_NODE_RESET: indicate node is reset
 * TIPC_NODE_LOST: indicate node is lost and it's used to notify subscriptions
 *                 when node lock is released
 * TIPC_NODE_UP: indicate node is up and it's used to deliver local name table
 *               when node lock is released
 */
enum {
	TIPC_NODE_DOWN	= (1 << 1),
	TIPC_NODE_RESET	= (1 << 2),
	TIPC_NODE_LOST	= (1 << 3)
	TIPC_NODE_LOST	= (1 << 3),
	TIPC_NODE_UP	= (1 << 4)
};

/**