Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit bd1a643e authored by Linus Torvalds's avatar Linus Torvalds
Browse files
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: fix split bio handling
  rbd: fix leak of ops struct
parents 9f6bd36c 1fec7093
Loading
Loading
Loading
Loading
+156 −21
Original line number Diff line number Diff line
@@ -92,6 +92,8 @@ struct rbd_client {
	struct list_head	node;
};

struct rbd_req_coll;

/*
 * a single io request
 */
@@ -100,6 +102,24 @@ struct rbd_request {
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

struct rbd_snap {
@@ -416,6 +436,17 @@ static void rbd_put_client(struct rbd_device *rbd_dev)
	rbd_dev->client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

/*
 * Create a new header structure, translate header format from the on-disk
@@ -590,6 +621,14 @@ static u64 rbd_get_segment(struct rbd_image_header *header,
	return len;
}

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}

/*
 * bio helpers
 */
@@ -735,6 +774,50 @@ static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
	kfree(ops);
}

static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
@@ -749,6 +832,8 @@ static int rbd_do_request(struct request *rq,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
@@ -763,12 +848,20 @@ static int rbd_do_request(struct request *rq,
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;

	ret = -ENOMEM;
	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data)
		goto done;
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);

	down_read(&header->snap_rwsem);

@@ -828,7 +921,8 @@ static int rbd_do_request(struct request *rq,
		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;
@@ -837,10 +931,8 @@ static int rbd_do_request(struct request *rq,
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
done:
	if (rq)
		blk_end_request(rq, ret, len);
	return ret;
}

@@ -874,7 +966,7 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
		bytes = req_data->len;
	}

	blk_end_request(req_data->rq, rc, bytes);
	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);
@@ -934,6 +1026,7 @@ static int rbd_req_sync_op(struct rbd_device *dev,
			  flags,
			  ops,
			  2,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
@@ -959,7 +1052,9 @@ static int rbd_do_op(struct request *rq,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio)
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
@@ -995,7 +1090,10 @@ static int rbd_do_op(struct request *rq,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
@@ -1008,13 +1106,15 @@ static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio)
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio);
			 ofs, len, bio, coll, coll_index);
}

/*
@@ -1024,14 +1124,16 @@ static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio)
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio);
			 ofs, len, bio, coll, coll_index);
}

/*
@@ -1063,7 +1165,9 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

@@ -1077,6 +1181,7 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  1,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
@@ -1274,6 +1379,20 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
@@ -1291,6 +1410,8 @@ static void rbd_rq_fn(struct request_queue *q)
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
@@ -1321,6 +1442,14 @@ static void rbd_rq_fn(struct request_queue *q)
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
@@ -1328,35 +1457,41 @@ static void rbd_rq_fn(struct request_queue *q)
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				spin_lock_irq(q->queue_lock);
				__blk_end_request_all(rq, -ENOMEM);
				goto next;
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio);
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio);
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);

		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);