Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit af9b028e authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller
Browse files

tipc: make media xmit call outside node spinlock context



Currently, message sending is performed through a deep call chain,
where the node spinlock is grabbed and held during a significant
part of the transmission time. This is clearly detrimental to
overall throughput performance; it would be better if we could send
the message after the spinlock has been released.

In this commit, we do instead let the call revert on the stack after
the buffer chain has been added to the transmission queue, whereafter
clones of the buffers are transmitted to the device layer outside the
spinlock scope.

As a further step in our effort to separate the roles of the node
and link entities we also move the function tipc_link_xmit() to
node.c, and rename it to tipc_node_xmit().

Reviewed-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 22d85c79
Loading
Loading
Loading
Loading
+26 −0
Original line number Diff line number Diff line
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
	rcu_read_unlock();
}

/* tipc_bearer_xmit() -send buffer to destination over bearer
 */
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
		      struct sk_buff_head *xmitq,
		      struct tipc_media_addr *dst)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bearer *b;
	struct sk_buff *skb, *tmp;

	if (skb_queue_empty(xmitq))
		return;

	rcu_read_lock();
	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	if (likely(b)) {
		skb_queue_walk_safe(xmitq, skb, tmp) {
			__skb_dequeue(xmitq);
			b->media->send_msg(net, skb, b, dst);
			/* Until we remove cloning in tipc_l2_send_msg(): */
			kfree_skb(skb);
		}
	}
	rcu_read_unlock();
}

/**
 * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
 * @buf: the received packet
+3 −0
Original line number Diff line number Diff line
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
		      struct tipc_media_addr *dest);
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
		      struct sk_buff_head *xmitq,
		      struct tipc_media_addr *dst);

#endif	/* _TIPC_BEARER_H */
+72 −60
Original line number Diff line number Diff line
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
	/* This really cannot happen...  */
	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
		tipc_link_reset(link);
		return -ENOBUFS;
	}
	/* Non-blocking sender: */
@@ -701,77 +700,90 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
	return 0;
}

static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	skb_queue_head_init(list);
	__skb_queue_tail(list, skb);
}

static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
/**
 * tipc_link_xmit(): enqueue buffer list according to queue situation
 * @link: link to use
 * @list: chain of buffers containing message
 * @xmitq: returned list of packets to be sent by caller
 *
 * Consumes the buffer chain, except when returning -ELINKCONG,
 * since the caller then may want to make more send attempts.
 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
 */
int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
		   struct sk_buff_head *xmitq)
{
	struct sk_buff_head head;
	struct tipc_msg *hdr = buf_msg(skb_peek(list));
	unsigned int maxwin = l->window;
	unsigned int i, imp = msg_importance(hdr);
	unsigned int mtu = l->mtu;
	u16 ack = l->rcv_nxt - 1;
	u16 seqno = l->snd_nxt;
	u16 bc_last_in = l->owner->bclink.last_in;
	struct sk_buff_head *transmq = &l->transmq;
	struct sk_buff_head *backlogq = &l->backlogq;
	struct sk_buff *skb, *_skb, *bskb;

	skb2list(skb, &head);
	return __tipc_link_xmit(link->owner->net, link, &head);
	/* Match msg importance against this and all higher backlog limits: */
	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
			return link_schedule_user(l, list);
	}
	if (unlikely(msg_size(hdr) > mtu))
		return -EMSGSIZE;

/* tipc_link_xmit_skb(): send single buffer to destination
 * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
 * messages, which will not cause link congestion
 * The only exception is datagram messages rerouted after secondary
 * lookup, which are rare and safe to dispose of anyway.
 * TODO: Return real return value, and let callers use
 * tipc_wait_for_sendpkt() where applicable
 */
int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
		       u32 selector)
{
	struct sk_buff_head head;
	int rc;
	/* Prepare each packet for sending, and add to relevant queue: */
	while (skb_queue_len(list)) {
		skb = skb_peek(list);
		hdr = buf_msg(skb);
		msg_set_seqno(hdr, seqno);
		msg_set_ack(hdr, ack);
		msg_set_bcast_ack(hdr, bc_last_in);

	skb2list(skb, &head);
	rc = tipc_link_xmit(net, &head, dnode, selector);
	if (rc)
		kfree_skb(skb);
		if (likely(skb_queue_len(transmq) < maxwin)) {
			_skb = skb_clone(skb, GFP_ATOMIC);
			if (!_skb)
				return -ENOBUFS;
			__skb_dequeue(list);
			__skb_queue_tail(transmq, skb);
			__skb_queue_tail(xmitq, _skb);
			l->rcv_unacked = 0;
			seqno++;
			continue;
		}
		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
			kfree_skb(__skb_dequeue(list));
			l->stats.sent_bundled++;
			continue;
		}
		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
			kfree_skb(__skb_dequeue(list));
			__skb_queue_tail(backlogq, bskb);
			l->backlog[msg_importance(buf_msg(bskb))].len++;
			l->stats.sent_bundled++;
			l->stats.sent_bundles++;
			continue;
		}
		l->backlog[imp].len += skb_queue_len(list);
		skb_queue_splice_tail_init(list, backlogq);
	}
	l->snd_nxt = seqno;
	return 0;
}

/**
 * tipc_link_xmit() is the general link level function for message sending
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * @dsz: amount of user data to be sent
 * @dnode: address of destination node
 * @selector: a number used for deterministic link selection
 * Consumes the buffer chain, except when returning error
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
		   u32 selector)
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	struct tipc_link *link = NULL;
	struct tipc_node *node;
	int rc = -EHOSTUNREACH;

	node = tipc_node_find(net, dnode);
	if (node) {
		tipc_node_lock(node);
		link = node_active_link(node, selector & 1);
		if (link)
			rc = __tipc_link_xmit(net, link, list);
		tipc_node_unlock(node);
		tipc_node_put(node);
	skb_queue_head_init(list);
	__skb_queue_tail(list, skb);
}
	if (link)
		return rc;

	if (likely(in_own_node(net, dnode))) {
		tipc_sk_rcv(net, list);
		return 0;
	}
static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
{
	struct sk_buff_head head;

	__skb_queue_purge(list);
	return rc;
	skb2list(skb, &head);
	return __tipc_link_xmit(link->owner->net, link, &head);
}

/*
+2 −4
Original line number Diff line number Diff line
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
		       u32 selector);
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
		   u32 selector);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
		     struct sk_buff_head *list);
int tipc_link_xmit(struct tipc_link *link,	struct sk_buff_head *list,
		   struct sk_buff_head *xmitq);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
			  u32 gap, u32 tolerance, u32 priority);
void tipc_link_push_packets(struct tipc_link *l_ptr);
+2 −2
Original line number Diff line number Diff line
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
		if (!oskb)
			break;
		msg_set_destnode(buf_msg(oskb), dnode);
		tipc_link_xmit_skb(net, oskb, dnode, dnode);
		tipc_node_xmit_skb(net, oskb, dnode, dnode);
	}
	rcu_read_unlock();

@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
			 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
	rcu_read_unlock();

	tipc_link_xmit(net, &head, dnode, dnode);
	tipc_node_xmit(net, &head, dnode, dnode);
}

static void tipc_publ_subscribe(struct net *net, struct publication *publ,
Loading