Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 553adfd9 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: track pending caps flushing accurately



Previously we do not trace accurate TID for flushing caps. when
MDS failovers, we have no choice but to re-send all flushing caps
with a new TID. This can cause problem because MDS can has already
flushed some caps and has issued the same caps to other client.
The re-sent cap flush has a new TID, which makes MDS unable to
detect if it has already processed the cap flush.

This patch adds code to track pending caps flushing accurately.
When re-sending cap flush is needed, we use its original flush
TID.

Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 6c13a6bb
Loading
Loading
Loading
Loading
+160 −85
Original line number Diff line number Diff line
@@ -1097,7 +1097,8 @@ void ceph_queue_caps_release(struct inode *inode)
 * caller should hold snap_rwsem (read), s_mutex.
 */
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
		      int op, int used, int want, int retain, int flushing)
		      int op, int used, int want, int retain, int flushing,
		      u64 flush_tid)
	__releases(cap->ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = cap->ci;
@@ -1115,8 +1116,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
	u64 xattr_version = 0;
	struct ceph_buffer *xattr_blob = NULL;
	int delayed = 0;
	u64 flush_tid = 0;
	int i;
	int ret;
	bool inline_data;

@@ -1160,24 +1159,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
	cap->implemented &= cap->issued | used;
	cap->mds_wanted = want;

	if (flushing) {
		/*
		 * assign a tid for flush operations so we can avoid
		 * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
		 * clean type races.  track latest tid for every bit
		 * so we can handle flush AxFw, flush Fw, and have the
		 * first ack clean Ax.
		 */
		flush_tid = ++ci->i_cap_flush_last_tid;
		dout(" cap_flush_tid %d\n", (int)flush_tid);
		for (i = 0; i < CEPH_CAP_BITS; i++)
			if (flushing & (1 << i))
				ci->i_cap_flush_tid[i] = flush_tid;

		follows = ci->i_head_snapc->seq;
	} else {
		follows = 0;
	}
	follows = flushing ? ci->i_head_snapc->seq : 0;

	keep = cap->implemented;
	seq = cap->seq;
@@ -1311,7 +1293,10 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
			goto retry;
		}

		capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
		spin_unlock(&mdsc->cap_dirty_lock);

		atomic_inc(&capsnap->nref);
		if (list_empty(&capsnap->flushing_item))
			list_add_tail(&capsnap->flushing_item,
@@ -1407,6 +1392,29 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
	return dirty;
}

static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
					struct ceph_cap_flush *cf)
{
	struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_cap_flush *other = NULL;

	while (*p) {
		parent = *p;
		other = rb_entry(parent, struct ceph_cap_flush, i_node);

		if (cf->tid < other->tid)
			p = &(*p)->rb_left;
		else if (cf->tid > other->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&cf->i_node, parent, p);
	rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
}

/*
 * Add dirty inode to the flushing list.  Assigned a seq number so we
 * can wait for caps to flush without starving.
@@ -1414,10 +1422,12 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 * Called under i_ceph_lock.
 */
static int __mark_caps_flushing(struct inode *inode,
				 struct ceph_mds_session *session)
				struct ceph_mds_session *session,
				u64 *flush_tid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap_flush *cf;
	int flushing;

	BUG_ON(ci->i_dirty_caps == 0);
@@ -1432,9 +1442,14 @@ static int __mark_caps_flushing(struct inode *inode,
	ci->i_dirty_caps = 0;
	dout(" inode %p now !dirty\n", inode);

	cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
	cf->caps = flushing;

	spin_lock(&mdsc->cap_dirty_lock);
	list_del_init(&ci->i_dirty_item);

	cf->tid = ++mdsc->last_cap_flush_tid;

	if (list_empty(&ci->i_flushing_item)) {
		ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
@@ -1448,6 +1463,9 @@ static int __mark_caps_flushing(struct inode *inode,
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	__add_cap_flushing_to_inode(ci, cf);

	*flush_tid = cf->tid;
	return flushing;
}

@@ -1493,6 +1511,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	u64 flush_tid;
	int file_wanted, used, cap_used;
	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
	int issued, implemented, want, retain, revoking, flushing = 0;
@@ -1711,17 +1730,20 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
			took_snap_rwsem = 1;
		}

		if (cap == ci->i_auth_cap && ci->i_dirty_caps)
			flushing = __mark_caps_flushing(inode, session);
		else
		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
			flushing = __mark_caps_flushing(inode, session,
							&flush_tid);
		} else {
			flushing = 0;
			flush_tid = 0;
		}

		mds = cap->mds;  /* remember mds, so we don't repeat */
		sent++;

		/* __send_cap drops i_ceph_lock */
		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
				      want, retain, flushing);
				      want, retain, flushing, flush_tid);
		goto retry; /* retake i_ceph_lock and restart our cap scan. */
	}

@@ -1750,12 +1772,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
/*
 * Try to flush dirty caps back to the auth mds.
 */
static int try_flush_caps(struct inode *inode, u16 flush_tid[])
static int try_flush_caps(struct inode *inode, u64 *ptid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_session *session = NULL;
	int flushing = 0;
	u64 flush_tid = 0;

retry:
	spin_lock(&ci->i_ceph_lock);
@@ -1780,46 +1803,52 @@ static int try_flush_caps(struct inode *inode, u16 flush_tid[])
		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
			goto out;

		flushing = __mark_caps_flushing(inode, session);
		flushing = __mark_caps_flushing(inode, session, &flush_tid);

		/* __send_cap drops i_ceph_lock */
		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
				     cap->issued | cap->implemented, flushing);
				     (cap->issued | cap->implemented),
				     flushing, flush_tid);

		if (delayed) {
			spin_lock(&ci->i_ceph_lock);
		if (delayed)
			__cap_delay_requeue(mdsc, ci);
			spin_unlock(&ci->i_ceph_lock);
		}
	} else {
		struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
		if (n) {
			struct ceph_cap_flush *cf =
				rb_entry(n, struct ceph_cap_flush, i_node);
			flush_tid = cf->tid;
		}

		flushing = ci->i_flushing_caps;
	if (flushing)
		memcpy(flush_tid, ci->i_cap_flush_tid,
		       sizeof(ci->i_cap_flush_tid));
out:
		spin_unlock(&ci->i_ceph_lock);
	}
out:
	if (session)
		mutex_unlock(&session->s_mutex);

	*ptid = flush_tid;
	return flushing;
}

/*
 * Return true if we've flushed caps through the given flush_tid.
 */
static int caps_are_flushed(struct inode *inode, u16 flush_tid[])
static int caps_are_flushed(struct inode *inode, u64 flush_tid)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int i, ret = 1;
	struct ceph_cap_flush *cf;
	struct rb_node *n;
	int ret = 1;

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_CAP_BITS; i++) {
		if (!(ci->i_flushing_caps & (1 << i)))
			continue;
		// tid only has 16 bits. we need to handle wrapping
		if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) {
			/* still flushing this bit */
	n = rb_first(&ci->i_cap_flush_tree);
	if (n) {
		cf = rb_entry(n, struct ceph_cap_flush, i_node);
		if (cf->tid <= flush_tid)
			ret = 0;
			break;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
@@ -1922,7 +1951,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
	struct inode *inode = file->f_mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u16 flush_tid[CEPH_CAP_BITS];
	u64 flush_tid;
	int ret;
	int dirty;

@@ -1938,7 +1967,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)

	mutex_lock(&inode->i_mutex);

	dirty = try_flush_caps(inode, flush_tid);
	dirty = try_flush_caps(inode, &flush_tid);
	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));

	ret = unsafe_dirop_wait(inode);
@@ -1967,14 +1996,14 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u16 flush_tid[CEPH_CAP_BITS];
	u64 flush_tid;
	int err = 0;
	int dirty;
	int wait = wbc->sync_mode == WB_SYNC_ALL;

	dout("write_inode %p wait=%d\n", inode, wait);
	if (wait) {
		dirty = try_flush_caps(inode, flush_tid);
		dirty = try_flush_caps(inode, &flush_tid);
		if (dirty)
			err = wait_event_interruptible(ci->i_cap_wq,
				       caps_are_flushed(inode, flush_tid));
@@ -2022,39 +2051,66 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
	}
}

void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
				struct ceph_mds_session *session,
				struct ceph_inode_info *ci)
{
	struct ceph_inode_info *ci;

	kick_flushing_capsnaps(mdsc, session);

	dout("kick_flushing_caps mds%d\n", session->s_mds);
	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	struct ceph_cap_flush *cf;
	struct rb_node *n;
	int delayed = 0;
	u64 first_tid = 0;

	while (true) {
		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (cap && cap->session == session) {
			dout("kick_flushing_caps %p cap %p %s\n", inode,
			     cap, ceph_cap_string(ci->i_flushing_caps));
			delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
		if (!(cap && cap->session == session)) {
			pr_err("%p auth cap %p not mds%d ???\n", inode,
					cap, session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			break;
		}

		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
			cf = rb_entry(n, struct ceph_cap_flush, i_node);
			if (cf->tid >= first_tid)
				break;
		}
		if (!n) {
			spin_unlock(&ci->i_ceph_lock);
			break;
		}

		cf = rb_entry(n, struct ceph_cap_flush, i_node);
		first_tid = cf->tid + 1;

		dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
		     cap, cf->tid, ceph_cap_string(cf->caps));
		delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
				      __ceph_caps_used(ci),
				      __ceph_caps_wanted(ci),
				      cap->issued | cap->implemented,
					     ci->i_flushing_caps);
				      cf->caps, cf->tid);
	}
	return delayed;
}

void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci;

	kick_flushing_capsnaps(mdsc, session);

	dout("kick_flushing_caps mds%d\n", session->s_mds);
	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		int delayed = __kick_flushing_caps(mdsc, session, ci);
		if (delayed) {
			spin_lock(&ci->i_ceph_lock);
			__cap_delay_requeue(mdsc, ci);
			spin_unlock(&ci->i_ceph_lock);
		}
		} else {
			pr_err("%p auth cap %p not mds%d ???\n", inode,
			       cap, session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
		}
	}
}

@@ -2064,7 +2120,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	int delayed = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = ci->i_auth_cap;
@@ -2074,16 +2129,16 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
	__ceph_flush_snaps(ci, &session, 1);

	if (ci->i_flushing_caps) {
		int delayed;

		spin_lock(&mdsc->cap_dirty_lock);
		list_move_tail(&ci->i_flushing_item,
			       &cap->session->s_cap_flushing);
		spin_unlock(&mdsc->cap_dirty_lock);

		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
				     __ceph_caps_used(ci),
				     __ceph_caps_wanted(ci),
				     cap->issued | cap->implemented,
				     ci->i_flushing_caps);
		spin_unlock(&ci->i_ceph_lock);

		delayed = __kick_flushing_caps(mdsc, session, ci);
		if (delayed) {
			spin_lock(&ci->i_ceph_lock);
			__cap_delay_requeue(mdsc, ci);
@@ -2836,16 +2891,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_cap_flush *cf;
	struct rb_node *n;
	LIST_HEAD(to_remove);
	unsigned seq = le32_to_cpu(m->seq);
	int dirty = le32_to_cpu(m->dirty);
	int cleaned = 0;
	int drop = 0;
	int i;

	for (i = 0; i < CEPH_CAP_BITS; i++)
		if ((dirty & (1 << i)) &&
		    (u16)flush_tid == ci->i_cap_flush_tid[i])
			cleaned |= 1 << i;
	n = rb_first(&ci->i_cap_flush_tree);
	while (n) {
		cf = rb_entry(n, struct ceph_cap_flush, i_node);
		n = rb_next(&cf->i_node);
		if (cf->tid == flush_tid)
			cleaned = cf->caps;
		if (cf->tid <= flush_tid) {
			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
			list_add_tail(&cf->list, &to_remove);
		} else {
			cleaned &= ~cf->caps;
			if (!cleaned)
				break;
		}
	}

	dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
	     " flushing %s -> %s\n",
@@ -2890,6 +2958,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,

out:
	spin_unlock(&ci->i_ceph_lock);

	while (!list_empty(&to_remove)) {
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, list);
		list_del(&cf->list);
		kfree(cf);
	}
	if (drop)
		iput(inode);
}
+1 −2
Original line number Diff line number Diff line
@@ -417,8 +417,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
	INIT_LIST_HEAD(&ci->i_dirty_item);
	INIT_LIST_HEAD(&ci->i_flushing_item);
	ci->i_cap_flush_seq = 0;
	ci->i_cap_flush_last_tid = 0;
	memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid));
	ci->i_cap_flush_tree = RB_ROOT;
	init_waitqueue_head(&ci->i_cap_wq);
	ci->i_hold_caps_min = 0;
	ci->i_hold_caps_max = 0;
+20 −0
Original line number Diff line number Diff line
@@ -1142,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	LIST_HEAD(to_remove);
	int drop = 0;

	dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1149,9 +1150,19 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
	spin_lock(&ci->i_ceph_lock);
	__ceph_remove_cap(cap, false);
	if (!ci->i_auth_cap) {
		struct ceph_cap_flush *cf;
		struct ceph_mds_client *mdsc =
			ceph_sb_to_client(inode->i_sb)->mdsc;

		while (true) {
			struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
			if (!n)
				break;
			cf = rb_entry(n, struct ceph_cap_flush, i_node);
			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
			list_add(&cf->list, &to_remove);
		}

		spin_lock(&mdsc->cap_dirty_lock);
		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited(
@@ -1173,8 +1184,16 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
			drop = 1;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

	}
	spin_unlock(&ci->i_ceph_lock);
	while (!list_empty(&to_remove)) {
		struct ceph_cap_flush *cf;
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, list);
		list_del(&cf->list);
		kfree(cf);
	}
	while (drop--)
		iput(inode);
	return 0;
@@ -3408,6 +3427,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->cap_flush_seq = 0;
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_dirty);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	mdsc->num_cap_flushing = 0;
+1 −0
Original line number Diff line number Diff line
@@ -307,6 +307,7 @@ struct ceph_mds_client {
	spinlock_t       snap_flush_lock;

	u64               cap_flush_seq;
	u64               last_cap_flush_tid;
	struct list_head  cap_dirty;        /* inodes with dirty caps */
	struct list_head  cap_dirty_migrating; /* ...that are migration... */
	int               num_cap_flushing; /* # caps we are flushing */
+10 −1
Original line number Diff line number Diff line
@@ -186,6 +186,15 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
	}
}

struct ceph_cap_flush {
	u64 tid;
	int caps;
	union {
		struct rb_node i_node;
		struct list_head list;
	};
};

/*
 * The frag tree describes how a directory is fragmented, potentially across
 * multiple metadata servers.  It is also used to indicate points where
@@ -299,7 +308,7 @@ struct ceph_inode_info {
	/* we need to track cap writeback on a per-cap-bit basis, to allow
	 * overlapping, pipelined cap flushes to the mds.  we can probably
	 * reduce the tid to 8 bits if we're concerned about inode size. */
	u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS];
	struct rb_root i_cap_flush_tree;
	wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
	unsigned long i_hold_caps_min; /* jiffies */
	unsigned long i_hold_caps_max; /* jiffies */