Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0cc1f315 authored by David S. Miller's avatar David S. Miller
Browse files

Merge branch 'tipc-scalable-broadcast-NACK'



Jon Maloy says:

====================
tipc: improve broadcast NACK mechanism

The broadcast protocol has turned out to not scale well beyond 70-80
nodes, while it is now possible to build TIPC clusters of at least ten
times that size. This commit series improves the NACK/retransmission
mechanism of the broadcast protocol to make it as scalable as the rest
of TIPC.
====================

Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 2c896fb0 e0a05ebe
Loading
Loading
Loading
Loading
+5 −3
Original line number Original line Diff line number Diff line
@@ -269,18 +269,19 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
 *
 *
 * RCU is locked, no other locks set
 * RCU is locked, no other locks set
 */
 */
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr)
			struct tipc_msg *hdr)
{
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;
	struct sk_buff_head xmitq;
	int rc = 0;


	__skb_queue_head_init(&xmitq);
	__skb_queue_head_init(&xmitq);


	tipc_bcast_lock(net);
	tipc_bcast_lock(net);
	if (msg_type(hdr) == STATE_MSG) {
	if (msg_type(hdr) == STATE_MSG) {
		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
		tipc_link_bc_sync_rcv(l, hdr, &xmitq);
		rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
	} else {
	} else {
		tipc_link_bc_init_rcv(l, hdr);
		tipc_link_bc_init_rcv(l, hdr);
	}
	}
@@ -291,6 +292,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
	/* Any socket wakeup messages ? */
	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
		tipc_sk_rcv(net, inputq);
	return rc;
}
}


/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
+2 −2
Original line number Original line Diff line number Diff line
@@ -56,7 +56,7 @@ int tipc_bcast_get_mtu(struct net *net);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr);
			struct tipc_msg *hdr);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
+110 −21
Original line number Original line Diff line number Diff line
@@ -181,7 +181,10 @@ struct tipc_link {
	u16 acked;
	u16 acked;
	struct tipc_link *bc_rcvlink;
	struct tipc_link *bc_rcvlink;
	struct tipc_link *bc_sndlink;
	struct tipc_link *bc_sndlink;
	int nack_state;
	unsigned long prev_retr;
	u16 prev_from;
	u16 prev_to;
	u8 nack_state;
	bool bc_peer_is_up;
	bool bc_peer_is_up;


	/* Statistics */
	/* Statistics */
@@ -202,6 +205,8 @@ enum {
	BC_NACK_SND_SUPPRESS,
	BC_NACK_SND_SUPPRESS,
};
};


#define TIPC_BC_RETR_LIMIT 10   /* [ms] */

/*
/*
 * Interval between NACKs when packets arrive out of order
 * Interval between NACKs when packets arrive out of order
 */
 */
@@ -237,7 +242,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
				      u16 rcvgap, int tolerance, int priority,
				      u16 rcvgap, int tolerance, int priority,
				      struct sk_buff_head *xmitq);
				      struct sk_buff_head *xmitq);
static void link_print(struct tipc_link *l, const char *str);
static void link_print(struct tipc_link *l, const char *str);
static void tipc_link_build_nack_msg(struct tipc_link *l,
static int tipc_link_build_nack_msg(struct tipc_link *l,
				    struct sk_buff_head *xmitq);
				    struct sk_buff_head *xmitq);
static void tipc_link_build_bc_init_msg(struct tipc_link *l,
static void tipc_link_build_bc_init_msg(struct tipc_link *l,
					struct sk_buff_head *xmitq);
					struct sk_buff_head *xmitq);
@@ -367,6 +372,18 @@ int tipc_link_bc_peers(struct tipc_link *l)
	return l->ackers;
	return l->ackers;
}
}


/* link_bc_rcv_gap - number of packets the broadcast receive link is missing.
 * If packets sit in the deferred queue, the gap runs from rcv_nxt up to the
 * first deferred sequence number; otherwise it is derived from the peer's
 * advertised snd_nxt (stored in l->snd_nxt on the bcast rcv link).
 */
u16 link_bc_rcv_gap(struct tipc_link *l)
{
	struct sk_buff *deferred = skb_peek(&l->deferdq);
	u16 gap = 0;

	if (deferred)
		gap = buf_seqno(deferred) - l->rcv_nxt;
	else if (more(l->snd_nxt, l->rcv_nxt))
		gap = l->snd_nxt - l->rcv_nxt;
	return gap;
}

void tipc_link_set_mtu(struct tipc_link *l, int mtu)
void tipc_link_set_mtu(struct tipc_link *l, int mtu)
{
{
	l->mtu = mtu;
	l->mtu = mtu;
@@ -1135,7 +1152,10 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
		if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
		if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
			return 0;
			return 0;
		l->rcv_unacked = 0;
		l->rcv_unacked = 0;
		return TIPC_LINK_SND_BC_ACK;

		/* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */
		l->snd_nxt = l->rcv_nxt;
		return TIPC_LINK_SND_STATE;
	}
	}


	/* Unicast ACK */
	/* Unicast ACK */
@@ -1164,17 +1184,26 @@ void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
}
}


/* tipc_link_build_nack_msg: prepare link nack message for transmission
/* tipc_link_build_nack_msg: prepare link nack message for transmission
 * Note that sending of broadcast NACK is coordinated among nodes, to
 * reduce the risk of NACK storms towards the sender
 */
 */
static void tipc_link_build_nack_msg(struct tipc_link *l,
static int tipc_link_build_nack_msg(struct tipc_link *l,
				    struct sk_buff_head *xmitq)
				    struct sk_buff_head *xmitq)
{
{
	u32 def_cnt = ++l->stats.deferred_recv;
	u32 def_cnt = ++l->stats.deferred_recv;
	int match1, match2;


	if (link_is_bc_rcvlink(l))
	if (link_is_bc_rcvlink(l)) {
		return;
		match1 = def_cnt & 0xf;
		match2 = tipc_own_addr(l->net) & 0xf;
		if (match1 == match2)
			return TIPC_LINK_SND_STATE;
		return 0;
	}


	if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
	if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
		tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
		tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
	return 0;
}
}


/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
@@ -1225,7 +1254,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
		/* Defer delivery if sequence gap */
		/* Defer delivery if sequence gap */
		if (unlikely(seqno != rcv_nxt)) {
		if (unlikely(seqno != rcv_nxt)) {
			__tipc_skb_queue_sorted(defq, seqno, skb);
			__tipc_skb_queue_sorted(defq, seqno, skb);
			tipc_link_build_nack_msg(l, xmitq);
			rc |= tipc_link_build_nack_msg(l, xmitq);
			break;
			break;
		}
		}


@@ -1236,7 +1265,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
			rc |= tipc_link_input(l, skb, l->inputq);
			rc |= tipc_link_input(l, skb, l->inputq);
		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
			rc |= tipc_link_build_state_msg(l, xmitq);
			rc |= tipc_link_build_state_msg(l, xmitq);
		if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
		if (unlikely(rc & ~TIPC_LINK_SND_STATE))
			break;
			break;
	} while ((skb = __skb_dequeue(defq)));
	} while ((skb = __skb_dequeue(defq)));


@@ -1250,10 +1279,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
				      u16 rcvgap, int tolerance, int priority,
				      u16 rcvgap, int tolerance, int priority,
				      struct sk_buff_head *xmitq)
				      struct sk_buff_head *xmitq)
{
{
	struct tipc_link *bcl = l->bc_rcvlink;
	struct sk_buff *skb;
	struct sk_buff *skb;
	struct tipc_msg *hdr;
	struct tipc_msg *hdr;
	struct sk_buff_head *dfq = &l->deferdq;
	struct sk_buff_head *dfq = &l->deferdq;
	bool node_up = link_is_up(l->bc_rcvlink);
	bool node_up = link_is_up(bcl);
	struct tipc_mon_state *mstate = &l->mon_state;
	struct tipc_mon_state *mstate = &l->mon_state;
	int dlen = 0;
	int dlen = 0;
	void *data;
	void *data;
@@ -1281,7 +1311,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
	msg_set_net_plane(hdr, l->net_plane);
	msg_set_net_plane(hdr, l->net_plane);
	msg_set_next_sent(hdr, l->snd_nxt);
	msg_set_next_sent(hdr, l->snd_nxt);
	msg_set_ack(hdr, l->rcv_nxt - 1);
	msg_set_ack(hdr, l->rcv_nxt - 1);
	msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
	msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
	msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
	msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
	msg_set_link_tolerance(hdr, tolerance);
	msg_set_link_tolerance(hdr, tolerance);
	msg_set_linkprio(hdr, priority);
	msg_set_linkprio(hdr, priority);
@@ -1291,6 +1321,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,


	if (mtyp == STATE_MSG) {
	if (mtyp == STATE_MSG) {
		msg_set_seq_gap(hdr, rcvgap);
		msg_set_seq_gap(hdr, rcvgap);
		msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
		msg_set_probe(hdr, probe);
		msg_set_probe(hdr, probe);
		tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
		tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
		msg_set_size(hdr, INT_H_SIZE + dlen);
		msg_set_size(hdr, INT_H_SIZE + dlen);
@@ -1573,51 +1604,107 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
		l->rcv_nxt = peers_snd_nxt;
		l->rcv_nxt = peers_snd_nxt;
}
}


/* link_bc_retr_eval() - check if the indicated range can be retransmitted now
 * - Adjust permitted range if there is overlap with previous retransmission
 *
 * @l: broadcast send link holding the previous-retransmit state
 * @from: in/out - first sequence number of the requested range
 * @to: in/out - last sequence number of the requested range
 *
 * Returns true (possibly with *from/*to narrowed to exclude the part already
 * retransmitted) when a retransmission should be performed, false when the
 * request is empty, or fully covered by a recent retransmission.
 * Sequence numbers are u16 and compared with less()/more() wraparound
 * semantics.
 */
static bool link_bc_retr_eval(struct tipc_link *l, u16 *from, u16 *to)
{
	/* Time since the last accepted retransmission, in ms */
	unsigned long elapsed = jiffies_to_msecs(jiffies - l->prev_retr);

	/* Empty or inverted range: nothing to retransmit */
	if (less(*to, *from))
		return false;

	/* New retransmission request: the rate-limit window has expired, or
	 * the range is disjoint from the previously retransmitted one
	 */
	if ((elapsed > TIPC_BC_RETR_LIMIT) ||
	    less(*to, l->prev_from) || more(*from, l->prev_to)) {
		l->prev_from = *from;
		l->prev_to = *to;
		l->prev_retr = jiffies;
		return true;
	}

	/* Inside range of previous retransmit */
	if (!less(*from, l->prev_from) && !more(*to, l->prev_to))
		return false;

	/* Fully or partially outside previous range => exclude overlap */
	if (less(*from, l->prev_from)) {
		*to = l->prev_from - 1;
		l->prev_from = *from;
	}
	if (more(*to, l->prev_to)) {
		*from = l->prev_to + 1;
		l->prev_to = *to;
	}
	l->prev_retr = jiffies;
	return true;
}

/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
 */
 */
void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
			  struct sk_buff_head *xmitq)
			  struct sk_buff_head *xmitq)
{
{
	struct tipc_link *snd_l = l->bc_sndlink;
	u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
	u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
	u16 from = msg_bcast_ack(hdr) + 1;
	u16 to = from + msg_bc_gap(hdr) - 1;
	int rc = 0;


	if (!link_is_up(l))
	if (!link_is_up(l))
		return;
		return rc;


	if (!msg_peer_node_is_up(hdr))
	if (!msg_peer_node_is_up(hdr))
		return;
		return rc;


	/* Open when peer acknowledges our bcast init msg (pkt #1) */
	/* Open when peer acknowledges our bcast init msg (pkt #1) */
	if (msg_ack(hdr))
	if (msg_ack(hdr))
		l->bc_peer_is_up = true;
		l->bc_peer_is_up = true;


	if (!l->bc_peer_is_up)
	if (!l->bc_peer_is_up)
		return;
		return rc;

	l->stats.recv_nacks++;


	/* Ignore if peers_snd_nxt goes beyond receive window */
	/* Ignore if peers_snd_nxt goes beyond receive window */
	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
		return;
		return rc;

	if (link_bc_retr_eval(snd_l, &from, &to))
		rc = tipc_link_retrans(snd_l, from, to, xmitq);

	l->snd_nxt = peers_snd_nxt;
	if (link_bc_rcv_gap(l))
		rc |= TIPC_LINK_SND_STATE;

	/* Return now if sender supports nack via STATE messages */
	if (l->peer_caps & TIPC_BCAST_STATE_NACK)
		return rc;

	/* Otherwise, be backwards compatible */


	if (!more(peers_snd_nxt, l->rcv_nxt)) {
	if (!more(peers_snd_nxt, l->rcv_nxt)) {
		l->nack_state = BC_NACK_SND_CONDITIONAL;
		l->nack_state = BC_NACK_SND_CONDITIONAL;
		return;
		return 0;
	}
	}


	/* Don't NACK if one was recently sent or peeked */
	/* Don't NACK if one was recently sent or peeked */
	if (l->nack_state == BC_NACK_SND_SUPPRESS) {
	if (l->nack_state == BC_NACK_SND_SUPPRESS) {
		l->nack_state = BC_NACK_SND_UNCONDITIONAL;
		l->nack_state = BC_NACK_SND_UNCONDITIONAL;
		return;
		return 0;
	}
	}


	/* Conditionally delay NACK sending until next synch rcv */
	/* Conditionally delay NACK sending until next synch rcv */
	if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
	if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
		l->nack_state = BC_NACK_SND_UNCONDITIONAL;
		l->nack_state = BC_NACK_SND_UNCONDITIONAL;
		if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
		if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
			return;
			return 0;
	}
	}


	/* Send NACK now but suppress next one */
	/* Send NACK now but suppress next one */
	tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
	tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
	l->nack_state = BC_NACK_SND_SUPPRESS;
	l->nack_state = BC_NACK_SND_SUPPRESS;
	return 0;
}
}


void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
@@ -1654,6 +1741,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
}
}


/* tipc_link_bc_nack_rcv(): receive broadcast nack message
/* tipc_link_bc_nack_rcv(): receive broadcast nack message
 * This function is here for backwards compatibility, since
 * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5.
 */
 */
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
			  struct sk_buff_head *xmitq)
			  struct sk_buff_head *xmitq)
+3 −3
Original line number Original line Diff line number Diff line
@@ -63,7 +63,7 @@ enum {
enum {
enum {
	TIPC_LINK_UP_EVT       = 1,
	TIPC_LINK_UP_EVT       = 1,
	TIPC_LINK_DOWN_EVT     = (1 << 1),
	TIPC_LINK_DOWN_EVT     = (1 << 1),
	TIPC_LINK_SND_BC_ACK   = (1 << 2)
	TIPC_LINK_SND_STATE    = (1 << 2)
};
};


/* Starting value for maximum packet size negotiation on unicast links
/* Starting value for maximum packet size negotiation on unicast links
@@ -138,7 +138,7 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
void tipc_link_build_bc_sync_msg(struct tipc_link *l,
void tipc_link_build_bc_sync_msg(struct tipc_link *l,
				 struct sk_buff_head *xmitq);
				 struct sk_buff_head *xmitq);
void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
void tipc_link_bc_sync_rcv(struct tipc_link *l,   struct tipc_msg *hdr,
int tipc_link_bc_sync_rcv(struct tipc_link *l,   struct tipc_msg *hdr,
			  struct sk_buff_head *xmitq);
			  struct sk_buff_head *xmitq);
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
			  struct sk_buff_head *xmitq);
			  struct sk_buff_head *xmitq);
+10 −0
Original line number Original line Diff line number Diff line
@@ -719,6 +719,16 @@ static inline char *msg_media_addr(struct tipc_msg *m)
	return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];
	return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];
}
}


/* msg_bc_gap - read the broadcast receive gap from header word 8,
 * bits 0-9 (10-bit field, so max representable gap is 0x3ff)
 */
static inline u32 msg_bc_gap(struct tipc_msg *m)
{
	return msg_bits(m, 8, 0, 0x3ff);
}

/* msg_set_bc_gap - write the broadcast receive gap into header word 8,
 * bits 0-9; values above 0x3ff are truncated by the mask
 */
static inline void msg_set_bc_gap(struct tipc_msg *m, u32 n)
{
	msg_set_bits(m, 8, 0, 0x3ff, n);
}

/*
/*
 * Word 9
 * Word 9
 */
 */
Loading