Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 687f079a authored by David S. Miller's avatar David S. Miller
Browse files

Merge branch 'tipc-next'



Jon Maloy says:

====================
tipc: improve broadcast implementation

The TIPC broadcast link implementation is currently complex and hard to
follow. It also incurs some amount of code and structure duplication,
something that can be reduced significantly with a little effort.

This commit series introduces a number of improvements which address
both the locking structure, the code/structure duplication issue, and
the overall readbility of the code.

The series consists of three main parts:

1-7: Adaptation to the new link structure, and preparation for the next
     step. In particular, we want the broadcast transmission link to
     have a life cycle that is longer than any of its potential (unicast
     and broadcast receive links) users. This eliminates the need to
     always test for the presence of this link before accessing it.

8-10: This is what is really new in this series. Commit #9 is by far
      the largest and most important one, because it moves most of
      the broadcast functionality into link.c, partially reusing the
      fields and functionality of the unicast link. The removal of
      the "node_map" infrastructure in commit #10 is also an important
      achievement.

11-16: Some improvements leveraging the changes made in the previous
       commits.

The series needs commit 53387c4e ("tipc: extend broadcast link window size")
and commit e5356794 ("tipc: conditionally expand buffer headroom over udp tunnel")
which are both present in 'net' but not yet in 'net-next', to apply cleanly.
====================

Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents ba3e2084 2af5ae37
Loading
Loading
Loading
Loading
+237 −751

File changed.

Preview size limit exceeded, changes collapsed.

+32 −90
Original line number Original line Diff line number Diff line
@@ -37,102 +37,44 @@
#ifndef _TIPC_BCAST_H
#ifndef _TIPC_BCAST_H
#define _TIPC_BCAST_H
#define _TIPC_BCAST_H


#include <linux/tipc_config.h>
#include "core.h"
#include "link.h"
#include "node.h"


/**
struct tipc_node;
 * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
struct tipc_msg;
 * @primary: pointer to primary bearer
struct tipc_nl_msg;
 * @secondary: pointer to secondary bearer
struct tipc_node_map;
 *
 * Bearers must have same priority and same set of reachable destinations
 * to be paired.
 */

struct tipc_bcbearer_pair {
	struct tipc_bearer *primary;
	struct tipc_bearer *secondary;
};

#define	BCBEARER		MAX_BEARERS

/**
 * struct tipc_bcbearer - bearer used by broadcast link
 * @bearer: (non-standard) broadcast bearer structure
 * @media: (non-standard) broadcast media structure
 * @bpairs: array of bearer pairs
 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
 * @remains: temporary node map used by tipc_bcbearer_send()
 * @remains_new: temporary node map used tipc_bcbearer_send()
 *
 * Note: The fields labelled "temporary" are incorporated into the bearer
 * to avoid consuming potentially limited stack space through the use of
 * large local variables within multicast routines.  Concurrent access is
 * prevented through use of the spinlock "bclink_lock".
 */
struct tipc_bcbearer {
	struct tipc_bearer bearer;
	struct tipc_media media;
	struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
	struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
	struct tipc_node_map remains;
	struct tipc_node_map remains_new;
};


/**
int tipc_bcast_init(struct net *net);
 * struct tipc_bclink - link used for broadcast messages
void tipc_bcast_reinit(struct net *net);
 * @lock: spinlock governing access to structure
void tipc_bcast_stop(struct net *net);
 * @link: (non-standard) broadcast link structure
void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
 * @node: (non-standard) node structure representing b'cast link's peer node
			 struct sk_buff_head *xmitq);
 * @bcast_nodes: map of broadcast-capable nodes
void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
 * @retransmit_to: node that most recently requested a retransmit
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
 *
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
 * Handles sequence numbering, fragmentation, bundling, etc.
int  tipc_bcast_get_mtu(struct net *net);
 */
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
struct tipc_bclink {
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
	spinlock_t lock;
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
	struct tipc_link link;
void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
	struct tipc_node node;
			 struct tipc_msg *hdr);
	struct sk_buff_head arrvq;
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
	struct sk_buff_head inputq;
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
	struct tipc_node_map bcast_nodes;
int tipc_bclink_reset_stats(struct net *net);
	struct tipc_node *retransmit_to;
};


struct tipc_node;
static inline void tipc_bcast_lock(struct net *net)
extern const char tipc_bclink_name[];
{
	spin_lock_bh(&tipc_net(net)->bclock);
}


/**
static inline void tipc_bcast_unlock(struct net *net)
 * tipc_nmap_equal - test for equality of node maps
 */
static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
				  struct tipc_node_map *nm_b)
{
{
	return !memcmp(nm_a, nm_b, sizeof(*nm_a));
	spin_unlock_bh(&tipc_net(net)->bclock);
}
}


int tipc_bclink_init(struct net *net);
static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
void tipc_bclink_stop(struct net *net);
{
void tipc_bclink_add_node(struct net *net, u32 addr);
	return tipc_net(net)->bcl;
void tipc_bclink_remove_node(struct net *net, u32 addr);
}
struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
u32  tipc_bclink_get_last_sent(struct net *net);
u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
void tipc_bclink_update_link_state(struct tipc_node *node,
				   u32 last_sent);
int  tipc_bclink_reset_stats(struct net *net);
int  tipc_bclink_set_queue_limits(struct net *net, u32 limit);
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action);
uint  tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
void tipc_bclink_input(struct net *net);
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);


#endif
#endif
+59 −35
Original line number Original line Diff line number Diff line
@@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)


	rcu_read_lock();
	rcu_read_lock();
	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	if (b_ptr) {
	if (b_ptr)
		tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
		tipc_disc_add_dest(b_ptr->link_req);
		tipc_disc_add_dest(b_ptr->link_req);
	}
	rcu_read_unlock();
	rcu_read_unlock();
}
}


@@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)


	rcu_read_lock();
	rcu_read_lock();
	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	if (b_ptr) {
	if (b_ptr)
		tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
		tipc_disc_remove_dest(b_ptr->link_req);
		tipc_disc_remove_dest(b_ptr->link_req);
	}
	rcu_read_unlock();
	rcu_read_unlock();
}
}


@@ -418,10 +414,9 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
 * @b_ptr: the bearer through which the packet is to be sent
 * @b_ptr: the bearer through which the packet is to be sent
 * @dest: peer destination address
 * @dest: peer destination address
 */
 */
int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,
int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
		     struct tipc_bearer *b, struct tipc_media_addr *dest)
		     struct tipc_bearer *b, struct tipc_media_addr *dest)
{
{
	struct sk_buff *clone;
	struct net_device *dev;
	struct net_device *dev;
	int delta;
	int delta;


@@ -429,42 +424,48 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,
	if (!dev)
	if (!dev)
		return 0;
		return 0;


	clone = skb_clone(buf, GFP_ATOMIC);
	delta = dev->hard_header_len - skb_headroom(skb);
	if (!clone)
		return 0;

	delta = dev->hard_header_len - skb_headroom(buf);
	if ((delta > 0) &&
	if ((delta > 0) &&
	    pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
	    pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
		kfree_skb(clone);
		kfree_skb(skb);
		return 0;
		return 0;
	}
	}


	skb_reset_network_header(clone);
	skb_reset_network_header(skb);
	clone->dev = dev;
	skb->dev = dev;
	clone->protocol = htons(ETH_P_TIPC);
	skb->protocol = htons(ETH_P_TIPC);
	dev_hard_header(clone, dev, ETH_P_TIPC, dest->value,
	dev_hard_header(skb, dev, ETH_P_TIPC, dest->value,
			dev->dev_addr, clone->len);
			dev->dev_addr, skb->len);
	dev_queue_xmit(clone);
	dev_queue_xmit(skb);
	return 0;
	return 0;
}
}


/* tipc_bearer_send- sends buffer to destination over bearer
int tipc_bearer_mtu(struct net *net, u32 bearer_id)
 *
{
 * IMPORTANT:
	int mtu = 0;
 * The media send routine must not alter the buffer being passed in
	struct tipc_bearer *b;
 * as it may be needed for later retransmission!

	rcu_read_lock();
	b = rcu_dereference_rtnl(tipc_net(net)->bearer_list[bearer_id]);
	if (b)
		mtu = b->mtu;
	rcu_read_unlock();
	return mtu;
}

/* tipc_bearer_xmit_skb - sends buffer to destination over bearer
 */
 */
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
			  struct sk_buff *skb,
			  struct tipc_media_addr *dest)
			  struct tipc_media_addr *dest)
{
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_net *tn = tipc_net(net);
	struct tipc_bearer *b_ptr;
	struct tipc_bearer *b;


	rcu_read_lock();
	rcu_read_lock();
	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	if (likely(b_ptr))
	if (likely(b))
		b_ptr->media->send_msg(net, buf, b_ptr, dest);
		b->media->send_msg(net, skb, b, dest);
	rcu_read_unlock();
	rcu_read_unlock();
}
}


@@ -487,8 +488,31 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
		skb_queue_walk_safe(xmitq, skb, tmp) {
		skb_queue_walk_safe(xmitq, skb, tmp) {
			__skb_dequeue(xmitq);
			__skb_dequeue(xmitq);
			b->media->send_msg(net, skb, b, dst);
			b->media->send_msg(net, skb, b, dst);
			/* Until we remove cloning in tipc_l2_send_msg(): */
		}
			kfree_skb(skb);
	}
	rcu_read_unlock();
}

/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
 */
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
			 struct sk_buff_head *xmitq)
{
	struct tipc_net *tn = tipc_net(net);
	int net_id = tn->net_id;
	struct tipc_bearer *b;
	struct sk_buff *skb, *tmp;
	struct tipc_msg *hdr;

	rcu_read_lock();
	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
	if (likely(b)) {
		skb_queue_walk_safe(xmitq, skb, tmp) {
			hdr = buf_msg(skb);
			msg_set_non_seq(hdr, 1);
			msg_set_mc_netid(hdr, net_id);
			__skb_dequeue(xmitq);
			b->media->send_msg(net, skb, b, &b->bcast_addr);
		}
		}
	}
	}
	rcu_read_unlock();
	rcu_read_unlock();
+7 −2
Original line number Original line Diff line number Diff line
@@ -163,6 +163,7 @@ struct tipc_bearer {
	u32 identity;
	u32 identity;
	struct tipc_link_req *link_req;
	struct tipc_link_req *link_req;
	char net_plane;
	char net_plane;
	int node_cnt;
	struct tipc_node_map nodes;
	struct tipc_node_map nodes;
};
};


@@ -215,10 +216,14 @@ struct tipc_media *tipc_media_find(const char *name);
int tipc_bearer_setup(void);
int tipc_bearer_setup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
int tipc_bearer_mtu(struct net *net, u32 bearer_id);
void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
			  struct sk_buff *skb,
			  struct tipc_media_addr *dest);
			  struct tipc_media_addr *dest);
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
		      struct sk_buff_head *xmitq,
		      struct sk_buff_head *xmitq,
		      struct tipc_media_addr *dst);
		      struct tipc_media_addr *dst);
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
			 struct sk_buff_head *xmitq);


#endif	/* _TIPC_BEARER_H */
#endif	/* _TIPC_BEARER_H */
+9 −0
Original line number Original line Diff line number Diff line
@@ -42,6 +42,7 @@
#include "bearer.h"
#include "bearer.h"
#include "net.h"
#include "net.h"
#include "socket.h"
#include "socket.h"
#include "bcast.h"


#include <linux/module.h>
#include <linux/module.h>


@@ -71,8 +72,15 @@ static int __net_init tipc_init_net(struct net *net)
	err = tipc_topsrv_start(net);
	err = tipc_topsrv_start(net);
	if (err)
	if (err)
		goto out_subscr;
		goto out_subscr;

	err = tipc_bcast_init(net);
	if (err)
		goto out_bclink;

	return 0;
	return 0;


out_bclink:
	tipc_bcast_stop(net);
out_subscr:
out_subscr:
	tipc_nametbl_stop(net);
	tipc_nametbl_stop(net);
out_nametbl:
out_nametbl:
@@ -85,6 +93,7 @@ static void __net_exit tipc_exit_net(struct net *net)
{
{
	tipc_topsrv_stop(net);
	tipc_topsrv_stop(net);
	tipc_net_stop(net);
	tipc_net_stop(net);
	tipc_bcast_stop(net);
	tipc_nametbl_stop(net);
	tipc_nametbl_stop(net);
	tipc_sk_rht_destroy(net);
	tipc_sk_rht_destroy(net);
}
}
Loading