Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 928443cd authored by Alex Elder's avatar Alex Elder Committed by Alex Elder
Browse files

libceph: start separating connection flags from state



A ceph_connection holds a mixture of connection state (as in "state
machine" state) and connection flags in a single "state" field.  To
make the distinction more clear, define a new "flags" field and use
it rather than the "state" field to hold Boolean flag values.

Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarSage <Weil&lt;sage@inktank.com>
parent 15d9882c
Loading
Loading
Loading
Loading
+12 −6
Original line number Original line Diff line number Diff line
@@ -103,20 +103,25 @@ struct ceph_msg_pos {
#define MAX_DELAY_INTERVAL	(5 * 60 * HZ)
#define MAX_DELAY_INTERVAL	(5 * 60 * HZ)


/*
/*
 * ceph_connection state bit flags
 * ceph_connection flag bits
 */
 */

#define LOSSYTX         0  /* we can close channel or drop messages on errors */
#define LOSSYTX         0  /* we can close channel or drop messages on errors */
#define CONNECTING	1
#define NEGOTIATING	2
#define KEEPALIVE_PENDING      3
#define KEEPALIVE_PENDING      3
#define WRITE_PENDING	4  /* we have data ready to send */
#define WRITE_PENDING	4  /* we have data ready to send */
#define SOCK_CLOSED	11 /* socket state changed to closed */
#define BACKOFF         15

/*
 * ceph_connection states
 */
#define CONNECTING	1
#define NEGOTIATING	2
#define STANDBY		8  /* no outgoing messages, socket closed.  we keep
#define STANDBY		8  /* no outgoing messages, socket closed.  we keep
			    * the ceph_connection around to maintain shared
			    * the ceph_connection around to maintain shared
			    * state with the peer. */
			    * state with the peer. */
#define CLOSED		10 /* we've closed the connection */
#define CLOSED		10 /* we've closed the connection */
#define SOCK_CLOSED	11 /* socket state changed to closed */
#define OPENING         13 /* open connection w/ (possibly new) peer */
#define OPENING         13 /* open connection w/ (possibly new) peer */
#define BACKOFF         15


/*
/*
 * A single connection with another host.
 * A single connection with another host.
@@ -133,7 +138,8 @@ struct ceph_connection {


	struct ceph_messenger *msgr;
	struct ceph_messenger *msgr;
	struct socket *sock;
	struct socket *sock;
	unsigned long state;	/* connection state (see flags above) */
	unsigned long flags;
	unsigned long state;
	const char *error_msg;  /* error message, if any */
	const char *error_msg;  /* error message, if any */


	struct ceph_entity_addr peer_addr; /* peer address */
	struct ceph_entity_addr peer_addr; /* peer address */
+25 −25
Original line number Original line Diff line number Diff line
@@ -176,7 +176,7 @@ static void ceph_sock_write_space(struct sock *sk)
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	 */
	if (test_bit(WRITE_PENDING, &con->state)) {
	if (test_bit(WRITE_PENDING, &con->flags)) {
		if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -203,7 +203,7 @@ static void ceph_sock_state_change(struct sock *sk)
		dout("%s TCP_CLOSE\n", __func__);
		dout("%s TCP_CLOSE\n", __func__);
	case TCP_CLOSE_WAIT:
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
		if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) {
			if (test_bit(CONNECTING, &con->state))
			if (test_bit(CONNECTING, &con->state))
				con->error_msg = "connection failed";
				con->error_msg = "connection failed";
			else
			else
@@ -395,9 +395,9 @@ void ceph_con_close(struct ceph_connection *con)
	     ceph_pr_addr(&con->peer_addr.in_addr));
	     ceph_pr_addr(&con->peer_addr.in_addr));
	set_bit(CLOSED, &con->state);  /* in case there's queued work */
	set_bit(CLOSED, &con->state);  /* in case there's queued work */
	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
	clear_bit(LOSSYTX, &con->flags);  /* so we retry next connect */
	clear_bit(KEEPALIVE_PENDING, &con->state);
	clear_bit(KEEPALIVE_PENDING, &con->flags);
	clear_bit(WRITE_PENDING, &con->state);
	clear_bit(WRITE_PENDING, &con->flags);
	mutex_lock(&con->mutex);
	mutex_lock(&con->mutex);
	reset_connection(con);
	reset_connection(con);
	con->peer_global_seq = 0;
	con->peer_global_seq = 0;
@@ -614,7 +614,7 @@ static void prepare_write_message(struct ceph_connection *con)
		prepare_write_message_footer(con);
		prepare_write_message_footer(con);
	}
	}


	set_bit(WRITE_PENDING, &con->state);
	set_bit(WRITE_PENDING, &con->flags);
}
}


/*
/*
@@ -635,7 +635,7 @@ static void prepare_write_ack(struct ceph_connection *con)
				&con->out_temp_ack);
				&con->out_temp_ack);


	con->out_more = 1;  /* more will follow.. eventually.. */
	con->out_more = 1;  /* more will follow.. eventually.. */
	set_bit(WRITE_PENDING, &con->state);
	set_bit(WRITE_PENDING, &con->flags);
}
}


/*
/*
@@ -646,7 +646,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
	dout("prepare_write_keepalive %p\n", con);
	dout("prepare_write_keepalive %p\n", con);
	con_out_kvec_reset(con);
	con_out_kvec_reset(con);
	con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
	con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
	set_bit(WRITE_PENDING, &con->state);
	set_bit(WRITE_PENDING, &con->flags);
}
}


/*
/*
@@ -675,7 +675,7 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection


	if (IS_ERR(auth))
	if (IS_ERR(auth))
		return auth;
		return auth;
	if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
	if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
		return ERR_PTR(-EAGAIN);
		return ERR_PTR(-EAGAIN);


	con->auth_reply_buf = auth->authorizer_reply_buf;
	con->auth_reply_buf = auth->authorizer_reply_buf;
@@ -695,7 +695,7 @@ static void prepare_write_banner(struct ceph_connection *con)
					&con->msgr->my_enc_addr);
					&con->msgr->my_enc_addr);


	con->out_more = 0;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);
	set_bit(WRITE_PENDING, &con->flags);
}
}


static int prepare_write_connect(struct ceph_connection *con)
static int prepare_write_connect(struct ceph_connection *con)
@@ -745,7 +745,7 @@ static int prepare_write_connect(struct ceph_connection *con)
					auth->authorizer_buf);
					auth->authorizer_buf);


	con->out_more = 0;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);
	set_bit(WRITE_PENDING, &con->flags);


	return 0;
	return 0;
}
}
@@ -1492,7 +1492,7 @@ static int process_connect(struct ceph_connection *con)
			le32_to_cpu(con->in_reply.connect_seq));
			le32_to_cpu(con->in_reply.connect_seq));


		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			set_bit(LOSSYTX, &con->state);
			set_bit(LOSSYTX, &con->flags);


		prepare_read_tag(con);
		prepare_read_tag(con);
		break;
		break;
@@ -1933,14 +1933,14 @@ do_next:
			prepare_write_ack(con);
			prepare_write_ack(con);
			goto more;
			goto more;
		}
		}
		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) {
			prepare_write_keepalive(con);
			prepare_write_keepalive(con);
			goto more;
			goto more;
		}
		}
	}
	}


	/* Nothing to do! */
	/* Nothing to do! */
	clear_bit(WRITE_PENDING, &con->state);
	clear_bit(WRITE_PENDING, &con->flags);
	dout("try_write nothing else to write.\n");
	dout("try_write nothing else to write.\n");
	ret = 0;
	ret = 0;
out:
out:
@@ -2106,7 +2106,7 @@ static void con_work(struct work_struct *work)


	mutex_lock(&con->mutex);
	mutex_lock(&con->mutex);
restart:
restart:
	if (test_and_clear_bit(BACKOFF, &con->state)) {
	if (test_and_clear_bit(BACKOFF, &con->flags)) {
		dout("con_work %p backing off\n", con);
		dout("con_work %p backing off\n", con);
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
				       round_jiffies_relative(con->delay))) {
				       round_jiffies_relative(con->delay))) {
@@ -2135,7 +2135,7 @@ restart:
		con_close_socket(con);
		con_close_socket(con);
	}
	}


	if (test_and_clear_bit(SOCK_CLOSED, &con->state))
	if (test_and_clear_bit(SOCK_CLOSED, &con->flags))
		goto fault;
		goto fault;


	ret = try_read(con);
	ret = try_read(con);
@@ -2174,7 +2174,7 @@ static void ceph_fault(struct ceph_connection *con)
	dout("fault %p state %lu to peer %s\n",
	dout("fault %p state %lu to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));


	if (test_bit(LOSSYTX, &con->state)) {
	if (test_bit(LOSSYTX, &con->flags)) {
		dout("fault on LOSSYTX channel\n");
		dout("fault on LOSSYTX channel\n");
		goto out;
		goto out;
	}
	}
@@ -2196,9 +2196,9 @@ static void ceph_fault(struct ceph_connection *con)
	/* If there are no messages queued or keepalive pending, place
	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	if (list_empty(&con->out_queue) &&
	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
	    !test_bit(KEEPALIVE_PENDING, &con->flags)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		clear_bit(WRITE_PENDING, &con->state);
		clear_bit(WRITE_PENDING, &con->flags);
		set_bit(STANDBY, &con->state);
		set_bit(STANDBY, &con->state);
	} else {
	} else {
		/* retry after a delay. */
		/* retry after a delay. */
@@ -2222,7 +2222,7 @@ static void ceph_fault(struct ceph_connection *con)
			 * that when con_work restarts we schedule the
			 * that when con_work restarts we schedule the
			 * delay then.
			 * delay then.
			 */
			 */
			set_bit(BACKOFF, &con->state);
			set_bit(BACKOFF, &con->flags);
		}
		}
	}
	}


@@ -2278,8 +2278,8 @@ static void clear_standby(struct ceph_connection *con)
		mutex_lock(&con->mutex);
		mutex_lock(&con->mutex);
		dout("clear_standby %p and ++connect_seq\n", con);
		dout("clear_standby %p and ++connect_seq\n", con);
		con->connect_seq++;
		con->connect_seq++;
		WARN_ON(test_bit(WRITE_PENDING, &con->state));
		WARN_ON(test_bit(WRITE_PENDING, &con->flags));
		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
		mutex_unlock(&con->mutex);
		mutex_unlock(&con->mutex);
	}
	}
}
}
@@ -2317,7 +2317,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
	/* if there wasn't anything waiting to send before, queue
	/* if there wasn't anything waiting to send before, queue
	 * new work */
	 * new work */
	clear_standby(con);
	clear_standby(con);
	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
	if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
		queue_con(con);
		queue_con(con);
}
}
EXPORT_SYMBOL(ceph_con_send);
EXPORT_SYMBOL(ceph_con_send);
@@ -2384,8 +2384,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
{
{
	dout("con_keepalive %p\n", con);
	dout("con_keepalive %p\n", con);
	clear_standby(con);
	clear_standby(con);
	if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
	if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 &&
	    test_and_set_bit(WRITE_PENDING, &con->state) == 0)
	    test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
		queue_con(con);
		queue_con(con);
}
}
EXPORT_SYMBOL(ceph_con_keepalive);
EXPORT_SYMBOL(ceph_con_keepalive);