Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a3795208 authored by David S. Miller
Browse files

Merge branch 'tipc-next'



Jon Maloy says:

====================
tipc: some optimizations and improvements

The commits in this series contain some relatively simple changes that
lead to better throughput across TIPC connections. We also make changes
to the implementation of link transmission queueing and priority
handling, in order to make the code more comprehensible and maintainable.

v2: Commit #2: Redesigned tipc_msg_validate() to use pskb_may_pull(),
               as per feedback from David Miller.
    Commit #3: Some cosmetic changes to tipc_msg_extract(). I tried to
               replace the unconditional skb_linearize() with calls to
               pskb_may_pull() at selected locations, but I gave up.
               First, skb_trim() requires a fully linearized buffer.
               Second, it doesn't make much sense; the whole buffer
               will end up linearized, one way or another.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
parents 5f1764dd e3eea1eb
Loading
Loading
Loading
Loading
+24 −29
Original line number Diff line number Diff line
@@ -135,9 +135,10 @@ static void bclink_set_last_sent(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct sk_buff *skb = skb_peek(&bcl->backlogq);

	if (bcl->next_out)
		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
	if (skb)
		bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1);
	else
		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
}
@@ -180,7 +181,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
	struct sk_buff *skb;
	struct tipc_link *bcl = tn->bcl;

	skb_queue_walk(&bcl->outqueue, skb) {
	skb_queue_walk(&bcl->transmq, skb) {
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
			break;
@@ -210,14 +211,17 @@ void tipc_bclink_wakeup_users(struct net *net)
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
	struct sk_buff *skb, *tmp;
	struct sk_buff *next;
	unsigned int released = 0;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	if (unlikely(!n_ptr->bclink.recv_permitted))
		return;

	tipc_bclink_lock(net);

	/* Bail out if tx queue is empty (no clean up is required) */
	skb = skb_peek(&tn->bcl->outqueue);
	skb = skb_peek(&tn->bcl->transmq);
	if (!skb)
		goto exit;

@@ -244,27 +248,19 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
	}

	/* Skip over packets that node has previously acknowledged */
	skb_queue_walk(&tn->bcl->outqueue, skb) {
	skb_queue_walk(&tn->bcl->transmq, skb) {
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}

	/* Update packets that node is now acknowledging */
	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
		if (more(buf_seqno(skb), acked))
			break;

		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
		if (skb != tn->bcl->next_out) {
		bcbuf_decr_acks(skb);
		} else {
			bcbuf_set_acks(skb, 0);
			tn->bcl->next_out = next;
		bclink_set_last_sent(net);
		}

		if (bcbuf_acks(skb) == 0) {
			__skb_unlink(skb, &tn->bcl->outqueue);
			__skb_unlink(skb, &tn->bcl->transmq);
			kfree_skb(skb);
			released = 1;
		}
@@ -272,7 +268,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
	if (unlikely(tn->bcl->next_out)) {
	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
	}
@@ -319,7 +315,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
	buf = tipc_buf_acquire(INT_H_SIZE);
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;

		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
@@ -387,14 +383,13 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
		__skb_queue_purge(list);
		return -EHOSTUNREACH;
	}

	/* Broadcast to all nodes */
	if (likely(bclink)) {
		tipc_bclink_lock(net);
		if (likely(bclink->bcast_nodes.count)) {
			rc = __tipc_link_xmit(net, bcl, list);
			if (likely(!rc)) {
				u32 len = skb_queue_len(&bcl->outqueue);
				u32 len = skb_queue_len(&bcl->transmq);

				bclink_set_last_sent(net);
				bcl->stats.queue_sz_counts++;
@@ -559,25 +554,25 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
		if (node->bclink.last_in == node->bclink.last_sent)
			goto unlock;

		if (skb_queue_empty(&node->bclink.deferred_queue)) {
		if (skb_queue_empty(&node->bclink.deferdq)) {
			node->bclink.oos_state = 1;
			goto unlock;
		}

		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
		msg = buf_msg(skb_peek(&node->bclink.deferdq));
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
		buf = __skb_dequeue(&node->bclink.deferred_queue);
		buf = __skb_dequeue(&node->bclink.deferdq);
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
					       buf);
		bclink_update_last_sent(node, seqno);
		buf = NULL;
@@ -634,7 +629,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		tn->bcl->stats.sent_info++;

		if (WARN_ON(!bclink->bcast_nodes.count)) {
			dump_stack();
			return 0;
@@ -913,8 +907,9 @@ int tipc_bclink_init(struct net *net)
	sprintf(bcbearer->media.name, "tipc-broadcast");

	spin_lock_init(&bclink->lock);
	__skb_queue_head_init(&bcl->outqueue);
	__skb_queue_head_init(&bcl->deferred_queue);
	__skb_queue_head_init(&bcl->transmq);
	__skb_queue_head_init(&bcl->backlogq);
	__skb_queue_head_init(&bcl->deferdq);
	skb_queue_head_init(&bcl->wakeupq);
	bcl->next_out_no = 1;
	spin_lock_init(&bclink->node.lock);
+3 −0
Original line number Diff line number Diff line
@@ -89,6 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
		      MAX_H_SIZE, dest_domain);
	msg_set_non_seq(msg, 1);
	msg_set_node_sig(msg, tn->random);
	msg_set_node_capabilities(msg, 0);
	msg_set_dest_domain(msg, dest_domain);
	msg_set_bc_netid(msg, tn->net_id);
	b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
@@ -133,6 +134,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
	u32 net_id = msg_bc_netid(msg);
	u32 mtyp = msg_type(msg);
	u32 signature = msg_node_sig(msg);
	u16 caps = msg_node_capabilities(msg);
	bool addr_match = false;
	bool sign_match = false;
	bool link_up = false;
@@ -167,6 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
	if (!node)
		return;
	tipc_node_lock(node);
	node->capabilities = caps;
	link = node->links[bearer->identity];

	/* Prepare to validate requesting node's signature and media address */
+126 −226
Original line number Diff line number Diff line
/*
 * net/tipc/link.c: TIPC link code
 *
 * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
 * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
 * All rights reserved.
 *
@@ -35,6 +35,7 @@
 */

#include "core.h"
#include "subscr.h"
#include "link.h"
#include "bcast.h"
#include "socket.h"
@@ -194,10 +195,10 @@ static void link_timeout(unsigned long data)
	tipc_node_lock(l_ptr->owner);

	/* update counters used in statistical profiling of send traffic */
	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
	l_ptr->stats.queue_sz_counts++;

	skb = skb_peek(&l_ptr->outqueue);
	skb = skb_peek(&l_ptr->transmq);
	if (skb) {
		struct tipc_msg *msg = buf_msg(skb);
		u32 length = msg_size(msg);
@@ -229,7 +230,7 @@ static void link_timeout(unsigned long data)
	/* do all other link processing performed on a periodic basis */
	link_state_event(l_ptr, TIMEOUT_EVT);

	if (l_ptr->next_out)
	if (skb_queue_len(&l_ptr->backlogq))
		tipc_link_push_packets(l_ptr);

	tipc_node_unlock(l_ptr->owner);
@@ -305,16 +306,15 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
	msg_set_session(msg, (tn->random & 0xffff));
	msg_set_bearer_id(msg, b_ptr->identity);
	strcpy((char *)msg_data(msg), if_name);

	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->window);

	l_ptr->net_plane = b_ptr->net_plane;
	link_init_max_pkt(l_ptr);
	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->window);

	l_ptr->next_out_no = 1;
	__skb_queue_head_init(&l_ptr->outqueue);
	__skb_queue_head_init(&l_ptr->deferred_queue);
	__skb_queue_head_init(&l_ptr->transmq);
	__skb_queue_head_init(&l_ptr->backlogq);
	__skb_queue_head_init(&l_ptr->deferdq);
	skb_queue_head_init(&l_ptr->wakeupq);
	skb_queue_head_init(&l_ptr->inputq);
	skb_queue_head_init(&l_ptr->namedq);
@@ -400,7 +400,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
 */
void link_prepare_wakeup(struct tipc_link *link)
{
	uint pend_qsz = skb_queue_len(&link->outqueue);
	uint pend_qsz = skb_queue_len(&link->backlogq);
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -430,8 +430,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 */
void tipc_link_purge_queues(struct tipc_link *l_ptr)
{
	__skb_queue_purge(&l_ptr->deferred_queue);
	__skb_queue_purge(&l_ptr->outqueue);
	__skb_queue_purge(&l_ptr->deferdq);
	__skb_queue_purge(&l_ptr->transmq);
	__skb_queue_purge(&l_ptr->backlogq);
	tipc_link_reset_fragments(l_ptr);
}

@@ -464,15 +465,15 @@ void tipc_link_reset(struct tipc_link *l_ptr)
	}

	/* Clean up all queues, except inputq: */
	__skb_queue_purge(&l_ptr->outqueue);
	__skb_queue_purge(&l_ptr->deferred_queue);
	__skb_queue_purge(&l_ptr->transmq);
	__skb_queue_purge(&l_ptr->backlogq);
	__skb_queue_purge(&l_ptr->deferdq);
	if (!owner->inputq)
		owner->inputq = &l_ptr->inputq;
	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
	if (!skb_queue_empty(owner->inputq))
		owner->action_flags |= TIPC_MSG_EVT;
	l_ptr->next_out = NULL;
	l_ptr->unacked_window = 0;
	l_ptr->rcv_unacked = 0;
	l_ptr->checkpoint = 1;
	l_ptr->next_out_no = 1;
	l_ptr->fsm_msg_cnt = 0;
@@ -706,7 +707,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
{
	struct sk_buff *skb = skb_peek(list);
	struct tipc_msg *msg = buf_msg(skb);
	uint imp = tipc_msg_tot_importance(msg);
	int imp = msg_importance(msg);
	u32 oport = msg_tot_origport(msg);

	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
@@ -742,54 +743,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
		     struct sk_buff_head *list)
{
	struct tipc_msg *msg = buf_msg(skb_peek(list));
	uint psz = msg_size(msg);
	uint sndlim = link->queue_limit[0];
	uint imp = tipc_msg_tot_importance(msg);
	unsigned int maxwin = link->window;
	unsigned int imp = msg_importance(msg);
	uint mtu = link->max_pkt;
	uint ack = mod(link->next_in_no - 1);
	uint seqno = link->next_out_no;
	uint bc_last_in = link->owner->bclink.last_in;
	struct tipc_media_addr *addr = &link->media_addr;
	struct sk_buff_head *outqueue = &link->outqueue;
	struct sk_buff_head *transmq = &link->transmq;
	struct sk_buff_head *backlogq = &link->backlogq;
	struct sk_buff *skb, *tmp;

	/* Match queue limits against msg importance: */
	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
	/* Match queue limit against msg importance: */
	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
		return tipc_link_cong(link, list);

	/* Has valid packet limit been used ? */
	if (unlikely(psz > mtu)) {
	if (unlikely(msg_size(msg) > mtu)) {
		__skb_queue_purge(list);
		return -EMSGSIZE;
	}

	/* Prepare each packet for sending, and add to outqueue: */
	/* Prepare each packet for sending, and add to relevant queue: */
	skb_queue_walk_safe(list, skb, tmp) {
		__skb_unlink(skb, list);
		msg = buf_msg(skb);
		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
		msg_set_seqno(msg, seqno);
		msg_set_ack(msg, ack);
		msg_set_bcast_ack(msg, bc_last_in);

		if (skb_queue_len(outqueue) < sndlim) {
			__skb_queue_tail(outqueue, skb);
			tipc_bearer_send(net, link->bearer_id,
					 skb, addr);
			link->next_out = NULL;
			link->unacked_window = 0;
		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
		if (likely(skb_queue_len(transmq) < maxwin)) {
			__skb_queue_tail(transmq, skb);
			tipc_bearer_send(net, link->bearer_id, skb, addr);
			link->rcv_unacked = 0;
			seqno++;
			continue;
		}
		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
			link->stats.sent_bundled++;
			continue;
		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
						link->addr)) {
		}
		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
			link->stats.sent_bundled++;
			link->stats.sent_bundles++;
			if (!link->next_out)
				link->next_out = skb_peek_tail(outqueue);
		} else {
			__skb_queue_tail(outqueue, skb);
			if (!link->next_out)
				link->next_out = skb;
		}
		__skb_queue_tail(backlogq, skb);
		seqno++;
	}
	link->next_out_no = seqno;
@@ -895,14 +893,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
	kfree_skb(buf);
}

struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
				    const struct sk_buff *skb)
{
	if (skb_queue_is_last(list, skb))
		return NULL;
	return skb->next;
}

/*
 * tipc_link_push_packets - push unsent packets to bearer
 *
@@ -911,30 +901,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
 *
 * Called with node locked
 */
void tipc_link_push_packets(struct tipc_link *l_ptr)
void tipc_link_push_packets(struct tipc_link *link)
{
	struct sk_buff_head *outqueue = &l_ptr->outqueue;
	struct sk_buff *skb = l_ptr->next_out;
	struct sk_buff *skb;
	struct tipc_msg *msg;
	u32 next, first;

	skb_queue_walk_from(outqueue, skb) {
		msg = buf_msg(skb);
		next = msg_seqno(msg);
		first = buf_seqno(skb_peek(outqueue));
	unsigned int ack = mod(link->next_in_no - 1);

		if (mod(next - first) < l_ptr->queue_limit[0]) {
			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
			if (msg_user(msg) == MSG_BUNDLER)
				TIPC_SKB_CB(skb)->bundling = false;
			tipc_bearer_send(l_ptr->owner->net,
					 l_ptr->bearer_id, skb,
					 &l_ptr->media_addr);
			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
		} else {
	while (skb_queue_len(&link->transmq) < link->window) {
		skb = __skb_dequeue(&link->backlogq);
		if (!skb)
			break;
		}
		msg = buf_msg(skb);
		msg_set_ack(msg, ack);
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		link->rcv_unacked = 0;
		__skb_queue_tail(&link->transmq, skb);
		tipc_bearer_send(link->owner->net, link->bearer_id,
				 skb, &link->media_addr);
	}
}

@@ -1021,8 +1004,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
		l_ptr->stale_count = 1;
	}

	skb_queue_walk_from(&l_ptr->outqueue, skb) {
		if (!retransmits || skb == l_ptr->next_out)
	skb_queue_walk_from(&l_ptr->transmq, skb) {
		if (!retransmits)
			break;
		msg = buf_msg(skb);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1039,67 +1022,12 @@ static void link_retrieve_defq(struct tipc_link *link,
{
	u32 seq_no;

	if (skb_queue_empty(&link->deferred_queue))
	if (skb_queue_empty(&link->deferdq))
		return;

	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
	seq_no = buf_seqno(skb_peek(&link->deferdq));
	if (seq_no == mod(link->next_in_no))
		skb_queue_splice_tail_init(&link->deferred_queue, list);
}

/**
 * link_recv_buf_validate - validate basic format of received message
 *
 * This routine ensures a TIPC message has an acceptable header, and at least
 * as much data as the header indicates it should.  The routine also ensures
 * that the entire message header is stored in the main fragment of the message
 * buffer, to simplify future access to message header fields.
 *
 * Note: Having extra info present in the message header or data areas is OK.
 * TIPC will ignore the excess, under the assumption that it is optional info
 * introduced by a later release of the protocol.
 */
static int link_recv_buf_validate(struct sk_buff *buf)
{
	static u32 min_data_hdr_size[8] = {
		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
		};

	struct tipc_msg *msg;
	u32 tipc_hdr[2];
	u32 size;
	u32 hdr_size;
	u32 min_hdr_size;

	/* If this packet comes from the defer queue, the skb has already
	 * been validated
	 */
	if (unlikely(TIPC_SKB_CB(buf)->deferred))
		return 1;

	if (unlikely(buf->len < MIN_H_SIZE))
		return 0;

	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
	if (msg == NULL)
		return 0;

	if (unlikely(msg_version(msg) != TIPC_VERSION))
		return 0;

	size = msg_size(msg);
	hdr_size = msg_hdr_sz(msg);
	min_hdr_size = msg_isdata(msg) ?
		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;

	if (unlikely((hdr_size < min_hdr_size) ||
		     (size < hdr_size) ||
		     (buf->len < size) ||
		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
		return 0;

	return pskb_may_pull(buf, hdr_size);
		skb_queue_splice_tail_init(&link->deferdq, list);
}

/**
@@ -1127,16 +1055,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)

	while ((skb = __skb_dequeue(&head))) {
		/* Ensure message is well-formed */
		if (unlikely(!link_recv_buf_validate(skb)))
			goto discard;

		/* Ensure message data is a single contiguous unit */
		if (unlikely(skb_linearize(skb)))
		if (unlikely(!tipc_msg_validate(skb)))
			goto discard;

		/* Handle arrival of a non-unicast link message */
		msg = buf_msg(skb);

		if (unlikely(msg_non_seq(msg))) {
			if (msg_user(msg) ==  LINK_CONFIG)
				tipc_disc_rcv(net, skb, b_ptr);
@@ -1177,21 +1100,20 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
		ackd = msg_ack(msg);

		/* Release acked messages */
		if (n_ptr->bclink.recv_permitted)
		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));

		released = 0;
		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
			if (skb1 == l_ptr->next_out ||
			    more(buf_seqno(skb1), ackd))
		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
			if (more(buf_seqno(skb1), ackd))
				break;
			 __skb_unlink(skb1, &l_ptr->outqueue);
			 __skb_unlink(skb1, &l_ptr->transmq);
			 kfree_skb(skb1);
			 released = 1;
		}

		/* Try sending any messages link endpoint has pending */
		if (unlikely(l_ptr->next_out))
		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
			tipc_link_push_packets(l_ptr);

		if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1226,10 +1148,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
			goto unlock;
		}
		l_ptr->next_in_no++;
		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
			link_retrieve_defq(l_ptr, &head);

		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
			l_ptr->stats.sent_acks++;
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
		}
@@ -1396,10 +1317,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
		return;
	}

	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
		l_ptr->stats.deferred_recv++;
		TIPC_SKB_CB(buf)->deferred = true;
		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
	} else {
		l_ptr->stats.duplicates++;
@@ -1436,11 +1356,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,

		if (!tipc_link_is_up(l_ptr))
			return;
		if (l_ptr->next_out)
			next_sent = buf_seqno(l_ptr->next_out);
		if (skb_queue_len(&l_ptr->backlogq))
			next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
		msg_set_next_sent(msg, next_sent);
		if (!skb_queue_empty(&l_ptr->deferred_queue)) {
			u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
		if (!skb_queue_empty(&l_ptr->deferdq)) {
			u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
			gap = mod(rec - mod(l_ptr->next_in_no));
		}
		msg_set_seq_gap(msg, gap);
@@ -1492,10 +1412,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,

	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
	buf->priority = TC_PRIO_CONTROL;

	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
			 &l_ptr->media_addr);
	l_ptr->unacked_window = 0;
	l_ptr->rcv_unacked = 0;
	kfree_skb(buf);
}

@@ -1630,7 +1549,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
		}
		if (msg_seq_gap(msg)) {
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
					     msg_seq_gap(msg));
		}
		break;
@@ -1677,7 +1596,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 */
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
	int msgcount;
	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
	struct tipc_msg tunnel_hdr;
	struct sk_buff *skb;
@@ -1688,10 +1607,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)

	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
	msgcount = skb_queue_len(&l_ptr->transmq);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	msg_set_msgcnt(&tunnel_hdr, msgcount);

	if (skb_queue_empty(&l_ptr->outqueue)) {
	if (skb_queue_empty(&l_ptr->transmq)) {
		skb = tipc_buf_acquire(INT_H_SIZE);
		if (skb) {
			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1707,7 +1628,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
	split_bundles = (l_ptr->owner->active_links[0] !=
			 l_ptr->owner->active_links[1]);

	skb_queue_walk(&l_ptr->outqueue, skb) {
	skb_queue_walk(&l_ptr->transmq, skb) {
		struct tipc_msg *msg = buf_msg(skb);

		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1738,80 +1659,66 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 * and sequence order is preserved per sender/receiver socket pair.
 * Owner node is locked.
 */
void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
			      struct tipc_link *tunnel)
void tipc_link_dup_queue_xmit(struct tipc_link *link,
			      struct tipc_link *tnl)
{
	struct sk_buff *skb;
	struct tipc_msg tunnel_hdr;

	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	skb_queue_walk(&l_ptr->outqueue, skb) {
	struct tipc_msg tnl_hdr;
	struct sk_buff_head *queue = &link->transmq;
	int mcnt;

	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
	msg_set_msgcnt(&tnl_hdr, mcnt);
	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);

tunnel_queue:
	skb_queue_walk(queue, skb) {
		struct sk_buff *outskb;
		struct tipc_msg *msg = buf_msg(skb);
		u32 length = msg_size(msg);
		u32 len = msg_size(msg);

		if (msg_user(msg) == MSG_BUNDLER)
			msg_set_type(msg, CLOSED_MSG);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
		outskb = tipc_buf_acquire(length + INT_H_SIZE);
		msg_set_ack(msg, mod(link->next_in_no - 1));
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
		outskb = tipc_buf_acquire(len + INT_H_SIZE);
		if (outskb == NULL) {
			pr_warn("%sunable to send duplicate msg\n",
				link_co_err);
			return;
		}
		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
					       length);
		__tipc_link_xmit_skb(tunnel, outskb);
		if (!tipc_link_is_up(l_ptr))
		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
					       skb->data, len);
		__tipc_link_xmit_skb(tnl, outskb);
		if (!tipc_link_is_up(link))
			return;
	}
}

/**
 * buf_extract - extracts embedded TIPC message from another message
 * @skb: encapsulating message buffer
 * @from_pos: offset to extract from
 *
 * Returns a new message buffer containing an embedded message.  The
 * encapsulating buffer is left unchanged.
 */
static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
{
	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
	u32 size = msg_size(msg);
	struct sk_buff *eb;

	eb = tipc_buf_acquire(size);
	if (eb)
		skb_copy_to_linear_data(eb, msg, size);
	return eb;
	if (queue == &link->backlogq)
		return;
	queue = &link->backlogq;
	goto tunnel_queue;
}

/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
 * Owner node is locked.
 */
static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
			      struct sk_buff *t_buf)
static void tipc_link_dup_rcv(struct tipc_link *link,
			      struct sk_buff *skb)
{
	struct sk_buff *buf;
	struct sk_buff *iskb;
	int pos = 0;

	if (!tipc_link_is_up(l_ptr))
	if (!tipc_link_is_up(link))
		return;

	buf = buf_extract(t_buf, INT_H_SIZE);
	if (buf == NULL) {
	if (!tipc_msg_extract(skb, &iskb, &pos)) {
		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
		return;
	}

	/* Add buffer to deferred queue, if applicable: */
	link_handle_out_of_seq_msg(l_ptr, buf);
	/* Append buffer to deferred queue, if applicable: */
	link_handle_out_of_seq_msg(link, iskb);
}

/*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
@@ -1823,6 +1730,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
	struct tipc_msg *t_msg = buf_msg(t_buf);
	struct sk_buff *buf = NULL;
	struct tipc_msg *msg;
	int pos = 0;

	if (tipc_link_is_up(l_ptr))
		tipc_link_reset(l_ptr);
@@ -1834,8 +1742,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
	/* Should there be an inner packet? */
	if (l_ptr->exp_msg_count) {
		l_ptr->exp_msg_count--;
		buf = buf_extract(t_buf, INT_H_SIZE);
		if (buf == NULL) {
		if (!tipc_msg_extract(t_buf, &buf, &pos)) {
			pr_warn("%sno inner failover pkt\n", link_co_err);
			goto exit;
		}
@@ -1903,23 +1810,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
}

void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
	/* Data messages from this node, inclusive FIRST_FRAGM */
	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
	/* Transiting data messages,inclusive FIRST_FRAGM */
	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
	l_ptr->queue_limit[CONN_MANAGER] = 1200;
	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
	/* FRAGMENT and LAST_FRAGMENT packets */
	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);

	l->window = win;
	l->queue_limit[TIPC_LOW_IMPORTANCE]      = win / 2;
	l->queue_limit[TIPC_MEDIUM_IMPORTANCE]   = win;
	l->queue_limit[TIPC_HIGH_IMPORTANCE]     = win / 2 * 3;
	l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
	l->queue_limit[TIPC_SYSTEM_IMPORTANCE]   = max_bulk;
}

/* tipc_link_find_owner - locate owner node of link by link's name
Loading