Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b0d93ad5 authored by Chuck Lever's avatar Chuck Lever Committed by Trond Myklebust
Browse files

[PATCH] RPC: separate TCP and UDP transport connection logic



 Create separate connection worker functions for managing UDP and TCP
 transport sockets.  This eliminates several dependencies on "xprt->stream".

 Test-plan:
 Destructive testing (unplugging the network temporarily).  Connectathon with
 v2, v3, and v4.

 Version: Thu, 11 Aug 2005 16:08:18 -0400

 Signed-off-by: default avatarChuck Lever <cel@netapp.com>
 Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent c7b2cae8
Loading
Loading
Loading
Loading
+91 −73
Original line number Diff line number Diff line
@@ -836,102 +836,118 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
	return err;
}

static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
/**
 * xs_udp_connect_worker - set up a UDP socket
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_udp_connect_worker(void *args)
{
	struct socket *sock;
	int type, err;

	dprintk("RPC:      xs_create(%s %d)\n",
			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
	struct rpc_xprt *xprt = (struct rpc_xprt *) args;
	struct socket *sock = xprt->sock;
	int err, status = -EIO;

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
	if (xprt->shutdown || xprt->addr.sin_port == 0)
		goto out;

	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
		dprintk("RPC:      can't create socket (%d).\n", -err);
		return NULL;
	}
	dprintk("RPC:      xs_udp_connect_worker for xprt %p\n", xprt);

	/* If the caller has the capability, bind to a reserved port */
	if (resvport && xs_bindresvport(xprt, sock) < 0)
		goto failed;
	/* Start by resetting any existing state */
	xs_close(xprt);

	return sock;
	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
		dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
		goto out;
	}

failed:
	if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
		sock_release(sock);
	return NULL;
		goto out;
	}

static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
{
	if (!xprt->inet) {
		struct sock *sk = sock->sk;

	if (xprt->inet)
		return;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		xprt->old_data_ready = sk->sk_data_ready;
		xprt->old_state_change = sk->sk_state_change;
		xprt->old_write_space = sk->sk_write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_no_check = UDP_CSUM_NORCV;

		xprt_set_connected(xprt);
	} else {
		tcp_sk(sk)->nonagle = 1;	/* disable Nagle's algorithm */
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		xprt_clear_connected(xprt);
	}

		/* Reset to new socket */
		xprt->sock = sock;
		xprt->inet = sk;
	write_unlock_bh(&sk->sk_callback_lock);

	return;
		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_set_buffer_size(xprt);
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
}

/**
 * xs_connect_worker - try to connect a socket to a remote endpoint
 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
 * @args: RPC transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_connect_worker(void *args)
static void xs_tcp_connect_worker(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
	struct socket *sock = xprt->sock;
	int status = -EIO;
	int err, status = -EIO;

	if (xprt->shutdown || xprt->addr.sin_port == 0)
		goto out;

	dprintk("RPC:      xs_connect_worker xprt %p\n", xprt);
	dprintk("RPC:      xs_tcp_connect_worker for xprt %p\n", xprt);

	/*
	 * Start by resetting any existing state
	 */
	/* Start by resetting any existing socket state */
	xs_close(xprt);
	sock = xs_create(xprt, xprt->prot, xprt->resvport);
	if (sock == NULL) {
		/* couldn't create socket or bind to reserved port;
		 * this is likely a permanent error, so cause an abort */

	if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
		dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
		goto out;
	}
	xs_bind(xprt, sock);
	xs_set_buffer_size(xprt);

	status = 0;
	if (!xprt->stream)
	if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
		sock_release(sock);
		goto out;
	}

	/*
	 * Tell the socket layer to start connecting...
	 */
	if (!xprt->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		xprt->old_data_ready = sk->sk_data_ready;
		xprt->old_state_change = sk->sk_state_change;
		xprt->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		tcp_sk(sk)->nonagle = 1;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		xprt->sock = sock;
		xprt->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
			sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
@@ -959,7 +975,9 @@ static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	if (!xprt_test_and_set_connecting(xprt)) {
	if (xprt_test_and_set_connecting(xprt))
		return;

	if (xprt->sock != NULL) {
		dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
		schedule_delayed_work(&xprt->connect_worker,
@@ -967,12 +985,12 @@ static void xs_connect(struct rpc_task *task)
	} else {
		dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
		schedule_work(&xprt->connect_worker);

		/* flush_scheduled_work can sleep... */
		if (!RPC_IS_ASYNC(task))
			flush_scheduled_work();
	}
}
}

static struct rpc_xprt_ops xs_ops = {
	.set_buffer_size	= xs_set_buffer_size,
@@ -1013,7 +1031,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

	INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt);
	INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);

	xprt->ops = &xs_ops;

@@ -1052,7 +1070,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
	xprt->max_payload = (1U << 31) - 1;

	INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt);
	INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);

	xprt->ops = &xs_ops;