Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7d1e8255 authored by Trond Myklebust
Browse files

SUNRPC: Add the equivalent of the linger and linger2 timeouts to RPC sockets



This fixes a regression against FreeBSD servers as reported by Tomas
Kasparek. Apparently when using RPC over a TCP socket, the FreeBSD servers
don't ever react to the client closing the socket, and so commit
e06799f9 (SUNRPC: Use shutdown() instead of
close() when disconnecting a TCP socket) causes the setup to hang forever
whenever the client attempts to close and then reconnect.

We break the deadlock by adding a 'linger2' style timeout to the socket,
after which, the client will abort the connection using a TCP 'RST'.

The default timeout is set to 15 seconds. A subsequent patch will put it
under user control by means of a sysctl.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent 5e3771ce
Loading
Loading
Loading
Loading
+1 −0
Original line number Original line Diff line number Diff line
@@ -260,6 +260,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
#define XPRT_BOUND		(4)
#define XPRT_BOUND		(4)
#define XPRT_BINDING		(5)
#define XPRT_BINDING		(5)
#define XPRT_CLOSING		(6)
#define XPRT_CLOSING		(6)
#define XPRT_CONNECTION_ABORT	(7)


static inline void xprt_set_connected(struct rpc_xprt *xprt)
static inline void xprt_set_connected(struct rpc_xprt *xprt)
{
{
+81 −17
Original line number Original line Diff line number Diff line
@@ -49,6 +49,8 @@ unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;


#define XS_TCP_LINGER_TO	(15U * HZ)

/*
/*
 * We can register our own files under /proc/sys/sunrpc by
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * calling register_sysctl_table() again.  The files in that
@@ -806,6 +808,7 @@ static void xs_close(struct rpc_xprt *xprt)
	xs_reset_transport(transport);
	xs_reset_transport(transport);


	smp_mb__before_clear_bit();
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	smp_mb__after_clear_bit();
@@ -1133,6 +1136,47 @@ out:
	read_unlock(&sk->sk_callback_lock);
	read_unlock(&sk->sk_callback_lock);
}
}


/*
 * Emulate linger/linger2 handling so that a broken server which
 * never closes its end of the socket cannot stall us indefinitely.
 *
 * If xprt_test_and_set_connecting() returns zero, flag the transport
 * with XPRT_CONNECTION_ABORT and queue its connect_worker on
 * rpciod_workqueue with a delay of @timeout. If it returns non-zero,
 * nothing is scheduled.
 */
static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
		unsigned long timeout)
{
	struct sock_xprt *transport =
		container_of(xprt, struct sock_xprt, xprt);

	if (!xprt_test_and_set_connecting(xprt)) {
		/* Tell the connect worker that this run is an abort */
		set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker, timeout);
	}
}

/*
 * Undo xs_tcp_schedule_linger_timeout().
 *
 * Only acts when XPRT_CONNECTION_ABORT is set and cancel_delayed_work()
 * on the connect_worker returns non-zero; in that case the abort flag
 * is cleared and xprt_clear_connecting() is called. Otherwise the
 * transport state is left untouched.
 */
static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport =
		container_of(xprt, struct sock_xprt, xprt);

	/* No abort pending: there is nothing to cancel */
	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state))
		return;
	/* The delayed work could not be cancelled: leave the state alone */
	if (!cancel_delayed_work(&transport->connect_worker))
		return;

	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	xprt_clear_connecting(xprt);
}

/*
 * Record that the socket has finished closing: clear the
 * XPRT_CLOSE_WAIT and XPRT_CLOSING bits in xprt->state, then hand
 * over to xprt_disconnect_done().
 *
 * The two clear_bit() calls are bracketed by
 * smp_mb__before_clear_bit()/smp_mb__after_clear_bit(), so the
 * statement order here is significant and must be preserved.
 */
static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}

/**
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 * @sk: socket whose state has changed
@@ -1178,6 +1222,7 @@ static void xs_tcp_state_change(struct sock *sk)
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		smp_mb__after_clear_bit();
		smp_mb__after_clear_bit();
		xs_tcp_schedule_linger_timeout(xprt, XS_TCP_LINGER_TO);
		break;
		break;
	case TCP_CLOSE_WAIT:
	case TCP_CLOSE_WAIT:
		/* The server initiated a shutdown of the socket */
		/* The server initiated a shutdown of the socket */
@@ -1194,17 +1239,14 @@ static void xs_tcp_state_change(struct sock *sk)
		break;
		break;
	case TCP_LAST_ACK:
	case TCP_LAST_ACK:
		set_bit(XPRT_CLOSING, &xprt->state);
		set_bit(XPRT_CLOSING, &xprt->state);
		xs_tcp_schedule_linger_timeout(xprt, XS_TCP_LINGER_TO);
		smp_mb__before_clear_bit();
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_clear_bit();
		smp_mb__after_clear_bit();
		break;
		break;
	case TCP_CLOSE:
	case TCP_CLOSE:
		smp_mb__before_clear_bit();
		xs_tcp_cancel_linger_timeout(xprt);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		xs_sock_mark_closed(xprt);
		clear_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__after_clear_bit();
		/* Mark transport as closed and wake up all pending tasks */
		xprt_disconnect_done(xprt);
	}
	}
 out:
 out:
	read_unlock(&sk->sk_callback_lock);
	read_unlock(&sk->sk_callback_lock);
@@ -1562,8 +1604,8 @@ static void xs_udp_connect_worker4(struct work_struct *work)
	xs_udp_finish_connecting(xprt, sock);
	xs_udp_finish_connecting(xprt, sock);
	status = 0;
	status = 0;
out:
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
}


/**
/**
@@ -1604,8 +1646,8 @@ static void xs_udp_connect_worker6(struct work_struct *work)
	xs_udp_finish_connecting(xprt, sock);
	xs_udp_finish_connecting(xprt, sock);
	status = 0;
	status = 0;
out:
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
}


/*
/*
@@ -1626,7 +1668,9 @@ static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transpo
	memset(&any, 0, sizeof(any));
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	if (result)
	if (!result)
		xs_sock_mark_closed(xprt);
	else
		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
				result);
				result);
}
}
@@ -1702,6 +1746,7 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
		goto out;
		goto out;


	if (!sock) {
	if (!sock) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		/* start from scratch */
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
@@ -1713,10 +1758,18 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
			sock_release(sock);
			sock_release(sock);
			goto out;
			goto out;
		}
		}
	} else
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);
		xs_tcp_reuse_connection(xprt, transport);


		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);


@@ -1732,17 +1785,18 @@ static void xs_tcp_connect_worker4(struct work_struct *work)
	case 0:
	case 0:
	case -EINPROGRESS:
	case -EINPROGRESS:
	case -EALREADY:
	case -EALREADY:
		goto out_clear;
		xprt_clear_connecting(xprt);
		return;
	}
	}
	/* get rid of existing socket, and retry */
	/* get rid of existing socket, and retry */
	xs_tcp_shutdown(xprt);
	xs_tcp_shutdown(xprt);
	printk("%s: connect returned unhandled error %d\n",
	printk("%s: connect returned unhandled error %d\n",
			__func__, status);
			__func__, status);
out_eagain:
	status = -EAGAIN;
	status = -EAGAIN;
out:
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
}


/**
/**
@@ -1763,6 +1817,7 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
		goto out;
		goto out;


	if (!sock) {
	if (!sock) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		/* start from scratch */
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
		if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
@@ -1774,10 +1829,18 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
			sock_release(sock);
			sock_release(sock);
			goto out;
			goto out;
		}
		}
	} else
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);
		xs_tcp_reuse_connection(xprt, transport);


		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);


@@ -1792,17 +1855,18 @@ static void xs_tcp_connect_worker6(struct work_struct *work)
	case 0:
	case 0:
	case -EINPROGRESS:
	case -EINPROGRESS:
	case -EALREADY:
	case -EALREADY:
		goto out_clear;
		xprt_clear_connecting(xprt);
		return;
	}
	}
	/* get rid of existing socket, and retry */
	/* get rid of existing socket, and retry */
	xs_tcp_shutdown(xprt);
	xs_tcp_shutdown(xprt);
	printk("%s: connect returned unhandled error %d\n",
	printk("%s: connect returned unhandled error %d\n",
			__func__, status);
			__func__, status);
out_eagain:
	status = -EAGAIN;
	status = -EAGAIN;
out:
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
}


/**
/**