Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 584aa810 authored by Fred Isaman, committed by Trond Myklebust
Browse files

NFS: rewrite directio read to use async coalesce code



This also has the advantage that it allows directio to use pnfs.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent 1825a0d0
Loading
Loading
Loading
Loading
+123 −132
Original line number Diff line number Diff line
@@ -124,22 +124,6 @@ ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_
	return -EINVAL;
}

/*
 * Mark dirty every page touched by a byte range within a page array.
 *
 * @pages:  array of page pointers
 * @pgbase: byte offset of the range's start, relative to the start of pages[0]
 * @count:  length of the range in bytes; nothing is done when this is zero
 */
static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	/* Skip whole pages that lie entirely before the start of the range. */
	pages += (pgbase >> PAGE_SHIFT);
	/* Pages spanned by the in-page offset plus count, rounded up. */
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		/* Compound pages are left untouched. */
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
@@ -226,58 +210,92 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
void nfs_direct_readpage_release(struct nfs_page *req)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
	dprintk("NFS: direct read done (%s/%lld %d@%lld)\n",
		req->wb_context->dentry->d_inode->i_sb->s_id,
		(long long)NFS_FILEID(req->wb_context->dentry->d_inode),
		req->wb_bytes,
		(long long)req_offset(req));
	nfs_release_request(req);
}

static void nfs_direct_read_release(void *calldata)
static void nfs_direct_read_completion(struct nfs_pgio_header *hdr)
{
	unsigned long bytes = 0;
	struct nfs_direct_req *dreq = hdr->dreq;

	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *)data->header->req;
	int status = data->task.tk_status;
	if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
		goto out_put;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
	if (test_bit(NFS_IOHDR_ERROR, &hdr->flags) && (hdr->good_bytes == 0))
		dreq->error = hdr->error;
	else
		dreq->count += hdr->good_bytes;
	spin_unlock(&dreq->lock);

	if (!test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
		while (!list_empty(&hdr->pages)) {
			struct nfs_page *req = nfs_list_entry(hdr->pages.next);
			struct page *page = req->wb_page;

			if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
				if (bytes > hdr->good_bytes)
					zero_user(page, 0, PAGE_SIZE);
				else if (hdr->good_bytes - bytes < PAGE_SIZE)
					zero_user_segment(page,
						hdr->good_bytes & ~PAGE_MASK,
						PAGE_SIZE);
			}
			bytes += req->wb_bytes;
			nfs_list_remove_request(req);
			nfs_direct_readpage_release(req);
			if (!PageCompound(page))
				set_page_dirty(page);
			page_cache_release(page);
		}
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pages.pagevec,
				data->args.pgbase,
				data->res.count);
		while (!list_empty(&hdr->pages)) {
			struct nfs_page *req = nfs_list_entry(hdr->pages.next);

			if (bytes < hdr->good_bytes)
				if (!PageCompound(req->wb_page))
					set_page_dirty(req->wb_page);
			bytes += req->wb_bytes;
			page_cache_release(req->wb_page);
			nfs_list_remove_request(req);
			nfs_direct_readpage_release(req);
		}
	nfs_direct_release_pages(data->pages.pagevec, data->pages.npages);

	}
out_put:
	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_release(data);
	hdr->release(hdr);
}

/*
 * RPC callback table for direct read tasks: prepare, done and release
 * hooks, passed as .callback_ops when the read task is set up.
 */
static const struct rpc_call_ops nfs_read_direct_ops = {
	.rpc_call_prepare = nfs_read_prepare,
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
static void nfs_sync_pgio_error(struct list_head *head)
{
	struct nfs_read_data *data = &rhdr->rpc_data;
	struct nfs_page *req;

	if (data->pages.pagevec != data->pages.page_array)
		kfree(data->pages.pagevec);
	nfs_readhdr_free(&rhdr->header);
	while (!list_empty(head)) {
		req = nfs_list_entry(head->next);
		nfs_list_remove_request(req);
		nfs_release_request(req);
	}
}

/*
 * init_hdr completion callback for direct I/O: calls get_dreq() on the
 * direct request attached to @hdr.
 *
 * NOTE(review): presumably this takes a reference that is dropped by the
 * put_dreq() call in the completion handler -- confirm against get_dreq().
 */
static void nfs_direct_pgio_init(struct nfs_pgio_header *hdr)
{
	get_dreq(hdr->dreq);
}

/*
 * Completion callbacks for direct reads; handed to nfs_pageio_init_read()
 * when the pageio descriptor is set up for a direct read request.
 */
static const struct nfs_pgio_completion_ops nfs_direct_read_completion_ops = {
	.error_cleanup = nfs_sync_pgio_error,
	.init_hdr = nfs_direct_pgio_init,
	.completion = nfs_direct_read_completion,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
@@ -285,118 +303,85 @@ static void nfs_direct_readhdr_release(struct nfs_read_header *rhdr)
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *desc,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_direct_req *dreq = desc->pg_dreq;
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;
	struct page **pagevec = NULL;
	unsigned int npages;

	do {
		struct nfs_read_header *rhdr;
		struct nfs_read_data *data;
		struct nfs_page_array *pages;
		size_t bytes;
		int i;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize,count);
		bytes = min(max(rsize, PAGE_SIZE), count);

		result = -ENOMEM;
		rhdr = nfs_readhdr_alloc();
		if (unlikely(!rhdr))
			break;
		data = nfs_readdata_alloc(&rhdr->header, nfs_page_array_len(pgbase, bytes));
		if (!data) {
			nfs_readhdr_free(&rhdr->header);
		npages = nfs_page_array_len(pgbase, bytes);
		if (!pagevec)
			pagevec = kmalloc(npages * sizeof(struct page *),
					  GFP_KERNEL);
		if (!pagevec)
			break;
		}
		data->header = &rhdr->header;
		atomic_inc(&data->header->refcnt);
		pages = &data->pages;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					pages->npages, 1, 0, pages->pagevec, NULL);
					npages, 1, 0, pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_direct_readhdr_release(rhdr);
		if (result < 0)
			break;
		}
		if ((unsigned)result < pages->npages) {
		if ((unsigned)result < npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(pages->pagevec, result);
				nfs_direct_readhdr_release(rhdr);
				nfs_direct_release_pages(pagevec, result);
				break;
			}
			bytes -= pgbase;
			pages->npages = result;
			npages = result;
		}

		get_dreq(dreq);

		rhdr->header.req = (struct nfs_page *) dreq;
		rhdr->header.inode = inode;
		rhdr->header.cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = get_nfs_open_context(ctx);
		data->args.lock_context = dreq->l_ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = pages->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
		for (i = 0; i < npages; i++) {
			struct nfs_page *req;
			unsigned int req_len = min(bytes, PAGE_SIZE - pgbase);
			/* XXX do we need to do the eof zeroing found in async_filler? */
			req = nfs_create_request(dreq->ctx, dreq->inode,
						 pagevec[i],
						 pgbase, req_len);
			if (IS_ERR(req)) {
				nfs_direct_release_pages(pagevec + i,
							 npages - i);
				result = PTR_ERR(req);
				break;

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				task->tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);
		rpc_put_task(task);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
			}
			req->wb_index = pos >> PAGE_SHIFT;
			req->wb_offset = pos & ~PAGE_MASK;
			if (!nfs_pageio_add_request(desc, req)) {
				result = desc->pg_error;
				nfs_release_request(req);
				nfs_direct_release_pages(pagevec + i,
							 npages - i);
				break;
			}
			pgbase = 0;
			bytes -= req_len;
			started += req_len;
			user_addr += req_len;
			pos += req_len;
			count -= req_len;
		}
	} while (count != 0);

	kfree(pagevec);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
@@ -407,15 +392,19 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      unsigned long nr_segs,
					      loff_t pos)
{
	struct nfs_pageio_descriptor desc;
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	nfs_pageio_init_read(&desc, dreq->inode,
			     &nfs_direct_read_completion_ops);
	get_dreq(dreq);
	desc.pg_dreq = dreq;

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		result = nfs_direct_read_schedule_segment(&desc, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
@@ -424,6 +413,8 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
		pos += vec->iov_len;
	}

	nfs_pageio_complete(&desc);

	/*
	 * If no bytes were started, return the error, and let the
	 * generic layer handle the completion.
+3 −2
Original line number Diff line number Diff line
@@ -304,8 +304,9 @@ struct nfs_pgio_completion_ops;
/* read.c */
extern struct nfs_read_header *nfs_readhdr_alloc(void);
extern void nfs_readhdr_free(struct nfs_pgio_header *hdr);
extern struct nfs_read_data *nfs_readdata_alloc(struct nfs_pgio_header *hdr,
						unsigned int pagecount);
extern void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
			struct inode *inode,
			const struct nfs_pgio_completion_ops *compl_ops);
extern int nfs_initiate_read(struct rpc_clnt *clnt,
			     struct nfs_read_data *data,
			     const struct rpc_call_ops *call_ops);
+4 −3
Original line number Diff line number Diff line
@@ -48,8 +48,11 @@ void nfs_pgheader_init(struct nfs_pageio_descriptor *desc,
	hdr->cred = hdr->req->wb_context->cred;
	hdr->io_start = req_offset(hdr->req);
	hdr->good_bytes = desc->pg_count;
	hdr->dreq = desc->pg_dreq;
	hdr->release = release;
	hdr->completion_ops = desc->pg_completion_ops;
	if (hdr->completion_ops->init_hdr)
		hdr->completion_ops->init_hdr(hdr);
}

void nfs_set_pgio_error(struct nfs_pgio_header *hdr, int error, loff_t pos)
@@ -116,9 +119,6 @@ nfs_create_request(struct nfs_open_context *ctx, struct inode *inode,
	req->wb_page    = page;
	req->wb_index	= page->index;
	page_cache_get(page);
	BUG_ON(PagePrivate(page));
	BUG_ON(!PageLocked(page));
	BUG_ON(page->mapping->host != inode);
	req->wb_offset  = offset;
	req->wb_pgbase	= offset;
	req->wb_bytes   = count;
@@ -257,6 +257,7 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
	desc->pg_ioflags = io_flags;
	desc->pg_error = 0;
	desc->pg_lseg = NULL;
	desc->pg_dreq = NULL;
}

/**
+5 −5
Original line number Diff line number Diff line
@@ -51,7 +51,7 @@ struct nfs_read_header *nfs_readhdr_alloc()
	return rhdr;
}

struct nfs_read_data *nfs_readdata_alloc(struct nfs_pgio_header *hdr,
static struct nfs_read_data *nfs_readdata_alloc(struct nfs_pgio_header *hdr,
						unsigned int pagecount)
{
	struct nfs_read_data *data, *prealloc;
@@ -123,7 +123,7 @@ void nfs_pageio_reset_read_mds(struct nfs_pageio_descriptor *pgio)
}
EXPORT_SYMBOL_GPL(nfs_pageio_reset_read_mds);

static void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
			  struct inode *inode,
			  const struct nfs_pgio_completion_ops *compl_ops)
{
+1 −0
Original line number Diff line number Diff line
@@ -68,6 +68,7 @@ struct nfs_pageio_descriptor {
	const struct rpc_call_ops *pg_rpc_callops;
	const struct nfs_pgio_completion_ops *pg_completion_ops;
	struct pnfs_layout_segment *pg_lseg;
	struct nfs_direct_req	*pg_dreq;
};

#define NFS_WBACK_BUSY(req)	(test_bit(PG_BUSY,&(req)->wb_flags))
Loading