Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e00de341 authored by Sage Weil's avatar Sage Weil
Browse files

libceph: fix msgr standby handling



The standby logic used to be pretty dependent on the work requeueing
behavior that changed when we switched to WQ_NON_REENTRANT.  It was also
very fragile.

Restructure things so that:
 - We clear WRITE_PENDING when we set STANDBY.  This ensures we will
   requeue work when we wake up later.
 - con_work backs off if STANDBY is set.  There is nothing to do if we are
   in standby.
 - clear_standby() helper is called by both con_send() and con_keepalive(),
   the two actions that can wake us up again.  Move the connect_seq++
   logic here.

Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent e76661d0
Loading
Loading
Loading
Loading
+22 −8
Original line number Diff line number Diff line
@@ -1712,14 +1712,6 @@ more:

	/* open the socket first? */
	if (con->sock == NULL) {
		/*
		 * if we were STANDBY and are reconnecting _this_
		 * connection, bump connect_seq now.  Always bump
		 * global_seq.
		 */
		if (test_and_clear_bit(STANDBY, &con->state))
			con->connect_seq++;

		prepare_write_banner(msgr, con);
		prepare_write_connect(msgr, con, 1);
		prepare_read_banner(con);
@@ -1962,6 +1954,10 @@ static void con_work(struct work_struct *work)
		}
	}

	if (test_bit(STANDBY, &con->state)) {
		dout("con_work %p STANDBY\n", con);
		goto done;
	}
	if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
		dout("con_work CLOSED\n");
		con_close_socket(con);
@@ -2022,6 +2018,8 @@ static void ceph_fault(struct ceph_connection *con)
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		clear_bit(WRITE_PENDING, &con->state);
		set_bit(STANDBY, &con->state);
	} else {
		/* retry after a delay. */
@@ -2117,6 +2115,19 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
}
EXPORT_SYMBOL(ceph_messenger_destroy);

static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
	if (test_and_clear_bit(STANDBY, &con->state)) {
		mutex_lock(&con->mutex);
		dout("clear_standby %p and ++connect_seq\n", con);
		con->connect_seq++;
		WARN_ON(test_bit(WRITE_PENDING, &con->state));
		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
		mutex_unlock(&con->mutex);
	}
}

/*
 * Queue up an outgoing message on the given connection.
 */
@@ -2149,6 +2160,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	clear_standby(con);
	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
		queue_con(con);
}
@@ -2214,6 +2226,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);
	clear_standby(con);
	if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
	    test_and_set_bit(WRITE_PENDING, &con->state) == 0)
		queue_con(con);