Loading net/qrtr/qrtr.c +105 −36 Original line number Diff line number Diff line Loading @@ -158,19 +158,26 @@ struct qrtr_node { void *ilc; }; struct qrtr_tx_flow_waiter { struct list_head node; struct sock *sk; }; struct qrtr_tx_flow { atomic_t pending; struct list_head waiters; }; #define QRTR_TX_FLOW_HIGH 10 #define QRTR_TX_FLOW_LOW 5 static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt); static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to); struct sockaddr_qrtr *to, unsigned int flags); static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to); struct sockaddr_qrtr *to, unsigned int flags); static void qrtr_log_tx_msg(struct qrtr_node *node, struct qrtr_hdr_v1 *hdr, struct sk_buff *skb) Loading Loading @@ -289,7 +296,10 @@ static inline int kref_put_rwsem_lock(struct kref *kref, */ static void __qrtr_node_release(struct kref *kref) { struct qrtr_tx_flow_waiter *waiter; struct qrtr_tx_flow_waiter *temp; struct radix_tree_iter iter; struct qrtr_tx_flow *flow; struct qrtr_node *node = container_of(kref, struct qrtr_node, ref); void __rcu **slot; Loading @@ -304,10 +314,19 @@ static void __qrtr_node_release(struct kref *kref) up_write(&qrtr_node_lock); /* Free tx flow counters */ mutex_lock(&node->qrtr_tx_lock); radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) { radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot); kfree(*slot); flow = *slot; list_for_each_entry_safe(waiter, temp, &flow->waiters, node) { list_del(&waiter->node); sock_put(waiter->sk); kfree(waiter); } kfree(flow); radix_tree_delete(&node->qrtr_tx_flow, iter.index); } mutex_unlock(&node->qrtr_tx_lock); flush_work(&node->work); skb_queue_purge(&node->rx_queue); Loading Loading @@ -337,25 +356,44 @@ static void qrtr_node_release(struct qrtr_node *node) */ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb) { struct qrtr_tx_flow_waiter *waiter; struct qrtr_tx_flow_waiter *temp; struct qrtr_ctrl_pkt *pkt; struct qrtr_tx_flow *flow; struct sockaddr_qrtr src; struct qrtr_sock *ipc; struct sk_buff *skbn; unsigned long key; int dest_node; int dest_port; pkt = (struct qrtr_ctrl_pkt *)skb->data; if (le32_to_cpu(pkt->cmd) != QRTR_TYPE_RESUME_TX) return; dest_node = le32_to_cpu(pkt->client.node); dest_port = le32_to_cpu(pkt->client.port); key = (u64)dest_node << 32 | dest_port; src.sq_family = AF_QIPCRTR; src.sq_node = le32_to_cpu(pkt->client.node); src.sq_port = le32_to_cpu(pkt->client.port); key = (u64)src.sq_node << 32 | src.sq_port; flow = radix_tree_lookup(&node->qrtr_tx_flow, key); if (flow) atomic_set(&flow->pending, 0); if (!flow) return; atomic_set(&flow->pending, 0); wake_up_interruptible_all(&node->resume_tx); mutex_lock(&node->qrtr_tx_lock); list_for_each_entry_safe(waiter, temp, &flow->waiters, node) { list_del(&waiter->node); skbn = alloc_skb(0, GFP_KERNEL); if (skbn) { ipc = qrtr_sk(waiter->sk); qrtr_local_enqueue(NULL, skbn, QRTR_TYPE_RESUME_TX, &src, &ipc->us, 0); } sock_put(waiter->sk); kfree(waiter); } mutex_unlock(&node->qrtr_tx_lock); } /** Loading @@ -373,35 +411,54 @@ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb) * * Return: 1 if confirm_rx should be set, 0 otherwise or errno failure */ static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port, int type) static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to, struct sock *sk, int type, unsigned int flags) { struct qrtr_tx_flow_waiter *waiter; struct qrtr_tx_flow *flow; unsigned long key = (u64)dest_node << 32 | dest_port; unsigned long key = (u64)to->sq_node << 32 | to->sq_port; int confirm_rx = 0; int ret; long timeo; long ret; /* Never set confirm_rx on non-data packets */ if (type != QRTR_TYPE_DATA) return 0; /* Assume sk is set correctly for all data type packets */ timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT); mutex_lock(&node->qrtr_tx_lock); flow = radix_tree_lookup(&node->qrtr_tx_flow, key); if (!flow) { flow = kzalloc(sizeof(*flow), GFP_KERNEL); if (!flow) return 1; else INIT_LIST_HEAD(&flow->waiters); radix_tree_insert(&node->qrtr_tx_flow, key, flow); } mutex_unlock(&node->qrtr_tx_lock); for (;;) { ret = wait_event_interruptible(node->resume_tx, atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH || !node->ep); if (ret) ret = wait_event_interruptible_timeout( node->resume_tx, !node->ep || atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH, timeo); if (ret < 0) return ret; if (!ret) { waiter = kzalloc(sizeof(*waiter), GFP_KERNEL); if (!waiter) return -ENOMEM; waiter->sk = sk; sock_hold(sk); mutex_lock(&node->qrtr_tx_lock); list_add_tail(&waiter->node, &flow->waiters); mutex_unlock(&node->qrtr_tx_lock); return -EAGAIN; } if (!node->ep) return -EPIPE; Loading @@ -422,14 +479,14 @@ static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port, /* Pass an outgoing packet socket buffer to the endpoint driver. */ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to) struct sockaddr_qrtr *to, unsigned int flags) { struct qrtr_hdr_v1 *hdr; int confirm_rx; size_t len = skb->len; int rc = -ENODEV; confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type); confirm_rx = qrtr_tx_wait(node, to, skb->sk, type, flags); if (confirm_rx < 0) { kfree_skb(skb); return confirm_rx; Loading Loading @@ -708,22 +765,33 @@ EXPORT_SYMBOL_GPL(qrtr_endpoint_register); */ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep) { struct radix_tree_iter iter; struct qrtr_node *node = ep->node; struct sockaddr_qrtr src = {AF_QIPCRTR, node->nid, QRTR_PORT_CTRL}; struct sockaddr_qrtr dst = {AF_QIPCRTR, qrtr_local_nid, QRTR_PORT_CTRL}; struct qrtr_ctrl_pkt *pkt; struct sk_buff *skb; void __rcu **slot; mutex_lock(&node->ep_lock); node->ep = NULL; mutex_unlock(&node->ep_lock); /* Notify the local controller about the event */ down_read(&qrtr_node_lock); radix_tree_for_each_slot(slot, &qrtr_nodes, &iter, 0) { if (node != *slot) continue; skb = qrtr_alloc_ctrl_packet(&pkt); if (skb) { if (!skb) continue; src.sq_node = iter.index; pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE); qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst); qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst, 0); } up_read(&qrtr_node_lock); /* Wake up any transmitters waiting for resume-tx from the node */ wake_up_interruptible_all(&node->resume_tx); Loading Loading @@ -783,7 +851,7 @@ static void qrtr_send_del_client(struct qrtr_sock *ipc) skb_set_owner_w(skb, &ipc->sk); if (ipc->state == QRTR_STATE_MULTI) { qrtr_bcast_enqueue(NULL, skb, type, &ipc->us, &to); qrtr_bcast_enqueue(NULL, skb, type, &ipc->us, &to, 0); return; } Loading @@ -799,11 +867,11 @@ static void qrtr_send_del_client(struct qrtr_sock *ipc) } skb_set_owner_w(skbn, &ipc->sk); qrtr_node_enqueue(node, skbn, type, &ipc->us, &to); qrtr_node_enqueue(node, skbn, type, &ipc->us, &to, 0); qrtr_node_release(node); } exit: qrtr_local_enqueue(NULL, skb, type, &ipc->us, &to); qrtr_local_enqueue(NULL, skb, type, &ipc->us, &to, 0); } /* Remove port assignment. */ Loading Loading @@ -959,7 +1027,7 @@ static int qrtr_bind(struct socket *sock, struct sockaddr *saddr, int len) /* Queue packet to local peer socket. */ static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to) struct sockaddr_qrtr *to, unsigned int flags) { struct qrtr_sock *ipc; struct qrtr_cb *cb; Loading Loading @@ -992,7 +1060,7 @@ static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb, /* Queue packet for broadcast. */ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to) struct sockaddr_qrtr *to, unsigned int flags) { struct sk_buff *skbn; Loading @@ -1004,11 +1072,11 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, if (!skbn) break; skb_set_owner_w(skbn, skb->sk); qrtr_node_enqueue(node, skbn, type, from, to); qrtr_node_enqueue(node, skbn, type, from, to, flags); } up_read(&qrtr_node_lock); qrtr_local_enqueue(node, skb, type, from, to); qrtr_local_enqueue(node, skb, type, from, to, flags); return 0; } Loading @@ -1017,7 +1085,8 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len) { DECLARE_SOCKADDR(struct sockaddr_qrtr *, addr, msg->msg_name); int (*enqueue_fn)(struct qrtr_node *, struct sk_buff *, int, struct sockaddr_qrtr *, struct sockaddr_qrtr *); struct sockaddr_qrtr *, struct sockaddr_qrtr *, unsigned int); struct qrtr_sock *ipc = qrtr_sk(sock->sk); struct sock *sk = sock->sk; struct qrtr_node *node; Loading Loading @@ -1106,7 +1175,7 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len) type = le32_to_cpu(type); } rc = enqueue_fn(node, skb, type, &ipc->us, addr); rc = enqueue_fn(node, skb, type, &ipc->us, addr, msg->msg_flags); if (rc >= 0) rc = len; Loading Loading @@ -1140,7 +1209,7 @@ static int qrtr_resume_tx(struct qrtr_cb *cb) pkt->client.port = cpu_to_le32(cb->dst_port); ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX, &local, &remote); &local, &remote, 0); qrtr_node_release(node); Loading Loading
net/qrtr/qrtr.c +105 −36 Original line number Diff line number Diff line Loading @@ -158,19 +158,26 @@ struct qrtr_node { void *ilc; }; struct qrtr_tx_flow_waiter { struct list_head node; struct sock *sk; }; struct qrtr_tx_flow { atomic_t pending; struct list_head waiters; }; #define QRTR_TX_FLOW_HIGH 10 #define QRTR_TX_FLOW_LOW 5 static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt); static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to); struct sockaddr_qrtr *to, unsigned int flags); static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to); struct sockaddr_qrtr *to, unsigned int flags); static void qrtr_log_tx_msg(struct qrtr_node *node, struct qrtr_hdr_v1 *hdr, struct sk_buff *skb) Loading Loading @@ -289,7 +296,10 @@ static inline int kref_put_rwsem_lock(struct kref *kref, */ static void __qrtr_node_release(struct kref *kref) { struct qrtr_tx_flow_waiter *waiter; struct qrtr_tx_flow_waiter *temp; struct radix_tree_iter iter; struct qrtr_tx_flow *flow; struct qrtr_node *node = container_of(kref, struct qrtr_node, ref); void __rcu **slot; Loading @@ -304,10 +314,19 @@ static void __qrtr_node_release(struct kref *kref) up_write(&qrtr_node_lock); /* Free tx flow counters */ mutex_lock(&node->qrtr_tx_lock); radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) { radix_tree_iter_delete(&node->qrtr_tx_flow, &iter, slot); kfree(*slot); flow = *slot; list_for_each_entry_safe(waiter, temp, &flow->waiters, node) { list_del(&waiter->node); sock_put(waiter->sk); kfree(waiter); } kfree(flow); radix_tree_delete(&node->qrtr_tx_flow, iter.index); } mutex_unlock(&node->qrtr_tx_lock); flush_work(&node->work); skb_queue_purge(&node->rx_queue); Loading Loading @@ -337,25 +356,44 @@ static void qrtr_node_release(struct qrtr_node *node) */ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb) { struct qrtr_tx_flow_waiter *waiter; struct qrtr_tx_flow_waiter *temp; struct qrtr_ctrl_pkt *pkt; struct qrtr_tx_flow *flow; struct sockaddr_qrtr src; struct qrtr_sock *ipc; struct sk_buff *skbn; unsigned long key; int dest_node; int dest_port; pkt = (struct qrtr_ctrl_pkt *)skb->data; if (le32_to_cpu(pkt->cmd) != QRTR_TYPE_RESUME_TX) return; dest_node = le32_to_cpu(pkt->client.node); dest_port = le32_to_cpu(pkt->client.port); key = (u64)dest_node << 32 | dest_port; src.sq_family = AF_QIPCRTR; src.sq_node = le32_to_cpu(pkt->client.node); src.sq_port = le32_to_cpu(pkt->client.port); key = (u64)src.sq_node << 32 | src.sq_port; flow = radix_tree_lookup(&node->qrtr_tx_flow, key); if (flow) atomic_set(&flow->pending, 0); if (!flow) return; atomic_set(&flow->pending, 0); wake_up_interruptible_all(&node->resume_tx); mutex_lock(&node->qrtr_tx_lock); list_for_each_entry_safe(waiter, temp, &flow->waiters, node) { list_del(&waiter->node); skbn = alloc_skb(0, GFP_KERNEL); if (skbn) { ipc = qrtr_sk(waiter->sk); qrtr_local_enqueue(NULL, skbn, QRTR_TYPE_RESUME_TX, &src, &ipc->us, 0); } sock_put(waiter->sk); kfree(waiter); } mutex_unlock(&node->qrtr_tx_lock); } /** Loading @@ -373,35 +411,54 @@ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb) * * Return: 1 if confirm_rx should be set, 0 otherwise or errno failure */ static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port, int type) static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to, struct sock *sk, int type, unsigned int flags) { struct qrtr_tx_flow_waiter *waiter; struct qrtr_tx_flow *flow; unsigned long key = (u64)dest_node << 32 | dest_port; unsigned long key = (u64)to->sq_node << 32 | to->sq_port; int confirm_rx = 0; int ret; long timeo; long ret; /* Never set confirm_rx on non-data packets */ if (type != QRTR_TYPE_DATA) return 0; /* Assume sk is set correctly for all data type packets */ timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT); mutex_lock(&node->qrtr_tx_lock); flow = radix_tree_lookup(&node->qrtr_tx_flow, key); if (!flow) { flow = kzalloc(sizeof(*flow), GFP_KERNEL); if (!flow) return 1; else INIT_LIST_HEAD(&flow->waiters); radix_tree_insert(&node->qrtr_tx_flow, key, flow); } mutex_unlock(&node->qrtr_tx_lock); for (;;) { ret = wait_event_interruptible(node->resume_tx, atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH || !node->ep); if (ret) ret = wait_event_interruptible_timeout( node->resume_tx, !node->ep || atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH, timeo); if (ret < 0) return ret; if (!ret) { waiter = kzalloc(sizeof(*waiter), GFP_KERNEL); if (!waiter) return -ENOMEM; waiter->sk = sk; sock_hold(sk); mutex_lock(&node->qrtr_tx_lock); list_add_tail(&waiter->node, &flow->waiters); mutex_unlock(&node->qrtr_tx_lock); return -EAGAIN; } if (!node->ep) return -EPIPE; Loading @@ -422,14 +479,14 @@ static int qrtr_tx_wait(struct qrtr_node *node, int dest_node, int dest_port, /* Pass an outgoing packet socket buffer to the endpoint driver. */ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to) struct sockaddr_qrtr *to, unsigned int flags) { struct qrtr_hdr_v1 *hdr; int confirm_rx; size_t len = skb->len; int rc = -ENODEV; confirm_rx = qrtr_tx_wait(node, to->sq_node, to->sq_port, type); confirm_rx = qrtr_tx_wait(node, to, skb->sk, type, flags); if (confirm_rx < 0) { kfree_skb(skb); return confirm_rx; Loading Loading @@ -708,22 +765,33 @@ EXPORT_SYMBOL_GPL(qrtr_endpoint_register); */ void qrtr_endpoint_unregister(struct qrtr_endpoint *ep) { struct radix_tree_iter iter; struct qrtr_node *node = ep->node; struct sockaddr_qrtr src = {AF_QIPCRTR, node->nid, QRTR_PORT_CTRL}; struct sockaddr_qrtr dst = {AF_QIPCRTR, qrtr_local_nid, QRTR_PORT_CTRL}; struct qrtr_ctrl_pkt *pkt; struct sk_buff *skb; void __rcu **slot; mutex_lock(&node->ep_lock); node->ep = NULL; mutex_unlock(&node->ep_lock); /* Notify the local controller about the event */ down_read(&qrtr_node_lock); radix_tree_for_each_slot(slot, &qrtr_nodes, &iter, 0) { if (node != *slot) continue; skb = qrtr_alloc_ctrl_packet(&pkt); if (skb) { if (!skb) continue; src.sq_node = iter.index; pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE); qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst); qrtr_local_enqueue(NULL, skb, QRTR_TYPE_BYE, &src, &dst, 0); } up_read(&qrtr_node_lock); /* Wake up any transmitters waiting for resume-tx from the node */ wake_up_interruptible_all(&node->resume_tx); Loading Loading @@ -783,7 +851,7 @@ static void qrtr_send_del_client(struct qrtr_sock *ipc) skb_set_owner_w(skb, &ipc->sk); if (ipc->state == QRTR_STATE_MULTI) { qrtr_bcast_enqueue(NULL, skb, type, &ipc->us, &to); qrtr_bcast_enqueue(NULL, skb, type, &ipc->us, &to, 0); return; } Loading @@ -799,11 +867,11 @@ static void qrtr_send_del_client(struct qrtr_sock *ipc) } skb_set_owner_w(skbn, &ipc->sk); qrtr_node_enqueue(node, skbn, type, &ipc->us, &to); qrtr_node_enqueue(node, skbn, type, &ipc->us, &to, 0); qrtr_node_release(node); } exit: qrtr_local_enqueue(NULL, skb, type, &ipc->us, &to); qrtr_local_enqueue(NULL, skb, type, &ipc->us, &to, 0); } /* Remove port assignment. */ Loading Loading @@ -959,7 +1027,7 @@ static int qrtr_bind(struct socket *sock, struct sockaddr *saddr, int len) /* Queue packet to local peer socket. */ static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to) struct sockaddr_qrtr *to, unsigned int flags) { struct qrtr_sock *ipc; struct qrtr_cb *cb; Loading Loading @@ -992,7 +1060,7 @@ static int qrtr_local_enqueue(struct qrtr_node *node, struct sk_buff *skb, /* Queue packet for broadcast. */ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, int type, struct sockaddr_qrtr *from, struct sockaddr_qrtr *to) struct sockaddr_qrtr *to, unsigned int flags) { struct sk_buff *skbn; Loading @@ -1004,11 +1072,11 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, if (!skbn) break; skb_set_owner_w(skbn, skb->sk); qrtr_node_enqueue(node, skbn, type, from, to); qrtr_node_enqueue(node, skbn, type, from, to, flags); } up_read(&qrtr_node_lock); qrtr_local_enqueue(node, skb, type, from, to); qrtr_local_enqueue(node, skb, type, from, to, flags); return 0; } Loading @@ -1017,7 +1085,8 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len) { DECLARE_SOCKADDR(struct sockaddr_qrtr *, addr, msg->msg_name); int (*enqueue_fn)(struct qrtr_node *, struct sk_buff *, int, struct sockaddr_qrtr *, struct sockaddr_qrtr *); struct sockaddr_qrtr *, struct sockaddr_qrtr *, unsigned int); struct qrtr_sock *ipc = qrtr_sk(sock->sk); struct sock *sk = sock->sk; struct qrtr_node *node; Loading Loading @@ -1106,7 +1175,7 @@ static int qrtr_sendmsg(struct socket *sock, struct msghdr *msg, size_t len) type = le32_to_cpu(type); } rc = enqueue_fn(node, skb, type, &ipc->us, addr); rc = enqueue_fn(node, skb, type, &ipc->us, addr, msg->msg_flags); if (rc >= 0) rc = len; Loading Loading @@ -1140,7 +1209,7 @@ static int qrtr_resume_tx(struct qrtr_cb *cb) pkt->client.port = cpu_to_le32(cb->dst_port); ret = qrtr_node_enqueue(node, skb, QRTR_TYPE_RESUME_TX, &local, &remote); &local, &remote, 0); qrtr_node_release(node); Loading