Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a6ca1094 authored by Ying Xue's avatar Ying Xue Committed by David S. Miller
Browse files

tipc: use generic SKB list APIs to manage TIPC outgoing packet chains



Use standard SKB list APIs associated with struct sk_buff_head to
manage socket outgoing packet chain and name table outgoing packet
chain, having relevant code simpler and more readable.

Signed-off-by: default avatarYing Xue <ying.xue@windriver.com>
Reviewed-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent f03273f1
Loading
Loading
Loading
Loading
+10 −10
Original line number Diff line number Diff line
@@ -398,20 +398,20 @@ static void bclink_peek_nack(struct tipc_msg *msg)

/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
 *                    and to identified node local sockets
 * @buf: chain of buffers containing message
 * @list: chain of buffers containing message
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bclink_xmit(struct sk_buff *buf)
int tipc_bclink_xmit(struct sk_buff_head *list)
{
	int rc = 0;
	int bc = 0;
	struct sk_buff *clbuf;
	struct sk_buff *skb;

	/* Prepare clone of message for local node */
	clbuf = tipc_msg_reassemble(buf);
	if (unlikely(!clbuf)) {
		kfree_skb_list(buf);
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb)) {
		__skb_queue_purge(list);
		return -EHOSTUNREACH;
	}

@@ -419,7 +419,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)
	if (likely(bclink)) {
		tipc_bclink_lock();
		if (likely(bclink->bcast_nodes.count)) {
			rc = __tipc_link_xmit(bcl, buf);
			rc = __tipc_link_xmit(bcl, list);
			if (likely(!rc)) {
				u32 len = skb_queue_len(&bcl->outqueue);

@@ -433,13 +433,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
	}

	if (unlikely(!bc))
		kfree_skb_list(buf);
		__skb_queue_purge(list);

	/* Deliver message clone */
	if (likely(!rc))
		tipc_sk_mcast_rcv(clbuf);
		tipc_sk_mcast_rcv(skb);
	else
		kfree_skb(clbuf);
		kfree_skb(skb);

	return rc;
}
+1 −1
Original line number Diff line number Diff line
@@ -100,7 +100,7 @@ int tipc_bclink_reset_stats(void);
int  tipc_bclink_set_queue_limits(u32 limit);
void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
uint  tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct sk_buff *buf);
int tipc_bclink_xmit(struct sk_buff_head *list);
void tipc_bclink_wakeup_users(void);
int tipc_nl_add_bc_link(struct tipc_nl_msg *msg);

+62 −36
Original line number Diff line number Diff line
@@ -664,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 * - For all other messages we discard the buffer and return -EHOSTUNREACH
 * - For TIPC internal messages we also reset the link
 */
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
{
	struct tipc_msg *msg = buf_msg(buf);
	struct sk_buff *skb = skb_peek(list);
	struct tipc_msg *msg = buf_msg(skb);
	uint imp = tipc_msg_tot_importance(msg);
	u32 oport = msg_tot_origport(msg);

@@ -679,28 +680,29 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
		goto drop;
	if (unlikely(msg_reroute_cnt(msg)))
		goto drop;
	if (TIPC_SKB_CB(buf)->wakeup_pending)
	if (TIPC_SKB_CB(skb)->wakeup_pending)
		return -ELINKCONG;
	if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
		return -ELINKCONG;
drop:
	kfree_skb_list(buf);
	__skb_queue_purge(list);
	return -EHOSTUNREACH;
}

/**
 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
 * @link: link to use
 * @skb: chain of buffers containing message
 * @list: chain of buffers containing message
 *
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
 * user data messages) or -EHOSTUNREACH (all other messages/senders)
 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
 * to act on the return value, since they may need to do more send attempts.
 */
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
{
	struct tipc_msg *msg = buf_msg(skb);
	struct tipc_msg *msg = buf_msg(skb_peek(list));
	uint psz = msg_size(msg);
	uint sndlim = link->queue_limit[0];
	uint imp = tipc_msg_tot_importance(msg);
@@ -710,21 +712,21 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
	uint bc_last_in = link->owner->bclink.last_in;
	struct tipc_media_addr *addr = &link->media_addr;
	struct sk_buff_head *outqueue = &link->outqueue;
	struct sk_buff *next;
	struct sk_buff *skb, *tmp;

	/* Match queue limits against msg importance: */
	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
		return tipc_link_cong(link, skb);
		return tipc_link_cong(link, list);

	/* Has valid packet limit been used ? */
	if (unlikely(psz > mtu)) {
		kfree_skb_list(skb);
		__skb_queue_purge(list);
		return -EMSGSIZE;
	}

	/* Prepare each packet for sending, and add to outqueue: */
	while (skb) {
		next = skb->next;
	skb_queue_walk_safe(list, skb, tmp) {
		__skb_unlink(skb, list);
		msg = buf_msg(skb);
		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
		msg_set_bcast_ack(msg, bc_last_in);
@@ -736,7 +738,6 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
			link->unacked_window = 0;
		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
			link->stats.sent_bundled++;
			skb = next;
			continue;
		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
						link->addr)) {
@@ -750,22 +751,43 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *skb)
				link->next_out = skb;
		}
		seqno++;
		skb = next;
	}
	link->next_out_no = seqno;
	return 0;
}

static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	__skb_queue_head_init(list);
	__skb_queue_tail(list, skb);
}

static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
{
	struct sk_buff_head head;

	skb2list(skb, &head);
	return __tipc_link_xmit(link, &head);
}

int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
{
	struct sk_buff_head head;

	skb2list(skb, &head);
	return tipc_link_xmit(&head, dnode, selector);
}

/**
 * tipc_link_xmit() is the general link level function for message sending
 * @buf: chain of buffers containing message
 * @list: chain of buffers containing message
 * @dsz: amount of user data to be sent
 * @dnode: address of destination node
 * @selector: a number used for deterministic link selection
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
{
	struct tipc_link *link = NULL;
	struct tipc_node *node;
@@ -776,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
		tipc_node_lock(node);
		link = node->active_links[selector & 1];
		if (link)
			rc = __tipc_link_xmit(link, buf);
			rc = __tipc_link_xmit(link, list);
		tipc_node_unlock(node);
	}

	if (link)
		return rc;

	if (likely(in_own_node(dnode)))
		return tipc_sk_rcv(buf);
	if (likely(in_own_node(dnode))) {
		/* As a node local message chain never contains more than one
		 * buffer, we just need to dequeue one SKB buffer from the
		 * head list.
		 */
		return tipc_sk_rcv(__skb_dequeue(list));
	}
	__skb_queue_purge(list);

	kfree_skb_list(buf);
	return rc;
}

@@ -800,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 */
static void tipc_link_sync_xmit(struct tipc_link *link)
{
	struct sk_buff *buf;
	struct sk_buff *skb;
	struct tipc_msg *msg;

	buf = tipc_buf_acquire(INT_H_SIZE);
	if (!buf)
	skb = tipc_buf_acquire(INT_H_SIZE);
	if (!skb)
		return;

	msg = buf_msg(buf);
	msg = buf_msg(skb);
	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
	msg_set_last_bcast(msg, link->owner->bclink.acked);
	__tipc_link_xmit(link, buf);
	__tipc_link_xmit_skb(link, skb);
}

/*
@@ -1053,8 +1080,7 @@ void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
	u32 ackd;
	u32 released;

	__skb_queue_head_init(&head);
	__skb_queue_tail(&head, skb);
	skb2list(skb, &head);

	while ((skb = __skb_dequeue(&head))) {
		/* Ensure message is well-formed */
@@ -1573,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
				  u32 selector)
{
	struct tipc_link *tunnel;
	struct sk_buff *buf;
	struct sk_buff *skb;
	u32 length = msg_size(msg);

	tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1582,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
		return;
	}
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	buf = tipc_buf_acquire(length + INT_H_SIZE);
	if (!buf) {
	skb = tipc_buf_acquire(length + INT_H_SIZE);
	if (!skb) {
		pr_warn("%sunable to send tunnel msg\n", link_co_err);
		return;
	}
	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
	__tipc_link_xmit(tunnel, buf);
	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
	__tipc_link_xmit_skb(tunnel, skb);
}


@@ -1620,7 +1646,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
		if (skb) {
			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
			msg_set_size(&tunnel_hdr, INT_H_SIZE);
			__tipc_link_xmit(tunnel, skb);
			__tipc_link_xmit_skb(tunnel, skb);
		} else {
			pr_warn("%sunable to send changeover msg\n",
				link_co_err);
@@ -1691,7 +1717,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
					       length);
		__tipc_link_xmit(tunnel, outskb);
		__tipc_link_xmit_skb(tunnel, outskb);
		if (!tipc_link_is_up(l_ptr))
			return;
	}
+3 −2
Original line number Diff line number Diff line
@@ -213,8 +213,9 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf);
int tipc_link_xmit_skb(struct sk_buff *skb, u32 dest, u32 selector);
int tipc_link_xmit(struct sk_buff_head *list, u32 dest, u32 selector);
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
void tipc_link_bundle_rcv(struct sk_buff *buf);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
+36 −38
Original line number Diff line number Diff line
@@ -166,11 +166,12 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 * @offset: Posision in iov to start copying from
 * @dsz: Total length of user data
 * @pktmax: Max packet size that can be used
 * @chain: Buffer or chain of buffers to be returned to caller
 * @list: Buffer or chain of buffers to be returned to caller
 *
 * Returns message data size or errno: -ENOMEM, -EFAULT
 */
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
		   int offset, int dsz, int pktmax , struct sk_buff **chain)
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
		   int dsz, int pktmax, struct sk_buff_head *list)
{
	int mhsz = msg_hdr_sz(mhdr);
	int msz = mhsz + dsz;
@@ -179,22 +180,22 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
	int pktrem = pktmax;
	int drem = dsz;
	struct tipc_msg pkthdr;
	struct sk_buff *buf, *prev;
	struct sk_buff *skb;
	char *pktpos;
	int rc;
	uint chain_sz = 0;

	msg_set_size(mhdr, msz);

	/* No fragmentation needed? */
	if (likely(msz <= pktmax)) {
		buf = tipc_buf_acquire(msz);
		*chain = buf;
		if (unlikely(!buf))
		skb = tipc_buf_acquire(msz);
		if (unlikely(!skb))
			return -ENOMEM;
		skb_copy_to_linear_data(buf, mhdr, mhsz);
		pktpos = buf->data + mhsz;
		TIPC_SKB_CB(buf)->chain_sz = 1;
		if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iov, offset, dsz))
		__skb_queue_tail(list, skb);
		skb_copy_to_linear_data(skb, mhdr, mhsz);
		pktpos = skb->data + mhsz;
		if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iov, offset,
						 dsz))
			return dsz;
		rc = -EFAULT;
		goto error;
@@ -207,15 +208,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
	msg_set_fragm_no(&pkthdr, pktno);

	/* Prepare first fragment */
	*chain = buf = tipc_buf_acquire(pktmax);
	if (!buf)
	skb = tipc_buf_acquire(pktmax);
	if (!skb)
		return -ENOMEM;
	chain_sz = 1;
	pktpos = buf->data;
	skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
	__skb_queue_tail(list, skb);
	pktpos = skb->data;
	skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
	pktpos += INT_H_SIZE;
	pktrem -= INT_H_SIZE;
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz);
	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
	pktpos += mhsz;
	pktrem -= mhsz;

@@ -238,28 +239,25 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
			pktsz = drem + INT_H_SIZE;
		else
			pktsz = pktmax;
		prev = buf;
		buf = tipc_buf_acquire(pktsz);
		if (!buf) {
		skb = tipc_buf_acquire(pktsz);
		if (!skb) {
			rc = -ENOMEM;
			goto error;
		}
		chain_sz++;
		prev->next = buf;
		__skb_queue_tail(list, skb);
		msg_set_type(&pkthdr, FRAGMENT);
		msg_set_size(&pkthdr, pktsz);
		msg_set_fragm_no(&pkthdr, ++pktno);
		skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
		pktpos = buf->data + INT_H_SIZE;
		skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
		pktpos = skb->data + INT_H_SIZE;
		pktrem = pktsz - INT_H_SIZE;

	} while (1);
	TIPC_SKB_CB(*chain)->chain_sz = chain_sz;
	msg_set_type(buf_msg(buf), LAST_FRAGMENT);
	msg_set_type(buf_msg(skb), LAST_FRAGMENT);
	return dsz;
error:
	kfree_skb_list(*chain);
	*chain = NULL;
	__skb_queue_purge(list);
	__skb_queue_head_init(list);
	return rc;
}

@@ -430,22 +428,23 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
/* tipc_msg_reassemble() - clone a buffer chain of fragments and
 *                         reassemble the clones into one message
 */
struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
{
	struct sk_buff *buf = chain;
	struct sk_buff *frag = buf;
	struct sk_buff *skb;
	struct sk_buff *frag = NULL;
	struct sk_buff *head = NULL;
	int hdr_sz;

	/* Copy header if single buffer */
	if (!buf->next) {
		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
	if (skb_queue_len(list) == 1) {
		skb = skb_peek(list);
		hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
		return __pskb_copy(skb, hdr_sz, GFP_ATOMIC);
	}

	/* Clone all fragments and reassemble */
	while (buf) {
		frag = skb_clone(buf, GFP_ATOMIC);
	skb_queue_walk(list, skb) {
		frag = skb_clone(skb, GFP_ATOMIC);
		if (!frag)
			goto error;
		frag->next = NULL;
@@ -453,7 +452,6 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
			break;
		if (!head)
			goto error;
		buf = buf->next;
	}
	return frag;
error:
Loading