Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3262c816 authored by Greg Banks's avatar Greg Banks Committed by Linus Torvalds
Browse files

[PATCH] knfsd: split svc_serv into pools



Split out the list of idle threads and pending sockets from svc_serv into a
new svc_pool structure, and allocate a fixed number (in this patch, 1) of
pools per svc_serv.  The new structure contains a lock which takes over
several of the duties of svc_serv->sv_lock, which is now relegated to
protecting only sv_tempsocks, sv_permsocks, and sv_tmpcnt in svc_serv.

The point is to move the hottest fields out of svc_serv and into svc_pool,
allowing a following patch to arrange for a svc_pool per NUMA node or per CPU.
 This is a major step towards making the NFS server NUMA-friendly.

Signed-off-by: default avatarGreg Banks <gnb@melbourne.sgi.com>
Signed-off-by: default avatarNeil Brown <neilb@suse.de>
Signed-off-by: default avatarAndrew Morton <akpm@osdl.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@osdl.org>
parent c081a0c7
Loading
Loading
Loading
Loading
+23 −2
Original line number Original line Diff line number Diff line
@@ -17,6 +17,25 @@
#include <linux/wait.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/mm.h>



/*
 *
 * RPC service thread pool.
 *
 * Pool of threads and temporary sockets.  Generally there is only
 * a single one of these per RPC service, but on NUMA machines those
 * services that can benefit from it (i.e. nfs but not lockd) will
 * have one pool per NUMA node.  This optimisation reduces cross-
 * node traffic on multi-node NUMA NFS servers.
 */
struct svc_pool {
	unsigned int		sp_id;	    	/* pool id; also node id on NUMA */
	spinlock_t		sp_lock;	/* protects all fields */
	struct list_head	sp_threads;	/* idle server threads */
	struct list_head	sp_sockets;	/* pending sockets */
	unsigned int		sp_nrthreads;	/* # of threads in pool */
} ____cacheline_aligned_in_smp;

/*
/*
 * RPC service.
 * RPC service.
 *
 *
@@ -28,8 +47,6 @@
 * We currently do not support more than one RPC program per daemon.
 * We currently do not support more than one RPC program per daemon.
 */
 */
struct svc_serv {
struct svc_serv {
	struct list_head	sv_threads;	/* idle server threads */
	struct list_head	sv_sockets;	/* pending sockets */
	struct svc_program *	sv_program;	/* RPC program */
	struct svc_program *	sv_program;	/* RPC program */
	struct svc_stat *	sv_stats;	/* RPC statistics */
	struct svc_stat *	sv_stats;	/* RPC statistics */
	spinlock_t		sv_lock;
	spinlock_t		sv_lock;
@@ -44,6 +61,9 @@ struct svc_serv {


	char *			sv_name;	/* service name */
	char *			sv_name;	/* service name */


	unsigned int		sv_nrpools;	/* number of thread pools */
	struct svc_pool *	sv_pools;	/* array of thread pools */

	void			(*sv_shutdown)(struct svc_serv *serv);
	void			(*sv_shutdown)(struct svc_serv *serv);
						/* Callback to use when last thread
						/* Callback to use when last thread
						 * exits.
						 * exits.
@@ -138,6 +158,7 @@ struct svc_rqst {
	int			rq_addrlen;
	int			rq_addrlen;


	struct svc_serv *	rq_server;	/* RPC service definition */
	struct svc_serv *	rq_server;	/* RPC service definition */
	struct svc_pool *	rq_pool;	/* thread pool */
	struct svc_procedure *	rq_procinfo;	/* procedure info */
	struct svc_procedure *	rq_procinfo;	/* procedure info */
	struct auth_ops *	rq_authop;	/* authentication flavour */
	struct auth_ops *	rq_authop;	/* authentication flavour */
	struct svc_cred		rq_cred;	/* auth info */
	struct svc_cred		rq_cred;	/* auth info */
+1 −0
Original line number Original line Diff line number Diff line
@@ -20,6 +20,7 @@ struct svc_sock {
	struct socket *		sk_sock;	/* berkeley socket layer */
	struct socket *		sk_sock;	/* berkeley socket layer */
	struct sock *		sk_sk;		/* INET layer */
	struct sock *		sk_sk;		/* INET layer */


	struct svc_pool *	sk_pool;	/* current pool iff queued */
	struct svc_serv *	sk_server;	/* service for this socket */
	struct svc_serv *	sk_server;	/* service for this socket */
	atomic_t		sk_inuse;	/* use count */
	atomic_t		sk_inuse;	/* use count */
	unsigned long		sk_flags;
	unsigned long		sk_flags;
+49 −7
Original line number Original line Diff line number Diff line
@@ -32,6 +32,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize,
	struct svc_serv	*serv;
	struct svc_serv	*serv;
	int vers;
	int vers;
	unsigned int xdrsize;
	unsigned int xdrsize;
	unsigned int i;


	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
		return NULL;
		return NULL;
@@ -55,13 +56,33 @@ svc_create(struct svc_program *prog, unsigned int bufsize,
		prog = prog->pg_next;
		prog = prog->pg_next;
	}
	}
	serv->sv_xdrsize   = xdrsize;
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_threads);
	INIT_LIST_HEAD(&serv->sv_sockets);
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	init_timer(&serv->sv_temptimer);
	init_timer(&serv->sv_temptimer);
	spin_lock_init(&serv->sv_lock);
	spin_lock_init(&serv->sv_lock);


	serv->sv_nrpools = 1;
	serv->sv_pools =
		kcalloc(sizeof(struct svc_pool), serv->sv_nrpools,
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_threads);
		INIT_LIST_HEAD(&pool->sp_sockets);
		spin_lock_init(&pool->sp_lock);
	}


	/* Remove any stale portmap registrations */
	/* Remove any stale portmap registrations */
	svc_register(serv, 0, 0);
	svc_register(serv, 0, 0);


@@ -69,7 +90,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize,
}
}


/*
/*
 * Destroy an RPC service
 * Destroy an RPC service.  Should be called with the BKL held
 */
 */
void
void
svc_destroy(struct svc_serv *serv)
svc_destroy(struct svc_serv *serv)
@@ -110,6 +131,7 @@ svc_destroy(struct svc_serv *serv)


	/* Unregister service with the portmapper */
	/* Unregister service with the portmapper */
	svc_register(serv, 0, 0);
	svc_register(serv, 0, 0);
	kfree(serv->sv_pools);
	kfree(serv);
	kfree(serv);
}
}


@@ -158,10 +180,11 @@ svc_release_buffer(struct svc_rqst *rqstp)
}
}


/*
/*
 * Create a server thread
 * Create a thread in the given pool.  Caller must hold BKL.
 */
 */
int
static int
svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
__svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
		    struct svc_pool *pool)
{
{
	struct svc_rqst	*rqstp;
	struct svc_rqst	*rqstp;
	int		error = -ENOMEM;
	int		error = -ENOMEM;
@@ -178,7 +201,11 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
		goto out_thread;
		goto out_thread;


	serv->sv_nrthreads++;
	serv->sv_nrthreads++;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	spin_unlock_bh(&pool->sp_lock);
	rqstp->rq_server = serv;
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;
	error = kernel_thread((int (*)(void *)) func, rqstp, 0);
	error = kernel_thread((int (*)(void *)) func, rqstp, 0);
	if (error < 0)
	if (error < 0)
		goto out_thread;
		goto out_thread;
@@ -193,17 +220,32 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
}
}


/*
/*
 * Destroy an RPC server thread
 * Create a thread in the default pool.  Caller must hold BKL.
 */
int
svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
{
	return __svc_create_thread(func, serv, &serv->sv_pools[0]);
}

/*
 * Called from a server thread as it's exiting.  Caller must hold BKL.
 */
 */
void
void
svc_exit_thread(struct svc_rqst *rqstp)
svc_exit_thread(struct svc_rqst *rqstp)
{
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;


	svc_release_buffer(rqstp);
	svc_release_buffer(rqstp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	kfree(rqstp->rq_auth_data);

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	spin_unlock_bh(&pool->sp_lock);

	kfree(rqstp);
	kfree(rqstp);


	/* Release the server */
	/* Release the server */
+80 −45
Original line number Original line Diff line number Diff line
@@ -46,7 +46,10 @@


/* SMP locking strategy:
/* SMP locking strategy:
 *
 *
 *	svc_serv->sv_lock protects most stuff for that service.
 *	svc_pool->sp_lock protects most of the fields of that pool.
 * 	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list
 *	svc_sock->sk_defer_lock protects the svc_sock->sk_deferred list
 *	svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply.
 *	svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply.
 *
 *
@@ -82,22 +85,22 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req);
static int svc_conn_age_period = 6*60;
static int svc_conn_age_period = 6*60;


/*
/*
 * Queue up an idle server thread.  Must have serv->sv_lock held.
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't polute
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 * the cache.
 */
 */
static inline void
static inline void
svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
{
	list_add(&rqstp->rq_list, &serv->sv_threads);
	list_add(&rqstp->rq_list, &pool->sp_threads);
}
}


/*
/*
 * Dequeue an nfsd thread.  Must have serv->sv_lock held.
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
 */
static inline void
static inline void
svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp)
svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
{
	list_del(&rqstp->rq_list);
	list_del(&rqstp->rq_list);
}
}
@@ -148,6 +151,7 @@ static void
svc_sock_enqueue(struct svc_sock *svsk)
svc_sock_enqueue(struct svc_sock *svsk)
{
{
	struct svc_serv	*serv = svsk->sk_server;
	struct svc_serv	*serv = svsk->sk_server;
	struct svc_pool *pool = &serv->sv_pools[0];
	struct svc_rqst	*rqstp;
	struct svc_rqst	*rqstp;


	if (!(svsk->sk_flags &
	if (!(svsk->sk_flags &
@@ -156,10 +160,10 @@ svc_sock_enqueue(struct svc_sock *svsk)
	if (test_bit(SK_DEAD, &svsk->sk_flags))
	if (test_bit(SK_DEAD, &svsk->sk_flags))
		return;
		return;


	spin_lock_bh(&serv->sv_lock);
	spin_lock_bh(&pool->sp_lock);


	if (!list_empty(&serv->sv_threads) && 
	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&serv->sv_sockets))
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		printk(KERN_ERR
			"svc_sock_enqueue: threads and sockets both waiting??\n");
			"svc_sock_enqueue: threads and sockets both waiting??\n");


@@ -179,6 +183,8 @@ svc_sock_enqueue(struct svc_sock *svsk)
		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
		goto out_unlock;
		goto out_unlock;
	}
	}
	BUG_ON(svsk->sk_pool != NULL);
	svsk->sk_pool = pool;


	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2
	if (((atomic_read(&svsk->sk_reserved) + serv->sv_bufsz)*2
@@ -189,19 +195,20 @@ svc_sock_enqueue(struct svc_sock *svsk)
		dprintk("svc: socket %p  no space, %d*2 > %ld, not enqueued\n",
		dprintk("svc: socket %p  no space, %d*2 > %ld, not enqueued\n",
			svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz,
			svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_bufsz,
			svc_sock_wspace(svsk));
			svc_sock_wspace(svsk));
		svsk->sk_pool = NULL;
		clear_bit(SK_BUSY, &svsk->sk_flags);
		clear_bit(SK_BUSY, &svsk->sk_flags);
		goto out_unlock;
		goto out_unlock;
	}
	}
	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);




	if (!list_empty(&serv->sv_threads)) {
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(serv->sv_threads.next,
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   struct svc_rqst,
				   rq_list);
				   rq_list);
		dprintk("svc: socket %p served by daemon %p\n",
		dprintk("svc: socket %p served by daemon %p\n",
			svsk->sk_sk, rqstp);
			svsk->sk_sk, rqstp);
		svc_serv_dequeue(serv, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_sock)
		if (rqstp->rq_sock)
			printk(KERN_ERR 
			printk(KERN_ERR 
				"svc_sock_enqueue: server %p, rq_sock=%p!\n",
				"svc_sock_enqueue: server %p, rq_sock=%p!\n",
@@ -210,28 +217,30 @@ svc_sock_enqueue(struct svc_sock *svsk)
		atomic_inc(&svsk->sk_inuse);
		atomic_inc(&svsk->sk_inuse);
		rqstp->rq_reserved = serv->sv_bufsz;
		rqstp->rq_reserved = serv->sv_bufsz;
		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
		BUG_ON(svsk->sk_pool != pool);
		wake_up(&rqstp->rq_wait);
		wake_up(&rqstp->rq_wait);
	} else {
	} else {
		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
		list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
		list_add_tail(&svsk->sk_ready, &pool->sp_sockets);
		BUG_ON(svsk->sk_pool != pool);
	}
	}


out_unlock:
out_unlock:
	spin_unlock_bh(&serv->sv_lock);
	spin_unlock_bh(&pool->sp_lock);
}
}


/*
/*
 * Dequeue the first socket.  Must be called with the serv->sv_lock held.
 * Dequeue the first socket.  Must be called with the pool->sp_lock held.
 */
 */
static inline struct svc_sock *
static inline struct svc_sock *
svc_sock_dequeue(struct svc_serv *serv)
svc_sock_dequeue(struct svc_pool *pool)
{
{
	struct svc_sock	*svsk;
	struct svc_sock	*svsk;


	if (list_empty(&serv->sv_sockets))
	if (list_empty(&pool->sp_sockets))
		return NULL;
		return NULL;


	svsk = list_entry(serv->sv_sockets.next,
	svsk = list_entry(pool->sp_sockets.next,
			  struct svc_sock, sk_ready);
			  struct svc_sock, sk_ready);
	list_del_init(&svsk->sk_ready);
	list_del_init(&svsk->sk_ready);


@@ -250,6 +259,7 @@ svc_sock_dequeue(struct svc_serv *serv)
static inline void
static inline void
svc_sock_received(struct svc_sock *svsk)
svc_sock_received(struct svc_sock *svsk)
{
{
	svsk->sk_pool = NULL;
	clear_bit(SK_BUSY, &svsk->sk_flags);
	clear_bit(SK_BUSY, &svsk->sk_flags);
	svc_sock_enqueue(svsk);
	svc_sock_enqueue(svsk);
}
}
@@ -322,25 +332,33 @@ svc_sock_release(struct svc_rqst *rqstp)


/*
/*
 * External function to wake up a server waiting for data
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
 */
void
void
svc_wake_up(struct svc_serv *serv)
svc_wake_up(struct svc_serv *serv)
{
{
	struct svc_rqst	*rqstp;
	struct svc_rqst	*rqstp;
	unsigned int i;
	struct svc_pool *pool;


	spin_lock_bh(&serv->sv_lock);
	for (i = 0; i < serv->sv_nrpools; i++) {
	if (!list_empty(&serv->sv_threads)) {
		pool = &serv->sv_pools[i];
		rqstp = list_entry(serv->sv_threads.next,

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   struct svc_rqst,
					   rq_list);
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			/*
		svc_serv_dequeue(serv, rqstp);
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_sock = NULL;
			rqstp->rq_sock = NULL;
			 */
			 */
			wake_up(&rqstp->rq_wait);
			wake_up(&rqstp->rq_wait);
		}
		}
	spin_unlock_bh(&serv->sv_lock);
		spin_unlock_bh(&pool->sp_lock);
	}
}
}


/*
/*
@@ -603,7 +621,10 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
	    /* udp sockets need large rcvbuf as all pending
	    /* udp sockets need large rcvbuf as all pending
	     * requests are still in that buffer.  sndbuf must
	     * requests are still in that buffer.  sndbuf must
	     * also be large enough that there is enough space
	     * also be large enough that there is enough space
	     * for one reply per thread.
	     * for one reply per thread.  We count all threads
	     * rather than threads in a particular pool, which
	     * provides an upper bound on the number of threads
	     * which will access the socket.
	     */
	     */
	    svc_sock_setbufsize(svsk->sk_sock,
	    svc_sock_setbufsize(svsk->sk_sock,
				(serv->sv_nrthreads+3) * serv->sv_bufsz,
				(serv->sv_nrthreads+3) * serv->sv_bufsz,
@@ -948,6 +969,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
		/* sndbuf needs to have room for one request
		/* sndbuf needs to have room for one request
		 * per thread, otherwise we can stall even when the
		 * per thread, otherwise we can stall even when the
		 * network isn't a bottleneck.
		 * network isn't a bottleneck.
		 *
		 * We count all threads rather than threads in a
		 * particular pool, which provides an upper bound
		 * on the number of threads which will access the socket.
		 *
		 * rcvbuf just needs to be able to hold a few requests.
		 * rcvbuf just needs to be able to hold a few requests.
		 * Normally they will be removed from the queue 
		 * Normally they will be removed from the queue 
		 * as soon a a complete request arrives.
		 * as soon a a complete request arrives.
@@ -1163,13 +1189,16 @@ svc_sock_update_bufs(struct svc_serv *serv)
}
}


/*
/*
 * Receive the next request on any socket.
 * Receive the next request on any socket.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
 */
int
int
svc_recv(struct svc_rqst *rqstp, long timeout)
svc_recv(struct svc_rqst *rqstp, long timeout)
{
{
	struct svc_sock		*svsk =NULL;
	struct svc_sock		*svsk =NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_pool		*pool = rqstp->rq_pool;
	int			len;
	int			len;
	int 			pages;
	int 			pages;
	struct xdr_buf		*arg;
	struct xdr_buf		*arg;
@@ -1219,15 +1248,15 @@ svc_recv(struct svc_rqst *rqstp, long timeout)
	if (signalled())
	if (signalled())
		return -EINTR;
		return -EINTR;


	spin_lock_bh(&serv->sv_lock);
	spin_lock_bh(&pool->sp_lock);
	if ((svsk = svc_sock_dequeue(serv)) != NULL) {
	if ((svsk = svc_sock_dequeue(pool)) != NULL) {
		rqstp->rq_sock = svsk;
		rqstp->rq_sock = svsk;
		atomic_inc(&svsk->sk_inuse);
		atomic_inc(&svsk->sk_inuse);
		rqstp->rq_reserved = serv->sv_bufsz;	
		rqstp->rq_reserved = serv->sv_bufsz;	
		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
	} else {
	} else {
		/* No data pending. Go to sleep */
		/* No data pending. Go to sleep */
		svc_serv_enqueue(serv, rqstp);
		svc_thread_enqueue(pool, rqstp);


		/*
		/*
		 * We have to be able to interrupt this wait
		 * We have to be able to interrupt this wait
@@ -1235,26 +1264,26 @@ svc_recv(struct svc_rqst *rqstp, long timeout)
		 */
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&rqstp->rq_wait, &wait);
		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&serv->sv_lock);
		spin_unlock_bh(&pool->sp_lock);


		schedule_timeout(timeout);
		schedule_timeout(timeout);


		try_to_freeze();
		try_to_freeze();


		spin_lock_bh(&serv->sv_lock);
		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		remove_wait_queue(&rqstp->rq_wait, &wait);


		if (!(svsk = rqstp->rq_sock)) {
		if (!(svsk = rqstp->rq_sock)) {
			svc_serv_dequeue(serv, rqstp);
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&serv->sv_lock);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			dprintk("svc: server %p, no data yet\n", rqstp);
			return signalled()? -EINTR : -EAGAIN;
			return signalled()? -EINTR : -EAGAIN;
		}
		}
	}
	}
	spin_unlock_bh(&serv->sv_lock);
	spin_unlock_bh(&pool->sp_lock);


	dprintk("svc: server %p, socket %p, inuse=%d\n",
	dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
		 rqstp, svsk, atomic_read(&svsk->sk_inuse));
		 rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse));
	len = svsk->sk_recvfrom(rqstp);
	len = svsk->sk_recvfrom(rqstp);
	dprintk("svc: got len=%d\n", len);
	dprintk("svc: got len=%d\n", len);


@@ -1553,7 +1582,13 @@ svc_delete_socket(struct svc_sock *svsk)


	if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
	if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
		list_del_init(&svsk->sk_list);
		list_del_init(&svsk->sk_list);
	list_del_init(&svsk->sk_ready);
    	/*
	 * We used to delete the svc_sock from whichever list
	 * it's sk_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags))
	if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags))
		if (test_bit(SK_TEMP, &svsk->sk_flags))
		if (test_bit(SK_TEMP, &svsk->sk_flags))
			serv->sv_tmpcnt--;
			serv->sv_tmpcnt--;