Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 788e2df3 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil
Browse files

rbd: implement sync object read with new code

Reimplement the synchronous read operation used for reading a
version 1 header using the new request tracking code.  Name the
resulting function rbd_obj_read_sync() to better reflect that
it's a full object operation, not an object request.  To do this,
implement a new OBJ_REQUEST_PAGES object request type.

This implements a new mechanism to allow the caller to wait for
completion for an rbd_obj_request by calling rbd_obj_request_wait().

This partially resolves:
    http://tracker.newdream.net/issues/3755



Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 7d250b94
Loading
Loading
Loading
Loading
+92 −4
Original line number Diff line number Diff line
@@ -170,7 +170,7 @@ typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type { OBJ_REQUEST_BIO };	/* More types to come */
enum obj_request_type { OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES };

struct rbd_obj_request {
	const char		*object_name;
@@ -182,7 +182,13 @@ struct rbd_obj_request {
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

@@ -192,6 +198,7 @@ struct rbd_obj_request {
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};
@@ -1077,6 +1084,7 @@ static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
@@ -1291,14 +1299,23 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	return wait_for_completion_interruptible(&obj_request->completion);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

/*
 * Request sync osd read
 * Synchronously read a range from an object into a provided buffer
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  const char *object_name,
@@ -1556,6 +1573,11 @@ static struct ceph_osd_request *rbd_osd_req_create(
		/* osd client requires "num pages" even for bio */
		osd_req->r_num_pages = calc_pages_for(offset, length);
		break;
	case OBJ_REQUEST_PAGES:
		osd_req->r_pages = obj_request->pages;
		osd_req->r_num_pages = obj_request->page_count;
		osd_req->r_page_alignment = offset & ~PAGE_MASK;
		break;
	}

	if (write_request) {
@@ -1616,6 +1638,7 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	atomic_set(&obj_request->done, 0);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	return obj_request;
@@ -1639,6 +1662,11 @@ static void rbd_obj_request_destroy(struct kref *kref)
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
@@ -1987,6 +2015,65 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length,
				char *buf, u64 *version)

{
	struct ceph_osd_req_op *op;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		ret = PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
						OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.
 *
@@ -2025,7 +2112,8 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
		(void) rbd_req_sync_read;	/* avoid a warning */
		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);