Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fa5f4aaf authored by David S. Miller's avatar David S. Miller
Browse files

Merge tag 'rxrpc-rewrite-20160908' of...

Merge tag 'rxrpc-rewrite-20160908' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs



David Howells says:

====================
rxrpc: Rewrite data and ack handling

This patch set constitutes the main portion of the AF_RXRPC rewrite.  It
consists of five fix/helper patches:

 (1) Fix ASSERTCMP's and ASSERTIFCMP's handling of signed values.

 (2) Update some protocol definitions slightly.

 (3) Use of an hlist for RCU purposes.

 (4) Removal of per-call sk_buff accounting (not really needed when skbs
     aren't being queued on the main queue).

 (5) Addition of a tracepoint to log incoming packets in the data_ready
     callback and to log the end of the data_ready callback.

And then there are two patches that form the main part:

 (6) Preallocation of resources for incoming calls so that in patch (7) the
     data_ready handler can be made to fully instantiate an incoming call
     and make it live.  This extends through into AFS so that AFS can
     preallocate its own incoming call resources.

     The preallocation size is capped at the listen() backlog setting - and
     that is capped at a sysctl limit which can be set between 4 and 32.

     The preallocation is (re)charged either by accepting/rejecting pending
     calls or, in the case of AFS, manually.  If insufficient preallocation
     resources exist, a BUSY packet will be transmitted.

     The advantage of using this preallocation is that once a call is set
     up in the data_ready handler, DATA packets can be queued on it
     immediately rather than the DATA packets being queued for a background
     work item to do all the allocation and then try and sort out the DATA
     packets whilst other DATA packets may still be coming in and going
     either to the background thread or the new call.

 (7) Rewrite the handling of DATA, ACK and ABORT packets.

     In the receive phase, DATA packets are now held in per-call circular
     buffers with deduplication, out of sequence detection and suchlike
     being done in data_ready.  Since there is only one producer and only
     once consumer, no locks need be used on the receive queue.

     Received ACK and ABORT packets are now parsed and discarded in
     data_ready to recycle resources as fast as possible.

     sk_buffs are no longer pulled, trimmed or cloned, but rather the
     offset and size of the content is tracked.  This particularly affects
     jumbo DATA packets which need insertion into the receive buffer in
     multiple places.  Annotations are kept to track which bit is which.

     Packets are no longer queued on the socket receive queue; rather,
     calls are queued.  Dummy packets to convey events therefore no longer
     need to be invented and metadata packets can be discarded as soon as
     parsed rather then being pushed onto the socket receive queue to
     indicate terminal events.

     The preallocation facility added in (6) is now used to set up incoming
     calls with very little locking required and no calls to the allocator
     in data_ready.

     Decryption and verification is now handled in recvmsg() rather than in
     a background thread.  This allows for the future possibility of
     decrypting directly into the user buffer.

     With this patch, the code is a lot simpler and most of the mass of
     call event and state wangling code in call_event.c is gone.

With this, the majority of the AF_RXRPC rewrite is complete.  However,
there are still things to be done, including:

 (*) Limit the number of active service calls to prevent an attacker from
     filling up a server's memory.

 (*) Limit the number of calls on the rebuff-with-BUSY queue.

 (*) Transmit delayed/deferred ACKs from recvmsg() if possible, rather than
     punting to the background thread.  Ideally, the background thread
     shouldn't run at all, but data_ready can't call kernel_sendmsg() and
     we can't rely on recvmsg() attending to the call in a timely fashion.

 (*) Prevent the call at the front of the socket queue from hogging
     recvmsg()'s attention if there's a sufficiently continuous supply of
     data.

 (*) Distribute ICMP errors by connection rather than by call.  Possibly
     parse the ICMP packet to try and pin down the exact connection and
     call.

 (*) Encrypt/decrypt directly between user buffers and socket buffers where
     possible.

 (*) IPv6.

 (*) Service ID upgrade.  This is a facility whereby a special flag bit is
     set in the DATA packet header when making a call that tells the server
     that it is allowed to change the service ID to an upgraded one and
     reply with an equivalent call from the upgraded service.

     This is used, for example, to override certain AFS calls so that IPv6
     addresses can be returned.

 (*) Allow userspace to preallocate call user IDs for incoming calls.
====================

Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 46dfc23e 248f219c
Loading
Loading
Loading
Loading
+54 −34
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@

struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static struct afs_call *afs_spare_incoming_call;
static atomic_t afs_outstanding_calls;

static void afs_free_call(struct afs_call *);
@@ -26,7 +27,8 @@ static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_new_call(struct sock *);
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
static int afs_deliver_cm_op_id(struct afs_call *);

/* synchronous call management */
@@ -53,9 +55,9 @@ static const struct afs_call_type afs_RXCMxxxx = {
	.abort_to_error	= afs_abort_to_error,
};

static void afs_collect_incoming_call(struct work_struct *);
static void afs_charge_preallocation(struct work_struct *);

static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);

static int afs_wait_atomic_t(atomic_t *p)
{
@@ -100,13 +102,15 @@ int afs_open_socket(void)
	if (ret < 0)
		goto error_2;

	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call);
	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
					   afs_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	afs_socket = socket;
	afs_charge_preallocation(NULL);
	_leave(" = 0");
	return 0;

@@ -126,11 +130,19 @@ void afs_close_socket(void)
{
	_enter("");

	if (afs_spare_incoming_call) {
		atomic_inc(&afs_outstanding_calls);
		afs_free_call(afs_spare_incoming_call);
		afs_spare_incoming_call = NULL;
	}

	_debug("outstanding %u", atomic_read(&afs_outstanding_calls));
	wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
			 TASK_UNINTERRUPTIBLE);
	_debug("no outstanding calls");

	flush_workqueue(afs_async_calls);
	kernel_sock_shutdown(afs_socket, SHUT_RDWR);
	flush_workqueue(afs_async_calls);
	sock_release(afs_socket);

@@ -590,57 +602,65 @@ static void afs_process_async_call(struct work_struct *work)
	_leave("");
}

static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	call->rxcall = rxcall;
}

/*
 * accept the backlog of incoming calls
 * Charge the incoming call preallocation.
 */
static void afs_collect_incoming_call(struct work_struct *work)
static void afs_charge_preallocation(struct work_struct *work)
{
	struct rxrpc_call *rxcall;
	struct afs_call *call = NULL;

	_enter("");
	struct afs_call *call = afs_spare_incoming_call;

	do {
	for (;;) {
		if (!call) {
			call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
			if (!call) {
				rxrpc_kernel_reject_call(afs_socket);
				return;
			}
			if (!call)
				break;

			INIT_WORK(&call->async_work, afs_process_async_call);
			call->wait_mode = &afs_async_incoming_call;
			call->type = &afs_RXCMxxxx;
			init_waitqueue_head(&call->waitq);
			call->state = AFS_CALL_AWAIT_OP_ID;

			_debug("CALL %p{%s} [%d]",
			       call, call->type->name,
			       atomic_read(&afs_outstanding_calls));
			atomic_inc(&afs_outstanding_calls);
		}

		rxcall = rxrpc_kernel_accept_call(afs_socket,
		if (rxrpc_kernel_charge_accept(afs_socket,
					       afs_wake_up_async_call,
					       afs_rx_attach,
					       (unsigned long)call,
						  afs_wake_up_async_call);
		if (!IS_ERR(rxcall)) {
			call->rxcall = rxcall;
			call->need_attention = true;
			queue_work(afs_async_calls, &call->async_work);
					       GFP_KERNEL) < 0)
			break;
		call = NULL;
	}
	} while (!call);
	afs_spare_incoming_call = call;
}

/*
 * Discard a preallocated call when a socket is shut down.
 */
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
				    unsigned long user_call_ID)
{
	struct afs_call *call = (struct afs_call *)user_call_ID;

	if (call)
	atomic_inc(&afs_outstanding_calls);
	call->rxcall = NULL;
	afs_free_call(call);
}

/*
 * Notification of an incoming call.
 */
static void afs_rx_new_call(struct sock *sk)
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			    unsigned long user_call_ID)
{
	queue_work(afs_wq, &afs_collect_incoming_call_work);
	atomic_inc(&afs_outstanding_calls);
	queue_work(afs_wq, &afs_charge_preallocation_work);
}

/*
+8 −5
Original line number Diff line number Diff line
@@ -21,10 +21,14 @@ struct rxrpc_call;

typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
				  unsigned long);
typedef void (*rxrpc_notify_new_call_t)(struct sock *);
typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
					unsigned long);
typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
typedef void (*rxrpc_user_attach_call_t)(struct rxrpc_call *, unsigned long);

void rxrpc_kernel_new_call_notification(struct socket *,
					rxrpc_notify_new_call_t);
					rxrpc_notify_new_call_t,
					rxrpc_discard_new_call_t);
struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
					   struct sockaddr_rxrpc *,
					   struct key *,
@@ -38,10 +42,9 @@ int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
void rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
			     u32, int, const char *);
void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long,
					    rxrpc_notify_rx_t);
int rxrpc_kernel_reject_call(struct socket *);
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
			   struct sockaddr_rxrpc *);
int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
			       rxrpc_user_attach_call_t, unsigned long, gfp_t);

#endif /* _NET_RXRPC_H */
+12 −3
Original line number Diff line number Diff line
@@ -34,8 +34,6 @@ struct rxrpc_wire_header {
#define RXRPC_CID_INC		(1 << RXRPC_CIDSHIFT)	/* connection ID increment */

	__be32		callNumber;	/* call ID (0 for connection-level packets) */
#define RXRPC_PROCESS_MAXCALLS	(1<<2)	/* maximum number of active calls per conn (power of 2) */

	__be32		seq;		/* sequence number of pkt in call stream */
	__be32		serial;		/* serial number of pkt sent to network */

@@ -93,10 +91,14 @@ struct rxrpc_wire_header {
struct rxrpc_jumbo_header {
	uint8_t		flags;		/* packet flags (as per rxrpc_header) */
	uint8_t		pad;
	__be16		_rsvd;		/* reserved (used by kerberos security as cksum) */
	union {
		__be16	_rsvd;		/* reserved */
		__be16	cksum;		/* kerberos security checksum */
	};
};

#define RXRPC_JUMBO_DATALEN	1412	/* non-terminal jumbo packet data length */
#define RXRPC_JUMBO_SUBPKTLEN	(RXRPC_JUMBO_DATALEN + sizeof(struct rxrpc_jumbo_header))

/*****************************************************************************/
/*
@@ -131,6 +133,13 @@ struct rxrpc_ackpacket {

} __packed;

/* Some ACKs refer to specific packets and some are general and can be updated. */
#define RXRPC_ACK_UPDATEABLE ((1 << RXRPC_ACK_REQUESTED)	|	\
			      (1 << RXRPC_ACK_PING_RESPONSE)	|	\
			      (1 << RXRPC_ACK_DELAY)		|	\
			      (1 << RXRPC_ACK_IDLE))


/*
 * ACK packets can have a further piece of information tagged on the end
 */
+41 −7
Original line number Diff line number Diff line
@@ -18,16 +18,14 @@

TRACE_EVENT(rxrpc_call,
	    TP_PROTO(struct rxrpc_call *call, enum rxrpc_call_trace op,
		     int usage, int nskb,
		     const void *where, const void *aux),
		     int usage, const void *where, const void *aux),

	    TP_ARGS(call, op, usage, nskb, where, aux),
	    TP_ARGS(call, op, usage, where, aux),

	    TP_STRUCT__entry(
		    __field(struct rxrpc_call *,	call		)
		    __field(int,			op		)
		    __field(int,			usage		)
		    __field(int,			nskb		)
		    __field(const void *,		where		)
		    __field(const void *,		aux		)
			     ),
@@ -36,16 +34,14 @@ TRACE_EVENT(rxrpc_call,
		    __entry->call = call;
		    __entry->op = op;
		    __entry->usage = usage;
		    __entry->nskb = nskb;
		    __entry->where = where;
		    __entry->aux = aux;
			   ),

	    TP_printk("c=%p %s u=%d s=%d p=%pSR a=%p",
	    TP_printk("c=%p %s u=%d sp=%pSR a=%p",
		      __entry->call,
		      rxrpc_call_traces[__entry->op],
		      __entry->usage,
		      __entry->nskb,
		      __entry->where,
		      __entry->aux)
	    );
@@ -84,6 +80,44 @@ TRACE_EVENT(rxrpc_skb,
		      __entry->where)
	    );

TRACE_EVENT(rxrpc_rx_packet,
	    TP_PROTO(struct rxrpc_skb_priv *sp),

	    TP_ARGS(sp),

	    TP_STRUCT__entry(
		    __field_struct(struct rxrpc_host_header,	hdr		)
			     ),

	    TP_fast_assign(
		    memcpy(&__entry->hdr, &sp->hdr, sizeof(__entry->hdr));
			   ),

	    TP_printk("%08x:%08x:%08x:%04x %08x %08x %02x %02x",
		      __entry->hdr.epoch, __entry->hdr.cid,
		      __entry->hdr.callNumber, __entry->hdr.serviceId,
		      __entry->hdr.serial, __entry->hdr.seq,
		      __entry->hdr.type, __entry->hdr.flags)
	    );

TRACE_EVENT(rxrpc_rx_done,
	    TP_PROTO(int result, int abort_code),

	    TP_ARGS(result, abort_code),

	    TP_STRUCT__entry(
		    __field(int,			result		)
		    __field(int,			abort_code	)
			     ),

	    TP_fast_assign(
		    __entry->result = result;
		    __entry->abort_code = abort_code;
			   ),

	    TP_printk("r=%d a=%d", __entry->result, __entry->abort_code)
	    );

TRACE_EVENT(rxrpc_abort,
	    TP_PROTO(const char *why, u32 cid, u32 call_id, rxrpc_seq_t seq,
		     int abort_code, int error),
+64 −19
Original line number Diff line number Diff line
@@ -155,15 +155,15 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
	}

	if (rx->srx.srx_service) {
		write_lock_bh(&local->services_lock);
		list_for_each_entry(prx, &local->services, listen_link) {
		write_lock(&local->services_lock);
		hlist_for_each_entry(prx, &local->services, listen_link) {
			if (prx->srx.srx_service == rx->srx.srx_service)
				goto service_in_use;
		}

		rx->local = local;
		list_add_tail(&rx->listen_link, &local->services);
		write_unlock_bh(&local->services_lock);
		hlist_add_head_rcu(&rx->listen_link, &local->services);
		write_unlock(&local->services_lock);

		rx->sk.sk_state = RXRPC_SERVER_BOUND;
	} else {
@@ -176,7 +176,7 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
	return 0;

service_in_use:
	write_unlock_bh(&local->services_lock);
	write_unlock(&local->services_lock);
	rxrpc_put_local(local);
	ret = -EADDRINUSE;
error_unlock:
@@ -193,7 +193,7 @@ static int rxrpc_listen(struct socket *sock, int backlog)
{
	struct sock *sk = sock->sk;
	struct rxrpc_sock *rx = rxrpc_sk(sk);
	unsigned int max;
	unsigned int max, old;
	int ret;

	_enter("%p,%d", rx, backlog);
@@ -212,9 +212,13 @@ static int rxrpc_listen(struct socket *sock, int backlog)
			backlog = max;
		else if (backlog < 0 || backlog > max)
			break;
		old = sk->sk_max_ack_backlog;
		sk->sk_max_ack_backlog = backlog;
		ret = rxrpc_service_prealloc(rx, GFP_KERNEL);
		if (ret == 0)
			rx->sk.sk_state = RXRPC_SERVER_LISTENING;
		ret = 0;
		else
			sk->sk_max_ack_backlog = old;
		break;
	default:
		ret = -EBUSY;
@@ -303,16 +307,19 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
 * rxrpc_kernel_new_call_notification - Get notifications of new calls
 * @sock: The socket to intercept received messages on
 * @notify_new_call: Function to be called when new calls appear
 * @discard_new_call: Function to discard preallocated calls
 *
 * Allow a kernel service to be given notifications about new calls.
 */
void rxrpc_kernel_new_call_notification(
	struct socket *sock,
	rxrpc_notify_new_call_t notify_new_call)
	rxrpc_notify_new_call_t notify_new_call,
	rxrpc_discard_new_call_t discard_new_call)
{
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);

	rx->notify_new_call = notify_new_call;
	rx->discard_new_call = discard_new_call;
}
EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);

@@ -508,15 +515,16 @@ static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
static unsigned int rxrpc_poll(struct file *file, struct socket *sock,
			       poll_table *wait)
{
	unsigned int mask;
	struct sock *sk = sock->sk;
	struct rxrpc_sock *rx = rxrpc_sk(sk);
	unsigned int mask;

	sock_poll_wait(file, sk_sleep(sk), wait);
	mask = 0;

	/* the socket is readable if there are any messages waiting on the Rx
	 * queue */
	if (!skb_queue_empty(&sk->sk_receive_queue))
	if (!list_empty(&rx->recvmsg_q))
		mask |= POLLIN | POLLRDNORM;

	/* the socket is writable if there is space to add new data to the
@@ -567,9 +575,12 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
	rx->family = protocol;
	rx->calls = RB_ROOT;

	INIT_LIST_HEAD(&rx->listen_link);
	INIT_LIST_HEAD(&rx->secureq);
	INIT_LIST_HEAD(&rx->acceptq);
	INIT_HLIST_NODE(&rx->listen_link);
	spin_lock_init(&rx->incoming_lock);
	INIT_LIST_HEAD(&rx->sock_calls);
	INIT_LIST_HEAD(&rx->to_be_accepted);
	INIT_LIST_HEAD(&rx->recvmsg_q);
	rwlock_init(&rx->recvmsg_lock);
	rwlock_init(&rx->call_lock);
	memset(&rx->srx, 0, sizeof(rx->srx));

@@ -577,6 +588,39 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
	return 0;
}

/*
 * Kill all the calls on a socket and shut it down.
 */
static int rxrpc_shutdown(struct socket *sock, int flags)
{
	struct sock *sk = sock->sk;
	struct rxrpc_sock *rx = rxrpc_sk(sk);
	int ret = 0;

	_enter("%p,%d", sk, flags);

	if (flags != SHUT_RDWR)
		return -EOPNOTSUPP;
	if (sk->sk_state == RXRPC_CLOSE)
		return -ESHUTDOWN;

	lock_sock(sk);

	spin_lock_bh(&sk->sk_receive_queue.lock);
	if (sk->sk_state < RXRPC_CLOSE) {
		sk->sk_state = RXRPC_CLOSE;
		sk->sk_shutdown = SHUTDOWN_MASK;
	} else {
		ret = -ESHUTDOWN;
	}
	spin_unlock_bh(&sk->sk_receive_queue.lock);

	rxrpc_discard_prealloc(rx);

	release_sock(sk);
	return ret;
}

/*
 * RxRPC socket destructor
 */
@@ -615,13 +659,14 @@ static int rxrpc_release_sock(struct sock *sk)

	ASSERTCMP(rx->listen_link.next, !=, LIST_POISON1);

	if (!list_empty(&rx->listen_link)) {
		write_lock_bh(&rx->local->services_lock);
		list_del(&rx->listen_link);
		write_unlock_bh(&rx->local->services_lock);
	if (!hlist_unhashed(&rx->listen_link)) {
		write_lock(&rx->local->services_lock);
		hlist_del_rcu(&rx->listen_link);
		write_unlock(&rx->local->services_lock);
	}

	/* try to flush out this socket */
	rxrpc_discard_prealloc(rx);
	rxrpc_release_calls_on_socket(rx);
	flush_workqueue(rxrpc_workqueue);
	rxrpc_purge_queue(&sk->sk_receive_queue);
@@ -670,7 +715,7 @@ static const struct proto_ops rxrpc_rpc_ops = {
	.poll		= rxrpc_poll,
	.ioctl		= sock_no_ioctl,
	.listen		= rxrpc_listen,
	.shutdown	= sock_no_shutdown,
	.shutdown	= rxrpc_shutdown,
	.setsockopt	= rxrpc_setsockopt,
	.getsockopt	= sock_no_getsockopt,
	.sendmsg	= rxrpc_sendmsg,
Loading