Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f7a8be28 authored by Gilad Broner's avatar Gilad Broner
Browse files

media: dvb: pipe work queue optimization



Having a pipe work object for each sps callback quickly
caused queue to fill-up when playing from memory where TSPPv2
hardware processing is very fast.
Each pipe now has its own dedicated queue, and in case the same
event repeats (which is the likely scenario) the events are merged
into one work object with event count instead.

Change-Id: I00b99f3d8de3718c73a238520d49e5bdb3843545
Signed-off-by: default avatarGilad Broner <gbroner@codeaurora.org>
parent 5cc57e3e
Loading
Loading
Loading
Loading
+174 −57
Original line number Diff line number Diff line
@@ -77,7 +77,6 @@ static struct
	struct dentry *debugfs_filters_file;
	struct dentry *debugfs_sources_file;
	struct dentry *debugfs_index_tables_file;

} mpq_dmx_tspp2_info;

/**
@@ -92,7 +91,6 @@ static void pipe_work_queue_init(struct pipe_work_queue *queue)
	spin_lock_init(&queue->lock);
	INIT_LIST_HEAD(&queue->work_list);
	INIT_LIST_HEAD(&queue->free_list);
	init_waitqueue_head(&queue->wait_queue);

	for (i = 0; i < TSPP2_DMX_PIPE_WORK_POOL_SIZE; i++)
		list_add_tail(&queue->work_pool[i].next, &queue->free_list);
@@ -144,7 +142,6 @@ static void pipe_work_queue_push(struct pipe_work_queue *queue,

	spin_lock_irqsave(&queue->lock, flags);
	list_add_tail(&work->next, &queue->work_list);
	wake_up_all(&queue->wait_queue);
	spin_unlock_irqrestore(&queue->lock, flags);
}

@@ -383,7 +380,7 @@ static int mpq_dmx_tspp2_init_idx_table(enum dmx_video_codec codec,
			__func__, table_id, ret);
		return ret;
	}

	table->num_patterns = 0;
	MPQ_DVB_DBG_PRINT(
		"%s: Initialize new index table #%d, prefix=0x%X, mask=0x%X\n",
		__func__, table_id, INDEX_TABLE_PREFIX_VALUE,
@@ -720,6 +717,48 @@ static int mpq_dmx_tspp2_ts_event_check(struct dvb_demux_feed *feed,
	return 0;
}

static int mpq_dmx_tspp2_stream_buffer_event_check(struct dvb_demux_feed *feed,
	struct pipe_info *pipe_info)
{
	int ret;
	u32 session_id;

	if (feed->demux->playback_mode != DMX_PB_MODE_PULL)
		return 0;

	if (!mutex_is_locked(&pipe_info->mutex))
		return -EINVAL;

	/*
	 * For pull mode need to wait for sufficient room to write the
	 * meta-data packet in the mpq_streambuffer object.
	 * Data itself was already written by TSPPv2 hardware (so required_space
	 * argument is 0).
	 * Since this may block waiting for the metadata buffer, pipe mutex
	 * needs to be released, and when returning verify the pipe was not
	 * closed / re-opened in the meantime.
	 */
	session_id = pipe_info->session_id;
	mutex_unlock(&pipe_info->mutex);
	ret = mpq_dmx_decoder_fullness_wait(feed, 0);
	mutex_lock(&pipe_info->mutex);
	if (ret) {
		MPQ_DVB_ERR_PRINT(
			"%s: mpq_dmx_decoder_fullness_wait failed, ret=%d\n",
			__func__, ret);
		return ret;
	}
	if (pipe_info->session_id != session_id || !pipe_info->ref_count) {
		MPQ_DVB_ERR_PRINT(
			"%s: pipe was closed / re-opened: ref. count=%u, session_id=%u (expected %u)\n",
			__func__, pipe_info->ref_count,
			pipe_info->session_id, session_id);
		return -ENODEV;
	}

	return 0;
}

/**
 * mpq_dmx_tspp2_filter_event_cb() - filter event notification handler
 *
@@ -1667,6 +1706,8 @@ static int mpq_dmx_init_out_pipe(struct mpq_demux *mpq_demux,
	pipe_info->source_info = source_info;
	pipe_info->eos_pending = 0;
	pipe_info->session_id++;
	pipe_info->hw_missed_notif = 0;
	pipe_info->handler_count = 0;
	return 0;

close_pipe:
@@ -1732,8 +1773,7 @@ static int mpq_dmx_terminate_out_pipe(struct pipe_info *pipe_info)
	tspp2_pipe_close(pipe_info->handle);
	pipe_info->handle = TSPP2_INVALID_HANDLE;

	pipe_work_queue_cancel_work(&source_info->demux_src.work_queue,
		pipe_info);
	pipe_work_queue_cancel_work(&pipe_info->work_queue, pipe_info);

	if (pipe_info->buffer.kernel_map)
		ion_unmap_kernel(mpq_demux->ion_client,
@@ -2084,25 +2124,51 @@ static void mpq_dmx_tspp2_update_pipe_stats(struct pipe_info *pipe_info)
	pipe_info->hw_notif_count++;
}

static void mpq_dmx_tspp2_queue_pipe_handler(struct pipe_info *pipe_info,
static int mpq_dmx_tspp2_queue_pipe_handler(struct pipe_info *pipe_info,
	enum mpq_dmx_tspp2_pipe_event event)
{
	struct source_info *src;
	struct source_info *src = pipe_info->source_info;
	struct pipe_work_queue *queue = &pipe_info->work_queue;
	struct pipe_work *pipe_work;
	unsigned long flags;

	src = pipe_info->source_info;
	pipe_work = pipe_work_queue_allocate(&src->demux_src.work_queue);
	/* Try to merge data events into 1 pipe work object */
	spin_lock_irqsave(&queue->lock, flags);
	if (!list_empty(&queue->work_list)) {
		pipe_work = list_first_entry(&queue->work_list,
			struct pipe_work, next);
		if (event == PIPE_DATA_EVENT &&
			pipe_work->event == PIPE_DATA_EVENT &&
			pipe_work->event_count &&
			pipe_work->pipe_info == pipe_info &&
			pipe_work->session_id == pipe_info->session_id) {
			pipe_work->event_count++;
			spin_unlock_irqrestore(&queue->lock, flags);
			wake_up_all(&src->demux_src.wait_queue);
			return 0;
		}
	}
	spin_unlock_irqrestore(&queue->lock, flags);

	pipe_work = pipe_work_queue_allocate(&pipe_info->work_queue);
	if (pipe_work == NULL) {
		int pipe_idx = (pipe_info - mpq_dmx_tspp2_info.pipes) /
			sizeof(*pipe_info);
		MPQ_DVB_ERR_PRINT(
			"%s: Cannot allocate pipe work for pipe %d\n",
			__func__, pipe_info->type);
		return;
			"%s: Cannot allocate pipe work for pipe %d, type %d\n",
			__func__, pipe_idx, pipe_info->type);
		return -ENOSPC;
	}

	pipe_work->pipe_info = pipe_info;
	pipe_work->event = event;
	pipe_work->session_id = pipe_info->session_id;
	pipe_work_queue_push(&src->demux_src.work_queue, pipe_work);
	pipe_work->event_count = 1;

	pipe_work_queue_push(&pipe_info->work_queue, pipe_work);
	wake_up_all(&src->demux_src.wait_queue);

	return 0;
}

/**
@@ -2113,6 +2179,7 @@ static void mpq_dmx_tspp2_queue_pipe_handler(struct pipe_info *pipe_info,
static void mpq_dmx_sps_producer_cb(struct sps_event_notify *notify)
{
	struct pipe_info *pipe_info = notify->user;
	int ret;

	MPQ_DVB_DBG_PRINT("%s: Notification event id=%d, handle=%p, type=%d\n",
		__func__, notify->event_id, pipe_info, pipe_info->type);
@@ -2123,14 +2190,19 @@ static void mpq_dmx_sps_producer_cb(struct sps_event_notify *notify)
	mpq_dmx_tspp2_update_pipe_stats(pipe_info);

	/* Schedule a new work to relevant source workqueue */
	if (notify->event_id == SPS_EVENT_OUT_OF_DESC)
		mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
	if (notify->event_id == SPS_EVENT_OUT_OF_DESC) {
		MPQ_DVB_ERR_PRINT("%s: SPS_EVENT_OUT_OF_DESC!\n", __func__);
		ret = mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
			PIPE_OVERFLOW_EVENT);
	else
		mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
	} else {
		ret = mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
			PIPE_DATA_EVENT);
	}

	if (ret)
		pipe_info->hw_missed_notif++;
}

/**
 * Event callback function from SPS driver for input (consumer) pipes.
 *
@@ -2190,13 +2262,14 @@ static void mpq_dmx_timer_cb(unsigned long param)

		/* Schedule a new work to relevant source workqueue */
		pipe_work = pipe_work_queue_allocate(
			&source_info->demux_src.work_queue);
			&pipe_info->work_queue);
		if (pipe_work != NULL) {
			pipe_work->session_id = pipe_info->session_id;
			pipe_work->pipe_info = pipe_info;
			pipe_work->event = PIPE_DATA_EVENT;
			pipe_work_queue_push(&source_info->demux_src.work_queue,
				pipe_work);
			pipe_work->event_count = 1;
			pipe_work_queue_push(&pipe_info->work_queue, pipe_work);
			wake_up_all(&source_info->demux_src.wait_queue);
		} else {
			MPQ_DVB_ERR_PRINT(
				"%s: Cannot allocate pipe work\n", __func__);
@@ -3069,6 +3142,14 @@ static int mpq_dmx_tspp2_process_video_headers(struct mpq_feed *mpq_feed,
		return ret;
	}

	ret = mpq_dmx_tspp2_stream_buffer_event_check(feed, header_pipe);
	if (ret) {
		MPQ_DVB_ERR_PRINT(
			"%s: mpq_dmx_tspp2_stream_buffer_event_check failed, ret=%d\n",
			__func__, ret);
		return ret;
	}

	ret = mpq_streambuffer_pkt_write(stream_buffer, &packet,
		(u8 *)&meta_data);
	if (ret) {
@@ -4596,8 +4677,7 @@ static int mpq_dmx_tspp2_eos_cmd(struct mpq_tspp2_feed *tspp2_feed)

	source_info = pipe_info->source_info;

	pipe_work = pipe_work_queue_allocate(
		&source_info->demux_src.work_queue);
	pipe_work = pipe_work_queue_allocate(&pipe_info->work_queue);
	if (pipe_work == NULL) {
		MPQ_DVB_ERR_PRINT("%s: Cannot allocate pipe work\n", __func__);
		mutex_unlock(&mpq_dmx_tspp2_info.mutex);
@@ -4606,9 +4686,11 @@ static int mpq_dmx_tspp2_eos_cmd(struct mpq_tspp2_feed *tspp2_feed)

	pipe_work->pipe_info = pipe_info;
	pipe_work->event = PIPE_EOS_EVENT;
	pipe_work->event_count = 1;
	pipe_work->session_id = pipe_info->session_id;

	pipe_work_queue_push(&source_info->demux_src.work_queue, pipe_work);
	pipe_work_queue_push(&pipe_info->work_queue, pipe_work);
	wake_up_all(&source_info->demux_src.wait_queue);

	mutex_unlock(&mpq_dmx_tspp2_info.mutex);

@@ -5853,6 +5935,21 @@ static int mpq_dmx_tspp2_write_cancel(struct dmx_demux *demux)
	return 0;
}

static bool mpq_dmx_tspp2_pipe_do_work(struct source_info *source_info)
{
	struct pipe_info *pipe_info;
	int i;

	for (i = 0; i < TSPP2_NUM_PIPES; i++) {
		pipe_info = &mpq_dmx_tspp2_info.pipes[i];
		if (pipe_info->source_info == source_info &&
			(!pipe_work_queue_empty(&pipe_info->work_queue)))
			return true;
	}

	return false;
}

static int mpq_dmx_tspp2_thread(void *arg)
{
	struct source_info *source_info = arg;
@@ -5860,12 +5957,13 @@ static int mpq_dmx_tspp2_thread(void *arg)
	struct pipe_info *pipe_info;
	int ret;
	unsigned long flags;
	int i;
	int j;

	while (1) {
		ret = wait_event_interruptible(
			source_info->demux_src.work_queue.wait_queue,
			!pipe_work_queue_empty(
				&source_info->demux_src.work_queue) ||
			source_info->demux_src.wait_queue,
			mpq_dmx_tspp2_pipe_do_work(source_info) ||
			kthread_should_stop());
		if (ret) {
			MPQ_DVB_ERR_PRINT(
@@ -5878,41 +5976,54 @@ static int mpq_dmx_tspp2_thread(void *arg)
			break;
		}

		pipe_work = pipe_work_queue_pop(
			&source_info->demux_src.work_queue);
		for (i = 0; i < TSPP2_NUM_PIPES; i++) {
			pipe_info = &mpq_dmx_tspp2_info.pipes[i];

			/*
		 * The pipe work might have been flushed
		 * if filter was stopped
			 * Lock pipe mutex to protect against pipe being closed
			 * during its processing
			 */
		if (pipe_work == NULL) {
			MPQ_DVB_DBG_PRINT("%s: pipe was flushed\n", __func__);
			if (mutex_lock_interruptible(&pipe_info->mutex))
				continue;

			if (pipe_info->source_info != source_info ||
				pipe_work_queue_empty(&pipe_info->work_queue)) {
				mutex_unlock(&pipe_info->mutex);
				continue;
			}

		pipe_info = pipe_work->pipe_info;
			pipe_work = pipe_work_queue_pop(&pipe_info->work_queue);

			/*
		 * Lock pipe mutex to protect against pipe being closed
		 * during its processing
			 * The pipe work might have been flushed
			 * if filter was stopped
			 */
		if (mutex_lock_interruptible(&pipe_info->mutex))
			if (pipe_work == NULL) {
				MPQ_DVB_DBG_PRINT(
					"%s: pipe was flushed\n", __func__);
				mutex_unlock(&pipe_info->mutex);
				continue;
			}

			spin_lock_irqsave(&pipe_info->lock, flags);
			if (pipe_info->ref_count && pipe_info->pipe_handler &&
			pipe_work->session_id == pipe_info->session_id) {
				pipe_work->session_id ==
					pipe_info->session_id) {
				spin_unlock_irqrestore(&pipe_info->lock, flags);
			pipe_info->pipe_handler(pipe_info, pipe_work->event);
				for (j = 0; j < pipe_work->event_count; j++)
					pipe_info->pipe_handler(pipe_info,
						pipe_work->event);
				pipe_info->handler_count += j;
			} else {
				spin_unlock_irqrestore(&pipe_info->lock, flags);
			}

		mutex_unlock(&pipe_work->pipe_info->mutex);
			mutex_unlock(&pipe_info->mutex);

		pipe_work_queue_release(&source_info->demux_src.work_queue,
			pipe_work_queue_release(&pipe_info->work_queue,
				pipe_work);
		}
	}

	/* Terminate thread gracefully */
	set_current_state(TASK_INTERRUPTIBLE);
@@ -6143,6 +6254,10 @@ static int mpq_dmx_tspp2_pipes_print(struct seq_file *s, void *p)
				pipe_info->hw_notif_rate_hz);
			seq_printf(s, "interrupt count: %d\n",
				pipe_info->hw_notif_count);
			seq_printf(s, "int. miss count: %d\n",
				pipe_info->hw_missed_notif);
			seq_printf(s, "handler count  : %d\n",
				pipe_info->handler_count);
			seq_printf(s, "buffer address : 0x%p(0x%p)\n",
				pipe_info->buffer.mem,
				(void *)pipe_info->buffer.iova);
@@ -6346,6 +6461,7 @@ static const struct file_operations dbgfs_index_tables_fops = {
	.release = single_release,
	.owner = THIS_MODULE,
};

static const struct file_operations dbgfs_sources_fops = {
	.open = mpq_dmx_tspp2_sources_open,
	.read = seq_read,
@@ -6354,6 +6470,7 @@ static const struct file_operations dbgfs_sources_fops = {
	.owner = THIS_MODULE,
};


/**
 * Initialize a single demux device.
 *
@@ -6533,7 +6650,7 @@ static int __init mpq_dmx_tspp2_plugin_init(void)

		init_completion(&source_info->completion);

		pipe_work_queue_init(&source_info->demux_src.work_queue);
		init_waitqueue_head(&source_info->demux_src.wait_queue);

		/* Initialize source processing thread */
		source_info->demux_src.thread =
@@ -6564,6 +6681,7 @@ static int __init mpq_dmx_tspp2_plugin_init(void)
	for (i = 0; i < TSPP2_NUM_PIPES; i++) {
		mpq_dmx_tspp2_info.pipes[i].handle = TSPP2_INVALID_HANDLE;
		mutex_init(&mpq_dmx_tspp2_info.pipes[i].mutex);
		pipe_work_queue_init(&mpq_dmx_tspp2_info.pipes[i].work_queue);
		spin_lock_init(&mpq_dmx_tspp2_info.pipes[i].lock);
	}
	mpq_dmx_tspp2_info.user_count = 0;
@@ -6619,7 +6737,6 @@ static int __init mpq_dmx_tspp2_plugin_init(void)
				mpq_dmx_tspp2_info.debugfs_dmx_dir,
				NULL,
				&dbgfs_sources_fops);

	}

	return ret;
+14 −9
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@

#define TSPP2_DMX_MAX_FEED_OPS			4

#define TSPP2_DMX_PIPE_WORK_POOL_SIZE		100
#define TSPP2_DMX_PIPE_WORK_POOL_SIZE		500

/* Max number of section filters */
#define TSPP2_DMX_MAX_SECTION_FILTER_NUM	64
@@ -220,13 +220,16 @@ enum mpq_dmx_tspp2_pipe_event {
 *
 * @pipe_info:		Associated pipe
 * @event:		Source event type for this work item
 * @session_id:	pipe_info.session_id cached value at time of pipe work creation
 * @session_id:		pipe_info.session_id cached value at time of pipe
 *			work creation.
 * @event_count:	Number of events included in this work
 * @next:		List node field for pipe_work_queue lists
 */
struct pipe_work {
	struct pipe_info *pipe_info;
	enum mpq_dmx_tspp2_pipe_event event;
	u32 session_id;
	u32 event_count;
	struct list_head next;
};

@@ -237,14 +240,12 @@ struct pipe_work {
 * @work_list:	Queue of pipe_work element source thread should process
 * @free_list:	List of free pipe_work objects
 * @lock:	Lock to protect modifications to lists
 * @wait_queue:	Processing thread wait queue
 */
struct pipe_work_queue {
	struct pipe_work work_pool[TSPP2_DMX_PIPE_WORK_POOL_SIZE];
	struct list_head work_list;
	struct list_head free_list;
	spinlock_t lock;
	wait_queue_head_t wait_queue;
};

/**
@@ -315,6 +316,7 @@ struct mpq_dmx_tspp2_pipe_buffer {
 * @mutex:		Mutex for protecting access to pipe info
 * @eos_pending:	Flag specifying whether the pipe handler has an
 *			end of stream notification that should be handled.
 * @work_queue:		pipe_work queue of work pending for this pipe
 * @hw_notif_count:	Total number of HW notifications
 * @hw_notif_rate_hz:	Rate of HW notifications in unit of Hz
 * @hw_notif_last_time:	Time at which previous HW notification was received
@@ -337,10 +339,13 @@ struct pipe_info {
		enum mpq_dmx_tspp2_pipe_event event);
	struct mutex mutex;
	int eos_pending;
	struct pipe_work_queue work_queue;

	/* debug-fs */
	u32 hw_notif_count;
	u32 hw_notif_rate_hz;
	u32 hw_missed_notif;
	u32 handler_count;
	struct timespec hw_notif_last_time;
};

@@ -490,7 +495,7 @@ struct buffer_insertion_source {
 * struct demuxing_source - demuxing source related resources
 *
 * @thread:			Source processing thread
 * @work_queue:			Pipe work queue object
 * @wait_queue:			Processing thread wait queue
 * @mpq_demux:			Pointer to the demux connected to this source
 * @clear_section_pipe:		Pipe opened to hold clear TS packets of sections
 * @scrambled_section_pipe:	Pipe opened to hold scrambled TS packets of
@@ -498,7 +503,7 @@ struct buffer_insertion_source {
 */
struct demuxing_source {
	struct task_struct *thread;
	struct pipe_work_queue work_queue;
	wait_queue_head_t wait_queue;
	struct mpq_demux *mpq_demux;
	struct pipe_info *clear_section_pipe;
	struct pipe_info *scrambled_section_pipe;