Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3da691bf authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

rbd: new request handling code



The notable changes are:

- instead of explicitly stat'ing the object to see if it exists before
  issuing the write, send the write optimistically along with the stat
  in a single OSD request
- zero copyup optimization
- all object requests are associated with an image request and have
  a valid ->img_request pointer; there are no standalone (!IMG_DATA)
  object requests anymore
- code is structured as a state machine (vs a bunch of callbacks with
  implicit state)

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 45a267db
Loading
Loading
Loading
Loading
+601 −77
Original line number Diff line number Diff line
@@ -235,11 +235,37 @@ enum obj_req_flags {
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *        |     ^                              |
 *        v     \------------------------------/
 *      done
 *        ^
 *        |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};

struct rbd_obj_request {
	u64			object_no;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	/*
	 * An object request associated with an image will have its
@@ -1282,6 +1308,27 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
@@ -1567,6 +1614,35 @@ rbd_img_request_op_type(struct rbd_img_request *img_request)
		return OBJ_OP_READ;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->offset &&
	       obj_req->length == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->offset + obj_req->length ==
					rbd_dev->layout.object_size;
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (rbd_img_request_op_type(img_req)) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
		return true;
	default:
		rbd_assert(0);
	}
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
@@ -1697,63 +1773,28 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
		obj_request_done_set(obj_request);
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s: osd_req %p\n", __func__, osd_req);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to the block layer, which just supports a 32-bit
	 * length field.
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
	obj_request->xferred = osd_req->r_ops[0].outdata_len;
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);

	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
			   osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
		/* fall through */
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
		rbd_osd_discard_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
		rbd_osd_call_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
			 obj_request->object_no, opcode);
		break;
	}
		obj_req->xferred = 0;

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
@@ -1806,12 +1847,6 @@ __rbd_osd_req_create(struct rbd_device *rbd_dev,
	return NULL;
}

/*
 * Create an osd request.  A read request has one osd op (read).
 * A write request has either one (watch) or two (hint+write) osd ops.
 * (All rbd data writes are prefixed with an allocation hint op, but
 * technically osd watch is a write request, hence this distinction.)
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
@@ -1831,8 +1866,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
	    (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
	    CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
@@ -2251,6 +2284,211 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
		rbd_osd_req_format_read(obj_request);
}

static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
{
	switch (obj_req->type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->length);
		break;
	case OBJ_REQUEST_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->length);
		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		rbd_assert(0);
	}
}

static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, obj_req);
	if (!obj_req->osd_req)
		return -ENOMEM;

	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
			       obj_req->offset, obj_req->length, 0, 0);
	rbd_osd_req_setup_data(obj_req, 0);

	rbd_osd_req_format_read(obj_req);
	return 0;
}

static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
				unsigned int which)
{
	struct page **pages;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);
	return 0;
}

static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
				  unsigned int which)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u16 opcode;

	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
				   rbd_dev->layout.object_size,
				   rbd_dev->layout.object_size);

	if (rbd_obj_is_entire(obj_req))
		opcode = CEPH_OSD_OP_WRITEFULL;
	else
		opcode = CEPH_OSD_OP_WRITE;

	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
			       obj_req->offset, obj_req->length, 0, 0);
	rbd_osd_req_setup_data(obj_req, which++);

	rbd_assert(which == obj_req->osd_req->r_num_ops);
	rbd_osd_req_format_write(obj_req);
}

static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	unsigned int num_osd_ops, which = 0;
	int ret;

	if (obj_request_overlaps_parent(obj_req)) {
		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
		num_osd_ops = 3; /* stat + setallochint + write/writefull */
	} else {
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
		num_osd_ops = 2; /* setallochint + write/writefull */
	}

	obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE,
					      num_osd_ops, obj_req);
	if (!obj_req->osd_req)
		return -ENOMEM;

	if (obj_request_overlaps_parent(obj_req)) {
		ret = __rbd_obj_setup_stat(obj_req, which++);
		if (ret)
			return ret;
	}

	__rbd_obj_setup_write(obj_req, which);
	return 0;
}

static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
				    unsigned int which)
{
	u16 opcode;

	if (rbd_obj_is_entire(obj_req)) {
		if (obj_request_overlaps_parent(obj_req)) {
			opcode = CEPH_OSD_OP_TRUNCATE;
		} else {
			osd_req_op_init(obj_req->osd_req, which++,
					CEPH_OSD_OP_DELETE, 0);
			opcode = 0;
		}
	} else if (rbd_obj_is_tail(obj_req)) {
		opcode = CEPH_OSD_OP_TRUNCATE;
	} else {
		opcode = CEPH_OSD_OP_ZERO;
	}

	if (opcode)
		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
				       obj_req->offset, obj_req->length,
				       0, 0);

	rbd_assert(which == obj_req->osd_req->r_num_ops);
	rbd_osd_req_format_write(obj_req);
}

static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	unsigned int num_osd_ops, which = 0;
	int ret;

	if (rbd_obj_is_entire(obj_req)) {
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
		num_osd_ops = 1; /* truncate/delete */
	} else {
		if (obj_request_overlaps_parent(obj_req)) {
			obj_req->write_state = RBD_OBJ_WRITE_GUARD;
			num_osd_ops = 2; /* stat + truncate/zero */
		} else {
			obj_req->write_state = RBD_OBJ_WRITE_FLAT;
			num_osd_ops = 1; /* truncate/zero */
		}
	}

	obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_DISCARD,
					      num_osd_ops, obj_req);
	if (!obj_req->osd_req)
		return -ENOMEM;

	if (!rbd_obj_is_entire(obj_req) &&
	    obj_request_overlaps_parent(obj_req)) {
		ret = __rbd_obj_setup_stat(obj_req, which++);
		if (ret)
			return ret;
	}

	__rbd_obj_setup_discard(obj_req, which);
	return 0;
}

/*
 * For each object request in @img_req, allocate an OSD request, add
 * individual OSD ops and prepare them for submission.  The number of
 * OSD ops depends on op_type and the overlap point (if any).
 */
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req;
	int ret;

	for_each_obj_request(img_req, obj_req) {
		switch (rbd_img_request_op_type(img_req)) {
		case OBJ_OP_READ:
			ret = rbd_obj_setup_read(obj_req);
			break;
		case OBJ_OP_WRITE:
			ret = rbd_obj_setup_write(obj_req);
			break;
		case OBJ_OP_DISCARD:
			ret = rbd_obj_setup_discard(obj_req);
			break;
		default:
			rbd_assert(0);
		}
		if (ret)
			return ret;
	}

	return 0;
}

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
@@ -2268,7 +2506,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	struct rbd_obj_request *next_obj_request;
	struct ceph_bio_iter bio_it;
	struct ceph_bvec_iter bvec_it;
	enum obj_operation_type op_type;
	u64 img_offset;
	u64 resid;

@@ -2278,7 +2515,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);
	op_type = rbd_img_request_op_type(img_request);

	if (type == OBJ_REQUEST_BIO) {
		bio_it = *(struct ceph_bio_iter *)data_desc;
@@ -2289,7 +2525,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		u64 object_no = img_offset >> rbd_dev->header.obj_order;
		u64 offset = rbd_segment_offset(rbd_dev, img_offset);
		u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
@@ -2317,23 +2552,14 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
			ceph_bvec_iter_advance(&bvec_it, length);
		}

		osd_req = rbd_osd_req_create(rbd_dev, op_type,
					(op_type == OBJ_OP_WRITE) ? 2 : 1,
					obj_request);
		if (!osd_req)
			goto out_unwind;

		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;
		obj_request->img_offset = img_offset;

		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);

		img_offset += length;
		resid -= length;
	}

	return 0;
	return __rbd_img_fill_request(img_request);

out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
@@ -2712,16 +2938,171 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)

	rbd_img_request_get(img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			goto out_put_ireq;
		rbd_obj_request_submit(obj_request);
	}

out_put_ireq:
	rbd_img_request_put(img_request);
	return ret;
}

static void rbd_img_end_child_request(struct rbd_img_request *img_req);

static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req,
				    u64 img_offset, u32 bytes)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_img_request *child_img_req;
	int ret;

	child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes);
	if (!child_img_req)
		return -ENOMEM;

	child_img_req->callback = rbd_img_end_child_request;

	if (!rbd_img_is_write(img_req)) {
		switch (obj_req->type) {
		case OBJ_REQUEST_BIO:
			ret = rbd_img_request_fill(child_img_req,
						   OBJ_REQUEST_BIO,
						   &obj_req->bio_pos);
			break;
		case OBJ_REQUEST_BVECS:
			ret = rbd_img_request_fill(child_img_req,
						   OBJ_REQUEST_BVECS,
						   &obj_req->bvec_pos);
			break;
		default:
			rbd_assert(0);
		}
	} else {
		struct ceph_bvec_iter it = {
			.bvecs = obj_req->copyup_bvecs,
			.iter = { .bi_size = bytes },
		};

		ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS,
					   &it);
	}
	if (ret) {
		rbd_img_request_put(child_img_req);
		return ret;
	}

	rbd_img_request_submit(child_img_req);
	return 0;
}

static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (obj_req->result == -ENOENT &&
	    obj_req->img_offset < rbd_dev->parent_overlap &&
	    !obj_req->tried_parent) {
		u64 obj_overlap = min(obj_req->length,
			      rbd_dev->parent_overlap - obj_req->img_offset);

		obj_req->tried_parent = true;
		ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset,
					       obj_overlap);
		if (ret) {
			obj_req->result = ret;
			return true;
		}
		return false;
	}

	/*
	 * -ENOENT means a hole in the image -- zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  In both cases we update xferred
	 * count to indicate the whole request was satisfied.
	 */
	if (obj_req->result == -ENOENT ||
	    (!obj_req->result && obj_req->xferred < obj_req->length)) {
		rbd_assert(!obj_req->xferred || !obj_req->result);
		rbd_obj_zero_range(obj_req, obj_req->xferred,
				   obj_req->length - obj_req->xferred);
		obj_req->result = 0;
		obj_req->xferred = obj_req->length;
	}

	return true;
}

/*
 * copyup_bvecs pages are never highmem pages
 */
static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	ceph_bvec_iter_advance_step(&it, bytes, ({
		if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
			       bv.bv_len))
			return false;
	}));
	return true;
}

static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;

	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
	rbd_osd_req_destroy(obj_req->osd_req);

	/*
	 * Create a copyup request with the same number of OSD ops as
	 * the original request.  The original request was stat + op(s),
	 * the new copyup request will be copyup + the same op(s).
	 */
	obj_req->osd_req = rbd_osd_req_create(rbd_dev,
				rbd_img_request_op_type(obj_req->img_request),
				num_osd_ops, obj_req);
	if (!obj_req->osd_req)
		return -ENOMEM;

	/*
	 * Only send non-zero copyup data to save some I/O and network
	 * bandwidth -- zero copyup data is equivalent to the object not
	 * existing.
	 */
	if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
		dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
		bytes = 0;
	}

	osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
			    "copyup");
	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
					  obj_req->copyup_bvecs, bytes);

	switch (rbd_img_request_op_type(obj_req->img_request)) {
	case OBJ_OP_WRITE:
		__rbd_obj_setup_write(obj_req, 1);
		break;
	case OBJ_OP_DISCARD:
		rbd_assert(!rbd_obj_is_entire(obj_req));
		__rbd_obj_setup_discard(obj_req, 1);
		break;
	default:
		rbd_assert(0);
	}

	rbd_obj_request_submit(obj_req);
	/* FIXME: in lieu of rbd_img_obj_callback() */
	rbd_img_request_put(obj_req->img_request);
	return 0;
}

static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
{
	u32 i;
@@ -2850,6 +3231,149 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
	obj_request_done_set(obj_request);
}

static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u64 img_offset;
	u64 obj_overlap;
	int ret;

	if (!obj_request_overlaps_parent(obj_req)) {
		/*
		 * The overlap has become 0 (most likely because the
		 * image has been flattened).  Use rbd_obj_issue_copyup()
		 * to re-submit the original write request -- the copyup
		 * operation itself will be a no-op, since someone must
		 * have populated the child object while we weren't
		 * looking.  Move to WRITE_FLAT state as we'll be done
		 * with the operation once the null copyup completes.
		 */
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
		return rbd_obj_issue_copyup(obj_req, 0);
	}

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_req->img_offset - obj_req->offset;
	obj_overlap = rbd_dev->layout.object_size;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + obj_overlap > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		obj_overlap = rbd_dev->parent_overlap - img_offset;
	}

	ret = setup_copyup_bvecs(obj_req, obj_overlap);
	if (ret)
		return ret;

	obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
	return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap);
}

static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
{
	int ret;

again:
	switch (obj_req->write_state) {
	case RBD_OBJ_WRITE_GUARD:
		rbd_assert(!obj_req->xferred);
		if (obj_req->result == -ENOENT) {
			/*
			 * The target object doesn't exist.  Read the data for
			 * the entire target object up to the overlap point (if
			 * any) from the parent, so we can use it for a copyup.
			 */
			ret = rbd_obj_handle_write_guard(obj_req);
			if (ret) {
				obj_req->result = ret;
				return true;
			}
			return false;
		}
		/* fall through */
	case RBD_OBJ_WRITE_FLAT:
		if (!obj_req->result)
			/*
			 * There is no such thing as a successful short
			 * write -- indicate the whole request was satisfied.
			 */
			obj_req->xferred = obj_req->length;
		return true;
	case RBD_OBJ_WRITE_COPYUP:
		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
		if (obj_req->result)
			goto again;

		rbd_assert(obj_req->xferred);
		ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
		if (ret) {
			obj_req->result = ret;
			return true;
		}
		return false;
	default:
		rbd_assert(0);
	}
}

/*
 * Returns true if @obj_req is completed, or false otherwise.
 */
static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
{
	switch (rbd_img_request_op_type(obj_req->img_request)) {
	case OBJ_OP_READ:
		return rbd_obj_handle_read(obj_req);
	case OBJ_OP_WRITE:
		return rbd_obj_handle_write(obj_req);
	case OBJ_OP_DISCARD:
		if (rbd_obj_handle_write(obj_req)) {
			/*
			 * Hide -ENOENT from delete/truncate/zero -- discarding
			 * a non-existent object is not a problem.
			 */
			if (obj_req->result == -ENOENT) {
				obj_req->result = 0;
				obj_req->xferred = obj_req->length;
			}
			return true;
		}
		return false;
	default:
		rbd_assert(0);
	}
}

static void rbd_img_end_child_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req = img_req->obj_request;

	rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));

	obj_req->result = img_req->result;
	obj_req->xferred = img_req->xferred;
	rbd_img_request_put(img_req);

	rbd_obj_handle_request(obj_req);
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
{
	if (!__rbd_obj_handle_request(obj_req))
		return;

	obj_request_done_set(obj_req);
	rbd_obj_request_complete(obj_req);
}

static const struct rbd_client_id rbd_empty_cid;

static bool rbd_cid_equal(const struct rbd_client_id *lhs,