Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6d2d0ee2 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker
Browse files

xprtrdma: Replace rpcrdma_receive_wq with a per-xprt workqueue



To address a connection-close ordering problem, we need the ability
to drain the RPC completions running on rpcrdma_receive_wq for just
one transport. Give each transport its own RPC completion workqueue,
and drain that workqueue when disconnecting the transport.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 6ceea368
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -1356,7 +1356,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);

	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
	queue_work(buf->rb_completion_wq, &rep->rr_work);
	return;

out_badversion:
+6 −11
Original line number Diff line number Diff line
@@ -444,10 +444,14 @@ xprt_rdma_close(struct rpc_xprt *xprt)
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	might_sleep();

	dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);

	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
	/* Prevent marshaling and sending of new requests */
	xprt_clear_connected(xprt);

	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
		rpcrdma_ia_remove(ia);
		return;
	}
@@ -858,8 +862,6 @@ void xprt_rdma_cleanup(void)
		dprintk("RPC:       %s: xprt_unregister returned %i\n",
			__func__, rc);

	rpcrdma_destroy_wq();

	rc = xprt_unregister_transport(&xprt_rdma_bc);
	if (rc)
		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
@@ -870,20 +872,13 @@ int xprt_rdma_init(void)
{
	int rc;

	rc = rpcrdma_alloc_wq();
	if (rc)
		return rc;

	rc = xprt_register_transport(&xprt_rdma);
	if (rc) {
		rpcrdma_destroy_wq();
	if (rc)
		return rc;
	}

	rc = xprt_register_transport(&xprt_rdma_bc);
	if (rc) {
		xprt_unregister_transport(&xprt_rdma);
		rpcrdma_destroy_wq();
		return rc;
	}

+36 −31
Original line number Diff line number Diff line
@@ -80,33 +80,23 @@ static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);

struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
/* Wait for outstanding transport work to finish.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;
	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_qp(ia->ri_id->qp);
	drain_workqueue(buf->rb_completion_wq);

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(ia->ri_id->qp);
}

/**
@@ -483,7 +473,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rpcrdma_xprt_drain(r_xprt);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
@@ -825,8 +815,10 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
	return rc;
}

/*
 * rpcrdma_ep_disconnect
/**
 * rpcrdma_ep_disconnect - Disconnect underlying transport
 * @ep: endpoint to disconnect
 * @ia: associated interface adapter
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
@@ -837,19 +829,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
						   rx_ep);
	int rc;

	/* returns without wait if ID is not connected */
	rc = rdma_disconnect(ia->ri_id);
	if (!rc)
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
							ep->rep_connected != 1);
	else
		ep->rep_connected = rc;
	trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
					       rx_ep), rc);
	trace_xprtrdma_disconnect(r_xprt, rc);

	ib_drain_qp(ia->ri_id->qp);
	rpcrdma_xprt_drain(r_xprt);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
@@ -1183,6 +1176,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
	if (rc)
		goto out;

	buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
						WQ_MEM_RECLAIM | WQ_HIGHPRI,
						0,
			r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
	if (!buf->rb_completion_wq)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
@@ -1241,6 +1241,11 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	if (buf->rb_completion_wq) {
		destroy_workqueue(buf->rb_completion_wq);
		buf->rb_completion_wq = NULL;
	}

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
+1 −5
Original line number Diff line number Diff line
@@ -412,6 +412,7 @@ struct rpcrdma_buffer {

	u32			rb_bc_max_requests;

	struct workqueue_struct *rb_completion_wq;
	struct delayed_work	rb_refresh_worker;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -547,8 +548,6 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
bool frwr_is_supported(struct rpcrdma_ia *);
bool fmr_is_supported(struct rpcrdma_ia *);

extern struct workqueue_struct *rpcrdma_receive_wq;

/*
 * Endpoint calls - xprtrdma/verbs.c
 */
@@ -603,9 +602,6 @@ rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
	return __rpcrdma_dma_map_regbuf(ia, rb);
}

int rpcrdma_alloc_wq(void);
void rpcrdma_destroy_wq(void);

/*
 * Wrappers for chunk registration, shared by read/write chunk code.
 */