Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 14bb211d authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

rbd: support updating the lock cookie without releasing the lock



As we no longer release the lock before potentially raising BLACKLISTED
in rbd_reregister_watch(), the "either locked or blacklisted" assert in
rbd_queue_workfn() needs to go: we can be both locked and blacklisted
at that point now.

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarJason Dillaman <dillaman@redhat.com>
parent cbbfb0ff
Loading
Loading
Loading
Loading
+41 −25
Original line number Diff line number Diff line
@@ -3820,24 +3820,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
}

/*
 * lock_rwsem must be held for write
 */
static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	char cookie[32];
	int ret;

	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);

	format_lock_cookie(rbd_dev, cookie);
	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
				  RBD_LOCK_TAG, cookie);
	if (ret) {
		if (ret != -EOPNOTSUPP)
			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
				 ret);

		/*
		 * Lock cookie cannot be updated on older OSDs, so do
		 * a manual release and queue an acquire.
		 */
		if (rbd_release_lock(rbd_dev))
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->lock_dwork, 0);
	} else {
		strcpy(rbd_dev->lock_cookie, cookie);
	}
}

static void rbd_reregister_watch(struct work_struct *work)
{
	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
					    struct rbd_device, watch_dwork);
	bool was_lock_owner = false;
	bool need_to_wake = false;
	int ret;

	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	down_write(&rbd_dev->lock_rwsem);
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		was_lock_owner = rbd_release_lock(rbd_dev);

	mutex_lock(&rbd_dev->watch_mutex);
	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
		mutex_unlock(&rbd_dev->watch_mutex);
		goto out;
		return;
	}

	ret = __rbd_register_watch(rbd_dev);
@@ -3845,36 +3872,28 @@ static void rbd_reregister_watch(struct work_struct *work)
		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
		if (ret == -EBLACKLISTED || ret == -ENOENT) {
			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
			need_to_wake = true;
			wake_requests(rbd_dev, true);
		} else {
			queue_delayed_work(rbd_dev->task_wq,
					   &rbd_dev->watch_dwork,
					   RBD_RETRY_DELAY);
		}
		mutex_unlock(&rbd_dev->watch_mutex);
		goto out;
		return;
	}

	need_to_wake = true;
	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
	mutex_unlock(&rbd_dev->watch_mutex);

	down_write(&rbd_dev->lock_rwsem);
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		rbd_reacquire_lock(rbd_dev);
	up_write(&rbd_dev->lock_rwsem);

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);

	if (was_lock_owner) {
		ret = rbd_try_lock(rbd_dev);
		if (ret)
			rbd_warn(rbd_dev, "reregisteration lock failed: %d",
				 ret);
	}

out:
	up_write(&rbd_dev->lock_rwsem);
	if (need_to_wake)
		wake_requests(rbd_dev, true);
}

/*
@@ -4052,9 +4071,6 @@ static void rbd_queue_workfn(struct work_struct *work)
		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
			rbd_wait_state_locked(rbd_dev);

		WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
			!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
			result = -EBLACKLISTED;
			goto err_unlock;
+5 −0
Original line number Diff line number Diff line
@@ -37,6 +37,11 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
			struct ceph_object_locator *oloc,
			char *lock_name, char *cookie,
			struct ceph_entity_name *locker);
int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
			struct ceph_object_id *oid,
			struct ceph_object_locator *oloc,
			char *lock_name, u8 type, char *old_cookie,
			char *tag, char *new_cookie);

void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);

+51 −0
Original line number Diff line number Diff line
@@ -179,6 +179,57 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_cls_break_lock);

int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
			struct ceph_object_id *oid,
			struct ceph_object_locator *oloc,
			char *lock_name, u8 type, char *old_cookie,
			char *tag, char *new_cookie)
{
	int cookie_op_buf_size;
	int name_len = strlen(lock_name);
	int old_cookie_len = strlen(old_cookie);
	int tag_len = strlen(tag);
	int new_cookie_len = strlen(new_cookie);
	void *p, *end;
	struct page *cookie_op_page;
	int ret;

	cookie_op_buf_size = name_len + sizeof(__le32) +
			     old_cookie_len + sizeof(__le32) +
			     tag_len + sizeof(__le32) +
			     new_cookie_len + sizeof(__le32) +
			     sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
	if (cookie_op_buf_size > PAGE_SIZE)
		return -E2BIG;

	cookie_op_page = alloc_page(GFP_NOIO);
	if (!cookie_op_page)
		return -ENOMEM;

	p = page_address(cookie_op_page);
	end = p + cookie_op_buf_size;

	/* encode cls_lock_set_cookie_op struct */
	ceph_start_encoding(&p, 1, 1,
			    cookie_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
	ceph_encode_string(&p, end, lock_name, name_len);
	ceph_encode_8(&p, type);
	ceph_encode_string(&p, end, old_cookie, old_cookie_len);
	ceph_encode_string(&p, end, tag, tag_len);
	ceph_encode_string(&p, end, new_cookie, new_cookie_len);

	dout("%s lock_name %s type %d old_cookie %s tag %s new_cookie %s\n",
	     __func__, lock_name, type, old_cookie, tag, new_cookie);
	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
			     CEPH_OSD_FLAG_WRITE, cookie_op_page,
			     cookie_op_buf_size, NULL, NULL);

	dout("%s: status %d\n", __func__, ret);
	__free_page(cookie_op_page);
	return ret;
}
EXPORT_SYMBOL(ceph_cls_set_cookie);

void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
{
	int i;