Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2312f0b3 authored by Lars Ellenberg's avatar Lars Ellenberg Committed by Philipp Reisner
Browse files

drbd: fix potential deadlock during "restart" of conflicting writes



w_restart_write(), run from worker context, calls __drbd_make_request()
and further drbd_al_begin_io(, delegate=true), which then
potentially deadlocks.  The previous patch moved a BUG_ON to expose
such call paths, which would now be triggered.

Also, if we call __drbd_make_request() from resource worker context,
like w_restart_write() did, and that should block for whatever reason
(!drbd_state_is_stable(), resource suspended, ...),
we potentially deadlock the whole resource, as the worker
is needed for state changes and other things.

Create a dedicated retry workqueue for this instead.

Also make sure that inc_ap_bio()/dec_ap_bio() are properly paired,
even if do_retry() needs to retry itself,
in case __drbd_make_request() returns != 0.

Signed-off-by: default avatarPhilipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: default avatarLars Ellenberg <lars.ellenberg@linbit.com>
parent f9916d61
Loading
Loading
Loading
Loading
+79 −0
Original line number Diff line number Diff line
@@ -2383,6 +2383,73 @@ void drbd_minor_destroy(struct kref *kref)
	kref_put(&tconn->kref, &conn_destroy);
}

/* One global retry thread, if we need to push back some bio and have it
 * reinserted through our make request function.
 */
static struct retry_worker {
	struct workqueue_struct *wq;
	struct work_struct worker;

	spinlock_t lock;
	struct list_head writes;
} retry;

static void do_retry(struct work_struct *ws)
{
	struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
	LIST_HEAD(writes);
	struct drbd_request *req, *tmp;

	spin_lock_irq(&retry->lock);
	list_splice_init(&retry->writes, &writes);
	spin_unlock_irq(&retry->lock);

	list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
		struct drbd_conf *mdev = req->w.mdev;
		struct bio *bio = req->master_bio;
		unsigned long start_time = req->start_time;

		/* We have exclusive access to this request object.
		 * If it had not been RQ_POSTPONED, the code path which queued
		 * it here would have completed and freed it already.
		 */
		mempool_free(req, drbd_request_mempool);

		/* A single suspended or otherwise blocking device may stall
		 * all others as well.  Fortunately, this code path is to
		 * recover from a situation that "should not happen":
		 * concurrent writes in multi-primary setup.
		 * In a "normal" lifecycle, this workqueue is supposed to be
		 * destroyed without ever doing anything.
		 * If it turns out to be an issue anyways, we can do per
		 * resource (replication group) or per device (minor) retry
		 * workqueues instead.
		 */

		/* We are not just doing generic_make_request(),
		 * as we want to keep the start_time information. */
		do {
			inc_ap_bio(mdev);
		} while(__drbd_make_request(mdev, bio, start_time));
	}
}

void drbd_restart_write(struct drbd_request *req)
{
	unsigned long flags;
	spin_lock_irqsave(&retry.lock, flags);
	list_move_tail(&req->tl_requests, &retry.writes);
	spin_unlock_irqrestore(&retry.lock, flags);

	/* Drop the extra reference that would otherwise
	 * have been dropped by complete_master_bio.
	 * do_retry() needs to grab a new one. */
	dec_ap_bio(req->w.mdev);

	queue_work(retry.wq, &retry.worker);
}


static void drbd_cleanup(void)
{
	unsigned int i;
@@ -2402,6 +2469,9 @@ static void drbd_cleanup(void)
	if (drbd_proc)
		remove_proc_entry("drbd", NULL);

	if (retry.wq)
		destroy_workqueue(retry.wq);

	drbd_genl_unregister();

	idr_for_each_entry(&minors, mdev, i) {
@@ -2851,6 +2921,15 @@ int __init drbd_init(void)
	rwlock_init(&global_state_lock);
	INIT_LIST_HEAD(&drbd_tconns);

	retry.wq = create_singlethread_workqueue("drbd-reissue");
	if (!retry.wq) {
		printk(KERN_ERR "drbd: unable to create retry workqueue\n");
		goto fail;
	}
	INIT_WORK(&retry.worker, do_retry);
	spin_lock_init(&retry.lock);
	INIT_LIST_HEAD(&retry.writes);

	printk(KERN_INFO "drbd: initialized. "
	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
+3 −29
Original line number Diff line number Diff line
@@ -1748,30 +1748,6 @@ static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
	return err;
}

static int w_restart_write(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct bio *bio;
	unsigned long start_time;
	unsigned long flags;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	if (!expect(req->rq_state & RQ_POSTPONED)) {
		spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
		return -EIO;
	}
	bio = req->master_bio;
	start_time = req->start_time;
	/* Postponed requests will not have their master_bio completed!  */
	__req_mod(req, DISCARD_WRITE, NULL);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	while (__drbd_make_request(mdev, bio, start_time))
		/* retry */ ;
	return 0;
}

static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
{
@@ -1785,11 +1761,9 @@ static void restart_conflicting_writes(struct drbd_conf *mdev,
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		if (expect(list_empty(&req->w.list))) {
			req->w.mdev = mdev;
			req->w.cb = w_restart_write;
			drbd_queue_work(&mdev->tconn->data.work, &req->w);
		}
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, DISCARD_WRITE, NULL);
	}
}

+14 −3
Original line number Diff line number Diff line
@@ -104,7 +104,7 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
	 * and never sent), it should still be "empty" as
	 * initialized in drbd_req_new(), so we can list_del() it
	 * here unconditionally */
	list_del(&req->tl_requests);
	list_del_init(&req->tl_requests);

	/* if it was a write, we may have to set the corresponding
	 * bit(s) out-of-sync first. If it had a local part, we need to
@@ -143,6 +143,9 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
		}
	}

	if (s & RQ_POSTPONED)
		drbd_restart_write(req);
	else
		drbd_req_free(req);
}

@@ -289,8 +292,16 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
		if (!(s & RQ_POSTPONED)) {
			m->error = ok ? 0 : (error ?: -EIO);
			m->bio = req->master_bio;
		}
			req->master_bio = NULL;
		} else {
			/* Assert that this will be _req_is_done()
			 * with this very invokation. */
			/* FIXME:
			 * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)?
			 */
			D_ASSERT(!(s & RQ_LOCAL_PENDING));
			D_ASSERT(s & RQ_NET_DONE);
		}
	}

	if (s & RQ_LOCAL_PENDING)
+3 −0
Original line number Diff line number Diff line
@@ -268,6 +268,9 @@ extern void request_timer_fn(unsigned long data);
extern void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what);
extern void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what);

/* this is in drbd_main.c */
extern void drbd_restart_write(struct drbd_request *req);

/* use this if you don't want to deal with calling complete_master_bio()
 * outside the spinlock, e.g. when walking some list on cleanup. */
static inline int _req_mod(struct drbd_request *req, enum drbd_req_event what)