Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b4df6ed8 authored by Mark Fasheh's avatar Mark Fasheh
Browse files

[PATCH] ocfs2: fix orphan recovery deadlock



Orphan dir recovery can deadlock with another process in
ocfs2_delete_inode() in some corner cases. Fix this by tracking recovery
state more closely and allowing it to handle inode wipes which might
deadlock.

Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent 895928b8
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -67,6 +67,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
	ocfs2_node_map_init(&osb->mounted_map);
	ocfs2_node_map_init(&osb->recovery_map);
	ocfs2_node_map_init(&osb->umount_map);
	ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
}

static void ocfs2_do_node_down(int node_num,
+45 −1
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "namei.h"
@@ -544,6 +545,42 @@ bail:
	return status;
}

/* 
 * Serialize with orphan dir recovery. If the process doing
 * recovery on this orphan dir does an iget() with the dir
 * i_mutex held, we'll deadlock here. Instead we detect this
 * and exit early - recovery will wipe this inode for us.
 */
static int ocfs2_check_orphan_recovery_state(struct ocfs2_super *osb,
					     int slot)
{
	int ret = 0;

	spin_lock(&osb->osb_lock);
	if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs, slot)) {
		mlog(0, "Recovery is happening on orphan dir %d, will skip "
		     "this inode\n", slot);
		ret = -EDEADLK;
		goto out;
	}
	/* This signals to the orphan recovery process that it should
	 * wait for us to handle the wipe. */
	osb->osb_orphan_wipes[slot]++;
out:
	spin_unlock(&osb->osb_lock);
	return ret;
}

static void ocfs2_signal_wipe_completion(struct ocfs2_super *osb,
					 int slot)
{
	spin_lock(&osb->osb_lock);
	osb->osb_orphan_wipes[slot]--;
	spin_unlock(&osb->osb_lock);

	wake_up(&osb->osb_wipe_event);
}

static int ocfs2_wipe_inode(struct inode *inode,
			    struct buffer_head *di_bh)
{
@@ -555,6 +592,11 @@ static int ocfs2_wipe_inode(struct inode *inode,
	/* We've already voted on this so it should be readonly - no
	 * spinlock needed. */
	orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;

	status = ocfs2_check_orphan_recovery_state(osb, orphaned_slot);
	if (status)
		return status;

	orphan_dir_inode = ocfs2_get_system_file_inode(osb,
						       ORPHAN_DIR_SYSTEM_INODE,
						       orphaned_slot);
@@ -597,6 +639,7 @@ bail_unlock_dir:
	brelse(orphan_dir_bh);
bail:
	iput(orphan_dir_inode);
	ocfs2_signal_wipe_completion(osb, orphaned_slot);

	return status;
}
@@ -822,6 +865,7 @@ void ocfs2_delete_inode(struct inode *inode)

	status = ocfs2_wipe_inode(inode, di_bh);
	if (status < 0) {
		if (status != -EDEADLK)
			mlog_errno(status);
		goto bail_unlock_inode;
	}
+93 −31
Original line number Diff line number Diff line
@@ -1408,21 +1408,17 @@ bail:
	return status;
}

static int ocfs2_recover_orphans(struct ocfs2_super *osb,
				 int slot)
static int ocfs2_queue_orphans(struct ocfs2_super *osb,
			       int slot,
			       struct inode **head)
{
	int status = 0;
	int have_disk_lock = 0;
	struct inode *inode = NULL;
	struct inode *iter;
	int status;
	struct inode *orphan_dir_inode = NULL;
	struct inode *iter;
	unsigned long offset, blk, local;
	struct buffer_head *bh = NULL;
	struct ocfs2_dir_entry *de;
	struct super_block *sb = osb->sb;
	struct ocfs2_inode_info *oi;

	mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);

	orphan_dir_inode = ocfs2_get_system_file_inode(osb,
						       ORPHAN_DIR_SYSTEM_INODE,
@@ -1430,17 +1426,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
	if  (!orphan_dir_inode) {
		status = -ENOENT;
		mlog_errno(status);
		goto out;
		return status;
	}	

	mutex_lock(&orphan_dir_inode->i_mutex);
	status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
	if (status < 0) {
		mutex_unlock(&orphan_dir_inode->i_mutex);
		mlog_errno(status);
		goto out;
	}
	have_disk_lock = 1;

	offset = 0;
	iter = NULL;
@@ -1451,11 +1445,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
		if (!bh)
			status = -EINVAL;
		if (status < 0) {
			mutex_unlock(&orphan_dir_inode->i_mutex);
			if (bh)
				brelse(bh);
			mlog_errno(status);
			goto out;
			goto out_unlock;
		}

		local = 0;
@@ -1465,11 +1458,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,

			if (!ocfs2_check_dir_entry(orphan_dir_inode,
						  de, bh, local)) {
				mutex_unlock(&orphan_dir_inode->i_mutex);
				status = -EINVAL;
				mlog_errno(status);
				brelse(bh);
				goto out;
				goto out_unlock;
			}

			local += le16_to_cpu(de->rec_len);
@@ -1504,18 +1496,95 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,

			mlog(0, "queue orphan %"MLFu64"\n",
			     OCFS2_I(iter)->ip_blkno);
			OCFS2_I(iter)->ip_next_orphan = inode;
			inode = iter;
			/* No locking is required for the next_orphan
			 * queue as there is only ever a single
			 * process doing orphan recovery. */
			OCFS2_I(iter)->ip_next_orphan = *head;
			*head = iter;
		}
		brelse(bh);
	}
	mutex_unlock(&orphan_dir_inode->i_mutex);

out_unlock:
	ocfs2_meta_unlock(orphan_dir_inode, 0);
	have_disk_lock = 0;

out:
	mutex_unlock(&orphan_dir_inode->i_mutex);
	iput(orphan_dir_inode);
	orphan_dir_inode = NULL;
	return status;
}

static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
					      int slot)
{
	int ret;

	spin_lock(&osb->osb_lock);
	ret = !osb->osb_orphan_wipes[slot];
	spin_unlock(&osb->osb_lock);
	return ret;
}

static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
					     int slot)
{
	spin_lock(&osb->osb_lock);
	/* Mark ourselves such that new processes in delete_inode()
	 * know to quit early. */
	ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
	while (osb->osb_orphan_wipes[slot]) {
		/* If any processes are already in the middle of an
		 * orphan wipe on this dir, then we need to wait for
		 * them. */
		spin_unlock(&osb->osb_lock);
		wait_event_interruptible(osb->osb_wipe_event,
					 ocfs2_orphan_recovery_can_continue(osb, slot));
		spin_lock(&osb->osb_lock);
	}
	spin_unlock(&osb->osb_lock);
}

static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
					      int slot)
{
	ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
}

/*
 * Orphan recovery. Each mounted node has it's own orphan dir which we
 * must run during recovery. Our strategy here is to build a list of
 * the inodes in the orphan dir and iget/iput them. The VFS does
 * (most) of the rest of the work.
 *
 * Orphan recovery can happen at any time, not just mount so we have a
 * couple of extra considerations.
 *
 * - We grab as many inodes as we can under the orphan dir lock -
 *   doing iget() outside the orphan dir risks getting a reference on
 *   an invalid inode.
 * - We must be sure not to deadlock with other processes on the
 *   system wanting to run delete_inode(). This can happen when they go
 *   to lock the orphan dir and the orphan recovery process attempts to
 *   iget() inside the orphan dir lock. This can be avoided by
 *   advertising our state to ocfs2_delete_inode().
 */
static int ocfs2_recover_orphans(struct ocfs2_super *osb,
				 int slot)
{
	int ret = 0;
	struct inode *inode = NULL;
	struct inode *iter;
	struct ocfs2_inode_info *oi;

	mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);

	ocfs2_mark_recovering_orphan_dir(osb, slot);
	ret = ocfs2_queue_orphans(osb, slot, &inode);
	ocfs2_clear_recovering_orphan_dir(osb, slot);

	/* Error here should be noted, but we want to continue with as
	 * many queued inodes as we've got. */
	if (ret)
		mlog_errno(ret);

	while (inode) {
		oi = OCFS2_I(inode);
@@ -1541,14 +1610,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
		inode = iter;
	}

out:
	if (have_disk_lock)
		ocfs2_meta_unlock(orphan_dir_inode, 0);

	if (orphan_dir_inode)
		iput(orphan_dir_inode);

	return status;
	return ret;
}

static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
+4 −0
Original line number Diff line number Diff line
@@ -287,6 +287,10 @@ struct ocfs2_super
	struct inode			*osb_tl_inode;
	struct buffer_head		*osb_tl_bh;
	struct work_struct		osb_truncate_log_wq;

	struct ocfs2_node_map		osb_recovering_orphan_dirs;
	unsigned int			*osb_orphan_wipes;
	wait_queue_head_t		osb_wipe_event;
};

#define OCFS2_SB(sb)	    ((struct ocfs2_super *)(sb)->s_fs_info)
+11 −0
Original line number Diff line number Diff line
@@ -1325,6 +1325,16 @@ static int ocfs2_initialize_super(struct super_block *sb,
	}
	mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots);

	init_waitqueue_head(&osb->osb_wipe_event);
	osb->osb_orphan_wipes = kcalloc(osb->max_slots,
					sizeof(*osb->osb_orphan_wipes),
					GFP_KERNEL);
	if (!osb->osb_orphan_wipes) {
		status = -ENOMEM;
		mlog_errno(status);
		goto bail;
	}

	osb->s_feature_compat =
		le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat);
	osb->s_feature_ro_compat =
@@ -1638,6 +1648,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb)
	if (osb->slot_info)
		ocfs2_free_slot_info(osb->slot_info);

	kfree(osb->osb_orphan_wipes);
	/* FIXME
	 * This belongs in journal shutdown, but because we have to
	 * allocate osb->journal at the start of ocfs2_initalize_osb(),