
Commit f26e51f6 authored by Linus Torvalds
* git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw: (51 commits)
  [DLM] block dlm_recv in recovery transition
  [DLM] don't overwrite castparam if it's NULL
  [GFS2] Get superblock a different way
  [GFS2] Don't try to remove buffers that don't exist
  [GFS2] Alternate gfs2_iget to avoid looking up inodes being freed
  [GFS2] Data corruption fix
  [GFS2] Clean up journaled data writing
  [GFS2] GFS2: chmod hung - fix race in thread creation
  [DLM] Make dlm_sendd cond_resched more
  [GFS2] Move inode deletion out of blocking_cb
  [GFS2] flocks from same process trip kernel BUG at fs/gfs2/glock.c:1118!
  [GFS2] Clean up gfs2_trans_add_revoke()
  [GFS2] Use slab operations for all gfs2_bufdata allocations
  [GFS2] Replace revoke structure with bufdata structure
  [GFS2] Fix ordering of dirty/journal for ordered buffer unstuffing
  [GFS2] Clean up ordered write code
  [GFS2] Move pin/unpin into lops.c, clean up locking
  [GFS2] Don't mark jdata dirty in gfs2_unstuffer_page()
  [GFS2] Introduce gfs2_remove_from_ail
  [GFS2] Correct lock ordering in unlink
  ...
parents 1462222b c36258b5
fs/dlm/dlm_internal.h  +1 −0
@@ -491,6 +491,7 @@ struct dlm_ls {
 	uint64_t		ls_recover_seq;
 	struct dlm_recover	*ls_recover_args;
 	struct rw_semaphore	ls_in_recovery;	/* block local requests */
+	struct rw_semaphore	ls_recv_active;	/* block dlm_recv */
 	struct list_head	ls_requestqueue;/* queue remote requests */
 	struct mutex		ls_requestqueue_mutex;
 	char			*ls_recover_buf;
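
The new ls_recv_active rwsem is taken for read around message processing in dlm_receive_buffer() (see the fs/dlm/lock.c hunks below), so the recovery transition can take it for write and know that no dlm_recv thread is still working inside the lockspace. The write side is not part of the hunks shown on this page. Below is a minimal userspace sketch of that pattern using a pthread rwlock; the names recv_active, handle_message and stop_for_recovery are illustrative, not DLM symbols.

/* Sketch of the ls_recv_active idea: receivers hold the lock for read while
 * handling a message; the recovery transition takes it for write, which
 * waits until every in-flight receiver has finished. */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_rwlock_t recv_active = PTHREAD_RWLOCK_INITIALIZER;
static int locking_stopped;			/* models dlm_locking_stopped() */

static void handle_message(int msg)
{
	pthread_rwlock_rdlock(&recv_active);	/* like down_read(&ls->ls_recv_active) */
	if (locking_stopped)
		printf("msg %d saved for after recovery\n", msg);
	else
		printf("msg %d processed\n", msg);
	pthread_rwlock_unlock(&recv_active);	/* like up_read() */
}

static void stop_for_recovery(void)
{
	pthread_rwlock_wrlock(&recv_active);	/* blocks until no receiver is active */
	locking_stopped = 1;
	pthread_rwlock_unlock(&recv_active);
}

static void *recv_thread(void *arg)
{
	for (int i = 0; i < 5; i++) {
		handle_message(i);
		usleep(1000);
	}
	return NULL;
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, recv_thread, NULL);
	usleep(2000);
	stop_for_recovery();	/* after this returns, no receiver is mid-message */
	pthread_join(t, NULL);
	return 0;
}
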
fs/dlm/lock.c  +85 −57
@@ -3638,55 +3638,8 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 	dlm_put_lkb(lkb);
 }
 
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
 {
-	struct dlm_message *ms = (struct dlm_message *) hd;
-	struct dlm_ls *ls;
-	int error = 0;
-
-	if (!recovery)
-		dlm_message_in(ms);
-
-	ls = dlm_find_lockspace_global(hd->h_lockspace);
-	if (!ls) {
-		log_print("drop message %d from %d for unknown lockspace %d",
-			  ms->m_type, nodeid, hd->h_lockspace);
-		return -EINVAL;
-	}
-
-	/* recovery may have just ended leaving a bunch of backed-up requests
-	   in the requestqueue; wait while dlm_recoverd clears them */
-
-	if (!recovery)
-		dlm_wait_requestqueue(ls);
-
-	/* recovery may have just started while there were a bunch of
-	   in-flight requests -- save them in requestqueue to be processed
-	   after recovery.  we can't let dlm_recvd block on the recovery
-	   lock.  if dlm_recoverd is calling this function to clear the
-	   requestqueue, it needs to be interrupted (-EINTR) if another
-	   recovery operation is starting. */
-
-	while (1) {
-		if (dlm_locking_stopped(ls)) {
-			if (recovery) {
-				error = -EINTR;
-				goto out;
-			}
-			error = dlm_add_requestqueue(ls, nodeid, hd);
-			if (error == -EAGAIN)
-				continue;
-			else {
-				error = -EINTR;
-				goto out;
-			}
-		}
-
-		if (dlm_lock_recovery_try(ls))
-			break;
-		schedule();
-	}
-
 	switch (ms->m_type) {
 
 	/* messages sent to a master node */
@@ -3761,17 +3714,90 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
 		log_error(ls, "unknown message type %d", ms->m_type);
 	}
 
-	dlm_unlock_recovery(ls);
- out:
-	dlm_put_lockspace(ls);
 	dlm_astd_wake();
-	return error;
 }
 
+/* If the lockspace is in recovery mode (locking stopped), then normal
+   messages are saved on the requestqueue for processing after recovery is
+   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
+   messages off the requestqueue before we process new ones. This occurs right
+   after recovery completes when we transition from saving all messages on
+   requestqueue, to processing all the saved messages, to processing new
+   messages as they arrive. */
 
-/*
- * Recovery related
- */
+static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+				int nodeid)
+{
+	if (dlm_locking_stopped(ls)) {
+		dlm_add_requestqueue(ls, nodeid, (struct dlm_header *) ms);
+	} else {
+		dlm_wait_requestqueue(ls);
+		_receive_message(ls, ms);
+	}
+}
+
+/* This is called by dlm_recoverd to process messages that were saved on
+   the requestqueue. */
+
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+{
+	_receive_message(ls, ms);
+}
+
+/* This is called by the midcomms layer when something is received for
+   the lockspace.  It could be either a MSG (normal message sent as part of
+   standard locking activity) or an RCOM (recovery message sent as part of
+   lockspace recovery). */
+
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid)
+{
+	struct dlm_message *ms = (struct dlm_message *) hd;
+	struct dlm_rcom *rc = (struct dlm_rcom *) hd;
+	struct dlm_ls *ls;
+	int type = 0;
+
+	switch (hd->h_cmd) {
+	case DLM_MSG:
+		dlm_message_in(ms);
+		type = ms->m_type;
+		break;
+	case DLM_RCOM:
+		dlm_rcom_in(rc);
+		type = rc->rc_type;
+		break;
+	default:
+		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
+		return;
+	}
+
+	if (hd->h_nodeid != nodeid) {
+		log_print("invalid h_nodeid %d from %d lockspace %x",
+			  hd->h_nodeid, nodeid, hd->h_lockspace);
+		return;
+	}
+
+	ls = dlm_find_lockspace_global(hd->h_lockspace);
+	if (!ls) {
+		log_print("invalid h_lockspace %x from %d cmd %d type %d",
+			  hd->h_lockspace, nodeid, hd->h_cmd, type);
+
+		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
+			dlm_send_ls_not_ready(nodeid, rc);
+		return;
+	}
+
+	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
+	   be inactive (in this ls) before transitioning to recovery mode */
+
+	down_read(&ls->ls_recv_active);
+	if (hd->h_cmd == DLM_MSG)
+		dlm_receive_message(ls, ms, nodeid);
+	else
+		dlm_receive_rcom(ls, rc, nodeid);
+	up_read(&ls->ls_recv_active);
+
+	dlm_put_lockspace(ls);
+}
 
 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 {
@@ -4429,6 +4455,7 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 
 	if (lvb_in && ua->lksb.sb_lvbptr)
 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
-	ua->castparam = ua_tmp->castparam;
+	if (ua_tmp->castparam)
+		ua->castparam = ua_tmp->castparam;
 	ua->user_lksb = ua_tmp->user_lksb;
 
@@ -4474,6 +4501,7 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 		goto out;
 
 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
-	ua->castparam = ua_tmp->castparam;
+	if (ua_tmp->castparam)
+		ua->castparam = ua_tmp->castparam;
 	ua->user_lksb = ua_tmp->user_lksb;
 
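
The hunks above replace the old dlm_receive_message() entry point with a small dispatch chain: dlm_receive_buffer() validates the header and takes ls_recv_active for read, dlm_receive_message() either processes a normal message or saves it on the requestqueue while locking is stopped, and dlm_receive_message_saved() lets dlm_recoverd drain the saved messages afterwards. The sketch below models only that control flow in plain userspace C; the types and the fixed-size requestqueue are illustrative stand-ins, not the real DLM structures.

/* Rough model of the new receive path: one entry point validates and
 * dispatches, normal messages are parked on a requestqueue while
 * "recovery" has locking stopped, and the queue is drained afterwards. */
#include <stdio.h>

enum cmd { MSG, RCOM };

struct header { enum cmd cmd; int nodeid; };

static int locking_stopped;			/* models dlm_locking_stopped() */
static struct header requestqueue[16];		/* models ls_requestqueue */
static int rq_len;

static void receive_message(struct header *hd)
{
	if (locking_stopped)			/* save it for after recovery */
		requestqueue[rq_len++] = *hd;
	else
		printf("MSG from node %d processed\n", hd->nodeid);
}

static void receive_rcom(struct header *hd)
{
	/* recovery messages take their own path and are not queued here */
	printf("RCOM from node %d processed\n", hd->nodeid);
}

/* analogue of dlm_receive_buffer(): validate, then dispatch on the command */
static void receive_buffer(struct header *hd, int nodeid)
{
	if (hd->nodeid != nodeid) {
		printf("invalid nodeid %d from %d\n", hd->nodeid, nodeid);
		return;
	}
	if (hd->cmd == MSG)
		receive_message(hd);
	else
		receive_rcom(hd);
}

int main(void)
{
	struct header m = { MSG, 2 }, r = { RCOM, 3 };

	receive_buffer(&m, 2);			/* processed immediately */
	locking_stopped = 1;			/* recovery begins */
	receive_buffer(&m, 2);			/* saved on the requestqueue */
	receive_buffer(&r, 3);			/* recovery traffic still flows */
	locking_stopped = 0;			/* recovery done */
	for (int i = 0; i < rq_len; i++)	/* drained like dlm_receive_message_saved() */
		printf("saved MSG from node %d processed\n", requestqueue[i].nodeid);
	return 0;
}
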
fs/dlm/lock.h  +2 −1
@@ -16,7 +16,8 @@
 void dlm_print_rsb(struct dlm_rsb *r);
 void dlm_dump_rsb(struct dlm_rsb *r);
 void dlm_print_lkb(struct dlm_lkb *lkb);
-int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
+void dlm_receive_buffer(struct dlm_header *hd, int nodeid);
 int dlm_modes_compat(int mode1, int mode2);
 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
 	unsigned int flags, struct dlm_rsb **r_ret);
fs/dlm/lockspace.c  +1 −0
@@ -519,6 +519,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
 	ls->ls_recover_seq = 0;
 	ls->ls_recover_args = NULL;
 	init_rwsem(&ls->ls_in_recovery);
+	init_rwsem(&ls->ls_recv_active);
 	INIT_LIST_HEAD(&ls->ls_requestqueue);
 	mutex_init(&ls->ls_requestqueue_mutex);
 	mutex_init(&ls->ls_clear_proc_locks);
fs/dlm/lowcomms.c  +8 −15
@@ -334,19 +334,9 @@ static void close_connection(struct connection *con, bool and_other)
 		con->rx_page = NULL;
 	}
 
-	/* If we are an 'othercon' then NULL the pointer to us
-	   from the parent and tidy ourself up */
-	if (test_bit(CF_IS_OTHERCON, &con->flags)) {
-		struct connection *parent = __nodeid2con(con->nodeid, 0);
-		parent->othercon = NULL;
-		kmem_cache_free(con_cache, con);
-	}
-	else {
-		/* Parent connections get reused */
-		con->retries = 0;
-		mutex_unlock(&con->sock_mutex);
-	}
+	con->retries = 0;
+	mutex_unlock(&con->sock_mutex);
 }
 
 /* We only send shutdown messages to nodes that are not part of the cluster */
 static void sctp_send_shutdown(sctp_assoc_t associd)
@@ -731,6 +721,8 @@ static int tcp_accept_from_sock(struct connection *con)
 			INIT_WORK(&othercon->swork, process_send_sockets);
 			INIT_WORK(&othercon->rwork, process_recv_sockets);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
+		}
+		if (!othercon->sock) {
 			newcon->othercon = othercon;
 			othercon->sock = newsock;
 			newsock->sk->sk_user_data = othercon;
@@ -1272,14 +1264,15 @@ static void send_to_sock(struct connection *con)
 		if (len) {
 			ret = sendpage(con->sock, e->page, offset, len,
 				       msg_flags);
-			if (ret == -EAGAIN || ret == 0)
+			if (ret == -EAGAIN || ret == 0) {
+				cond_resched();
 				goto out;
+			}
 			if (ret <= 0)
 				goto send_error;
-		} else {
+		}
 			/* Don't starve people filling buffers */
 			cond_resched();
-		}
 
 		spin_lock(&con->writequeue_lock);
 		e->offset += ret;
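
The send_to_sock() hunk above adds a cond_resched() on the -EAGAIN/0 path before jumping out, and lets the existing "don't starve" cond_resched() run on every pass rather than only when there was nothing to send, so a full socket buffer no longer lets dlm_sendd hog the CPU. A rough userspace analogue, assuming a socketpair and MSG_DONTWAIT in place of the kernel's sendpage() and cond_resched():

/* Fill a socket until it would block, then yield instead of spinning.
 * Illustrative only; not part of the DLM code. */
#include <errno.h>
#include <sched.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

int main(void)
{
	int sv[2];
	char buf[4096];
	long total = 0;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return 1;
	memset(buf, 'x', sizeof(buf));

	for (int i = 0; i < 1000; i++) {
		ssize_t ret = send(sv[0], buf, sizeof(buf), MSG_DONTWAIT);
		if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
			sched_yield();	/* like cond_resched(): let others run */
			break;
		}
		if (ret <= 0)
			break;
		total += ret;
	}
	printf("queued %ld bytes before the buffer filled\n", total);
	return 0;
}
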