Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4609245e authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

libceph: pool deletion detection



This adds the "map check" infrastructure for sending osdmap version
checks on CALC_TARGET_POOL_DNE and completing in-flight requests with
-ENOENT if the target pool doesn't exist or has just been deleted.

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent d0b19705
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -151,6 +151,7 @@ struct ceph_osd_request_target {
struct ceph_osd_request {
	u64             r_tid;              /* unique for this client */
	struct rb_node  r_node;
	struct rb_node  r_mc_node;          /* map check */
	struct ceph_osd *r_osd;

	struct ceph_osd_request_target r_t;
@@ -191,6 +192,7 @@ struct ceph_osd_request {
	int r_attempts;
	struct ceph_eversion r_replay_version; /* aka reassert_version */
	u32 r_last_force_resend;
	u32 r_map_dne_bound;

	struct ceph_osd_req_op r_ops[];
};
@@ -218,6 +220,7 @@ struct ceph_osd_linger_request {

	struct ceph_osd_request_target t;
	u32 last_force_resend;
	u32 map_dne_bound;

	struct timespec mtime;

@@ -225,6 +228,7 @@ struct ceph_osd_linger_request {
	struct mutex lock;
	struct rb_node node;            /* osd */
	struct rb_node osdc_node;       /* osdc */
	struct rb_node mc_node;         /* map check */
	struct list_head scan_item;

	struct completion reg_commit_wait;
@@ -257,6 +261,8 @@ struct ceph_osd_client {
	atomic64_t             last_tid;      /* tid of last request */
	u64                    last_linger_id;
	struct rb_root         linger_requests; /* lingering requests */
	struct rb_root         map_checks;
	struct rb_root         linger_map_checks;
	atomic_t               num_requests;
	atomic_t               num_homeless;
	struct delayed_work    timeout_work;
+242 −6
Original line number Diff line number Diff line
@@ -396,6 +396,7 @@ static void target_destroy(struct ceph_osd_request_target *t)
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_unsafe_item));
	WARN_ON(req->r_osd);
}
@@ -456,6 +457,7 @@ static void request_init(struct ceph_osd_request *req)
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	target_init(&req->r_t);
@@ -969,6 +971,7 @@ EXPORT_SYMBOL(ceph_osdc_new_request);
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

static bool osd_homeless(struct ceph_osd *osd)
{
@@ -1601,10 +1604,13 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
		ceph_monc_renew_subs(&osdc->client->monc);
}

static void send_map_check(struct ceph_osd_request *req);

static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd *osd;
	enum calc_target_result ct_res;
	bool need_send = false;
	bool promoted = false;

@@ -1612,7 +1618,10 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);

again:
	calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
	ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
		goto promote;

	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
	if (IS_ERR(osd)) {
		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
@@ -1656,6 +1665,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
		send_request(req);
	mutex_unlock(&osd->lock);

	if (ct_res == CALC_TARGET_POOL_DNE)
		send_map_check(req);

	if (promoted)
		downgrade_write(&osdc->lock);
	return;
@@ -1699,6 +1711,7 @@ static void __finish_request(struct ceph_osd_request *req)
	verify_osd_locked(osd);
	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
	unlink_request(osd, req);
	atomic_dec(&osdc->num_requests);

@@ -1726,13 +1739,127 @@ static void __complete_request(struct ceph_osd_request *req)
		complete_all(&req->r_completion);
}

/*
 * Note that this is open-coded in handle_reply(), which has to deal
 * with ack vs commit, dup acks, etc.
 */
static void complete_request(struct ceph_osd_request *req, int err)
{
	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);

	req->r_result = err;
	__finish_request(req);
	__complete_request(req);
	complete_all(&req->r_safe_completion);
	ceph_osdc_put_request(req);
}

static void cancel_map_check(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd_request *lookup_req;

	verify_osdc_wrlocked(osdc);

	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
	if (!lookup_req)
		return;

	WARN_ON(lookup_req != req);
	erase_request_mc(&osdc->map_checks, req);
	ceph_osdc_put_request(req);
}

static void cancel_request(struct ceph_osd_request *req)
{
	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

	cancel_map_check(req);
	finish_request(req);
}

static void check_pool_dne(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osdmap *map = osdc->osdmap;

	verify_osdc_wrlocked(osdc);
	WARN_ON(!map->epoch);

	if (req->r_attempts) {
		/*
		 * We sent a request earlier, which means that
		 * previously the pool existed, and now it does not
		 * (i.e., it was deleted).
		 */
		req->r_map_dne_bound = map->epoch;
		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
		     req->r_tid);
	} else {
		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
	}

	if (req->r_map_dne_bound) {
		if (map->epoch >= req->r_map_dne_bound) {
			/* we had a new enough map */
			pr_info_ratelimited("tid %llu pool does not exist\n",
					    req->r_tid);
			complete_request(req, -ENOENT);
		}
	} else {
		send_map_check(req);
	}
}

static void map_check_cb(struct ceph_mon_generic_request *greq)
{
	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
	struct ceph_osd_request *req;
	u64 tid = greq->private_data;

	WARN_ON(greq->result || !greq->u.newest);

	down_write(&osdc->lock);
	req = lookup_request_mc(&osdc->map_checks, tid);
	if (!req) {
		dout("%s tid %llu dne\n", __func__, tid);
		goto out_unlock;
	}

	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
	if (!req->r_map_dne_bound)
		req->r_map_dne_bound = greq->u.newest;
	erase_request_mc(&osdc->map_checks, req);
	check_pool_dne(req);

	ceph_osdc_put_request(req);
out_unlock:
	up_write(&osdc->lock);
}

static void send_map_check(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd_request *lookup_req;
	int ret;

	verify_osdc_wrlocked(osdc);

	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		return;
	}

	ceph_osdc_get_request(req);
	insert_request_mc(&osdc->map_checks, req);
	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
					  map_check_cb, req->r_tid);
	WARN_ON(ret);
}

/*
 * lingering requests, watch/notify v2 infrastructure
 */
@@ -1745,6 +1872,7 @@ static void linger_release(struct kref *kref)
	     lreq->reg_req, lreq->ping_req);
	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
	WARN_ON(!list_empty(&lreq->scan_item));
	WARN_ON(!list_empty(&lreq->pending_lworks));
	WARN_ON(lreq->osd);
@@ -1783,6 +1911,7 @@ linger_alloc(struct ceph_osd_client *osdc)
	mutex_init(&lreq->lock);
	RB_CLEAR_NODE(&lreq->node);
	RB_CLEAR_NODE(&lreq->osdc_node);
	RB_CLEAR_NODE(&lreq->mc_node);
	INIT_LIST_HEAD(&lreq->scan_item);
	INIT_LIST_HEAD(&lreq->pending_lworks);
	init_completion(&lreq->reg_commit_wait);
@@ -1797,6 +1926,7 @@ linger_alloc(struct ceph_osd_client *osdc)

DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)

/*
 * Create linger request <-> OSD session relation.
@@ -2193,6 +2323,23 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
	send_linger(lreq);
}

static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (!lookup_lreq)
		return;

	WARN_ON(lookup_lreq != lreq);
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	linger_put(lreq);
}

/*
 * @lreq has to be both registered and linked.
 */
@@ -2202,6 +2349,7 @@ static void __linger_cancel(struct ceph_osd_linger_request *lreq)
		cancel_linger_request(lreq->ping_req);
	if (lreq->reg_req->r_osd)
		cancel_linger_request(lreq->reg_req);
	cancel_linger_map_check(lreq);
	unlink_linger(lreq->osd, lreq);
	linger_unregister(lreq);
}
@@ -2216,6 +2364,89 @@ static void linger_cancel(struct ceph_osd_linger_request *lreq)
	up_write(&osdc->lock);
}

static void send_linger_map_check(struct ceph_osd_linger_request *lreq);

static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osdmap *map = osdc->osdmap;

	verify_osdc_wrlocked(osdc);
	WARN_ON(!map->epoch);

	if (lreq->register_gen) {
		lreq->map_dne_bound = map->epoch;
		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
		     lreq, lreq->linger_id);
	} else {
		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
		     map->epoch);
	}

	if (lreq->map_dne_bound) {
		if (map->epoch >= lreq->map_dne_bound) {
			/* we had a new enough map */
			pr_info("linger_id %llu pool does not exist\n",
				lreq->linger_id);
			linger_reg_commit_complete(lreq, -ENOENT);
			__linger_cancel(lreq);
		}
	} else {
		send_linger_map_check(lreq);
	}
}

static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
{
	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
	struct ceph_osd_linger_request *lreq;
	u64 linger_id = greq->private_data;

	WARN_ON(greq->result || !greq->u.newest);

	down_write(&osdc->lock);
	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
	if (!lreq) {
		dout("%s linger_id %llu dne\n", __func__, linger_id);
		goto out_unlock;
	}

	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
	     greq->u.newest);
	if (!lreq->map_dne_bound)
		lreq->map_dne_bound = greq->u.newest;
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	check_linger_pool_dne(lreq);

	linger_put(lreq);
out_unlock:
	up_write(&osdc->lock);
}

static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;
	int ret;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (lookup_lreq) {
		WARN_ON(lookup_lreq != lreq);
		return;
	}

	linger_get(lreq);
	insert_linger_mc(&osdc->linger_map_checks, lreq);
	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
					  linger_map_check_cb, lreq->linger_id);
	WARN_ON(ret);
}

static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
{
	int ret;
@@ -2677,10 +2908,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
	return;

fail_request:
	req->r_result = -EIO;
	__finish_request(req);
	__complete_request(req);
	complete_all(&req->r_safe_completion);
	complete_request(req, -EIO);
out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
@@ -2764,6 +2992,7 @@ static void scan_requests(struct ceph_osd *osd,

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			cancel_linger_map_check(lreq);
			/*
			 * scan_requests() for the previous epoch(s)
			 * may have already added it to the list, since
@@ -2773,6 +3002,7 @@ static void scan_requests(struct ceph_osd *osd,
				list_add_tail(&lreq->scan_item, need_resend_linger);
			break;
		case CALC_TARGET_POOL_DNE:
			check_linger_pool_dne(lreq);
			break;
		}
	}
@@ -2782,7 +3012,7 @@ static void scan_requests(struct ceph_osd *osd,
		    rb_entry(n, struct ceph_osd_request, r_node);
		enum calc_target_result ct_res;

		n = rb_next(n); /* unlink_request() */
		n = rb_next(n); /* unlink_request(), check_pool_dne() */

		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
		ct_res = calc_target(osdc, &req->r_t,
@@ -2799,10 +3029,12 @@ static void scan_requests(struct ceph_osd *osd,

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			cancel_map_check(req);
			unlink_request(osd, req);
			insert_request(need_resend, req);
			break;
		case CALC_TARGET_POOL_DNE:
			check_pool_dne(req);
			break;
		}
	}
@@ -3655,6 +3887,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
	osdc->homeless_osd.o_osdc = osdc;
	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
	osdc->linger_requests = RB_ROOT;
	osdc->map_checks = RB_ROOT;
	osdc->linger_map_checks = RB_ROOT;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

@@ -3720,6 +3954,8 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)

	WARN_ON(!list_empty(&osdc->osd_lru));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
	WARN_ON(atomic_read(&osdc->num_requests));
	WARN_ON(atomic_read(&osdc->num_homeless));