Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6e498158 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller
Browse files

tipc: move link synch and failover to link aggregation level



Link failover and synchronization have until now been handled by the
links themselves, forcing them to have knowledge about and to access
parallel links in order to make the two algorithms work correctly.

In this commit, we move the control part of this functionality to the
link aggregation level in node.c, which is the right location for this.
As a result, the two algorithms become easier to follow, and the link
implementation becomes simpler.

Tested-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 66996b6c
Loading
Loading
Loading
Loading
+132 −350
Original line number Original line Diff line number Diff line
@@ -48,7 +48,7 @@
/*
/*
 * Error message prefixes
 * Error message prefixes
 */
 */
static const char *link_co_err = "Link changeover error, ";
static const char *link_co_err = "Link tunneling error, ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_unk_evt = "Unknown link event ";
static const char *link_unk_evt = "Unknown link event ";


@@ -139,24 +139,6 @@ static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
static int tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);

/*
 *  Simple link routines
 */
static unsigned int align(unsigned int i)
{
	return (i + 3) & ~3u;
}

static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
{
	struct tipc_node *n = l->owner;

	if (node_active_link(n, 0) != l)
		return node_active_link(n, 0);
	return node_active_link(n, 1);
}


/*
/*
 *  Simple non-static link routines (i.e. referenced outside this file)
 *  Simple non-static link routines (i.e. referenced outside this file)
@@ -394,12 +376,10 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
	/* Perform actions as decided by FSM */
	/* Perform actions as decided by FSM */
	if (actions & LINK_RESET) {
	if (actions & LINK_RESET) {
		l->exec_mode = TIPC_LINK_BLOCKED;
		l->exec_mode = TIPC_LINK_BLOCKED;
		rc |= TIPC_LINK_DOWN_EVT;
		rc = TIPC_LINK_DOWN_EVT;
	}
	if (actions & LINK_ACTIVATE) {
		l->exec_mode = TIPC_LINK_OPEN;
		rc |= TIPC_LINK_UP_EVT;
	}
	}
	if (actions & LINK_ACTIVATE)
		rc = TIPC_LINK_UP_EVT;
	if (actions & (SND_STATE | SND_PROBE))
	if (actions & (SND_STATE | SND_PROBE))
		mtyp = STATE_MSG;
		mtyp = STATE_MSG;
	if (actions & SND_RESET)
	if (actions & SND_RESET)
@@ -461,6 +441,9 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
{
{
	int rc = 0;
	int rc = 0;


	if (l->exec_mode == TIPC_LINK_BLOCKED)
		return rc;

	link_profile_stats(l);
	link_profile_stats(l);
	if (l->silent_intv_cnt)
	if (l->silent_intv_cnt)
		rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq);
		rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq);
@@ -563,52 +546,42 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr)
	tipc_link_reset_fragments(l_ptr);
	tipc_link_reset_fragments(l_ptr);
}
}


void tipc_link_reset(struct tipc_link *l_ptr)
void tipc_link_reset(struct tipc_link *l)
{
{
	u32 prev_state = l_ptr->state;
	struct tipc_node *owner = l->owner;
	struct tipc_node *owner = l_ptr->owner;
	struct tipc_link *pl = tipc_parallel_link(l_ptr);


	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
	l->state = TIPC_LINK_RESETTING;


	/* Link is down, accept any session */
	/* Link is down, accept any session */
	l_ptr->peer_session = WILDCARD_SESSION;
	l->peer_session = WILDCARD_SESSION;


	/* Prepare for renewed mtu size negotiation */
	/* If peer is up, it only accepts an incremented session number */
	l_ptr->mtu = l_ptr->advertised_mtu;
	msg_set_session(l->pmsg, msg_session(l->pmsg) + 1);


	l_ptr->state = TIPC_LINK_RESETTING;
	/* Prepare for renewed mtu size negotiation */

	l->mtu = l->advertised_mtu;
	if ((prev_state == TIPC_LINK_RESETTING) ||
	    (prev_state == TIPC_LINK_ESTABLISHING))
		return;


	if (tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
		l_ptr->exec_mode = TIPC_LINK_BLOCKED;
		l_ptr->failover_checkpt = l_ptr->rcv_nxt;
		pl->failover_pkts = FIRST_FAILOVER;
		pl->failover_checkpt = l_ptr->rcv_nxt;
		pl->failover_skb = l_ptr->reasm_buf;
	} else {
		kfree_skb(l_ptr->reasm_buf);
	}
	/* Clean up all queues, except inputq: */
	/* Clean up all queues, except inputq: */
	__skb_queue_purge(&l_ptr->transmq);
	__skb_queue_purge(&l->transmq);
	__skb_queue_purge(&l_ptr->deferdq);
	__skb_queue_purge(&l->deferdq);
	if (!owner->inputq)
	if (!owner->inputq)
		owner->inputq = l_ptr->inputq;
		owner->inputq = l->inputq;
	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
	skb_queue_splice_init(&l->wakeupq, owner->inputq);
	if (!skb_queue_empty(owner->inputq))
	if (!skb_queue_empty(owner->inputq))
		owner->action_flags |= TIPC_MSG_EVT;
		owner->action_flags |= TIPC_MSG_EVT;
	tipc_link_purge_backlog(l_ptr);

	l_ptr->reasm_buf = NULL;
	tipc_link_purge_backlog(l);
	l_ptr->rcv_unacked = 0;
	kfree_skb(l->reasm_buf);
	l_ptr->snd_nxt = 1;
	kfree_skb(l->failover_reasm_skb);
	l_ptr->rcv_nxt = 1;
	l->reasm_buf = NULL;
	l_ptr->silent_intv_cnt = 0;
	l->failover_reasm_skb = NULL;
	l_ptr->stats.recv_info = 0;
	l->rcv_unacked = 0;
	l_ptr->stale_count = 0;
	l->snd_nxt = 1;
	link_reset_statistics(l_ptr);
	l->rcv_nxt = 1;
	l->silent_intv_cnt = 0;
	l->stats.recv_info = 0;
	l->stale_count = 0;
	link_reset_statistics(l);
}
}


/**
/**
@@ -751,20 +724,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
	return 0;
	return 0;
}
}


static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	skb_queue_head_init(list);
	__skb_queue_tail(list, skb);
}

static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
{
	struct sk_buff_head head;

	skb2list(skb, &head);
	return __tipc_link_xmit(link->owner->net, link, &head);
}

/*
/*
 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
 * Receive the sequence number where we should start receiving and
 * Receive the sequence number where we should start receiving and
@@ -955,32 +914,6 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
	return 0;
	return 0;
}
}


/* link_synch(): check if all packets arrived before the synch
 *               point have been consumed
 * Returns true if the parallel links are synched, otherwise false
 */
static bool link_synch(struct tipc_link *l)
{
	unsigned int post_synch;
	struct tipc_link *pl;

	pl  = tipc_parallel_link(l);
	if (pl == l)
		goto synched;

	/* Was last pre-synch packet added to input queue ? */
	if (less_eq(pl->rcv_nxt, l->synch_point))
		return false;

	/* Is it still in the input queue ? */
	post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
	if (skb_queue_len(pl->inputq) > post_synch)
		return false;
synched:
	l->exec_mode = TIPC_LINK_OPEN;
	return true;
}

/* tipc_data_input - deliver data and name distr msgs to upper layer
/* tipc_data_input - deliver data and name distr msgs to upper layer
 *
 *
 * Consumes buffer if message is of right type
 * Consumes buffer if message is of right type
@@ -1025,54 +958,59 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
/* tipc_link_input - process packet that has passed link protocol check
/* tipc_link_input - process packet that has passed link protocol check
 *
 *
 * Consumes buffer
 * Consumes buffer
 * Node lock must be held
 */
 */
static int tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb)
{
{
	struct tipc_node *node = link->owner;
	struct tipc_node *node = l->owner;
	struct tipc_msg *msg = buf_msg(skb);
	struct tipc_msg *hdr = buf_msg(skb);
	struct sk_buff **reasm_skb = &l->reasm_buf;
	struct sk_buff *iskb;
	struct sk_buff *iskb;
	int pos = 0;
	int usr = msg_user(hdr);
	int rc = 0;
	int rc = 0;
	int pos = 0;
	int ipos = 0;


	switch (msg_user(msg)) {
	if (unlikely(usr == TUNNEL_PROTOCOL)) {
	case TUNNEL_PROTOCOL:
		if (msg_type(hdr) == SYNCH_MSG) {
		if (msg_dup(msg)) {
			__skb_queue_purge(&l->deferdq);
			link->exec_mode = TIPC_LINK_TUNNEL;
			goto drop;
			link->synch_point = msg_seqno(msg_get_wrapped(msg));
			kfree_skb(skb);
			break;
		}
		}
		rc |= tipc_link_failover_rcv(link, &skb);
		if (!tipc_msg_extract(skb, &iskb, &ipos))
		if (!skb)
			return rc;
			break;
		kfree_skb(skb);
		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
		skb = iskb;
			tipc_data_input(link, skb);
		hdr = buf_msg(skb);
			break;
		if (less(msg_seqno(hdr), l->drop_point))
			goto drop;
		if (tipc_data_input(l, skb))
			return rc;
		usr = msg_user(hdr);
		reasm_skb = &l->failover_reasm_skb;
	}
	}
	case MSG_BUNDLER:
		link->stats.recv_bundles++;
		link->stats.recv_bundled += msg_msgcnt(msg);


	if (usr == MSG_BUNDLER) {
		l->stats.recv_bundles++;
		l->stats.recv_bundled += msg_msgcnt(hdr);
		while (tipc_msg_extract(skb, &iskb, &pos))
		while (tipc_msg_extract(skb, &iskb, &pos))
			tipc_data_input(link, iskb);
			tipc_data_input(l, iskb);
		break;
		return rc;
	case MSG_FRAGMENTER:
	} else if (usr == MSG_FRAGMENTER) {
		link->stats.recv_fragments++;
		l->stats.recv_fragments++;
		if (tipc_buf_append(&link->reasm_buf, &skb)) {
		if (tipc_buf_append(reasm_skb, &skb)) {
			link->stats.recv_fragmented++;
			l->stats.recv_fragmented++;
			tipc_data_input(link, skb);
			tipc_data_input(l, skb);
		} else if (!link->reasm_buf) {
		} else if (!*reasm_skb) {
			link->exec_mode = TIPC_LINK_BLOCKED;
			l->exec_mode = TIPC_LINK_BLOCKED;
			rc |= TIPC_LINK_DOWN_EVT;
			l->state = TIPC_LINK_RESETTING;
			rc = TIPC_LINK_DOWN_EVT;
		}
		}
		break;
		return rc;
	case BCAST_PROTOCOL:
	} else if (usr == BCAST_PROTOCOL) {
		tipc_link_sync_rcv(node, skb);
		tipc_link_sync_rcv(node, skb);
		break;
		return rc;
	default:
	}
		break;
drop:
	};
	kfree_skb(skb);
	return rc;
	return rc;
}
}


@@ -1100,7 +1038,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
		  struct sk_buff_head *xmitq)
		  struct sk_buff_head *xmitq)
{
{
	struct sk_buff_head *arrvq = &l->deferdq;
	struct sk_buff_head *arrvq = &l->deferdq;
	struct sk_buff *tmp;
	struct tipc_msg *hdr;
	struct tipc_msg *hdr;
	u16 seqno, rcv_nxt;
	u16 seqno, rcv_nxt;
	int rc = 0;
	int rc = 0;
@@ -1112,18 +1049,18 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
		return rc;
		return rc;
	}
	}


	skb_queue_walk_safe(arrvq, skb, tmp) {
	while ((skb = skb_peek(arrvq))) {
		hdr = buf_msg(skb);
		hdr = buf_msg(skb);


		/* Verify and update link state */
		/* Verify and update link state */
		if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
		if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
			__skb_dequeue(arrvq);
			__skb_dequeue(arrvq);
			rc |= tipc_link_proto_rcv(l, skb, xmitq);
			rc = tipc_link_proto_rcv(l, skb, xmitq);
			continue;
			continue;
		}
		}


		if (unlikely(!link_working(l))) {
		if (unlikely(!link_working(l))) {
			rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
			rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
			if (!link_working(l)) {
			if (!link_working(l)) {
				kfree_skb(__skb_dequeue(arrvq));
				kfree_skb(__skb_dequeue(arrvq));
				return rc;
				return rc;
@@ -1156,18 +1093,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
			return rc;
			return rc;
		}
		}


		/* Synchronize with parallel link if applicable */
		if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
			if (!msg_dup(hdr) && !link_synch(l)) {
				kfree_skb(skb);
				return rc;
			}

		/* Packet can be delivered */
		/* Packet can be delivered */
		l->rcv_nxt++;
		l->rcv_nxt++;
		l->stats.recv_info++;
		l->stats.recv_info++;
		if (unlikely(!tipc_data_input(l, skb)))
		if (unlikely(!tipc_data_input(l, skb)))
			rc |= tipc_link_input(l, skb);
			rc = tipc_link_input(l, skb);


		/* Ack at regular intervals */
		/* Ack at regular intervals */
		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
@@ -1288,7 +1218,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
	} else {
	} else {
		/* RESET_MSG or ACTIVATE_MSG */
		/* RESET_MSG or ACTIVATE_MSG */
		msg_set_max_pkt(hdr, l->advertised_mtu);
		msg_set_max_pkt(hdr, l->advertised_mtu);
		msg_set_ack(hdr, l->failover_checkpt - 1);
		msg_set_ack(hdr, l->rcv_nxt - 1);
		msg_set_next_sent(hdr, 1);
		msg_set_next_sent(hdr, 1);
	}
	}
	skb = tipc_buf_acquire(msg_size(hdr));
	skb = tipc_buf_acquire(msg_size(hdr));
@@ -1296,223 +1226,75 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
		return;
		return;
	skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
	skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
	skb->priority = TC_PRIO_CONTROL;
	skb->priority = TC_PRIO_CONTROL;
	__skb_queue_head(xmitq, skb);
	__skb_queue_tail(xmitq, skb);
}
}


/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
/* tipc_link_tnl_prepare(): prepare and return a list of tunnel packets
 * a different bearer. Owner node is locked.
 * with contents of the link's tranmsit and backlog queues.
 */
 */
static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
				  struct tipc_msg *tunnel_hdr,
			   int mtyp, struct sk_buff_head *xmitq)
				  struct tipc_msg *msg,
				  u32 selector)
{
{
	struct tipc_link *tunnel;
	struct sk_buff *skb, *tnlskb;
	struct sk_buff *skb;
	struct tipc_msg *hdr, tnlhdr;
	u32 length = msg_size(msg);
	struct sk_buff_head *queue = &l->transmq;
	struct sk_buff_head tmpxq, tnlq;
	u16 pktlen, pktcnt, seqno = l->snd_nxt;


	tunnel = node_active_link(l_ptr->owner, selector & 1);
	if (!tnl)
	if (!tipc_link_is_up(tunnel)) {
		pr_warn("%stunnel link no longer available\n", link_co_err);
		return;
		return;
	}
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	skb = tipc_buf_acquire(length + INT_H_SIZE);
	if (!skb) {
		pr_warn("%sunable to send tunnel msg\n", link_co_err);
		return;
	}
	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
	__tipc_link_xmit_skb(tunnel, skb);
}


	skb_queue_head_init(&tnlq);
	skb_queue_head_init(&tmpxq);


/* tipc_link_failover_send_queue(): A link has gone down, but a second
	/* At least one packet required for safe algorithm => add dummy */
 * link is still active. We can do failover. Tunnel the failing link's
	skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG,
 * whole send queue via the remaining link. This way, we don't lose
			      BASIC_H_SIZE, 0, l->addr, link_own_addr(l),
 * any packets, and sequence order is preserved for subsequent traffic
			      0, 0, TIPC_ERR_NO_PORT);
 * sent over the remaining link. Owner node is locked.
	if (!skb) {
 */
		pr_warn("%sunable to create tunnel packet\n", link_co_err);
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
	int msgcount;
	struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
	struct tipc_msg tunnel_hdr;
	struct sk_buff *skb;
	int split_bundles;

	if (!tunnel)
		return;

	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
		      FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);

	skb_queue_walk(&l_ptr->backlogq, skb) {
		msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt);
		l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1);
	}
	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
	tipc_link_purge_backlog(l_ptr);
	msgcount = skb_queue_len(&l_ptr->transmq);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	msg_set_msgcnt(&tunnel_hdr, msgcount);

	if (skb_queue_empty(&l_ptr->transmq)) {
		skb = tipc_buf_acquire(INT_H_SIZE);
		if (skb) {
			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
			msg_set_size(&tunnel_hdr, INT_H_SIZE);
			__tipc_link_xmit_skb(tunnel, skb);
		} else {
			pr_warn("%sunable to send changeover msg\n",
				link_co_err);
		}
		return;
		return;
	}
	}

	skb_queue_tail(&tnlq, skb);
	split_bundles = (node_active_link(l_ptr->owner, 0) !=
	tipc_link_xmit(l, &tnlq, &tmpxq);
			 node_active_link(l_ptr->owner, 0));
	__skb_queue_purge(&tmpxq);


	skb_queue_walk(&l_ptr->transmq, skb) {
	/* Initialize reusable tunnel packet header */
		struct tipc_msg *msg = buf_msg(skb);
	tipc_msg_init(link_own_addr(l), &tnlhdr, TUNNEL_PROTOCOL,

		      mtyp, INT_H_SIZE, l->addr);
		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
	pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq);
			struct tipc_msg *m = msg_get_wrapped(msg);
	msg_set_msgcnt(&tnlhdr, pktcnt);
			unchar *pos = (unchar *)m;
	msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);

tnl:
			msgcount = msg_msgcnt(msg);
	/* Wrap each packet into a tunnel packet */
			while (msgcount--) {
				msg_set_seqno(m, msg_seqno(msg));
				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
						      msg_link_selector(m));
				pos += align(msg_size(m));
				m = (struct tipc_msg *)pos;
			}
		} else {
			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
					      msg_link_selector(msg));
		}
	}
}

/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
 * duplicate of the first link's send queue via the new link. This way, we
 * are guaranteed that currently queued packets from a socket are delivered
 * before future traffic from the same socket, even if this is using the
 * new link. The last arriving copy of each duplicate packet is dropped at
 * the receiving end by the regular protocol check, so packet cardinality
 * and sequence order is preserved per sender/receiver socket pair.
 * Owner node is locked.
 */
void tipc_link_dup_queue_xmit(struct tipc_link *link,
			      struct tipc_link *tnl)
{
	struct sk_buff *skb;
	struct tipc_msg tnl_hdr;
	struct sk_buff_head *queue = &link->transmq;
	int mcnt;
	u16 seqno;

	tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
		      SYNCH_MSG, INT_H_SIZE, link->addr);
	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
	msg_set_msgcnt(&tnl_hdr, mcnt);
	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);

tunnel_queue:
	skb_queue_walk(queue, skb) {
	skb_queue_walk(queue, skb) {
		struct sk_buff *outskb;
		hdr = buf_msg(skb);
		struct tipc_msg *msg = buf_msg(skb);
		if (queue == &l->backlogq)
		u32 len = msg_size(msg);
			msg_set_seqno(hdr, seqno++);

		pktlen = msg_size(hdr);
		msg_set_ack(msg, mod(link->rcv_nxt - 1));
		msg_set_size(&tnlhdr, pktlen + INT_H_SIZE);
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE);
		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
		if (!tnlskb) {
		outskb = tipc_buf_acquire(len + INT_H_SIZE);
			pr_warn("%sunable to send packet\n", link_co_err);
		if (outskb == NULL) {
			pr_warn("%sunable to send duplicate msg\n",
				link_co_err);
			return;
		}
		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
					       skb->data, len);
		__tipc_link_xmit_skb(tnl, outskb);
		if (!tipc_link_is_up(link))
			return;
			return;
		}
		}
	if (queue == &link->backlogq)
		skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE);
		return;
		skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen);
	seqno = link->snd_nxt;
		__skb_queue_tail(&tnlq, tnlskb);
	skb_queue_walk(&link->backlogq, skb) {
		msg_set_seqno(buf_msg(skb), seqno);
		seqno = mod(seqno + 1);
	}
	}
	queue = &link->backlogq;
	if (queue != &l->backlogq) {
	goto tunnel_queue;
		queue = &l->backlogq;
}
		goto tnl;

/*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
 *  Owner node is locked.
 */
static int tipc_link_failover_rcv(struct tipc_link *link,
				  struct sk_buff **skb)
{
	struct tipc_msg *msg = buf_msg(*skb);
	struct sk_buff *iskb = NULL;
	struct tipc_link *pl = NULL;
	int bearer_id = msg_bearer_id(msg);
	int pos = 0;
	int rc = 0;

	if (msg_type(msg) != FAILOVER_MSG) {
		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
		goto exit;
	}
	}
	if (bearer_id >= MAX_BEARERS)
		goto exit;


	if (bearer_id == link->bearer_id)
	tipc_link_xmit(tnl, &tnlq, xmitq);
		goto exit;

	pl = link->owner->links[bearer_id].link;

	if (link->failover_pkts == FIRST_FAILOVER)
		link->failover_pkts = msg_msgcnt(msg);

	/* Should we expect an inner packet? */
	if (!link->failover_pkts)
		goto exit;


	if (!tipc_msg_extract(*skb, &iskb, &pos)) {
	if (mtyp == FAILOVER_MSG) {
		pr_warn("%sno inner failover pkt\n", link_co_err);
		tnl->drop_point = l->rcv_nxt;
		*skb = NULL;
		tnl->failover_reasm_skb = l->reasm_buf;
		goto exit;
		l->reasm_buf = NULL;
	}
		l->exec_mode = TIPC_LINK_BLOCKED;
	link->failover_pkts--;
	*skb = NULL;

	/* Was this packet already delivered? */
	if (less(buf_seqno(iskb), link->failover_checkpt)) {
		kfree_skb(iskb);
		iskb = NULL;
		goto exit;
	}
	if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
		link->stats.recv_fragments++;
		if (!tipc_buf_append(&link->failover_skb, &iskb) &&
		    !link->failover_skb) {
			link->exec_mode = TIPC_LINK_BLOCKED;
			rc |= TIPC_LINK_DOWN_EVT;
		}
	}
	}
exit:
	if (!link->failover_pkts && pl)
		pl->exec_mode = TIPC_LINK_OPEN;
	kfree_skb(*skb);
	*skb = iskb;
	return rc;
}
}


/* tipc_link_proto_rcv(): receive link level protocol message :
/* tipc_link_proto_rcv(): receive link level protocol message :
@@ -1593,7 +1375,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,


		/* If NACK, retransmit will now start at right position */
		/* If NACK, retransmit will now start at right position */
		if (nacked_gap) {
		if (nacked_gap) {
			rc |= tipc_link_retransm(l, nacked_gap, xmitq);
			rc = tipc_link_retransm(l, nacked_gap, xmitq);
			l->stats.recv_nacks++;
			l->stats.recv_nacks++;
		}
		}
		tipc_link_advance_backlog(l, xmitq);
		tipc_link_advance_backlog(l, xmitq);
+6 −8
Original line number Original line Diff line number Diff line
@@ -164,13 +164,11 @@ struct tipc_link {
	struct tipc_msg *pmsg;
	struct tipc_msg *pmsg;
	u32 priority;
	u32 priority;
	char net_plane;
	char net_plane;
	u8 exec_mode;
	u16 synch_point;


	/* Failover */
	/* Failover/synch */
	u16 failover_pkts;
	u8 exec_mode;
	u16 failover_checkpt;
	u16 drop_point;
	struct sk_buff *failover_skb;
	struct sk_buff *failover_reasm_skb;


	/* Max packet negotiation */
	/* Max packet negotiation */
	u16 mtu;
	u16 mtu;
@@ -212,8 +210,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n,
				   const struct tipc_media_addr *maddr,
				   const struct tipc_media_addr *maddr,
				   struct sk_buff_head *inputq,
				   struct sk_buff_head *inputq,
				   struct sk_buff_head *namedq);
				   struct sk_buff_head *namedq);
void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest);
			   int mtyp, struct sk_buff_head *xmitq);
void tipc_link_reset_fragments(struct tipc_link *l_ptr);
void tipc_link_reset_fragments(struct tipc_link *l_ptr);
int tipc_link_is_up(struct tipc_link *l_ptr);
int tipc_link_is_up(struct tipc_link *l_ptr);
int tipc_link_is_active(struct tipc_link *l_ptr);
int tipc_link_is_active(struct tipc_link *l_ptr);
+11 −21
Original line number Original line Diff line number Diff line
@@ -110,7 +110,6 @@ struct tipc_skb_cb {
	struct sk_buff *tail;
	struct sk_buff *tail;
	bool validated;
	bool validated;
	bool wakeup_pending;
	bool wakeup_pending;
	bool bundling;
	u16 chain_sz;
	u16 chain_sz;
	u16 chain_imp;
	u16 chain_imp;
};
};
@@ -559,15 +558,6 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
	msg_set_bits(m, 1, 15, 0x1fff, n);
	msg_set_bits(m, 1, 15, 0x1fff, n);
}
}


static inline bool msg_dup(struct tipc_msg *m)
{
	if (likely(msg_user(m) != TUNNEL_PROTOCOL))
		return false;
	if (msg_type(m) != SYNCH_MSG)
		return false;
	return true;
}

/*
/*
 * Word 2
 * Word 2
 */
 */
@@ -621,12 +611,12 @@ static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
}
}




static inline u32 msg_next_sent(struct tipc_msg *m)
static inline u16 msg_next_sent(struct tipc_msg *m)
{
{
	return msg_bits(m, 4, 0, 0xffff);
	return msg_bits(m, 4, 0, 0xffff);
}
}


static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
static inline void msg_set_next_sent(struct tipc_msg *m, u16 n)
{
{
	msg_set_bits(m, 4, 0, 0xffff, n);
	msg_set_bits(m, 4, 0, 0xffff, n);
}
}
@@ -727,12 +717,12 @@ static inline char *msg_media_addr(struct tipc_msg *m)
/*
/*
 * Word 9
 * Word 9
 */
 */
static inline u32 msg_msgcnt(struct tipc_msg *m)
static inline u16 msg_msgcnt(struct tipc_msg *m)
{
{
	return msg_bits(m, 9, 16, 0xffff);
	return msg_bits(m, 9, 16, 0xffff);
}
}


static inline void msg_set_msgcnt(struct tipc_msg *m, u32 n)
static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
{
{
	msg_set_bits(m, 9, 16, 0xffff, n);
	msg_set_bits(m, 9, 16, 0xffff, n);
}
}
@@ -767,19 +757,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
	msg_set_bits(m, 9, 0, 0xffff, n);
	msg_set_bits(m, 9, 0, 0xffff, n);
}
}


static inline bool msg_is_traffic(struct tipc_msg *m)
static inline bool msg_peer_link_is_up(struct tipc_msg *m)
{
{
	if (likely(msg_user(m) != LINK_PROTOCOL))
	if (likely(msg_user(m) != LINK_PROTOCOL))
		return true;
		return true;
	if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
	if (msg_type(m) == STATE_MSG)
		return false;
		return true;
		return true;
	return false;
}
}


static inline bool msg_peer_is_up(struct tipc_msg *m)
static inline bool msg_peer_node_is_up(struct tipc_msg *m)
{
{
	if (likely(msg_is_traffic(m)))
	if (msg_peer_link_is_up(m))
		return false;
		return true;
	return msg_redundant_link(m);
	return msg_redundant_link(m);
}
}


+190 −101

File changed.

Preview size limit exceeded, changes collapsed.

+3 −28
Original line number Original line Diff line number Diff line
@@ -47,33 +47,7 @@


#define INVALID_BEARER_ID -1
#define INVALID_BEARER_ID -1


/* Node FSM states and events:
 */
enum {
	SELF_DOWN_PEER_DOWN    = 0xdd,
	SELF_UP_PEER_UP        = 0xaa,
	SELF_DOWN_PEER_LEAVING = 0xd1,
	SELF_UP_PEER_COMING    = 0xac,
	SELF_COMING_PEER_UP    = 0xca,
	SELF_LEAVING_PEER_DOWN = 0x1d,
	NODE_FAILINGOVER       = 0xf0,
	NODE_SYNCHING          = 0xcc
};

enum {
	SELF_ESTABL_CONTACT_EVT = 0xece,
	SELF_LOST_CONTACT_EVT   = 0x1ce,
	PEER_ESTABL_CONTACT_EVT = 0xfece,
	PEER_LOST_CONTACT_EVT   = 0xf1ce,
	NODE_FAILOVER_BEGIN_EVT = 0xfbe,
	NODE_FAILOVER_END_EVT   = 0xfee,
	NODE_SYNCH_BEGIN_EVT    = 0xcbe,
	NODE_SYNCH_END_EVT      = 0xcee
};

/* Flags used to take different actions according to flag type
/* Flags used to take different actions according to flag type
 * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
 * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
 * TIPC_NOTIFY_NODE_DOWN: notify node is down
 * TIPC_NOTIFY_NODE_DOWN: notify node is down
 * TIPC_NOTIFY_NODE_UP: notify node is up
 * TIPC_NOTIFY_NODE_UP: notify node is up
 * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
 * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
@@ -133,6 +107,8 @@ struct tipc_link_entry {
 * @links: array containing references to all links to node
 * @links: array containing references to all links to node
 * @action_flags: bit mask of different types of node actions
 * @action_flags: bit mask of different types of node actions
 * @bclink: broadcast-related info
 * @bclink: broadcast-related info
 * @state: connectivity state vs peer node
 * @sync_point: sequence number where synch/failover is finished
 * @list: links to adjacent nodes in sorted list of cluster's nodes
 * @list: links to adjacent nodes in sorted list of cluster's nodes
 * @working_links: number of working links to node (both active and standby)
 * @working_links: number of working links to node (both active and standby)
 * @link_cnt: number of links to node
 * @link_cnt: number of links to node
@@ -156,6 +132,7 @@ struct tipc_node {
	struct tipc_node_bclink bclink;
	struct tipc_node_bclink bclink;
	struct list_head list;
	struct list_head list;
	int state;
	int state;
	u16 sync_point;
	int link_cnt;
	int link_cnt;
	u16 working_links;
	u16 working_links;
	u16 capabilities;
	u16 capabilities;
@@ -180,8 +157,6 @@ bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer,
void tipc_node_delete_links(struct net *net, int bearer_id);
void tipc_node_delete_links(struct net *net, int bearer_id);
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);
void tipc_node_link_up(struct tipc_node *n_ptr, int bearer_id);
bool tipc_node_is_up(struct tipc_node *n);
bool tipc_node_is_up(struct tipc_node *n);
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
			   char *linkname, size_t len);
			   char *linkname, size_t len);