Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 552bf225 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker
Browse files

xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs



Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, xprtrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

Because each ib_cqe carries a pointer to a completion method, the
core can now post its own operations on a consumer's QP, and handle
the completions itself, without changes to the consumer.

xprtrdma's reply processing is already handled in a work queue, but
there is some initial order-dependent processing that is done in the
soft IRQ context before a work item is scheduled.

IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma
receive code path.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Reviewed-by: default avatarDevesh Sharma <devesh.sharma@broadcom.com>
Reviewed-by: default avatarSagi Grimberg <sagig@mellanox.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 23826c7a
Loading
Loading
Loading
Loading
+20 −58
Original line number Diff line number Diff line
@@ -212,11 +212,18 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
	atomic_set(&buffer->rb_credits, credits);
}

/**
 * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
 * @cq:	completion queue (ignored)
 * @wc:	completed WR
 *
 */
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_id and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
@@ -242,55 +249,20 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("RPC:       %s: rep %p: %s\n",
		       __func__, rep, ib_wc_status_msg(wc->status));
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rep->rr_len = RPCRDMA_BAD_LEN;
	goto out_schedule;
}

/* The wc array is on stack: automatic memory is always CPU-local.
 *
 * struct ib_wc is 64 bytes, making the poll array potentially
 * large. But this is at the bottom of the call chain. Further
 * substantial work is done in another thread.
 */
static void
rpcrdma_recvcq_poll(struct ib_cq *cq)
{
	struct ib_wc *pos, wcs[4];
	int count, rc;

	do {
		pos = wcs;

		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
		if (rc < 0)
			break;

		count = rc;
		while (count-- > 0)
			rpcrdma_recvcq_process_wc(pos++);
	} while (rc == ARRAY_SIZE(wcs));
}

/* Handle provider receive completion upcalls.
 */
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
	do {
		rpcrdma_recvcq_poll(cq);
	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
}

static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
	struct ib_wc wc;

	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
		rpcrdma_recvcq_process_wc(&wc);
		rpcrdma_receive_wc(NULL, &wc);
	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
		rpcrdma_sendcq_process_wc(&wc);
}
@@ -655,9 +627,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		goto out2;
	}

	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_SOFTIRQ);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -665,14 +637,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		goto out2;
	}

	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
	if (rc) {
		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
			__func__, rc);
		ib_destroy_cq(recvcq);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

@@ -735,10 +699,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
		ia->ri_id->qp = NULL;
	}

	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
	if (rc)
		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
			__func__, rc);
	ib_free_cq(ep->rep_attr.recv_cq);

	rc = ib_destroy_cq(ep->rep_attr.send_cq);
	if (rc)
@@ -947,6 +908,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
	}

	rep->rr_device = ia->ri_device;
	rep->rr_cqe.done = rpcrdma_receive_wc;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
	return rep;
@@ -1322,7 +1284,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
	int rc;

	recv_wr.next = NULL;
	recv_wr.wr_id = (u64) (unsigned long) rep;
	recv_wr.wr_cqe = &rep->rr_cqe;
	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	recv_wr.num_sge = 1;

+1 −0
Original line number Diff line number Diff line
@@ -171,6 +171,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
struct rpcrdma_buffer;

struct rpcrdma_rep {
	struct ib_cqe		rr_cqe;
	unsigned int		rr_len;
	struct ib_device	*rr_device;
	struct rpcrdma_xprt	*rr_rxprt;