Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 48be539d authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker
Browse files

xprtrdma: Introduce ->alloc_slot call-out for xprtrdma



rpcrdma_buffer_get acquires an rpcrdma_req and rep for each RPC.
Currently this is done in the call_allocate action, and sometimes it
can fail if there are many outstanding RPCs.

When call_allocate fails, the RPC task is put on the delayq. It is
awoken a few milliseconds later, but there's no guarantee it will
get a buffer at that time. The RPC task can be repeatedly put back
to sleep or even starved.

The call_allocate action should rarely fail. The delayq mechanism is
not meant to deal with transport congestion.

In the current sunrpc stack, there is a friendlier way to deal with
this situation. These objects are actually tantamount to an RPC
slot (rpc_rqst) and there is a separate FSM action, distinct from
call_allocate, for allocating slot resources. This is the
call_reserve action.

When allocation fails during this action, the RPC is placed on the
transport's backlog queue. The backlog mechanism provides a stronger
guarantee that when the RPC is awoken, a buffer will be available
for it; and backlogged RPCs are awoken one-at-a-time.

To make slot resource allocation occur in the call_reserve action,
create special ->alloc_slot and ->free_slot call-outs for xprtrdma.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent a9cde23a
Loading
Loading
Loading
Loading
+50 −2
Original line number Diff line number Diff line
@@ -538,6 +538,54 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
	}
}

/**
 * xprt_rdma_alloc_slot - allocate an rpc_rqst
 * @xprt: controlling RPC transport
 * @task: RPC task requesting a fresh rpc_rqst
 *
 * tk_status values:
 *	%0 if task->tk_rqstp points to a fresh rpc_rqst
 *	%-EAGAIN if no rpc_rqst is available; queued on backlog
 */
static void
xprt_rdma_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *rqst;

	spin_lock(&xprt->reserve_lock);
	if (list_empty(&xprt->free))
		goto out_sleep;
	rqst = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
	list_del(&rqst->rq_list);
	spin_unlock(&xprt->reserve_lock);

	task->tk_rqstp = rqst;
	task->tk_status = 0;
	return;

out_sleep:
	rpc_sleep_on(&xprt->backlog, task, NULL);
	spin_unlock(&xprt->reserve_lock);
	task->tk_status = -EAGAIN;
}

/**
 * xprt_rdma_free_slot - release an rpc_rqst
 * @xprt: controlling RPC transport
 * @rqst: rpc_rqst to release
 *
 */
static void
xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst)
{
	memset(rqst, 0, sizeof(*rqst));

	spin_lock(&xprt->reserve_lock);
	list_add(&rqst->rq_list, &xprt->free);
	rpc_wake_up_next(&xprt->backlog);
	spin_unlock(&xprt->reserve_lock);
}

static bool
rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
		    size_t size, gfp_t flags)
@@ -780,8 +828,8 @@ xprt_rdma_disable_swap(struct rpc_xprt *xprt)
static const struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.alloc_slot		= xprt_alloc_slot,
	.free_slot		= xprt_free_slot,
	.alloc_slot		= xprt_rdma_alloc_slot,
	.free_slot		= xprt_rdma_free_slot,
	.release_request	= xprt_release_rqst_cong,       /* ditto */
	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
	.timer			= xprt_rdma_timer,