Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ad521763 authored by David S. Miller
Browse files

Merge branch 'rds-use-RCU-between-work-enqueue-and-connection-teardown'

Sowmini Varadhan says:

====================
rds: use RCU between work-enqueue and connection teardown

This patchset follows up on the root-cause mentioned in
https://www.spinics.net/lists/netdev/msg472849.html

Patch1 implements some code refactoring that was suggested
as an enhancement in http://patchwork.ozlabs.org/patch/843157/


It replaces the c_destroy_in_prog bit in rds_connection with
an atomically managed flag in rds_conn_path.

Patch2 builds on Patch1 and uses RCU to make sure that
work is only enqueued if the connection destroy is not already
in progress: the test-flag-and-enqueue is done under rcu_read_lock,
while destroy first sets the flag, uses synchronize_rcu to
wait for existing reader threads to complete, and then starts
all the work-cancellation.

Since I have not been able to reproduce the original stack traces
reported by syzbot, and these are fixes for a race condition that
are based on code inspection, I am not marking these as reported-by
at this time.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
parents eb9aa1bf 3db6e0d1
Loading
Loading
Loading
Loading
+7 −3
Original line number Diff line number Diff line
@@ -219,7 +219,11 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
	spin_lock_irqsave(&rds_cong_lock, flags);

	list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
		if (!test_and_set_bit(0, &conn->c_map_queued)) {
		struct rds_conn_path *cp = &conn->c_path[0];

		rcu_read_lock();
		if (!test_and_set_bit(0, &conn->c_map_queued) &&
		    !test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
			rds_stats_inc(s_cong_update_queued);
			/* We cannot inline the call to rds_send_xmit() here
			 * for two reasons (both pertaining to a TCP transport):
@@ -235,9 +239,9 @@ void rds_cong_queue_updates(struct rds_cong_map *map)
			 *    therefore trigger warnings.
			 * Defer the xmit to rds_send_worker() instead.
			 */
			queue_delayed_work(rds_wq,
					   &conn->c_path[0].cp_send_w, 0);
			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
		}
		rcu_read_unlock();
	}

	spin_unlock_irqrestore(&rds_cong_lock, flags);
+19 −5
Original line number Diff line number Diff line
@@ -366,8 +366,6 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
	 * to the conn hash, so we never trigger a reconnect on this
	 * conn - the reconnect is always triggered by the active peer. */
	cancel_delayed_work_sync(&cp->cp_conn_w);
	if (conn->c_destroy_in_prog)
		return;
	rcu_read_lock();
	if (!hlist_unhashed(&conn->c_hash_node)) {
		rcu_read_unlock();
@@ -384,10 +382,13 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
{
	struct rds_message *rm, *rtmp;

	set_bit(RDS_DESTROY_PENDING, &cp->cp_flags);

	if (!cp->cp_transport_data)
		return;

	/* make sure lingering queued work won't try to ref the conn */
	synchronize_rcu();
	cancel_delayed_work_sync(&cp->cp_send_w);
	cancel_delayed_work_sync(&cp->cp_recv_w);

@@ -405,6 +406,11 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
	if (cp->cp_xmit_rm)
		rds_message_put(cp->cp_xmit_rm);

	WARN_ON(delayed_work_pending(&cp->cp_send_w));
	WARN_ON(delayed_work_pending(&cp->cp_recv_w));
	WARN_ON(delayed_work_pending(&cp->cp_conn_w));
	WARN_ON(work_pending(&cp->cp_down_w));

	cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
}

@@ -426,7 +432,6 @@ void rds_conn_destroy(struct rds_connection *conn)
		 "%pI4\n", conn, &conn->c_laddr,
		 &conn->c_faddr);

	conn->c_destroy_in_prog = 1;
	/* Ensure conn will not be scheduled for reconnect */
	spin_lock_irq(&rds_conn_lock);
	hlist_del_init_rcu(&conn->c_hash_node);
@@ -685,10 +690,13 @@ void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
{
	atomic_set(&cp->cp_state, RDS_CONN_ERROR);

	if (!destroy && cp->cp_conn->c_destroy_in_prog)
	rcu_read_lock();
	if (!destroy && test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
		rcu_read_unlock();
		return;

	}
	queue_work(rds_wq, &cp->cp_down_w);
	rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_conn_path_drop);

@@ -705,9 +713,15 @@ EXPORT_SYMBOL_GPL(rds_conn_drop);
 */
void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
{
	/* The flag test and the enqueue below run inside one RCU read-side
	 * section. The destroy path sets RDS_DESTROY_PENDING and then calls
	 * synchronize_rcu() before it starts cancelling work, so it waits
	 * for this section to finish.
	 */
	rcu_read_lock();
	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
		/* Teardown has begun: do not queue connect work. */
		rcu_read_unlock();
		return;
	}
	/* Queue the connect work (zero delay) only when the path is down
	 * and RDS_RECONNECT_PENDING was not already set.
	 */
	if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
	    !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
	rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);

+2 −2
Original line number Diff line number Diff line
@@ -88,6 +88,7 @@ enum {
#define RDS_RECONNECT_PENDING	1
#define RDS_IN_XMIT		2
#define RDS_RECV_REFILL		3
#define	RDS_DESTROY_PENDING	4

/* Max number of multipaths per RDS connection. Must be a power of 2 */
#define	RDS_MPATH_WORKERS	8
@@ -139,8 +140,7 @@ struct rds_connection {
	__be32			c_faddr;
	unsigned int		c_loopback:1,
				c_ping_triggered:1,
				c_destroy_in_prog:1,
				c_pad_to_32:29;
				c_pad_to_32:30;
	int			c_npaths;
	struct rds_connection	*c_passive;
	struct rds_transport	*c_trans;
+32 −5
Original line number Diff line number Diff line
@@ -162,6 +162,12 @@ int rds_send_xmit(struct rds_conn_path *cp)
		goto out;
	}

	if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
		release_in_xmit(cp);
		ret = -ENETUNREACH; /* dont requeue send work */
		goto out;
	}

	/*
	 * we record the send generation after doing the xmit acquire.
	 * if someone else manages to jump in and do some work, we'll use
@@ -437,7 +443,12 @@ int rds_send_xmit(struct rds_conn_path *cp)
		    !list_empty(&cp->cp_send_queue)) && !raced) {
			if (batch_count < send_batch_count)
				goto restart;
			rcu_read_lock();
			if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
				ret = -ENETUNREACH;
			else
				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
			rcu_read_unlock();
		} else if (raced) {
			rds_stats_inc(s_send_lock_queue_raced);
		}
@@ -1151,6 +1162,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
	else
		cpath = &conn->c_path[0];

	if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags)) {
		ret = -EAGAIN;
		goto out;
	}

	rds_conn_path_connect_if_down(cpath);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
@@ -1190,9 +1206,17 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
	rds_stats_inc(s_send_queued);

	ret = rds_send_xmit(cpath);
	if (ret == -ENOMEM || ret == -EAGAIN)
	if (ret == -ENOMEM || ret == -EAGAIN) {
		ret = 0;
		rcu_read_lock();
		if (test_bit(RDS_DESTROY_PENDING, &cpath->cp_flags))
			ret = -ENETUNREACH;
		else
			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);

		rcu_read_unlock();
	}
	if (ret)
		goto out;
	rds_message_put(rm);
	return payload_len;

@@ -1270,7 +1294,10 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
	rds_stats_inc(s_send_pong);

	/* schedule the send work on rds_wq */
	rcu_read_lock();
	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
	rcu_read_unlock();

	rds_message_put(rm);
	return 0;
+1 −1
Original line number Diff line number Diff line
@@ -170,7 +170,7 @@ void rds_tcp_conn_path_shutdown(struct rds_conn_path *cp)
		 cp->cp_conn, tc, sock);

	if (sock) {
		if (cp->cp_conn->c_destroy_in_prog)
		if (test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
			rds_tcp_set_linger(sock);
		sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
		lock_sock(sock->sk);
Loading