Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4b9bc86d authored by Sebastian Andrzej Siewior's avatar Sebastian Andrzej Siewior Committed by Martin K. Petersen
Browse files

fcoe: convert to kworker



The driver creates its own per-CPU threads which are updated based on
CPU hotplug events. It is also possible to use kworkers and remove some
of the kthread infrastrucure.

The code checked ->thread to decide if there is an active per-CPU
thread. By using the kworker infrastructure this is no longer
possible (or required). The thread pointer is saved in `kthread' instead
of `thread' so anything trying to use thread is caught by the
compiler. Currently only the bnx2fc driver is using struct fcoe_percpu_s
and the kthread member.

After a CPU went offline, we may still enqueue items on the "offline"
CPU. This isn't much of a problem. The work will be done on a random
CPU. The allocated crc_eof_page page won't be cleaned up. It is probably
expected that the CPU comes up at some point so it should not be a
problem. The crc_eof_page memory is released of course once the module
is removed.

This patch was only compile-tested due to -ENODEV.

Cc: Vasu Dev <vasu.dev@intel.com>
Cc: "James E.J. Bottomley" <jejb@linux.vnet.ibm.com>
Cc: "Martin K. Petersen" <martin.petersen@oracle.com>
Cc: Christoph Hellwig <hch@lst.de>
Cc: fcoe-devel@open-fcoe.org
Cc: linux-scsi@vger.kernel.org
Signed-off-by: default avatarSebastian Andrzej Siewior <bigeasy@linutronix.de>
Tested-by: default avatarJohannes Thumshirn <jthumshirn@suse.de>
Reviewed-by: default avatarJohannes Thumshirn <jthumshirn@suse.de>
Signed-off-by: default avatarMartin K. Petersen <martin.petersen@oracle.com>
parent 49a75815
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -486,7 +486,7 @@ static int bnx2fc_rcv(struct sk_buff *skb, struct net_device *dev,

	__skb_queue_tail(&bg->fcoe_rx_list, skb);
	if (bg->fcoe_rx_list.qlen == 1)
		wake_up_process(bg->thread);
		wake_up_process(bg->kthread);

	spin_unlock(&bg->fcoe_rx_list.lock);

@@ -2715,7 +2715,7 @@ static int __init bnx2fc_mod_init(void)
	}
	wake_up_process(l2_thread);
	spin_lock_bh(&bg->fcoe_rx_list.lock);
	bg->thread = l2_thread;
	bg->kthread = l2_thread;
	spin_unlock_bh(&bg->fcoe_rx_list.lock);

	for_each_possible_cpu(cpu) {
@@ -2788,8 +2788,8 @@ static void __exit bnx2fc_mod_exit(void)
	/* Destroy global thread */
	bg = &bnx2fc_global;
	spin_lock_bh(&bg->fcoe_rx_list.lock);
	l2_thread = bg->thread;
	bg->thread = NULL;
	l2_thread = bg->kthread;
	bg->kthread = NULL;
	while ((skb = __skb_dequeue(&bg->fcoe_rx_list)) != NULL)
		kfree_skb(skb);

+26 −250
Original line number Diff line number Diff line
@@ -67,9 +67,6 @@ static DEFINE_MUTEX(fcoe_config_mutex);

static struct workqueue_struct *fcoe_wq;

/* fcoe_percpu_clean completion.  Waiter protected by fcoe_create_mutex */
static DECLARE_COMPLETION(fcoe_flush_completion);

/* fcoe host list */
/* must only by accessed under the RTNL mutex */
static LIST_HEAD(fcoe_hostlist);
@@ -80,7 +77,6 @@ static int fcoe_reset(struct Scsi_Host *);
static int fcoe_xmit(struct fc_lport *, struct fc_frame *);
static int fcoe_rcv(struct sk_buff *, struct net_device *,
		    struct packet_type *, struct net_device *);
static int fcoe_percpu_receive_thread(void *);
static void fcoe_percpu_clean(struct fc_lport *);
static int fcoe_link_ok(struct fc_lport *);

@@ -107,7 +103,6 @@ static int fcoe_ddp_setup(struct fc_lport *, u16, struct scatterlist *,
static int fcoe_ddp_done(struct fc_lport *, u16);
static int fcoe_ddp_target(struct fc_lport *, u16, struct scatterlist *,
			   unsigned int);
static int fcoe_cpu_callback(struct notifier_block *, unsigned long, void *);
static int fcoe_dcb_app_notification(struct notifier_block *notifier,
				     ulong event, void *ptr);

@@ -136,11 +131,6 @@ static struct notifier_block fcoe_notifier = {
	.notifier_call = fcoe_device_notification,
};

/* notification function for CPU hotplug events */
static struct notifier_block fcoe_cpu_notifier = {
	.notifier_call = fcoe_cpu_callback,
};

/* notification function for DCB events */
static struct notifier_block dcb_notifier = {
	.notifier_call = fcoe_dcb_app_notification,
@@ -1245,152 +1235,21 @@ static int __exit fcoe_if_exit(void)
	return 0;
}

/**
 * fcoe_percpu_thread_create() - Create a receive thread for an online CPU
 * @cpu: The CPU index of the CPU to create a receive thread for
 */
static void fcoe_percpu_thread_create(unsigned int cpu)
static void fcoe_thread_cleanup_local(unsigned int cpu)
{
	struct fcoe_percpu_s *p;
	struct task_struct *thread;

	p = &per_cpu(fcoe_percpu, cpu);

	thread = kthread_create_on_node(fcoe_percpu_receive_thread,
					(void *)p, cpu_to_node(cpu),
					"fcoethread/%d", cpu);

	if (likely(!IS_ERR(thread))) {
		kthread_bind(thread, cpu);
		wake_up_process(thread);

		spin_lock_bh(&p->fcoe_rx_list.lock);
		p->thread = thread;
		spin_unlock_bh(&p->fcoe_rx_list.lock);
	}
}

/**
 * fcoe_percpu_thread_destroy() - Remove the receive thread of a CPU
 * @cpu: The CPU index of the CPU whose receive thread is to be destroyed
 *
 * Destroys a per-CPU Rx thread. Any pending skbs are moved to the
 * current CPU's Rx thread. If the thread being destroyed is bound to
 * the CPU processing this context the skbs will be freed.
 */
static void fcoe_percpu_thread_destroy(unsigned int cpu)
{
	struct fcoe_percpu_s *p;
	struct task_struct *thread;
	struct page *crc_eof;
	struct sk_buff *skb;
#ifdef CONFIG_SMP
	struct fcoe_percpu_s *p0;
	unsigned targ_cpu = get_cpu();
#endif /* CONFIG_SMP */

	FCOE_DBG("Destroying receive thread for CPU %d\n", cpu);
	struct fcoe_percpu_s *p;

	/* Prevent any new skbs from being queued for this CPU. */
	p = &per_cpu(fcoe_percpu, cpu);
	p = per_cpu_ptr(&fcoe_percpu, cpu);
	spin_lock_bh(&p->fcoe_rx_list.lock);
	thread = p->thread;
	p->thread = NULL;
	crc_eof = p->crc_eof_page;
	p->crc_eof_page = NULL;
	p->crc_eof_offset = 0;
	spin_unlock_bh(&p->fcoe_rx_list.lock);

#ifdef CONFIG_SMP
	/*
	 * Don't bother moving the skb's if this context is running
	 * on the same CPU that is having its thread destroyed. This
	 * can easily happen when the module is removed.
	 */
	if (cpu != targ_cpu) {
		p0 = &per_cpu(fcoe_percpu, targ_cpu);
		spin_lock_bh(&p0->fcoe_rx_list.lock);
		if (p0->thread) {
			FCOE_DBG("Moving frames from CPU %d to CPU %d\n",
				 cpu, targ_cpu);

			while ((skb = __skb_dequeue(&p->fcoe_rx_list)) != NULL)
				__skb_queue_tail(&p0->fcoe_rx_list, skb);
			spin_unlock_bh(&p0->fcoe_rx_list.lock);
		} else {
			/*
			 * The targeted CPU is not initialized and cannot accept
			 * new	skbs. Unlock the targeted CPU and drop the skbs
			 * on the CPU that is going offline.
			 */
			while ((skb = __skb_dequeue(&p->fcoe_rx_list)) != NULL)
				kfree_skb(skb);
			spin_unlock_bh(&p0->fcoe_rx_list.lock);
		}
	} else {
		/*
		 * This scenario occurs when the module is being removed
		 * and all threads are being destroyed. skbs will continue
		 * to be shifted from the CPU thread that is being removed
		 * to the CPU thread associated with the CPU that is processing
		 * the module removal. Once there is only one CPU Rx thread it
		 * will reach this case and we will drop all skbs and later
		 * stop the thread.
		 */
		spin_lock_bh(&p->fcoe_rx_list.lock);
		while ((skb = __skb_dequeue(&p->fcoe_rx_list)) != NULL)
			kfree_skb(skb);
		spin_unlock_bh(&p->fcoe_rx_list.lock);
	}
	put_cpu();
#else
	/*
	 * This a non-SMP scenario where the singular Rx thread is
	 * being removed. Free all skbs and stop the thread.
	 */
	spin_lock_bh(&p->fcoe_rx_list.lock);
	while ((skb = __skb_dequeue(&p->fcoe_rx_list)) != NULL)
		kfree_skb(skb);
	spin_unlock_bh(&p->fcoe_rx_list.lock);
#endif

	if (thread)
		kthread_stop(thread);

	if (crc_eof)
		put_page(crc_eof);
}

/**
 * fcoe_cpu_callback() - Handler for CPU hotplug events
 * @nfb:    The callback data block
 * @action: The event triggering the callback
 * @hcpu:   The index of the CPU that the event is for
 *
 * This creates or destroys per-CPU data for fcoe
 *
 * Returns NOTIFY_OK always.
 */
static int fcoe_cpu_callback(struct notifier_block *nfb,
			     unsigned long action, void *hcpu)
{
	unsigned cpu = (unsigned long)hcpu;

	switch (action) {
	case CPU_ONLINE:
	case CPU_ONLINE_FROZEN:
		FCOE_DBG("CPU %x online: Create Rx thread\n", cpu);
		fcoe_percpu_thread_create(cpu);
		break;
	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		FCOE_DBG("CPU %x offline: Remove Rx thread\n", cpu);
		fcoe_percpu_thread_destroy(cpu);
		break;
	default:
		break;
	}
	return NOTIFY_OK;
	flush_work(&p->work);
}

/**
@@ -1509,26 +1368,6 @@ static int fcoe_rcv(struct sk_buff *skb, struct net_device *netdev,

	fps = &per_cpu(fcoe_percpu, cpu);
	spin_lock(&fps->fcoe_rx_list.lock);
	if (unlikely(!fps->thread)) {
		/*
		 * The targeted CPU is not ready, let's target
		 * the first CPU now. For non-SMP systems this
		 * will check the same CPU twice.
		 */
		FCOE_NETDEV_DBG(netdev, "CPU is online, but no receive thread "
				"ready for incoming skb- using first online "
				"CPU.\n");

		spin_unlock(&fps->fcoe_rx_list.lock);
		cpu = cpumask_first(cpu_online_mask);
		fps = &per_cpu(fcoe_percpu, cpu);
		spin_lock(&fps->fcoe_rx_list.lock);
		if (!fps->thread) {
			spin_unlock(&fps->fcoe_rx_list.lock);
			goto err;
		}
	}

	/*
	 * We now have a valid CPU that we're targeting for
	 * this skb. We also have this receive thread locked,
@@ -1543,8 +1382,7 @@ static int fcoe_rcv(struct sk_buff *skb, struct net_device *netdev,
	 * in softirq context.
	 */
	__skb_queue_tail(&fps->fcoe_rx_list, skb);
	if (fps->thread->state == TASK_INTERRUPTIBLE)
		wake_up_process(fps->thread);
	schedule_work_on(cpu, &fps->work);
	spin_unlock(&fps->fcoe_rx_list.lock);

	return NET_RX_SUCCESS;
@@ -1712,15 +1550,6 @@ static int fcoe_xmit(struct fc_lport *lport, struct fc_frame *fp)
	return 0;
}

/**
 * fcoe_percpu_flush_done() - Indicate per-CPU queue flush completion
 * @skb: The completed skb (argument required by destructor)
 */
static void fcoe_percpu_flush_done(struct sk_buff *skb)
{
	complete(&fcoe_flush_completion);
}

/**
 * fcoe_filter_frames() - filter out bad fcoe frames, i.e. bad CRC
 * @lport: The local port the frame was received on
@@ -1792,7 +1621,6 @@ static void fcoe_recv_frame(struct sk_buff *skb)
	fr = fcoe_dev_from_skb(skb);
	lport = fr->fr_dev;
	if (unlikely(!lport)) {
		if (skb->destructor != fcoe_percpu_flush_done)
		FCOE_NETDEV_DBG(skb->dev, "NULL lport in skb\n");
		kfree_skb(skb);
		return;
@@ -1857,40 +1685,28 @@ static void fcoe_recv_frame(struct sk_buff *skb)
}

/**
 * fcoe_percpu_receive_thread() - The per-CPU packet receive thread
 * @arg: The per-CPU context
 * fcoe_receive_work() - The per-CPU worker
 * @work: The work struct
 *
 * Return: 0 for success
 */
static int fcoe_percpu_receive_thread(void *arg)
static void fcoe_receive_work(struct work_struct *work)
{
	struct fcoe_percpu_s *p = arg;
	struct fcoe_percpu_s *p;
	struct sk_buff *skb;
	struct sk_buff_head tmp;

	p = container_of(work, struct fcoe_percpu_s, work);
	skb_queue_head_init(&tmp);

	set_user_nice(current, MIN_NICE);

	while (!kthread_should_stop()) {

	spin_lock_bh(&p->fcoe_rx_list.lock);
	skb_queue_splice_init(&p->fcoe_rx_list, &tmp);

		if (!skb_queue_len(&tmp)) {
			set_current_state(TASK_INTERRUPTIBLE);
	spin_unlock_bh(&p->fcoe_rx_list.lock);
			schedule();
			continue;
		}

		spin_unlock_bh(&p->fcoe_rx_list.lock);
	if (!skb_queue_len(&tmp))
		return;

		while ((skb = __skb_dequeue(&tmp)) != NULL)
	while ((skb = __skb_dequeue(&tmp)))
		fcoe_recv_frame(skb);

	}
	return 0;
}

/**
@@ -2450,36 +2266,19 @@ static int fcoe_link_ok(struct fc_lport *lport)
 *
 * Must be called with fcoe_create_mutex held to single-thread completion.
 *
 * This flushes the pending skbs by adding a new skb to each queue and
 * waiting until they are all freed.  This assures us that not only are
 * there no packets that will be handled by the lport, but also that any
 * threads already handling packet have returned.
 * This flushes the pending skbs by flush the work item for each CPU. The work
 * item on each possible CPU is flushed because we may have used the per-CPU
 * struct of an offline CPU.
 */
static void fcoe_percpu_clean(struct fc_lport *lport)
{
	struct fcoe_percpu_s *pp;
	struct sk_buff *skb;
	unsigned int cpu;

	for_each_possible_cpu(cpu) {
		pp = &per_cpu(fcoe_percpu, cpu);

		if (!pp->thread || !cpu_online(cpu))
			continue;

		skb = dev_alloc_skb(0);
		if (!skb)
			continue;

		skb->destructor = fcoe_percpu_flush_done;

		spin_lock_bh(&pp->fcoe_rx_list.lock);
		__skb_queue_tail(&pp->fcoe_rx_list, skb);
		if (pp->fcoe_rx_list.qlen == 1)
			wake_up_process(pp->thread);
		spin_unlock_bh(&pp->fcoe_rx_list.lock);

		wait_for_completion(&fcoe_flush_completion);
		flush_work(&pp->work);
	}
}

@@ -2625,22 +2424,11 @@ static int __init fcoe_init(void)
	mutex_lock(&fcoe_config_mutex);

	for_each_possible_cpu(cpu) {
		p = &per_cpu(fcoe_percpu, cpu);
		p = per_cpu_ptr(&fcoe_percpu, cpu);
		INIT_WORK(&p->work, fcoe_receive_work);
		skb_queue_head_init(&p->fcoe_rx_list);
	}

	cpu_notifier_register_begin();

	for_each_online_cpu(cpu)
		fcoe_percpu_thread_create(cpu);

	/* Initialize per CPU interrupt thread */
	rc = __register_hotcpu_notifier(&fcoe_cpu_notifier);
	if (rc)
		goto out_free;

	cpu_notifier_register_done();

	/* Setup link change notification */
	fcoe_dev_setup();

@@ -2652,12 +2440,6 @@ static int __init fcoe_init(void)
	return 0;

out_free:
	for_each_online_cpu(cpu) {
		fcoe_percpu_thread_destroy(cpu);
	}

	cpu_notifier_register_done();

	mutex_unlock(&fcoe_config_mutex);
	destroy_workqueue(fcoe_wq);
	return rc;
@@ -2690,14 +2472,8 @@ static void __exit fcoe_exit(void)
	}
	rtnl_unlock();

	cpu_notifier_register_begin();

	for_each_online_cpu(cpu)
		fcoe_percpu_thread_destroy(cpu);

	__unregister_hotcpu_notifier(&fcoe_cpu_notifier);

	cpu_notifier_register_done();
	for_each_possible_cpu(cpu)
		fcoe_thread_cleanup_local(cpu);

	mutex_unlock(&fcoe_config_mutex);

+4 −2
Original line number Diff line number Diff line
@@ -319,14 +319,16 @@ struct fcoe_transport {

/**
 * struct fcoe_percpu_s - The context for FCoE receive thread(s)
 * @thread:	    The thread context
 * @kthread:	    The thread context (used by bnx2fc)
 * @work:	    The work item (used by fcoe)
 * @fcoe_rx_list:   The queue of pending packets to process
 * @page:	    The memory page for calculating frame trailer CRCs
 * @crc_eof_offset: The offset into the CRC page pointing to available
 *		    memory for a new trailer
 */
struct fcoe_percpu_s {
	struct task_struct *thread;
	struct task_struct *kthread;
	struct work_struct work;
	struct sk_buff_head fcoe_rx_list;
	struct page *crc_eof_page;
	int crc_eof_offset;