Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a8d7d124 authored by David S. Miller's avatar David S. Miller
Browse files

Merge branch 'smc-splice-implementation'



Ursula Braun says:

====================
net/smc: splice implementation

Stefan comes up with an smc implementation for splice(). The first
three patches are preparational patches, the 4th patch implements
splice().
====================

Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents a7b15ab8 9014db20
Loading
Loading
Loading
Loading
+33 −7
Original line number Diff line number Diff line
@@ -1089,7 +1089,7 @@ static int smc_accept(struct socket *sock, struct socket *new_sock,
			release_sock(clcsk);
		} else if (!atomic_read(&smc_sk(nsk)->conn.bytes_to_rcv)) {
			lock_sock(nsk);
			smc_rx_wait_data(smc_sk(nsk), &timeo);
			smc_rx_wait(smc_sk(nsk), &timeo, smc_rx_data_available);
			release_sock(nsk);
		}
	}
@@ -1163,10 +1163,12 @@ static int smc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		goto out;
	}

	if (smc->use_fallback)
	if (smc->use_fallback) {
		rc = smc->clcsock->ops->recvmsg(smc->clcsock, msg, len, flags);
	else
		rc = smc_rx_recvmsg(smc, msg, len, flags);
	} else {
		msg->msg_namelen = 0;
		rc = smc_rx_recvmsg(smc, msg, NULL, len, flags);
	}

out:
	release_sock(sk);
@@ -1447,6 +1449,12 @@ static ssize_t smc_sendpage(struct socket *sock, struct page *page,
	return rc;
}

/* Map the affected portions of the rmbe into an spd, note the number of bytes
 * to splice in conn->splice_pending, and press 'go'. Delays consumer cursor
 * updates till whenever a respective page has been fully processed.
 * Note that subsequent recv() calls have to wait till all splice() processing
 * completed.
 */
static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,
			       struct pipe_inode_info *pipe, size_t len,
			       unsigned int flags)
@@ -1457,16 +1465,34 @@ static ssize_t smc_splice_read(struct socket *sock, loff_t *ppos,

	smc = smc_sk(sk);
	lock_sock(sk);
	if ((sk->sk_state != SMC_ACTIVE) && (sk->sk_state != SMC_CLOSED))

	if (sk->sk_state == SMC_INIT ||
	    sk->sk_state == SMC_LISTEN ||
	    sk->sk_state == SMC_CLOSED)
		goto out;

	if (sk->sk_state == SMC_PEERFINCLOSEWAIT) {
		rc = 0;
		goto out;
	}

	if (smc->use_fallback) {
		rc = smc->clcsock->ops->splice_read(smc->clcsock, ppos,
						    pipe, len, flags);
	} else {
		rc = -EOPNOTSUPP;
		if (*ppos) {
			rc = -ESPIPE;
			goto out;
		}
		if (flags & SPLICE_F_NONBLOCK)
			flags = MSG_DONTWAIT;
		else
			flags = 0;
		rc = smc_rx_recvmsg(smc, NULL, pipe, len, flags);
	}
out:
	release_sock(sk);

	return rc;
}

+3 −0
Original line number Diff line number Diff line
@@ -164,6 +164,9 @@ struct smc_connection {
	atomic_t		bytes_to_rcv;	/* arrived data,
						 * not yet received
						 */
	atomic_t		splice_pending;	/* number of spliced bytes
						 * pending processing
						 */
#ifndef KERNEL_HAS_ATOMIC64
	spinlock_t		acurs_lock;	/* protect cursors */
#endif
+9 −9
Original line number Diff line number Diff line
@@ -290,8 +290,8 @@ static void smc_buf_free(struct smc_buf_desc *buf_desc, struct smc_link *lnk,
				    DMA_TO_DEVICE);
	}
	sg_free_table(&buf_desc->sgt[SMC_SINGLE_LINK]);
	if (buf_desc->cpu_addr)
		free_pages((unsigned long)buf_desc->cpu_addr, buf_desc->order);
	if (buf_desc->pages)
		__free_pages(buf_desc->pages, buf_desc->order);
	kfree(buf_desc);
}

@@ -566,16 +566,16 @@ static struct smc_buf_desc *smc_new_buf_create(struct smc_link_group *lgr,
	if (!buf_desc)
		return ERR_PTR(-ENOMEM);

	buf_desc->cpu_addr =
		(void *)__get_free_pages(GFP_KERNEL | __GFP_NOWARN |
					 __GFP_NOMEMALLOC |
	buf_desc->order = get_order(bufsize);
	buf_desc->pages = alloc_pages(GFP_KERNEL | __GFP_NOWARN |
				      __GFP_NOMEMALLOC | __GFP_COMP |
				      __GFP_NORETRY | __GFP_ZERO,
					 get_order(bufsize));
	if (!buf_desc->cpu_addr) {
				      buf_desc->order);
	if (!buf_desc->pages) {
		kfree(buf_desc);
		return ERR_PTR(-EAGAIN);
	}
	buf_desc->order = get_order(bufsize);
	buf_desc->cpu_addr = (void *)page_address(buf_desc->pages);

	/* build the sg table from the pages */
	lnk = &lgr->lnk[SMC_SINGLE_LINK];
+1 −0
Original line number Diff line number Diff line
@@ -120,6 +120,7 @@ struct smc_link {
struct smc_buf_desc {
	struct list_head	list;
	void			*cpu_addr;	/* virtual address of buffer */
	struct page		*pages;
	struct sg_table		sgt[SMC_LINKS_PER_LGR_MAX];/* virtual buffer */
	struct ib_mr		*mr_rx[SMC_LINKS_PER_LGR_MAX];
						/* for rmb only: memory region
+164 −36
Original line number Diff line number Diff line
@@ -22,11 +22,10 @@
#include "smc_tx.h" /* smc_tx_consumer_update() */
#include "smc_rx.h"

/* callback implementation for sk.sk_data_ready()
 * to wakeup rcvbuf consumers that blocked with smc_rx_wait_data().
/* callback implementation to wakeup consumers blocked with smc_rx_wait().
 * indirectly called by smc_cdc_msg_recv_action().
 */
static void smc_rx_data_ready(struct sock *sk)
static void smc_rx_wake_up(struct sock *sk)
{
	struct socket_wq *wq;

@@ -44,28 +43,140 @@ static void smc_rx_data_ready(struct sock *sk)
	rcu_read_unlock();
}

/* Update consumer cursor
 *   @conn   connection to update
 *   @cons   consumer cursor
 *   @len    number of Bytes consumed
 */
static void smc_rx_update_consumer(struct smc_connection *conn,
				   union smc_host_cursor cons, size_t len)
{
	smc_curs_add(conn->rmbe_size, &cons, len);
	smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
		       conn);
	/* send consumer cursor update if required */
	/* similar to advertising new TCP rcv_wnd if required */
	smc_tx_consumer_update(conn);
}

struct smc_spd_priv {
	struct smc_sock *smc;
	size_t		 len;
};

static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
				    struct pipe_buffer *buf)
{
	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
	struct smc_sock *smc = priv->smc;
	struct smc_connection *conn;
	union smc_host_cursor cons;
	struct sock *sk = &smc->sk;

	if (sk->sk_state == SMC_CLOSED ||
	    sk->sk_state == SMC_PEERFINCLOSEWAIT ||
	    sk->sk_state == SMC_APPFINCLOSEWAIT)
		goto out;
	conn = &smc->conn;
	lock_sock(sk);
	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_rx_update_consumer(conn, cons, priv->len);
	release_sock(sk);
	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
		smc_rx_wake_up(sk);
out:
	kfree(priv);
	put_page(buf->page);
	sock_put(sk);
}

static int smc_rx_pipe_buf_nosteal(struct pipe_inode_info *pipe,
				   struct pipe_buffer *buf)
{
	return 1;
}

static const struct pipe_buf_operations smc_pipe_ops = {
	.can_merge = 0,
	.confirm = generic_pipe_buf_confirm,
	.release = smc_rx_pipe_buf_release,
	.steal = smc_rx_pipe_buf_nosteal,
	.get = generic_pipe_buf_get
};

static void smc_rx_spd_release(struct splice_pipe_desc *spd,
			       unsigned int i)
{
	put_page(spd->pages[i]);
}

static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
			 struct smc_sock *smc)
{
	struct splice_pipe_desc spd;
	struct partial_page partial;
	struct smc_spd_priv *priv;
	struct page *page;
	int bytes;

	page = virt_to_page(smc->conn.rmb_desc->cpu_addr);
	priv = kzalloc(sizeof(*priv), GFP_KERNEL);
	if (!priv)
		return -ENOMEM;
	priv->len = len;
	priv->smc = smc;
	partial.offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
	partial.len = len;
	partial.private = (unsigned long)priv;

	spd.nr_pages_max = 1;
	spd.nr_pages = 1;
	spd.pages = &page;
	spd.partial = &partial;
	spd.ops = &smc_pipe_ops;
	spd.spd_release = smc_rx_spd_release;

	bytes = splice_to_pipe(pipe, &spd);
	if (bytes > 0) {
		sock_hold(&smc->sk);
		get_page(smc->conn.rmb_desc->pages);
		atomic_add(bytes, &smc->conn.splice_pending);
	}

	return bytes;
}

static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn)
{
	return atomic_read(&conn->bytes_to_rcv) &&
	       !atomic_read(&conn->splice_pending);
}

/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
 *   @smc    smc socket
 *   @timeo  pointer to max seconds to wait, pointer to value 0 for no timeout
 *   @fcrit  add'l criterion to evaluate as function pointer
 * Returns:
 * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
 * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
 */
int smc_rx_wait_data(struct smc_sock *smc, long *timeo)
int smc_rx_wait(struct smc_sock *smc, long *timeo,
		int (*fcrit)(struct smc_connection *conn))
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	int rc;

	if (atomic_read(&conn->bytes_to_rcv))
	if (fcrit(conn))
		return 1;
	sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
	add_wait_queue(sk_sleep(sk), &wait);
	rc = sk_wait_event(sk, timeo,
			   sk->sk_err ||
			   sk->sk_shutdown & RCV_SHUTDOWN ||
			   atomic_read(&conn->bytes_to_rcv) ||
			   fcrit(conn) ||
			   smc_cdc_rxed_any_close_or_senddone(conn),
			   &wait);
	remove_wait_queue(sk_sleep(sk), &wait);
@@ -73,19 +184,25 @@ int smc_rx_wait_data(struct smc_sock *smc, long *timeo)
	return rc;
}

/* rcvbuf consumer: main API called by socket layer.
 * called under sk lock.
/* smc_rx_recvmsg - receive data from RMBE
 * @msg:	copy data to receive buffer
 * @pipe:	copy data to pipe if set - indicates splice() call
 *
 * rcvbuf consumer: main API called by socket layer.
 * Called under sk lock.
 */
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
		   int flags)
int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
		   struct pipe_inode_info *pipe, size_t len, int flags)
{
	size_t copylen, read_done = 0, read_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	int (*func)(struct smc_connection *conn);
	union smc_host_cursor cons;
	int readable, chunk;
	char *rcvbuf_base;
	struct sock *sk;
	int splbytes;
	long timeo;
	int target;		/* Read at least these many bytes */
	int rc;
@@ -101,37 +218,32 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	msg->msg_namelen = 0;
	/* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
	rcvbuf_base = conn->rmb_desc->cpu_addr;

	do { /* while (read_remaining) */
		if (read_done >= target)
		if (read_done >= target || (pipe && read_done))
			break;

		if (atomic_read(&conn->bytes_to_rcv))
			goto copy;

		if (sk->sk_shutdown & RCV_SHUTDOWN ||
		    smc_cdc_rxed_any_close_or_senddone(conn) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			break;

		if (read_done) {
			if (sk->sk_err ||
			    sk->sk_state == SMC_CLOSED ||
			    sk->sk_shutdown & RCV_SHUTDOWN ||
			    !timeo ||
			    signal_pending(current) ||
			    smc_cdc_rxed_any_close_or_senddone(conn) ||
			    conn->local_tx_ctrl.conn_state_flags.
			    peer_conn_abort)
			    signal_pending(current))
				break;
		} else {
			if (sk->sk_err) {
				read_done = sock_error(sk);
				break;
			}
			if (sk->sk_shutdown & RCV_SHUTDOWN ||
			    smc_cdc_rxed_any_close_or_senddone(conn) ||
			    conn->local_tx_ctrl.conn_state_flags.
			    peer_conn_abort)
				break;
			if (sk->sk_state == SMC_CLOSED) {
				if (!sock_flag(sk, SOCK_DONE)) {
					/* This occurs when user tries to read
@@ -150,20 +262,33 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
				return -EAGAIN;
		}

		if (!atomic_read(&conn->bytes_to_rcv)) {
			smc_rx_wait_data(smc, &timeo);
		if (!smc_rx_data_available(conn)) {
			smc_rx_wait(smc, &timeo, smc_rx_data_available);
			continue;
		}

copy:
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_rx_wait_data above */
		/* could be just 1 byte, even after waiting on data above */
		readable = atomic_read(&conn->bytes_to_rcv);
		splbytes = atomic_read(&conn->splice_pending);
		if (!readable || (msg && splbytes)) {
			if (splbytes)
				func = smc_rx_data_available_and_no_splice_pend;
			else
				func = smc_rx_data_available;
			smc_rx_wait(smc, &timeo, func);
			continue;
		}

		/* not more than what user space asked for */
		copylen = min_t(size_t, read_remaining, readable);
		smc_curs_write(&cons,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		/* subsequent splice() calls pick up where previous left */
		if (splbytes)
			smc_curs_add(conn->rmbe_size, &cons, splbytes);
		/* determine chunks where to read from rcvbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t,
@@ -173,9 +298,16 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
		smc_rmb_sync_sg_for_cpu(conn);
		for (chunk = 0; chunk < 2; chunk++) {
			if (!(flags & MSG_TRUNC)) {
				rc = memcpy_to_msg(msg, rcvbuf_base + chunk_off,
				if (msg) {
					rc = memcpy_to_msg(msg, rcvbuf_base +
							   chunk_off,
							   chunk_len);
				if (rc) {
				} else {
					rc = smc_rx_splice(pipe, rcvbuf_base +
							chunk_off, chunk_len,
							smc);
				}
				if (rc < 0) {
					if (!read_done)
						read_done = -EFAULT;
					smc_rmb_sync_sg_for_device(conn);
@@ -196,18 +328,13 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,

		/* update cursors */
		if (!(flags & MSG_PEEK)) {
			smc_curs_add(conn->rmbe_size, &cons, copylen);
			/* increased in recv tasklet smc_cdc_msg_rcv() */
			smp_mb__before_atomic();
			atomic_sub(copylen, &conn->bytes_to_rcv);
			/* guarantee 0 <= bytes_to_rcv <= rmbe_size */
			smp_mb__after_atomic();
			smc_curs_write(&conn->local_tx_ctrl.cons,
				       smc_curs_read(&cons, conn),
				       conn);
			/* send consumer cursor update if required */
			/* similar to advertising new TCP rcv_wnd if required */
			smc_tx_consumer_update(conn);
			if (msg)
				smc_rx_update_consumer(conn, cons, copylen);
		}
	} while (read_remaining);
out:
@@ -217,5 +344,6 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, size_t len,
/* Initialize receive properties on connection establishment. NB: not __init! */
void smc_rx_init(struct smc_sock *smc)
{
	smc->sk.sk_data_ready = smc_rx_data_ready;
	smc->sk.sk_data_ready = smc_rx_wake_up;
	atomic_set(&smc->conn.splice_pending, 0);
}
Loading