Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1643dfa4 authored by Ilya Dryomov's avatar Ilya Dryomov
Browse files

rbd: introduce a per-device ordered workqueue



This is going to be used for re-registering watch requests and
exclusive-lock tasks: acquire/request lock, notify-acquired, release
lock, notify-released.  Some refactoring in the map/unmap paths was
necessary to give this workqueue a meaningful name: "rbdX-tasks".

Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarMike Christie <mchristi@redhat.com>
parent 033268a5
Loading
Loading
Loading
Loading
+71 −80
Original line number Diff line number Diff line
@@ -128,11 +128,8 @@ static int atomic_dec_return_safe(atomic_t *v)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
@@ -353,10 +350,12 @@ struct rbd_device {
	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;
	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct ceph_osd_linger_request *watch_handle;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
@@ -3944,11 +3943,8 @@ static void rbd_spec_free(struct kref *kref)
	kfree(spec);
}

static void rbd_dev_release(struct device *dev)
static void rbd_dev_free(struct rbd_device *rbd_dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	bool need_put = !!rbd_dev->opts;

	ceph_oid_destroy(&rbd_dev->header_oid);
	ceph_oloc_destroy(&rbd_dev->header_oloc);

@@ -3956,6 +3952,19 @@ static void rbd_dev_release(struct device *dev)
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev->opts);
	kfree(rbd_dev);
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	bool need_put = !!rbd_dev->opts;

	if (need_put) {
		destroy_workqueue(rbd_dev->task_wq);
		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
	}

	rbd_dev_free(rbd_dev);

	/*
	 * This is racy, but way better than putting module outside of
@@ -3966,9 +3975,8 @@ static void rbd_dev_release(struct device *dev)
		module_put(THIS_MODULE);
}

static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
					   struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

@@ -3977,8 +3985,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	rbd_dev->flags = 0;
	atomic_set(&rbd_dev->parent_ref, 0);
	INIT_LIST_HEAD(&rbd_dev->node);
	init_rwsem(&rbd_dev->header_rwsem);

@@ -3992,9 +3998,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,

	rbd_dev->rbd_client = rbdc;
	rbd_dev->spec = spec;
	rbd_dev->opts = opts;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
	rbd_dev->layout.stripe_count = 1;
@@ -4002,15 +4005,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
	rbd_dev->layout.pool_id = spec->pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);

	return rbd_dev;
}

/*
	 * If this is a mapping rbd_dev (as opposed to a parent one),
	 * pin our module.  We have a ref from do_rbd_add(), so use
	 * __module_get().
 * Create a mapping rbd_dev.
 */
	if (rbd_dev->opts)
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec,
					 struct rbd_options *opts)
{
	struct rbd_device *rbd_dev;

	rbd_dev = __rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		return NULL;

	rbd_dev->opts = opts;

	/* get an id and fill in device name */
	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
					 minor_to_rbd_dev_id(1 << MINORBITS),
					 GFP_KERNEL);
	if (rbd_dev->dev_id < 0)
		goto fail_rbd_dev;

	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
						   rbd_dev->name);
	if (!rbd_dev->task_wq)
		goto fail_dev_id;

	/* we have a ref from do_rbd_add() */
	__module_get(THIS_MODULE);

	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
	return rbd_dev;

fail_dev_id:
	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
fail_rbd_dev:
	rbd_dev_free(rbd_dev);
	return NULL;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
@@ -4645,46 +4681,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
	return rbd_dev_v2_header_info(rbd_dev);
}

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.
 */
static int rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	int new_dev_id;

	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
				    0, minor_to_rbd_dev_id(1 << MINORBITS),
				    GFP_KERNEL);
	if (new_dev_id < 0)
		return new_dev_id;

	rbd_dev->dev_id = new_dev_id;

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);

	return 0;
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);

	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any). Returns the length of
@@ -5077,8 +5073,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
		goto out_err;
	}

	parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
				NULL);
	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
	if (!parent) {
		ret = -ENOMEM;
		goto out_err;
@@ -5113,22 +5108,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
	int ret;

	/* Get an id and fill in device name. */

	ret = rbd_dev_id_get(rbd_dev);
	if (ret)
		goto err_out_unlock;

	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Record our major and minor device numbers. */

	if (!single_major) {
		ret = register_blkdev(0, rbd_dev->name);
		if (ret < 0)
			goto err_out_id;
			goto err_out_unlock;

		rbd_dev->major = ret;
		rbd_dev->minor = 0;
@@ -5160,6 +5145,10 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	up_write(&rbd_dev->header_rwsem);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);

	add_disk(rbd_dev->disk);
	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);
@@ -5173,8 +5162,6 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
err_out_blkdev:
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_unlock:
	up_write(&rbd_dev->header_rwsem);
	return ret;
@@ -5406,12 +5393,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
	rbd_free_disk(rbd_dev);

	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	spin_unlock(&rbd_dev_list_lock);

	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
	device_del(&rbd_dev->dev);
	rbd_dev_mapping_clear(rbd_dev);
	if (!single_major)
		unregister_blkdev(rbd_dev->major, rbd_dev->name);
	rbd_dev_id_put(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)