Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f9ff0da5 authored by Lars Ellenberg's avatar Lars Ellenberg Committed by Jens Axboe
Browse files

drbd: allow parallel flushes for multi-volume resources



To maintain write-order fidelity across all volumes in a DRBD resource,
the receiver of a P_BARRIER needs to issue flushes to all volumes.
We used to do this by calling blkdev_issue_flush(), synchronously,
one volume at a time.

We now submit all flushes to all volumes in parallel, then wait for all
completions, to reduce worst-case latencies on multi-volume resources.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Signed-off-by: Jens Axboe <axboe@fb.com>
parent 0982368b
Loading
Loading
Loading
Loading
+89 −25
Original line number Diff line number Diff line
@@ -1204,13 +1204,84 @@ static int drbd_recv_header(struct drbd_connection *connection, struct packet_in
	return err;
}

/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
/* Shared state for one parallel-flush round, one instance per
 * drbd_flush() invocation (lives on that caller's stack). */
struct issue_flush_context {
	atomic_t pending;	/* outstanding flush bios, plus one ref held by the submitter */
	int error;		/* nonzero if any flush bio completed with an error */
	struct completion done;	/* completed when pending drops to zero */
};
/* Per-bio context: ties one flush bio back to its device and to the
 * shared issue_flush_context.  Allocated in submit_one_flush(),
 * freed in one_flush_endio(). */
struct one_flush_context {
	struct drbd_device *device;
	struct issue_flush_context *ctx;
};

/* Completion callback for one flush bio submitted by submit_one_flush().
 * Records any error in the shared context, releases the per-bio
 * resources and the device references taken at submit time, and wakes
 * the waiter in drbd_flush() once the last outstanding flush finishes.
 */
void one_flush_endio(struct bio *bio)
{
	/* Save everything we need before freeing octx below. */
	struct one_flush_context *octx = bio->bi_private;
	struct drbd_device *device = octx->device;
	struct issue_flush_context *ctx = octx->ctx;

	if (bio->bi_error) {
		/* If several volumes fail concurrently, the last writer
		 * wins; callers only check for "any error" (nonzero). */
		ctx->error = bio->bi_error;
		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
	}
	kfree(octx);
	bio_put(bio);

	/* Undo the per-device accounting done in submit_one_flush(). */
	clear_bit(FLUSH_PENDING, &device->flags);
	put_ldev(device);
	kref_put(&device->kref, drbd_destroy_device);

	/* Drop this bio's pending count; the submitter holds its own
	 * extra count, so "done" only completes once it has started
	 * waiting and all bios have finished. */
	if (atomic_dec_and_test(&ctx->pending))
		complete(&ctx->done);
}

/* Allocate and submit one asynchronous flush bio for @device,
 * accounting it in @ctx.  Completion is handled by one_flush_endio().
 * On allocation failure, records -ENOMEM in @ctx and drops the device
 * references (ldev and kref) that the caller handed to us.
 */
static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
{
	struct bio *bio = bio_alloc(GFP_NOIO, 0);
	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);

	if (bio && octx) {
		octx->device = device;
		octx->ctx = ctx;

		bio->bi_bdev = device->ldev->backing_bdev;
		bio->bi_private = octx;
		bio->bi_end_io = one_flush_endio;
		bio_set_op_attrs(bio, REQ_OP_FLUSH, WRITE_FLUSH);

		/* Account this flush before submission; one_flush_endio()
		 * undoes all of it. */
		device->flush_jif = jiffies;
		set_bit(FLUSH_PENDING, &device->flags);
		atomic_inc(&ctx->pending);
		submit_bio(bio);
		return;
	}

	drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
	/* FIXME: what else can I do now?  disconnecting or detaching
	 * really does not help to improve the state of the world, either.
	 */
	kfree(octx);
	if (bio)
		bio_put(bio);

	ctx->error = -ENOMEM;
	put_ldev(device);
	kref_put(&device->kref, drbd_destroy_device);
}

/* NOTE(review): this span is a web-scraped unified diff with the +/-
 * markers stripped, NOT compilable source.  Old (removed) synchronous
 * blkdev_issue_flush() lines are interleaved with the new asynchronous
 * code (duplicated "if" condition, a hunk header mid-function, and the
 * deleted tail still present).  Reconstructing the post-commit function
 * would be guesswork; comments only, code left byte-identical.
 * Intended new behavior (per the commit message): submit one flush per
 * volume via submit_one_flush(), then wait on ctx.done for all
 * completions; on any error, fall back to WO_DRAIN_IO ordering. */
static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
		struct drbd_peer_device *peer_device;
		struct issue_flush_context ctx;
		int vnr;

	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
		/* Start pending at 1: the submitter's own reference, dropped
		 * just before waiting, so ctx.done cannot fire early. */
		atomic_set(&ctx.pending, 1);
		ctx.error = 0;
		init_completion(&ctx.done);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
@@ -1220,31 +1291,24 @@ static void drbd_flush(struct drbd_connection *connection)
			kref_get(&device->kref);
			rcu_read_unlock();

			/* NOTE(review): the block below is the OLD removed
			 * synchronous path left in by the diff scrape. */
			/* Right now, we have only this one synchronous code path
			 * for flushes between request epochs.
			 * We may want to make those asynchronous,
			 * or at least parallelize the flushes to the volume devices.
			 */
			device->flush_jif = jiffies;
			set_bit(FLUSH_PENDING, &device->flags);
			rv = blkdev_issue_flush(device->ldev->backing_bdev,
					GFP_NOIO, NULL);
			clear_bit(FLUSH_PENDING, &device->flags);
			if (rv) {
				drbd_info(device, "local disk flush failed with status %d\n", rv);
			submit_one_flush(device, &ctx);

			rcu_read_lock();
		}
		rcu_read_unlock();

		/* Do we want to add a timeout,
		 * if disk-timeout is set? */
		if (!atomic_dec_and_test(&ctx.pending))
			wait_for_completion(&ctx.done);

		if (ctx.error) {
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			/* Any error is already reported by bio_endio callback. */
			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
		}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}