Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ce7c252a authored by Trond Myklebust
Browse files

SUNRPC: Add a separate spinlock to protect the RPC request receive list



This further reduces contention with the transport_lock, and allows us
to convert to using a non-bh-safe spinlock, since the list is now never
accessed from a bh context.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
parent 040249df
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -232,6 +232,7 @@ struct rpc_xprt {
	 */
	spinlock_t		transport_lock;	/* lock transport info */
	spinlock_t		reserve_lock;	/* lock slot table */
	spinlock_t		recv_lock;	/* lock receive list */
	u32			xid;		/* Next XID value to use */
	struct rpc_task *	snd_task;	/* Task blocked in send */
	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
+3 −3
Original line number Diff line number Diff line
@@ -1001,7 +1001,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)

	if (!bc_xprt)
		return -EAGAIN;
	spin_lock_bh(&bc_xprt->transport_lock);
	spin_lock(&bc_xprt->recv_lock);
	req = xprt_lookup_rqst(bc_xprt, xid);
	if (!req)
		goto unlock_notfound;
@@ -1019,7 +1019,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
	memcpy(dst->iov_base, src->iov_base, src->iov_len);
	xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
	rqstp->rq_arg.len = 0;
	spin_unlock_bh(&bc_xprt->transport_lock);
	spin_unlock(&bc_xprt->recv_lock);
	return 0;
unlock_notfound:
	printk(KERN_NOTICE
@@ -1028,7 +1028,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
		__func__, ntohl(calldir),
		bc_xprt, ntohl(xid));
unlock_eagain:
	spin_unlock_bh(&bc_xprt->transport_lock);
	spin_unlock(&bc_xprt->recv_lock);
	return -EAGAIN;
}

+12 −8
Original line number Diff line number Diff line
@@ -872,17 +872,17 @@ void xprt_unpin_rqst(struct rpc_rqst *req)
}

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
__must_hold(&req->rq_xprt->transport_lock)
__must_hold(&req->rq_xprt->recv_lock)
{
	struct rpc_task *task = req->rq_task;
	
	if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
		spin_unlock_bh(&req->rq_xprt->transport_lock);
		spin_unlock(&req->rq_xprt->recv_lock);
		set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
		wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
				TASK_UNINTERRUPTIBLE);
		clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
		spin_lock_bh(&req->rq_xprt->transport_lock);
		spin_lock(&req->rq_xprt->recv_lock);
	}
}

@@ -1008,13 +1008,13 @@ void xprt_transmit(struct rpc_task *task)
			/*
			 * Add to the list only if we're expecting a reply
			 */
			spin_lock_bh(&xprt->transport_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			spin_lock(&xprt->recv_lock);
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->transport_lock);
			spin_unlock(&xprt->recv_lock);
			xprt_reset_majortimeo(req);
			/* Turn off autodisconnect */
			del_singleshot_timer_sync(&xprt->timer);
@@ -1329,15 +1329,18 @@ void xprt_release(struct rpc_task *task)
		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
	else if (task->tk_client)
		rpc_count_iostats(task, task->tk_client->cl_metrics);
	spin_lock(&xprt->recv_lock);
	if (!list_empty(&req->rq_list)) {
		list_del(&req->rq_list);
		xprt_wait_on_pinned_rqst(req);
	}
	spin_unlock(&xprt->recv_lock);
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	xprt_schedule_autodisconnect(xprt);
	xprt_wait_on_pinned_rqst(req);
	spin_unlock_bh(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
@@ -1361,6 +1364,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->recv_lock);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
+4 −4
Original line number Diff line number Diff line
@@ -1051,7 +1051,7 @@ rpcrdma_reply_handler(struct work_struct *work)
	 * RPC completion while holding the transport lock to ensure
	 * the rep, rqst, and rq_task pointers remain stable.
	 */
	spin_lock_bh(&xprt->transport_lock);
	spin_lock(&xprt->recv_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_norqst;
@@ -1136,7 +1136,7 @@ rpcrdma_reply_handler(struct work_struct *work)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	spin_unlock(&xprt->recv_lock);
	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;
@@ -1187,12 +1187,12 @@ rpcrdma_reply_handler(struct work_struct *work)
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* The req was still available, but by the time the transport_lock
/* The req was still available, but by the time the recv_lock
 * was acquired, the rqst and task had been released. Thus the RPC
 * has already been terminated.
 */
out_norqst:
	spin_unlock_bh(&xprt->transport_lock);
	spin_unlock(&xprt->recv_lock);
	rpcrdma_buffer_put(req);
	dprintk("RPC:       %s: race, no rqst left for req %p\n",
		__func__, req);
+5 −2
Original line number Diff line number Diff line
@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
	if (src->iov_len < 24)
		goto out_shortreply;

	spin_lock_bh(&xprt->transport_lock);
	spin_lock(&xprt->recv_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req)
		goto out_notfound;
@@ -69,17 +69,20 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
		credits = r_xprt->rx_buf.rb_bc_max_requests;

	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(req->rq_task);
	spin_unlock_bh(&xprt->transport_lock);


	ret = 0;
	xprt_complete_rqst(req->rq_task, rcvbuf->len);
	rcvbuf->len = 0;

out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
	spin_unlock(&xprt->recv_lock);
out:
	return ret;

Loading