Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9a5889ae authored by Linus Torvalds's avatar Linus Torvalds
Browse files
Pull Ceph updates from Sage Weil:
 "There is some follow-on RBD cleanup after the last window's code drop,
  a series from Yan fixing multi-mds behavior in cephfs, and then a
  sprinkling of bug fixes all around.  Some warnings, sleeping while
  atomic, a null dereference, and cleanups"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (36 commits)
  libceph: fix invalid unsigned->signed conversion for timespec encoding
  libceph: call r_unsafe_callback when unsafe reply is received
  ceph: fix race between cap issue and revoke
  ceph: fix cap revoke race
  ceph: fix pending vmtruncate race
  ceph: avoid accessing invalid memory
  libceph: Fix NULL pointer dereference in auth client code
  ceph: Reconstruct the func ceph_reserve_caps.
  ceph: Free mdsc if alloc mdsc->mdsmap failed.
  ceph: remove sb_start/end_write in ceph_aio_write.
  ceph: avoid meaningless calling ceph_caps_revoking if sync_mode == WB_SYNC_ALL.
  ceph: fix sleeping function called from invalid context.
  ceph: move inode to proper flushing list when auth MDS changes
  rbd: fix a couple warnings
  ceph: clear migrate seq when MDS restarts
  ceph: check migrate seq before changing auth cap
  ceph: fix race between page writeback and truncate
  ceph: reset iov_len when discarding cap release messages
  ceph: fix cap release race
  libceph: fix truncate size calculation
  ...
parents e3a0dd98 8b8cf891
Loading
Loading
Loading
Loading
+90 −84
Original line number Diff line number Diff line
@@ -372,7 +372,7 @@ enum rbd_dev_flags {
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);
@@ -489,10 +489,8 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
@@ -507,9 +505,7 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);
}

static const struct block_device_operations rbd_bd_ops = {
@@ -520,7 +516,7 @@ static const struct block_device_operations rbd_bd_ops = {

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
@@ -535,30 +531,25 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
out_client:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
@@ -682,11 +673,13 @@ static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}
@@ -840,7 +833,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
@@ -869,8 +861,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
@@ -1126,6 +1116,7 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
@@ -1153,11 +1144,12 @@ static void zero_pages(struct page **pages, u64 offset, u64 end)
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

@@ -2171,9 +2163,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	struct bio *bio_list = 0;
	unsigned int bio_offset = 0;
	struct page **pages;
	struct page **pages = 0;
	u64 img_offset;
	u64 resid;
	u16 opcode;
@@ -2535,6 +2527,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

@@ -2555,7 +2548,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		rbd_obj_request_put(orig_request);
		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
@@ -2585,7 +2577,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
	rbd_obj_request_put(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
@@ -2859,7 +2850,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

	rbd_obj_notify_ack(rbd_dev, notify_id);
}
@@ -3339,8 +3330,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
@@ -3349,7 +3340,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	mutex_unlock(&ctl_mutex);
	up_write(&rbd_dev->header_rwsem);

	if (mapping_size != rbd_dev->mapping.size) {
		sector_t size;

@@ -3813,6 +3805,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
	void *end;
	u64 pool_id;
	char *image_id;
	u64 snap_id;
	u64 overlap;
	int ret;

@@ -3872,24 +3865,56 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
			(unsigned long long)pool_id, U32_MAX);
		goto out_err;
	}
	parent_spec->pool_id = pool_id;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	if (overlap) {
		rbd_spec_put(rbd_dev->parent_spec);
	/*
	 * The parent won't change (except when the clone is
	 * flattened, already handled that).  So we only need to
	 * record the parent spec if we have not already done so.
	 */
	if (!rbd_dev->parent_spec) {
		parent_spec->pool_id = pool_id;
		parent_spec->image_id = image_id;
		parent_spec->snap_id = snap_id;
		rbd_dev->parent_spec = parent_spec;
		parent_spec = NULL;	/* rbd_dev now owns this */
	}

	/*
	 * We always update the parent overlap.  If it's zero we
	 * treat it specially.
	 */
	rbd_dev->parent_overlap = overlap;
	smp_mb();
	if (!overlap) {

		/* A null parent_spec indicates it's the initial probe */

		if (parent_spec) {
			/*
			 * The overlap has become zero, so the clone
			 * must have been resized down to 0 at some
			 * point.  Treat this the same as a flatten.
			 */
			rbd_dev_parent_put(rbd_dev);
			pr_info("%s: clone image now standalone\n",
				rbd_dev->disk->disk_name);
		} else {
		rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
			/*
			 * For the initial probe, if we find the
			 * overlap is zero we just pretend there was
			 * no parent image.
			 */
			rbd_warn(rbd_dev, "ignoring parent of "
						"clone with overlap 0\n");
		}
	}
out:
	ret = 0;
@@ -4245,16 +4270,14 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
	bool first_time = rbd_dev->header.object_prefix == NULL;
	int ret;

	down_write(&rbd_dev->header_rwsem);

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
		return ret;

	if (first_time) {
		ret = rbd_dev_v2_header_onetime(rbd_dev);
		if (ret)
			goto out;
			return ret;
	}

	/*
@@ -4269,7 +4292,7 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)

		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret)
			goto out;
			return ret;

		/*
		 * Print a warning if this is the initial probe and
@@ -4290,8 +4313,6 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)

	ret = rbd_dev_v2_snap_context(rbd_dev);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}
@@ -4301,8 +4322,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
@@ -4311,8 +4330,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

@@ -5059,23 +5076,6 @@ static ssize_t rbd_add(struct bus_type *bus,
	return (ssize_t)rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
@@ -5120,8 +5120,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
@@ -5129,37 +5131,40 @@ static ssize_t rbd_remove(struct bus_type *bus,
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
	ret = -ENOENT;
		goto done;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}

	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
							&rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		return ret;

	rbd_bus_del_dev(rbd_dev);
	ret = rbd_dev_header_watch_sync(rbd_dev, false);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
	ret = count;
done:
	mutex_unlock(&ctl_mutex);

	return ret;
	return count;
}

/*
@@ -5267,6 +5272,7 @@ static void __exit rbd_exit(void)
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");
+42 −46
Original line number Diff line number Diff line
@@ -439,13 +439,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_osd_client *osdc;
	loff_t page_off = page_offset(page);
	int len = PAGE_CACHE_SIZE;
	loff_t i_size;
	int err = 0;
	struct ceph_snap_context *snapc, *oldest;
	u64 snap_size = 0;
	loff_t page_off = page_offset(page);
	long writeback_stat;
	u64 truncate_size, snap_size = 0;
	u32 truncate_seq;
	int err = 0, len = PAGE_CACHE_SIZE;

	dout("writepage %p idx %lu\n", page, page->index);

@@ -475,13 +474,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	}
	ceph_put_snap_context(oldest);

	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	if (!snap_size)
		snap_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	/* is this a partial page at end of file? */
	if (snap_size)
		i_size = snap_size;
	else
		i_size = i_size_read(inode);
	if (i_size < page_off + len)
		len = i_size - page_off;
	if (page_off >= snap_size) {
		dout("%p page eof %llu\n", page, snap_size);
		goto out;
	}
	if (snap_size < page_off + len)
		len = snap_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
	     inode, page, page->index, page_off, len, snapc);
@@ -495,7 +501,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
				   &ci->i_layout, snapc,
				   page_off, len,
				   ci->i_truncate_seq, ci->i_truncate_size,
				   truncate_seq, truncate_size,
				   &inode->i_mtime, &page, 1);
	if (err < 0) {
		dout("writepage setting page/mapping error %d %p\n", err, page);
@@ -632,25 +638,6 @@ static void writepages_finish(struct ceph_osd_request *req,
	ceph_osdc_put_request(req);
}

static struct ceph_osd_request *
ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
				struct ceph_snap_context *snapc, int num_ops)
{
	struct ceph_fs_client *fsc;
	struct ceph_inode_info *ci;
	struct ceph_vino vino;

	fsc = ceph_inode_to_client(inode);
	ci = ceph_inode(inode);
	vino = ceph_vino(inode);
	/* BUG_ON(vino.snap != CEPH_NOSNAP); */

	return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
			vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
			CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
			snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
}

/*
 * initiate async writeback
 */
@@ -659,7 +646,8 @@ static int ceph_writepages_start(struct address_space *mapping,
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start, end;
	int range_whole = 0;
	int should_loop = 1;
@@ -671,22 +659,22 @@ static int ceph_writepages_start(struct address_space *mapping,
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	int do_sync;
	u64 snap_size;
	u64 truncate_size, snap_size;
	u32 truncate_seq;

	/*
	 * Include a 'sync' in the OSD request if this is a data
	 * integrity write (e.g., O_SYNC write or fsync()), or if our
	 * cap is being revoked.
	 */
	do_sync = wbc->sync_mode == WB_SYNC_ALL;
	if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
	if ((wbc->sync_mode == WB_SYNC_ALL) ||
		ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
		do_sync = 1;
	dout("writepages_start %p dosync=%d (mode=%s)\n",
	     inode, do_sync,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	fsc = ceph_inode_to_client(inode);
	if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
		pr_warning("writepage_start %p on forced umount\n", inode);
		return -EIO; /* we're in a forced umount, don't write! */
@@ -729,6 +717,14 @@ static int ceph_writepages_start(struct address_space *mapping,
		snap_size = i_size_read(inode);
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	if (!snap_size)
		snap_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	if (last_snapc && snapc != last_snapc) {
		/* if we switched to a newer snapc, restart our scan at the
		 * start of the original file range. */
@@ -740,7 +736,6 @@ static int ceph_writepages_start(struct address_space *mapping,

	while (!done && index <= end) {
		int num_ops = do_sync ? 2 : 1;
		struct ceph_vino vino;
		unsigned i;
		int first;
		pgoff_t next;
@@ -834,17 +829,18 @@ static int ceph_writepages_start(struct address_space *mapping,
			 * that it will use.
			 */
			if (locked_pages == 0) {
				size_t size;

				BUG_ON(pages);

				/* prepare async write request */
				offset = (u64)page_offset(page);
				len = wsize;
				req = ceph_writepages_osd_request(inode,
							offset, &len, snapc,
							num_ops);

				req = ceph_osdc_new_request(&fsc->client->osdc,
							&ci->i_layout, vino,
							offset, &len, num_ops,
							CEPH_OSD_OP_WRITE,
							CEPH_OSD_FLAG_WRITE |
							CEPH_OSD_FLAG_ONDISK,
							snapc, truncate_seq,
							truncate_size, true);
				if (IS_ERR(req)) {
					rc = PTR_ERR(req);
					unlock_page(page);
@@ -855,8 +851,8 @@ static int ceph_writepages_start(struct address_space *mapping,
				req->r_inode = inode;

				max_pages = calc_pages_for(0, (u64)len);
				size = max_pages * sizeof (*pages);
				pages = kmalloc(size, GFP_NOFS);
				pages = kmalloc(max_pages * sizeof (*pages),
						GFP_NOFS);
				if (!pages) {
					pool = fsc->wb_pagevec_pool;
					pages = mempool_alloc(pool, GFP_NOFS);
+63 −39
Original line number Diff line number Diff line
@@ -147,7 +147,7 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
	spin_unlock(&mdsc->caps_list_lock);
}

int ceph_reserve_caps(struct ceph_mds_client *mdsc,
void ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
	int i;
@@ -155,7 +155,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
	int have;
	int alloc = 0;
	LIST_HEAD(newcaps);
	int ret = 0;

	dout("reserve caps ctx=%p need=%d\n", ctx, need);

@@ -174,14 +173,15 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,

	for (i = have; i < need; i++) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (!cap) {
			ret = -ENOMEM;
			goto out_alloc_count;
		}
		if (!cap)
			break;
		list_add(&cap->caps_item, &newcaps);
		alloc++;
	}
	BUG_ON(have + alloc != need);
	/* we didn't manage to reserve as much as we needed */
	if (have + alloc != need)
		pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
			ctx, need, have + alloc);

	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_total_count += alloc;
@@ -197,13 +197,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return 0;

out_alloc_count:
	/* we didn't manage to reserve as much as we needed */
	pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
		   ctx, need, have);
	return ret;
}

int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
@@ -612,9 +605,11 @@ int ceph_add_cap(struct inode *inode,
		__cap_delay_requeue(mdsc, ci);
	}

	if (flags & CEPH_CAP_FLAG_AUTH)
	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (ci->i_auth_cap == NULL ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
			ci->i_auth_cap = cap;
	else if (ci->i_auth_cap == cap) {
	} else if (ci->i_auth_cap == cap) {
		ci->i_auth_cap = NULL;
		spin_lock(&mdsc->cap_dirty_lock);
		if (!list_empty(&ci->i_dirty_item)) {
@@ -695,6 +690,15 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
		if (implemented)
			*implemented |= cap->implemented;
	}
	/*
	 * exclude caps issued by a non-auth MDS that are being revoked
	 * by the auth MDS. The non-auth MDS should be revoking/exporting
	 * these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		have &= ~cap->implemented | cap->issued;
	}
	return have;
}

@@ -802,22 +806,28 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
/*
 * Return true if mask caps are currently being revoked by an MDS.
 */
int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	struct rb_node *p;
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (__cap_is_valid(cap) &&
		    (cap->implemented & ~cap->issued & mask)) {
			ret = 1;
			break;
		if (cap != ocap && __cap_is_valid(cap) &&
		    (cap->implemented & ~cap->issued & mask))
			return 1;
	}
	return 0;
}

int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
{
	struct inode *inode = &ci->vfs_inode;
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_caps_revoking_other(ci, NULL, mask);
	spin_unlock(&ci->i_ceph_lock);
	dout("ceph_caps_revoking %p %s = %d\n", inode,
	     ceph_cap_string(mask), ret);
@@ -1980,8 +1990,15 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
	cap = ci->i_auth_cap;
	dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
	     ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);

	__ceph_flush_snaps(ci, &session, 1);

	if (ci->i_flushing_caps) {
		spin_lock(&mdsc->cap_dirty_lock);
		list_move_tail(&ci->i_flushing_item,
			       &cap->session->s_cap_flushing);
		spin_unlock(&mdsc->cap_dirty_lock);

		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
				     __ceph_caps_used(ci),
				     __ceph_caps_wanted(ci),
@@ -2055,7 +2072,11 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
	/* finish pending truncate */
	while (ci->i_truncate_pending) {
		spin_unlock(&ci->i_ceph_lock);
		__ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR));
		if (!(need & CEPH_CAP_FILE_WR))
			mutex_lock(&inode->i_mutex);
		__ceph_do_pending_vmtruncate(inode);
		if (!(need & CEPH_CAP_FILE_WR))
			mutex_unlock(&inode->i_mutex);
		spin_lock(&ci->i_ceph_lock);
	}

@@ -2473,6 +2494,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
	} else {
		dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
		     ceph_cap_string(newcaps));
		/* non-auth MDS is revoking the newly grant caps ? */
		if (cap == ci->i_auth_cap &&
		    __ceph_caps_revoking_other(ci, cap, newcaps))
		    check_caps = 2;

		cap->issued = newcaps;
		cap->implemented |= newcaps; /* add bits only, to
					      * avoid stepping on a
@@ -3042,21 +3068,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
		     (cap->issued & unless) == 0)) {
			if ((cap->issued & drop) &&
			    (cap->issued & unless) == 0) {
				dout("encode_inode_release %p cap %p %s -> "
				     "%s\n", inode, cap,
				     ceph_cap_string(cap->issued),
				     ceph_cap_string(cap->issued & ~drop));
				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				if (ci->i_ceph_flags & CEPH_I_NODELAY) {
				int wanted = __ceph_caps_wanted(ci);
					dout("  wanted %s -> %s (act %s)\n",
				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
					wanted |= cap->mds_wanted;
				dout("encode_inode_release %p cap %p "
				     "%s -> %s, wanted %s -> %s\n", inode, cap,
				     ceph_cap_string(cap->issued),
				     ceph_cap_string(cap->issued & ~drop),
				     ceph_cap_string(cap->mds_wanted),
					     ceph_cap_string(cap->mds_wanted &
							     ~wanted),
				     ceph_cap_string(wanted));
					cap->mds_wanted &= wanted;
				}

				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				cap->mds_wanted = wanted;
			} else {
				dout("encode_inode_release %p cap %p %s"
				     " (force)\n", inode, cap,
+1 −3
Original line number Diff line number Diff line
@@ -716,7 +716,6 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
	if (ceph_snap(inode) != CEPH_NOSNAP)
		return -EROFS;

	sb_start_write(inode->i_sb);
	mutex_lock(&inode->i_mutex);
	hold_mutex = true;

@@ -809,7 +808,6 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
out:
	if (hold_mutex)
		mutex_unlock(&inode->i_mutex);
	sb_end_write(inode->i_sb);
	current->backing_dev_info = NULL;

	return written ? written : err;
@@ -824,7 +822,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
	int ret;

	mutex_lock(&inode->i_mutex);
	__ceph_do_pending_vmtruncate(inode, false);
	__ceph_do_pending_vmtruncate(inode);

	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
+6 −8
Original line number Diff line number Diff line
@@ -1465,7 +1465,9 @@ static void ceph_vmtruncate_work(struct work_struct *work)
	struct inode *inode = &ci->vfs_inode;

	dout("vmtruncate_work %p\n", inode);
	__ceph_do_pending_vmtruncate(inode, true);
	mutex_lock(&inode->i_mutex);
	__ceph_do_pending_vmtruncate(inode);
	mutex_unlock(&inode->i_mutex);
	iput(inode);
}

@@ -1492,7 +1494,7 @@ void ceph_queue_vmtruncate(struct inode *inode)
 * Make sure any pending truncation is applied before doing anything
 * that may depend on it.
 */
void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock)
void __ceph_do_pending_vmtruncate(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 to;
@@ -1525,11 +1527,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock)
	     ci->i_truncate_pending, to);
	spin_unlock(&ci->i_ceph_lock);

	if (needlock)
		mutex_lock(&inode->i_mutex);
	truncate_inode_pages(inode->i_mapping, to);
	if (needlock)
		mutex_unlock(&inode->i_mutex);

	spin_lock(&ci->i_ceph_lock);
	if (to == ci->i_truncate_size) {
@@ -1588,7 +1586,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
	if (ceph_snap(inode) != CEPH_NOSNAP)
		return -EROFS;

	__ceph_do_pending_vmtruncate(inode, false);
	__ceph_do_pending_vmtruncate(inode);

	err = inode_change_ok(inode, attr);
	if (err != 0)
@@ -1770,7 +1768,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
	     ceph_cap_string(dirtied), mask);

	ceph_mdsc_put_request(req);
	__ceph_do_pending_vmtruncate(inode, false);
	__ceph_do_pending_vmtruncate(inode);
	return err;
out:
	spin_unlock(&ci->i_ceph_lock);
Loading