Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0c76c6ba authored by Linus Torvalds's avatar Linus Torvalds
Browse files
Pull Ceph updates from Sage Weil:
 "We have a pile of bug fixes from Ilya, including a few patches that
  sync up the CRUSH code with the latest from userspace.

  There is also a long series from Zheng that fixes various issues with
  snapshots, inline data, and directory fsync, some simplification and
  improvement in the cap release code, and a rework of the caching of
  directory contents.

  To top it off there are a few small fixes and cleanups from Benoit and
  Hong"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  rbd: use GFP_NOIO in rbd_obj_request_create()
  crush: fix a bug in tree bucket decode
  libceph: Fix ceph_tcp_sendpage()'s more boolean usage
  libceph: Remove spurious kunmap() of the zero page
  rbd: queue_depth map option
  rbd: store rbd_options in rbd_device
  rbd: terminate rbd_opts_tokens with Opt_err
  ceph: fix ceph_writepages_start()
  rbd: bump queue_max_segments
  ceph: rework dcache readdir
  crush: sync up with userspace
  crush: fix crash from invalid 'take' argument
  ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL
  ceph: pre-allocate data structure that tracks caps flushing
  ceph: re-send flushing caps (which are revoked) in reconnect stage
  ceph: send TID of the oldest pending caps flush to MDS
  ceph: track pending caps flushing globally
  ceph: track pending caps flushing accurately
  libceph: fix wrong name "Ceph filesystem for Linux"
  ceph: fix directory fsync
  ...
parents 8688d954 5a60e876
Loading
Loading
Loading
Loading
+69 −42
Original line number Diff line number Diff line
@@ -346,6 +346,7 @@ struct rbd_device {
	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;

	char			*header_name;

@@ -724,34 +725,36 @@ static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
}

/*
 * mount options
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
@@ -761,27 +764,27 @@ static int parse_rbd_opts_token(char *c, void *private)
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
@@ -789,9 +792,10 @@ static int parse_rbd_opts_token(char *c, void *private)
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

@@ -1563,22 +1567,39 @@ static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
/*
 * Wait for an object request to complete.  If interrupted, cancel the
 * underlying osd request.
 *
 * @timeout: in jiffies, 0 means "wait forever"
 */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request,
				  unsigned long timeout)
{
	int ret;
	long ret;

	dout("%s %p\n", __func__, obj_request);

	ret = wait_for_completion_interruptible(&obj_request->completion);
	if (ret < 0) {
		dout("%s %p interrupted\n", __func__, obj_request);
	ret = wait_for_completion_interruptible_timeout(
					&obj_request->completion,
					ceph_timeout_jiffies(timeout));
	if (ret <= 0) {
		if (ret == 0)
			ret = -ETIMEDOUT;
		rbd_obj_request_end(obj_request);
	} else {
		ret = 0;
	}

	dout("%s %p ret %d\n", __func__, obj_request, (int)ret);
	return ret;
}

	dout("%s %p done\n", __func__, obj_request);
	return 0;
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return __rbd_obj_request_wait(obj_request, 0);
}

static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request,
					unsigned long timeout)
{
	return __rbd_obj_request_wait(obj_request, timeout);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
@@ -2001,11 +2022,11 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	name = kmalloc(size, GFP_NOIO);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request) {
		kfree(name);
		return NULL;
@@ -2376,7 +2397,7 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
	}

	if (opcode == CEPH_OSD_OP_DELETE)
		osd_req_op_init(osd_request, num_ops, opcode);
		osd_req_op_init(osd_request, num_ops, opcode, 0);
	else
		osd_req_op_extent_init(osd_request, num_ops, opcode,
				       offset, length, 0, 0);
@@ -2848,7 +2869,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);
@@ -3122,6 +3143,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
						bool watch)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_options *opts = osdc->client->options;
	struct rbd_obj_request *obj_request;
	int ret;

@@ -3148,7 +3170,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
	if (ret)
		goto out;

	ret = rbd_obj_request_wait(obj_request);
	ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout);
	if (ret)
		goto out;

@@ -3750,10 +3772,9 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)

	memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
	rbd_dev->tag_set.ops = &rbd_mq_ops;
	rbd_dev->tag_set.queue_depth = BLKDEV_MAX_RQ;
	rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
	rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
	rbd_dev->tag_set.flags =
		BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
	rbd_dev->tag_set.nr_hw_queues = 1;
	rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);

@@ -3773,6 +3794,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);
@@ -4044,7 +4066,8 @@ static void rbd_spec_free(struct kref *kref)
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
{
	struct rbd_device *rbd_dev;

@@ -4058,8 +4081,9 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;
	rbd_dev->spec = spec;
	rbd_dev->opts = opts;

	/* Initialize the layout used for all rbd requests */

@@ -4075,6 +4099,7 @@ static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}

@@ -4933,6 +4958,7 @@ static int rbd_add_parse_args(const char *buf,
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
@@ -4963,8 +4989,8 @@ static int rbd_add_parse_args(const char *buf,
 */
static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
{
	struct ceph_options *opts = rbdc->client->options;
	u64 newest_epoch;
	unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
	int tries = 0;
	int ret;

@@ -4979,7 +5005,8 @@ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
			ceph_monc_request_next_osdmap(&rbdc->client->monc);
			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
						     newest_epoch, timeout);
						     newest_epoch,
						     opts->mount_timeout);
			goto again;
		} else {
			/* the osdmap we have is new enough */
@@ -5148,7 +5175,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
	rbdc = __rbd_get_client(rbd_dev->rbd_client);

	ret = -ENOMEM;
	parent = rbd_dev_create(rbdc, parent_spec);
	parent = rbd_dev_create(rbdc, parent_spec, NULL);
	if (!parent)
		goto out_err;

@@ -5394,9 +5421,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;
	read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
@@ -5422,11 +5446,12 @@ static ssize_t do_rbd_add(struct bus_type *bus,
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */
	rbd_opts = NULL;	/* rbd_dev now owns this */

	rc = rbd_dev_image_probe(rbd_dev, true);
	if (rc < 0)
@@ -5434,6 +5459,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,

	/* If we are mapping a snapshot it must be marked read-only */

	read_only = rbd_dev->opts->read_only;
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;
@@ -5458,6 +5484,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
	rbd_put_client(rbdc);
err_out_args:
	rbd_spec_put(spec);
	kfree(rbd_opts);
err_out_module:
	module_put(THIS_MODULE);

+2 −2
Original line number Diff line number Diff line
@@ -187,10 +187,10 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
		val_size2 = posix_acl_xattr_size(default_acl->a_count);

	err = -ENOMEM;
	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
	if (!tmp_buf)
		goto out_err;
	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL);
	if (!pagelist)
		goto out_err;
	ceph_pagelist_init(pagelist);
+261 −47
Original line number Diff line number Diff line
@@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page)
	inode = mapping->host;
	ci = ceph_inode(inode);

	/*
	 * Note that we're grabbing a snapc ref here without holding
	 * any locks!
	 */
	snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	if (ci->i_head_snapc == NULL)
		ci->i_head_snapc = ceph_get_snap_context(snapc);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
@@ -346,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)

	/* build page vector */
	nr_pages = calc_pages_for(0, len);
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
	ret = -ENOMEM;
	if (!pages)
		goto out;
@@ -358,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_NOFS)) {
					  GFP_KERNEL)) {
			ceph_fscache_uncache_page(inode, page);
			page_cache_release(page);
			dout("start_read %p add_to_page_cache failed %p\n",
@@ -436,7 +440,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
						    u64 *snap_size)
						    loff_t *snap_size)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
@@ -476,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	struct ceph_osd_client *osdc;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	loff_t snap_size = -1;
	long writeback_stat;
	u64 truncate_size, snap_size = 0;
	u64 truncate_size;
	u32 truncate_seq;
	int err = 0, len = PAGE_CACHE_SIZE;

@@ -512,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	if (!snap_size)
	if (snap_size == -1)
		snap_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

@@ -695,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping,
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	int do_sync = 0;
	u64 truncate_size, snap_size;
	loff_t snap_size, i_size;
	u64 truncate_size;
	u32 truncate_seq;

	/*
@@ -741,7 +747,7 @@ static int ceph_writepages_start(struct address_space *mapping,
retry:
	/* find oldest snap context with dirty data */
	ceph_put_snap_context(snapc);
	snap_size = 0;
	snap_size = -1;
	snapc = get_oldest_context(inode, &snap_size);
	if (!snapc) {
		/* hmm, why does writepages get called when there
@@ -749,16 +755,13 @@ static int ceph_writepages_start(struct address_space *mapping,
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	if (snap_size == 0)
		snap_size = i_size_read(inode);
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	if (!snap_size)
		snap_size = i_size_read(inode);
	i_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	if (last_snapc && snapc != last_snapc) {
@@ -828,8 +831,10 @@ static int ceph_writepages_start(struct address_space *mapping,
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}
			if (page_offset(page) >= snap_size) {
				dout("%p page eof %llu\n", page, snap_size);
			if (page_offset(page) >=
			    (snap_size == -1 ? i_size : snap_size)) {
				dout("%p page eof %llu\n", page,
				     (snap_size == -1 ? i_size : snap_size));
				done = 1;
				unlock_page(page);
				break;
@@ -884,7 +889,8 @@ static int ceph_writepages_start(struct address_space *mapping,
				}

				if (do_sync)
					osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
					osd_req_op_init(req, 1,
							CEPH_OSD_OP_STARTSYNC, 0);

				req->r_callback = writepages_finish;
				req->r_inode = inode;
@@ -944,10 +950,18 @@ static int ceph_writepages_start(struct address_space *mapping,
		}

		/* Format the osd request message and submit the write */

		offset = page_offset(pages[0]);
		len = min(snap_size - offset,
			  (u64)locked_pages << PAGE_CACHE_SHIFT);
		len = (u64)locked_pages << PAGE_CACHE_SHIFT;
		if (snap_size == -1) {
			len = min(len, (u64)i_size_read(inode) - offset);
			 /* writepages_finish() clears writeback pages
			  * according to the data length, so make sure
			  * data length covers all locked pages */
			len = max(len, 1 +
				((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
		} else {
			len = min(len, snap_size - offset);
		}
		dout("writepages got %d pages at %llu~%llu\n",
		     locked_pages, offset, len);

@@ -1032,7 +1046,6 @@ static int ceph_update_writeable_page(struct file *file,
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	loff_t page_off = pos & PAGE_CACHE_MASK;
	int pos_in_page = pos & ~PAGE_CACHE_MASK;
	int end_in_page = pos_in_page + len;
@@ -1044,10 +1057,6 @@ static int ceph_update_writeable_page(struct file *file,
	/* writepages currently holds page lock, but if we change that later, */
	wait_on_page_writeback(page);

	/* check snap context */
	BUG_ON(!ci->i_snap_realm);
	down_read(&mdsc->snap_rwsem);
	BUG_ON(!ci->i_snap_realm->cached_context);
	snapc = page_snap_context(page);
	if (snapc && snapc != ci->i_head_snapc) {
		/*
@@ -1055,7 +1064,6 @@ static int ceph_update_writeable_page(struct file *file,
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL);
		up_read(&mdsc->snap_rwsem);

		if (snapc->seq > oldest->seq) {
			ceph_put_snap_context(oldest);
@@ -1112,7 +1120,6 @@ static int ceph_update_writeable_page(struct file *file,
	}

	/* we need to read it. */
	up_read(&mdsc->snap_rwsem);
	r = readpage_nounlock(file, page);
	if (r < 0)
		goto fail_nosnap;
@@ -1157,16 +1164,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting and drop read lock on
 * mdsc->snap_rwsem.
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
	int check_cap = 0;

@@ -1188,7 +1192,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
	set_page_dirty(page);

	unlock_page(page);
	up_read(&mdsc->snap_rwsem);
	page_cache_release(page);

	if (check_cap)
@@ -1314,13 +1317,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, ret;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_SIGBUS;

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
@@ -1330,8 +1337,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
		ret = ceph_uninline_data(vma->vm_file, locked_page);
		if (locked_page)
			unlock_page(locked_page);
		if (ret < 0)
			return VM_FAULT_SIGBUS;
		if (ret < 0) {
			ret = VM_FAULT_SIGBUS;
			goto out_free;
		}
	}

	if (off + PAGE_CACHE_SIZE <= size)
@@ -1353,7 +1362,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
			break;
		if (ret != -ERESTARTSYS) {
			WARN_ON(1);
			return VM_FAULT_SIGBUS;
			ret = VM_FAULT_SIGBUS;
			goto out_free;
		}
	}
	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
@@ -1373,7 +1383,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
	if (ret == 0) {
		/* success.  we'll keep the page locked. */
		set_page_dirty(page);
		up_read(&mdsc->snap_rwsem);
		ret = VM_FAULT_LOCKED;
	} else {
		if (ret == -ENOMEM)
@@ -1389,7 +1398,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
@@ -1398,6 +1408,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs(ci, got);
out_free:
	ceph_free_cap_flush(prealloc_cf);

	return ret;
}
@@ -1509,8 +1521,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    ci->i_snap_realm->cached_context,
				    0, 0, false);
				    ceph_empty_snapc, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
@@ -1528,7 +1539,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    ci->i_snap_realm->cached_context,
				    ceph_empty_snapc,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    false);
	if (IS_ERR(req)) {
@@ -1597,3 +1608,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)
	vma->vm_ops = &ceph_vmops;
	return 0;
}

/*
 * Bitmask of access rights a client has to an OSD data pool, as probed
 * by __ceph_pool_perm_get() and cached in mdsc->pool_perm_tree.
 */
enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

/*
 * Determine whether the client may read and/or write objects in the
 * given OSD data pool, caching the answer in mdsc->pool_perm_tree.
 *
 * The probe issues two OSD requests in parallel: a STAT (read check)
 * and an exclusive CREATE (write check) on a per-inode object name.
 *
 * Returns a POOL_READ/POOL_WRITE bitmask on success, or a negative
 * errno on failure.
 */
static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	int err = 0, err2 = 0, have = 0;

	/* fast path: look for a cached entry under the read lock */
	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			have = perm->perm;
			break;
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	/*
	 * NOTE(review): *p is tested here -- and the write-locked walk
	 * below resumes from where the read-locked search left p --
	 * after the read lock is dropped.  This appears to rely on
	 * entries never being removed from the tree while the fs is
	 * mounted (only ceph_pool_perm_destroy() erases nodes); confirm
	 * that invariant before changing the locking.
	 */
	if (*p)
		goto out;

	dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);

	/* slow path: re-check under the write lock, then probe the OSDs */
	down_write(&mdsc->pool_perm_rwsem);
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			have = perm->perm;
			break;
		}
	}
	if (*p) {
		/* another task cached this pool while the lock was dropped */
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	/* read probe: STAT an object in the target pool */
	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
					 ceph_empty_snapc,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name),
		 "%llx.00000000", ci->i_vino.ino);
	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);

	/* write probe: exclusive-create the same object name */
	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
					 ceph_empty_snapc,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
			  CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	wr_req->r_base_oloc.pool = pool;
	wr_req->r_base_oid = rd_req->r_base_oid;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
				&ci->vfs_inode.i_mtime);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
				&ci->vfs_inode.i_mtime);
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	/* both requests were submitted; now wait for each in turn */
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	/* -ENOENT just means the object doesn't exist: the read succeeded */
	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM)
		goto out_unlock;

	/* -EEXIST means the exclusive create collided: the write succeeded */
	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		err = err2;
		goto out_unlock;
	}

	perm = kmalloc(sizeof(*perm), GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	if (rd_req)
		ceph_osdc_put_request(rd_req);
	if (wr_req)
		ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
	return err;
}

/*
 * Check whether the caps in @need (CEPH_CAP_FILE_RD/WR) are permitted
 * by this inode's data pool.  The per-pool probe result is cached in
 * ci->i_ceph_flags (CEPH_I_POOL_PERM/RD/WR); __ceph_pool_perm_get()
 * is only called on a cache miss.
 *
 * Returns 0 if access is allowed, -EPERM if denied, or another
 * negative errno if the probe itself failed.
 */
int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
	u32 pool;
	int ret, flags;

	/* the "nopoolperm" mount option disables these checks entirely */
	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ceph_file_layout_pg_pool(ci->i_layout);
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %u no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %u no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	ret = __ceph_pool_perm_get(ci, pool);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
		ci->i_ceph_flags = flags;
	} else {
		/* layout changed while we probed: retry with the new pool */
		pool = ceph_file_layout_pg_pool(ci->i_layout);
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}
+577 −259

File changed.

Preview size limit exceeded, changes collapsed.

+166 −217

File changed.

Preview size limit exceeded, changes collapsed.

Loading