Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 51c3509e authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

rbd: remove old request handling code



Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 3da691bf
Loading
Loading
Loading
Loading
+4 −730
Original line number Diff line number Diff line
@@ -231,8 +231,6 @@ enum obj_operation_type {
enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

/*
@@ -271,27 +269,15 @@ struct rbd_obj_request {
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
	struct rbd_img_request	*img_request;
	u64			img_offset;
	/* links for img_request->obj_requests list */
	struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
@@ -480,8 +466,6 @@ static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -610,9 +594,6 @@ void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
@@ -1369,37 +1350,6 @@ static bool obj_request_done_test(struct rbd_obj_request *obj_request)
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
@@ -1643,42 +1593,6 @@ static bool rbd_img_is_write(struct rbd_img_request *img_req)
	}
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  An error requires the whole
	 * length of the request to be reported finished with an error
	 * to the block layer.  In each case we update the xferred
	 * count to indicate the whole request was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bios(&obj_request->bio_pos, 0, length);
		else
			zero_bvecs(&obj_request->bvec_pos, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bios(&obj_request->bio_pos, xferred,
				  length - xferred);
		else
			zero_bvecs(&obj_request->bvec_pos, xferred,
				   length - xferred);
	}
	obj_request->xferred = length;
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
@@ -1686,93 +1600,6 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
	obj_request->callback(obj_request);
}

static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
{
	obj_request->result = err;
	obj_request->xferred = 0;
	/*
	 * kludge - mirror rbd_obj_request_submit() to match a put in
	 * rbd_img_obj_callback()
	 */
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	obj_request_done_set(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short discard.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	/* discarding a non-existent object is not a problem */
	if (obj_request->result == -ENOENT)
		obj_request->result = 0;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request_img_data_test(obj_request))
		rbd_osd_copyup_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
@@ -1871,32 +1698,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
}

/*
 * Create a copyup osd request based on the information in the object
 * request supplied.  A copyup request has two or three osd ops, a
 * copyup method call, potentially a hint op, and a write or truncate
 * or zero op.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int num_osd_ops = 3;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request) ||
			img_request_discard_test(img_request));

	if (img_request_discard_test(img_request))
		num_osd_ops = 2;

	return __rbd_osd_req_create(img_request->rbd_dev,
				    img_request->snapc, num_osd_ops,
				    CEPH_OSD_FLAG_WRITE, obj_request);
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
@@ -2217,73 +2018,6 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
		rbd_img_request_complete(img_request);
}

/*
 * Add individual osd ops to the given ceph_osd_request and prepare
 * them for submission. num_ops is the current number of
 * osd operations already to the object request.
 */
static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
				struct ceph_osd_request *osd_request,
				enum obj_operation_type op_type,
				unsigned int num_ops)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	u64 object_size = rbd_obj_bytes(&rbd_dev->header);
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;
	u64 img_end;
	u16 opcode;

	if (op_type == OBJ_OP_DISCARD) {
		if (!offset && length == object_size &&
		    (!img_request_layered_test(img_request) ||
		     !obj_request_overlaps_parent(obj_request))) {
			opcode = CEPH_OSD_OP_DELETE;
		} else if ((offset + length == object_size)) {
			opcode = CEPH_OSD_OP_TRUNCATE;
		} else {
			down_read(&rbd_dev->header_rwsem);
			img_end = rbd_dev->header.image_size;
			up_read(&rbd_dev->header_rwsem);

			if (obj_request->img_offset + length == img_end)
				opcode = CEPH_OSD_OP_TRUNCATE;
			else
				opcode = CEPH_OSD_OP_ZERO;
		}
	} else if (op_type == OBJ_OP_WRITE) {
		if (!offset && length == object_size)
			opcode = CEPH_OSD_OP_WRITEFULL;
		else
			opcode = CEPH_OSD_OP_WRITE;
		osd_req_op_alloc_hint_init(osd_request, num_ops,
					object_size, object_size);
		num_ops++;
	} else {
		opcode = CEPH_OSD_OP_READ;
	}

	if (opcode == CEPH_OSD_OP_DELETE)
		osd_req_op_init(osd_request, num_ops, opcode, 0);
	else
		osd_req_op_extent_init(osd_request, num_ops, opcode,
				       offset, length, 0, 0);

	if (obj_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
					&obj_request->bio_pos, length);
	else if (obj_request->type == OBJ_REQUEST_BVECS)
		osd_req_op_extent_osd_data_bvec_pos(osd_request, num_ops,
					&obj_request->bvec_pos);

	/* Discards are also writes */
	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
		rbd_osd_req_format_write(obj_request);
	else
		rbd_osd_req_format_read(obj_request);
}

static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
{
	switch (obj_req->type) {
@@ -2568,366 +2302,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	return -ENOMEM;
}

static void
rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
		obj_request->type == OBJ_REQUEST_NODATA);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	obj_request_done_set(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct rbd_device *rbd_dev;
	enum obj_operation_type op_type;
	int img_result;
	u64 parent_length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(img_result || parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		rbd_obj_request_submit(orig_request);
		return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to use any more.
	 * We need a new one that can hold the three ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_bvecs(osd_req, 0, orig_request->copyup_bvecs,
					  parent_length);

	/* Add the other op(s) */

	op_type = rbd_img_request_op_type(orig_request->img_request);
	rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);

	/* All set, send it off. */

	rbd_obj_request_submit(orig_request);
	return;

out_err:
	rbd_obj_request_error(orig_request, img_result);
}

static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap);

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, it is recorded as the result of the original
 * object request in rbd_img_obj_exists_callback().
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
	struct rbd_img_request *parent_request = NULL;
	struct ceph_bvec_iter bvec_it = { 0 };
	u64 img_offset;
	u64 length;
	int result;

	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = rbd_obj_bytes(&rbd_dev->header);

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	result = setup_copyup_bvecs(obj_request, length);
	if (result)
		goto out_err;

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	bvec_it.bvecs = obj_request->copyup_bvecs;
	bvec_it.iter.bi_size = length;
	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_BVECS,
				      &bvec_it);
	if (result)
		goto out_err;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;

	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

out_err:
	if (parent_request)
		rbd_img_request_put(parent_request);
	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		rbd_obj_request_submit(orig_request);
		return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else {
		goto fail_orig_request;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	result = rbd_img_obj_request_submit(orig_request);
	if (result)
		goto fail_orig_request;

	return;

fail_orig_request:
	rbd_obj_request_error(orig_request, result);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
	struct rbd_obj_request *stat_request;
	struct page **pages;
	int ret;

	stat_request = rbd_obj_request_create(OBJ_REQUEST_NODATA);
	if (!stat_request)
		return -ENOMEM;

	stat_request->object_no = obj_request->object_no;

	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
						   stat_request);
	if (!stat_request->osd_req) {
		ret = -ENOMEM;
		goto fail_stat_request;
	}

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages)) {
		ret = PTR_ERR(pages);
		goto fail_stat_request;
	}

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->callback = rbd_img_obj_exists_callback;

	rbd_obj_request_submit(stat_request);
	return 0;

fail_stat_request:
	rbd_obj_request_put(stat_request);
	return ret;
}

static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct rbd_device *rbd_dev = img_request->rbd_dev;

	/* Reads */
	if (!img_request_write_test(img_request) &&
	    !img_request_discard_test(img_request))
		return true;

	/* Non-layered writes */
	if (!img_request_layered_test(img_request))
		return true;

	/*
	 * Layered writes outside of the parent overlap range don't
	 * share any data with the parent.
	 */
	if (!obj_request_overlaps_parent(obj_request))
		return true;

	/*
	 * Entire-object layered writes - we will overwrite whatever
	 * parent data there is anyway.
	 */
	if (!obj_request->offset &&
	    obj_request->length == rbd_obj_bytes(&rbd_dev->header))
		return true;

	/*
	 * If the object is known to already exist, its parent data has
	 * already been copied.
	 */
	if (obj_request_known_test(obj_request) &&
	    obj_request_exists_test(obj_request))
		return true;

	return false;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));
	rbd_assert(obj_request->img_request);

	if (img_obj_request_simple(obj_request)) {
		rbd_obj_request_submit(obj_request);
		return 0;
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (obj_request_known_test(obj_request))
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
@@ -3131,106 +2505,6 @@ static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		rbd_obj_request_submit(obj_request);
		return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						&obj_request->bio_pos);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BVECS,
						&obj_request->bvec_pos);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;