
Commit c8e55f36 authored by Tejun Heo

workqueue: implement worker states



Implement worker states.  Once created, a worker is STARTED.  While a
worker isn't processing a work, it's IDLE and chained on
gcwq->idle_list.  While processing a work, a worker is BUSY and
chained on gcwq->busy_hash.  gcwq now also counts the total number of
workers and the number of idle ones.
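
For illustration only, here is a minimal userspace sketch of this
bookkeeping.  It is not the kernel code: the function and flag names
mirror the patch, but the signatures, types and locking are simplified
stand-ins.

/*
 * Illustration only: a tiny userspace model of the bookkeeping above,
 * not the kernel code.  Flag names mirror the patch; the list/hash and
 * locking details are left out and the struct names are made up here.
 */
#include <stdio.h>

enum {
	WORKER_STARTED	= 1 << 0,	/* worker has been started */
	WORKER_DIE	= 1 << 1,	/* worker should exit */
	WORKER_IDLE	= 1 << 2,	/* worker sits on the idle list */
};

struct gcwq_stats {
	int nr_workers;			/* all started workers */
	int nr_idle;			/* workers currently idle */
};

struct worker_sketch {
	unsigned int flags;
};

static void start_worker(struct gcwq_stats *gcwq, struct worker_sketch *w)
{
	w->flags |= WORKER_STARTED | WORKER_IDLE;	/* new workers start idle */
	gcwq->nr_workers++;
	gcwq->nr_idle++;
}

static void worker_leave_idle(struct gcwq_stats *gcwq, struct worker_sketch *w)
{
	w->flags &= ~WORKER_IDLE;	/* off idle_list, onto busy_hash */
	gcwq->nr_idle--;
}

static void worker_enter_idle(struct gcwq_stats *gcwq, struct worker_sketch *w)
{
	w->flags |= WORKER_IDLE;	/* done processing, back onto idle_list */
	gcwq->nr_idle++;
}

int main(void)
{
	struct gcwq_stats gcwq = { 0, 0 };
	struct worker_sketch w = { 0 };

	start_worker(&gcwq, &w);
	worker_leave_idle(&gcwq, &w);	/* picks up a work item -> BUSY */
	printf("workers=%d idle=%d busy=%d\n",
	       gcwq.nr_workers, gcwq.nr_idle, gcwq.nr_workers - gcwq.nr_idle);
	worker_enter_idle(&gcwq, &w);	/* finished -> IDLE again */
	printf("workers=%d idle=%d\n", gcwq.nr_workers, gcwq.nr_idle);
	return 0;
}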

worker_thread() is restructured to reflect these state transitions.
cwq->more_work is removed; waking up a worker makes it check for
events.  A worker is killed by setting the DIE flag while it's IDLE
and waking it up.
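
The wake-up handshake can be modelled with ordinary threads.  The
sketch below is a rough userspace analogue, not the kernel
implementation: a pthread mutex/condvar pair stands in for gcwq->lock
and the scheduler wake-up (the real code uses wake_up_process() and
TASK_INTERRUPTIBLE, not condition variables), and worker_fn/have_work
are invented names.  The point it illustrates is that a woken worker
re-checks the work list and the DIE flag under the lock, so a plain
wake-up is enough to signal both new work and termination.

/*
 * Rough userspace model of the wake-up protocol, for illustration only.
 * A pthread mutex/condvar pair stands in for gcwq->lock plus the
 * scheduler wake-up; the flags below are simplified stand-ins.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  wake = PTHREAD_COND_INITIALIZER;
static bool have_work;			/* stand-in for a non-empty worklist */
static bool die;			/* stand-in for the DIE flag */

static void *worker_fn(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&lock);
	for (;;) {
		/* drain pending work, dropping the lock around each item */
		while (have_work) {
			have_work = false;
			pthread_mutex_unlock(&lock);
			puts("processing one work item");
			pthread_mutex_lock(&lock);
		}
		/* the kill flag is checked under the lock before sleeping */
		if (die)
			break;
		/* nothing to do: go idle until a queue or kill wakes us */
		pthread_cond_wait(&wake, &lock);
	}
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t tid;

	pthread_create(&tid, NULL, worker_fn, NULL);

	pthread_mutex_lock(&lock);
	have_work = true;		/* queue a work item ... */
	pthread_cond_signal(&wake);	/* ... and wake the worker */
	pthread_mutex_unlock(&lock);

	pthread_mutex_lock(&lock);
	die = true;			/* kill the worker: set DIE ... */
	pthread_cond_signal(&wake);	/* ... and wake it up */
	pthread_mutex_unlock(&lock);

	pthread_join(tid, NULL);
	return 0;
}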

This gives the gcwq better visibility into what's going on and allows
it to quickly find out whether a work is executing, which is necessary
to have multiple workers processing the same cwq.

Signed-off-by: Tejun Heo <tj@kernel.org>
parent 8b03ae3c
+173 −41
@@ -35,6 +35,17 @@
 #include <linux/lockdep.h>
 #include <linux/idr.h>
 
+enum {
+	/* worker flags */
+	WORKER_STARTED		= 1 << 0,	/* started */
+	WORKER_DIE		= 1 << 1,	/* die die die */
+	WORKER_IDLE		= 1 << 2,	/* is idle */
+
+	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
+	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
+	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,
+};
+
 /*
  * Structure fields follow one of the following exclusion rules.
  *
@@ -51,11 +62,18 @@ struct global_cwq;
 struct cpu_workqueue_struct;
 
 struct worker {
+	/* on idle list while idle, on busy hash table while busy */
+	union {
+		struct list_head	entry;	/* L: while idle */
+		struct hlist_node	hentry;	/* L: while busy */
+	};
+
 	struct work_struct	*current_work;	/* L: work being processed */
 	struct list_head	scheduled;	/* L: scheduled works */
 	struct task_struct	*task;		/* I: worker task */
 	struct global_cwq	*gcwq;		/* I: the associated gcwq */
 	struct cpu_workqueue_struct *cwq;	/* I: the associated cwq */
+	unsigned int		flags;		/* L: flags */
 	int			id;		/* I: worker id */
 };
 
@@ -65,6 +83,15 @@ struct worker {
 struct global_cwq {
 	spinlock_t		lock;		/* the gcwq lock */
 	unsigned int		cpu;		/* I: the associated cpu */
+
+	int			nr_workers;	/* L: total number of workers */
+	int			nr_idle;	/* L: currently idle ones */
+
+	/* workers are chained either in the idle_list or busy_hash */
+	struct list_head	idle_list;	/* L: list of idle workers */
+	struct hlist_head	busy_hash[BUSY_WORKER_HASH_SIZE];
+						/* L: hash of busy workers */
+
 	struct ida		worker_ida;	/* L: for worker IDs */
 } ____cacheline_aligned_in_smp;
 
@@ -77,7 +104,6 @@ struct global_cwq {
 struct cpu_workqueue_struct {
 	struct global_cwq	*gcwq;		/* I: the associated gcwq */
 	struct list_head worklist;
-	wait_queue_head_t more_work;
 	struct worker		*worker;
 	struct workqueue_struct *wq;		/* I: the owning workqueue */
 	int			work_color;	/* L: current color */
@@ -306,6 +332,33 @@ static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
 			WORK_STRUCT_WQ_DATA_MASK);
 }
 
+/**
+ * busy_worker_head - return the busy hash head for a work
+ * @gcwq: gcwq of interest
+ * @work: work to be hashed
+ *
+ * Return hash head of @gcwq for @work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to the hash head.
+ */
+static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
+					   struct work_struct *work)
+{
+	const int base_shift = ilog2(sizeof(struct work_struct));
+	unsigned long v = (unsigned long)work;
+
+	/* simple shift and fold hash, do we need something better? */
+	v >>= base_shift;
+	v += v >> BUSY_WORKER_HASH_ORDER;
+	v &= BUSY_WORKER_HASH_MASK;
+
+	return &gcwq->busy_hash[v];
+}
+
 /**
  * insert_work - insert a work into cwq
  * @cwq: cwq @work belongs to
@@ -332,7 +385,7 @@ static void insert_work(struct cpu_workqueue_struct *cwq,
 	smp_wmb();
 
 	list_add_tail(&work->entry, head);
-	wake_up(&cwq->more_work);
+	wake_up_process(cwq->worker->task);
 }
 
 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
@@ -470,13 +523,59 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 }
 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
 
+/**
+ * worker_enter_idle - enter idle state
+ * @worker: worker which is entering idle state
+ *
+ * @worker is entering idle state.  Update stats and idle timer if
+ * necessary.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_enter_idle(struct worker *worker)
+{
+	struct global_cwq *gcwq = worker->gcwq;
+
+	BUG_ON(worker->flags & WORKER_IDLE);
+	BUG_ON(!list_empty(&worker->entry) &&
+	       (worker->hentry.next || worker->hentry.pprev));
+
+	worker->flags |= WORKER_IDLE;
+	gcwq->nr_idle++;
+
+	/* idle_list is LIFO */
+	list_add(&worker->entry, &gcwq->idle_list);
+}
+
+/**
+ * worker_leave_idle - leave idle state
+ * @worker: worker which is leaving idle state
+ *
+ * @worker is leaving idle state.  Update stats.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_leave_idle(struct worker *worker)
+{
+	struct global_cwq *gcwq = worker->gcwq;
+
+	BUG_ON(!(worker->flags & WORKER_IDLE));
+	worker->flags &= ~WORKER_IDLE;
+	gcwq->nr_idle--;
+	list_del_init(&worker->entry);
+}
+
 static struct worker *alloc_worker(void)
 {
 	struct worker *worker;
 
 	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
-	if (worker)
+	if (worker) {
+		INIT_LIST_HEAD(&worker->entry);
 		INIT_LIST_HEAD(&worker->scheduled);
+	}
 	return worker;
 }
 
@@ -541,13 +640,16 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
  * start_worker - start a newly created worker
  * @worker: worker to start
  *
- * Start @worker.
+ * Make the gcwq aware of @worker and start it.
  *
  * CONTEXT:
  * spin_lock_irq(gcwq->lock).
  */
 static void start_worker(struct worker *worker)
 {
+	worker->flags |= WORKER_STARTED;
+	worker->gcwq->nr_workers++;
+	worker_enter_idle(worker);
 	wake_up_process(worker->task);
 }
 
@@ -555,7 +657,10 @@ static void start_worker(struct worker *worker)
  * destroy_worker - destroy a workqueue worker
  * @worker: worker to be destroyed
  *
- * Destroy @worker.
+ * Destroy @worker and adjust @gcwq stats accordingly.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
  */
 static void destroy_worker(struct worker *worker)
 {
@@ -566,12 +671,21 @@ static void destroy_worker(struct worker *worker)
 	BUG_ON(worker->current_work);
 	BUG_ON(!list_empty(&worker->scheduled));
 
+	if (worker->flags & WORKER_STARTED)
+		gcwq->nr_workers--;
+	if (worker->flags & WORKER_IDLE)
+		gcwq->nr_idle--;
+
+	list_del_init(&worker->entry);
+	worker->flags |= WORKER_DIE;
+
+	spin_unlock_irq(&gcwq->lock);
+
 	kthread_stop(worker->task);
 	kfree(worker);
 
 	spin_lock_irq(&gcwq->lock);
 	ida_remove(&gcwq->worker_ida, id);
-	spin_unlock_irq(&gcwq->lock);
 }
 
 /**
@@ -686,6 +800,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
 {
 	struct cpu_workqueue_struct *cwq = worker->cwq;
 	struct global_cwq *gcwq = cwq->gcwq;
+	struct hlist_head *bwh = busy_worker_head(gcwq, work);
 	work_func_t f = work->func;
 	int work_color;
 #ifdef CONFIG_LOCKDEP
@@ -700,6 +815,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
 #endif
 	/* claim and process */
 	debug_work_deactivate(work);
+	hlist_add_head(&worker->hentry, bwh);
 	worker->current_work = work;
 	work_color = get_work_color(work);
 	list_del_init(&work->entry);
@@ -727,6 +843,7 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
 	spin_lock_irq(&gcwq->lock);
 
 	/* we're done with it, release */
+	hlist_del_init(&worker->hentry);
 	worker->current_work = NULL;
 	cwq_dec_nr_in_flight(cwq, work_color);
 }
@@ -763,47 +880,56 @@ static int worker_thread(void *__worker)
 	struct worker *worker = __worker;
 	struct global_cwq *gcwq = worker->gcwq;
 	struct cpu_workqueue_struct *cwq = worker->cwq;
-	DEFINE_WAIT(wait);
-
-	for (;;) {
-		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
-		if (!kthread_should_stop() &&
-		    list_empty(&cwq->worklist))
-			schedule();
-		finish_wait(&cwq->more_work, &wait);
-
-		if (kthread_should_stop())
-			break;
 
+woke_up:
 	if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
 				    get_cpu_mask(gcwq->cpu))))
-		set_cpus_allowed_ptr(worker->task,
-				     get_cpu_mask(gcwq->cpu));
+		set_cpus_allowed_ptr(worker->task, get_cpu_mask(gcwq->cpu));
 
 	spin_lock_irq(&gcwq->lock);
 
+	/* DIE can be set only while we're idle, checking here is enough */
+	if (worker->flags & WORKER_DIE) {
+		spin_unlock_irq(&gcwq->lock);
+		return 0;
+	}
+
+	worker_leave_idle(worker);
+
+	/*
+	 * ->scheduled list can only be filled while a worker is
+	 * preparing to process a work or actually processing it.
+	 * Make sure nobody diddled with it while I was sleeping.
+	 */
+	BUG_ON(!list_empty(&worker->scheduled));
+
 	while (!list_empty(&cwq->worklist)) {
 		struct work_struct *work =
 			list_first_entry(&cwq->worklist,
 					 struct work_struct, entry);
 
-		if (likely(!(*work_data_bits(work) &
-			     WORK_STRUCT_LINKED))) {
+		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
 			/* optimization path, not strictly necessary */
 			process_one_work(worker, work);
 			if (unlikely(!list_empty(&worker->scheduled)))
 				process_scheduled_works(worker);
 		} else {
-			move_linked_works(work, &worker->scheduled,
-					  NULL);
+			move_linked_works(work, &worker->scheduled, NULL);
 			process_scheduled_works(worker);
 		}
 	}
 
+	/*
+	 * gcwq->lock is held and there's no work to process, sleep.
+	 * Workers are woken up only while holding gcwq->lock, so
+	 * setting the current state before releasing gcwq->lock is
+	 * enough to prevent losing any event.
+	 */
+	worker_enter_idle(worker);
+	__set_current_state(TASK_INTERRUPTIBLE);
 	spin_unlock_irq(&gcwq->lock);
-	}
+	schedule();
 
-	return 0;
+	goto woke_up;
 }
 
 struct wq_barrier {
@@ -1600,7 +1726,6 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 		cwq->max_active = max_active;
 		INIT_LIST_HEAD(&cwq->worklist);
 		INIT_LIST_HEAD(&cwq->delayed_works);
-		init_waitqueue_head(&cwq->more_work);
 
 		if (failed)
 			continue;
@@ -1651,7 +1776,7 @@ EXPORT_SYMBOL_GPL(__create_workqueue_key);
  */
 void destroy_workqueue(struct workqueue_struct *wq)
 {
-	int cpu;
+	unsigned int cpu;
 
 	flush_workqueue(wq);
 
@@ -1670,8 +1795,10 @@ void destroy_workqueue(struct workqueue_struct *wq)
 		int i;
 
 		if (cwq->worker) {
+			spin_lock_irq(&cwq->gcwq->lock);
 			destroy_worker(cwq->worker);
 			cwq->worker = NULL;
+			spin_unlock_irq(&cwq->gcwq->lock);
 		}
 
 		for (i = 0; i < WORK_NR_COLORS; i++)
@@ -1881,7 +2008,7 @@ void thaw_workqueues(void)
 			       cwq->nr_active < cwq->max_active)
 				cwq_activate_first_delayed(cwq);
 
-			wake_up(&cwq->more_work);
+			wake_up_process(cwq->worker->task);
 		}
 
 		spin_unlock_irq(&gcwq->lock);
@@ -1896,6 +2023,7 @@ void thaw_workqueues(void)
 void __init init_workqueues(void)
 {
 	unsigned int cpu;
+	int i;
 
 	singlethread_cpu = cpumask_first(cpu_possible_mask);
 	hotcpu_notifier(workqueue_cpu_callback, 0);
@@ -1907,6 +2035,10 @@ void __init init_workqueues(void)
 		spin_lock_init(&gcwq->lock);
 		gcwq->cpu = cpu;
 
+		INIT_LIST_HEAD(&gcwq->idle_list);
+		for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
+			INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+
 		ida_init(&gcwq->worker_ida);
 	}