Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8304d6f2 authored by David Teigland's avatar David Teigland
Browse files

dlm: record full callback state



Change how callbacks are recorded for locks.  Previously, information
about multiple callbacks was combined into a couple of variables that
indicated what the end result should be.  In some situations, we
could not tell from this combined state what the exact sequence of
callbacks were, and would end up either delivering the callbacks in
the wrong order, or suppress redundant callbacks incorrectly.  This
new approach records all the data for each callback, leaving no
uncertainty about what needs to be delivered.

Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 35d34df7
Loading
Loading
Loading
Loading
+198 −59
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@

#define WAKE_ASTS  0

static uint64_t			ast_seq_count;
static struct list_head		ast_queue;
static spinlock_t		ast_queue_lock;
static struct task_struct *	astd_task;
@@ -25,40 +26,186 @@ static unsigned long astd_wakeflags;
static struct mutex		astd_running;


static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
{
	int i;

	log_print("last_bast %x %llu flags %x mode %d sb %d %x",
		  lkb->lkb_id,
		  (unsigned long long)lkb->lkb_last_bast.seq,
		  lkb->lkb_last_bast.flags,
		  lkb->lkb_last_bast.mode,
		  lkb->lkb_last_bast.sb_status,
		  lkb->lkb_last_bast.sb_flags);

	log_print("last_cast %x %llu flags %x mode %d sb %d %x",
		  lkb->lkb_id,
		  (unsigned long long)lkb->lkb_last_cast.seq,
		  lkb->lkb_last_cast.flags,
		  lkb->lkb_last_cast.mode,
		  lkb->lkb_last_cast.sb_status,
		  lkb->lkb_last_cast.sb_flags);

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		log_print("cb %x %llu flags %x mode %d sb %d %x",
			  lkb->lkb_id,
			  (unsigned long long)lkb->lkb_callbacks[i].seq,
			  lkb->lkb_callbacks[i].flags,
			  lkb->lkb_callbacks[i].mode,
			  lkb->lkb_callbacks[i].sb_status,
			  lkb->lkb_callbacks[i].sb_flags);
	}
}

void dlm_del_ast(struct dlm_lkb *lkb)
{
	spin_lock(&ast_queue_lock);
	if (lkb->lkb_ast_type & (AST_COMP | AST_BAST))
		list_del(&lkb->lkb_astqueue);
	if (!list_empty(&lkb->lkb_astqueue))
		list_del_init(&lkb->lkb_astqueue);
	spin_unlock(&ast_queue_lock);
}

void dlm_add_ast(struct dlm_lkb *lkb, int type, int mode)
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			 int status, uint32_t sbflags, uint64_t seq)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	uint64_t prev_seq;
	int prev_mode;
	int i;

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		if (lkb->lkb_callbacks[i].seq)
			continue;

		/*
		 * Suppress some redundant basts here, do more on removal.
		 * Don't even add a bast if the callback just before it
		 * is a bast for the same mode or a more restrictive mode.
		 * (the addional > PR check is needed for PR/CW inversion)
		 */

		if ((i > 0) && (flags & DLM_CB_BAST) &&
		    (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {

			prev_seq = lkb->lkb_callbacks[i-1].seq;
			prev_mode = lkb->lkb_callbacks[i-1].mode;

			if ((prev_mode == mode) ||
			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {

				log_debug(ls, "skip %x add bast %llu mode %d "
					  "for bast %llu mode %d",
					  lkb->lkb_id,
					  (unsigned long long)seq,
					  mode,
					  (unsigned long long)prev_seq,
					  prev_mode);
				return 0;
			}
		}

		lkb->lkb_callbacks[i].seq = seq;
		lkb->lkb_callbacks[i].flags = flags;
		lkb->lkb_callbacks[i].mode = mode;
		lkb->lkb_callbacks[i].sb_status = status;
		lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
		break;
	}

	if (i == DLM_CALLBACKS_SIZE) {
		log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
			  lkb->lkb_id, (unsigned long long)seq,
			  flags, mode, status, sbflags);
		dlm_dump_lkb_callbacks(lkb);
		return -1;
	}

	return 0;
}

int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
			 struct dlm_callback *cb, int *resid)
{
	int i;

	*resid = 0;

	if (!lkb->lkb_callbacks[0].seq)
		return -ENOENT;

	/* oldest undelivered cb is callbacks[0] */

	memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
	memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));

	/* shift others down */

	for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
		if (!lkb->lkb_callbacks[i].seq)
			break;
		memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
		       sizeof(struct dlm_callback));
		memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
		(*resid)++;
	}

	/* if cb is a bast, it should be skipped if the blocking mode is
	   compatible with the last granted mode */

	if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
		if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
			cb->flags |= DLM_CB_SKIP;

			log_debug(ls, "skip %x bast %llu mode %d "
				  "for cast %llu mode %d",
				  lkb->lkb_id,
				  (unsigned long long)cb->seq,
				  cb->mode,
				  (unsigned long long)lkb->lkb_last_cast.seq,
				  lkb->lkb_last_cast.mode);
			return 0;
		}
	}

	if (cb->flags & DLM_CB_CAST) {
		memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
		lkb->lkb_last_cast_time = ktime_get();
	}

	if (cb->flags & DLM_CB_BAST) {
		memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
		lkb->lkb_last_bast_time = ktime_get();
	}

	return 0;
}

void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
		 uint32_t sbflags)
{
	uint64_t seq;
	int rv;

	spin_lock(&ast_queue_lock);

	seq = ++ast_seq_count;

	if (lkb->lkb_flags & DLM_IFL_USER) {
		dlm_user_add_ast(lkb, type, mode);
		spin_unlock(&ast_queue_lock);
		dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
		return;
	}

	spin_lock(&ast_queue_lock);
	if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
	if (rv < 0) {
		spin_unlock(&ast_queue_lock);
		return;
	}

	if (list_empty(&lkb->lkb_astqueue)) {
		kref_get(&lkb->lkb_ref);
		list_add_tail(&lkb->lkb_astqueue, &ast_queue);
		lkb->lkb_ast_first = type;
	}

	/* sanity check, this should not happen */

	if ((type == AST_COMP) && (lkb->lkb_ast_type & AST_COMP))
		log_print("repeat cast %d castmode %d lock %x %s",
			  mode, lkb->lkb_castmode,
			  lkb->lkb_id, lkb->lkb_resource->res_name);

	lkb->lkb_ast_type |= type;
	if (type == AST_BAST)
		lkb->lkb_bastmode = mode;
	else
		lkb->lkb_castmode = mode;
	spin_unlock(&ast_queue_lock);

	set_bit(WAKE_ASTS, &astd_wakeflags);
@@ -72,7 +219,8 @@ static void process_asts(void)
	struct dlm_lkb *lkb;
	void (*castfn) (void *astparam);
	void (*bastfn) (void *astparam, int mode);
	int type, first, bastmode, castmode, do_bast, do_cast, last_castmode;
	struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
	int i, rv, resid;

repeat:
	spin_lock(&ast_queue_lock);
@@ -83,54 +231,45 @@ static void process_asts(void)
		if (dlm_locking_stopped(ls))
			continue;

		list_del(&lkb->lkb_astqueue);
		type = lkb->lkb_ast_type;
		lkb->lkb_ast_type = 0;
		first = lkb->lkb_ast_first;
		lkb->lkb_ast_first = 0;
		bastmode = lkb->lkb_bastmode;
		castmode = lkb->lkb_castmode;
		/* we remove from astqueue list and remove everything in
		   lkb_callbacks before releasing the spinlock so empty
		   lkb_astqueue is always consistent with empty lkb_callbacks */

		list_del_init(&lkb->lkb_astqueue);

		castfn = lkb->lkb_astfn;
		bastfn = lkb->lkb_bastfn;
		spin_unlock(&ast_queue_lock);

		do_cast = (type & AST_COMP) && castfn;
		do_bast = (type & AST_BAST) && bastfn;
		memset(&callbacks, 0, sizeof(callbacks));

		/* Skip a bast if its blocking mode is compatible with the
		   granted mode of the preceding cast. */
		for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
			rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
			if (rv < 0)
				break;
		}
		spin_unlock(&ast_queue_lock);

		if (do_bast) {
			if (first == AST_COMP)
				last_castmode = castmode;
			else
				last_castmode = lkb->lkb_castmode_done;
			if (dlm_modes_compat(bastmode, last_castmode))
				do_bast = 0;
		if (resid) {
			/* shouldn't happen, for loop should have removed all */
			log_error(ls, "callback resid %d lkb %x",
				  resid, lkb->lkb_id);
		}

		if (first == AST_COMP) {
			if (do_cast)
				castfn(lkb->lkb_astparam);
			if (do_bast)
				bastfn(lkb->lkb_astparam, bastmode);
		} else if (first == AST_BAST) {
			if (do_bast)
				bastfn(lkb->lkb_astparam, bastmode);
			if (do_cast)
		for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
			if (!callbacks[i].seq)
				break;
			if (callbacks[i].flags & DLM_CB_SKIP) {
				continue;
			} else if (callbacks[i].flags & DLM_CB_BAST) {
				bastfn(lkb->lkb_astparam, callbacks[i].mode);
			} else if (callbacks[i].flags & DLM_CB_CAST) {
				lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
				lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
				castfn(lkb->lkb_astparam);
		} else {
			log_error(ls, "bad ast_first %d ast_type %d",
				  first, type);
			}
		}

		if (do_cast)
			lkb->lkb_castmode_done = castmode;
		if (do_bast)
			lkb->lkb_bastmode_done = bastmode;

		/* this removes the reference added by dlm_add_ast
		   and may result in the lkb being freed */
		/* removes ref for ast_queue, may cause lkb to be freed */
		dlm_put_lkb(lkb);

		cond_resched();
+6 −1
Original line number Diff line number Diff line
@@ -13,8 +13,13 @@
#ifndef __ASTD_DOT_H__
#define __ASTD_DOT_H__

void dlm_add_ast(struct dlm_lkb *lkb, int type, int mode);
void dlm_del_ast(struct dlm_lkb *lkb);
int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
                         int status, uint32_t sbflags, uint64_t seq);
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
                         struct dlm_callback *cb, int *resid);
void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
		 uint32_t sbflags);

void dlm_astd_wake(void);
int dlm_astd_start(void);
+2 −2
Original line number Diff line number Diff line
@@ -257,12 +257,12 @@ static int print_format3_lock(struct seq_file *s, struct dlm_lkb *lkb,
			lkb->lkb_status,
			lkb->lkb_grmode,
			lkb->lkb_rqmode,
			lkb->lkb_bastmode,
			lkb->lkb_last_bast.mode,
			rsb_lookup,
			lkb->lkb_wait_type,
			lkb->lkb_lvbseq,
			(unsigned long long)ktime_to_ns(lkb->lkb_timestamp),
			(unsigned long long)ktime_to_ns(lkb->lkb_time_bast));
			(unsigned long long)ktime_to_ns(lkb->lkb_last_bast_time));
	return rv;
}

+20 −15
Original line number Diff line number Diff line
@@ -192,11 +192,6 @@ struct dlm_args {
 * lkb is a process copy, the nodeid specifies the lock master.
 */

/* lkb_ast_type */

#define AST_COMP		1
#define AST_BAST		2

/* lkb_status */

#define DLM_LKSTS_WAITING	1
@@ -217,6 +212,20 @@ struct dlm_args {
#define DLM_IFL_USER		0x00000001
#define DLM_IFL_ORPHAN		0x00000002

#define DLM_CALLBACKS_SIZE	6

#define DLM_CB_CAST		0x00000001
#define DLM_CB_BAST		0x00000002
#define DLM_CB_SKIP		0x00000004

struct dlm_callback {
	uint64_t		seq;
	uint32_t		flags;		/* DLM_CBF_ */
	int			sb_status;	/* copy to lksb status */
	uint8_t			sb_flags;	/* copy to lksb flags */
	int8_t			mode; /* rq mode of bast, gr mode of cast */
};

struct dlm_lkb {
	struct dlm_rsb		*lkb_resource;	/* the rsb */
	struct kref		lkb_ref;
@@ -236,13 +245,6 @@ struct dlm_lkb {

	int8_t			lkb_wait_type;	/* type of reply waiting for */
	int8_t			lkb_wait_count;
	int8_t			lkb_ast_type;	/* type of ast queued for */
	int8_t			lkb_ast_first;	/* type of first ast queued */

	int8_t			lkb_bastmode;	/* req mode of queued bast */
	int8_t			lkb_castmode;	/* gr mode of queued cast */
	int8_t			lkb_bastmode_done; /* last delivered bastmode */
	int8_t			lkb_castmode_done; /* last delivered castmode */

	struct list_head	lkb_idtbl_list;	/* lockspace lkbtbl */
	struct list_head	lkb_statequeue;	/* rsb g/c/w list */
@@ -251,10 +253,15 @@ struct dlm_lkb {
	struct list_head	lkb_astqueue;	/* need ast to be sent */
	struct list_head	lkb_ownqueue;	/* list of locks for a process */
	struct list_head	lkb_time_list;
	ktime_t			lkb_time_bast;	/* for debugging */
	ktime_t			lkb_timestamp;
	unsigned long		lkb_timeout_cs;

	struct dlm_callback	lkb_callbacks[DLM_CALLBACKS_SIZE];
	struct dlm_callback	lkb_last_cast;
	struct dlm_callback	lkb_last_bast;
	ktime_t			lkb_last_cast_time;	/* for debugging */
	ktime_t			lkb_last_bast_time;	/* for debugging */

	char			*lkb_lvbptr;
	struct dlm_lksb		*lkb_lksb;      /* caller's status block */
	void			(*lkb_astfn) (void *astparam);
@@ -544,8 +551,6 @@ struct dlm_user_args {
					  (dlm_user_proc) on the struct file,
					  the process's locks point back to it*/
	struct dlm_lksb		lksb;
	int			old_mode;
	int			update_user_lvb;
	struct dlm_lksb __user	*user_lksb;
	void __user		*castparam;
	void __user		*castaddr;
+17 −21
Original line number Diff line number Diff line
@@ -160,10 +160,10 @@ static const int __quecvt_compat_matrix[8][8] = {
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       "     status %d rqmode %d grmode %d wait_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
	       lkb->lkb_grmode, lkb->lkb_wait_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
@@ -305,10 +305,7 @@ static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
		rv = -EDEADLK;
	}

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
	dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -319,13 +316,10 @@ static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	lkb->lkb_time_bast = ktime_get();

	if (is_master_copy(lkb)) {
		lkb->lkb_bastmode = rqmode; /* printed by debugfs */
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_ast(lkb, AST_BAST, rqmode);
		dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

@@ -600,6 +594,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_astqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);
@@ -2819,9 +2814,9 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
	   not from lkb fields */

	if (lkb->lkb_bastfn)
		ms->m_asts |= AST_BAST;
		ms->m_asts |= DLM_CB_BAST;
	if (lkb->lkb_astfn)
		ms->m_asts |= AST_COMP;
		ms->m_asts |= DLM_CB_CAST;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */
@@ -3122,8 +3117,8 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
	lkb->lkb_grmode = DLM_LOCK_IV;
	lkb->lkb_rqmode = ms->m_rqmode;

	lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
@@ -4412,8 +4407,8 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
@@ -4589,7 +4584,6 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
@@ -4658,7 +4652,6 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      fake_astfn, ua, fake_bastfn, &args);
@@ -4917,8 +4910,9 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		lkb->lkb_ast_type = 0;
		list_del(&lkb->lkb_astqueue);
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

@@ -4958,7 +4952,9 @@ static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
Loading