Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ac33d071 authored by Patrick Caulfield's avatar Patrick Caulfield Committed by Steven Whitehouse
Browse files

[DLM] Clean up lowcomms



This fixes up most of the things pointed out by akpm and Pavel Machek
with comments below indicating why some things have been left:

Andrew Morton wrote:
>
>> +static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
>> +{
>> +	struct nodeinfo *ni;
>> +	int r;
>> +	int n;
>> +
>> +	down_read(&nodeinfo_lock);
>
> Given that this function can sleep, I wonder if `alloc' is useful.
>
> I see lots of callers passing in a literal "0" for `alloc'.  That's in fact
> a secret (GFP_ATOMIC & ~__GFP_HIGH).  I doubt if that's what you really
> meant.  Particularly as the code could at least have used __GFP_WAIT (aka
> GFP_NOIO) which is much, much more reliable than "0".  In fact "0" is the
> least reliable mode possible.
>
> IOW, this is all bollixed up.

When 0 is passed into nodeid2nodeinfo the function does not try to allocate a
new structure at all. it's an indication that the caller only wants the nodeinfo
struct for that nodeid if there actually is one in existance.
I've tidied the function itself so it's more obvious, (and tidier!)

>> +/* Data received from remote end */
>> +static int receive_from_sock(void)
>> +{
>> +	int ret = 0;
>> +	struct msghdr msg;
>> +	struct kvec iov[2];
>> +	unsigned len;
>> +	int r;
>> +	struct sctp_sndrcvinfo *sinfo;
>> +	struct cmsghdr *cmsg;
>> +	struct nodeinfo *ni;
>> +
>> +	/* These two are marginally too big for stack allocation, but this
>> +	 * function is (currently) only called by dlm_recvd so static should be
>> +	 * OK.
>> +	 */
>> +	static struct sockaddr_storage msgname;
>> +	static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
>
> whoa.  This is globally singly-threaded code??

Yes. it is only ever run in the context of dlm_recvd.
>>
>> +static void initiate_association(int nodeid)
>> +{
>> +	struct sockaddr_storage rem_addr;
>> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
>
> Another static buffer to worry about.  Globally singly-threaded code?

Yes. Only ever called by dlm_sendd.

>> +
>> +/* Send a message */
>> +static int send_to_sock(struct nodeinfo *ni)
>> +{
>> +	int ret = 0;
>> +	struct writequeue_entry *e;
>> +	int len, offset;
>> +	struct msghdr outmsg;
>> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
>
> Singly-threaded?

Yep.

>>
>> +static void dealloc_nodeinfo(void)
>> +{
>> +	int i;
>> +
>> +	for (i=1; i<=max_nodeid; i++) {
>> +		struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
>> +		if (ni) {
>> +			idr_remove(&nodeinfo_idr, i);
>
> Didn't that need locking?

Not. it's only ever called at DLM shutdown after all the other threads
have been stopped.

>>
>> +static int write_list_empty(void)
>> +{
>> +	int status;
>> +
>> +	spin_lock_bh(&write_nodes_lock);
>> +	status = list_empty(&write_nodes);
>> +	spin_unlock_bh(&write_nodes_lock);
>> +
>> +	return status;
>> +}
>
> This function's return value is meaningless.  As soon as the lock gets
> dropped, the return value can get out of sync with reality.
>
> Looking at the caller, this _might_ happen to be OK, but it's a nasty and
> dangerous thing.  Really the locking should be moved into the caller.

It's just an optimisation to allow the caller to schedule if there is no work
to do. if something arrives immediately afterwards then it will get picked up
when the process re-awakes (and it will be woken by that arrival).

The 'accepting' atomic has gone completely. as Andrew pointed out it didn't
really achieve much anyway. I suspect it was a plaster over some other
startup or shutdown bug to be honest.


Signed-off-by: default avatarPatrick Caulfield <pcaulfie@redhat.com>
Signed-off-by: default avatarSteven Whitehouse <swhiteho@redhat.com>
Cc: Andrew Morton <akpm@osdl.org>
Cc: Pavel Machek <pavel@ucw.cz>
parent 34126f9f
Loading
Loading
Loading
Loading
+126 −138
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
@@ -75,13 +75,13 @@ struct nodeinfo {
};

static DEFINE_IDR(nodeinfo_idr);
static struct rw_semaphore	nodeinfo_lock;
static DECLARE_RWSEM(nodeinfo_lock);
static int max_nodeid;

struct cbuf {
	unsigned		base;
	unsigned		len;
	unsigned		mask;
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

/* Just the one of these, now. But this struct keeps
@@ -110,28 +110,32 @@ struct writequeue_entry {
	struct nodeinfo         *ni;
};

#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
#define CBUF_EMPTY(cb) ((cb)->len == 0)
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

#define CBUF_INIT(cb, size) \
do { \
	(cb)->base = (cb)->len = 0; \
	(cb)->mask = ((size)-1); \
} while(0)
static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

#define CBUF_EAT(cb, n) \
do { \
	(cb)->len  -= (n); \
	(cb)->base += (n); \
	(cb)->base &= (cb)->mask; \
} while(0)
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len  -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

/* List of nodes which have writes pending */
static struct list_head write_nodes;
static spinlock_t write_nodes_lock;
static LIST_HEAD(write_nodes);
static DEFINE_SPINLOCK(write_nodes_lock);

/* Maximum number of incoming messages to process before
 * doing a schedule()
@@ -141,8 +145,7 @@ static spinlock_t write_nodes_lock;
/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;
static wait_queue_head_t lowcomms_recv_wait;
static atomic_t accepting;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);

/* The SCTP connection */
static struct connection sctp_con;
@@ -174,6 +177,8 @@ static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
	return 0;
}

/* If alloc is 0 here we will not attempt to allocate a new
   nodeinfo struct */
static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
{
	struct nodeinfo *ni;
@@ -184,7 +189,9 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
	ni = idr_find(&nodeinfo_idr, nodeid);
	up_read(&nodeinfo_lock);

	if (!ni && alloc) {
	if (ni || !alloc)
		return ni;

	down_write(&nodeinfo_lock);

	ni = idr_find(&nodeinfo_idr, nodeid);
@@ -221,7 +228,6 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
		max_nodeid = nodeid;
out_up:
	up_write(&nodeinfo_lock);
	}

	return ni;
}
@@ -324,7 +330,7 @@ static void send_shutdown(sctp_assoc_t associd)
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	sinfo->sinfo_flags |= MSG_EOF;
@@ -398,15 +404,18 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
			fs = get_fs();
			set_fs(get_ds());
			ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
						IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
						(char*)&prim, &prim_len);
							     IPPROTO_SCTP,
							     SCTP_PRIMARY_ADDR,
							     (char*)&prim,
							     &prim_len);
			set_fs(fs);
			if (ret < 0) {
				struct nodeinfo *ni;

				log_print("getsockopt/sctp_primary_addr on "
					  "new assoc %d failed : %d",
				    (int)sn->sn_assoc_change.sac_assoc_id, ret);
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  ret);

				/* Retry INIT later */
				ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
@@ -426,9 +435,7 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
				return;

			/* Save the assoc ID */
			spin_lock(&ni->lock);
			ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
			spin_unlock(&ni->lock);

			log_print("got new/restarted association %d nodeid %d",
				  (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
@@ -507,13 +514,12 @@ static int receive_from_sock(void)
		sctp_con.rx_page = alloc_page(GFP_ATOMIC);
		if (sctp_con.rx_page == NULL)
			goto out_resched;
		CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE);
		cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
	}

	memset(&incmsg, 0, sizeof(incmsg));
	memset(&msgname, 0, sizeof(msgname));

	memset(incmsg, 0, sizeof(incmsg));
	msg.msg_name = &msgname;
	msg.msg_namelen = sizeof(msgname);
	msg.msg_flags = 0;
@@ -532,17 +538,17 @@ static int receive_from_sock(void)
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb);
	iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
	iov[0].iov_base = page_address(sctp_con.rx_page) +
			  CBUF_DATA(&sctp_con.cb);
		cbuf_data(&sctp_con.cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb);
	if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
		iov[1].iov_len = sctp_con.cb.base;
		iov[1].iov_base = page_address(sctp_con.rx_page);
		msg.msg_iovlen = 2;
@@ -557,7 +563,7 @@ static int receive_from_sock(void)
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);
	cmsg = CMSG_FIRSTHDR(&msg);
	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
	sinfo = CMSG_DATA(cmsg);

	if (msg.msg_flags & MSG_NOTIFICATION) {
		process_sctp_notification(&msg, page_address(sctp_con.rx_page));
@@ -583,14 +589,14 @@ static int receive_from_sock(void)
	if (r == 1)
		return 0;

	CBUF_ADD(&sctp_con.cb, ret);
	cbuf_add(&sctp_con.cb, ret);
	ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
					  page_address(sctp_con.rx_page),
					  sctp_con.cb.base, sctp_con.cb.len,
					  PAGE_CACHE_SIZE);
	if (ret < 0)
		goto out_close;
	CBUF_EAT(&sctp_con.cb, ret);
	cbuf_eat(&sctp_con.cb, ret);

out:
	ret = 0;
@@ -599,7 +605,7 @@ static int receive_from_sock(void)
out_resched:
	lowcomms_data_ready(sctp_con.sock->sk, 0);
	ret = 0;
	schedule();
	cond_resched();
	goto out_ret;

out_close:
@@ -619,10 +625,12 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
	set_fs(get_ds());
	if (num == 1)
		result = sctp_con.sock->ops->bind(sctp_con.sock,
					(struct sockaddr *) addr, addr_len);
						  (struct sockaddr *) addr,
						  addr_len);
	else
		result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
				SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len);
							SCTP_SOCKOPT_BINDX_ADD,
							(char *)addr, addr_len);
	set_fs(fs);

	if (result < 0)
@@ -756,16 +764,13 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
	int users = 0;
	struct nodeinfo *ni;

	if (!atomic_read(&accepting))
		return NULL;

	ni = nodeid2nodeinfo(nodeid, allocation);
	if (!ni)
		return NULL;

	spin_lock(&ni->writequeue_lock);
	e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
	if (((struct list_head *) e == &ni->writequeue) ||
	if ((&e->list == &ni->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
@@ -803,9 +808,6 @@ void dlm_lowcomms_commit_buffer(void *arg)
	int users;
	struct nodeinfo *ni = e->ni;

	if (!atomic_read(&accepting))
		return;

	spin_lock(&ni->writequeue_lock);
	users = --e->users;
	if (users)
@@ -878,7 +880,7 @@ static void initiate_association(int nodeid)
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);

@@ -892,7 +894,7 @@ static void initiate_association(int nodeid)
}

/* Send a message */
static int send_to_sock(struct nodeinfo *ni)
static void send_to_sock(struct nodeinfo *ni)
{
	int ret = 0;
	struct writequeue_entry *e;
@@ -909,7 +911,7 @@ static int send_to_sock(struct nodeinfo *ni)
	if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
		spin_unlock(&ni->lock);
		initiate_association(ni->nodeid);
		return 0;
		return;
	}
	spin_unlock(&ni->lock);

@@ -923,7 +925,7 @@ static int send_to_sock(struct nodeinfo *ni)
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
	sinfo->sinfo_assoc_id = ni->assoc_id;
@@ -955,7 +957,7 @@ static int send_to_sock(struct nodeinfo *ni)
				goto send_error;
		} else {
			/* Don't starve people filling buffers */
			schedule();
			cond_resched();
		}

		spin_lock(&ni->writequeue_lock);
@@ -964,13 +966,14 @@ static int send_to_sock(struct nodeinfo *ni)

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			kunmap(e->page);
			free_entry(e);
			continue;
		}
	}
	spin_unlock(&ni->writequeue_lock);
out:
	return ret;
	return;

send_error:
	log_print("Error sending to node %d %d", ni->nodeid, ret);
@@ -982,7 +985,7 @@ static int send_to_sock(struct nodeinfo *ni)
	} else
		spin_unlock(&ni->lock);

	return ret;
	return;
}

/* Try to send any messages that are pending */
@@ -1106,7 +1109,7 @@ static int dlm_recvd(void *data)
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&lowcomms_recv_wait, &wait);
		if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
			schedule();
			cond_resched();
		remove_wait_queue(&lowcomms_recv_wait, &wait);
		set_current_state(TASK_RUNNING);

@@ -1118,12 +1121,12 @@ static int dlm_recvd(void *data)

				/* Don't starve out everyone else */
				if (++count >= MAX_RX_MSG_COUNT) {
					schedule();
					cond_resched();
					count = 0;
				}
			} while (!kthread_should_stop() && ret >=0);
		}
		schedule();
		cond_resched();
	}

	return 0;
@@ -1138,7 +1141,7 @@ static int dlm_sendd(void *data)
	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_list_empty())
			schedule();
			cond_resched();
		set_current_state(TASK_RUNNING);

		if (sctp_con.eagain_flag) {
@@ -1197,7 +1200,6 @@ int dlm_lowcomms_start(void)
	error = daemons_start();
	if (error)
		goto fail_sock;
	atomic_set(&accepting, 1);
	return 0;

fail_sock:
@@ -1205,35 +1207,21 @@ int dlm_lowcomms_start(void)
	return error;
}

/* Set all the activity flags to prevent any socket activity. */

void dlm_lowcomms_stop(void)
{
	atomic_set(&accepting, 0);
	int i;

	sctp_con.flags = 0x7;
	daemons_stop();
	clean_writequeues();
	close_connection();
	dealloc_nodeinfo();
	max_nodeid = 0;
}

int dlm_lowcomms_init(void)
{
	init_waitqueue_head(&lowcomms_recv_wait);
	spin_lock_init(&write_nodes_lock);
	INIT_LIST_HEAD(&write_nodes);
	init_rwsem(&nodeinfo_lock);
	return 0;
}

void dlm_lowcomms_exit(void)
{
	int i;
	dlm_local_count = 0;
	dlm_local_nodeid = 0;

	for (i = 0; i < dlm_local_count; i++)
		kfree(dlm_local_addr[i]);
	dlm_local_count = 0;
	dlm_local_nodeid = 0;
}
+134 −208

File changed.

Preview size limit exceeded, changes collapsed.

+0 −2
Original line number Diff line number Diff line
@@ -14,8 +14,6 @@
#ifndef __LOWCOMMS_DOT_H__
#define __LOWCOMMS_DOT_H__

int dlm_lowcomms_init(void);
void dlm_lowcomms_exit(void);
int dlm_lowcomms_start(void);
void dlm_lowcomms_stop(void);
int dlm_lowcomms_close(int nodeid);
+1 −9
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@
#include "lock.h"
#include "user.h"
#include "memory.h"
#include "lowcomms.h"
#include "config.h"

#ifdef CONFIG_DLM_DEBUG
@@ -47,20 +46,14 @@ static int __init init_dlm(void)
	if (error)
		goto out_config;

	error = dlm_lowcomms_init();
	if (error)
		goto out_debug;

	error = dlm_user_init();
	if (error)
		goto out_lowcomms;
		goto out_debug;

	printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);

	return 0;

 out_lowcomms:
	dlm_lowcomms_exit();
 out_debug:
	dlm_unregister_debugfs();
 out_config:
@@ -76,7 +69,6 @@ static int __init init_dlm(void)
static void __exit exit_dlm(void)
{
	dlm_user_exit();
	dlm_lowcomms_exit();
	dlm_config_exit();
	dlm_memory_exit();
	dlm_lockspace_exit();