Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 37151668 authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil
Browse files

ceph: do caps accounting per mds_client



Caps related accounting is now being done per mds client instead
of just being global. This prepares ground work for a later revision
of the caps preallocated reservation list.

Signed-off-by: default avatarYehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 0deb01c9
Loading
Loading
Loading
Loading
+89 −98
Original line number Original line Diff line number Diff line
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
	return cap_str[i];
	return cap_str[i];
}
}


/*
void ceph_caps_init(struct ceph_mds_client *mdsc)
 * Cap reservations
 *
 * Maintain a global pool of preallocated struct ceph_caps, referenced
 * by struct ceph_caps_reservations.  This ensures that we preallocate
 * memory needed to successfully process an MDS response.  (If an MDS
 * sends us cap information and we fail to process it, we will have
 * problems due to the client and MDS being out of sync.)
 *
 * Reservations are 'owned' by a ceph_cap_reservation context.
 */
static spinlock_t caps_list_lock;
static struct list_head caps_list;  /* unused (reserved or unreserved) */
static int caps_total_count;        /* total caps allocated */
static int caps_use_count;          /* in use */
static int caps_reserve_count;      /* unused, reserved */
static int caps_avail_count;        /* unused, unreserved */
static int caps_min_count;          /* keep at least this many (unreserved) */

void __init ceph_caps_init(void)
{
{
	INIT_LIST_HEAD(&caps_list);
	INIT_LIST_HEAD(&mdsc->caps_list);
	spin_lock_init(&caps_list_lock);
	spin_lock_init(&mdsc->caps_list_lock);
}
}


void ceph_caps_finalize(void)
void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
{
	struct ceph_cap *cap;
	struct ceph_cap *cap;


	spin_lock(&caps_list_lock);
	spin_lock(&mdsc->caps_list_lock);
	while (!list_empty(&caps_list)) {
	while (!list_empty(&mdsc->caps_list)) {
		cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
		cap = list_first_entry(&mdsc->caps_list,
				       struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
		kmem_cache_free(ceph_cap_cachep, cap);
	}
	}
	caps_total_count = 0;
	mdsc->caps_total_count = 0;
	caps_avail_count = 0;
	mdsc->caps_avail_count = 0;
	caps_use_count = 0;
	mdsc->caps_use_count = 0;
	caps_reserve_count = 0;
	mdsc->caps_reserve_count = 0;
	caps_min_count = 0;
	mdsc->caps_min_count = 0;
	spin_unlock(&caps_list_lock);
	spin_unlock(&mdsc->caps_list_lock);
}
}


void ceph_adjust_min_caps(int delta)
void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
{
	spin_lock(&caps_list_lock);
	spin_lock(&mdsc->caps_list_lock);
	caps_min_count += delta;
	mdsc->caps_min_count += delta;
	BUG_ON(caps_min_count < 0);
	BUG_ON(mdsc->caps_min_count < 0);
	spin_unlock(&caps_list_lock);
	spin_unlock(&mdsc->caps_list_lock);
}
}


int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
{
	int i;
	int i;
	struct ceph_cap *cap;
	struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
	dout("reserve caps ctx=%p need=%d\n", ctx, need);
	dout("reserve caps ctx=%p need=%d\n", ctx, need);


	/* first reserve any caps that are already allocated */
	/* first reserve any caps that are already allocated */
	spin_lock(&caps_list_lock);
	spin_lock(&mdsc->caps_list_lock);
	if (caps_avail_count >= need)
	if (mdsc->caps_avail_count >= need)
		have = need;
		have = need;
	else
	else
		have = caps_avail_count;
		have = mdsc->caps_avail_count;
	caps_avail_count -= have;
	mdsc->caps_avail_count -= have;
	caps_reserve_count += have;
	mdsc->caps_reserve_count += have;
	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       caps_avail_count);
					 mdsc->caps_reserve_count +
	spin_unlock(&caps_list_lock);
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);


	for (i = have; i < need; i++) {
	for (i = have; i < need; i++) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
	}
	}
	BUG_ON(have + alloc != need);
	BUG_ON(have + alloc != need);


	spin_lock(&caps_list_lock);
	spin_lock(&mdsc->caps_list_lock);
	caps_total_count += alloc;
	mdsc->caps_total_count += alloc;
	caps_reserve_count += alloc;
	mdsc->caps_reserve_count += alloc;
	list_splice(&newcaps, &caps_list);
	list_splice(&newcaps, &mdsc->caps_list);


	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       caps_avail_count);
					 mdsc->caps_reserve_count +
	spin_unlock(&caps_list_lock);
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);


	ctx->count = need;
	ctx->count = need;
	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
	     ctx, caps_total_count, caps_use_count, caps_reserve_count,
	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
	     caps_avail_count);
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return 0;
	return 0;


out_alloc_count:
out_alloc_count:
@@ -220,26 +205,29 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
	return ret;
	return ret;
}
}


int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			struct ceph_cap_reservation *ctx)
{
{
	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
	if (ctx->count) {
	if (ctx->count) {
		spin_lock(&caps_list_lock);
		spin_lock(&mdsc->caps_list_lock);
		BUG_ON(caps_reserve_count < ctx->count);
		BUG_ON(mdsc->caps_reserve_count < ctx->count);
		caps_reserve_count -= ctx->count;
		mdsc->caps_reserve_count -= ctx->count;
		caps_avail_count += ctx->count;
		mdsc->caps_avail_count += ctx->count;
		ctx->count = 0;
		ctx->count = 0;
		dout("unreserve caps %d = %d used + %d resv + %d avail\n",
		dout("unreserve caps %d = %d used + %d resv + %d avail\n",
		     caps_total_count, caps_use_count, caps_reserve_count,
		     mdsc->caps_total_count, mdsc->caps_use_count,
		     caps_avail_count);
		     mdsc->caps_reserve_count, mdsc->caps_avail_count);
		BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
		       caps_avail_count);
						 mdsc->caps_reserve_count +
		spin_unlock(&caps_list_lock);
						 mdsc->caps_avail_count);
		spin_unlock(&mdsc->caps_list_lock);
	}
	}
	return 0;
	return 0;
}
}


static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
				struct ceph_cap_reservation *ctx)
{
{
	struct ceph_cap *cap = NULL;
	struct ceph_cap *cap = NULL;


@@ -247,71 +235,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
	if (!ctx) {
	if (!ctx) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
		if (cap) {
			caps_use_count++;
			mdsc->caps_use_count++;
			caps_total_count++;
			mdsc->caps_total_count++;
		}
		}
		return cap;
		return cap;
	}
	}


	spin_lock(&caps_list_lock);
	spin_lock(&mdsc->caps_list_lock);
	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
	     ctx, ctx->count, caps_total_count, caps_use_count,
	     ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
	     caps_reserve_count, caps_avail_count);
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	BUG_ON(!ctx->count);
	BUG_ON(!ctx->count);
	BUG_ON(ctx->count > caps_reserve_count);
	BUG_ON(ctx->count > mdsc->caps_reserve_count);
	BUG_ON(list_empty(&caps_list));
	BUG_ON(list_empty(&mdsc->caps_list));


	ctx->count--;
	ctx->count--;
	caps_reserve_count--;
	mdsc->caps_reserve_count--;
	caps_use_count++;
	mdsc->caps_use_count++;


	cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
	list_del(&cap->caps_item);
	list_del(&cap->caps_item);


	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       caps_avail_count);
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&caps_list_lock);
	spin_unlock(&mdsc->caps_list_lock);
	return cap;
	return cap;
}
}


void ceph_put_cap(struct ceph_cap *cap)
void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
{
	spin_lock(&caps_list_lock);
	spin_lock(&mdsc->caps_list_lock);
	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
	     cap, caps_total_count, caps_use_count,
	     cap, mdsc->caps_total_count, mdsc->caps_use_count,
	     caps_reserve_count, caps_avail_count);
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	caps_use_count--;
	mdsc->caps_use_count--;
	/*
	/*
	 * Keep some preallocated caps around (ceph_min_count), to
	 * Keep some preallocated caps around (ceph_min_count), to
	 * avoid lots of free/alloc churn.
	 * avoid lots of free/alloc churn.
	 */
	 */
	if (caps_avail_count >= caps_reserve_count + caps_min_count) {
	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
		caps_total_count--;
				      mdsc->caps_min_count) {
		mdsc->caps_total_count--;
		kmem_cache_free(ceph_cap_cachep, cap);
		kmem_cache_free(ceph_cap_cachep, cap);
	} else {
	} else {
		caps_avail_count++;
		mdsc->caps_avail_count++;
		list_add(&cap->caps_item, &caps_list);
		list_add(&cap->caps_item, &mdsc->caps_list);
	}
	}


	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       caps_avail_count);
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&caps_list_lock);
	spin_unlock(&mdsc->caps_list_lock);
}
}


void ceph_reservation_status(struct ceph_client *client,
void ceph_reservation_status(struct ceph_client *client,
			     int *total, int *avail, int *used, int *reserved,
			     int *total, int *avail, int *used, int *reserved,
			     int *min)
			     int *min)
{
{
	struct ceph_mds_client *mdsc = &client->mdsc;

	if (total)
	if (total)
		*total = caps_total_count;
		*total = mdsc->caps_total_count;
	if (avail)
	if (avail)
		*avail = caps_avail_count;
		*avail = mdsc->caps_avail_count;
	if (used)
	if (used)
		*used = caps_use_count;
		*used = mdsc->caps_use_count;
	if (reserved)
	if (reserved)
		*reserved = caps_reserve_count;
		*reserved = mdsc->caps_reserve_count;
	if (min)
	if (min)
		*min = caps_min_count;
		*min = mdsc->caps_min_count;
}
}


/*
/*
@@ -540,7 +531,7 @@ int ceph_add_cap(struct inode *inode,
			new_cap = NULL;
			new_cap = NULL;
		} else {
		} else {
			spin_unlock(&inode->i_lock);
			spin_unlock(&inode->i_lock);
			new_cap = get_cap(caps_reservation);
			new_cap = get_cap(mdsc, caps_reservation);
			if (new_cap == NULL)
			if (new_cap == NULL)
				return -ENOMEM;
				return -ENOMEM;
			goto retry;
			goto retry;
@@ -898,7 +889,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
		ci->i_auth_cap = NULL;
		ci->i_auth_cap = NULL;


	if (removed)
	if (removed)
		ceph_put_cap(cap);
		ceph_put_cap(mdsc, cap);


	if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
	if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
		struct ceph_snap_realm *realm = ci->i_snap_realm;
		struct ceph_snap_realm *realm = ci->i_snap_realm;
+11 −5
Original line number Original line Diff line number Diff line
@@ -449,7 +449,7 @@ void ceph_mdsc_release_request(struct kref *kref)
	kfree(req->r_path1);
	kfree(req->r_path1);
	kfree(req->r_path2);
	kfree(req->r_path2);
	put_request_session(req);
	put_request_session(req);
	ceph_unreserve_caps(&req->r_caps_reservation);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
	kfree(req);
}
}


@@ -512,7 +512,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
{
{
	req->r_tid = ++mdsc->last_tid;
	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps)
	if (req->r_num_caps)
		ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
				  req->r_num_caps);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	ceph_mdsc_get_request(req);
	__insert_request(mdsc, req);
	__insert_request(mdsc, req);
@@ -764,7 +765,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
			last_inode = NULL;
			last_inode = NULL;
		}
		}
		if (old_cap) {
		if (old_cap) {
			ceph_put_cap(old_cap);
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
			old_cap = NULL;
		}
		}


@@ -793,7 +794,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
	if (last_inode)
	if (last_inode)
		iput(last_inode);
		iput(last_inode);
	if (old_cap)
	if (old_cap)
		ceph_put_cap(old_cap);
		ceph_put_cap(session->s_mdsc, old_cap);


	return ret;
	return ret;
}
}
@@ -1251,6 +1252,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
		return ERR_PTR(-ENOMEM);
		return ERR_PTR(-ENOMEM);


	mutex_init(&req->r_fill_mutex);
	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_started = jiffies;
	req->r_resend_mds = -1;
	req->r_resend_mds = -1;
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1986,7 +1988,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
	if (err == 0) {
	if (err == 0) {
		if (result == 0 && rinfo->dir_nr)
		if (result == 0 && rinfo->dir_nr)
			ceph_readdir_prepopulate(req, req->r_session);
			ceph_readdir_prepopulate(req, req->r_session);
		ceph_unreserve_caps(&req->r_caps_reservation);
		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
	}
	mutex_unlock(&req->r_fill_mutex);
	mutex_unlock(&req->r_fill_mutex);


@@ -2767,6 +2769,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
	spin_lock_init(&mdsc->dentry_lru_lock);
	spin_lock_init(&mdsc->dentry_lru_lock);
	INIT_LIST_HEAD(&mdsc->dentry_lru);
	INIT_LIST_HEAD(&mdsc->dentry_lru);


	ceph_caps_init(mdsc);
	ceph_adjust_min_caps(mdsc, client->min_caps);

	return 0;
	return 0;
}
}


@@ -2962,6 +2967,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
	if (mdsc->mdsmap)
	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
}
}




+22 −0
Original line number Original line Diff line number Diff line
@@ -151,6 +151,7 @@ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
struct ceph_mds_request {
struct ceph_mds_request {
	u64 r_tid;                   /* transaction id */
	u64 r_tid;                   /* transaction id */
	struct rb_node r_node;
	struct rb_node r_node;
	struct ceph_mds_client *r_mdsc;


	int r_op;                    /* mds op code */
	int r_op;                    /* mds op code */
	int r_mds;
	int r_mds;
@@ -267,6 +268,27 @@ struct ceph_mds_client {
	spinlock_t        cap_dirty_lock;   /* protects above items */
	spinlock_t        cap_dirty_lock;   /* protects above items */
	wait_queue_head_t cap_flushing_wq;
	wait_queue_head_t cap_flushing_wq;


	/*
	 * Cap reservations
	 *
	 * Maintain a global pool of preallocated struct ceph_caps, referenced
	 * by struct ceph_caps_reservations.  This ensures that we preallocate
	 * memory needed to successfully process an MDS response.  (If an MDS
	 * sends us cap information and we fail to process it, we will have
	 * problems due to the client and MDS being out of sync.)
	 *
	 * Reservations are 'owned' by a ceph_cap_reservation context.
	 */
	spinlock_t	caps_list_lock;
	struct		list_head caps_list; /* unused (reserved or
						unreserved) */
	int		caps_total_count;    /* total caps allocated */
	int		caps_use_count;      /* in use */
	int		caps_reserve_count;  /* unused, reserved */
	int		caps_avail_count;    /* unused, unreserved */
	int		caps_min_count;      /* keep at least this many
						(unreserved) */

#ifdef CONFIG_DEBUG_FS
#ifdef CONFIG_DEBUG_FS
	struct dentry 	  *debugfs_file;
	struct dentry 	  *debugfs_file;
#endif
#endif
+0 −6
Original line number Original line Diff line number Diff line
@@ -630,7 +630,6 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)


	/* caps */
	/* caps */
	client->min_caps = args->max_readdir;
	client->min_caps = args->max_readdir;
	ceph_adjust_min_caps(client->min_caps);


	/* subsystems */
	/* subsystems */
	err = ceph_monc_init(&client->monc, client);
	err = ceph_monc_init(&client->monc, client);
@@ -680,8 +679,6 @@ static void ceph_destroy_client(struct ceph_client *client)


	ceph_monc_stop(&client->monc);
	ceph_monc_stop(&client->monc);


	ceph_adjust_min_caps(-client->min_caps);

	ceph_debugfs_client_cleanup(client);
	ceph_debugfs_client_cleanup(client);
	destroy_workqueue(client->wb_wq);
	destroy_workqueue(client->wb_wq);
	destroy_workqueue(client->pg_inv_wq);
	destroy_workqueue(client->pg_inv_wq);
@@ -1043,8 +1040,6 @@ static int __init init_ceph(void)
	if (ret)
	if (ret)
		goto out_msgr;
		goto out_msgr;


	ceph_caps_init();

	ret = register_filesystem(&ceph_fs_type);
	ret = register_filesystem(&ceph_fs_type);
	if (ret)
	if (ret)
		goto out_icache;
		goto out_icache;
@@ -1069,7 +1064,6 @@ static void __exit exit_ceph(void)
{
{
	dout("exit_ceph\n");
	dout("exit_ceph\n");
	unregister_filesystem(&ceph_fs_type);
	unregister_filesystem(&ceph_fs_type);
	ceph_caps_finalize();
	destroy_caches();
	destroy_caches();
	ceph_msgr_exit();
	ceph_msgr_exit();
	ceph_debugfs_cleanup();
	ceph_debugfs_cleanup();
+9 −6
Original line number Original line Diff line number Diff line
@@ -560,11 +560,13 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
/* what the mds thinks we want */
/* what the mds thinks we want */
extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);


extern void ceph_caps_init(void);
extern void ceph_caps_init(struct ceph_mds_client *mdsc);
extern void ceph_caps_finalize(void);
extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
extern void ceph_adjust_min_caps(int delta);
extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need);
extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx);
			     struct ceph_cap_reservation *ctx, int need);
extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			       struct ceph_cap_reservation *ctx);
extern void ceph_reservation_status(struct ceph_client *client,
extern void ceph_reservation_status(struct ceph_client *client,
				    int *total, int *avail, int *used,
				    int *total, int *avail, int *used,
				    int *reserved, int *min);
				    int *reserved, int *min);
@@ -806,7 +808,8 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
	__ceph_remove_cap(cap);
	__ceph_remove_cap(cap);
	spin_unlock(&inode->i_lock);
	spin_unlock(&inode->i_lock);
}
}
extern void ceph_put_cap(struct ceph_cap *cap);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
			 struct ceph_cap *cap);


extern void ceph_queue_caps_release(struct inode *inode);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);