Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a853e4c6 authored by Jon Paul Maloy's avatar Jon Paul Maloy Committed by David S. Miller
Browse files

tipc: introduce replicast as transport option for multicast



TIPC multicast messages are currently carried over a reliable
'broadcast link', making use of the underlying media's ability to
transport packets as L2 broadcast or IP multicast to all nodes in
the cluster.

When the used bearer is lacking that ability, we can instead emulate
the broadcast service by replicating and sending the packets over as
many unicast links as needed to reach all identified destinations.
We now introduce a new TIPC link-level 'replicast' service that does
this.

Reviewed-by: default avatarParthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 2ae0b8af
Loading
Loading
Loading
Loading
+82 −23
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)

int tipc_bcast_get_mtu(struct net *net)
{
	return tipc_link_mtu(tipc_bc_sndlink(net));
	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
}

/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
	__skb_queue_purge(&_xmitq);
}

/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * @pkts: chain of buffers containing message
 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
 * Consumes the buffer chain.
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
			   u16 *cong_link_cnt)
{
	struct tipc_link *l = tipc_bc_sndlink(net);
	struct sk_buff_head xmitq, inputq, rcvq;
	struct sk_buff_head xmitq;
	int rc = 0;

	__skb_queue_head_init(&rcvq);
	__skb_queue_head_init(&xmitq);
	skb_queue_head_init(&inputq);

	/* Prepare message clone for local node */
	if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
		return -EHOSTUNREACH;

	tipc_bcast_lock(net);
	if (tipc_link_bc_peers(l))
		rc = tipc_link_xmit(l, list, &xmitq);
		rc = tipc_link_xmit(l, pkts, &xmitq);
	tipc_bcast_unlock(net);

	/* Don't send to local node if adding to link failed */
	if (unlikely(rc && (rc != -ELINKCONG))) {
		__skb_queue_purge(&rcvq);
	tipc_bcbase_xmit(net, &xmitq);
	__skb_queue_purge(pkts);
	if (rc == -ELINKCONG) {
		*cong_link_cnt = 1;
		rc = 0;
	}
	return rc;
}

	/* Broadcast to all nodes, inluding local node */
	tipc_bcbase_xmit(net, &xmitq);
	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
	__skb_queue_purge(list);
/* tipc_rcast_xmit - replicate and send a message to given destination nodes
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @dests: list of destination nodes
 * @cong_link_cnt: returns number of congested links
 * @cong_links: returns identities of congested links
 * Returns 0 if success, otherwise errno
 */
static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
			   struct tipc_nlist *dests, u16 *cong_link_cnt)
{
	struct sk_buff_head _pkts;
	struct u32_item *n, *tmp;
	u32 dst, selector;

	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
	__skb_queue_head_init(&_pkts);

	list_for_each_entry_safe(n, tmp, &dests->list, list) {
		dst = n->value;
		if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
			return -ENOMEM;

		/* Any other return value than -ELINKCONG is ignored */
		if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
			(*cong_link_cnt)++;
	}
	return 0;
}

/* tipc_mcast_xmit - deliver message to indicated destination nodes
 *                   and to identified node local sockets
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @dests: destination nodes for message. Not consumed.
 * @cong_link_cnt: returns number of encountered congested destination links
 * @cong_links: returns identities of congested links
 * Consumes buffer chain.
 * Returns 0 if success, otherwise errno
 */
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
		    struct tipc_nlist *dests, u16 *cong_link_cnt)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	struct sk_buff_head inputq, localq;
	int rc = 0;

	skb_queue_head_init(&inputq);
	skb_queue_head_init(&localq);

	/* Clone packets before they are consumed by next call */
	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
		rc = -ENOMEM;
		goto exit;
	}

	if (dests->remote) {
		if (!bb->bcast_support)
			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
		else
			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
	}

	if (dests->local)
		tipc_sk_mcast_rcv(net, &localq, &inputq);
exit:
	__skb_queue_purge(pkts);
	return rc;
}

+2 −1
Original line number Diff line number Diff line
@@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
int  tipc_bcast_get_mtu(struct net *net);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
		    struct tipc_nlist *dests, u16 *cong_link_cnt);
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr);
+7 −1
Original line number Diff line number Diff line
@@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
			    struct sk_buff_head *inputq)
{
	switch (msg_user(buf_msg(skb))) {
	struct tipc_msg *hdr = buf_msg(skb);

	switch (msg_user(hdr)) {
	case TIPC_LOW_IMPORTANCE:
	case TIPC_MEDIUM_IMPORTANCE:
	case TIPC_HIGH_IMPORTANCE:
	case TIPC_CRITICAL_IMPORTANCE:
		if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
			skb_queue_tail(l->bc_rcvlink->inputq, skb);
			return true;
		}
	case CONN_MANAGER:
		skb_queue_tail(inputq, skb);
		return true;
+17 −0
Original line number Diff line number Diff line
@@ -607,6 +607,23 @@ bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
	return false;
}

bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
			struct sk_buff_head *cpy)
{
	struct sk_buff *skb, *_skb;

	skb_queue_walk(msg, skb) {
		_skb = pskb_copy(skb, GFP_ATOMIC);
		if (!_skb) {
			__skb_queue_purge(cpy);
			return false;
		}
		msg_set_destnode(buf_msg(_skb), dst);
		__skb_queue_tail(cpy, _skb);
	}
	return true;
}

/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
 * @list: list to be appended to
 * @seqno: sequence number of buffer to add
+4 −5
Original line number Diff line number Diff line
@@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)

static inline u32 msg_link_selector(struct tipc_msg *m)
{
	if (msg_user(m) == MSG_FRAGMENTER)
		m = (void *)msg_data(m);
	return msg_bits(m, 4, 0, 1);
}

static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
{
	msg_set_bits(m, 4, 0, 1, n);
}

/*
 * Word 5
 */
@@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
		   int offset, int dsz, int mtu, struct sk_buff_head *list);
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
			struct sk_buff_head *cpy);
void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
			     struct sk_buff *skb);

Loading