Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 15ce4a0c authored by Chuck Lever, committed by Trond Myklebust
Browse files

NFS: Replace atomic_t variables in nfs_direct_req with a single spin lock



Three atomic_t variables cause a lot of bus locking.  Because they are all
used in the same places in the code, just use a single spin lock.

Now that the atomic_t variables are gone, we can remove the request size
limitation since the code no longer depends on the limited width of atomic_t
on some platforms.

Test plan:
Compile with CONFIG_NFS and CONFIG_NFS_DIRECTIO enabled.  Millions of fsx
operations, iozone, OraSim.

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent 88467055
Loading
Loading
Loading
Loading
+47 −34
Original line number Diff line number Diff line
@@ -58,7 +58,6 @@
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS
#define MAX_DIRECTIO_SIZE	(4096UL << PAGE_SHIFT)

static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty);
static kmem_cache_t *nfs_direct_cachep;
@@ -68,6 +67,8 @@ static kmem_cache_t *nfs_direct_cachep;
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct list_head	list;		/* nfs_read/write_data structs */
	struct file *		filp;		/* file descriptor */
	struct kiocb *		iocb;		/* controlling i/o request */
@@ -75,12 +76,14 @@ struct nfs_direct_req {
	struct inode *		inode;		/* target file of i/o */
	struct page **		pages;		/* pages in our buffer */
	unsigned int		npages;		/* count of pages */
	atomic_t		complete,	/* i/os we're waiting for */
				count,		/* bytes actually processed */

	/* completion state */
	spinlock_t		lock;		/* protect completion state */
	int			outstanding;	/* i/os we're waiting for */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
};


/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
@@ -110,12 +113,6 @@ static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t siz
	unsigned long page_count;
	size_t array_size;

	/* set an arbitrary limit to prevent type overflow */
	if (size > MAX_DIRECTIO_SIZE) {
		*pages = NULL;
		return -EFBIG;
	}

	page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
	page_count -= user_addr >> PAGE_SHIFT;

@@ -164,8 +161,10 @@ static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
	init_waitqueue_head(&dreq->wait);
	INIT_LIST_HEAD(&dreq->list);
	dreq->iocb = NULL;
	atomic_set(&dreq->count, 0);
	atomic_set(&dreq->error, 0);
	spin_lock_init(&dreq->lock);
	dreq->outstanding = 0;
	dreq->count = 0;
	dreq->error = 0;

	return dreq;
}
@@ -181,19 +180,18 @@ static void nfs_direct_req_release(struct kref *kref)
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	int result = -EIOCBQUEUED;
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_event_interruptible(dreq->wait,
					(atomic_read(&dreq->complete) == 0));
	result = wait_event_interruptible(dreq->wait, (dreq->outstanding == 0));

	if (!result)
		result = atomic_read(&dreq->error);
		result = dreq->error;
	if (!result)
		result = atomic_read(&dreq->count);
		result = dreq->count;

out:
	kref_put(&dreq->kref, nfs_direct_req_release);
@@ -214,9 +212,9 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
	nfs_free_user_pages(dreq->pages, dreq->npages, 1);

	if (dreq->iocb) {
		long res = atomic_read(&dreq->error);
		long res = (long) dreq->error;
		if (!res)
			res = atomic_read(&dreq->count);
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	} else
		wake_up(&dreq->wait);
@@ -233,7 +231,6 @@ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
{
	struct list_head *list;
	struct nfs_direct_req *dreq;
	unsigned int reads = 0;
	unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

	dreq = nfs_direct_req_alloc();
@@ -259,13 +256,12 @@ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
		list_add(&data->pages, list);

		data->req = (struct nfs_page *) dreq;
		reads++;
		dreq->outstanding++;
		if (nbytes <= rsize)
			break;
		nbytes -= rsize;
	}
	kref_get(&dreq->kref);
	atomic_set(&dreq->complete, reads);
	return dreq;
}

@@ -276,12 +272,20 @@ static void nfs_direct_read_result(struct rpc_task *task, void *calldata)

	if (nfs_readpage_result(task, data) != 0)
		return;

	spin_lock(&dreq->lock);

	if (likely(task->tk_status >= 0))
		atomic_add(data->res.count, &dreq->count);
		dreq->count += data->res.count;
	else
		atomic_set(&dreq->error, task->tk_status);
		dreq->error = task->tk_status;

	if (unlikely(atomic_dec_and_test(&dreq->complete)))
	if (--dreq->outstanding) {
		spin_unlock(&dreq->lock);
		return;
	}

	spin_unlock(&dreq->lock);
	nfs_direct_complete(dreq);
}

@@ -388,7 +392,6 @@ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize
{
	struct list_head *list;
	struct nfs_direct_req *dreq;
	unsigned int writes = 0;
	unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

	dreq = nfs_direct_req_alloc();
@@ -414,16 +417,19 @@ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize
		list_add(&data->pages, list);

		data->req = (struct nfs_page *) dreq;
		writes++;
		dreq->outstanding++;
		if (nbytes <= wsize)
			break;
		nbytes -= wsize;
	}
	kref_get(&dreq->kref);
	atomic_set(&dreq->complete, writes);
	return dreq;
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 *     errors after the first one are ignored.
 */
static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;
@@ -436,16 +442,23 @@ static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
	if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
		status = -EIO;

	spin_lock(&dreq->lock);

	if (likely(status >= 0))
		atomic_add(data->res.count, &dreq->count);
		dreq->count += data->res.count;
	else
		atomic_set(&dreq->error, status);
		dreq->error = status;

	if (--dreq->outstanding) {
		spin_unlock(&dreq->lock);
		return;
	}

	spin_unlock(&dreq->lock);

	if (unlikely(atomic_dec_and_test(&dreq->complete))) {
	nfs_end_data_update(data->inode);
	nfs_direct_complete(dreq);
}
}

static const struct rpc_call_ops nfs_write_direct_ops = {
	.rpc_call_done = nfs_direct_write_result,