Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5359a17d authored by Ilya Dryomov
Browse files

libceph, rbd: new bio handling code (aka don't clone bios)



The reason we clone bios is to be able to give each object request
(and consequently each ceph_osd_data/ceph_msg_data item) its own
pointer to a (list of) bio(s).  The messenger then initializes its
cursor with cloned bio's ->bi_iter, so it knows where to start reading
from/writing to.  That's all the cloned bios are used for: to determine
each object request's starting position in the provided data buffer.

Introduce ceph_bio_iter to do exactly that -- store position within bio
list (i.e. pointer to bio) + position within that bio (i.e. bvec_iter).

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
parent a1fbb5e7
Loading
Loading
Loading
Loading
+40 −27
Original line number Diff line number Diff line
@@ -218,7 +218,7 @@ typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_PAGES,
};

@@ -270,7 +270,7 @@ struct rbd_obj_request {

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct ceph_bio_iter	bio_pos;
		struct {
			struct page	**pages;
			u32		page_count;
@@ -1255,6 +1255,27 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
	return length;
}

/*
 * Fill the memory described by a single bio_vec with zeroes.
 *
 * The segment is mapped with bvec_kmap_irq(), cleared over its full
 * bv_len, the backing page is passed to flush_dcache_page(), and the
 * mapping is released again.
 */
static void zero_bvec(struct bio_vec *bv)
{
	unsigned long irq_flags;
	void *kaddr = bvec_kmap_irq(bv, &irq_flags);

	memset(kaddr, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(kaddr, &irq_flags);
}

/*
 * Zero @bytes bytes of bio data, starting @off bytes past @bio_pos.
 *
 * All stepping is done on a local copy of the iterator, so the position
 * stored in @bio_pos is left untouched.
 */
static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter cursor;

	cursor = *bio_pos;
	ceph_bio_iter_advance(&cursor, off);	/* skip the part to keep */
	ceph_bio_iter_advance_step(&cursor, bytes, ({
		zero_bvec(&bv);		/* bv is declared by the macro */
	}));
}

/*
 * bio helpers
 */
@@ -1719,13 +1740,14 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
			zero_bios(&obj_request->bio_pos, 0, length);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
			zero_bios(&obj_request->bio_pos, xferred,
				  length - xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
	}
@@ -2036,11 +2058,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
		break;		/* Nothing to do */
	case OBJ_REQUEST_PAGES:
		/* img_data requests don't own their page array */
		if (obj_request->pages &&
@@ -2368,7 +2387,7 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,

	if (obj_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
					obj_request->bio_list, length);
					&obj_request->bio_pos, length);
	else if (obj_request->type == OBJ_REQUEST_PAGES)
		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
					obj_request->pages, length,
@@ -2396,8 +2415,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct ceph_bio_iter bio_it;
	struct page **pages = NULL;
	enum obj_operation_type op_type;
	u64 img_offset;
@@ -2412,9 +2430,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	op_type = rbd_img_request_op_type(img_request);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		bio_it = *(struct ceph_bio_iter *)data_desc;
		rbd_assert(img_offset ==
			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
			   bio_it.iter.bi_sector << SECTOR_SHIFT);
	} else if (type == OBJ_REQUEST_PAGES) {
		pages = data_desc;
	}
@@ -2440,17 +2458,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_NOIO);
			if (!obj_request->bio_list)
				goto out_unwind;
			obj_request->bio_pos = bio_it;
			ceph_bio_iter_advance(&bio_it, length);
		} else if (type == OBJ_REQUEST_PAGES) {
			unsigned int page_count;

@@ -2980,7 +2989,7 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
						&obj_request->bio_pos);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
@@ -4093,9 +4102,13 @@ static void rbd_queue_workfn(struct work_struct *work)
	if (op_type == OBJ_OP_DISCARD)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
					      NULL);
	else
	else {
		struct ceph_bio_iter bio_it = { .bio = rq->bio,
						.iter = rq->bio->bi_iter };

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
					      rq->bio);
					      &bio_it);
	}
	if (result)
		goto err_img_request;

+51 −8
Original line number Diff line number Diff line
@@ -93,14 +93,60 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
	}
}

#ifdef CONFIG_BLOCK

/*
 * A position within a list of bios: a pointer to the current bio plus a
 * bvec_iter describing the position within that bio.
 */
struct ceph_bio_iter {
	struct bio *bio;	/* current bio in the list */
	struct bvec_iter iter;	/* position within ->bio */
};

/*
 * Advance @it (a struct ceph_bio_iter pointer) by @n bytes, evaluating
 * STEP once per chunk before the iterator is moved.
 *
 * Each chunk is at most what is left in the current bio's iterator; its
 * size is available to STEP as __cur_n.  When the current bio's iterator
 * is exhausted and ->bi_next is set, @it moves on to the next bio and
 * restarts from that bio's ->bi_iter.
 *
 * BUGs if bytes remain to be consumed while the iterator is empty.
 * Note that @it is evaluated more than once.
 */
#define __ceph_bio_iter_advance_step(it, n, STEP) do {			      \
	unsigned int __n = (n), __cur_n;				      \
									      \
	while (__n) {							      \
		BUG_ON(!(it)->iter.bi_size);				      \
		__cur_n = min((it)->iter.bi_size, __n);			      \
		(void)(STEP);						      \
		bio_advance_iter((it)->bio, &(it)->iter, __cur_n);	      \
		if (!(it)->iter.bi_size && (it)->bio->bi_next) {	      \
			dout("__ceph_bio_iter_advance_step next bio\n");      \
			(it)->bio = (it)->bio->bi_next;			      \
			(it)->iter = (it)->bio->bi_iter;		      \
		}							      \
		__n -= __cur_n;						      \
	}								      \
} while (0)

/*
 * Advance @it by @n bytes.
 *
 * Expands to __ceph_bio_iter_advance_step() with a no-op step (0), so
 * only the iterator position changes.
 */
#define ceph_bio_iter_advance(it, n)					      \
	__ceph_bio_iter_advance_step(it, n, 0)

/*
 * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
 *
 * BVEC_STEP is an expression evaluated with a local "struct bio_vec bv"
 * in scope that holds the current segment.  For every chunk processed by
 * __ceph_bio_iter_advance_step(), a copy of the iterator is limited to
 * that chunk's size (__cur_n) and walked with __bio_for_each_segment();
 * @it itself is only moved by the outer macro.
 */
#define ceph_bio_iter_advance_step(it, n, BVEC_STEP)			      \
	__ceph_bio_iter_advance_step(it, n, ({				      \
		struct bio_vec bv;					      \
		struct bvec_iter __cur_iter;				      \
									      \
		__cur_iter = (it)->iter;				      \
		__cur_iter.bi_size = __cur_n;				      \
		__bio_for_each_segment(bv, (it)->bio, __cur_iter, __cur_iter) \
			(void)(BVEC_STEP);				      \
	}))

#endif /* CONFIG_BLOCK */

struct ceph_msg_data {
	struct list_head		links;	/* ceph_msg->data */
	enum ceph_msg_data_type		type;
	union {
#ifdef CONFIG_BLOCK
		struct {
			struct bio	*bio;
			size_t		bio_length;
			struct ceph_bio_iter	bio_pos;
			u32			bio_length;
		};
#endif /* CONFIG_BLOCK */
		struct {
@@ -122,10 +168,7 @@ struct ceph_msg_data_cursor {
	bool			need_crc;	/* crc update needed */
	union {
#ifdef CONFIG_BLOCK
		struct {				/* bio */
			struct bio	*bio;		/* bio from list */
			struct bvec_iter bvec_iter;
		};
		struct ceph_bio_iter	bio_iter;
#endif /* CONFIG_BLOCK */
		struct {				/* pages */
			unsigned int	page_offset;	/* offset in page */
@@ -290,8 +333,8 @@ extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist);
#ifdef CONFIG_BLOCK
extern void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
				size_t length);
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length);
#endif /* CONFIG_BLOCK */

extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+6 −5
Original line number Diff line number Diff line
@@ -72,8 +72,8 @@ struct ceph_osd_data {
		struct ceph_pagelist	*pagelist;
#ifdef CONFIG_BLOCK
		struct {
			struct bio	*bio;		/* list of bios */
			size_t		bio_length;	/* total in list */
			struct ceph_bio_iter	bio_pos;
			u32			bio_length;
		};
#endif /* CONFIG_BLOCK */
	};
@@ -405,9 +405,10 @@ extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *,
					unsigned int which,
					struct ceph_pagelist *pagelist);
#ifdef CONFIG_BLOCK
extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *,
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
				    unsigned int which,
					struct bio *bio, size_t bio_length);
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length);
#endif /* CONFIG_BLOCK */

extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
+34 −67
Original line number Diff line number Diff line
@@ -839,90 +839,57 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio *bio;
	struct ceph_bio_iter *it = &cursor->bio_iter;

	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
	cursor->resid = min_t(size_t, length, data->bio_length);
	*it = data->bio_pos;
	if (cursor->resid < it->iter.bi_size)
		it->iter.bi_size = cursor->resid;

	bio = data->bio;
	BUG_ON(!bio);

	cursor->resid = min(length, data->bio_length);
	cursor->bio = bio;
	cursor->bvec_iter = bio->bi_iter;
	cursor->last_piece =
		cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio *bio;
	struct bio_vec bio_vec;

	BUG_ON(data->type != CEPH_MSG_DATA_BIO);

	bio = cursor->bio;
	BUG_ON(!bio);
	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
					   cursor->bio_iter.iter);

	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);

	*page_offset = (size_t) bio_vec.bv_offset;
	BUG_ON(*page_offset >= PAGE_SIZE);
	if (cursor->last_piece) /* pagelist offset is always 0 */
		*length = cursor->resid;
	else
		*length = (size_t) bio_vec.bv_len;
	BUG_ON(*length > cursor->resid);
	BUG_ON(*page_offset + *length > PAGE_SIZE);

	return bio_vec.bv_page;
	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct bio *bio;
	struct bio_vec bio_vec;

	BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);

	bio = cursor->bio;
	BUG_ON(!bio);

	bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
	struct ceph_bio_iter *it = &cursor->bio_iter;

	/* Advance the cursor offset */

	BUG_ON(cursor->resid < bytes);
	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
	cursor->resid -= bytes;
	bio_advance_iter(it->bio, &it->iter, bytes);

	bio_advance_iter(bio, &cursor->bvec_iter, bytes);
	if (!cursor->resid) {
		BUG_ON(!cursor->last_piece);
		return false;   /* no more data */
	}

	if (bytes < bio_vec.bv_len)
	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done))
		return false;	/* more bytes to process in this segment */

	/* Move on to the next segment, and possibly the next bio */

	if (!cursor->bvec_iter.bi_size) {
		bio = bio->bi_next;
		cursor->bio = bio;
		if (bio)
			cursor->bvec_iter = bio->bi_iter;
		else
			memset(&cursor->bvec_iter, 0,
			       sizeof(cursor->bvec_iter));
	}

	if (!cursor->last_piece) {
		BUG_ON(!cursor->resid);
		BUG_ON(!bio);
		/* A short read is OK, so use <= rather than == */
		if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
			cursor->last_piece = true;
	if (!it->iter.bi_size) {
		it->bio = it->bio->bi_next;
		it->iter = it->bio->bi_iter;
		if (cursor->resid < it->iter.bi_size)
			it->iter.bi_size = cursor->resid;
	}

	BUG_ON(cursor->last_piece);
	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
	return true;
}
#endif /* CONFIG_BLOCK */
@@ -1163,9 +1130,11 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
		page = NULL;
		break;
	}

	BUG_ON(!page);
	BUG_ON(*page_offset + *length > PAGE_SIZE);
	BUG_ON(!*length);
	BUG_ON(*length > cursor->resid);
	if (last_piece)
		*last_piece = cursor->last_piece;

@@ -3262,16 +3231,14 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

#ifdef	CONFIG_BLOCK
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
		size_t length)
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length)
{
	struct ceph_msg_data *data;

	BUG_ON(!bio);

	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
	BUG_ON(!data);
	data->bio = bio;
	data->bio_pos = *bio_pos;
	data->bio_length = length;

	list_add_tail(&data->links, &msg->data);
+8 −5
Original line number Diff line number Diff line
@@ -146,10 +146,11 @@ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
				   struct ceph_bio_iter *bio_pos,
				   u32 bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_pos = *bio_pos;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */
@@ -216,12 +217,14 @@ EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
				    unsigned int which,
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */
@@ -826,7 +829,7 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, osd_data->bio, length);
		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);