Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6b049081 authored by Linus Torvalds's avatar Linus Torvalds
Browse files
Pull Ceph updates from Sage Weil:
 "There is the long-awaited discard support for RBD (Guangliang Zhao,
  Josh Durgin), a pile of RBD bug fixes that didn't belong in late -rc's
  (Ilya Dryomov, Li RongQing), a pile of fs/ceph bug fixes and
  performance and debugging improvements (Yan, Zheng, John Spray), and a
  smattering of cleanups (Chao Yu, Fabian Frederick, Joe Perches)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  ceph: fix divide-by-zero in __validate_layout()
  rbd: rbd workqueues need a resque worker
  libceph: ceph-msgr workqueue needs a resque worker
  ceph: fix bool assignments
  libceph: separate multiple ops with commas in debugfs output
  libceph: sync osd op definitions in rados.h
  libceph: remove redundant declaration
  ceph: additional debugfs output
  ceph: export ceph_session_state_name function
  ceph: include the initial ACL in create/mkdir/mknod MDS requests
  ceph: use pagelist to present MDS request data
  libceph: reference counting pagelist
  ceph: fix llistxattr on symlink
  ceph: send client metadata to MDS
  ceph: remove redundant code for max file size verification
  ceph: remove redundant io_iter_advance()
  ceph: move ceph_find_inode() outside the s_mutex
  ceph: request xattrs if xattr_version is zero
  rbd: set the remaining discard properties to enable support
  rbd: use helpers to handle discard for layered images correctly
  ...
parents ce9d7f7b 0bc62284
Loading
Loading
Loading
Loading
+276 −120
Original line number Diff line number Diff line
@@ -210,6 +210,12 @@ enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
@@ -276,6 +282,7 @@ enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
@@ -785,6 +792,20 @@ static int parse_rbd_opts_token(char *c, void *private)
	return 0;
}

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
@@ -1600,6 +1621,21 @@ static bool img_request_write_test(struct rbd_img_request *img_request)
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is an discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
@@ -1636,6 +1672,17 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
@@ -1722,6 +1769,21 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
	obj_request_done_set(obj_request);
}

static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short discard.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	/* discarding a non-existent object is not a problem */
	if (obj_request->result == -ENOENT)
		obj_request->result = 0;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
@@ -1773,6 +1835,11 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
		rbd_osd_discard_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
@@ -1823,7 +1890,7 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					enum obj_operation_type op_type,
					unsigned int num_ops,
					struct rbd_obj_request *obj_request)
{
@@ -1831,16 +1898,18 @@ static struct ceph_osd_request *rbd_osd_req_create(
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
	if (obj_request_img_data_test(obj_request) &&
		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
		if (op_type == OBJ_OP_WRITE) {
			rbd_assert(img_request_write_test(img_request));
		} else {
			rbd_assert(img_request_discard_test(img_request));
		}
		snapc = img_request->snapc;
	}

	rbd_assert(num_ops == 1 || (write_request && num_ops == 2));
	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));

	/* Allocate and initialize the request, for the num_ops ops */

@@ -1850,7 +1919,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
@@ -1865,9 +1934,10 @@ static struct ceph_osd_request *rbd_osd_req_create(
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has three osd ops,
 * a copyup method call, a hint op, and a write op.
 * Create a copyup osd request based on the information in the object
 * request supplied.  A copyup request has two or three osd ops, a
 * copyup method call, potentially a hint op, and a write or truncate
 * or zero op.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
@@ -1877,18 +1947,24 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	int num_osd_ops = 3;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));
	rbd_assert(img_request_write_test(img_request) ||
			img_request_discard_test(img_request));

	/* Allocate and initialize the request, for the three ops */
	if (img_request_discard_test(img_request))
		num_osd_ops = 2;

	/* Allocate and initialize the request, for all the ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
						false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

@@ -2057,7 +2133,8 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

@@ -2065,20 +2142,17 @@ static struct rbd_img_request *rbd_img_request_create(
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
	if (op_type == OBJ_OP_DISCARD) {
		img_request_discard_set(img_request);
		img_request->snapc = snapc;
	} else if (op_type == OBJ_OP_WRITE) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
@@ -2093,8 +2167,7 @@ static struct rbd_img_request *rbd_img_request_create(
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);
		obj_op_name(op_type), offset, length, img_request);

	return img_request;
}
@@ -2118,7 +2191,8 @@ static void rbd_img_request_destroy(struct kref *kref)
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
	if (img_request_write_test(img_request) ||
		img_request_discard_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
@@ -2134,8 +2208,8 @@ static struct rbd_img_request *rbd_parent_request_create(
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
						length, OBJ_OP_READ, NULL);
	if (!parent_request)
		return NULL;

@@ -2176,11 +2250,18 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;
		enum obj_operation_type op_type;

		if (img_request_discard_test(img_request))
			op_type = OBJ_OP_DISCARD;
		else if (img_request_write_test(img_request))
			op_type = OBJ_OP_WRITE;
		else
			op_type = OBJ_OP_READ;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
			obj_op_name(op_type), obj_request->length,
			obj_request->img_offset, obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x",
			result, xferred);
		if (!img_request->result)
@@ -2244,6 +2325,67 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
		rbd_img_request_complete(img_request);
}

/*
 * Add individual osd ops to the given ceph_osd_request and prepare
 * them for submission. num_ops is the current number of
 * osd operations already to the object request.
 */
static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
				struct ceph_osd_request *osd_request,
				enum obj_operation_type op_type,
				unsigned int num_ops)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	u64 object_size = rbd_obj_bytes(&rbd_dev->header);
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;
	u64 img_end;
	u16 opcode;

	if (op_type == OBJ_OP_DISCARD) {
		if (!offset && length == object_size &&
		    (!img_request_layered_test(img_request) ||
		     !obj_request_overlaps_parent(obj_request))) {
			opcode = CEPH_OSD_OP_DELETE;
		} else if ((offset + length == object_size)) {
			opcode = CEPH_OSD_OP_TRUNCATE;
		} else {
			down_read(&rbd_dev->header_rwsem);
			img_end = rbd_dev->header.image_size;
			up_read(&rbd_dev->header_rwsem);

			if (obj_request->img_offset + length == img_end)
				opcode = CEPH_OSD_OP_TRUNCATE;
			else
				opcode = CEPH_OSD_OP_ZERO;
		}
	} else if (op_type == OBJ_OP_WRITE) {
		opcode = CEPH_OSD_OP_WRITE;
		osd_req_op_alloc_hint_init(osd_request, num_ops,
					object_size, object_size);
		num_ops++;
	} else {
		opcode = CEPH_OSD_OP_READ;
	}

	osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
				0, 0);
	if (obj_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
					obj_request->bio_list, length);
	else if (obj_request->type == OBJ_REQUEST_PAGES)
		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	/* Discards are also writes */
	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
		rbd_osd_req_format_write(obj_request);
	else
		rbd_osd_req_format_read(obj_request);
}

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
@@ -2259,28 +2401,26 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	enum obj_operation_type op_type;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);
	op_type = rbd_img_request_op_type(img_request);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset ==
			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
	} else if (type == OBJ_REQUEST_PAGES) {
		pages = data_desc;
	}

@@ -2289,7 +2429,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
		const char *object_name;
		u64 offset;
		u64 length;
		unsigned int which = 0;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
@@ -2321,7 +2460,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_unwind;
		} else {
		} else if (type == OBJ_REQUEST_PAGES) {
			unsigned int page_count;

			obj_request->pages = pages;
@@ -2332,38 +2471,19 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
					     (write_request ? 2 : 1),
		osd_req = rbd_osd_req_create(rbd_dev, op_type,
					(op_type == OBJ_OP_WRITE) ? 2 : 1,
					obj_request);
		if (!osd_req)
			goto out_unwind;

		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;
		rbd_img_request_get(img_request);

		if (write_request) {
			osd_req_op_alloc_hint_init(osd_req, which,
					     rbd_obj_bytes(&rbd_dev->header),
					     rbd_obj_bytes(&rbd_dev->header));
			which++;
		}

		osd_req_op_extent_init(osd_req, which, opcode, offset, length,
				       0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, which,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, which,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);
		obj_request->img_offset = img_offset;

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);
		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);

		obj_request->img_offset = img_offset;
		rbd_img_request_get(img_request);

		img_offset += length;
		resid -= length;
@@ -2386,7 +2506,8 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
	struct page **pages;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
		obj_request->type == OBJ_REQUEST_NODATA);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
@@ -2424,11 +2545,10 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	enum obj_operation_type op_type;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

@@ -2492,26 +2612,10 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the hint op */

	osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
				   rbd_obj_bytes(&rbd_dev->header));

	/* And the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 2,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 2,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);
	/* Add the other op(s) */

	rbd_osd_req_format_write(orig_request);
	op_type = rbd_img_request_op_type(orig_request->img_request);
	rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);

	/* All set, send it off. */

@@ -2728,7 +2832,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
						   stat_request);
	if (!stat_request->osd_req)
		goto out;
@@ -2748,11 +2852,10 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
	return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

@@ -2760,22 +2863,44 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/* Reads */
	if (!img_request_write_test(img_request) &&
	    !img_request_discard_test(img_request))
		return true;

	/* Non-layered writes */
	if (!img_request_layered_test(img_request))
		return true;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 * Layered writes outside of the parent overlap range don't
	 * share any data with the parent.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		!obj_request_overlaps_parent(obj_request) ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {
	if (!obj_request_overlaps_parent(obj_request))
		return true;

	/*
	 * Entire-object layered writes - we will overwrite whatever
	 * parent data there is anyway.
	 */
	if (!obj_request->offset &&
	    obj_request->length == rbd_obj_bytes(&rbd_dev->header))
		return true;

	/*
	 * If the object is known to already exist, its parent data has
	 * already been copied.
	 */
	if (obj_request_known_test(obj_request) &&
	    obj_request_exists_test(obj_request))
		return true;

	return false;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	if (img_obj_request_simple(obj_request)) {
		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

@@ -2791,7 +2916,7 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
	if (obj_request_known_test(obj_request))
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */
@@ -2932,7 +3057,7 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
						  obj_request);
	if (!obj_request->osd_req)
		goto out;
@@ -2995,7 +3120,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
	if (!obj_request)
		return ERR_PTR(-ENOMEM);

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
						  obj_request);
	if (!obj_request->osd_req) {
		ret = -ENOMEM;
@@ -3133,7 +3258,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
						  obj_request);
	if (!obj_request->osd_req)
		goto out;
@@ -3183,11 +3308,20 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;
	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
	u64 length = blk_rq_bytes(rq);
	bool wr = rq_data_dir(rq) == WRITE;
	enum obj_operation_type op_type;
	u64 mapping_size;
	int result;

	if (rq->cmd_flags & REQ_DISCARD)
		op_type = OBJ_OP_DISCARD;
	else if (rq->cmd_flags & REQ_WRITE)
		op_type = OBJ_OP_WRITE;
	else
		op_type = OBJ_OP_READ;

	/* Ignore/skip any zero-length requests */

	if (!length) {
@@ -3196,9 +3330,9 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
		goto err_rq;
	}

	/* Disallow writes to a read-only device */
	/* Only reads are allowed to a read-only device */

	if (wr) {
	if (op_type != OBJ_OP_READ) {
		if (rbd_dev->mapping.read_only) {
			result = -EROFS;
			goto err_rq;
@@ -3226,21 +3360,35 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
		goto err_rq;	/* Shouldn't happen */
	}

	if (offset + length > rbd_dev->mapping.size) {
	down_read(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (op_type != OBJ_OP_READ) {
		snapc = rbd_dev->header.snapc;
		ceph_get_snap_context(snapc);
	}
	up_read(&rbd_dev->header_rwsem);

	if (offset + length > mapping_size) {
		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
			 length, rbd_dev->mapping.size);
			 length, mapping_size);
		result = -EIO;
		goto err_rq;
	}

	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
	img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
					     snapc);
	if (!img_request) {
		result = -ENOMEM;
		goto err_rq;
	}
	img_request->rq = rq;

	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
	if (op_type == OBJ_OP_DISCARD)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
					      NULL);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
					      rq->bio);
	if (result)
		goto err_img_request;

@@ -3255,7 +3403,9 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
err_rq:
	if (result)
		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
			 wr ? "write" : "read", length, offset, result);
			 obj_op_name(op_type), length, offset, result);
	if (snapc)
		ceph_put_snap_context(snapc);
	blk_end_request_all(rq, result);
}

@@ -3393,7 +3543,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
						  obj_request);
	if (!obj_request->osd_req)
		goto out;
@@ -3610,6 +3760,13 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* enable the discard support */
	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
	q->limits.discard_granularity = segment_size;
	q->limits.discard_alignment = segment_size;
	q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
	q->limits.discard_zeroes_data = 1;

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

@@ -4924,7 +5081,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
		ret = image_id ? 0 : -ENOMEM;
		if (!ret)
			rbd_dev->image_format = 1;
	} else if (ret > sizeof (__le32)) {
	} else if (ret >= 0) {
		void *p = response;

		image_id = ceph_extract_encoded_string(&p, p + ret,
@@ -4932,8 +5089,6 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
		ret = PTR_ERR_OR_ZERO(image_id);
		if (!ret)
			rbd_dev->image_format = 2;
	} else {
		ret = -EINVAL;
	}

	if (!ret) {
@@ -5087,7 +5242,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);

	rbd_dev->rq_wq = alloc_workqueue("%s", 0, 0, rbd_dev->disk->disk_name);
	rbd_dev->rq_wq = alloc_workqueue("%s", WQ_MEM_RECLAIM, 0,
					 rbd_dev->disk->disk_name);
	if (!rbd_dev->rq_wq) {
		ret = -ENOMEM;
		goto err_out_mapping;
+99 −26
Original line number Diff line number Diff line
@@ -169,36 +169,109 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
	return ret;
}

int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
		       struct ceph_acls_info *info)
{
	struct posix_acl *default_acl, *acl;
	umode_t new_mode = inode->i_mode;
	int error;
	struct posix_acl *acl, *default_acl;
	size_t val_size1 = 0, val_size2 = 0;
	struct ceph_pagelist *pagelist = NULL;
	void *tmp_buf = NULL;
	int err;

	error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
	if (error)
		return error;
	err = posix_acl_create(dir, mode, &default_acl, &acl);
	if (err)
		return err;

	if (!default_acl && !acl) {
		cache_no_acl(inode);
		if (new_mode != inode->i_mode) {
			struct iattr newattrs = {
				.ia_mode = new_mode,
				.ia_valid = ATTR_MODE,
			};
			error = ceph_setattr(dentry, &newattrs);
	if (acl) {
		int ret = posix_acl_equiv_mode(acl, mode);
		if (ret < 0)
			goto out_err;
		if (ret == 0) {
			posix_acl_release(acl);
			acl = NULL;
		}
		return error;
	}

	if (!default_acl && !acl)
		return 0;

	if (acl)
		val_size1 = posix_acl_xattr_size(acl->a_count);
	if (default_acl)
		val_size2 = posix_acl_xattr_size(default_acl->a_count);

	err = -ENOMEM;
	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
	if (!tmp_buf)
		goto out_err;
	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
	if (!pagelist)
		goto out_err;
	ceph_pagelist_init(pagelist);

	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
	if (err)
		goto out_err;

	ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);

	if (acl) {
		size_t len = strlen(POSIX_ACL_XATTR_ACCESS);
		err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
		if (err)
			goto out_err;
		ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS,
					    len);
		err = posix_acl_to_xattr(&init_user_ns, acl,
					 tmp_buf, val_size1);
		if (err < 0)
			goto out_err;
		ceph_pagelist_encode_32(pagelist, val_size1);
		ceph_pagelist_append(pagelist, tmp_buf, val_size1);
	}
	if (default_acl) {
		error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
		posix_acl_release(default_acl);
		size_t len = strlen(POSIX_ACL_XATTR_DEFAULT);
		err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
		if (err)
			goto out_err;
		err = ceph_pagelist_encode_string(pagelist,
						  POSIX_ACL_XATTR_DEFAULT, len);
		err = posix_acl_to_xattr(&init_user_ns, default_acl,
					 tmp_buf, val_size2);
		if (err < 0)
			goto out_err;
		ceph_pagelist_encode_32(pagelist, val_size2);
		ceph_pagelist_append(pagelist, tmp_buf, val_size2);
	}
	if (acl) {
		if (!error)
			error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS);

	kfree(tmp_buf);

	info->acl = acl;
	info->default_acl = default_acl;
	info->pagelist = pagelist;
	return 0;

out_err:
	posix_acl_release(acl);
	posix_acl_release(default_acl);
	kfree(tmp_buf);
	if (pagelist)
		ceph_pagelist_release(pagelist);
	return err;
}
	return error;

void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info)
{
	if (!inode)
		return;
	ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
	ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl);
}

void ceph_release_acls_info(struct ceph_acls_info *info)
{
	posix_acl_release(info->acl);
	posix_acl_release(info->default_acl);
	if (info->pagelist)
		ceph_pagelist_release(info->pagelist);
}
+0 −9
Original line number Diff line number Diff line
@@ -1076,12 +1076,6 @@ static int ceph_update_writeable_page(struct file *file,
	/* past end of file? */
	i_size = inode->i_size;   /* caller holds i_mutex */

	if (i_size + len > inode->i_sb->s_maxbytes) {
		/* file is too big */
		r = -EINVAL;
		goto fail;
	}

	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
@@ -1099,9 +1093,6 @@ static int ceph_update_writeable_page(struct file *file,
	if (r < 0)
		goto fail_nosnap;
	goto retry_locked;

fail:
	up_read(&mdsc->snap_rwsem);
fail_nosnap:
	unlock_page(page);
	return r;
+19 −18

File changed.

Preview size limit exceeded, changes collapsed.

+46 −0

File changed.

Preview size limit exceeded, changes collapsed.

Loading