Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e8344e66, authored by majianpeng, committed by Sage Weil
Browse files

ceph: Implement writev/pwritev for sync operation.



For writev/pwritev sync operations, ceph only handles the first iov.

I divided the sync write operation into two functions: one for
direct writes, the other for non-direct sync writes. This is because
for non-direct sync writes we can merge the iovs into one, but for
direct writes we cannot merge the iovs.

Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>
Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com>
Signed-off-by: Sage Weil <sage@inktank.com>
parent 9f12bd11
Loading
Loading
Loading
Loading
+193 −80
Original line number Diff line number Diff line
@@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
	}
}


/*
 * Synchronous write, straight from __user pointer or user pages (if
 * O_DIRECT).
 * Synchronous write, straight from __user pointer or user pages.
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
static ssize_t ceph_sync_write(struct file *file, const char __user *data,
			       size_t left, loff_t pos, loff_t *ppos)
static ssize_t
ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
		       unsigned long nr_segs, size_t count)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc;
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	int num_ops = 1;
	struct page **pages;
	int num_pages;
	u64 len;
	int written = 0;
	int flags;
	int check_caps = 0;
	int page_align, io_align;
	unsigned long buf_align;
	int page_align;
	int ret;
	struct timespec mtime = CURRENT_TIME;
	bool own_pages = false;
	loff_t pos = iocb->ki_pos;
	struct iov_iter i;

	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
		return -EROFS;

	dout("sync_write on file %p %lld~%u %s\n", file, pos,
	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
	     (unsigned)count);

	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
	if (ret < 0)
		return ret;

	ret = invalidate_inode_pages2_range(inode->i_mapping,
					    pos >> PAGE_CACHE_SHIFT,
					    (pos + left) >> PAGE_CACHE_SHIFT);
					    (pos + count) >> PAGE_CACHE_SHIFT);
	if (ret < 0)
		dout("invalidate_inode_pages2_range returned %d\n", ret);

	flags = CEPH_OSD_FLAG_ORDERSNAP |
		CEPH_OSD_FLAG_ONDISK |
		CEPH_OSD_FLAG_WRITE;
	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
		flags |= CEPH_OSD_FLAG_ACK;
	else
		num_ops++;	/* Also include a 'startsync' command. */

	/*
	 * we may need to do multiple writes here if we span an object
	 * boundary.  this isn't atomic, unfortunately.  :(
	 */
more:
	io_align = pos & ~PAGE_MASK;
	buf_align = (unsigned long)data & ~PAGE_MASK;
	len = left;
	iov_iter_init(&i, iov, nr_segs, count, 0);

	while (iov_iter_count(&i) > 0) {
		void __user *data = i.iov->iov_base + i.iov_offset;
		u64 len = i.iov->iov_len - i.iov_offset;

		page_align = (unsigned long)data & ~PAGE_MASK;

		snapc = ci->i_snap_realm->cached_context;
		vino = ceph_vino(inode);
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    vino, pos, &len, num_ops,
					    vino, pos, &len,
					    2,/*include a 'startsync' command*/
					    CEPH_OSD_OP_WRITE, flags, snapc,
				    ci->i_truncate_seq, ci->i_truncate_size,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    false);
	if (IS_ERR(req))
		return PTR_ERR(req);
		if (IS_ERR(req)) {
			ret = PTR_ERR(req);
			goto out;
		}

	/* write from beginning of first page, regardless of io alignment */
	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
		num_pages = calc_pages_for(page_align, len);
	if (file->f_flags & O_DIRECT) {
		pages = ceph_get_direct_page_vector(data, num_pages, false);
		if (IS_ERR(pages)) {
			ret = PTR_ERR(pages);
@@ -578,27 +574,146 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
		 */
		truncate_inode_pages_range(inode->i_mapping, pos,
				   (pos+len) | (PAGE_CACHE_SIZE-1));
	} else {
		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
						false, false);

		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);

		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
		if (!ret)
			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

		ceph_put_page_vector(pages, num_pages, false);

out:
		ceph_osdc_put_request(req);
		if (ret == 0) {
			pos += len;
			written += len;
			iov_iter_advance(&i, (size_t)len);

			if (pos > i_size_read(inode)) {
				check_caps = ceph_inode_set_size(inode, pos);
				if (check_caps)
					ceph_check_caps(ceph_inode(inode),
							CHECK_CAPS_AUTHONLY,
							NULL);
			}
		} else
			break;
	}

	if (ret != -EOLDSNAPC && written > 0) {
		iocb->ki_pos = pos;
		ret = written;
	}
	return ret;
}


/*
 * Synchronous write, straight from __user pointer or user pages.
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, size_t count)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc;
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	struct page **pages;
	u64 len;
	int num_pages;
	int written = 0;
	int flags;
	int check_caps = 0;
	int ret;
	struct timespec mtime = CURRENT_TIME;
	loff_t pos = iocb->ki_pos;
	struct iov_iter i;

	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
		return -EROFS;

	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);

	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
	if (ret < 0)
		return ret;

	ret = invalidate_inode_pages2_range(inode->i_mapping,
					    pos >> PAGE_CACHE_SHIFT,
					    (pos + count) >> PAGE_CACHE_SHIFT);
	if (ret < 0)
		dout("invalidate_inode_pages2_range returned %d\n", ret);

	flags = CEPH_OSD_FLAG_ORDERSNAP |
		CEPH_OSD_FLAG_ONDISK |
		CEPH_OSD_FLAG_WRITE |
		CEPH_OSD_FLAG_ACK;

	iov_iter_init(&i, iov, nr_segs, count, 0);

	while ((len = iov_iter_count(&i)) > 0) {
		size_t left;
		int n;

		snapc = ci->i_snap_realm->cached_context;
		vino = ceph_vino(inode);
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
					    vino, pos, &len, 1,
					    CEPH_OSD_OP_WRITE, flags, snapc,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    false);
		if (IS_ERR(req)) {
			ret = PTR_ERR(req);
			goto out;
		}

		/*
		 * write from beginning of first page,
		 * regardless of io alignment
		 */
		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
		if (IS_ERR(pages)) {
			ret = PTR_ERR(pages);
			goto out;
		}
		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);

		left = len;
		for (n = 0; n < num_pages; n++) {
			size_t plen = min(left, PAGE_SIZE);
			ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
			if (ret != plen) {
				ret = -EFAULT;
				break;
			}
			left -= ret;
			iov_iter_advance(&i, ret);
		}

		if (ret < 0) {
			ceph_release_page_vector(pages, num_pages);
			goto out;
		}

		if ((file->f_flags & O_SYNC) == 0) {
		/* get a second commit callback */
		req->r_unsafe_callback = ceph_sync_write_unsafe;
		req->r_inode = inode;
			own_pages = true;
		}
	}
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
					false, own_pages);

		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
						false, true);

		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
@@ -607,30 +722,26 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
		if (!ret)
			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

	if (file->f_flags & O_DIRECT)
		ceph_put_page_vector(pages, num_pages, false);
	else if (file->f_flags & O_SYNC)
		ceph_release_page_vector(pages, num_pages);

out:
		ceph_osdc_put_request(req);
		if (ret == 0) {
			pos += len;
			written += len;
		left -= len;
		data += len;
		if (left)
			goto more;

		ret = written;
		*ppos = pos;
		if (pos > i_size_read(inode))
			if (pos > i_size_read(inode)) {
				check_caps = ceph_inode_set_size(inode, pos);
				if (check_caps)
			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
					ceph_check_caps(ceph_inode(inode),
							CHECK_CAPS_AUTHONLY,
							NULL);
	} else if (ret != -EOLDSNAPC && written > 0) {
			}
		} else
			break;
	}

	if (ret != -EOLDSNAPC && written > 0) {
		ret = written;
		iocb->ki_pos = pos;
	}
	return ret;
}
@@ -772,11 +883,13 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
	    (iocb->ki_filp->f_flags & O_DIRECT) ||
	    (fi->flags & CEPH_F_SYNC)) {
	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
		mutex_unlock(&inode->i_mutex);
		written = ceph_sync_write(file, iov->iov_base, count,
					  pos, &iocb->ki_pos);
		if (file->f_flags & O_DIRECT)
			written = ceph_sync_direct_write(iocb, iov,
							 nr_segs, count);
		else
			written = ceph_sync_write(iocb, iov, nr_segs, count);
		if (written == -EOLDSNAPC) {
			dout("aio_write %p %llx.%llx %llu~%u"
				"got EOLDSNAPC, retrying\n",