Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cdb897e3 authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge tag 'ceph-for-4.14-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The highlights include:

   - a large series of fixes and improvements to the snapshot-handling
     code (Zheng Yan)

   - individual read/write OSD requests passed down to libceph are now
     limited to 16M in size to avoid hitting OSD-side limits (Zheng Yan)

   - encode MStatfs v2 message to allow for more accurate space usage
     reporting (Douglas Fuller)

   - switch to the new writeback error tracking infrastructure (Jeff
     Layton)"

* tag 'ceph-for-4.14-rc1' of git://github.com/ceph/ceph-client: (35 commits)
  ceph: stop on-going cached readdir if mds revokes FILE_SHARED cap
  ceph: wait on writeback after writing snapshot data
  ceph: fix capsnap dirty pages accounting
  ceph: ignore wbc->range_{start,end} when write back snapshot data
  ceph: fix "range cyclic" mode writepages
  ceph: cleanup local variables in ceph_writepages_start()
  ceph: optimize pagevec iterating in ceph_writepages_start()
  ceph: make writepage_nounlock() invalidate page that beyonds EOF
  ceph: properly get capsnap's size in get_oldest_context()
  ceph: remove stale check in ceph_invalidatepage()
  ceph: queue cap snap only when snap realm's context changes
  ceph: handle race between vmtruncate and queuing cap snap
  ceph: fix message order check in handle_cap_export()
  ceph: fix NULL pointer dereference in ceph_flush_snaps()
  ceph: adjust 36 checks for NULL pointers
  ceph: delete an unnecessary return statement in update_dentry_lease()
  ceph: ENOMEM pr_err in __get_or_create_frag() is redundant
  ceph: check negative offsets in ceph_llseek()
  ceph: more accurate statfs
  ceph: properly set snap follows for cap reconnect
  ...
parents b31ff3cd 15b51bd6
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -3435,7 +3435,7 @@ static void rbd_acquire_lock(struct work_struct *work)
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, lock_dwork);
					    struct rbd_device, lock_dwork);
	enum rbd_lock_state lock_state;
	enum rbd_lock_state lock_state;
	int ret;
	int ret = 0;


	dout("%s rbd_dev %p\n", __func__, rbd_dev);
	dout("%s rbd_dev %p\n", __func__, rbd_dev);
again:
again:
+224 −179
Original line number Original line Diff line number Diff line
@@ -152,17 +152,10 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,


	ceph_invalidate_fscache_page(inode, page);
	ceph_invalidate_fscache_page(inode, page);


	WARN_ON(!PageLocked(page));
	if (!PagePrivate(page))
	if (!PagePrivate(page))
		return;
		return;


	/*
	 * We can get non-dirty pages here due to races between
	 * set_page_dirty and truncate_complete_page; just spit out a
	 * warning, in case we end up with accounting problems later.
	 */
	if (!PageDirty(page))
		pr_err("%p invalidatepage %p page not dirty\n", inode, page);

	ClearPageChecked(page);
	ClearPageChecked(page);


	dout("%p invalidatepage %p idx %lu full dirty page\n",
	dout("%p invalidatepage %p idx %lu full dirty page\n",
@@ -455,13 +448,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
	if (rc == 0)
	if (rc == 0)
		goto out;
		goto out;


	if (fsc->mount_options->rsize >= PAGE_SIZE)
	max = fsc->mount_options->rsize >> PAGE_SHIFT;
		max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
	dout("readpages %p file %p nr_pages %d max %d\n",
			>> PAGE_SHIFT;
	     inode, file, nr_pages, max);

	dout("readpages %p file %p nr_pages %d max %d\n", inode,
		file, nr_pages,
	     max);
	while (!list_empty(page_list)) {
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list, max);
		rc = start_read(inode, page_list, max);
		if (rc < 0)
		if (rc < 0)
@@ -474,14 +463,22 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
	return rc;
	return rc;
}
}


/*
 * Writeback parameters for the snap context chosen by get_oldest_context().
 * The fields are filled in by get_oldest_context() when it is passed a
 * non-NULL ctl pointer.
 */
struct ceph_writeback_ctl
{
	/*
	 * Size limit to use for writeback: capsnap->size when the values come
	 * from a cap snap that is not being written, otherwise
	 * i_size_read(inode).
	 */
	loff_t i_size;
	/* truncate_size of the cap snap, or ci->i_truncate_size for head */
	u64 truncate_size;
	/* truncate_seq of the cap snap, or ci->i_truncate_seq for head */
	u32 truncate_seq;
	/* true only when i_size was taken from a cap snap with !writing */
	bool size_stable;
	/* true when the values come from the inode's head snap context */
	bool head_snapc;
};

/*
/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 * only snap context we are allowed to write back.
 */
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
static struct ceph_snap_context *
						    loff_t *snap_size,
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
						    u64 *truncate_size,
		   struct ceph_snap_context *page_snapc)
						    u32 *truncate_seq)
{
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_snap_context *snapc = NULL;
@@ -491,30 +488,78 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		     capsnap->context, capsnap->dirty_pages);
		if (capsnap->dirty_pages) {
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		snapc = ceph_get_snap_context(capsnap->context);
			if (snap_size)
		if (!page_snapc ||
				*snap_size = capsnap->size;
		    page_snapc == snapc ||
			if (truncate_size)
		    page_snapc->seq > snapc->seq)
				*truncate_size = capsnap->truncate_size;
			if (truncate_seq)
				*truncate_seq = capsnap->truncate_seq;
			break;
			break;
	}
	}
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		     snapc, ci->i_wrbuffer_ref_head);
		if (truncate_size)
		if (ctl) {
			*truncate_size = ci->i_truncate_size;
			ctl->i_size = i_size_read(inode);
		if (truncate_seq)
			ctl->truncate_size = ci->i_truncate_size;
			*truncate_seq = ci->i_truncate_seq;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	}
	spin_unlock(&ci->i_ceph_lock);
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
	return snapc;
}
}


/*
 * Compute how many bytes, counted from @start, may be written for a request
 * whose last page is @page.
 *
 * The upper bound starts as the inode's current i_size.  If the page's snap
 * context is not the head snap context, the matching cap snap is looked up
 * under i_ceph_lock and, unless that cap snap is still being written, its
 * recorded size becomes the bound instead.  The bound is then clamped to the
 * end of @page.
 *
 * Returns the byte count from @start to the bound, or 0 if the bound does
 * not lie beyond @start.
 */
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *page_ctx = page_snap_context(page);
	struct ceph_cap_snap *cs;
	u64 limit = i_size_read(inode);
	u64 page_end;

	if (page_ctx != ci->i_head_snapc) {
		bool matched = false;

		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(cs, &ci->i_cap_snaps, ci_item) {
			if (cs->context != page_ctx)
				continue;
			matched = true;
			/* size is only used once the cap snap is not writing */
			if (!cs->writing)
				limit = cs->size;
			break;
		}
		spin_unlock(&ci->i_ceph_lock);
		/* a non-head page context is expected to have a cap snap */
		WARN_ON(!matched);
	}

	/* never report data past the end of the given page */
	page_end = page_offset(page) + PAGE_SIZE;
	if (limit > page_end)
		limit = page_end;

	if (limit <= start)
		return 0;
	return limit - start;
}

/*
/*
 * Write a single page, but leave the page locked.
 * Write a single page, but leave the page locked.
 *
 *
@@ -526,30 +571,25 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	struct inode *inode;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_fs_client *fsc;
	struct ceph_osd_client *osdc;
	struct ceph_snap_context *snapc, *oldest;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	loff_t page_off = page_offset(page);
	loff_t snap_size = -1;
	long writeback_stat;
	long writeback_stat;
	u64 truncate_size;
	u32 truncate_seq;
	int err, len = PAGE_SIZE;
	int err, len = PAGE_SIZE;
	struct ceph_writeback_ctl ceph_wbc;


	dout("writepage %p idx %lu\n", page, page->index);
	dout("writepage %p idx %lu\n", page, page->index);


	inode = page->mapping->host;
	inode = page->mapping->host;
	ci = ceph_inode(inode);
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);
	fsc = ceph_inode_to_client(inode);
	osdc = &fsc->client->osdc;


	/* verify this is a writeable snap context */
	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	snapc = page_snap_context(page);
	if (snapc == NULL) {
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
		return 0;
	}
	}
	oldest = get_oldest_context(inode, &snap_size,
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
				    &truncate_size, &truncate_seq);
	if (snapc->seq > oldest->seq) {
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		     inode, page, snapc);
@@ -561,20 +601,18 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
	}
	}
	ceph_put_snap_context(oldest);
	ceph_put_snap_context(oldest);


	if (snap_size == -1)
		snap_size = i_size_read(inode);

	/* is this a partial page at end of file? */
	/* is this a partial page at end of file? */
	if (page_off >= snap_size) {
	if (page_off >= ceph_wbc.i_size) {
		dout("%p page eof %llu\n", page, snap_size);
		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
		page->mapping->a_ops->invalidatepage(page, 0, PAGE_SIZE);
		return 0;
		return 0;
	}
	}


	if (snap_size < page_off + len)
	if (ceph_wbc.i_size < page_off + len)
		len = snap_size - page_off;
		len = ceph_wbc.i_size - page_off;


	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
	dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc);
	     inode, page, page->index, page_off, len, snapc, snapc->seq);


	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
	if (writeback_stat >
	if (writeback_stat >
@@ -582,10 +620,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);


	set_page_writeback(page);
	set_page_writeback(page);
	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
	err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
				   &ci->i_layout, snapc,
				   &ci->i_layout, snapc, page_off, len,
				   page_off, len,
				   ceph_wbc.truncate_seq,
				   truncate_seq, truncate_size,
				   ceph_wbc.truncate_size,
				   &inode->i_mtime, &page, 1);
				   &inode->i_mtime, &page, 1);
	if (err < 0) {
	if (err < 0) {
		struct writeback_control tmp_wbc;
		struct writeback_control tmp_wbc;
@@ -746,31 +784,17 @@ static int ceph_writepages_start(struct address_space *mapping,
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start, end;
	pgoff_t index, start_index, end = -1;
	int range_whole = 0;
	int should_loop = 1;
	pgoff_t max_pages = 0, max_pages_ever = 0;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	struct pagevec pvec;
	int done = 0;
	int rc = 0;
	int rc = 0;
	unsigned int wsize = i_blocksize(inode);
	unsigned int wsize = i_blocksize(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_osd_request *req = NULL;
	int do_sync = 0;
	struct ceph_writeback_ctl ceph_wbc;
	loff_t snap_size, i_size;
	bool should_loop, range_whole = false;
	u64 truncate_size;
	bool stop, done = false;
	u32 truncate_seq;


	/*
	dout("writepages_start %p (mode=%s)\n", inode,
	 * Include a 'sync' in the OSD request if this is a data
	 * integrity write (e.g., O_SYNC write or fsync()), or if our
	 * cap is being revoked.
	 */
	if ((wbc->sync_mode == WB_SYNC_ALL) ||
		ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
		do_sync = 1;
	dout("writepages_start %p dosync=%d (mode=%s)\n",
	     inode, do_sync,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));


@@ -783,35 +807,17 @@ static int ceph_writepages_start(struct address_space *mapping,
		mapping_set_error(mapping, -EIO);
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
		return -EIO; /* we're in a forced umount, don't write! */
	}
	}
	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;
		wsize = fsc->mount_options->wsize;
	if (wsize < PAGE_SIZE)
		wsize = PAGE_SIZE;
	max_pages_ever = wsize >> PAGE_SHIFT;


	pagevec_init(&pvec, 0);
	pagevec_init(&pvec, 0);


	/* where to start/end? */
	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	if (wbc->range_cyclic) {
	index = start_index;
		start = mapping->writeback_index; /* Start from prev offset */
		end = -1;
		dout(" cyclic, start at %lu\n", start);
	} else {
		start = wbc->range_start >> PAGE_SHIFT;
		end = wbc->range_end >> PAGE_SHIFT;
		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
			range_whole = 1;
		should_loop = 0;
		dout(" not cyclic, %lu to %lu\n", start, end);
	}
	index = start;


retry:
retry:
	/* find oldest snap context with dirty data */
	/* find oldest snap context with dirty data */
	ceph_put_snap_context(snapc);
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	snap_size = -1;
	snapc = get_oldest_context(inode, &snap_size,
				   &truncate_size, &truncate_seq);
	if (!snapc) {
	if (!snapc) {
		/* hmm, why does writepages get called when there
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		   is no dirty data? */
@@ -821,40 +827,56 @@ static int ceph_writepages_start(struct address_space *mapping,
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);
	     snapc, snapc->seq, snapc->num_snaps);


	i_size = i_size_read(inode);
	should_loop = false;

	if (ceph_wbc.head_snapc && snapc != last_snapc) {
	if (last_snapc && snapc != last_snapc) {
		/* where to start/end? */
		/* if we switched to a newer snapc, restart our scan at the
		if (wbc->range_cyclic) {
		 * start of the original file range. */
			index = start_index;
		dout("  snapc differs from last pass, restarting at %lu\n",
			end = -1;
		     index);
			if (index > 0)
		index = start;
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until we write all dirty pages
		 * associated with 'snapc' get written */
		if (index > 0 || wbc->sync_mode != WB_SYNC_NONE)
			should_loop = true;
		dout(" non-head snapc, range whole\n");
	}
	}

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;
	last_snapc = snapc;


	while (!done && index <= end) {
	stop = false;
		unsigned i;
	while (!stop && index <= end) {
		int first;
		pgoff_t strip_unit_end = 0;
		int num_ops = 0, op_idx;
		int num_ops = 0, op_idx;
		int pvec_pages, locked_pages = 0;
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		struct page **pages = NULL, **data_pages;
		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
		struct page *page;
		struct page *page;
		int want;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;
		u64 offset = 0, len = 0;


		max_pages = max_pages_ever;
		max_pages = wsize >> PAGE_SHIFT;


get_more_pages:
get_more_pages:
		first = -1;
		pvec_pages = min_t(unsigned, PAGEVEC_SIZE,
		want = min(end - index,
				   max_pages - locked_pages);
			   min((pgoff_t)PAGEVEC_SIZE,
		if (end - index < (u64)(pvec_pages - 1))
			       max_pages - (pgoff_t)locked_pages) - 1)
			pvec_pages = (unsigned)(end - index) + 1;
			+ 1;

		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_DIRTY,
						PAGECACHE_TAG_DIRTY,
						want);
						pvec_pages);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
		if (!pvec_pages && !locked_pages)
			break;
			break;
@@ -871,11 +893,15 @@ static int ceph_writepages_start(struct address_space *mapping,
			    unlikely(page->mapping != mapping)) {
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				unlock_page(page);
				break;
				continue;
			}
			}
			if (!wbc->range_cyclic && page->index > end) {
			if (page->index > end) {
				dout("end of range %p\n", page);
				dout("end of range %p\n", page);
				done = 1;
				/* can't be range_cyclic (1st pass) because
				 * end == -1 in that case. */
				stop = true;
				if (ceph_wbc.head_snapc)
					done = true;
				unlock_page(page);
				unlock_page(page);
				break;
				break;
			}
			}
@@ -884,39 +910,37 @@ static int ceph_writepages_start(struct address_space *mapping,
				unlock_page(page);
				unlock_page(page);
				break;
				break;
			}
			}
			if (wbc->sync_mode != WB_SYNC_NONE) {
			if (page_offset(page) >= ceph_wbc.i_size) {
				dout("waiting on writeback %p\n", page);
				dout("%p page eof %llu\n",
				wait_on_page_writeback(page);
				     page, ceph_wbc.i_size);
			}
				/* not done if range_cyclic */
			if (page_offset(page) >=
				stop = true;
			    (snap_size == -1 ? i_size : snap_size)) {
				dout("%p page eof %llu\n", page,
				     (snap_size == -1 ? i_size : snap_size));
				done = 1;
				unlock_page(page);
				unlock_page(page);
				break;
				break;
			}
			}
			if (PageWriteback(page)) {
			if (PageWriteback(page)) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					dout("%p under writeback\n", page);
					unlock_page(page);
					unlock_page(page);
				break;
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}
			}


			/* only if matching snap context */
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			pgsnapc = page_snap_context(page);
			if (pgsnapc->seq > snapc->seq) {
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld > oldest %p %lld\n",
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				unlock_page(page);
				unlock_page(page);
				if (!locked_pages)
				continue;
					continue; /* keep looking for snap */
				break;
			}
			}


			if (!clear_page_dirty_for_io(page)) {
			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				unlock_page(page);
				break;
				continue;
			}
			}


			/*
			/*
@@ -942,7 +966,7 @@ static int ceph_writepages_start(struct address_space *mapping,
					break;
					break;
				}
				}


				num_ops = 1 + do_sync;
				num_ops = 1;
				strip_unit_end = page->index +
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);
					((len - 1) >> PAGE_SHIFT);


@@ -972,8 +996,6 @@ static int ceph_writepages_start(struct address_space *mapping,
			}
			}


			/* note position of first page in pvec */
			/* note position of first page in pvec */
			if (first < 0)
				first = i;
			dout("%p will write page %p idx %lu\n",
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);
			     inode, page, page->index);


@@ -984,8 +1006,10 @@ static int ceph_writepages_start(struct address_space *mapping,
						  BLK_RW_ASYNC);
						  BLK_RW_ASYNC);
			}
			}


			pages[locked_pages] = page;

			locked_pages++;
			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

			len += PAGE_SIZE;
			len += PAGE_SIZE;
		}
		}


@@ -993,23 +1017,23 @@ static int ceph_writepages_start(struct address_space *mapping,
		if (!locked_pages)
		if (!locked_pages)
			goto release_pvec_pages;
			goto release_pvec_pages;
		if (i) {
		if (i) {
			int j;
			unsigned j, n = 0;
			BUG_ON(!locked_pages || first < 0);
			/* shift unused page to beginning of pvec */
			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;


			if (pvec_pages && i == pvec_pages &&
			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				dout("reached end pvec, trying for more\n");
				pagevec_reinit(&pvec);
				pagevec_release(&pvec);
				goto get_more_pages;
				goto get_more_pages;
			}
			}

			/* shift unused pages over in the pvec...  we
			 * will need to release them below. */
			for (j = i; j < pvec_pages; j++) {
				dout(" pvec leftover page %p\n", pvec.pages[j]);
				pvec.pages[j-i+first] = pvec.pages[j];
			}
			pvec.nr -= i-first;
		}
		}


new_request:
new_request:
@@ -1019,10 +1043,9 @@ static int ceph_writepages_start(struct address_space *mapping,
		req = ceph_osdc_new_request(&fsc->client->osdc,
		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
					offset, &len, 0, num_ops,
					CEPH_OSD_OP_WRITE,
					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					CEPH_OSD_FLAG_WRITE,
					snapc, ceph_wbc.truncate_seq,
					snapc, truncate_seq,
					ceph_wbc.truncate_size, false);
					truncate_size, false);
		if (IS_ERR(req)) {
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						&ci->i_layout, vino,
@@ -1031,8 +1054,8 @@ static int ceph_writepages_start(struct address_space *mapping,
						    CEPH_OSD_SLAB_OPS),
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
						CEPH_OSD_OP_WRITE,
						CEPH_OSD_FLAG_WRITE,
						CEPH_OSD_FLAG_WRITE,
						snapc, truncate_seq,
						snapc, ceph_wbc.truncate_seq,
						truncate_size, true);
						ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
			BUG_ON(IS_ERR(req));
		}
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
@@ -1048,7 +1071,7 @@ static int ceph_writepages_start(struct address_space *mapping,
		for (i = 0; i < locked_pages; i++) {
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
			if (offset + len != cur_offset) {
				if (op_idx + do_sync + 1 == req->r_num_ops)
				if (op_idx + 1 == req->r_num_ops)
					break;
					break;
				osd_req_op_extent_dup_last(req, op_idx,
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
							   cur_offset - offset);
@@ -1069,14 +1092,15 @@ static int ceph_writepages_start(struct address_space *mapping,
			len += PAGE_SIZE;
			len += PAGE_SIZE;
		}
		}


		if (snap_size != -1) {
		if (ceph_wbc.size_stable) {
			len = min(len, snap_size - offset);
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			 * data length covers all locked pages */
			u64 min_len = len + 1 - PAGE_SIZE;
			u64 min_len = len + 1 - PAGE_SIZE;
			len = min(len, (u64)i_size_read(inode) - offset);
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
			len = max(len, min_len);
		}
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);
		dout("writepages got pages at %llu~%llu\n", offset, len);
@@ -1085,17 +1109,12 @@ static int ceph_writepages_start(struct address_space *mapping,
						 0, !!pool, false);
						 0, !!pool, false);
		osd_req_op_extent_update(req, op_idx, len);
		osd_req_op_extent_update(req, op_idx, len);


		if (do_sync) {
			op_idx++;
			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
		}
		BUG_ON(op_idx + 1 != req->r_num_ops);
		BUG_ON(op_idx + 1 != req->r_num_ops);


		pool = NULL;
		pool = NULL;
		if (i < locked_pages) {
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			num_ops -= req->r_num_ops;
			num_ops += do_sync;
			locked_pages -= i;
			locked_pages -= i;


			/* allocate new pages array for next request */
			/* allocate new pages array for next request */
@@ -1127,22 +1146,50 @@ static int ceph_writepages_start(struct address_space *mapping,
		if (pages)
		if (pages)
			goto new_request;
			goto new_request;


		if (wbc->nr_to_write <= 0)
		/*
			done = 1;
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = stop = true;


release_pvec_pages:
release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
		pagevec_release(&pvec);

		if (locked_pages && !done)
			goto retry;
	}
	}


	if (should_loop && !done) {
	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		dout("writepages looping back to beginning of file\n");
		should_loop = 0;
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
							PAGECACHE_TAG_WRITEBACK,
							PAGEVEC_SIZE))) {
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		index = 0;
		goto retry;
		goto retry;
	}
	}
@@ -1152,8 +1199,8 @@ static int ceph_writepages_start(struct address_space *mapping,


out:
out:
	ceph_osdc_put_request(req);
	ceph_osdc_put_request(req);
	ceph_put_snap_context(snapc);
	ceph_put_snap_context(last_snapc);
	dout("writepages done, rc = %d\n", rc);
	dout("writepages dend - startone, rc = %d\n", rc);
	return rc;
	return rc;
}
}


@@ -1165,8 +1212,7 @@ static int ceph_writepages_start(struct address_space *mapping,
static int context_is_writeable_or_written(struct inode *inode,
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
					   struct ceph_snap_context *snapc)
{
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL,
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
							      NULL, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;
	int ret = !oldest || snapc->seq <= oldest->seq;


	ceph_put_snap_context(oldest);
	ceph_put_snap_context(oldest);
@@ -1211,8 +1257,7 @@ static int ceph_update_writeable_page(struct file *file,
		 * this page is already dirty in another (older) snap
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 * context!  is it writeable now?
		 */
		 */
		oldest = get_oldest_context(inode, NULL, NULL, NULL);
		oldest = get_oldest_context(inode, NULL, NULL);

		if (snapc->seq > oldest->seq) {
		if (snapc->seq > oldest->seq) {
			ceph_put_snap_context(oldest);
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n",
			dout(" page %p snapc %p not current or oldest\n",
+1 −1
Original line number Original line Diff line number Diff line
@@ -209,7 +209,7 @@ void ceph_fscache_register_inode_cookie(struct inode *inode)
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);


	/* No caching for filesystem */
	/* No caching for filesystem */
	if (fsc->fscache == NULL)
	if (!fsc->fscache)
		return;
		return;


	/* Only cache for regular files that are read only */
	/* Only cache for regular files that are read only */
+23 −17
Original line number Original line Diff line number Diff line
@@ -490,12 +490,13 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
	}
	}


	/*
	/*
	 * if we are newly issued FILE_SHARED, mark dir not complete; we
	 * If FILE_SHARED is newly issued, mark dir not complete. We don't
	 * don't know what happened to this directory while we didn't
	 * know what happened to this directory while we didn't have the cap.
	 * have the cap.
	 * If FILE_SHARED is being revoked, also mark dir not complete. It
	 * stops on-going cached readdir.
	 */
	 */
	if ((issued & CEPH_CAP_FILE_SHARED) &&
	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
	    (had & CEPH_CAP_FILE_SHARED) == 0) {
		if (issued & CEPH_CAP_FILE_SHARED)
			ci->i_shared_gen++;
			ci->i_shared_gen++;
		if (S_ISDIR(ci->vfs_inode.i_mode)) {
		if (S_ISDIR(ci->vfs_inode.i_mode)) {
			dout(" marking %p NOT complete\n", &ci->vfs_inode);
			dout(" marking %p NOT complete\n", &ci->vfs_inode);
@@ -611,7 +612,7 @@ void ceph_add_cap(struct inode *inode,
	}
	}


	if (flags & CEPH_CAP_FLAG_AUTH) {
	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (ci->i_auth_cap == NULL ||
		if (!ci->i_auth_cap ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			ci->i_auth_cap = cap;
			ci->i_auth_cap = cap;
			cap->mds_wanted = wanted;
			cap->mds_wanted = wanted;
@@ -728,7 +729,7 @@ static void __touch_cap(struct ceph_cap *cap)
	struct ceph_mds_session *s = cap->session;
	struct ceph_mds_session *s = cap->session;


	spin_lock(&s->s_cap_lock);
	spin_lock(&s->s_cap_lock);
	if (s->s_cap_iterator == NULL) {
	if (!s->s_cap_iterator) {
		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
		     s->s_mds);
		     s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
		list_move_tail(&cap->session_caps, &s->s_caps);
@@ -1248,7 +1249,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
	arg.mode = inode->i_mode;
	arg.mode = inode->i_mode;


	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	arg.flags = 0;
	if (list_empty(&ci->i_cap_snaps))
		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
	else
		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
	if (sync)
	if (sync)
		arg.flags |= CEPH_CLIENT_CAPS_SYNC;
		arg.flags |= CEPH_CLIENT_CAPS_SYNC;


@@ -1454,13 +1458,19 @@ void ceph_flush_snaps(struct ceph_inode_info *ci,
		goto retry;
		goto retry;
	}
	}


	// make sure flushsnap messages are sent in proper order.
	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
		__kick_flushing_caps(mdsc, session, ci, 0);
		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
	}

	__ceph_flush_snaps(ci, session);
	__ceph_flush_snaps(ci, session);
out:
out:
	spin_unlock(&ci->i_ceph_lock);
	spin_unlock(&ci->i_ceph_lock);


	if (psession) {
	if (psession) {
		*psession = session;
		*psession = session;
	} else {
	} else if (session) {
		mutex_unlock(&session->s_mutex);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		ceph_put_mds_session(session);
	}
	}
@@ -1901,11 +1911,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
		    (ci->i_ceph_flags &
		    (ci->i_ceph_flags &
		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
				spin_lock(&mdsc->cap_dirty_lock);
				__kick_flushing_caps(mdsc, session, ci, 0);
				oldest_flush_tid = __get_oldest_flush_tid(mdsc);
				spin_unlock(&mdsc->cap_dirty_lock);
				__kick_flushing_caps(mdsc, session, ci,
						     oldest_flush_tid);
				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
			}
			}
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
@@ -2110,7 +2116,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)


	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");


	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
	ret = file_write_and_wait_range(file, start, end);
	if (ret < 0)
	if (ret < 0)
		goto out;
		goto out;


@@ -3422,7 +3428,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
	tcap = __get_cap_for_mds(ci, target);
	tcap = __get_cap_for_mds(ci, target);
	if (tcap) {
	if (tcap) {
		/* already have caps from the target */
		/* already have caps from the target */
		if (tcap->cap_id != t_cap_id ||
		if (tcap->cap_id == t_cap_id &&
		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
			dout(" updating import cap %p mds%d\n", tcap, target);
			dout(" updating import cap %p mds%d\n", tcap, target);
			tcap->cap_id = t_cap_id;
			tcap->cap_id = t_cap_id;
+1 −1
Original line number Original line Diff line number Diff line
@@ -24,7 +24,7 @@ static int mdsmap_show(struct seq_file *s, void *p)
	struct ceph_fs_client *fsc = s->private;
	struct ceph_fs_client *fsc = s->private;
	struct ceph_mdsmap *mdsmap;
	struct ceph_mdsmap *mdsmap;


	if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
	if (!fsc->mdsc || !fsc->mdsc->mdsmap)
		return 0;
		return 0;
	mdsmap = fsc->mdsc->mdsmap;
	mdsmap = fsc->mdsc->mdsmap;
	seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
	seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
Loading