Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f389e9fc authored by Linus Torvalds's avatar Linus Torvalds
Browse files
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm: (21 commits)
  dlm: static initialization improvements
  dlm: clean ups
  dlm: Sanity check namelen before copying it
  dlm: keep cached master rsbs during recovery
  dlm: change error message to debug
  dlm: fix possible use-after-free
  dlm: limit dir lookup loop
  dlm: reject normal unlock when lock is waiting for lookup
  dlm: validate messages before processing
  dlm: reject messages from non-members
  dlm: another call to confirm_master in receive_request_reply
  dlm: recover locks waiting for overlap replies
  dlm: clear ast_type when removing from astqueue
  dlm: use fixed errno values in messages
  dlm: swap bytes for rcom lock reply
  dlm: align midcomms message buffer
  dlm: close othercons
  dlm: use dlm prefix on alloc and free functions
  dlm: don't print common non-errors
  dlm: proper prototypes
  ...
parents 2419505a 0fe410d3
Loading
Loading
Loading
Loading
+37 −39
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
	spin_unlock(&ls->ls_recover_list_lock);

	if (!found)
		de = allocate_direntry(ls, len);
		de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL);
	return de;
}

@@ -62,7 +62,7 @@ void dlm_clear_free_entries(struct dlm_ls *ls)
		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
				list);
		list_del(&de->list);
		free_direntry(de);
		kfree(de);
	}
	spin_unlock(&ls->ls_recover_list_lock);
}
@@ -171,7 +171,7 @@ void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen
	}

	list_del(&de->list);
	free_direntry(de);
	kfree(de);
 out:
	write_unlock(&ls->ls_dirtbl[bucket].lock);
}
@@ -302,7 +302,7 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name,

	write_unlock(&ls->ls_dirtbl[bucket].lock);

	de = allocate_direntry(ls, namelen);
	de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL);
	if (!de)
		return -ENOMEM;

@@ -313,7 +313,7 @@ static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
	write_lock(&ls->ls_dirtbl[bucket].lock);
	tmp = search_bucket(ls, name, namelen, bucket);
	if (tmp) {
		free_direntry(de);
		kfree(de);
		de = tmp;
	} else {
		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
@@ -329,49 +329,47 @@ int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
	return get_entry(ls, nodeid, name, namelen, r_nodeid);
}

/* Copy the names of master rsb's into the buffer provided.
   Only select names whose dir node is the given nodeid. */
static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
			up_read(&ls->ls_root_sem);
			return r;
		}
	}
	up_read(&ls->ls_root_sem);
	return NULL;
}

/* Find the rsb where we left off (or start again), then send rsb names
   for rsb's we're master of and whose directory node matches the requesting
   node.  inbuf is the rsb name last sent, inlen is the name's length */

void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
 			   char *outbuf, int outlen, int nodeid)
{
	struct list_head *list;
	struct dlm_rsb *start_r = NULL, *r = NULL;
	int offset = 0, start_namelen, error, dir_nodeid;
	char *start_name;
	struct dlm_rsb *r;
	int offset = 0, dir_nodeid;
	uint16_t be_namelen;

	/*
	 * Find the rsb where we left off (or start again)
	 */

	start_namelen = inlen;
	start_name = inbuf;
	down_read(&ls->ls_root_sem);

	if (start_namelen > 1) {
		/*
		 * We could also use a find_rsb_root() function here that
		 * searched the ls_root_list.
		 */
		error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER,
				     &start_r);
		DLM_ASSERT(!error && start_r,
			   printk("error %d\n", error););
		DLM_ASSERT(!list_empty(&start_r->res_root_list),
			   dlm_print_rsb(start_r););
		dlm_put_rsb(start_r);
	if (inlen > 1) {
		r = find_rsb_root(ls, inbuf, inlen);
		if (!r) {
			inbuf[inlen - 1] = '\0';
			log_error(ls, "copy_master_names from %d start %d %s",
				  nodeid, inlen, inbuf);
			goto out;
		}

	/*
	 * Send rsb names for rsb's we're master of and whose directory node
	 * matches the requesting node.
	 */

	down_read(&ls->ls_root_sem);
	if (start_r)
		list = start_r->res_root_list.next;
	else
		list = r->res_root_list.next;
	} else {
		list = ls->ls_root_list.next;
	}

	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
		r = list_entry(list, struct dlm_rsb, res_root_list);
+16 −0
Original line number Diff line number Diff line
@@ -570,5 +570,21 @@ static inline int dlm_no_directory(struct dlm_ls *ls)
	return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0;
}

int dlm_netlink_init(void);
void dlm_netlink_exit(void);
void dlm_timeout_warn(struct dlm_lkb *lkb);

#ifdef CONFIG_DLM_DEBUG
int dlm_register_debugfs(void);
void dlm_unregister_debugfs(void);
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_register_debugfs(void) { return 0; }
static inline void dlm_unregister_debugfs(void) { }
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

#endif				/* __DLM_INTERNAL_DOT_H__ */
+177 −72
Original line number Diff line number Diff line
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
@@ -88,7 +88,6 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
void dlm_timeout_warn(struct dlm_lkb *lkb);

/*
 * Lock compatibilty matrix - thanks Steve
@@ -335,7 +334,7 @@ static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	r = dlm_allocate_rsb(ls, len);
	if (!r)
		return NULL;

@@ -478,7 +477,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
@@ -490,12 +489,6 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

@@ -519,7 +512,7 @@ static void toss_rsb(struct kref *kref)
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
@@ -589,7 +582,7 @@ static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

@@ -683,8 +676,8 @@ static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
@@ -988,7 +981,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			dlm_free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
@@ -1171,7 +1164,7 @@ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;
@@ -1203,7 +1196,7 @@ static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;
@@ -1852,7 +1845,7 @@ static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
@@ -1886,7 +1879,7 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
		return 1;
	}

	for (;;) {
	for (i = 0; i < 2; i++) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
@@ -1900,6 +1893,8 @@ static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}
	if (error && error != -EEXIST)
		return error;

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
@@ -1941,8 +1936,11 @@ static void confirm_master(struct dlm_rsb *r, int error)
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */
	case -EBADR:
	case -ENOTBLK:
		/* the remote request failed and won't be retried (it was
		   a NOQUEUE, or has been canceled/unlocked); make a waiting
		   lkb the first_lkid */

		r->res_first_lkid = 0;

@@ -2108,18 +2106,19 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		rv = -EBUSY;
		goto out;
	}
	}

	/* cancel not allowed with another cancel/unlock in progress */

@@ -2986,7 +2985,7 @@ static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		if (!lkb->lkb_lvbptr)
			lkb->lkb_lvbptr = allocate_lvb(ls);
			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		len = receive_extralen(ms);
@@ -3006,11 +3005,9 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = allocate_lvb(ls);
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}
@@ -3021,16 +3018,6 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

@@ -3046,8 +3033,6 @@ static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;
	return 0;
@@ -3063,6 +3048,50 @@ static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
	lkb->lkb_remid = ms->m_lkid;
}

/* This is called after the rsb is locked so that we can safely inspect
   fields in the lkb. */

static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	int from = ms->m_header.h_nodeid;
	int error = 0;

	switch (ms->m_type) {
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
	case DLM_MSG_BAST:
		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	case DLM_MSG_REQUEST_REPLY:
		if (!is_process_copy(lkb))
			error = -EINVAL;
		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
			error = -EINVAL;
		break;

	default:
		error = -EINVAL;
	}

	if (error)
		log_error(lkb->lkb_resource->res_ls,
			  "ignore invalid message %d from %d %x %x %x %d",
			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
			  lkb->lkb_flags, lkb->lkb_nodeid);
	return error;
}

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
@@ -3124,17 +3153,21 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
		goto out_reply;
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
 out_reply:
	if (reply)
		send_convert_reply(r, lkb, error);

 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
@@ -3160,15 +3193,19 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;
		goto out_reply;

	error = do_unlock(r, lkb);
 out:
 out_reply:
	send_unlock_reply(r, lkb, error);

 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
@@ -3196,9 +3233,13 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
@@ -3217,22 +3258,26 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		log_debug(ls, "receive_grant from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
@@ -3246,18 +3291,22 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		log_debug(ls, "receive_bast from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);
	error = validate_message(lkb, ms);
	if (error)
		goto out;

	queue_bast(r, lkb, ms->m_bastmode);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
@@ -3323,15 +3372,19 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		log_debug(ls, "receive_request_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
@@ -3383,6 +3436,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
@@ -3463,6 +3517,10 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
@@ -3481,10 +3539,10 @@ static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_convert_reply no lkb");
		log_debug(ls, "receive_convert_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_convert_reply(lkb, ms);
	dlm_put_lkb(lkb);
@@ -3498,6 +3556,10 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
@@ -3529,10 +3591,10 @@ static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_unlock_reply no lkb");
		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_unlock_reply(lkb, ms);
	dlm_put_lkb(lkb);
@@ -3546,6 +3608,10 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
@@ -3577,10 +3643,10 @@ static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_cancel_reply no lkb");
		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
			  ms->m_header.h_nodeid, ms->m_remid);
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	_receive_cancel_reply(lkb, ms);
	dlm_put_lkb(lkb);
@@ -3640,6 +3706,13 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)

static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
{
	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
			  ms->m_remid, ms->m_result);
		return;
	}

	switch (ms->m_type) {

	/* messages sent to a master node */
@@ -3778,7 +3851,8 @@ void dlm_receive_buffer(struct dlm_header *hd, int nodeid)

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("invalid h_lockspace %x from %d cmd %d type %d",
		if (dlm_config.ci_log_debug)
			log_print("invalid lockspace %x from %d cmd %d type %d",
				  hd->h_lockspace, nodeid, hd->h_cmd, type);

		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
@@ -3806,6 +3880,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
@@ -3847,6 +3922,7 @@ static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	int wait_type, stub_unlock_result, stub_cancel_result;

	mutex_lock(&ls->ls_waiters_mutex);

@@ -3865,7 +3941,33 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {
		wait_type = lkb->lkb_wait_type;
		stub_unlock_result = -DLM_EUNLOCK;
		stub_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_cancel_result = 0;
			}
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					stub_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, lkb->lkb_flags, wait_type,
				  stub_cancel_result, stub_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
@@ -3878,8 +3980,9 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_result = stub_unlock_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;
@@ -3887,15 +3990,16 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_result = stub_cancel_result;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		schedule();
	}
@@ -4184,7 +4288,7 @@ static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
@@ -4259,7 +4363,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
		log_debug(ls, "recover_master_copy %d %x", error, rl->rl_lkid);
	rl->rl_result = error;
	return error;
}
@@ -4342,7 +4446,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

@@ -4679,6 +4783,7 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		lkb->lkb_ast_type = 0;
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
+0 −2
Original line number Diff line number Diff line
@@ -19,8 +19,6 @@ void dlm_print_lkb(struct dlm_lkb *lkb);
void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
	unsigned int flags, struct dlm_rsb **r_ret);
void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r);
int dlm_put_lkb(struct dlm_lkb *lkb);
+4 −12
Original line number Diff line number Diff line
@@ -24,14 +24,6 @@
#include "recover.h"
#include "requestqueue.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
@@ -684,9 +676,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
			dlm_del_ast(lkb);

			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);
				dlm_free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
			dlm_free_lkb(lkb);
		}
	}
	dlm_astd_resume();
@@ -704,7 +696,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
			dlm_free_rsb(rsb);
		}

		head = &ls->ls_rsbtbl[i].toss;
@@ -712,7 +704,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
			dlm_free_rsb(rsb);
		}
	}

Loading