Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b07d3c4b authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

libceph: support for checking on status of watch



Implement ceph_osdc_watch_check() to be able to check on status of
watch.  Note that the time it takes for a watch/notify event to get
delivered through the notify_wq is taken into account.

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 19079203
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -213,6 +213,8 @@ struct ceph_osd_linger_request {
	struct ceph_osd_request *reg_req;
	struct ceph_osd_request *ping_req;
	unsigned long ping_sent;
	unsigned long watch_valid_thru;
	struct list_head pending_lworks;

	struct ceph_osd_request_target t;
	u32 last_force_resend;
@@ -417,5 +419,7 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
		     u32 timeout,
		     struct page ***preply_pages,
		     size_t *preply_len);
int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
			  struct ceph_osd_linger_request *lreq);
#endif
+51 −1
Original line number Diff line number Diff line
@@ -1746,6 +1746,7 @@ static void linger_release(struct kref *kref)
	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
	WARN_ON(!list_empty(&lreq->scan_item));
	WARN_ON(!list_empty(&lreq->pending_lworks));
	WARN_ON(lreq->osd);

	if (lreq->reg_req)
@@ -1783,6 +1784,7 @@ linger_alloc(struct ceph_osd_client *osdc)
	RB_CLEAR_NODE(&lreq->node);
	RB_CLEAR_NODE(&lreq->osdc_node);
	INIT_LIST_HEAD(&lreq->scan_item);
	INIT_LIST_HEAD(&lreq->pending_lworks);
	init_completion(&lreq->reg_commit_wait);
	init_completion(&lreq->notify_finish_wait);

@@ -1890,6 +1892,8 @@ static void cancel_linger_request(struct ceph_osd_request *req)
struct linger_work {
	struct work_struct work;
	struct ceph_osd_linger_request *lreq;
	struct list_head pending_item;
	unsigned long queued_stamp;

	union {
		struct {
@@ -1916,6 +1920,7 @@ static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
		return NULL;

	INIT_WORK(&lwork->work, workfn);
	INIT_LIST_HEAD(&lwork->pending_item);
	lwork->lreq = linger_get(lreq);

	return lwork;
@@ -1925,6 +1930,10 @@ static void lwork_free(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	mutex_lock(&lreq->lock);
	list_del(&lwork->pending_item);
	mutex_unlock(&lreq->lock);

	linger_put(lreq);
	kfree(lwork);
}
@@ -1935,6 +1944,10 @@ static void lwork_queue(struct linger_work *lwork)
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_lreq_locked(lreq);
	WARN_ON(!list_empty(&lwork->pending_item));

	lwork->queued_stamp = jiffies;
	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
	queue_work(osdc->notify_wq, &lwork->work);
}

@@ -2116,7 +2129,9 @@ static void linger_ping_cb(struct ceph_osd_request *req)
	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
	     lreq->last_error);
	if (lreq->register_gen == req->r_ops[0].watch.gen) {
		if (req->r_result && !lreq->last_error) {
		if (!req->r_result) {
			lreq->watch_valid_thru = lreq->ping_sent;
		} else if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
@@ -3316,6 +3331,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
	lreq->wcb = wcb;
	lreq->errcb = errcb;
	lreq->data = data;
	lreq->watch_valid_thru = jiffies;

	ceph_oid_copy(&lreq->t.base_oid, oid);
	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
@@ -3577,6 +3593,40 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_notify);

/*
 * Return the number of milliseconds since the watch was last
 * confirmed, or an error.  If there is an error, the watch is no
 * longer valid, and should be destroyed with ceph_osdc_unwatch().
 */
int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
			  struct ceph_osd_linger_request *lreq)
{
	unsigned long stamp, age;
	int ret;

	down_read(&osdc->lock);
	mutex_lock(&lreq->lock);
	stamp = lreq->watch_valid_thru;
	if (!list_empty(&lreq->pending_lworks)) {
		struct linger_work *lwork =
		    list_first_entry(&lreq->pending_lworks,
				     struct linger_work,
				     pending_item);

		if (time_before(lwork->queued_stamp, stamp))
			stamp = lwork->queued_stamp;
	}
	age = jiffies - stamp;
	dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
	     lreq, lreq->linger_id, age, lreq->last_error);
	/* we are truncating to msecs, so return a safe upper bound */
	ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);

	mutex_unlock(&lreq->lock);
	up_read(&osdc->lock);
	return ret;
}

/*
 * Call all pending notify callbacks - for use after a watch is
 * unregistered, to make sure no more callbacks for it will be invoked