Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit adb072d3 authored by Linus Torvalds
Browse files

Merge tag 'ceph-for-4.15-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "We have a set of file locking improvements from Zheng, rbd rw/ro state
  handling code cleanup from myself and some assorted CephFS fixes from
  Jeff.

  rbd now defaults to single-major=Y, lifting the limit of ~240 rbd
  images per host for everyone"

* tag 'ceph-for-4.15-rc1' of git://github.com/ceph/ceph-client:
  rbd: default to single-major device number scheme
  libceph: don't WARN() if user tries to add invalid key
  rbd: set discard_alignment to zero
  ceph: silence sparse endianness warning in encode_caps_cb
  ceph: remove the bump of i_version
  ceph: present consistent fsid, regardless of arch endianness
  ceph: clean up spinlocking and list handling around cleanup_cap_releases()
  rbd: get rid of rbd_mapping::read_only
  rbd: fix and simplify rbd_ioctl_set_ro()
  ceph: remove unused and redundant variable dropping
  ceph: mark expected switch fall-throughs
  ceph: -EINVAL on decoding failure in ceph_mdsc_handle_fsmap()
  ceph: disable cached readdir after dropping positive dentry
  ceph: fix bool initialization/comparison
  ceph: handle 'session get evicted while there are file locks'
  ceph: optimize flock encoding during reconnect
  ceph: make lock_to_ceph_filelock() static
  ceph: keep auth cap when inode has flocks or posix locks
parents 11ca75d2 3cfa3b16
Loading
Loading
Loading
Loading
+13 −52
Original line number Diff line number Diff line
@@ -348,7 +348,6 @@ struct rbd_client_id {
struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
@@ -450,12 +449,11 @@ static DEFINE_IDA(rbd_dev_id_ida);
static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = false;
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

@@ -608,9 +606,6 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
@@ -640,46 +635,24 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;
	int ro;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshot doesn't allow to write*/
	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others open this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;
	int ret;

	switch (cmd) {
	case BLKROSET:
@@ -4050,15 +4023,8 @@ static void rbd_queue_workfn(struct work_struct *work)
		goto err_rq;
	}

	/* Only reads are allowed to a read-only device */

	if (op_type != OBJ_OP_READ) {
		if (rbd_dev->mapping.read_only) {
			result = -EROFS;
			goto err_rq;
		}
		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
	}
	rbd_assert(op_type == OBJ_OP_READ ||
		   rbd_dev->spec->snap_id == CEPH_NOSNAP);

	/*
	 * Quit early if the mapped snapshot no longer exists.  It's
@@ -4423,7 +4389,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
	/* enable the discard support */
	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
	q->limits.discard_granularity = segment_size;
	q->limits.discard_alignment = segment_size;
	blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);

@@ -5994,7 +5959,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
		goto err_out_disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);

	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
	if (ret)
@@ -6145,7 +6110,6 @@ static ssize_t do_rbd_add(struct bus_type *bus,
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	bool read_only;
	int rc;

	if (!try_module_get(THIS_MODULE))
@@ -6194,11 +6158,8 @@ static ssize_t do_rbd_add(struct bus_type *bus,
	}

	/* If we are mapping a snapshot it must be marked read-only */

	read_only = rbd_dev->opts->read_only;
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		read_only = true;
	rbd_dev->mapping.read_only = read_only;
		rbd_dev->opts->read_only = true;

	rc = rbd_dev_device_setup(rbd_dev);
	if (rc)
+4 −5
Original line number Diff line number Diff line
@@ -1160,7 +1160,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->vfs_inode;
	struct cap_msg_args arg;
	int held, revoking, dropping;
	int held, revoking;
	int wake = 0;
	int delayed = 0;
	int ret;
@@ -1168,7 +1168,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
	held = cap->issued | cap->implemented;
	revoking = cap->implemented & ~cap->issued;
	retain &= ~revoking;
	dropping = cap->issued & ~retain;

	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
	     inode, cap, cap->session,
@@ -1712,7 +1711,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,

	/* if we are unmounting, flush any unused caps immediately. */
	if (mdsc->stopping)
		is_delayed = 1;
		is_delayed = true;

	spin_lock(&ci->i_ceph_lock);

@@ -3189,8 +3188,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
	int dirty = le32_to_cpu(m->dirty);
	int cleaned = 0;
	bool drop = false;
	bool wake_ci = 0;
	bool wake_mdsc = 0;
	bool wake_ci = false;
	bool wake_mdsc = false;

	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
		if (cf->tid == flush_tid)
+7 −2
Original line number Diff line number Diff line
@@ -493,6 +493,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
	ci->i_wb_ref = 0;
	ci->i_wrbuffer_ref = 0;
	ci->i_wrbuffer_ref_head = 0;
	atomic_set(&ci->i_filelock_ref, 0);
	ci->i_shared_gen = 0;
	ci->i_rdcache_gen = 0;
	ci->i_rdcache_revoking = 0;
@@ -786,7 +787,6 @@ static int fill_inode(struct inode *inode, struct page *locked_page,

	/* update inode */
	ci->i_version = le64_to_cpu(info->version);
	inode->i_version++;
	inode->i_rdev = le32_to_cpu(info->rdev);
	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;

@@ -1185,6 +1185,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
				    ceph_snap(d_inode(dn)) != tvino.snap)) {
				dout(" dn %p points to wrong inode %p\n",
				     dn, d_inode(dn));
				ceph_dir_clear_ordered(dir);
				d_delete(dn);
				dput(dn);
				goto retry_lookup;
@@ -1322,6 +1323,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
			     ceph_vinop(in));
			ceph_dir_clear_ordered(dir);
			d_invalidate(dn);
			have_lease = false;
		}
@@ -1573,6 +1575,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
			    ceph_snap(d_inode(dn)) != tvino.snap)) {
			dout(" dn %p points to wrong inode %p\n",
			     dn, d_inode(dn));
			__ceph_dir_clear_ordered(ci);
			d_delete(dn);
			dput(dn);
			goto retry_lookup;
@@ -1597,7 +1600,9 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
				 &req->r_caps_reservation);
		if (ret < 0) {
			pr_err("fill_inode badness on %p\n", in);
			if (d_really_is_negative(dn))
			if (d_really_is_positive(dn))
				__ceph_dir_clear_ordered(ci);
			else
				iput(in);
			d_drop(dn);
			err = ret;
+126 −51
Original line number Diff line number Diff line
@@ -30,19 +30,52 @@ void __init ceph_flock_init(void)
	get_random_bytes(&lock_secret, sizeof(lock_secret));
}

static void ceph_fl_copy_lock(struct file_lock *dst, struct file_lock *src)
{
	struct inode *inode = file_inode(src->fl_file);
	atomic_inc(&ceph_inode(inode)->i_filelock_ref);
}

static void ceph_fl_release_lock(struct file_lock *fl)
{
	struct inode *inode = file_inode(fl->fl_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	if (atomic_dec_and_test(&ci->i_filelock_ref)) {
		/* clear error when all locks are released */
		spin_lock(&ci->i_ceph_lock);
		ci->i_ceph_flags &= ~CEPH_I_ERROR_FILELOCK;
		spin_unlock(&ci->i_ceph_lock);
	}
}

static const struct file_lock_operations ceph_fl_lock_ops = {
	.fl_copy_lock = ceph_fl_copy_lock,
	.fl_release_private = ceph_fl_release_lock,
};

/**
 * Implement fcntl and flock locking functions.
 */
static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
			     int cmd, u8 wait, struct file_lock *fl)
{
	struct inode *inode = file_inode(file);
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_mds_request *req;
	int err;
	u64 length = 0;
	u64 owner;

	if (operation == CEPH_MDS_OP_SETFILELOCK) {
		/*
		 * increasing i_filelock_ref closes race window between
		 * handling request reply and adding file_lock struct to
		 * inode. Otherwise, auth caps may get trimmed in the
		 * window. Caller function will decrease the counter.
		 */
		fl->fl_ops = &ceph_fl_lock_ops;
		atomic_inc(&ceph_inode(inode)->i_filelock_ref);
	}

	if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
		wait = 0;

@@ -180,10 +213,12 @@ static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
 */
int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
{
	u8 lock_cmd;
	int err;
	u8 wait = 0;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int err = 0;
	u16 op = CEPH_MDS_OP_SETFILELOCK;
	u8 wait = 0;
	u8 lock_cmd;

	if (!(fl->fl_flags & FL_POSIX))
		return -ENOLCK;
@@ -199,6 +234,26 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
	else if (IS_SETLKW(cmd))
		wait = 1;

	spin_lock(&ci->i_ceph_lock);
	if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
		err = -EIO;
	} else if (op == CEPH_MDS_OP_SETFILELOCK) {
		/*
		 * increasing i_filelock_ref closes race window between
		 * handling request reply and adding file_lock struct to
		 * inode. Otherwise, i_auth_cap may get trimmed in the
		 * window. Caller function will decrease the counter.
		 */
		fl->fl_ops = &ceph_fl_lock_ops;
		atomic_inc(&ci->i_filelock_ref);
	}
	spin_unlock(&ci->i_ceph_lock);
	if (err < 0) {
		if (op == CEPH_MDS_OP_SETFILELOCK && F_UNLCK == fl->fl_type)
			posix_lock_file(file, fl, NULL);
		return err;
	}

	if (F_RDLCK == fl->fl_type)
		lock_cmd = CEPH_LOCK_SHARED;
	else if (F_WRLCK == fl->fl_type)
@@ -206,16 +261,16 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
	else
		lock_cmd = CEPH_LOCK_UNLOCK;

	err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl);
	err = ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, lock_cmd, wait, fl);
	if (!err) {
		if (op != CEPH_MDS_OP_GETFILELOCK) {
		if (op == CEPH_MDS_OP_SETFILELOCK) {
			dout("mds locked, locking locally");
			err = posix_lock_file(file, fl, NULL);
			if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
			if (err) {
				/* undo! This should only happen if
				 * the kernel detects local
				 * deadlock. */
				ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
				ceph_lock_message(CEPH_LOCK_FCNTL, op, inode,
						  CEPH_LOCK_UNLOCK, 0, fl);
				dout("got %d on posix_lock_file, undid lock",
				     err);
@@ -227,9 +282,11 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)

int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
{
	u8 lock_cmd;
	int err;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int err = 0;
	u8 wait = 0;
	u8 lock_cmd;

	if (!(fl->fl_flags & FL_FLOCK))
		return -ENOLCK;
@@ -239,6 +296,21 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)

	dout("ceph_flock, fl_file: %p", fl->fl_file);

	spin_lock(&ci->i_ceph_lock);
	if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
		err = -EIO;
	} else {
		/* see comment in ceph_lock */
		fl->fl_ops = &ceph_fl_lock_ops;
		atomic_inc(&ci->i_filelock_ref);
	}
	spin_unlock(&ci->i_ceph_lock);
	if (err < 0) {
		if (F_UNLCK == fl->fl_type)
			locks_lock_file_wait(file, fl);
		return err;
	}

	if (IS_SETLKW(cmd))
		wait = 1;

@@ -250,13 +322,13 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
		lock_cmd = CEPH_LOCK_UNLOCK;

	err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
				file, lock_cmd, wait, fl);
				inode, lock_cmd, wait, fl);
	if (!err) {
		err = locks_lock_file_wait(file, fl);
		if (err) {
			ceph_lock_message(CEPH_LOCK_FLOCK,
					  CEPH_MDS_OP_SETFILELOCK,
					  file, CEPH_LOCK_UNLOCK, 0, fl);
					  inode, CEPH_LOCK_UNLOCK, 0, fl);
			dout("got %d on locks_lock_file_wait, undid lock", err);
		}
	}
@@ -288,6 +360,37 @@ void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
	     *flock_count, *fcntl_count);
}

/*
 * Given a pointer to a lock, convert it to a ceph filelock
 */
static int lock_to_ceph_filelock(struct file_lock *lock,
				 struct ceph_filelock *cephlock)
{
	int err = 0;
	cephlock->start = cpu_to_le64(lock->fl_start);
	cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
	cephlock->client = cpu_to_le64(0);
	cephlock->pid = cpu_to_le64((u64)lock->fl_pid);
	cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner));

	switch (lock->fl_type) {
	case F_RDLCK:
		cephlock->type = CEPH_LOCK_SHARED;
		break;
	case F_WRLCK:
		cephlock->type = CEPH_LOCK_EXCL;
		break;
	case F_UNLCK:
		cephlock->type = CEPH_LOCK_UNLOCK;
		break;
	default:
		dout("Have unknown lock type %d", lock->fl_type);
		err = -EINVAL;
	}

	return err;
}

/**
 * Encode the flock and fcntl locks for the given inode into the ceph_filelock
 * array. Must be called with inode->i_lock already held.
@@ -356,50 +459,22 @@ int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
	if (err)
		goto out_fail;

	if (num_fcntl_locks > 0) {
		err = ceph_pagelist_append(pagelist, flocks,
					   num_fcntl_locks * sizeof(*flocks));
		if (err)
			goto out_fail;
	}

	nlocks = cpu_to_le32(num_flock_locks);
	err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
	if (err)
		goto out_fail;

	err = ceph_pagelist_append(pagelist,
				   &flocks[num_fcntl_locks],
	if (num_flock_locks > 0) {
		err = ceph_pagelist_append(pagelist, &flocks[num_fcntl_locks],
					   num_flock_locks * sizeof(*flocks));
out_fail:
	return err;
	}

/*
 * Given a pointer to a lock, convert it to a ceph filelock
 */
int lock_to_ceph_filelock(struct file_lock *lock,
			  struct ceph_filelock *cephlock)
{
	int err = 0;
	cephlock->start = cpu_to_le64(lock->fl_start);
	cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
	cephlock->client = cpu_to_le64(0);
	cephlock->pid = cpu_to_le64((u64)lock->fl_pid);
	cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner));

	switch (lock->fl_type) {
	case F_RDLCK:
		cephlock->type = CEPH_LOCK_SHARED;
		break;
	case F_WRLCK:
		cephlock->type = CEPH_LOCK_EXCL;
		break;
	case F_UNLCK:
		cephlock->type = CEPH_LOCK_UNLOCK;
		break;
	default:
		dout("Have unknown lock type %d", lock->fl_type);
		err = -EINVAL;
	}

out_fail:
	return err;
}
+63 −33
Original line number Diff line number Diff line
@@ -1039,22 +1039,23 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
 * session caps
 */

/* caller holds s_cap_lock, we drop it */
static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
	__releases(session->s_cap_lock)
static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	LIST_HEAD(tmp_list);
	list_splice_init(&session->s_cap_releases, &tmp_list);
	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);
	dout("dispose_cap_releases mds%d\n", session->s_mds);
}

	dout("cleanup_cap_releases mds%d\n", session->s_mds);
	while (!list_empty(&tmp_list)) {
static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(&tmp_list,
					struct ceph_cap, session_caps);
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
@@ -1215,6 +1216,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			/* make further file lock syscall return -EIO */
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
					    inode, ceph_ino(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
			ci->i_prealloc_cap_flush = NULL;
@@ -1244,6 +1252,8 @@ static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	iterate_session_caps(session, remove_session_caps_cb, fsc);

@@ -1278,10 +1288,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
	}

	// drop cap expires and unlock s_cap_lock
	cleanup_cap_releases(session->s_mdsc, session);
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

/*
@@ -1462,6 +1474,11 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
@@ -2827,7 +2844,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	char *path;
@@ -2836,8 +2853,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
	u64 snap_follows;
	struct dentry *dentry;

	ci = cap->ci;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));
@@ -2870,7 +2885,8 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(pathbase);
		rec.v2.flock_len = 0;
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -2894,12 +2910,18 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = 0;
		u8 struct_v = 0;

encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc((num_fcntl_locks + num_flock_locks) *
					 sizeof(struct ceph_filelock), GFP_NOFS);
			if (!flocks) {
@@ -2911,10 +2933,15 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_free;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
@@ -2993,6 +3020,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
	int s_nr_caps;
	struct ceph_pagelist *pagelist;
	struct ceph_reconnect_state recon_state;
	LIST_HEAD(dispose);

	pr_info("mds%d reconnect start\n", mds);

@@ -3026,7 +3054,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	cleanup_cap_releases(mdsc, session);
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
@@ -3857,14 +3887,14 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
		goto err_out;
	}
	return;

bad:
	pr_err("error decoding fsmap\n");
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = -ENOENT;
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
	return;
}

/*
Loading