
Commit 2276f58a authored by Paolo Abeni, committed by David S. Miller

udp: use a separate rx queue for packet reception



Under UDP flood the sk_receive_queue spinlock is heavily contended.
This patch tries to reduce the contention on that lock by adding a
second receive queue to UDP sockets; recvmsg() looks first
in this queue and, only if it is empty, tries to fetch the data from
sk_receive_queue. The latter is spliced into the newly added
queue every time the receive path has to acquire the
sk_receive_queue lock.
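
Schematically, the two-queue scheme can be pictured with the following
minimal userspace sketch (plain C, a pthread mutex standing in for the
queue spinlock, single consumer assumed; all names and the list layout
are illustrative, not kernel API):

#include <pthread.h>
#include <stddef.h>

struct node { struct node *next; };

struct rx_queue {
	pthread_mutex_t lock;		/* contended by every producer */
	struct node *head, *tail;	/* stands in for sk_receive_queue */
};

struct reader_queue {
	struct node *head, *tail;	/* stands in for up->reader_queue */
};

/* O(1) splice of the whole shared list into the private one; the
 * contended lock is held only for the pointer swap. */
static void splice_rx(struct rx_queue *rx, struct reader_queue *rq)
{
	pthread_mutex_lock(&rx->lock);
	if (rx->head) {
		if (rq->tail)
			rq->tail->next = rx->head;
		else
			rq->head = rx->head;
		rq->tail = rx->tail;
		rx->head = rx->tail = NULL;
	}
	pthread_mutex_unlock(&rx->lock);
}

/* recvmsg()-like consumer: prefer the uncontended private list and
 * fall back to a single splice when it runs dry. */
static struct node *dequeue(struct rx_queue *rx, struct reader_queue *rq)
{
	struct node *n;

	if (!rq->head)
		splice_rx(rx, rq);	/* one lock round trip per burst */
	n = rq->head;
	if (n) {
		rq->head = n->next;
		if (!rq->head)
			rq->tail = NULL;
	}
	return n;
}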

The accounting of forward allocated memory is still protected by
the sk_receive_queue lock, so udp_rmem_release() needs to acquire
both locks when the forward deficit is flushed.
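
The resulting lock nesting, in the same illustrative userspace style
(a sketch only; the threshold value and field names are made up):

#include <pthread.h>

struct usock {
	pthread_mutex_t reader_lock;	/* stands in for reader_queue.lock */
	pthread_mutex_t rx_lock;	/* stands in for sk_receive_queue.lock */
	int forward_alloc;
	int forward_deficit;
};

/* Called with reader_lock held, mirroring udp_rmem_release() running
 * under reader_queue.lock: rx_lock nests inside it, never the reverse. */
static void rmem_release_sketch(struct usock *sk, int size)
{
	sk->forward_deficit += size;
	if (sk->forward_deficit < 4096)	/* arbitrary flush threshold */
		return;

	pthread_mutex_lock(&sk->rx_lock);	/* accounting under rx_lock */
	sk->forward_alloc += sk->forward_deficit;
	sk->forward_deficit = 0;
	pthread_mutex_unlock(&sk->rx_lock);
}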

In specific scenarios we can end up acquiring and releasing the
sk_receive_queue lock multiple times; that will be addressed by
the next patch.

Suggested-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
parent 65101aec
include/linux/udp.h +3 −0
@@ -80,6 +80,9 @@ struct udp_sock {
 						struct sk_buff *skb,
 						int nhoff);
 
+	/* udp_recvmsg try to use this before splicing sk_receive_queue */
+	struct sk_buff_head	reader_queue ____cacheline_aligned_in_smp;
+
 	/* This field is dirtied by udp_recvmsg() */
 	int		forward_deficit;
 };
include/net/udp.h +2 −7
@@ -249,13 +249,8 @@ void udp_destruct_sock(struct sock *sk);
 void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len);
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb);
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb);
-static inline struct sk_buff *
-__skb_recv_udp(struct sock *sk, unsigned int flags, int noblock, int *peeked,
-	       int *off, int *err)
-{
-	return __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0),
-				   udp_skb_destructor, peeked, off, err);
-}
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err);
 static inline struct sk_buff *skb_recv_udp(struct sock *sk, unsigned int flags,
 					   int noblock, int *err)
 {
include/net/udplite.h +1 −1
@@ -26,8 +26,8 @@ static __inline__ int udplite_getfrag(void *from, char *to, int offset,
 /* Designate sk as UDP-Lite socket */
 static inline int udplite_sk_init(struct sock *sk)
 {
+	udp_init_sock(sk);
 	udp_sk(sk)->pcflag = UDPLITE_BIT;
-	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
 
net/ipv4/udp.c +123 −15
@@ -1167,19 +1167,24 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
 	struct udp_sock *up = udp_sk(sk);
+	struct sk_buff_head *sk_queue;
 	int amt;
 
 	if (likely(partial)) {
 		up->forward_deficit += size;
 		size = up->forward_deficit;
 		if (size < (sk->sk_rcvbuf >> 2) &&
-		    !skb_queue_empty(&sk->sk_receive_queue))
+		    !skb_queue_empty(&up->reader_queue))
 			return;
 	} else {
 		size += up->forward_deficit;
 	}
 	up->forward_deficit = 0;
 
+	/* acquire the sk_receive_queue for fwd allocated memory scheduling */
+	sk_queue = &sk->sk_receive_queue;
+	spin_lock(&sk_queue->lock);
+
 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
 	sk->sk_forward_alloc -= amt;
@@ -1188,9 +1193,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
 		__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
 
 	atomic_sub(size, &sk->sk_rmem_alloc);
+
+	/* this can save us from acquiring the rx queue lock on next receive */
+	skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+	spin_unlock(&sk_queue->lock);
 }
 
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
  * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
  * This avoids a cache line miss while receive_queue lock is held.
  * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
@@ -1306,10 +1316,12 @@ EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
 void udp_destruct_sock(struct sock *sk)
 {
 	/* reclaim completely the forward allocated memory */
+	struct udp_sock *up = udp_sk(sk);
 	unsigned int total = 0;
 	struct sk_buff *skb;
 
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+	while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
 		total += skb->truesize;
 		kfree_skb(skb);
 	}
@@ -1321,6 +1333,7 @@ EXPORT_SYMBOL_GPL(udp_destruct_sock);
 
 int udp_init_sock(struct sock *sk)
 {
+	skb_queue_head_init(&udp_sk(sk)->reader_queue);
 	sk->sk_destruct = udp_destruct_sock;
 	return 0;
 }
@@ -1338,6 +1351,26 @@ void skb_consume_udp(struct sock *sk, struct sk_buff *skb, int len)
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
+static struct sk_buff *__first_packet_length(struct sock *sk,
+					     struct sk_buff_head *rcvq,
+					     int *total)
+{
+	struct sk_buff *skb;
+
+	while ((skb = skb_peek(rcvq)) != NULL &&
+	       udp_lib_checksum_complete(skb)) {
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+				IS_UDPLITE(sk));
+		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+				IS_UDPLITE(sk));
+		atomic_inc(&sk->sk_drops);
+		__skb_unlink(skb, rcvq);
+		*total += skb->truesize;
+		kfree_skb(skb);
+	}
+	return skb;
+}
+
 /**
  *	first_packet_length	- return length of first packet in receive queue
  *	@sk: socket
@@ -1347,22 +1380,20 @@ EXPORT_SYMBOL_GPL(skb_consume_udp);
  */
 static int first_packet_length(struct sock *sk)
 {
-	struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+	struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
 	struct sk_buff *skb;
 	int total = 0;
 	int res;
 
 	spin_lock_bh(&rcvq->lock);
-	while ((skb = skb_peek(rcvq)) != NULL &&
-		udp_lib_checksum_complete(skb)) {
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
-				IS_UDPLITE(sk));
-		__UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
-				IS_UDPLITE(sk));
-		atomic_inc(&sk->sk_drops);
-		__skb_unlink(skb, rcvq);
-		total += skb->truesize;
-		kfree_skb(skb);
+	skb = __first_packet_length(sk, rcvq, &total);
+	if (!skb && !skb_queue_empty(sk_queue)) {
+		spin_lock(&sk_queue->lock);
+		skb_queue_splice_tail_init(sk_queue, rcvq);
+		spin_unlock(&sk_queue->lock);
+
+		skb = __first_packet_length(sk, rcvq, &total);
 	}
 	res = skb ? skb->len : -1;
 	if (total)
@@ -1400,6 +1431,79 @@ int udp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 }
 EXPORT_SYMBOL(udp_ioctl);
 
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+			       int noblock, int *peeked, int *off, int *err)
+{
+	struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+	struct sk_buff_head *queue;
+	struct sk_buff *last;
+	long timeo;
+	int error;
+
+	queue = &udp_sk(sk)->reader_queue;
+	flags |= noblock ? MSG_DONTWAIT : 0;
+	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+	do {
+		struct sk_buff *skb;
+
+		error = sock_error(sk);
+		if (error)
+			break;
+
+		error = -EAGAIN;
+		*peeked = 0;
+		do {
+			int _off = *off;
+
+			spin_lock_bh(&queue->lock);
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			if (skb) {
+				spin_unlock_bh(&queue->lock);
+				*off = _off;
+				return skb;
+			}
+
+			if (skb_queue_empty(sk_queue)) {
+				spin_unlock_bh(&queue->lock);
+				goto busy_check;
+			}
+
+			/* refill the reader queue and walk it again */
+			_off = *off;
+			spin_lock(&sk_queue->lock);
+			skb_queue_splice_tail_init(sk_queue, queue);
+			spin_unlock(&sk_queue->lock);
+
+			skb = __skb_try_recv_from_queue(sk, queue, flags,
+							udp_skb_destructor,
+							peeked, &_off, err,
+							&last);
+			spin_unlock_bh(&queue->lock);
+			if (skb) {
+				*off = _off;
+				return skb;
+			}
+
+busy_check:
+			if (!sk_can_busy_loop(sk))
+				break;
+
+			sk_busy_loop(sk, flags & MSG_DONTWAIT);
+		} while (!skb_queue_empty(sk_queue));
+
+		/* sk_queue is empty, reader_queue may contain peeked packets */
+	} while (timeo &&
+		 !__skb_wait_for_more_packets(sk, &error, &timeo,
+					      (struct sk_buff *)sk_queue));
+
+	*err = error;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
 /*
  * 	This should be easy, if there is something there we
  * 	return it, otherwise we block.
@@ -1490,7 +1594,8 @@ int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int noblock,
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
 		UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
 	}
@@ -2325,6 +2430,9 @@ unsigned int udp_poll(struct file *file, struct socket *sock, poll_table *wait)
 	unsigned int mask = datagram_poll(file, sock, wait);
 	struct sock *sk = sock->sk;
 
+	if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+		mask |= POLLIN | POLLRDNORM;
+
 	sock_rps_record_flow(sk);
 
 	/* Check for false positives due to checksum errors */
net/ipv6/udp.c +2 −1
@@ -455,7 +455,8 @@ int udpv6_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
 	return err;
 
 csum_copy_err:
-	if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+	if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+				 udp_skb_destructor)) {
 		if (is_udp4) {
 			UDP_INC_STATS(sock_net(sk),
 				      UDP_MIB_CSUMERRORS, is_udplite);