Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c9ffc77a authored by Alex Elder's avatar Alex Elder
Browse files

libceph: define connection flag helpers

Define and use functions that encapsulate operations performed on
a connection's flags.

This resolves:
    http://tracker.ceph.com/issues/4234



Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 37206ee5
Loading
Loading
Loading
Loading
+78 −29
Original line number Original line Diff line number Diff line
@@ -98,6 +98,57 @@
#define CON_FLAG_SOCK_CLOSED	   3  /* socket state changed to closed */
#define CON_FLAG_SOCK_CLOSED	   3  /* socket state changed to closed */
#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */


static bool con_flag_valid(unsigned long con_flag)
{
	switch (con_flag) {
	case CON_FLAG_LOSSYTX:
	case CON_FLAG_KEEPALIVE_PENDING:
	case CON_FLAG_WRITE_PENDING:
	case CON_FLAG_SOCK_CLOSED:
	case CON_FLAG_BACKOFF:
		return true;
	default:
		return false;
	}
}

static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	clear_bit(con_flag, &con->flags);
}

static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	set_bit(con_flag, &con->flags);
}

static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_bit(con_flag, &con->flags);
}

static bool con_flag_test_and_clear(struct ceph_connection *con,
					unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_clear_bit(con_flag, &con->flags);
}

static bool con_flag_test_and_set(struct ceph_connection *con,
					unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_set_bit(con_flag, &con->flags);
}

/* static tag bytes (protocol control messages) */
/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -309,7 +360,7 @@ static void ceph_sock_write_space(struct sock *sk)
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	 */
	if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
	if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
		if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -334,7 +385,7 @@ static void ceph_sock_state_change(struct sock *sk)
	case TCP_CLOSE_WAIT:
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		con_sock_state_closing(con);
		set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
		con_flag_set(con, CON_FLAG_SOCK_CLOSED);
		queue_con(con);
		queue_con(con);
		break;
		break;
	case TCP_ESTABLISHED:
	case TCP_ESTABLISHED:
@@ -475,7 +526,7 @@ static int con_close_socket(struct ceph_connection *con)
	 * received a socket close event before we had the chance to
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 * shut the socket down.
	 */
	 */
	clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
	con_flag_clear(con, CON_FLAG_SOCK_CLOSED);


	con_sock_state_closed(con);
	con_sock_state_closed(con);
	return rc;
	return rc;
@@ -539,11 +590,10 @@ void ceph_con_close(struct ceph_connection *con)
	     ceph_pr_addr(&con->peer_addr.in_addr));
	     ceph_pr_addr(&con->peer_addr.in_addr));
	con->state = CON_STATE_CLOSED;
	con->state = CON_STATE_CLOSED;


	clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
	con_flag_clear(con, CON_FLAG_LOSSYTX);	/* so we retry next connect */
	clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
	con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
	clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_clear(con, CON_FLAG_WRITE_PENDING);
	clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
	con_flag_clear(con, CON_FLAG_BACKOFF);
	clear_bit(CON_FLAG_BACKOFF, &con->flags);


	reset_connection(con);
	reset_connection(con);
	con->peer_global_seq = 0;
	con->peer_global_seq = 0;
@@ -799,7 +849,7 @@ static void prepare_write_message(struct ceph_connection *con)
		/* no, queue up footer too and be done */
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con);
		prepare_write_message_footer(con);


	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
}


/*
/*
@@ -820,7 +870,7 @@ static void prepare_write_ack(struct ceph_connection *con)
				&con->out_temp_ack);
				&con->out_temp_ack);


	con->out_more = 1;  /* more will follow.. eventually.. */
	con->out_more = 1;  /* more will follow.. eventually.. */
	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
}


/*
/*
@@ -831,7 +881,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
	dout("prepare_write_keepalive %p\n", con);
	dout("prepare_write_keepalive %p\n", con);
	con_out_kvec_reset(con);
	con_out_kvec_reset(con);
	con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
	con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
}


/*
/*
@@ -874,7 +924,7 @@ static void prepare_write_banner(struct ceph_connection *con)
					&con->msgr->my_enc_addr);
					&con->msgr->my_enc_addr);


	con->out_more = 0;
	con->out_more = 0;
	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
}


static int prepare_write_connect(struct ceph_connection *con)
static int prepare_write_connect(struct ceph_connection *con)
@@ -924,7 +974,7 @@ static int prepare_write_connect(struct ceph_connection *con)
					auth->authorizer_buf);
					auth->authorizer_buf);


	con->out_more = 0;
	con->out_more = 0;
	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_set(con, CON_FLAG_WRITE_PENDING);


	return 0;
	return 0;
}
}
@@ -1644,7 +1694,7 @@ static int process_connect(struct ceph_connection *con)
			le32_to_cpu(con->in_reply.connect_seq));
			le32_to_cpu(con->in_reply.connect_seq));


		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			set_bit(CON_FLAG_LOSSYTX, &con->flags);
			con_flag_set(con, CON_FLAG_LOSSYTX);


		con->delay = 0;      /* reset backoff memory */
		con->delay = 0;      /* reset backoff memory */


@@ -2081,15 +2131,14 @@ static int try_write(struct ceph_connection *con)
			prepare_write_ack(con);
			prepare_write_ack(con);
			goto more;
			goto more;
		}
		}
		if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
		if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
				       &con->flags)) {
			prepare_write_keepalive(con);
			prepare_write_keepalive(con);
			goto more;
			goto more;
		}
		}
	}
	}


	/* Nothing to do! */
	/* Nothing to do! */
	clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
	con_flag_clear(con, CON_FLAG_WRITE_PENDING);
	dout("try_write nothing else to write.\n");
	dout("try_write nothing else to write.\n");
	ret = 0;
	ret = 0;
out:
out:
@@ -2269,7 +2318,7 @@ static void queue_con(struct ceph_connection *con)


static bool con_sock_closed(struct ceph_connection *con)
static bool con_sock_closed(struct ceph_connection *con)
{
{
	if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
	if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
		return false;
		return false;


#define CASE(x)								\
#define CASE(x)								\
@@ -2310,14 +2359,14 @@ static void con_work(struct work_struct *work)
	if (con_sock_closed(con))
	if (con_sock_closed(con))
		goto fault;
		goto fault;


	if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
	if (con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) {
		dout("con_work %p backing off\n", con);
		dout("con_work %p backing off\n", con);
		ret = queue_con_delay(con, round_jiffies_relative(con->delay));
		ret = queue_con_delay(con, round_jiffies_relative(con->delay));
		if (ret) {
		if (ret) {
			dout("con_work %p FAILED to back off %lu\n", con,
			dout("con_work %p FAILED to back off %lu\n", con,
			     con->delay);
			     con->delay);
			BUG_ON(ret == -ENOENT);
			BUG_ON(ret == -ENOENT);
			set_bit(CON_FLAG_BACKOFF, &con->flags);
			con_flag_set(con, CON_FLAG_BACKOFF);
		}
		}
		goto done;
		goto done;
	}
	}
@@ -2382,7 +2431,7 @@ static void ceph_fault(struct ceph_connection *con)


	con_close_socket(con);
	con_close_socket(con);


	if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
	if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
		dout("fault on LOSSYTX channel, marking CLOSED\n");
		dout("fault on LOSSYTX channel, marking CLOSED\n");
		con->state = CON_STATE_CLOSED;
		con->state = CON_STATE_CLOSED;
		goto out_unlock;
		goto out_unlock;
@@ -2402,9 +2451,9 @@ static void ceph_fault(struct ceph_connection *con)
	/* If there are no messages queued or keepalive pending, place
	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	if (list_empty(&con->out_queue) &&
	    !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
	    !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
		con_flag_clear(con, CON_FLAG_WRITE_PENDING);
		con->state = CON_STATE_STANDBY;
		con->state = CON_STATE_STANDBY;
	} else {
	} else {
		/* retry after a delay. */
		/* retry after a delay. */
@@ -2413,7 +2462,7 @@ static void ceph_fault(struct ceph_connection *con)
			con->delay = BASE_DELAY_INTERVAL;
			con->delay = BASE_DELAY_INTERVAL;
		else if (con->delay < MAX_DELAY_INTERVAL)
		else if (con->delay < MAX_DELAY_INTERVAL)
			con->delay *= 2;
			con->delay *= 2;
		set_bit(CON_FLAG_BACKOFF, &con->flags);
		con_flag_set(con, CON_FLAG_BACKOFF);
		queue_con(con);
		queue_con(con);
	}
	}


@@ -2470,8 +2519,8 @@ static void clear_standby(struct ceph_connection *con)
		dout("clear_standby %p and ++connect_seq\n", con);
		dout("clear_standby %p and ++connect_seq\n", con);
		con->state = CON_STATE_PREOPEN;
		con->state = CON_STATE_PREOPEN;
		con->connect_seq++;
		con->connect_seq++;
		WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
		WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
		WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
		WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
	}
	}
}
}


@@ -2512,7 +2561,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)


	/* if there wasn't anything waiting to send before, queue
	/* if there wasn't anything waiting to send before, queue
	 * new work */
	 * new work */
	if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
	if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
		queue_con(con);
		queue_con(con);
}
}
EXPORT_SYMBOL(ceph_con_send);
EXPORT_SYMBOL(ceph_con_send);
@@ -2601,8 +2650,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
	mutex_lock(&con->mutex);
	mutex_lock(&con->mutex);
	clear_standby(con);
	clear_standby(con);
	mutex_unlock(&con->mutex);
	mutex_unlock(&con->mutex);
	if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
	if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 &&
	    test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
	    con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
		queue_con(con);
		queue_con(con);
}
}
EXPORT_SYMBOL(ceph_con_keepalive);
EXPORT_SYMBOL(ceph_con_keepalive);