Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d14393eb authored by qctecmdr's avatar qctecmdr Committed by Gerrit - the friendly Code Review server
Browse files

Merge "net: qrtr: Implement outgoing flow control"

parents 5d0063a9 332849c5
Loading
Loading
Loading
Loading
+213 −34
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@
#include <linux/qrtr.h>
#include <linux/termios.h>	/* For TIOCINQ/OUTQ */
#include <linux/numa.h>
#include <linux/wait.h>

#include <net/sock.h>

@@ -113,6 +114,9 @@ static DEFINE_MUTEX(qrtr_port_lock);
 * @ep: endpoint
 * @ref: reference count for node
 * @nid: node id
 * @qrtr_tx_flow: tree with tx counts per flow
 * @resume_tx: waiters for a resume tx from the remote
 * @qrtr_tx_lock: lock for qrtr_tx_flow
 * @rx_queue: receive queue
 * @work: scheduled work struct for recv work
 * @item: list item for broadcast list
@@ -123,11 +127,28 @@ struct qrtr_node {
	struct kref ref;
	unsigned int nid;

	struct radix_tree_root qrtr_tx_flow;
	struct wait_queue_head resume_tx;
	struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */

	struct sk_buff_head rx_queue;
	struct work_struct work;
	struct list_head item;
};

/**
 * struct qrtr_tx_flow - tx flow control
 * @pending: number of data messages admitted on this flow since the counter
 *	     was last reset to zero by a resume-tx from the remote
 * @tx_failed: indicates that a message with confirm_rx flag was lost
 */
struct qrtr_tx_flow {
	atomic_t pending;
	int tx_failed;
};

/* Watermarks for a flow's pending counter: a sender sleeps while the counter
 * is at or above HIGH, and the message whose increment brings the counter to
 * LOW is sent with confirm_rx set.
 */
#define QRTR_TX_FLOW_HIGH	10
#define QRTR_TX_FLOW_LOW	5

static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			      int type, struct sockaddr_qrtr *from,
			      struct sockaddr_qrtr *to);
@@ -142,7 +163,9 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
 */
static void __qrtr_node_release(struct kref *kref)
{
	struct radix_tree_iter iter;
	struct qrtr_node *node = container_of(kref, struct qrtr_node, ref);
	void __rcu **slot;

	if (node->nid != QRTR_EP_NID_AUTO)
		radix_tree_delete(&qrtr_nodes, node->nid);
@@ -150,6 +173,12 @@ static void __qrtr_node_release(struct kref *kref)
	list_del(&node->item);
	mutex_unlock(&qrtr_node_lock);

	/* Free tx flow counters. The entry has to be read before the slot is
	 * deleted: once radix_tree_iter_delete() has run, *slot no longer
	 * holds the flow pointer and the allocation would be leaked.
	 */
	radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
		struct qrtr_tx_flow *flow = *slot;

		radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot);
		kfree(flow);
	}

	cancel_work_sync(&node->work);
	skb_queue_purge(&node->rx_queue);
	kfree(node);
@@ -171,6 +200,126 @@ static void qrtr_node_release(struct qrtr_node *node)
	kref_put_mutex(&node->ref, __qrtr_node_release, &qrtr_node_lock);
}

/**
 * qrtr_tx_resume() - reset flow control counter
 * @node:	qrtr_node that the QRTR_TYPE_RESUME_TX packet arrived on
 * @skb:	resume_tx packet
 *
 * Clears the pending counter of the flow identified by the client node/port
 * carried in the control packet, then wakes every sender sleeping on
 * @node->resume_tx. The skb is consumed.
 *
 * Takes a mutex and may therefore sleep.
 *
 * NOTE(review): skb->data is interpreted as a struct qrtr_ctrl_pkt without a
 * length check in this function - confirm the payload size is validated
 * before the packet gets here.
 */
static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb)
{
	struct qrtr_ctrl_pkt *pkt = (struct qrtr_ctrl_pkt *)skb->data;
	u64 remote_node = le32_to_cpu(pkt->client.node);
	u32 remote_port = le32_to_cpu(pkt->client.port);
	struct qrtr_tx_flow *flow;
	unsigned long key;

	key = remote_node << 32 | remote_port;

	/* qrtr_tx_lock protects qrtr_tx_flow; hold it so the lookup and the
	 * reset cannot race with a sender inserting into or updating the tree.
	 */
	mutex_lock(&node->qrtr_tx_lock);
	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
	if (flow)
		atomic_set(&flow->pending, 0);
	mutex_unlock(&node->qrtr_tx_lock);

	wake_up_interruptible_all(&node->resume_tx);

	consume_skb(skb);
}

/**
 * qrtr_tx_wait() - flow control for outgoing packets
 * @node:	qrtr_node that the packet is to be send to
 * @dest_node:	node id of the destination
 * @dest_port:	port number of the destination
 * @type:	type of message
 *
 * The flow control scheme is based around the low and high "watermarks". When
 * the low watermark is passed the confirm_rx flag is set on the outgoing
 * message, which will trigger the remote to send a control message of the type
 * QRTR_TYPE_RESUME_TX to reset the counter. If the high watermark is hit
 * further transmision should be paused.
 *
 * Return: 1 if confirm_rx should be set, 0 otherwise or errno failure
 */
static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port,
			int type)
{
	unsigned long key = (u64)dest_node << 32 | dest_port;
	struct qrtr_tx_flow *flow;
	int confirm_rx = 0;
	int ret;

	/* Never set confirm_rx on non-data packets */
	if (type != QRTR_TYPE_DATA)
		return 0;

	mutex_lock(&node->qrtr_tx_lock);
	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
	if (!flow) {
		flow = kzalloc(sizeof(*flow), GFP_KERNEL);
		if (flow)
			radix_tree_insert(&node->qrtr_tx_flow, key, flow);
	}
	mutex_unlock(&node->qrtr_tx_lock);

	/* Set confirm_rx if we where unable to find and allocate a flow */
	if (!flow)
		return 1;

	for (;;) {
		ret = wait_event_interruptible(node->resume_tx,
					       atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH ||
					       !node->ep ||
					       READ_ONCE(flow->tx_failed));
		if (ret)
			return ret;

		if (!node->ep)
			return -EPIPE;

		mutex_lock(&node->qrtr_tx_lock);
		if (READ_ONCE(flow->tx_failed)) {
			WRITE_ONCE(flow->tx_failed, 0);
			confirm_rx = 1;
			mutex_unlock(&node->qrtr_tx_lock);
			break;
		}

		if (atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH) {
			confirm_rx = atomic_inc_return(&flow->pending) == QRTR_TX_FLOW_LOW;
			mutex_unlock(&node->qrtr_tx_lock);
			break;
		}
		mutex_unlock(&node->qrtr_tx_lock);
	}

	return confirm_rx;
}

/**
 * qrtr_tx_flow_failed() - flag that tx of confirm_rx flagged messages failed
 * @node:	qrtr_node that the packet is to be send to
 * @dest_node:	node id of the destination
 * @dest_port:	port number of the destination
 *
 * Signal that the transmission of a message with confirm_rx flag failed. The
 * flow's "pending" counter will keep incrementing towards QRTR_TX_FLOW_HIGH,
 * at which point transmission would stall forever waiting for the resume TX
 * message associated with the dropped confirm_rx message.
 * Work around this by marking the flow as having a failed transmission and
 * cause the next transmission attempt to be sent with the confirm_rx.
 */
static void qrtr_tx_flow_failed(struct qrtr_node *node, int dest_node,
				int dest_port)
{
	unsigned long key = (u64)dest_node << 32 | dest_port;
	struct qrtr_tx_flow *flow;

	mutex_lock(&node->qrtr_tx_lock);
	flow = radix_tree_lookup(&node->qrtr_tx_flow, key);
	if (flow)
		WRITE_ONCE(flow->tx_failed, 1);
	mutex_unlock(&node->qrtr_tx_lock);
}

/* Pass an outgoing packet socket buffer to the endpoint driver. */
static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
			     int type, struct sockaddr_qrtr *from,
@@ -179,6 +328,13 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
	struct qrtr_hdr_v1 *hdr;
	size_t len = skb->len;
	int rc = -ENODEV;
	int confirm_rx;

	confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type);
	if (confirm_rx < 0) {
		kfree_skb(skb);
		return confirm_rx;
	}

	hdr = skb_push(skb, sizeof(*hdr));
	hdr->version = cpu_to_le32(QRTR_PROTO_VER_1);
@@ -194,7 +350,7 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
	}

	hdr->size = cpu_to_le32(len);
	hdr->confirm_rx = 0;
	hdr->confirm_rx = !!confirm_rx;

	skb_put_padto(skb, ALIGN(len, 4));

@@ -205,6 +361,11 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb,
		kfree_skb(skb);
	mutex_unlock(&node->ep_lock);

	/* Need to ensure that a subsequent message carries the otherwise lost
	 * confirm_rx flag if we dropped this one */
	if (rc && confirm_rx)
		qrtr_tx_flow_failed(node, to->sq_node, to->sq_port);

	return rc;
}

@@ -311,7 +472,8 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
	if (len != ALIGN(size, 4) + hdrlen)
		goto err;

	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA)
	if (cb->dst_port != QRTR_PORT_CTRL && cb->type != QRTR_TYPE_DATA &&
	    cb->type != QRTR_TYPE_RESUME_TX)
		goto err;

	skb_put_data(skb, data + hdrlen, size);
@@ -362,25 +524,17 @@ static void qrtr_port_put(struct qrtr_sock *ipc);
static void qrtr_node_rx_work(struct work_struct *work)
{
	struct qrtr_node *node = container_of(work, struct qrtr_node, work);
	struct qrtr_ctrl_pkt *pkt;
	struct sockaddr_qrtr dst;
	struct sockaddr_qrtr src;
	struct sk_buff *skb;

	while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
		struct qrtr_sock *ipc;
		struct qrtr_cb *cb;
		int confirm;

		cb = (struct qrtr_cb *)skb->cb;
		src.sq_node = cb->src_node;
		src.sq_port = cb->src_port;
		dst.sq_node = cb->dst_node;
		dst.sq_port = cb->dst_port;
		confirm = !!cb->confirm_rx;
		struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;

		qrtr_node_assign(node, cb->src_node);

		if (cb->type == QRTR_TYPE_RESUME_TX) {
			qrtr_tx_resume(node, skb);
		} else {
			ipc = qrtr_port_lookup(cb->dst_port);
			if (!ipc) {
				kfree_skb(skb);
@@ -390,19 +544,6 @@ static void qrtr_node_rx_work(struct work_struct *work)

				qrtr_port_put(ipc);
			}

		if (confirm) {
			skb = qrtr_alloc_ctrl_packet(&pkt);
			if (!skb)
				break;

			pkt->cmd = cpu_to_le32(QRTR_TYPE_RESUME_TX);
			pkt->client.node = cpu_to_le32(dst.sq_node);
			pkt->client.port = cpu_to_le32(dst.sq_port);

			if (qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX,
					      &dst, &src))
				break;
		}
	}
}
@@ -433,6 +574,10 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
	node->nid = QRTR_EP_NID_AUTO;
	node->ep = ep;

	mutex_init(&node->qrtr_tx_lock);
	INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
	init_waitqueue_head(&node->resume_tx);

	qrtr_node_assign(node, nid);

	mutex_lock(&qrtr_node_lock);
@@ -467,6 +612,9 @@ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep)
		qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst);
	}

	/* Wake up any transmitters waiting for resume-tx from the node */
	wake_up_interruptible_all(&node->resume_tx);

	qrtr_node_release(node);
	ep->node = NULL;
}
@@ -816,6 +964,34 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len)
	return rc;
}

/**
 * qrtr_send_resume_tx() - send a QRTR_TYPE_RESUME_TX for a received message
 * @cb:	control block of the received message
 *
 * Builds a resume-tx control packet naming the local (destination) node and
 * port of the received message and enqueues it towards the node the message
 * came from.
 *
 * Return: -EINVAL if the source node cannot be looked up, -ENOMEM if the
 * control packet cannot be allocated, otherwise the result of
 * qrtr_node_enqueue().
 */
static int qrtr_send_resume_tx(struct qrtr_cb *cb)
{
	struct sockaddr_qrtr remote = { AF_QIPCRTR, cb->src_node, cb->src_port };
	struct sockaddr_qrtr local = { AF_QIPCRTR, cb->dst_node, cb->dst_port };
	struct qrtr_ctrl_pkt *pkt;
	struct qrtr_node *node;
	struct sk_buff *skb;
	int ret;

	node = qrtr_node_lookup(remote.sq_node);
	if (!node)
		return -EINVAL;

	skb = qrtr_alloc_ctrl_packet(&pkt);
	if (!skb) {
		ret = -ENOMEM;
		goto out_release;
	}

	pkt->cmd = cpu_to_le32(QRTR_TYPE_RESUME_TX);
	pkt->client.node = cpu_to_le32(cb->dst_node);
	pkt->client.port = cpu_to_le32(cb->dst_port);

	ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX, &local, &remote);

out_release:
	/* Balance qrtr_node_lookup() on every path once a node was found */
	qrtr_node_release(node);

	return ret;
}

static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
			size_t size, int flags)
{
@@ -838,6 +1014,7 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
		release_sock(sk);
		return rc;
	}
	cb = (struct qrtr_cb *)skb->cb;

	copied = skb->len;
	if (copied > size) {
@@ -851,7 +1028,6 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
	rc = copied;

	if (addr) {
		cb = (struct qrtr_cb *)skb->cb;
		addr->sq_family = AF_QIPCRTR;
		addr->sq_node = cb->src_node;
		addr->sq_port = cb->src_port;
@@ -859,6 +1035,9 @@ static int qrtr_recvmsg(struct socket *sock, struct msghdr *msg,
	}

out:
	if (cb->confirm_rx)
		qrtr_send_resume_tx(cb);

	skb_free_datagram(sk, skb);
	release_sock(sk);