Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cb1b7280 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller
Browse files

tipc: eliminate race condition at multicast reception



In a previous commit in this series we resolved a race problem during
unicast message reception.

Here, we resolve the same problem at multicast reception. We apply the
same technique: an input queue serializing the delivery of arriving
buffers. The main difference is that here we do it in two steps.
First, the broadcast link feeds arriving buffers into the tail of an
arrival queue, which head is consumed at the socket level, and where
destination lookup is performed. Second, if the lookup is successful,
the resulting buffer clones are fed into a second queue, the input
queue. This queue is consumed at reception in the socket just like
in the unicast case. Both queues are protected by the same lock, -the
one of the input queue.

Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 3c724acd
Loading
Loading
Loading
Loading
+35 −22
Original line number Diff line number Diff line
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
		tipc_link_reset_all(node);
}

void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

uint  tipc_bclink_get_mtu(void)
{
	return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
	tipc_node_unlock(n_ptr);
}

/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
	int rc = 0;
	int bc = 0;
	struct sk_buff *skb;
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;

	/* Prepare clone of message for local node */
	skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
		return -EHOSTUNREACH;
	}

	/* Broadcast to all other nodes */
	/* Broadcast to all nodes */
	if (likely(bclink)) {
		tipc_bclink_lock(net);
		if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
	if (unlikely(!bc))
		__skb_queue_purge(list);

	/* Deliver message clone */
	if (likely(!rc))
		tipc_sk_mcast_rcv(net, skb);
	else
	if (unlikely(rc)) {
		kfree_skb(skb);

		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
	return rc;
}

@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
	int deferred = 0;
	int pos = 0;
	struct sk_buff *iskb;
	struct sk_buff_head msgs;
	struct sk_buff_head *arrvq, *inputq;

	/* Screen out unwanted broadcast messages */
	if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
	/* Handle in-sequence broadcast message */
	seqno = msg_seqno(msg);
	next_in = mod(node->bclink.last_in + 1);
	arrvq = &tn->bclink->arrvq;
	inputq = &tn->bclink->inputq;

	if (likely(seqno == next_in)) {
receive:
@@ -493,21 +507,26 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
		if (likely(msg_isdata(msg))) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
			if (likely(msg_mcast(msg)))
				tipc_sk_mcast_rcv(net, buf);
			else
				kfree_skb(buf);
		} else if (msg_user(msg) == MSG_BUNDLER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
			while (tipc_msg_extract(buf, &iskb, &pos))
				tipc_sk_mcast_rcv(net, iskb);
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
			}
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
			skb_queue_head_init(&msgs);
			skb_queue_tail(&msgs, buf);
			tipc_named_rcv(net, &msgs);
		} else {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
	skb_queue_head_init(&bcl->wakeupq);
	bcl->next_out_no = 1;
	spin_lock_init(&bclink->node.lock);
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
	bcl->owner = &bclink->node;
	bcl->owner->net = net;
	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
+3 −0
Original line number Diff line number Diff line
@@ -97,6 +97,8 @@ struct tipc_bclink {
	struct tipc_link link;
	struct tipc_node node;
	unsigned int flags;
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;
	struct tipc_node_map bcast_nodes;
	struct tipc_node *retransmit_to;
};
@@ -134,5 +136,6 @@ uint tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
void tipc_bclink_input(struct net *net);

#endif
+17 −0
Original line number Diff line number Diff line
@@ -767,6 +767,23 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
			  int *err);
struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);

/* tipc_skb_peek(): peek and reserve first buffer in list
 * @list: list to be peeked in
 * Returns pointer to first buffer in list, if any
 */
static inline struct sk_buff *tipc_skb_peek(struct sk_buff_head *list,
					    spinlock_t *lock)
{
	struct sk_buff *skb;

	spin_lock_bh(lock);
	skb = skb_peek(list);
	if (skb)
		skb_get(skb);
	spin_unlock_bh(lock);
	return skb;
}

/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
 *                       up to and including 'filter'.
 * Note: ignoring previously tried destinations minimizes the risk of
+1 −1
Original line number Diff line number Diff line
/*
 * net/tipc/name_table.h: Include file for TIPC name table code
 *
 * Copyright (c) 2000-2006, 2014, Ericsson AB
 * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
+7 −4
Original line number Diff line number Diff line
@@ -582,10 +582,10 @@ void tipc_node_unlock(struct tipc_node *node)
	namedq = node->namedq;
	publ_list = &node->publ_list;

	node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
				TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
				TIPC_NOTIFY_LINK_DOWN |
				TIPC_WAKEUP_BCAST_USERS |
	node->action_flags &= ~(TIPC_MSG_EVT |
				TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
				TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
				TIPC_NAMED_MSG_EVT);

	spin_unlock_bh(&node->lock);
@@ -612,6 +612,9 @@ void tipc_node_unlock(struct tipc_node *node)

	if (flags & TIPC_NAMED_MSG_EVT)
		tipc_named_rcv(net, namedq);

	if (flags & TIPC_BCAST_MSG_EVT)
		tipc_bclink_input(net);
}

/* Caller should hold node lock for the passed node */
Loading