Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 10183a69 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov
Browse files

ceph: check OSD caps before read/write



Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 144cba14
Loading
Loading
Loading
Loading
+203 −0
Original line number Diff line number Diff line
@@ -1598,3 +1598,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			have = perm->perm;
			break;
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			have = perm->perm;
			break;
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
					 ci->i_snap_realm->cached_context,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name),
		 "%llx.00000000", ci->i_vino.ino);
	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
					 ci->i_snap_realm->cached_context,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
			  CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	wr_req->r_base_oloc.pool = pool;
	wr_req->r_base_oid = rd_req->r_base_oid;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
				&ci->vfs_inode.i_mtime);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
				&ci->vfs_inode.i_mtime);
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM)
		goto out_unlock;

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		err = err2;
		goto out_unlock;
	}

	perm = kmalloc(sizeof(*perm), GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	if (rd_req)
		ceph_osdc_put_request(rd_req);
	if (wr_req)
		ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
	return err;
}

int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
	u32 pool;
	int ret, flags;

	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ceph_file_layout_pg_pool(ci->i_layout);
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %u no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %u no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	ret = __ceph_pool_perm_get(ci, pool);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
		ci->i_ceph_flags = flags;
        } else {
		pool = ceph_file_layout_pg_pool(ci->i_layout);
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}
+4 −0
Original line number Diff line number Diff line
@@ -2233,6 +2233,10 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
{
	int _got, check_max, ret, err = 0;

	ret = ceph_pool_perm_check(ci, need);
	if (ret < 0)
		return ret;

retry:
	if (endoff > 0)
		check_max_size(&ci->vfs_inode, endoff);
+3 −0
Original line number Diff line number Diff line
@@ -753,7 +753,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,

	if (new_version ||
	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
		if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
		ci->i_layout = info->layout;

		queue_trunc = ceph_fill_file_size(inode, issued,
					le32_to_cpu(info->truncate_seq),
					le64_to_cpu(info->truncate_size),
+4 −0
Original line number Diff line number Diff line
@@ -3414,6 +3414,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
	ceph_caps_init(mdsc);
	ceph_adjust_min_caps(mdsc, fsc->min_caps);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	return 0;
}

@@ -3607,6 +3610,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
	ceph_pool_perm_destroy(mdsc);
}

void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
+9 −0
Original line number Diff line number Diff line
@@ -260,6 +260,12 @@ struct ceph_mds_request {
	int r_num_caps;
};

struct ceph_pool_perm {
	struct rb_node node;
	u32 pool;
	int perm;
};

/*
 * mds client state
 */
@@ -328,6 +334,9 @@ struct ceph_mds_client {
	spinlock_t	  dentry_lru_lock;
	struct list_head  dentry_lru;
	int		  num_dentry;

	struct rw_semaphore     pool_perm_rwsem;
	struct rb_root		pool_perm_tree;
};

extern const char *ceph_mds_op_name(int op);
Loading