Loading net/qrtr/qrtr.c +74 −4 Original line number Diff line number Diff line Loading @@ -133,6 +133,7 @@ static DEFINE_MUTEX(qrtr_port_lock); * @nid: node id * @net_id: network cluster identifer * @hello_sent: hello packet sent to endpoint * @hello_rcvd: hello packet received from endpoint * @qrtr_tx_flow: tree with tx counts per flow * @resume_tx: waiters for a resume tx from the remote * @qrtr_tx_lock: lock for qrtr_tx_flow Loading @@ -141,6 +142,7 @@ static DEFINE_MUTEX(qrtr_port_lock); * @kworker: worker thread for recv work * @task: task to run the worker thread * @read_data: scheduled work for recv work * @say_hello: scheduled work for initiating hello * @ws: wakeupsource avoid system suspend */ struct qrtr_node { Loading @@ -150,6 +152,7 @@ struct qrtr_node { unsigned int nid; unsigned int net_id; atomic_t hello_sent; atomic_t hello_rcvd; struct radix_tree_root qrtr_tx_flow; struct wait_queue_head resume_tx; Loading @@ -161,6 +164,7 @@ struct qrtr_node { struct kthread_worker kworker; struct task_struct *task; struct kthread_work read_data; struct kthread_work say_hello; struct wakeup_source *ws; }; Loading Loading @@ -465,6 +469,10 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb, kfree_skb(skb); return rc; } if (atomic_read(&node->hello_sent) && type == QRTR_TYPE_HELLO) { kfree_skb(skb); return 0; } /* If sk is null, this is a forwarded packet and should not wait */ if (!skb->sk) { Loading Loading @@ -809,6 +817,29 @@ static void qrtr_fwd_pkt(struct sk_buff *skb, struct qrtr_cb *cb) qrtr_node_release(node); } static void qrtr_sock_queue_skb(struct qrtr_node *node, struct sk_buff *skb, struct qrtr_sock *ipc) { struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb; int rc; /* Don't queue HELLO if control port already received */ if (cb->type == QRTR_TYPE_HELLO) { if (atomic_read(&node->hello_rcvd)) { kfree_skb(skb); return; } atomic_inc(&node->hello_rcvd); } rc = sock_queue_rcv_skb(&ipc->sk, skb); if (rc) { pr_err("%s: qrtr pkt dropped flow[%d] rc[%d]\n", __func__, cb->confirm_rx, rc); kfree_skb(skb); } } /* Handle not atomic operations for a received packet. */ static void qrtr_node_rx_work(struct kthread_work *work) { Loading Loading @@ -837,15 +868,40 @@ static void qrtr_node_rx_work(struct kthread_work *work) if (!ipc) { kfree_skb(skb); } else { if (sock_queue_rcv_skb(&ipc->sk, skb)) kfree_skb(skb); qrtr_sock_queue_skb(node, skb, ipc); qrtr_port_put(ipc); } } } } static void qrtr_hello_work(struct kthread_work *work) { struct sockaddr_qrtr from = {AF_QIPCRTR, 0, QRTR_PORT_CTRL}; struct sockaddr_qrtr to = {AF_QIPCRTR, 0, QRTR_PORT_CTRL}; struct qrtr_ctrl_pkt *pkt; struct qrtr_node *node; struct qrtr_sock *ctrl; struct sk_buff *skb; ctrl = qrtr_port_lookup(QRTR_PORT_CTRL); if (!ctrl) return; skb = qrtr_alloc_ctrl_packet(&pkt); if (!skb) { qrtr_port_put(ctrl); return; } node = container_of(work, struct qrtr_node, say_hello); pkt->cmd = cpu_to_le32(QRTR_TYPE_HELLO); from.sq_node = qrtr_local_nid; to.sq_node = node->nid; qrtr_node_enqueue(node, skb, QRTR_TYPE_HELLO, &from, &to, 0); qrtr_port_put(ctrl); } /** * qrtr_endpoint_register() - register a new endpoint * @ep: endpoint to register Loading Loading @@ -874,8 +930,10 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int net_id, node->nid = QRTR_EP_NID_AUTO; node->ep = ep; atomic_set(&node->hello_sent, 0); atomic_set(&node->hello_rcvd, 0); kthread_init_work(&node->read_data, qrtr_node_rx_work); kthread_init_work(&node->say_hello, qrtr_hello_work); kthread_init_worker(&node->kworker); node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx"); if (IS_ERR(node->task)) { Loading @@ -899,6 +957,7 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int net_id, node->ws = wakeup_source_register("qrtr_ws"); kthread_queue_work(&node->kworker, &node->say_hello); return 0; } EXPORT_SYMBOL_GPL(qrtr_endpoint_register); Loading Loading @@ -1195,6 +1254,17 @@ static int __qrtr_bind(struct socket *sock, qrtr_reset_ports(); mutex_unlock(&qrtr_port_lock); if (port == QRTR_PORT_CTRL) { struct qrtr_node *node; down_write(&qrtr_epts_lock); list_for_each_entry(node, &qrtr_all_epts, item) { atomic_set(&node->hello_sent, 0); atomic_set(&node->hello_rcvd, 0); } up_write(&qrtr_epts_lock); } /* unbind previous, if any */ if (!zapped) qrtr_port_remove(ipc); Loading Loading @@ -1294,7 +1364,7 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, down_read(&qrtr_epts_lock); list_for_each_entry(node, &qrtr_all_epts, item) { if (node->nid == QRTR_EP_NID_AUTO) if (node->nid == QRTR_EP_NID_AUTO && type != QRTR_TYPE_HELLO) continue; skbn = skb_clone(skb, GFP_KERNEL); Loading Loading
net/qrtr/qrtr.c +74 −4 Original line number Diff line number Diff line Loading @@ -133,6 +133,7 @@ static DEFINE_MUTEX(qrtr_port_lock); * @nid: node id * @net_id: network cluster identifer * @hello_sent: hello packet sent to endpoint * @hello_rcvd: hello packet received from endpoint * @qrtr_tx_flow: tree with tx counts per flow * @resume_tx: waiters for a resume tx from the remote * @qrtr_tx_lock: lock for qrtr_tx_flow Loading @@ -141,6 +142,7 @@ static DEFINE_MUTEX(qrtr_port_lock); * @kworker: worker thread for recv work * @task: task to run the worker thread * @read_data: scheduled work for recv work * @say_hello: scheduled work for initiating hello * @ws: wakeupsource avoid system suspend */ struct qrtr_node { Loading @@ -150,6 +152,7 @@ struct qrtr_node { unsigned int nid; unsigned int net_id; atomic_t hello_sent; atomic_t hello_rcvd; struct radix_tree_root qrtr_tx_flow; struct wait_queue_head resume_tx; Loading @@ -161,6 +164,7 @@ struct qrtr_node { struct kthread_worker kworker; struct task_struct *task; struct kthread_work read_data; struct kthread_work say_hello; struct wakeup_source *ws; }; Loading Loading @@ -465,6 +469,10 @@ static int qrtr_node_enqueue(struct qrtr_node *node, struct sk_buff *skb, kfree_skb(skb); return rc; } if (atomic_read(&node->hello_sent) && type == QRTR_TYPE_HELLO) { kfree_skb(skb); return 0; } /* If sk is null, this is a forwarded packet and should not wait */ if (!skb->sk) { Loading Loading @@ -809,6 +817,29 @@ static void qrtr_fwd_pkt(struct sk_buff *skb, struct qrtr_cb *cb) qrtr_node_release(node); } static void qrtr_sock_queue_skb(struct qrtr_node *node, struct sk_buff *skb, struct qrtr_sock *ipc) { struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb; int rc; /* Don't queue HELLO if control port already received */ if (cb->type == QRTR_TYPE_HELLO) { if (atomic_read(&node->hello_rcvd)) { kfree_skb(skb); return; } atomic_inc(&node->hello_rcvd); } rc = sock_queue_rcv_skb(&ipc->sk, skb); if (rc) { pr_err("%s: qrtr pkt dropped flow[%d] rc[%d]\n", __func__, cb->confirm_rx, rc); kfree_skb(skb); } } /* Handle not atomic operations for a received packet. */ static void qrtr_node_rx_work(struct kthread_work *work) { Loading Loading @@ -837,15 +868,40 @@ static void qrtr_node_rx_work(struct kthread_work *work) if (!ipc) { kfree_skb(skb); } else { if (sock_queue_rcv_skb(&ipc->sk, skb)) kfree_skb(skb); qrtr_sock_queue_skb(node, skb, ipc); qrtr_port_put(ipc); } } } } static void qrtr_hello_work(struct kthread_work *work) { struct sockaddr_qrtr from = {AF_QIPCRTR, 0, QRTR_PORT_CTRL}; struct sockaddr_qrtr to = {AF_QIPCRTR, 0, QRTR_PORT_CTRL}; struct qrtr_ctrl_pkt *pkt; struct qrtr_node *node; struct qrtr_sock *ctrl; struct sk_buff *skb; ctrl = qrtr_port_lookup(QRTR_PORT_CTRL); if (!ctrl) return; skb = qrtr_alloc_ctrl_packet(&pkt); if (!skb) { qrtr_port_put(ctrl); return; } node = container_of(work, struct qrtr_node, say_hello); pkt->cmd = cpu_to_le32(QRTR_TYPE_HELLO); from.sq_node = qrtr_local_nid; to.sq_node = node->nid; qrtr_node_enqueue(node, skb, QRTR_TYPE_HELLO, &from, &to, 0); qrtr_port_put(ctrl); } /** * qrtr_endpoint_register() - register a new endpoint * @ep: endpoint to register Loading Loading @@ -874,8 +930,10 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int net_id, node->nid = QRTR_EP_NID_AUTO; node->ep = ep; atomic_set(&node->hello_sent, 0); atomic_set(&node->hello_rcvd, 0); kthread_init_work(&node->read_data, qrtr_node_rx_work); kthread_init_work(&node->say_hello, qrtr_hello_work); kthread_init_worker(&node->kworker); node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx"); if (IS_ERR(node->task)) { Loading @@ -899,6 +957,7 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int net_id, node->ws = wakeup_source_register("qrtr_ws"); kthread_queue_work(&node->kworker, &node->say_hello); return 0; } EXPORT_SYMBOL_GPL(qrtr_endpoint_register); Loading Loading @@ -1195,6 +1254,17 @@ static int __qrtr_bind(struct socket *sock, qrtr_reset_ports(); mutex_unlock(&qrtr_port_lock); if (port == QRTR_PORT_CTRL) { struct qrtr_node *node; down_write(&qrtr_epts_lock); list_for_each_entry(node, &qrtr_all_epts, item) { atomic_set(&node->hello_sent, 0); atomic_set(&node->hello_rcvd, 0); } up_write(&qrtr_epts_lock); } /* unbind previous, if any */ if (!zapped) qrtr_port_remove(ipc); Loading Loading @@ -1294,7 +1364,7 @@ static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb, down_read(&qrtr_epts_lock); list_for_each_entry(node, &qrtr_all_epts, item) { if (node->nid == QRTR_EP_NID_AUTO) if (node->nid == QRTR_EP_NID_AUTO && type != QRTR_TYPE_HELLO) continue; skbn = skb_clone(skb, GFP_KERNEL); Loading