Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 503f82a9 authored by Luis Henriques's avatar Luis Henriques Committed by Ilya Dryomov
Browse files

ceph: support copy_file_range file operation



This commit implements support for the copy_file_range syscall in cephfs.
It is implemented using the RADOS 'copy-from' operation, which allows to
do a remote object copy, without the need to download/upload data from/to
the OSDs.

Some manual copy may however be required if the source/destination file
offsets aren't object aligned or if the copy length is smaller than the
object size.

Signed-off-by: default avatarLuis Henriques <lhenriques@suse.com>
Reviewed-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 23ddf9be
Loading
Loading
Loading
Loading
+293 −1
Original line number Diff line number Diff line
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>
#include <linux/ceph/striper.h>

#include <linux/module.h>
#include <linux/sched.h>
@@ -1795,6 +1796,297 @@ static long ceph_fallocate(struct file *file, int mode,
	return ret;
}

/*
 * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
 * src_ci.  Two attempts are made to obtain both caps, and an error is return if
 * this fails; zero is returned on success.
 */
static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
			  loff_t src_endoff, int *src_got,
			  struct ceph_inode_info *dst_ci,
			  loff_t dst_endoff, int *dst_got)
{
	int ret = 0;
	bool retrying = false;

retry_caps:
	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
			    dst_endoff, dst_got, NULL);
	if (ret < 0)
		return ret;

	/*
	 * Since we're already holding the FILE_WR capability for the dst file,
	 * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
	 * retry dance instead to try to get both capabilities.
	 */
	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
				false, src_got);
	if (ret <= 0) {
		/* Start by dropping dst_ci caps and getting src_ci caps */
		ceph_put_cap_refs(dst_ci, *dst_got);
		if (retrying) {
			if (!ret)
				/* ceph_try_get_caps masks EAGAIN */
				ret = -EAGAIN;
			return ret;
		}
		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
				    CEPH_CAP_FILE_SHARED, src_endoff,
				    src_got, NULL);
		if (ret < 0)
			return ret;
		/*... drop src_ci caps too, and retry */
		ceph_put_cap_refs(src_ci, *src_got);
		retrying = true;
		goto retry_caps;
	}
	return ret;
}

static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
			   struct ceph_inode_info *dst_ci, int dst_got)
{
	ceph_put_cap_refs(src_ci, src_got);
	ceph_put_cap_refs(dst_ci, dst_got);
}

/*
 * This function does several size-related checks, returning an error if:
 *  - source file is smaller than off+len
 *  - destination file size is not OK (inode_newsize_ok())
 *  - max bytes quotas is exceeded
 */
static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
			   loff_t src_off, loff_t dst_off, size_t len)
{
	loff_t size, endoff;

	size = i_size_read(src_inode);
	/*
	 * Don't copy beyond source file EOF.  Instead of simply setting length
	 * to (size - src_off), just drop to VFS default implementation, as the
	 * local i_size may be stale due to other clients writing to the source
	 * inode.
	 */
	if (src_off + len > size) {
		dout("Copy beyond EOF (%llu + %zu > %llu)\n",
		     src_off, len, size);
		return -EOPNOTSUPP;
	}
	size = i_size_read(dst_inode);

	endoff = dst_off + len;
	if (inode_newsize_ok(dst_inode, endoff))
		return -EOPNOTSUPP;

	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
		return -EDQUOT;

	return 0;
}

static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
				    struct file *dst_file, loff_t dst_off,
				    size_t len, unsigned int flags)
{
	struct inode *src_inode = file_inode(src_file);
	struct inode *dst_inode = file_inode(dst_file);
	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
	struct ceph_cap_flush *prealloc_cf;
	struct ceph_object_locator src_oloc, dst_oloc;
	struct ceph_object_id src_oid, dst_oid;
	loff_t endoff = 0, size;
	ssize_t ret = -EIO;
	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
	u32 src_objlen, dst_objlen, object_size;
	int src_got = 0, dst_got = 0, err, dirty;
	bool do_final_copy = false;

	if (src_inode == dst_inode)
		return -EINVAL;
	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
		return -EROFS;

	/*
	 * Some of the checks below will return -EOPNOTSUPP, which will force a
	 * fallback to the default VFS copy_file_range implementation.  This is
	 * desirable in several cases (for ex, the 'len' is smaller than the
	 * size of the objects, or in cases where that would be more
	 * efficient).
	 */

	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
	    (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
		return -EOPNOTSUPP;

	if (len < src_ci->i_layout.object_size)
		return -EOPNOTSUPP; /* no remote copy will be done */

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	/* Start by sync'ing the source file */
	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
	if (ret < 0)
		goto out;

	/*
	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
	 * clients may have dirty data in their caches.  And OSDs know nothing
	 * about caps, so they can't safely do the remote object copies.
	 */
	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
			     dst_ci, (dst_off + len), &dst_got);
	if (err < 0) {
		dout("get_rd_wr_caps returned %d\n", err);
		ret = -EOPNOTSUPP;
		goto out;
	}

	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
	if (ret < 0)
		goto out_caps;

	size = i_size_read(dst_inode);
	endoff = dst_off + len;

	/* Drop dst file cached pages */
	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
					    dst_off >> PAGE_SHIFT,
					    endoff >> PAGE_SHIFT);
	if (ret < 0) {
		dout("Failed to invalidate inode pages (%zd)\n", ret);
		ret = 0; /* XXX */
	}
	src_oloc.pool = src_ci->i_layout.pool_id;
	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
	dst_oloc.pool = dst_ci->i_layout.pool_id;
	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);

	ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
				      src_ci->i_layout.object_size,
				      &src_objnum, &src_objoff, &src_objlen);
	ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
				      dst_ci->i_layout.object_size,
				      &dst_objnum, &dst_objoff, &dst_objlen);
	/* object-level offsets need to the same */
	if (src_objoff != dst_objoff) {
		ret = -EOPNOTSUPP;
		goto out_caps;
	}

	/*
	 * Do a manual copy if the object offset isn't object aligned.
	 * 'src_objlen' contains the bytes left until the end of the object,
	 * starting at the src_off
	 */
	if (src_objoff) {
		/*
		 * we need to temporarily drop all caps as we'll be calling
		 * {read,write}_iter, which will get caps again.
		 */
		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
		ret = do_splice_direct(src_file, &src_off, dst_file,
				       &dst_off, src_objlen, flags);
		if (ret < 0) {
			dout("do_splice_direct returned %d\n", err);
			goto out;
		}
		len -= ret;
		err = get_rd_wr_caps(src_ci, (src_off + len),
				     &src_got, dst_ci,
				     (dst_off + len), &dst_got);
		if (err < 0)
			goto out;
		err = is_file_size_ok(src_inode, dst_inode,
				      src_off, dst_off, len);
		if (err < 0)
			goto out_caps;
	}
	object_size = src_ci->i_layout.object_size;
	while (len >= object_size) {
		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
					      object_size, &src_objnum,
					      &src_objoff, &src_objlen);
		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
					      object_size, &dst_objnum,
					      &dst_objoff, &dst_objlen);
		ceph_oid_init(&src_oid);
		ceph_oid_printf(&src_oid, "%llx.%08llx",
				src_ci->i_vino.ino, src_objnum);
		ceph_oid_init(&dst_oid);
		ceph_oid_printf(&dst_oid, "%llx.%08llx",
				dst_ci->i_vino.ino, dst_objnum);
		/* Do an object remote copy */
		err = ceph_osdc_copy_from(
			&ceph_inode_to_client(src_inode)->client->osdc,
			src_ci->i_vino.snap, 0,
			&src_oid, &src_oloc,
			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
			&dst_oid, &dst_oloc,
			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
			CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
		if (err) {
			dout("ceph_osdc_copy_from returned %d\n", err);
			if (!ret)
				ret = err;
			goto out_caps;
		}
		len -= object_size;
		src_off += object_size;
		dst_off += object_size;
		ret += object_size;
	}

	if (len)
		/* We still need one final local copy */
		do_final_copy = true;

	file_update_time(dst_file);
	if (endoff > size) {
		int caps_flags = 0;

		/* Let the MDS know about dst file size change */
		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
			caps_flags |= CHECK_CAPS_NODELAY;
		if (ceph_inode_set_size(dst_inode, endoff))
			caps_flags |= CHECK_CAPS_AUTHONLY;
		if (caps_flags)
			ceph_check_caps(dst_ci, caps_flags, NULL);
	}
	/* Mark Fw dirty */
	spin_lock(&dst_ci->i_ceph_lock);
	dst_ci->i_inline_version = CEPH_INLINE_NONE;
	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
	spin_unlock(&dst_ci->i_ceph_lock);
	if (dirty)
		__mark_inode_dirty(dst_inode, dirty);

out_caps:
	put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);

	if (do_final_copy) {
		err = do_splice_direct(src_file, &src_off, dst_file,
				       &dst_off, len, flags);
		if (err < 0) {
			dout("do_splice_direct returned %d\n", err);
			goto out;
		}
		len -= err;
		ret += err;
	}

out:
	ceph_free_cap_flush(prealloc_cf);

	return ret;
}

const struct file_operations ceph_file_fops = {
	.open = ceph_open,
	.release = ceph_release,
@@ -1810,5 +2102,5 @@ const struct file_operations ceph_file_fops = {
	.unlocked_ioctl = ceph_ioctl,
	.compat_ioctl	= ceph_ioctl,
	.fallocate	= ceph_fallocate,
	.copy_file_range = ceph_copy_file_range,
};