Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 52666986 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller
Browse files

tipc: let broadcast packet reception use new link receive function



The code path for receiving broadcast packets is currently distinct
from the unicast path. This leads to unnecessary code and data
duplication, something that can be avoided with some effort.

We now introduce separate per-peer tipc_link instances for handling
broadcast packet reception. Each receive link keeps a pointer to the
common, single, broadcast link instance, and can hence handle release
and retransmission of send buffers as if they belonged to the own
instance.

Furthermore, we let each unicast link instance keep a reference to both
the pertaining broadcast receive link, and to the common send link.
This makes it possible for the unicast links to easily access data for
broadcast link synchronization, as well as for carrying acknowledges for
received broadcast packets.

Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent fd556f20
Loading
Loading
Loading
Loading
+132 −31
Original line number Diff line number Diff line
@@ -112,11 +112,6 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
	return tipc_net(net)->bcbase;
}

static struct tipc_link *tipc_bc_sndlink(struct net *net)
{
	return tipc_net(net)->bcl;
}

/**
 * tipc_nmap_equal - test for equality of node maps
 */
@@ -169,31 +164,6 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

void tipc_bclink_add_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *l = tipc_bc_sndlink(net);
	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
	tipc_link_add_bc_peer(l);
	tipc_bclink_unlock(net);
}

void tipc_bclink_remove_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
	tn->bcl->ackers--;

	/* Last node? => reset backlog queue */
	if (!tn->bcbase->bcast_nodes.count)
		tipc_link_purge_backlog(tn->bcbase->link);

	tipc_bclink_unlock(net);
}

static void bclink_set_last_sent(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -501,12 +471,141 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
		__skb_queue_purge(&rcvq);
		return rc;
	}

	/* Broadcast to all nodes, inluding local node */
	tipc_bcbearer_xmit(net, &xmitq);
	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
	__skb_queue_purge(list);
	return 0;
}

/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
 *
 * RCU is locked, no other locks set
 */
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
{
	struct tipc_msg *hdr = buf_msg(skb);
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;
	int rc;

	__skb_queue_head_init(&xmitq);

	if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
		kfree_skb(skb);
		return 0;
	}

	tipc_bcast_lock(net);
	if (msg_user(hdr) == BCAST_PROTOCOL)
		rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
	else
		rc = tipc_link_rcv(l, skb, NULL);
	tipc_bcast_unlock(net);

	if (!skb_queue_empty(&xmitq))
		tipc_bcbearer_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);

	return rc;
}

/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
 *
 * RCU is locked, no other locks set
 */
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	tipc_link_bc_ack_rcv(l, acked, &xmitq);
	tipc_bcast_unlock(net);

	tipc_bcbearer_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

/* tipc_bcast_synch_rcv -  check and update rcv link with peer's send state
 *
 * RCU is locked, no other locks set
 */
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
			 struct tipc_msg *hdr)
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	if (msg_type(hdr) == STATE_MSG) {
		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
		tipc_link_bc_sync_rcv(l, hdr, &xmitq);
	} else {
		tipc_link_bc_init_rcv(l, hdr);
	}
	tipc_bcast_unlock(net);

	tipc_bcbearer_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
			 struct sk_buff_head *xmitq)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *snd_l = tipc_bc_sndlink(net);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
	tipc_bclink_unlock(net);
}

/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_remove_peer(struct net *net, u32 addr,
			    struct tipc_link *rcv_l)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct tipc_link *snd_l = tipc_bc_sndlink(net);
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
	tipc_bclink_unlock(net);

	tipc_bcbearer_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
@@ -728,6 +827,7 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			return 0;
		}
	}
	msg_set_mc_netid(msg, tn->net_id);

	/* Send buffer over bearers until all targets reached */
	bcbearer->remains = bclink->bcast_nodes;
@@ -1042,12 +1142,13 @@ int tipc_bcast_init(struct net *net)
	spin_lock_init(&tipc_net(net)->bclock);
	bb->node.net = net;

	if (!tipc_link_bc_create(&bb->node,
	if (!tipc_link_bc_create(&bb->node, 0, 0,
				 MAX_PKT_DEFAULT_MCAST,
				 BCLINK_WIN_DEFAULT,
				 0,
				 &bb->inputq,
				 &bb->namedq,
				 NULL,
				 &l))
		goto enomem;
	bb->link = l;
+14 −2
Original line number Diff line number Diff line
@@ -47,8 +47,11 @@ struct tipc_node_map;
int tipc_bcast_init(struct net *net);
void tipc_bcast_reinit(struct net *net);
void tipc_bcast_stop(struct net *net);
void tipc_bclink_add_node(struct net *net, u32 addr);
void tipc_bclink_remove_node(struct net *net, u32 addr);
void tipc_bcast_add_peer(struct net *net, u32 addr,
			 struct tipc_link *l,
			 struct sk_buff_head *xmitq);
void tipc_bcast_remove_peer(struct net *net, u32 addr,
			    struct tipc_link *rcv_bcl);
struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
@@ -62,6 +65,10 @@ int tipc_bclink_reset_stats(struct net *net);
int  tipc_bclink_set_queue_limits(struct net *net, u32 limit);
uint  tipc_bcast_get_mtu(void);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
			 struct tipc_msg *hdr);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
@@ -78,4 +85,9 @@ static inline void tipc_bcast_unlock(struct net *net)
	spin_unlock_bh(&tipc_net(net)->bclock);
}

static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
{
	return tipc_net(net)->bcl;
}

#endif
+5 −0
Original line number Diff line number Diff line
@@ -115,6 +115,11 @@ static inline struct tipc_net *tipc_net(struct net *net)
	return net_generic(net, tipc_net_id);
}

static inline int tipc_netid(struct net *net)
{
	return tipc_net(net)->net_id;
}

static inline u16 mod(u16 x)
{
	return x & 0xffffu;
+312 −123

File changed.

Preview size limit exceeded, changes collapsed.

+32 −11
Original line number Diff line number Diff line
@@ -66,7 +66,8 @@ enum {
 */
enum {
	TIPC_LINK_UP_EVT       = 1,
	TIPC_LINK_DOWN_EVT     = (1 << 1)
	TIPC_LINK_DOWN_EVT     = (1 << 1),
	TIPC_LINK_SND_BC_ACK   = (1 << 2)
};

/* Starting value for maximum packet size negotiation on unicast links
@@ -209,6 +210,10 @@ struct tipc_link {
	/* Broadcast */
	u16 ackers;
	u16 acked;
	struct tipc_link *bc_rcvlink;
	struct tipc_link *bc_sndlink;
	int nack_state;
	bool bc_peer_is_up;

	/* Statistics */
	struct tipc_stats stats;
@@ -217,17 +222,21 @@ struct tipc_link {
bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
		      int tolerance, char net_plane, u32 mtu, int priority,
		      int window, u32 session, u32 ownnode, u32 peer,
		      u16 peer_caps, struct tipc_media_addr *maddr,
		      struct sk_buff_head *inputq, struct sk_buff_head *namedq,
		      u16 peer_caps,
		      struct tipc_media_addr *maddr,
		      struct tipc_link *bc_sndlink,
		      struct tipc_link *bc_rcvlink,
		      struct sk_buff_head *inputq,
		      struct sk_buff_head *namedq,
		      struct tipc_link **link);
bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window,
			 u16 peer_caps, struct sk_buff_head *inputq,
bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
			 int mtu, int window, u16 peer_caps,
			 struct sk_buff_head *inputq,
			 struct sk_buff_head *namedq,
			 struct tipc_link *bc_sndlink,
			 struct tipc_link **link);
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
			   int mtyp, struct sk_buff_head *xmitq);
void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
				    struct sk_buff_head *xmitq);
void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_fsm_evt(struct tipc_link *l, int evt);
void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -264,9 +273,21 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
		  struct sk_buff_head *xmitq);
void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
void tipc_link_add_bc_peer(struct tipc_link *l);
void tipc_link_remove_bc_peer(struct tipc_link *l);
int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
void tipc_link_add_bc_peer(struct tipc_link *snd_l,
			   struct tipc_link *uc_l,
			   struct sk_buff_head *xmitq);
void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
			      struct tipc_link *rcv_l,
			      struct sk_buff_head *xmitq);
int tipc_link_bc_peers(struct tipc_link *l);

void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
			  struct sk_buff_head *xmitq);
void tipc_link_build_bc_sync_msg(struct tipc_link *l,
				 struct sk_buff_head *xmitq);
void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
void tipc_link_bc_sync_rcv(struct tipc_link *l,   struct tipc_msg *hdr,
			   struct sk_buff_head *xmitq);
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
			  struct sk_buff_head *xmitq);
#endif
Loading