Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 745a8e3b authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: don't pre-allocate space for cap release messages



Previously we pre-allocate cap release messages for each caps. This
wastes lots of memory when there are large amount of caps. This patch
make the code not pre-allocate the cap release messages. Instead,
we add the corresponding ceph_cap struct to a list when releasing a
cap. Later when flush cap releases is needed, we allocate the cap
release messages dynamically.

Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent affbc19a
Loading
Loading
Loading
Loading
+29 −56
Original line number Diff line number Diff line
@@ -926,16 +926,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)

	/* remove from session list */
	spin_lock(&session->s_cap_lock);
	/*
	 * s_cap_reconnect is protected by s_cap_lock. no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect ||
	     cap->cap_gen == session->s_cap_gen))
		__queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
				    cap->mseq, cap->issue_seq);

	if (session->s_cap_iterator == cap) {
		/* not yet, we are iterating over this very cap */
		dout("__ceph_remove_cap  delaying %p removal from session %p\n",
@@ -948,6 +938,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
	}
	/* protect backpointer with s_cap_lock: see iterate_session_caps */
	cap->ci = NULL;

	/*
	 * s_cap_reconnect is protected by s_cap_lock. no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
		cap->queue_release = 1;
		if (removed) {
			list_add_tail(&cap->session_caps,
				      &session->s_cap_releases);
			session->s_num_cap_releases++;
			removed = 0;
		}
	} else {
		cap->queue_release = 0;
	}
	cap->cap_ino = ci->i_vino.ino;

	spin_unlock(&session->s_cap_lock);

	/* remove from inode list */
@@ -1053,44 +1062,6 @@ static int send_cap_msg(struct ceph_mds_session *session,
	return 0;
}

void __queue_cap_release(struct ceph_mds_session *session,
			 u64 ino, u64 cap_id, u32 migrate_seq,
			 u32 issue_seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;

	BUG_ON(!session->s_num_cap_releases);
	msg = list_first_entry(&session->s_cap_releases,
			       struct ceph_msg, list_head);

	dout(" adding %llx release to mds%d msg %p (%d left)\n",
	     ino, session->s_mds, msg, session->s_num_cap_releases);

	BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
	head = msg->front.iov_base;
	le32_add_cpu(&head->num, 1);
	item = msg->front.iov_base + msg->front.iov_len;
	item->ino = cpu_to_le64(ino);
	item->cap_id = cpu_to_le64(cap_id);
	item->migrate_seq = cpu_to_le32(migrate_seq);
	item->seq = cpu_to_le32(issue_seq);

	session->s_num_cap_releases--;

	msg->front.iov_len += sizeof(*item);
	if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
		dout(" release msg %p full\n", msg);
		list_move_tail(&msg->list_head, &session->s_cap_releases_done);
	} else {
		dout(" release msg %p at %d/%d (%d)\n", msg,
		     (int)le32_to_cpu(head->num),
		     (int)CEPH_CAPS_PER_RELEASE,
		     (int)msg->front.iov_len);
	}
}

/*
 * Queue cap releases when an inode is dropped from our cache.  Since
 * inode is about to be destroyed, there is no need for i_ceph_lock.
@@ -3051,7 +3022,6 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
			mutex_lock_nested(&session->s_mutex,
					  SINGLE_DEPTH_NESTING);
		}
		ceph_add_cap_releases(mdsc, tsession);
		new_cap = ceph_get_cap(mdsc, NULL);
	} else {
		WARN_ON(1);
@@ -3247,16 +3217,20 @@ void ceph_handle_caps(struct ceph_mds_session *session,
	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
	     (unsigned)seq);

	if (op == CEPH_CAP_OP_IMPORT)
		ceph_add_cap_releases(mdsc, session);

	if (!inode) {
		dout(" i don't have ino %llx\n", vino.ino);

		if (op == CEPH_CAP_OP_IMPORT) {
			cap = ceph_get_cap(mdsc, NULL);
			cap->cap_ino = vino.ino;
			cap->queue_release = 1;
			cap->cap_id = cap_id;
			cap->mseq = mseq;
			cap->seq = seq;
			spin_lock(&session->s_cap_lock);
			__queue_cap_release(session, vino.ino, cap_id,
					    mseq, seq);
			list_add_tail(&cap->session_caps,
					&session->s_cap_releases);
			session->s_num_cap_releases++;
			spin_unlock(&session->s_cap_lock);
		}
		goto flush_cap_releases;
@@ -3332,11 +3306,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,

flush_cap_releases:
	/*
	 * send any full release message to try to move things
	 * send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	ceph_add_cap_releases(mdsc, session);
	ceph_send_cap_releases(mdsc, session);

done:
+86 −137
Original line number Diff line number Diff line
@@ -458,7 +458,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_LIST_HEAD(&s->s_cap_releases_done);
	INIT_LIST_HEAD(&s->s_cap_flushing);
	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

@@ -998,27 +997,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
 * session caps
 */

/*
 * Free preallocated cap messages assigned to this session
 */
static void cleanup_cap_releases(struct ceph_mds_session *session)
/* caller holds s_cap_lock, we drop it */
static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
	__releases(session->s_cap_lock)
{
	struct ceph_msg *msg;
	LIST_HEAD(tmp_list);
	list_splice_init(&session->s_cap_releases, &tmp_list);
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	spin_lock(&session->s_cap_lock);
	while (!list_empty(&session->s_cap_releases)) {
		msg = list_first_entry(&session->s_cap_releases,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
		ceph_msg_put(msg);
	}
	while (!list_empty(&session->s_cap_releases_done)) {
		msg = list_first_entry(&session->s_cap_releases_done,
				       struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
		ceph_msg_put(msg);
	dout("cleanup_cap_releases mds%d\n", session->s_mds);
	while (!list_empty(&tmp_list)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(&tmp_list,
					struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
	spin_unlock(&session->s_cap_lock);
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
@@ -1095,11 +1092,17 @@ static int iterate_session_caps(struct ceph_mds_session *session,
			dout("iterate_session_caps  finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			cap->session = NULL;
			if (cap->queue_release) {
				list_add_tail(&cap->session_caps,
					      &session->s_cap_releases);
				session->s_num_cap_releases++;
			} else {
				old_cap = cap;  /* put_cap it w/o locks held */
			}
		}
		if (ret < 0)
			goto out;
	}
@@ -1191,11 +1194,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
			spin_lock(&session->s_cap_lock);
		}
	}
	spin_unlock(&session->s_cap_lock);

	// drop cap expires and unlock s_cap_lock
	cleanup_cap_releases(session->s_mdsc, session);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	cleanup_cap_releases(session);
}

/*
@@ -1418,76 +1422,10 @@ static int trim_caps(struct ceph_mds_client *mdsc,
		session->s_trim_caps = 0;
	}

	ceph_add_cap_releases(mdsc, session);
	ceph_send_cap_releases(mdsc, session);
	return 0;
}

/*
 * Allocate cap_release messages.  If there is a partially full message
 * in the queue, try to allocate enough to cover it's remainder, so that
 * we can send it immediately.
 *
 * Called under s_mutex.
 */
int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg, *partial = NULL;
	struct ceph_mds_cap_release *head;
	int err = -ENOMEM;
	int extra = mdsc->fsc->mount_options->cap_release_safety;
	int num;

	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
	     extra);

	spin_lock(&session->s_cap_lock);

	if (!list_empty(&session->s_cap_releases)) {
		msg = list_first_entry(&session->s_cap_releases,
				       struct ceph_msg,
				 list_head);
		head = msg->front.iov_base;
		num = le32_to_cpu(head->num);
		if (num) {
			dout(" partial %p with (%d/%d)\n", msg, num,
			     (int)CEPH_CAPS_PER_RELEASE);
			extra += CEPH_CAPS_PER_RELEASE - num;
			partial = msg;
		}
	}
	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
		spin_unlock(&session->s_cap_lock);
		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
				   GFP_NOFS, false);
		if (!msg)
			goto out_unlocked;
		dout("add_cap_releases %p msg %p now %d\n", session, msg,
		     (int)msg->front.iov_len);
		head = msg->front.iov_base;
		head->num = cpu_to_le32(0);
		msg->front.iov_len = sizeof(*head);
		spin_lock(&session->s_cap_lock);
		list_add(&msg->list_head, &session->s_cap_releases);
		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
	}

	if (partial) {
		head = partial->front.iov_base;
		num = le32_to_cpu(head->num);
		dout(" queueing partial %p with %d/%d\n", partial, num,
		     (int)CEPH_CAPS_PER_RELEASE);
		list_move_tail(&partial->list_head,
			       &session->s_cap_releases_done);
		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
	}
	err = 0;
	spin_unlock(&session->s_cap_lock);
out_unlocked:
	return err;
}

static int check_cap_flush(struct ceph_inode_info *ci,
			   u64 want_flush_seq, u64 want_snap_seq)
{
@@ -1590,60 +1528,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
			    struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;

	dout("send_cap_releases mds%d\n", session->s_mds);
	spin_lock(&session->s_cap_lock);
	while (!list_empty(&session->s_cap_releases_done)) {
		msg = list_first_entry(&session->s_cap_releases_done,
				 struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					PAGE_CACHE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);
		}
		cap = list_first_entry(&tmp_list, struct ceph_cap,
					session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		le32_add_cpu(&head->num, 1);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
		spin_lock(&session->s_cap_lock);
			msg = NULL;
		}
	spin_unlock(&session->s_cap_lock);
	}

static void discard_cap_releases(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	struct ceph_mds_cap_release *head;
	unsigned num;

	dout("discard_cap_releases mds%d\n", session->s_mds);

	if (!list_empty(&session->s_cap_releases)) {
		/* zero out the in-progress message */
		msg = list_first_entry(&session->s_cap_releases,
					struct ceph_msg, list_head);
		head = msg->front.iov_base;
		num = le32_to_cpu(head->num);
		dout("discard_cap_releases mds%d %p %u\n",
		     session->s_mds, msg, num);
		head->num = cpu_to_le32(0);
		msg->front.iov_len = sizeof(*head);
		session->s_num_cap_releases += num;
	}
	BUG_ON(num_cap_releases != 0);

	/* requeue completed messages */
	while (!list_empty(&session->s_cap_releases_done)) {
		msg = list_first_entry(&session->s_cap_releases_done,
				 struct ceph_msg, list_head);
		list_del_init(&msg->list_head);
	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

		head = msg->front.iov_base;
		num = le32_to_cpu(head->num);
		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
		     num);
		session->s_num_cap_releases += num;
		head->num = cpu_to_le32(0);
		msg->front.iov_len = sizeof(*head);
		list_add(&msg->list_head, &session->s_cap_releases);
	if (msg) {
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
		session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}

/*
@@ -2529,7 +2481,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
	}
	mutex_unlock(&mdsc->mutex);

	ceph_add_cap_releases(mdsc, req->r_session);
	mutex_unlock(&session->s_mutex);

	/* kick calling process */
@@ -2921,8 +2872,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	discard_cap_releases(mdsc, session);
	spin_unlock(&session->s_cap_lock);
	cleanup_cap_releases(mdsc, session);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
@@ -3385,7 +3335,6 @@ static void delayed_work(struct work_struct *work)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		ceph_add_cap_releases(mdsc, s);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
+0 −3
Original line number Diff line number Diff line
@@ -139,7 +139,6 @@ struct ceph_mds_session {
	int		  s_cap_reconnect;
	int		  s_readonly;
	struct list_head  s_cap_releases; /* waiting cap_release messages */
	struct list_head  s_cap_releases_done; /* ready to send */
	struct ceph_cap  *s_cap_iterator;

	/* protected by mutex */
@@ -389,8 +388,6 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
	kref_put(&req->r_kref, ceph_mdsc_release_request);
}

extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session);
extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session);

+14 −6
Original line number Diff line number Diff line
@@ -122,11 +122,21 @@ struct ceph_cap {
	struct rb_node ci_node;          /* per-ci cap tree */
	struct ceph_mds_session *session;
	struct list_head session_caps;   /* per-session caplist */
	int mds;
	u64 cap_id;       /* unique cap id (mds provided) */
	union {
		/* in-use caps */
		struct {
			int issued;       /* latest, from the mds */
	int implemented;  /* implemented superset of issued (for revocation) */
	int mds_wanted;
			int implemented;  /* implemented superset of
					     issued (for revocation) */
			int mds, mds_wanted;
		};
		/* caps to release */
		struct {
			u64 cap_ino;
			int queue_release;
		};
	};
	u32 seq, issue_seq, mseq;
	u32 cap_gen;      /* active/stale cycle */
	unsigned long last_used;
@@ -845,8 +855,6 @@ extern void ceph_put_cap(struct ceph_mds_client *mdsc,
			 struct ceph_cap *cap);
extern int ceph_is_any_caps(struct inode *inode);

extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
				u64 cap_id, u32 migrate_seq, u32 issue_seq);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_fsync(struct file *file, loff_t start, loff_t end,