Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 077ebed6 authored by Chris Lew's avatar Chris Lew
Browse files

net: qrtr: Add non blocking option for tx_resume



QRTR should support asynchronous sends even with the tx flow control
enabled. Add a timeout to the tx_wait and send a zero size packet to
nonblocking sockets after the remote port is no longer flow controlled.
This zero size packet will act as a notification for clients to start
sending again.

This change squashes the following commits from msm-4.14:
  commit 5dff60d7bb53 ("net: qrtr: Add non blocking option for tx_resume")
  commit aff4c897bcf5 ("net: qrtr: Fix missing resume_tx notification")
  commit c969cce741fc ("net: qrtr: Rearrange tx wait and protect critical
                        section")

In addition fix minor format issues.

Change-Id: I4b2a145b9507aa18ff08ef58e5620b72952f7f5f
Signed-off-by: default avatarChris Lew <clew@codeaurora.org>
parent 61218504
Loading
Loading
Loading
Loading
+102 −39
Original line number Diff line number Diff line
@@ -147,25 +147,33 @@ struct qrtr_node {
	struct list_head item;
};

struct qrtr_tx_flow_waiter {
	struct list_head node;
	struct sock *sk;
};

/**
 * struct qrtr_tx_flow - tx flow control
 * @pending: number of waiting senders
 * @tx_failed: indicates that a message with confirm_rx flag was lost
 * @waiters: list of ports to notify when this flow resumes
 */
struct qrtr_tx_flow {
	atomic_t pending;
	int tx_failed;
	struct list_head waiters;
};

#define QRTR_TX_FLOW_HIGH	10
#define QRTR_TX_FLOW_LOW	5

static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt);
static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			      int type, struct sockaddr_qrtr *from,
			      struct sockaddr_qrtr *to);
			      struct sockaddr_qrtr *to, unsigned int flags);
static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			      int type, struct sockaddr_qrtr *from,
			      struct sockaddr_qrtr *to);
			      struct sockaddr_qrtr *to, unsigned int flags);
static struct qrtr_sock *qrtr_port_lookup(int port);
static void qrtr_port_put(struct qrtr_sock *ipc);

@@ -202,7 +210,10 @@ static inline int kref_put_rwsem_lock(struct kref *kref,
 */
static void __qrtr_node_release(struct kref *kref)
{
	struct qrtr_tx_flow_waiter *waiter;
	struct qrtr_tx_flow_waiter *temp;
	struct radix_tree_iter iter;
	struct qrtr_tx_flow *flow;
	struct qrtr_node *node = container_of(kref, struct qrtr_node, ref);
	unsigned long flags;
	void __rcu **slot;
@@ -220,10 +231,18 @@ static void __qrtr_node_release(struct kref *kref)
	up_write(&qrtr_epts_lock);

	/* Free tx flow counters */
	mutex_lock(&node->qrtr_tx_lock);
	radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
		flow = *slot;
		list_for_each_entry_safe(waiter, temp, &flow->waiters, node) {
			list_del(&waiter->node);
			sock_put(waiter->sk);
			kfree(waiter);
		}
		radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot);
		kfree(*slot);
	}
	mutex_unlock(&node->qrtr_tx_lock);

	cancel_work_sync(&node->work);
	skb_queue_purge(&node->rx_queue);
@@ -253,23 +272,45 @@ static void qrtr_node_release(struct qrtr_node *node)
 */
static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
{
	struct qrtr_tx_flow_waiter *waiter;
	struct qrtr_tx_flow_waiter *temp;
	struct qrtr_ctrl_pkt pkt = {0,};
	struct qrtr_tx_flow *flow;
	struct sockaddr_qrtr src;
	struct qrtr_sock *ipc;
	struct sk_buff *skbn;
	unsigned long key;
	u64 remote_node;
	u32 remote_port;

	skb_copy_bits(skb, 0, &pkt, sizeof(pkt));
	remote_node = le32_to_cpu(pkt.client.node);
	remote_port = le32_to_cpu(pkt.client.port);
	key = remote_node << 32 | remote_port;
	if (le32_to_cpu(pkt.cmd) != QRTR_TYPE_RESUME_TX)
		return;

	src.sq_family = AF_QIPCRTR;
	src.sq_node = le32_to_cpu(pkt.client.node);
	src.sq_port = le32_to_cpu(pkt.client.port);
	key = (u64)src.sq_node << 32 | src.sq_port;

	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
	if (flow)
		atomic_set(&flow->pending, 0);
	if (!flow)
		return;

	mutex_lock(&node->qrtr_tx_lock);
	atomic_set(&flow->pending, 0);
	wake_up_interruptible_all(&node->resume_tx);

	list_for_each_entry_safe(waiter, temp, &flow->waiters, node) {
		list_del(&waiter->node);
		skbn = alloc_skb(0, GFP_KERNEL);
		if (skbn) {
			ipc = qrtr_sk(waiter->sk);
			qrtr_local_enqueue(NULL, skbn, QRTR_TYPE_RESUME_TX,
					   &src, &ipc->us, 0);
		}
		sock_put(waiter->sk);
		kfree(waiter);
	}
	mutex_unlock(&node->qrtr_tx_lock);

	consume_skb(skb);
}

@@ -288,42 +329,40 @@ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
 *
 * Return: 1 if confirm_rx should be set, 0 otherwise or errno failure
 */
static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
			int type)
static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to,
			struct sock *sk, int type, unsigned int flags)
{
	unsigned long key = (u64)dest_node << 32 | dest_port;
	unsigned long key = (u64)to->sq_node << 32 | to->sq_port;
	struct qrtr_tx_flow_waiter *waiter;
	struct qrtr_tx_flow *flow;
	int confirm_rx = 0;
	int ret;
	long timeo;
	long ret;

	/* Never set confirm_rx on non-data packets */
	if (type != QRTR_TYPE_DATA)
		return 0;

	/* Assume sk is set correctly for all data type packets */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

	mutex_lock(&node->qrtr_tx_lock);
	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
	if (!flow) {
		flow = kzalloc(sizeof(*flow), GFP_KERNEL);
		if (flow)
		if (flow) {
			INIT_LIST_HEAD(&flow->waiters);
			radix_tree_insert(&node->qrtr_tx_flow, key, flow);
		}
	}
	mutex_unlock(&node->qrtr_tx_lock);

	/* Set confirm_rx if we where unable to find and allocate a flow */
	if (!flow)
		return 1;

	ret = timeo;
	for (;;) {
		ret = wait_event_interruptible(node->resume_tx,
					       atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH ||
					       !node->ep ||
					       READ_ONCE(flow->tx_failed));
		if (ret)
			return ret;

		if (!node->ep)
			return -EPIPE;

		mutex_lock(&node->qrtr_tx_lock);
		if (READ_ONCE(flow->tx_failed)) {
			WRITE_ONCE(flow->tx_failed, 0);
@@ -333,13 +372,35 @@ static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
		}

		if (atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH) {
			confirm_rx = atomic_inc_return(&flow->pending) == QRTR_TX_FLOW_LOW;
			confirm_rx = atomic_inc_return(&flow->pending) ==
				     QRTR_TX_FLOW_LOW;
			mutex_unlock(&node->qrtr_tx_lock);
			break;
		}
		if (!ret) {
			waiter = kzalloc(sizeof(*waiter), GFP_KERNEL);
			if (!waiter) {
				mutex_unlock(&node->qrtr_tx_lock);
				return -ENOMEM;
			}
			waiter->sk = sk;
			sock_hold(sk);
			list_add_tail(&waiter->node, &flow->waiters);
			mutex_unlock(&node->qrtr_tx_lock);
			return -EAGAIN;
		}
		mutex_unlock(&node->qrtr_tx_lock);

		ret = wait_event_interruptible_timeout(node->resume_tx,
				!node->ep ||
				READ_ONCE(flow->tx_failed) ||
				atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH,
				timeo);
		if (ret < 0)
			return ret;
		if (!node->ep)
			return -EPIPE;
	}
	return confirm_rx;
}

@@ -372,7 +433,7 @@ static void qrtr_tx_flow_failed(struct qrtr_node *node, int dest_node,
/* Pass an outgoing packet socket buffer to the endpoint driver. */
static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			     int type, struct sockaddr_qrtr *from,
			     struct sockaddr_qrtr *to)
			     struct sockaddr_qrtr *to, unsigned int flags)
{
	struct qrtr_hdr_v1 *hdr;
	size_t len = skb->len;
@@ -384,7 +445,7 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
		return rc;
	}

	confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type);
	confirm_rx = qrtr_tx_wait(node, to, skb->sk, type, flags);
	if (confirm_rx < 0) {
		kfree_skb(skb);
		return confirm_rx;
@@ -667,7 +728,7 @@ static void qrtr_notify_bye(u32 nid)
		return;

	pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE);
	qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst);
	qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst, 0);
}

/**
@@ -757,7 +818,7 @@ static void qrtr_send_del_client(struct qrtr_sock *ipc)
	skb_set_owner_w(skb, &ipc->sk);

	if (ipc->state == QRTR_STATE_MULTI) {
		qrtr_bcast_enqueue(NULL, skb, type, &ipc->us, &to);
		qrtr_bcast_enqueue(NULL, skb, type, &ipc->us, &to, 0);
		return;
	}

@@ -773,11 +834,11 @@ static void qrtr_send_del_client(struct qrtr_sock *ipc)
		}

		skb_set_owner_w(skbn, &ipc->sk);
		qrtr_node_enqueue(node, skbn, type, &ipc->us, &to);
		qrtr_node_enqueue(node, skbn, type, &ipc->us, &to, 0);
		qrtr_node_release(node);
	}
exit:
	qrtr_local_enqueue(NULL, skb, type, &ipc->us, &to);
	qrtr_local_enqueue(NULL, skb, type, &ipc->us, &to, 0);
}

/* Remove port assignment. */
@@ -936,7 +997,7 @@ static int qrtr_bind(struct socket *sock, struct sockaddr *saddr, int len)
/* Queue packet to local peer socket. */
static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			      int type, struct sockaddr_qrtr *from,
			      struct sockaddr_qrtr *to)
			      struct sockaddr_qrtr *to, unsigned int flags)
{
	struct qrtr_sock *ipc;
	struct qrtr_cb *cb;
@@ -976,7 +1037,7 @@ static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
/* Queue packet for broadcast. */
static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			      int type, struct sockaddr_qrtr *from,
			      struct sockaddr_qrtr *to)
			      struct sockaddr_qrtr *to, unsigned int flags)
{
	struct sk_buff *skbn;

@@ -989,11 +1050,11 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
		if (!skbn)
			break;
		skb_set_owner_w(skbn, skb->sk);
		qrtr_node_enqueue(node, skbn, type, from, to);
		qrtr_node_enqueue(node, skbn, type, from, to, flags);
	}
	up_read(&qrtr_epts_lock);

	qrtr_local_enqueue(node, skb, type, from, to);
	qrtr_local_enqueue(node, skb, type, from, to, flags);

	return 0;
}
@@ -1002,7 +1063,8 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
{
	DECLARE_SOCKADDR(struct sockaddr_qrtr *, addr, msg->msg_name);
	int (*enqueue_fn)(struct qrtr_node *, struct sk_buff *, int,
			  struct sockaddr_qrtr *, struct sockaddr_qrtr *);
			  struct sockaddr_qrtr *, struct sockaddr_qrtr *,
			  unsigned int);
	__le32 qrtr_type = cpu_to_le32(QRTR_TYPE_DATA);
	struct qrtr_sock *ipc = qrtr_sk(sock->sk);
	struct sock *sk = sock->sk;
@@ -1096,7 +1158,7 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
	if (addr->sq_port == QRTR_PORT_CTRL && type == QRTR_TYPE_NEW_SERVER)
		ipc->state = QRTR_STATE_MULTI;

	rc = enqueue_fn(node, skb, type, &ipc->us, addr);
	rc = enqueue_fn(node, skb, type, &ipc->us, addr, msg->msg_flags);
	if (rc >= 0)
		rc = len;

@@ -1128,7 +1190,8 @@ static int qrtr_send_resume_tx(struct qrtr_cb *cb)
	pkt->client.node = cpu_to_le32(cb->dst_node);
	pkt->client.port = cpu_to_le32(cb->dst_port);

	ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX, &local, &remote);
	ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX,
				&local, &remote, 0);

	qrtr_node_release(node);