Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f5c17aae authored by David Howells's avatar David Howells
Browse files

rxrpc: Calls should only have one terminal state



Condense the terminal states of a call state machine to a single state,
plus a separate completion type value.  The value is then set, along with
error and abort code values, only when the call is transitioned to the
completion state.

Helpers are provided to simplify this.

Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent ccbd3dbe
Loading
Loading
Loading
Loading
+90 −26
Original line number Diff line number Diff line
@@ -289,8 +289,6 @@ enum rxrpc_conn_proto_state {
	RXRPC_CONN_SERVICE,		/* Service secured connection */
	RXRPC_CONN_REMOTELY_ABORTED,	/* Conn aborted by peer */
	RXRPC_CONN_LOCALLY_ABORTED,	/* Conn aborted locally */
	RXRPC_CONN_NETWORK_ERROR,	/* Conn terminated by network error */
	RXRPC_CONN_LOCAL_ERROR,		/* Conn terminated by local error */
	RXRPC_CONN__NR_STATES
};

@@ -344,7 +342,6 @@ struct rxrpc_connection {
	enum rxrpc_conn_proto_state state : 8;	/* current state of connection */
	u32			local_abort;	/* local abort code */
	u32			remote_abort;	/* remote abort code */
	int			error;		/* local error incurred */
	int			debug_id;	/* debug ID for printks */
	atomic_t		serial;		/* packet serial number counter */
	unsigned int		hi_serial;	/* highest serial number received */
@@ -411,13 +408,22 @@ enum rxrpc_call_state {
	RXRPC_CALL_SERVER_ACK_REQUEST,	/* - server pending ACK of request */
	RXRPC_CALL_SERVER_SEND_REPLY,	/* - server sending reply */
	RXRPC_CALL_SERVER_AWAIT_ACK,	/* - server awaiting final ACK */
	RXRPC_CALL_COMPLETE,		/* - call completed */
	RXRPC_CALL_COMPLETE,		/* - call complete */
	RXRPC_CALL_DEAD,		/* - call is dead */
	NR__RXRPC_CALL_STATES
};

/*
 * Call completion condition (state == RXRPC_CALL_COMPLETE).
 */
enum rxrpc_call_completion {
	RXRPC_CALL_SUCCEEDED,		/* - Normal termination */
	RXRPC_CALL_SERVER_BUSY,		/* - call rejected by busy server */
	RXRPC_CALL_REMOTELY_ABORTED,	/* - call aborted by peer */
	RXRPC_CALL_LOCALLY_ABORTED,	/* - call aborted locally on error or close */
	RXRPC_CALL_LOCAL_ERROR,		/* - call failed due to local error */
	RXRPC_CALL_NETWORK_ERROR,	/* - call terminated by network error */
	RXRPC_CALL_DEAD,		/* - call is dead */
	NR__RXRPC_CALL_STATES
	NR__RXRPC_CALL_COMPLETIONS
};

/*
@@ -451,14 +457,13 @@ struct rxrpc_call {
	unsigned long		events;
	spinlock_t		lock;
	rwlock_t		state_lock;	/* lock for state transition */
	u32			abort_code;	/* Local/remote abort code */
	int			error;		/* Local error incurred */
	enum rxrpc_call_state	state : 8;	/* current state of call */
	enum rxrpc_call_completion completion : 8; /* Call completion condition */
	atomic_t		usage;
	atomic_t		skb_count;	/* Outstanding packets on this call */
	atomic_t		sequence;	/* Tx data packet sequence counter */
	u32			local_abort;	/* local abort code */
	u32			remote_abort;	/* remote abort code */
	int			error_report;	/* Network error (ICMP/local transport) */
	int			error;		/* Local error incurred */
	enum rxrpc_call_state	state : 8;	/* current state of call */
	u16			service_id;	/* service ID */
	u32			call_id;	/* call ID on connection  */
	u32			cid;		/* connection ID plus channel index */
@@ -493,20 +498,6 @@ struct rxrpc_call {
	unsigned long		ackr_window[RXRPC_ACKR_WINDOW_ASZ + 1];
};

/*
 * locally abort an RxRPC call
 */
static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
{
	write_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE) {
		call->local_abort = abort_code;
		call->state = RXRPC_CALL_LOCALLY_ABORTED;
		set_bit(RXRPC_CALL_EV_ABORT, &call->events);
	}
	write_unlock_bh(&call->state_lock);
}

#include <trace/events/rxrpc.h>

/*
@@ -534,6 +525,8 @@ void rxrpc_process_call(struct work_struct *);
/*
 * call_object.c
 */
extern const char *const rxrpc_call_states[];
extern const char *const rxrpc_call_completions[];
extern unsigned int rxrpc_max_call_lifetime;
extern unsigned int rxrpc_dead_call_expiry;
extern struct kmem_cache *rxrpc_call_jar;
@@ -563,6 +556,78 @@ static inline bool rxrpc_is_client_call(const struct rxrpc_call *call)
	return !rxrpc_is_service_call(call);
}

/*
 * Transition a call to the complete state.
 */
static inline bool __rxrpc_set_call_completion(struct rxrpc_call *call,
					       enum rxrpc_call_completion compl,
					       u32 abort_code,
					       int error)
{
	if (call->state < RXRPC_CALL_COMPLETE) {
		call->abort_code = abort_code;
		call->error = error;
		call->completion = compl,
		call->state = RXRPC_CALL_COMPLETE;
		return true;
	}
	return false;
}

static inline bool rxrpc_set_call_completion(struct rxrpc_call *call,
					     enum rxrpc_call_completion compl,
					     u32 abort_code,
					     int error)
{
	int ret;

	write_lock_bh(&call->state_lock);
	ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
	write_unlock_bh(&call->state_lock);
	return ret;
}

/*
 * Record that a call successfully completed.
 */
static inline void __rxrpc_call_completed(struct rxrpc_call *call)
{
	__rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

static inline void rxrpc_call_completed(struct rxrpc_call *call)
{
	write_lock_bh(&call->state_lock);
	__rxrpc_call_completed(call);
	write_unlock_bh(&call->state_lock);
}

/*
 * Record that a call is locally aborted.
 */
static inline bool __rxrpc_abort_call(struct rxrpc_call *call,
				      u32 abort_code, int error)
{
	if (__rxrpc_set_call_completion(call,
					RXRPC_CALL_LOCALLY_ABORTED,
					abort_code, error)) {
		set_bit(RXRPC_CALL_EV_ABORT, &call->events);
		return true;
	}
	return false;
}

static inline bool rxrpc_abort_call(struct rxrpc_call *call,
				    u32 abort_code, int error)
{
	bool ret;

	write_lock_bh(&call->state_lock);
	ret = __rxrpc_abort_call(call, abort_code, error);
	write_unlock_bh(&call->state_lock);
	return ret;
}

/*
 * conn_client.c
 */
@@ -778,7 +843,6 @@ static inline void rxrpc_put_peer(struct rxrpc_peer *peer)
/*
 * proc.c
 */
extern const char *const rxrpc_call_states[];
extern const struct file_operations rxrpc_call_seq_fops;
extern const struct file_operations rxrpc_connection_seq_fops;

+6 −13
Original line number Diff line number Diff line
@@ -329,12 +329,8 @@ struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
	case RXRPC_CALL_SERVER_ACCEPTING:
		call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
	case RXRPC_CALL_LOCALLY_ABORTED:
		ret = -ECONNABORTED;
		goto out_release;
	case RXRPC_CALL_NETWORK_ERROR:
		ret = call->conn->error;
	case RXRPC_CALL_COMPLETE:
		ret = call->error;
		goto out_release;
	case RXRPC_CALL_DEAD:
		ret = -ETIME;
@@ -403,17 +399,14 @@ int rxrpc_reject_call(struct rxrpc_sock *rx)
	write_lock_bh(&call->state_lock);
	switch (call->state) {
	case RXRPC_CALL_SERVER_ACCEPTING:
		call->state = RXRPC_CALL_SERVER_BUSY;
		__rxrpc_set_call_completion(call, RXRPC_CALL_SERVER_BUSY,
					    0, ECONNABORTED);
		if (test_and_set_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events))
			rxrpc_queue_call(call);
		ret = 0;
		goto out_release;
	case RXRPC_CALL_REMOTELY_ABORTED:
	case RXRPC_CALL_LOCALLY_ABORTED:
		ret = -ECONNABORTED;
		goto out_release;
	case RXRPC_CALL_NETWORK_ERROR:
		ret = call->conn->error;
	case RXRPC_CALL_COMPLETE:
		ret = call->error;
		goto out_release;
	case RXRPC_CALL_DEAD:
		ret = -ETIME;
+16 −26
Original line number Diff line number Diff line
@@ -95,7 +95,7 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
	_debug("cancel timer %%%u", serial);
	try_to_del_timer_sync(&call->ack_timer);
	read_lock_bh(&call->state_lock);
	if (call->state <= RXRPC_CALL_COMPLETE &&
	if (call->state < RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
@@ -123,7 +123,7 @@ static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
			     unsigned long resend_at)
{
	read_lock_bh(&call->state_lock);
	if (call->state >= RXRPC_CALL_COMPLETE)
	if (call->state == RXRPC_CALL_COMPLETE)
		resend = 0;

	if (resend & 1) {
@@ -230,7 +230,7 @@ static void rxrpc_resend_timer(struct rxrpc_call *call)
	_enter("%d,%d,%d",
	       call->acks_tail, call->acks_unacked, call->acks_head);

	if (call->state >= RXRPC_CALL_COMPLETE)
	if (call->state == RXRPC_CALL_COMPLETE)
		return;

	resend = 0;
@@ -711,7 +711,7 @@ static int rxrpc_process_rx_queue(struct rxrpc_call *call,
		break;
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		_debug("srv complete");
		call->state = RXRPC_CALL_COMPLETE;
		__rxrpc_call_completed(call);
		post_ACK = true;
		break;
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
@@ -875,24 +875,22 @@ void rxrpc_process_call(struct work_struct *work)
		clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_EV_ABORT, &call->events);

		error = call->error_report;
		if (error < RXRPC_LOCAL_ERROR_OFFSET) {
		if (call->completion == RXRPC_CALL_NETWORK_ERROR) {
			mark = RXRPC_SKB_MARK_NET_ERROR;
			_debug("post net error %d", error);
		} else {
			mark = RXRPC_SKB_MARK_LOCAL_ERROR;
			error -= RXRPC_LOCAL_ERROR_OFFSET;
			_debug("post net local error %d", error);
		}

		if (rxrpc_post_message(call, mark, error, true) < 0)
		if (rxrpc_post_message(call, mark, call->error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
		ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

		clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
@@ -900,7 +898,7 @@ void rxrpc_process_call(struct work_struct *work)
		_debug("post conn abort");

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       call->conn->error, true) < 0)
				       call->error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
		goto kill_ACKs;
@@ -913,13 +911,13 @@ void rxrpc_process_call(struct work_struct *work)
	}

	if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
		ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ECONNABORTED, true) < 0)
				       call->error, true) < 0)
			goto no_mem;
		whdr.type = RXRPC_PACKET_TYPE_ABORT;
		data = htonl(call->local_abort);
		data = htonl(call->abort_code);
		iov[1].iov_base = &data;
		iov[1].iov_len = sizeof(data);
		genbit = RXRPC_CALL_EV_ABORT;
@@ -979,13 +977,7 @@ void rxrpc_process_call(struct work_struct *work)
	}

	if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) {
		write_lock_bh(&call->state_lock);
		if (call->state <= RXRPC_CALL_COMPLETE) {
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->local_abort = RX_CALL_TIMEOUT;
			set_bit(RXRPC_CALL_EV_ABORT, &call->events);
		}
		write_unlock_bh(&call->state_lock);
		rxrpc_abort_call(call, RX_CALL_TIMEOUT, ETIME);

		_debug("post timeout");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
@@ -998,7 +990,8 @@ void rxrpc_process_call(struct work_struct *work)

	/* deal with assorted inbound messages */
	if (!skb_queue_empty(&call->rx_queue)) {
		switch (rxrpc_process_rx_queue(call, &abort_code)) {
		ret = rxrpc_process_rx_queue(call, &abort_code);
		switch (ret) {
		case 0:
		case -EAGAIN:
			break;
@@ -1007,7 +1000,7 @@ void rxrpc_process_call(struct work_struct *work)
		case -EKEYEXPIRED:
		case -EKEYREJECTED:
		case -EPROTO:
			rxrpc_abort_call(call, abort_code);
			rxrpc_abort_call(call, abort_code, -ret);
			goto kill_ACKs;
		}
	}
@@ -1232,10 +1225,7 @@ void rxrpc_process_call(struct work_struct *work)
		goto kill_ACKs;

	case RXRPC_CALL_EV_ACK_FINAL:
		write_lock_bh(&call->state_lock);
		if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
			call->state = RXRPC_CALL_COMPLETE;
		write_unlock_bh(&call->state_lock);
		rxrpc_call_completed(call);
		goto kill_ACKs;

	default:
+17 −26
Original line number Diff line number Diff line
@@ -43,11 +43,16 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
	[RXRPC_CALL_COMPLETE]			= "Complete",
	[RXRPC_CALL_DEAD]			= "Dead    ",
};

const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
	[RXRPC_CALL_SUCCEEDED]			= "Complete",
	[RXRPC_CALL_SERVER_BUSY]		= "SvBusy  ",
	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
	[RXRPC_CALL_LOCAL_ERROR]		= "LocError",
	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
	[RXRPC_CALL_DEAD]			= "Dead    ",
};

struct kmem_cache *rxrpc_call_jar;
@@ -358,7 +363,7 @@ struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
		_debug("CALL: %u { %s }",
		       call->debug_id, rxrpc_call_states[call->state]);

		if (call->state >= RXRPC_CALL_COMPLETE) {
		if (call->state == RXRPC_CALL_COMPLETE) {
			__rxrpc_disconnect_call(conn, call);
		} else {
			spin_unlock(&conn->channel_lock);
@@ -472,8 +477,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
		_debug("+++ ABORTING STATE %d +++\n", call->state);
		call->state = RXRPC_CALL_LOCALLY_ABORTED;
		call->local_abort = RX_CALL_DEAD;
		__rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
	}
	write_unlock_bh(&call->state_lock);

@@ -538,20 +542,13 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)

	write_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_DEAD) {
		sched = false;
		if (call->state < RXRPC_CALL_COMPLETE) {
			_debug("abort call %p", call);
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->local_abort = RX_CALL_DEAD;
			if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
				sched = true;
		}
		sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET);
		if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
			sched = true;
		if (sched)
			rxrpc_queue_call(call);
	}
	write_unlock(&call->state_lock);
	if (sched)
		rxrpc_queue_call(call);
}

/*
@@ -749,17 +746,14 @@ static void rxrpc_call_life_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	_enter("{%d}", call->debug_id);
	read_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE) {
	set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
	rxrpc_queue_call(call);
}
	read_unlock_bh(&call->state_lock);
}

/*
 * handle resend timer expiry
@@ -791,9 +785,6 @@ static void rxrpc_ack_time_expired(unsigned long _call)
	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	read_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
	if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}
+1 −1
Original line number Diff line number Diff line
@@ -741,7 +741,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
	 * terminal retransmission without requiring access to the call.
	 */
	if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		_debug("exposed %u,%u", call->call_id, call->local_abort);
		_debug("exposed %u,%u", call->call_id, call->abort_code);
		__rxrpc_disconnect_call(conn, call);
	}

Loading