Loading net/qrtr/qrtr.c +44 −32 Original line number Diff line number Diff line Loading @@ -11,6 +11,7 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. */ #include <linux/kthread.h> #include <linux/module.h> #include <linux/netlink.h> #include <linux/qrtr.h> Loading Loading @@ -141,8 +142,10 @@ static DEFINE_MUTEX(qrtr_port_lock); * @resume_tx: wait until remote port acks control flag * @qrtr_tx_lock: lock for qrtr_tx_flow * @rx_queue: receive queue * @work: scheduled work struct for recv work * @item: list item for broadcast list * @kworker: worker thread for recv work * @task: task to run the worker thread * @read_data: scheduled work for recv work * @ilc: ipc logging context reference */ struct qrtr_node { Loading @@ -157,9 +160,12 @@ struct qrtr_node { struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */ struct sk_buff_head rx_queue; struct work_struct work; struct list_head item; struct kthread_worker kworker; struct task_struct *task; struct kthread_work read_data; void *ilc; }; Loading Loading @@ -332,8 +338,9 @@ static void __qrtr_node_release(struct kref *kref) } mutex_unlock(&node->qrtr_tx_lock); kthread_flush_worker(&node->kworker); kthread_stop(node->task); flush_work(&node->work); skb_queue_purge(&node->rx_queue); kfree(node); } Loading Loading @@ -383,10 +390,10 @@ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb) if (!flow) return; mutex_lock(&node->qrtr_tx_lock); atomic_set(&flow->pending, 0); wake_up_interruptible_all(&node->resume_tx); mutex_lock(&node->qrtr_tx_lock); list_for_each_entry_safe(waiter, temp, &flow->waiters, node) { list_del(&waiter->node); skbn = alloc_skb(0, GFP_KERNEL); Loading Loading @@ -446,20 +453,8 @@ static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to, } mutex_unlock(&node->qrtr_tx_lock); ret = timeo; for (;;) { ret = wait_event_interruptible_timeout( node->resume_tx, !node->ep || atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH, timeo); if (ret < 0) return ret; if (!ret) return -EAGAIN; if (!node->ep) return -EPIPE; mutex_lock(&node->qrtr_tx_lock); if (atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH) { atomic_inc(&flow->pending); Loading @@ -468,19 +463,28 @@ static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to, mutex_unlock(&node->qrtr_tx_lock); break; } mutex_unlock(&node->qrtr_tx_lock); } if (confirm_rx) { if (!ret) { waiter = kzalloc(sizeof(*waiter), GFP_KERNEL); if (!waiter) if (!waiter) { mutex_unlock(&node->qrtr_tx_lock); return -ENOMEM; } waiter->sk = sk; sock_hold(sk); mutex_lock(&node->qrtr_tx_lock); list_add_tail(&waiter->node, &flow->waiters); mutex_unlock(&node->qrtr_tx_lock); return -EAGAIN; } mutex_unlock(&node->qrtr_tx_lock); ret = wait_event_interruptible_timeout(node->resume_tx, !node->ep || atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH, timeo); if (ret < 0) return ret; if (!node->ep) return -EPIPE; } return confirm_rx; } Loading Loading @@ -676,7 +680,7 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len) qrtr_log_rx_msg(node, skb); skb_queue_tail(&node->rx_queue, skb); schedule_work(&node->work); kthread_queue_work(&node->kworker, &node->read_data); return 0; Loading Loading @@ -718,9 +722,10 @@ static void qrtr_port_put(struct qrtr_sock *ipc); * * This will auto-reply with resume-tx packet as necessary. */ static void qrtr_node_rx_work(struct work_struct *work) static void qrtr_node_rx_work(struct kthread_work *work) { struct qrtr_node *node = container_of(work, struct qrtr_node, work); struct qrtr_node *node = container_of(work, struct qrtr_node, read_data); struct qrtr_ctrl_pkt *pkt; struct sk_buff *skb; Loading Loading @@ -773,7 +778,6 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid) if (!node) return -ENOMEM; INIT_WORK(&node->work, qrtr_node_rx_work); kref_init(&node->ref); mutex_init(&node->ep_lock); skb_queue_head_init(&node->rx_queue); Loading @@ -781,6 +785,14 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid) node->ep = ep; atomic_set(&node->hello_sent, 0); kthread_init_work(&node->read_data, qrtr_node_rx_work); kthread_init_worker(&node->kworker); node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx"); if (IS_ERR(node->task)) { kfree(node); return -ENOMEM; } mutex_init(&node->qrtr_tx_lock); INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL); init_waitqueue_head(&node->resume_tx); Loading Loading
net/qrtr/qrtr.c +44 −32 Original line number Diff line number Diff line Loading @@ -11,6 +11,7 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. */ #include <linux/kthread.h> #include <linux/module.h> #include <linux/netlink.h> #include <linux/qrtr.h> Loading Loading @@ -141,8 +142,10 @@ static DEFINE_MUTEX(qrtr_port_lock); * @resume_tx: wait until remote port acks control flag * @qrtr_tx_lock: lock for qrtr_tx_flow * @rx_queue: receive queue * @work: scheduled work struct for recv work * @item: list item for broadcast list * @kworker: worker thread for recv work * @task: task to run the worker thread * @read_data: scheduled work for recv work * @ilc: ipc logging context reference */ struct qrtr_node { Loading @@ -157,9 +160,12 @@ struct qrtr_node { struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */ struct sk_buff_head rx_queue; struct work_struct work; struct list_head item; struct kthread_worker kworker; struct task_struct *task; struct kthread_work read_data; void *ilc; }; Loading Loading @@ -332,8 +338,9 @@ static void __qrtr_node_release(struct kref *kref) } mutex_unlock(&node->qrtr_tx_lock); kthread_flush_worker(&node->kworker); kthread_stop(node->task); flush_work(&node->work); skb_queue_purge(&node->rx_queue); kfree(node); } Loading Loading @@ -383,10 +390,10 @@ static void qrtr_tx_resume(struct qrtr_node *node, struct sk_buff *skb) if (!flow) return; mutex_lock(&node->qrtr_tx_lock); atomic_set(&flow->pending, 0); wake_up_interruptible_all(&node->resume_tx); mutex_lock(&node->qrtr_tx_lock); list_for_each_entry_safe(waiter, temp, &flow->waiters, node) { list_del(&waiter->node); skbn = alloc_skb(0, GFP_KERNEL); Loading Loading @@ -446,20 +453,8 @@ static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to, } mutex_unlock(&node->qrtr_tx_lock); ret = timeo; for (;;) { ret = wait_event_interruptible_timeout( node->resume_tx, !node->ep || atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH, timeo); if (ret < 0) return ret; if (!ret) return -EAGAIN; if (!node->ep) return -EPIPE; mutex_lock(&node->qrtr_tx_lock); if (atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH) { atomic_inc(&flow->pending); Loading @@ -468,19 +463,28 @@ static int qrtr_tx_wait(struct qrtr_node *node, struct sockaddr_qrtr *to, mutex_unlock(&node->qrtr_tx_lock); break; } mutex_unlock(&node->qrtr_tx_lock); } if (confirm_rx) { if (!ret) { waiter = kzalloc(sizeof(*waiter), GFP_KERNEL); if (!waiter) if (!waiter) { mutex_unlock(&node->qrtr_tx_lock); return -ENOMEM; } waiter->sk = sk; sock_hold(sk); mutex_lock(&node->qrtr_tx_lock); list_add_tail(&waiter->node, &flow->waiters); mutex_unlock(&node->qrtr_tx_lock); return -EAGAIN; } mutex_unlock(&node->qrtr_tx_lock); ret = wait_event_interruptible_timeout(node->resume_tx, !node->ep || atomic_read(&flow->pending) < QRTR_TX_FLOW_HIGH, timeo); if (ret < 0) return ret; if (!node->ep) return -EPIPE; } return confirm_rx; } Loading Loading @@ -676,7 +680,7 @@ int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len) qrtr_log_rx_msg(node, skb); skb_queue_tail(&node->rx_queue, skb); schedule_work(&node->work); kthread_queue_work(&node->kworker, &node->read_data); return 0; Loading Loading @@ -718,9 +722,10 @@ static void qrtr_port_put(struct qrtr_sock *ipc); * * This will auto-reply with resume-tx packet as necessary. */ static void qrtr_node_rx_work(struct work_struct *work) static void qrtr_node_rx_work(struct kthread_work *work) { struct qrtr_node *node = container_of(work, struct qrtr_node, work); struct qrtr_node *node = container_of(work, struct qrtr_node, read_data); struct qrtr_ctrl_pkt *pkt; struct sk_buff *skb; Loading Loading @@ -773,7 +778,6 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid) if (!node) return -ENOMEM; INIT_WORK(&node->work, qrtr_node_rx_work); kref_init(&node->ref); mutex_init(&node->ep_lock); skb_queue_head_init(&node->rx_queue); Loading @@ -781,6 +785,14 @@ int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid) node->ep = ep; atomic_set(&node->hello_sent, 0); kthread_init_work(&node->read_data, qrtr_node_rx_work); kthread_init_worker(&node->kworker); node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx"); if (IS_ERR(node->task)) { kfree(node); return -ENOMEM; } mutex_init(&node->qrtr_tx_lock); INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL); init_waitqueue_head(&node->resume_tx); Loading