Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1d328aae authored by Linux Build Service Account's avatar Linux Build Service Account Committed by Gerrit - the friendly Code Review server
Browse files

Merge "media: dvb: pipe work queue optimization"

parents ea313504 f7a8be28
Loading
Loading
Loading
Loading
+174 −57
Original line number Diff line number Diff line
@@ -77,7 +77,6 @@ static struct
	struct dentry *debugfs_filters_file;
	struct dentry *debugfs_sources_file;
	struct dentry *debugfs_index_tables_file;

} mpq_dmx_tspp2_info;

/**
@@ -92,7 +91,6 @@ static void pipe_work_queue_init(struct pipe_work_queue *queue)
	spin_lock_init(&queue->lock);
	INIT_LIST_HEAD(&queue->work_list);
	INIT_LIST_HEAD(&queue->free_list);
	init_waitqueue_head(&queue->wait_queue);

	for (i = 0; i < TSPP2_DMX_PIPE_WORK_POOL_SIZE; i++)
		list_add_tail(&queue->work_pool[i].next, &queue->free_list);
@@ -144,7 +142,6 @@ static void pipe_work_queue_push(struct pipe_work_queue *queue,

	spin_lock_irqsave(&queue->lock, flags);
	list_add_tail(&work->next, &queue->work_list);
	wake_up_all(&queue->wait_queue);
	spin_unlock_irqrestore(&queue->lock, flags);
}

@@ -383,7 +380,7 @@ static int mpq_dmx_tspp2_init_idx_table(enum dmx_video_codec codec,
			__func__, table_id, ret);
		return ret;
	}

	table->num_patterns = 0;
	MPQ_DVB_DBG_PRINT(
		"%s: Initialize new index table #%d, prefix=0x%X, mask=0x%X\n",
		__func__, table_id, INDEX_TABLE_PREFIX_VALUE,
@@ -720,6 +717,48 @@ static int mpq_dmx_tspp2_ts_event_check(struct dvb_demux_feed *feed,
	return 0;
}

static int mpq_dmx_tspp2_stream_buffer_event_check(struct dvb_demux_feed *feed,
	struct pipe_info *pipe_info)
{
	int ret;
	u32 session_id;

	if (feed->demux->playback_mode != DMX_PB_MODE_PULL)
		return 0;

	if (!mutex_is_locked(&pipe_info->mutex))
		return -EINVAL;

	/*
	 * For pull mode need to wait for sufficient room to write the
	 * meta-data packet in the mpq_streambuffer object.
	 * Data itself was already written by TSPPv2 hardware (so required_space
	 * argument is 0).
	 * Since this may block waiting for the metadata buffer, pipe mutex
	 * needs to be released, and when returning verify the pipe was not
	 * closed / re-opened in the meantime.
	 */
	session_id = pipe_info->session_id;
	mutex_unlock(&pipe_info->mutex);
	ret = mpq_dmx_decoder_fullness_wait(feed, 0);
	mutex_lock(&pipe_info->mutex);
	if (ret) {
		MPQ_DVB_ERR_PRINT(
			"%s: mpq_dmx_decoder_fullness_wait failed, ret=%d\n",
			__func__, ret);
		return ret;
	}
	if (pipe_info->session_id != session_id || !pipe_info->ref_count) {
		MPQ_DVB_ERR_PRINT(
			"%s: pipe was closed / re-opened: ref. count=%u, session_id=%u (expected %u)\n",
			__func__, pipe_info->ref_count,
			pipe_info->session_id, session_id);
		return -ENODEV;
	}

	return 0;
}

/**
 * mpq_dmx_tspp2_filter_event_cb() - filter event notification handler
 *
@@ -1667,6 +1706,8 @@ static int mpq_dmx_init_out_pipe(struct mpq_demux *mpq_demux,
	pipe_info->source_info = source_info;
	pipe_info->eos_pending = 0;
	pipe_info->session_id++;
	pipe_info->hw_missed_notif = 0;
	pipe_info->handler_count = 0;
	return 0;

close_pipe:
@@ -1732,8 +1773,7 @@ static int mpq_dmx_terminate_out_pipe(struct pipe_info *pipe_info)
	tspp2_pipe_close(pipe_info->handle);
	pipe_info->handle = TSPP2_INVALID_HANDLE;

	pipe_work_queue_cancel_work(&source_info->demux_src.work_queue,
		pipe_info);
	pipe_work_queue_cancel_work(&pipe_info->work_queue, pipe_info);

	if (pipe_info->buffer.kernel_map)
		ion_unmap_kernel(mpq_demux->ion_client,
@@ -2084,25 +2124,51 @@ static void mpq_dmx_tspp2_update_pipe_stats(struct pipe_info *pipe_info)
	pipe_info->hw_notif_count++;
}

static void mpq_dmx_tspp2_queue_pipe_handler(struct pipe_info *pipe_info,
static int mpq_dmx_tspp2_queue_pipe_handler(struct pipe_info *pipe_info,
	enum mpq_dmx_tspp2_pipe_event event)
{
	struct source_info *src;
	struct source_info *src = pipe_info->source_info;
	struct pipe_work_queue *queue = &pipe_info->work_queue;
	struct pipe_work *pipe_work;
	unsigned long flags;

	src = pipe_info->source_info;
	pipe_work = pipe_work_queue_allocate(&src->demux_src.work_queue);
	/* Try to merge data events into 1 pipe work object */
	spin_lock_irqsave(&queue->lock, flags);
	if (!list_empty(&queue->work_list)) {
		pipe_work = list_first_entry(&queue->work_list,
			struct pipe_work, next);
		if (event == PIPE_DATA_EVENT &&
			pipe_work->event == PIPE_DATA_EVENT &&
			pipe_work->event_count &&
			pipe_work->pipe_info == pipe_info &&
			pipe_work->session_id == pipe_info->session_id) {
			pipe_work->event_count++;
			spin_unlock_irqrestore(&queue->lock, flags);
			wake_up_all(&src->demux_src.wait_queue);
			return 0;
		}
	}
	spin_unlock_irqrestore(&queue->lock, flags);

	pipe_work = pipe_work_queue_allocate(&pipe_info->work_queue);
	if (pipe_work == NULL) {
		int pipe_idx = (pipe_info - mpq_dmx_tspp2_info.pipes) /
			sizeof(*pipe_info);
		MPQ_DVB_ERR_PRINT(
			"%s: Cannot allocate pipe work for pipe %d\n",
			__func__, pipe_info->type);
		return;
			"%s: Cannot allocate pipe work for pipe %d, type %d\n",
			__func__, pipe_idx, pipe_info->type);
		return -ENOSPC;
	}

	pipe_work->pipe_info = pipe_info;
	pipe_work->event = event;
	pipe_work->session_id = pipe_info->session_id;
	pipe_work_queue_push(&src->demux_src.work_queue, pipe_work);
	pipe_work->event_count = 1;

	pipe_work_queue_push(&pipe_info->work_queue, pipe_work);
	wake_up_all(&src->demux_src.wait_queue);

	return 0;
}

/**
@@ -2113,6 +2179,7 @@ static void mpq_dmx_tspp2_queue_pipe_handler(struct pipe_info *pipe_info,
static void mpq_dmx_sps_producer_cb(struct sps_event_notify *notify)
{
	struct pipe_info *pipe_info = notify->user;
	int ret;

	MPQ_DVB_DBG_PRINT("%s: Notification event id=%d, handle=%p, type=%d\n",
		__func__, notify->event_id, pipe_info, pipe_info->type);
@@ -2123,14 +2190,19 @@ static void mpq_dmx_sps_producer_cb(struct sps_event_notify *notify)
	mpq_dmx_tspp2_update_pipe_stats(pipe_info);

	/* Schedule a new work to relevant source workqueue */
	if (notify->event_id == SPS_EVENT_OUT_OF_DESC)
		mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
	if (notify->event_id == SPS_EVENT_OUT_OF_DESC) {
		MPQ_DVB_ERR_PRINT("%s: SPS_EVENT_OUT_OF_DESC!\n", __func__);
		ret = mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
			PIPE_OVERFLOW_EVENT);
	else
		mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
	} else {
		ret = mpq_dmx_tspp2_queue_pipe_handler(pipe_info,
			PIPE_DATA_EVENT);
	}

	if (ret)
		pipe_info->hw_missed_notif++;
}

/**
 * Event callback function from SPS driver for input (consumer) pipes.
 *
@@ -2190,13 +2262,14 @@ static void mpq_dmx_timer_cb(unsigned long param)

		/* Schedule a new work to relevant source workqueue */
		pipe_work = pipe_work_queue_allocate(
			&source_info->demux_src.work_queue);
			&pipe_info->work_queue);
		if (pipe_work != NULL) {
			pipe_work->session_id = pipe_info->session_id;
			pipe_work->pipe_info = pipe_info;
			pipe_work->event = PIPE_DATA_EVENT;
			pipe_work_queue_push(&source_info->demux_src.work_queue,
				pipe_work);
			pipe_work->event_count = 1;
			pipe_work_queue_push(&pipe_info->work_queue, pipe_work);
			wake_up_all(&source_info->demux_src.wait_queue);
		} else {
			MPQ_DVB_ERR_PRINT(
				"%s: Cannot allocate pipe work\n", __func__);
@@ -3069,6 +3142,14 @@ static int mpq_dmx_tspp2_process_video_headers(struct mpq_feed *mpq_feed,
		return ret;
	}

	ret = mpq_dmx_tspp2_stream_buffer_event_check(feed, header_pipe);
	if (ret) {
		MPQ_DVB_ERR_PRINT(
			"%s: mpq_dmx_tspp2_stream_buffer_event_check failed, ret=%d\n",
			__func__, ret);
		return ret;
	}

	ret = mpq_streambuffer_pkt_write(stream_buffer, &packet,
		(u8 *)&meta_data);
	if (ret) {
@@ -4596,8 +4677,7 @@ static int mpq_dmx_tspp2_eos_cmd(struct mpq_tspp2_feed *tspp2_feed)

	source_info = pipe_info->source_info;

	pipe_work = pipe_work_queue_allocate(
		&source_info->demux_src.work_queue);
	pipe_work = pipe_work_queue_allocate(&pipe_info->work_queue);
	if (pipe_work == NULL) {
		MPQ_DVB_ERR_PRINT("%s: Cannot allocate pipe work\n", __func__);
		mutex_unlock(&mpq_dmx_tspp2_info.mutex);
@@ -4606,9 +4686,11 @@ static int mpq_dmx_tspp2_eos_cmd(struct mpq_tspp2_feed *tspp2_feed)

	pipe_work->pipe_info = pipe_info;
	pipe_work->event = PIPE_EOS_EVENT;
	pipe_work->event_count = 1;
	pipe_work->session_id = pipe_info->session_id;

	pipe_work_queue_push(&source_info->demux_src.work_queue, pipe_work);
	pipe_work_queue_push(&pipe_info->work_queue, pipe_work);
	wake_up_all(&source_info->demux_src.wait_queue);

	mutex_unlock(&mpq_dmx_tspp2_info.mutex);

@@ -5853,6 +5935,21 @@ static int mpq_dmx_tspp2_write_cancel(struct dmx_demux *demux)
	return 0;
}

static bool mpq_dmx_tspp2_pipe_do_work(struct source_info *source_info)
{
	struct pipe_info *pipe_info;
	int i;

	for (i = 0; i < TSPP2_NUM_PIPES; i++) {
		pipe_info = &mpq_dmx_tspp2_info.pipes[i];
		if (pipe_info->source_info == source_info &&
			(!pipe_work_queue_empty(&pipe_info->work_queue)))
			return true;
	}

	return false;
}

static int mpq_dmx_tspp2_thread(void *arg)
{
	struct source_info *source_info = arg;
@@ -5860,12 +5957,13 @@ static int mpq_dmx_tspp2_thread(void *arg)
	struct pipe_info *pipe_info;
	int ret;
	unsigned long flags;
	int i;
	int j;

	while (1) {
		ret = wait_event_interruptible(
			source_info->demux_src.work_queue.wait_queue,
			!pipe_work_queue_empty(
				&source_info->demux_src.work_queue) ||
			source_info->demux_src.wait_queue,
			mpq_dmx_tspp2_pipe_do_work(source_info) ||
			kthread_should_stop());
		if (ret) {
			MPQ_DVB_ERR_PRINT(
@@ -5878,41 +5976,54 @@ static int mpq_dmx_tspp2_thread(void *arg)
			break;
		}

		pipe_work = pipe_work_queue_pop(
			&source_info->demux_src.work_queue);
		for (i = 0; i < TSPP2_NUM_PIPES; i++) {
			pipe_info = &mpq_dmx_tspp2_info.pipes[i];

			/*
		 * The pipe work might have been flushed
		 * if filter was stopped
			 * Lock pipe mutex to protect against pipe being closed
			 * during its processing
			 */
		if (pipe_work == NULL) {
			MPQ_DVB_DBG_PRINT("%s: pipe was flushed\n", __func__);
			if (mutex_lock_interruptible(&pipe_info->mutex))
				continue;

			if (pipe_info->source_info != source_info ||
				pipe_work_queue_empty(&pipe_info->work_queue)) {
				mutex_unlock(&pipe_info->mutex);
				continue;
			}

		pipe_info = pipe_work->pipe_info;
			pipe_work = pipe_work_queue_pop(&pipe_info->work_queue);

			/*
		 * Lock pipe mutex to protect against pipe being closed
		 * during its processing
			 * The pipe work might have been flushed
			 * if filter was stopped
			 */
		if (mutex_lock_interruptible(&pipe_info->mutex))
			if (pipe_work == NULL) {
				MPQ_DVB_DBG_PRINT(
					"%s: pipe was flushed\n", __func__);
				mutex_unlock(&pipe_info->mutex);
				continue;
			}

			spin_lock_irqsave(&pipe_info->lock, flags);
			if (pipe_info->ref_count && pipe_info->pipe_handler &&
			pipe_work->session_id == pipe_info->session_id) {
				pipe_work->session_id ==
					pipe_info->session_id) {
				spin_unlock_irqrestore(&pipe_info->lock, flags);
			pipe_info->pipe_handler(pipe_info, pipe_work->event);
				for (j = 0; j < pipe_work->event_count; j++)
					pipe_info->pipe_handler(pipe_info,
						pipe_work->event);
				pipe_info->handler_count += j;
			} else {
				spin_unlock_irqrestore(&pipe_info->lock, flags);
			}

		mutex_unlock(&pipe_work->pipe_info->mutex);
			mutex_unlock(&pipe_info->mutex);

		pipe_work_queue_release(&source_info->demux_src.work_queue,
			pipe_work_queue_release(&pipe_info->work_queue,
				pipe_work);
		}
	}

	/* Terminate thread gracefully */
	set_current_state(TASK_INTERRUPTIBLE);
@@ -6143,6 +6254,10 @@ static int mpq_dmx_tspp2_pipes_print(struct seq_file *s, void *p)
				pipe_info->hw_notif_rate_hz);
			seq_printf(s, "interrupt count: %d\n",
				pipe_info->hw_notif_count);
			seq_printf(s, "int. miss count: %d\n",
				pipe_info->hw_missed_notif);
			seq_printf(s, "handler count  : %d\n",
				pipe_info->handler_count);
			seq_printf(s, "buffer address : 0x%p(0x%p)\n",
				pipe_info->buffer.mem,
				(void *)pipe_info->buffer.iova);
@@ -6346,6 +6461,7 @@ static const struct file_operations dbgfs_index_tables_fops = {
	.release = single_release,
	.owner = THIS_MODULE,
};

static const struct file_operations dbgfs_sources_fops = {
	.open = mpq_dmx_tspp2_sources_open,
	.read = seq_read,
@@ -6354,6 +6470,7 @@ static const struct file_operations dbgfs_sources_fops = {
	.owner = THIS_MODULE,
};


/**
 * Initialize a single demux device.
 *
@@ -6533,7 +6650,7 @@ static int __init mpq_dmx_tspp2_plugin_init(void)

		init_completion(&source_info->completion);

		pipe_work_queue_init(&source_info->demux_src.work_queue);
		init_waitqueue_head(&source_info->demux_src.wait_queue);

		/* Initialize source processing thread */
		source_info->demux_src.thread =
@@ -6564,6 +6681,7 @@ static int __init mpq_dmx_tspp2_plugin_init(void)
	for (i = 0; i < TSPP2_NUM_PIPES; i++) {
		mpq_dmx_tspp2_info.pipes[i].handle = TSPP2_INVALID_HANDLE;
		mutex_init(&mpq_dmx_tspp2_info.pipes[i].mutex);
		pipe_work_queue_init(&mpq_dmx_tspp2_info.pipes[i].work_queue);
		spin_lock_init(&mpq_dmx_tspp2_info.pipes[i].lock);
	}
	mpq_dmx_tspp2_info.user_count = 0;
@@ -6619,7 +6737,6 @@ static int __init mpq_dmx_tspp2_plugin_init(void)
				mpq_dmx_tspp2_info.debugfs_dmx_dir,
				NULL,
				&dbgfs_sources_fops);

	}

	return ret;
+14 −9
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@

#define TSPP2_DMX_MAX_FEED_OPS			4

#define TSPP2_DMX_PIPE_WORK_POOL_SIZE		100
#define TSPP2_DMX_PIPE_WORK_POOL_SIZE		500

/* Max number of section filters */
#define TSPP2_DMX_MAX_SECTION_FILTER_NUM	64
@@ -220,13 +220,16 @@ enum mpq_dmx_tspp2_pipe_event {
 *
 * @pipe_info:		Associated pipe
 * @event:		Source event type for this work item
 * @session_id:	pipe_info.session_id cached value at time of pipe work creation
 * @session_id:		pipe_info.session_id cached value at time of pipe
 *			work creation.
 * @event_count:	Number of events included in this work
 * @next:		List node field for pipe_work_queue lists
 */
struct pipe_work {
	struct pipe_info *pipe_info;
	enum mpq_dmx_tspp2_pipe_event event;
	u32 session_id;
	u32 event_count;
	struct list_head next;
};

@@ -237,14 +240,12 @@ struct pipe_work {
 * @work_list:	Queue of pipe_work element source thread should process
 * @free_list:	List of free pipe_work objects
 * @lock:	Lock to protect modifications to lists
 * @wait_queue:	Processing thread wait queue
 */
struct pipe_work_queue {
	struct pipe_work work_pool[TSPP2_DMX_PIPE_WORK_POOL_SIZE];
	struct list_head work_list;
	struct list_head free_list;
	spinlock_t lock;
	wait_queue_head_t wait_queue;
};

/**
@@ -315,6 +316,7 @@ struct mpq_dmx_tspp2_pipe_buffer {
 * @mutex:		Mutex for protecting access to pipe info
 * @eos_pending:	Flag specifying whether the pipe handler has an
 *			end of stream notification that should be handled.
 * @work_queue:		pipe_work queue of work pending for this pipe
 * @hw_notif_count:	Total number of HW notifications
 * @hw_notif_rate_hz:	Rate of HW notifications in unit of Hz
 * @hw_notif_last_time:	Time at which previous HW notification was received
@@ -337,10 +339,13 @@ struct pipe_info {
		enum mpq_dmx_tspp2_pipe_event event);
	struct mutex mutex;
	int eos_pending;
	struct pipe_work_queue work_queue;

	/* debug-fs */
	u32 hw_notif_count;
	u32 hw_notif_rate_hz;
	u32 hw_missed_notif;
	u32 handler_count;
	struct timespec hw_notif_last_time;
};

@@ -490,7 +495,7 @@ struct buffer_insertion_source {
 * struct demuxing_source - demuxing source related resources
 *
 * @thread:			Source processing thread
 * @work_queue:			Pipe work queue object
 * @wait_queue:			Processing thread wait queue
 * @mpq_demux:			Pointer to the demux connected to this source
 * @clear_section_pipe:		Pipe opened to hold clear TS packets of sections
 * @scrambled_section_pipe:	Pipe opened to hold scrambled TS packets of
@@ -498,7 +503,7 @@ struct buffer_insertion_source {
 */
struct demuxing_source {
	struct task_struct *thread;
	struct pipe_work_queue work_queue;
	wait_queue_head_t wait_queue;
	struct mpq_demux *mpq_demux;
	struct pipe_info *clear_section_pipe;
	struct pipe_info *scrambled_section_pipe;