
Commit 10f67868 authored by Alexei Starovoitov

Merge branch 'xdp_xmit-bulking'



Jesper Dangaard Brouer says:

====================
This patchset changes the ndo_xdp_xmit API to take a bulk of xdp frames.

When the kernel is compiled with CONFIG_RETPOLINE, every indirect
function pointer (branch) call hurts performance. For XDP this has a
huge negative performance impact.

This patchset reduces the needed (indirect) calls to ndo_xdp_xmit, and
also prepares for further optimizations.  The DMA API's use of indirect
function pointer calls is the primary source of the regression.  It is
left for a followup patchset to use bulking calls towards the DMA API
(via the scatter-gather calls).
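
As a concrete sketch, the NDO signature change at the core of this
series looks as follows (abridged; the exact prototypes are the ones
the driver diffs below implement):

	/* Before: one indirect call per xdp_frame */
	int (*ndo_xdp_xmit)(struct net_device *dev, struct xdp_frame *xdpf);

	/* After: one indirect call per bulk of frames.  Returns the
	 * number of frames actually sent; on a negative errno, no
	 * frames were consumed and the caller must free them all.
	 */
	int (*ndo_xdp_xmit)(struct net_device *dev, int n,
			    struct xdp_frame **frames);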

The other advantage of this API change is that drivers can more easily
amortize the cost of any sync/locking scheme over the bulk of
packets.  The assumption of the current API is that the driver
implementing the NDO will also allocate a dedicated XDP TX queue for
every CPU in the system, which is not always possible or practical to
configure.  E.g. ixgbe cannot load an XDP program on a machine with
more than 96 CPUs, due to limited hardware TX queues.  E.g. virtio_net
is hard to configure as it requires manually increasing the number of
queues.  E.g. the tun driver takes a producer lock per XDP frame, on a
queue chosen via smp_processor_id modulo the available queues.
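
The tun conversion below illustrates the amortization concretely: take
the ptr_ring producer lock once, then use the unlocked
__ptr_ring_produce() for every frame in the bulk.  A condensed sketch
of that pattern (setup and error handling elided):

	spin_lock(&tfile->tx_ring.producer_lock);
	for (i = 0; i < n; i++) {
		/* Unlocked variant; the producer lock is already held */
		if (__ptr_ring_produce(&tfile->tx_ring,
				       tun_xdp_to_ptr(frames[i]))) {
			/* Ring full: return the frame and count the drop */
			xdp_return_frame_rx_napi(frames[i]);
			drops++;
		}
	}
	spin_unlock(&tfile->tx_ring.producer_lock);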

I considered adding 'flags' to ndo_xdp_xmit, but it's not part of
this patchset.  That will be a followup patchset, once we know whether
it will be needed (e.g. for a non-map xdp_redirect flush-flag, and
whether AF_XDP chooses to use ndo_xdp_xmit for TX).

---
V5: Fixed up issues spotted by Daniel and John

V4: Split out the patches, going from 4 to 8 patches.  I cannot split
the driver changes from the NDO change, but I've tried to isolate the
NDO change together with each driver change as much as possible.
====================

Signed-off-by: Alexei Starovoitov <ast@kernel.org>
parents f80acbd2 a570e48f
drivers/net/ethernet/intel/i40e/i40e_txrx.c +19 −7
@@ -3664,14 +3664,19 @@ netdev_tx_t i40e_lan_xmit_frame(struct sk_buff *skb, struct net_device *netdev)
  * @dev: netdev
  * @xdp: XDP buffer
  *
- * Returns Zero if sent, else an error code
+ * Returns number of frames successfully sent. Frames that fail are
+ * free'ed via XDP return API.
+ *
+ * For error cases, a negative errno code is returned and no-frames
+ * are transmitted (caller must handle freeing frames).
  **/
-int i40e_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
+int i40e_xdp_xmit(struct net_device *dev, int n, struct xdp_frame **frames)
 {
 	struct i40e_netdev_priv *np = netdev_priv(dev);
 	unsigned int queue_index = smp_processor_id();
 	struct i40e_vsi *vsi = np->vsi;
-	int err;
+	int drops = 0;
+	int i;
 
 	if (test_bit(__I40E_VSI_DOWN, vsi->state))
 		return -ENETDOWN;
@@ -3679,11 +3684,18 @@ int i40e_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
 	if (!i40e_enabled_xdp_vsi(vsi) || queue_index >= vsi->num_queue_pairs)
 		return -ENXIO;
 
-	err = i40e_xmit_xdp_ring(xdpf, vsi->xdp_rings[queue_index]);
-	if (err != I40E_XDP_TX)
-		return -ENOSPC;
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdpf = frames[i];
+		int err;
+
+		err = i40e_xmit_xdp_ring(xdpf, vsi->xdp_rings[queue_index]);
+		if (err != I40E_XDP_TX) {
+			xdp_return_frame_rx_napi(xdpf);
+			drops++;
+		}
+	}
 
-	return 0;
+	return n - drops;
 }
 
 /**
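
For context on how a caller consumes this convention: the redirect-map
flush path added elsewhere in this series uses the return value roughly
as sketched here (names such as bq->count and bq->q follow the series'
devmap bulk-queue code; this is a sketch, not part of this file's diff):

	sent = dev->netdev_ops->ndo_xdp_xmit(dev, bq->count, bq->q);
	if (sent < 0) {
		/* Hard error: no frames were consumed; caller must
		 * free/return all of them.
		 */
		err = sent;
		sent = 0;
		goto error;
	}
	drops = bq->count - sent;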
drivers/net/ethernet/intel/i40e/i40e_txrx.h +1 −1
@@ -487,7 +487,7 @@ u32 i40e_get_tx_pending(struct i40e_ring *ring, bool in_sw);
 void i40e_detect_recover_hung(struct i40e_vsi *vsi);
 int __i40e_maybe_stop_tx(struct i40e_ring *tx_ring, int size);
 bool __i40e_chk_linearize(struct sk_buff *skb);
-int i40e_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf);
+int i40e_xdp_xmit(struct net_device *dev, int n, struct xdp_frame **frames);
 void i40e_xdp_flush(struct net_device *dev);
 
 /**
drivers/net/ethernet/intel/ixgbe/ixgbe_main.c +15 −6
@@ -10017,11 +10017,13 @@ static int ixgbe_xdp(struct net_device *dev, struct netdev_bpf *xdp)
 	}
 }
 
-static int ixgbe_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
+static int ixgbe_xdp_xmit(struct net_device *dev, int n,
+			  struct xdp_frame **frames)
 {
 	struct ixgbe_adapter *adapter = netdev_priv(dev);
 	struct ixgbe_ring *ring;
-	int err;
+	int drops = 0;
+	int i;
 
 	if (unlikely(test_bit(__IXGBE_DOWN, &adapter->state)))
 		return -ENETDOWN;
@@ -10033,11 +10035,18 @@ static int ixgbe_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
 	if (unlikely(!ring))
 		return -ENXIO;
 
-	err = ixgbe_xmit_xdp_ring(adapter, xdpf);
-	if (err != IXGBE_XDP_TX)
-		return -ENOSPC;
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdpf = frames[i];
+		int err;
+
+		err = ixgbe_xmit_xdp_ring(adapter, xdpf);
+		if (err != IXGBE_XDP_TX) {
+			xdp_return_frame_rx_napi(xdpf);
+			drops++;
+		}
+	}
 
-	return 0;
+	return n - drops;
 }
 
 static void ixgbe_xdp_flush(struct net_device *dev)
drivers/net/tun.c +24 −13
@@ -70,6 +70,7 @@
 #include <net/netns/generic.h>
 #include <net/rtnetlink.h>
 #include <net/sock.h>
+#include <net/xdp.h>
 #include <linux/seq_file.h>
 #include <linux/uio.h>
 #include <linux/skb_array.h>
@@ -1290,34 +1291,44 @@ static const struct net_device_ops tun_netdev_ops = {
 	.ndo_get_stats64	= tun_net_get_stats64,
 };
 
-static int tun_xdp_xmit(struct net_device *dev, struct xdp_frame *frame)
+static int tun_xdp_xmit(struct net_device *dev, int n, struct xdp_frame **frames)
 {
 	struct tun_struct *tun = netdev_priv(dev);
 	struct tun_file *tfile;
 	u32 numqueues;
-	int ret = 0;
+	int drops = 0;
+	int cnt = n;
+	int i;
 
 	rcu_read_lock();
 
 	numqueues = READ_ONCE(tun->numqueues);
 	if (!numqueues) {
-		ret = -ENOSPC;
-		goto out;
+		rcu_read_unlock();
+		return -ENXIO; /* Caller will free/return all frames */
 	}
 
 	tfile = rcu_dereference(tun->tfiles[smp_processor_id() %
 					    numqueues]);
+
+	spin_lock(&tfile->tx_ring.producer_lock);
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdp = frames[i];
-	/* Encode the XDP flag into lowest bit for consumer to differ
-	 * XDP buffer from sk_buff.
-	 */
-	if (ptr_ring_produce(&tfile->tx_ring, tun_xdp_to_ptr(frame))) {
-		this_cpu_inc(tun->pcpu_stats->tx_dropped);
-		ret = -ENOSPC;
+		/* Encode the XDP flag into lowest bit for consumer to differ
+		 * XDP buffer from sk_buff.
+		 */
+		void *frame = tun_xdp_to_ptr(xdp);
+
+		if (__ptr_ring_produce(&tfile->tx_ring, frame)) {
+			this_cpu_inc(tun->pcpu_stats->tx_dropped);
+			xdp_return_frame_rx_napi(xdp);
+			drops++;
+		}
 	}
+	spin_unlock(&tfile->tx_ring.producer_lock);
 
-out:
 	rcu_read_unlock();
-	return ret;
+	return cnt - drops;
 }
 
 static int tun_xdp_tx(struct net_device *dev, struct xdp_buff *xdp)
@@ -1327,7 +1338,7 @@ static int tun_xdp_tx(struct net_device *dev, struct xdp_buff *xdp)
 	if (unlikely(!frame))
 		return -EOVERFLOW;
 
-	return tun_xdp_xmit(dev, frame);
+	return tun_xdp_xmit(dev, 1, &frame);
 }
 
 static void tun_xdp_flush(struct net_device *dev)
drivers/net/virtio_net.c +49 −17
@@ -419,23 +419,13 @@ static void virtnet_xdp_flush(struct net_device *dev)
 	virtqueue_kick(sq->vq);
 }
 
-static int __virtnet_xdp_xmit(struct virtnet_info *vi,
-			      struct xdp_frame *xdpf)
+static int __virtnet_xdp_xmit_one(struct virtnet_info *vi,
+				  struct send_queue *sq,
+				  struct xdp_frame *xdpf)
 {
 	struct virtio_net_hdr_mrg_rxbuf *hdr;
-	struct xdp_frame *xdpf_sent;
-	struct send_queue *sq;
-	unsigned int len;
-	unsigned int qp;
 	int err;
 
-	qp = vi->curr_queue_pairs - vi->xdp_queue_pairs + smp_processor_id();
-	sq = &vi->sq[qp];
-
-	/* Free up any pending old buffers before queueing new ones. */
-	while ((xdpf_sent = virtqueue_get_buf(sq->vq, &len)) != NULL)
-		xdp_return_frame(xdpf_sent);
-
 	/* virtqueue want to use data area in-front of packet */
 	if (unlikely(xdpf->metasize > 0))
 		return -EOPNOTSUPP;
@@ -459,11 +449,40 @@ static int __virtnet_xdp_xmit(struct virtnet_info *vi,
 	return 0;
 }
 
-static int virtnet_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
+static int __virtnet_xdp_tx_xmit(struct virtnet_info *vi,
+				 struct xdp_frame *xdpf)
+{
+	struct xdp_frame *xdpf_sent;
+	struct send_queue *sq;
+	unsigned int len;
+	unsigned int qp;
+
+	qp = vi->curr_queue_pairs - vi->xdp_queue_pairs + smp_processor_id();
+	sq = &vi->sq[qp];
+
+	/* Free up any pending old buffers before queueing new ones. */
+	while ((xdpf_sent = virtqueue_get_buf(sq->vq, &len)) != NULL)
+		xdp_return_frame(xdpf_sent);
+
+	return __virtnet_xdp_xmit_one(vi, sq, xdpf);
+}
+
+static int virtnet_xdp_xmit(struct net_device *dev,
+			    int n, struct xdp_frame **frames)
 {
 	struct virtnet_info *vi = netdev_priv(dev);
 	struct receive_queue *rq = vi->rq;
+	struct xdp_frame *xdpf_sent;
 	struct bpf_prog *xdp_prog;
+	struct send_queue *sq;
+	unsigned int len;
+	unsigned int qp;
+	int drops = 0;
+	int err;
+	int i;
+
+	qp = vi->curr_queue_pairs - vi->xdp_queue_pairs + smp_processor_id();
+	sq = &vi->sq[qp];
 
 	/* Only allow ndo_xdp_xmit if XDP is loaded on dev, as this
 	 * indicate XDP resources have been successfully allocated.
@@ -472,7 +491,20 @@ static int virtnet_xdp_xmit(struct net_device *dev, struct xdp_frame *xdpf)
 	if (!xdp_prog)
 		return -ENXIO;
 
-	return __virtnet_xdp_xmit(vi, xdpf);
+	/* Free up any pending old buffers before queueing new ones. */
+	while ((xdpf_sent = virtqueue_get_buf(sq->vq, &len)) != NULL)
+		xdp_return_frame(xdpf_sent);
+
+	for (i = 0; i < n; i++) {
+		struct xdp_frame *xdpf = frames[i];
+
+		err = __virtnet_xdp_xmit_one(vi, sq, xdpf);
+		if (err) {
+			xdp_return_frame_rx_napi(xdpf);
+			drops++;
+		}
+	}
+	return n - drops;
 }
 
 static unsigned int virtnet_get_headroom(struct virtnet_info *vi)
@@ -616,7 +648,7 @@ static struct sk_buff *receive_small(struct net_device *dev,
 			xdpf = convert_to_xdp_frame(&xdp);
 			if (unlikely(!xdpf))
 				goto err_xdp;
-			err = __virtnet_xdp_xmit(vi, xdpf);
+			err = __virtnet_xdp_tx_xmit(vi, xdpf);
 			if (unlikely(err)) {
 				trace_xdp_exception(vi->dev, xdp_prog, act);
 				goto err_xdp;
@@ -779,7 +811,7 @@ static struct sk_buff *receive_mergeable(struct net_device *dev,
 			xdpf = convert_to_xdp_frame(&xdp);
 			if (unlikely(!xdpf))
 				goto err_xdp;
-			err = __virtnet_xdp_xmit(vi, xdpf);
+			err = __virtnet_xdp_tx_xmit(vi, xdpf);
 			if (unlikely(err)) {
 				trace_xdp_exception(vi->dev, xdp_prog, act);
 				if (unlikely(xdp_page != page))