Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit df3256f9 authored by Linus Torvalds's avatar Linus Torvalds
Browse files
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm:
  dlm: make plock operation killable
  dlm: remove shared message stub for recovery
  dlm: delayed reply message warning
  dlm: Remove superfluous call to recalc_sigpending()
parents b0ca118d 901025d2
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -100,6 +100,7 @@ struct dlm_cluster {
	unsigned int cl_log_debug;
	unsigned int cl_protocol;
	unsigned int cl_timewarn_cs;
	unsigned int cl_waitwarn_us;
};

enum {
@@ -114,6 +115,7 @@ enum {
	CLUSTER_ATTR_LOG_DEBUG,
	CLUSTER_ATTR_PROTOCOL,
	CLUSTER_ATTR_TIMEWARN_CS,
	CLUSTER_ATTR_WAITWARN_US,
};

struct cluster_attribute {
@@ -166,6 +168,7 @@ CLUSTER_ATTR(scan_secs, 1);
CLUSTER_ATTR(log_debug, 0);
CLUSTER_ATTR(protocol, 0);
CLUSTER_ATTR(timewarn_cs, 1);
CLUSTER_ATTR(waitwarn_us, 0);

static struct configfs_attribute *cluster_attrs[] = {
	[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -179,6 +182,7 @@ static struct configfs_attribute *cluster_attrs[] = {
	[CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
	[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr,
	[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
	[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
	NULL,
};

@@ -439,6 +443,7 @@ static struct config_group *make_cluster(struct config_group *g,
	cl->cl_log_debug = dlm_config.ci_log_debug;
	cl->cl_protocol = dlm_config.ci_protocol;
	cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
	cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;

	space_list = &sps->ss_group;
	comm_list = &cms->cs_group;
@@ -986,6 +991,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_LOG_DEBUG          0
#define DEFAULT_PROTOCOL           0
#define DEFAULT_TIMEWARN_CS      500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US	   0

struct dlm_config_info dlm_config = {
	.ci_tcp_port = DEFAULT_TCP_PORT,
@@ -998,6 +1004,7 @@ struct dlm_config_info dlm_config = {
	.ci_scan_secs = DEFAULT_SCAN_SECS,
	.ci_log_debug = DEFAULT_LOG_DEBUG,
	.ci_protocol = DEFAULT_PROTOCOL,
	.ci_timewarn_cs = DEFAULT_TIMEWARN_CS
	.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
	.ci_waitwarn_us = DEFAULT_WAITWARN_US
};
+1 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ struct dlm_config_info {
	int ci_log_debug;
	int ci_protocol;
	int ci_timewarn_cs;
	int ci_waitwarn_us;
};

extern struct dlm_config_info dlm_config;
+3 −0
Original line number Diff line number Diff line
@@ -209,6 +209,7 @@ struct dlm_args {
#define DLM_IFL_WATCH_TIMEWARN	0x00400000
#define DLM_IFL_TIMEOUT_CANCEL	0x00800000
#define DLM_IFL_DEADLOCK_CANCEL	0x01000000
#define DLM_IFL_STUB_MS		0x02000000 /* magic number for m_flags */
#define DLM_IFL_USER		0x00000001
#define DLM_IFL_ORPHAN		0x00000002

@@ -245,6 +246,7 @@ struct dlm_lkb {

	int8_t			lkb_wait_type;	/* type of reply waiting for */
	int8_t			lkb_wait_count;
	int			lkb_wait_nodeid; /* for debugging */

	struct list_head	lkb_idtbl_list;	/* lockspace lkbtbl */
	struct list_head	lkb_statequeue;	/* rsb g/c/w list */
@@ -254,6 +256,7 @@ struct dlm_lkb {
	struct list_head	lkb_ownqueue;	/* list of locks for a process */
	struct list_head	lkb_time_list;
	ktime_t			lkb_timestamp;
	ktime_t			lkb_wait_time;
	unsigned long		lkb_timeout_cs;

	struct dlm_callback	lkb_callbacks[DLM_CALLBACKS_SIZE];
+142 −40
Original line number Diff line number Diff line
@@ -799,10 +799,84 @@ static int msg_reply_type(int mstype)
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
			if (warned)
				memset(warned, 0, num_nodes * sizeof(int));
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	if (warned)
		kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;
@@ -842,6 +916,8 @@ static int add_to_waiters(struct dlm_lkb *lkb, int mstype)

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
@@ -961,10 +1037,10 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms != &ls->ls_stub_ms)
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
@@ -1157,6 +1233,16 @@ void dlm_adjust_timeouts(struct dlm_ls *ls)
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}

/* lkb is master or local copy */
@@ -1376,14 +1462,8 @@ static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
static void munge_demoted(struct dlm_lkb *lkb)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
@@ -2844,12 +2924,12 @@ static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	to_nodeid = r->res_nodeid;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;
@@ -2880,9 +2960,9 @@ static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

@@ -2951,12 +3031,12 @@ static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	to_nodeid = dlm_dir_nodeid(r);

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;
@@ -3070,6 +3150,9 @@ static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)

static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_flags == DLM_IFL_STUB_MS)
		return;

	lkb->lkb_sbflags = ms->m_sbflags;
	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
		         (ms->m_flags & 0x0000FFFF);
@@ -3612,7 +3695,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
			munge_demoted(lkb);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
@@ -3622,7 +3705,7 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
			munge_demoted(lkb);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;
@@ -3996,15 +4079,17 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
	dlm_put_lockspace(ls);
}

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
				   struct dlm_message *ms_stub)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);
		memset(ms_stub, 0, sizeof(struct dlm_message));
		ms_stub->m_flags = DLM_IFL_STUB_MS;
		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
		ms_stub->m_result = -EINPROGRESS;
		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, ms_stub);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
@@ -4045,13 +4130,27 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	struct dlm_message *ms_stub;
	int wait_type, stub_unlock_result, stub_cancel_result;

	ms_stub = kmalloc(GFP_KERNEL, sizeof(struct dlm_message));
	if (!ms_stub) {
		log_error(ls, "dlm_recover_waiters_pre no mem");
		return;
	}

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* exclude debug messages about unlocks because there can be so
		   many and they aren't very interesting */

		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
			log_debug(ls, "recover_waiter %x nodeid %d "
				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
		}

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */
@@ -4097,26 +4196,28 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			recover_convert_waiter(ls, lkb, ms_stub);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = stub_unlock_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			memset(ms_stub, 0, sizeof(struct dlm_message));
			ms_stub->m_flags = DLM_IFL_STUB_MS;
			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
			ms_stub->m_result = stub_unlock_result;
			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, ms_stub);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = stub_cancel_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			memset(ms_stub, 0, sizeof(struct dlm_message));
			ms_stub->m_flags = DLM_IFL_STUB_MS;
			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
			ms_stub->m_result = stub_cancel_result;
			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, ms_stub);
			dlm_put_lkb(lkb);
			break;

@@ -4127,6 +4228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(ms_stub);
}

static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
@@ -4191,8 +4293,8 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
+1 −0
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ int dlm_put_lkb(struct dlm_lkb *lkb);
void dlm_scan_rsbs(struct dlm_ls *ls);
int dlm_lock_recovery_try(struct dlm_ls *ls);
void dlm_unlock_recovery(struct dlm_ls *ls);
void dlm_scan_waiters(struct dlm_ls *ls);
void dlm_scan_timeout(struct dlm_ls *ls);
void dlm_adjust_timeouts(struct dlm_ls *ls);

Loading