Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 19079203 authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

libceph: support for sending notifies



Implement ceph_osdc_notify() for sending notifies.

Due to the fact that the current messenger can't do read-in into
pagelists (it can only do write-out from them), I had to go with a page
vector for a NOTIFY_COMPLETE payload, for now.

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 922dab61
Loading
Loading
Loading
Loading
+20 −0
Original line number Diff line number Diff line
@@ -114,6 +114,11 @@ struct ceph_osd_req_op {
		struct {
			struct ceph_osd_data request_data;
		} notify_ack;
		struct {
			u64 cookie;
			struct ceph_osd_data request_data;
			struct ceph_osd_data response_data;
		} notify;
		struct {
			u64 expected_object_size;
			u64 expected_write_size;
@@ -202,6 +207,7 @@ struct ceph_osd_linger_request {
	struct ceph_osd_client *osdc;
	u64 linger_id;
	bool committed;
	bool is_watch;                  /* watch or notify */

	struct ceph_osd *osd;
	struct ceph_osd_request *reg_req;
@@ -220,14 +226,20 @@ struct ceph_osd_linger_request {
	struct list_head scan_item;

	struct completion reg_commit_wait;
	struct completion notify_finish_wait;
	int reg_commit_error;
	int notify_finish_error;
	int last_error;

	u32 register_gen;
	u64 notify_id;

	rados_watchcb2_t wcb;
	rados_watcherrcb_t errcb;
	void *data;

	struct page ***preply_pages;
	size_t *preply_len;
};

struct ceph_osd_client {
@@ -397,5 +409,13 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
			 u64 cookie,
			 void *payload,
			 size_t payload_len);
int ceph_osdc_notify(struct ceph_osd_client *osdc,
		     struct ceph_object_id *oid,
		     struct ceph_object_locator *oloc,
		     void *payload,
		     size_t payload_len,
		     u32 timeout,
		     struct page ***preply_pages,
		     size_t *preply_len);
#endif
+3 −0
Original line number Diff line number Diff line
@@ -476,6 +476,9 @@ struct ceph_osd_op {
			__u8 op;	/* CEPH_OSD_WATCH_OP_* */
			__le32 gen;     /* registration generation */
		} __attribute__ ((packed)) watch;
		struct {
			__le64 cookie;
		} __attribute__ ((packed)) notify;
		struct {
			__le64 offset, length;
			__le64 src_offset;
+3 −2
Original line number Diff line number Diff line
@@ -206,8 +206,9 @@ static void dump_linger_request(struct seq_file *s,
	seq_printf(s, "%llu\t", lreq->linger_id);
	dump_target(s, &lreq->t);

	seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen,
		   lreq->committed ? "C" : "", lreq->last_error);
	seq_printf(s, "\t%u\t%s%s/%d\n", lreq->register_gen,
		   lreq->is_watch ? "W" : "N", lreq->committed ? "C" : "",
		   lreq->last_error);
}

static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
+223 −9
Original line number Diff line number Diff line
@@ -334,6 +334,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	default:
		break;
	}
@@ -845,6 +849,9 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1439,6 +1446,12 @@ static void setup_request_data(struct ceph_osd_request *req,
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->cls.response_data);
			break;
		case CEPH_OSD_OP_NOTIFY:
			ceph_osdc_msg_data_add(msg,
					       &op->notify.request_data);
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->notify.response_data);
			break;
		}

		data_len += op->indata_len;
@@ -1771,6 +1784,7 @@ linger_alloc(struct ceph_osd_client *osdc)
	RB_CLEAR_NODE(&lreq->osdc_node);
	INIT_LIST_HEAD(&lreq->scan_item);
	init_completion(&lreq->reg_commit_wait);
	init_completion(&lreq->notify_finish_wait);

	lreq->osdc = osdc;
	target_init(&lreq->t);
@@ -1934,6 +1948,7 @@ static void do_watch_notify(struct work_struct *w)
		goto out;
	}

	WARN_ON(!lreq->is_watch);
	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
	     lwork->notify.payload_len);
@@ -1997,6 +2012,24 @@ static void linger_commit_cb(struct ceph_osd_request *req)
	linger_reg_commit_complete(lreq, req->r_result);
	lreq->committed = true;

	if (!lreq->is_watch) {
		struct ceph_osd_data *osd_data =
		    osd_req_op_data(req, 0, notify, response_data);
		void *p = page_address(osd_data->pages[0]);

		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);

		/* make note of the notify_id */
		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
			lreq->notify_id = ceph_decode_64(&p);
			dout("lreq %p notify_id %llu\n", lreq,
			     lreq->notify_id);
		} else {
			dout("lreq %p no notify_id\n", lreq);
		}
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}
@@ -2050,7 +2083,7 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
	req->r_mtime = lreq->mtime;

	mutex_lock(&lreq->lock);
	if (lreq->committed) {
	if (lreq->is_watch && lreq->committed) {
		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
			op->watch.cookie != lreq->linger_id);
		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
@@ -2059,6 +2092,9 @@ static void send_linger(struct ceph_osd_linger_request *lreq)
		     op->watch.gen);
		req->r_callback = linger_reconnect_cb;
	} else {
		if (!lreq->is_watch)
			lreq->notify_id = 0;
		else
			WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
		dout("lreq %p register\n", lreq);
		req->r_callback = linger_commit_cb;
@@ -2147,7 +2183,7 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
 */
static void __linger_cancel(struct ceph_osd_linger_request *lreq)
{
	if (lreq->ping_req->r_osd)
	if (lreq->is_watch && lreq->ping_req->r_osd)
		cancel_linger_request(lreq->ping_req);
	if (lreq->reg_req->r_osd)
		cancel_linger_request(lreq->reg_req);
@@ -2174,6 +2210,15 @@ static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
	return ret ?: lreq->reg_commit_error;
}

static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
{
	int ret;

	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
	ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
	return ret ?: lreq->notify_finish_error;
}

/*
 * Timeout callback, called every N seconds.  When 1 or more OSD
 * requests has been active for more than N seconds, we send a keepalive
@@ -2220,7 +2265,7 @@ static void handle_timeout(struct work_struct *work)
			found = true;

			mutex_lock(&lreq->lock);
			if (lreq->committed && !lreq->last_error)
			if (lreq->is_watch && lreq->committed && !lreq->last_error)
				send_linger_ping(lreq);
			mutex_unlock(&lreq->lock);
		}
@@ -3032,6 +3077,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
	u8 proto_ver, opcode;
	u64 cookie, notify_id;
	u64 notifier_id = 0;
	s32 return_code = 0;
	void *payload = NULL;
	u32 payload_len = 0;

@@ -3049,7 +3095,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
	}

	if (le16_to_cpu(msg->hdr.version) >= 2)
		p += 4; /* skip return_code */
		ceph_decode_32_safe(&p, end, return_code, bad);

	if (le16_to_cpu(msg->hdr.version) >= 3)
		ceph_decode_64_safe(&p, end, notifier_id, bad);
@@ -3063,13 +3109,38 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
	}

	mutex_lock(&lreq->lock);
	dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie,
	     lreq);
	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
	     opcode, cookie, lreq, lreq->is_watch);
	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
		if (!lreq->last_error) {
			lreq->last_error = -ENOTCONN;
			queue_watch_error(lreq);
		}
	} else if (!lreq->is_watch) {
		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
		if (lreq->notify_id && lreq->notify_id != notify_id) {
			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
			     lreq->notify_id, notify_id);
		} else if (!completion_done(&lreq->notify_finish_wait)) {
			struct ceph_msg_data *data =
			    list_first_entry_or_null(&msg->data,
						     struct ceph_msg_data,
						     links);

			if (data) {
				if (lreq->preply_pages) {
					WARN_ON(data->type !=
							CEPH_MSG_DATA_PAGES);
					*lreq->preply_pages = data->pages;
					*lreq->preply_len = data->length;
				} else {
					ceph_release_page_vector(data->pages,
					       calc_pages_for(0, data->length));
				}
			}
			lreq->notify_finish_error = return_code;
			complete_all(&lreq->notify_finish_wait);
		}
	} else {
		/* CEPH_WATCH_EVENT_NOTIFY */
		lwork = lwork_alloc(lreq, do_watch_notify);
@@ -3241,6 +3312,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
	if (!lreq)
		return ERR_PTR(-ENOMEM);

	lreq->is_watch = true;
	lreq->wcb = wcb;
	lreq->errcb = errcb;
	lreq->data = data;
@@ -3395,6 +3467,116 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
}
EXPORT_SYMBOL(ceph_osdc_notify_ack);

static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u32 prot_ver, u32 timeout,
				  void *payload, size_t payload_len)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pl;
	int ret;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
	op->notify.cookie = cookie;

	pl = kmalloc(sizeof(*pl), GFP_NOIO);
	if (!pl)
		return -ENOMEM;

	ceph_pagelist_init(pl);
	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
	ret |= ceph_pagelist_encode_32(pl, timeout);
	ret |= ceph_pagelist_encode_32(pl, payload_len);
	ret |= ceph_pagelist_append(pl, payload, payload_len);
	if (ret) {
		ceph_pagelist_release(pl);
		return -ENOMEM;
	}

	ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
	op->indata_len = pl->length;
	return 0;
}

/*
 * @timeout: in seconds
 *
 * @preply_{pages,len} are initialized both on success and error.
 * The caller is responsible for:
 *
 *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
 */
int ceph_osdc_notify(struct ceph_osd_client *osdc,
		     struct ceph_object_id *oid,
		     struct ceph_object_locator *oloc,
		     void *payload,
		     size_t payload_len,
		     u32 timeout,
		     struct page ***preply_pages,
		     size_t *preply_len)
{
	struct ceph_osd_linger_request *lreq;
	struct page **pages;
	int ret;

	WARN_ON(!timeout);
	if (preply_pages) {
		*preply_pages = NULL;
		*preply_len = 0;
	}

	lreq = linger_alloc(osdc);
	if (!lreq)
		return -ENOMEM;

	lreq->preply_pages = preply_pages;
	lreq->preply_len = preply_len;

	ceph_oid_copy(&lreq->t.base_oid, oid);
	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
	lreq->t.flags = CEPH_OSD_FLAG_READ;

	lreq->reg_req = alloc_linger_request(lreq);
	if (!lreq->reg_req) {
		ret = -ENOMEM;
		goto out_put_lreq;
	}

	/* for notify_id */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto out_put_lreq;
	}

	down_write(&osdc->lock);
	linger_register(lreq); /* before osd_req_op_* */
	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
				     timeout, payload, payload_len);
	if (ret) {
		linger_unregister(lreq);
		up_write(&osdc->lock);
		ceph_release_page_vector(pages, 1);
		goto out_put_lreq;
	}
	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
						 response_data),
				 pages, PAGE_SIZE, 0, false, true);
	linger_submit(lreq);
	up_write(&osdc->lock);

	ret = linger_reg_commit_wait(lreq);
	if (!ret)
		ret = linger_notify_finish_wait(lreq);
	else
		dout("lreq %p failed to initiate notify %d\n", lreq, ret);

	linger_cancel(lreq);
out_put_lreq:
	linger_put(lreq);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_notify);

/*
 * Call all pending notify callbacks - for use after a watch is
 * unregistered, to make sure no more callbacks for it will be invoked
@@ -3693,19 +3875,51 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
	return m;
}

/*
 * TODO: switch to a msg-owned pagelist
 */
static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
{
	struct ceph_msg *m;
	int type = le16_to_cpu(hdr->type);
	u32 front_len = le32_to_cpu(hdr->front_len);
	u32 data_len = le32_to_cpu(hdr->data_len);

	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
	if (!m)
		return NULL;

	if (data_len) {
		struct page **pages;
		struct ceph_osd_data osd_data;

		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
					       GFP_NOIO);
		if (!pages) {
			ceph_msg_put(m);
			return NULL;
		}

		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
					 false);
		ceph_osdc_msg_data_add(m, &osd_data);
	}

	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
		return alloc_msg_with_page_vector(hdr);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default: