Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c8b920bb authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker
Browse files

xprtrdma: Basic support for Remote Invalidation



Have frwr's ro_unmap_sync recognize an invalidated rkey that appears
as part of a Receive completion. Local invalidation can be skipped
for that rkey.

Use an out-of-band signaling mechanism to indicate to the server
that the client is prepared to receive RDMA Send With Invalidate.

Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 87cfb9a0
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -273,6 +273,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
	 */
	list_for_each_entry(mw, &req->rl_registered, mw_list)
		list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
	r_xprt->rx_stats.local_inv_needed++;
	rc = ib_unmap_fmr(&unmap_list);
	if (rc)
		goto out_reset;
@@ -330,4 +331,5 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
	.ro_init_mr			= fmr_op_init_mr,
	.ro_release_mr			= fmr_op_release_mr,
	.ro_displayname			= "fmr",
	.ro_send_w_inv_ok		= 0,
};
+13 −0
Original line number Diff line number Diff line
@@ -67,6 +67,8 @@
 * pending send queue WRs before the transport is reconnected.
 */

#include <linux/sunrpc/rpc_rdma.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -471,6 +473,7 @@ static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
	struct rpcrdma_rep *rep = req->rl_reply;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_mw *mw, *tmp;
	struct rpcrdma_frmr *f;
@@ -486,6 +489,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
	f = NULL;
	invalidate_wrs = pos = prev = NULL;
	list_for_each_entry(mw, &req->rl_registered, mw_list) {
		if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
		    (mw->mw_handle == rep->rr_inv_rkey)) {
			mw->frmr.fr_state = FRMR_IS_INVALID;
			continue;
		}

		pos = __frwr_prepare_linv_wr(mw);

		if (!invalidate_wrs)
@@ -495,6 +504,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
		prev = pos;
		f = &mw->frmr;
	}
	if (!f)
		goto unmap;

	/* Strong send queue ordering guarantees that when the
	 * last WR in the chain completes, all WRs in the chain
@@ -509,6 +520,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
	 * replaces the QP. The RPC reply handler won't call us
	 * unless ri_id->qp is a valid pointer.
	 */
	r_xprt->rx_stats.local_inv_needed++;
	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
	if (rc)
		goto reset_mrs;
@@ -575,4 +587,5 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
	.ro_init_mr			= frwr_op_init_mr,
	.ro_release_mr			= frwr_op_release_mr,
	.ro_displayname			= "frwr",
	.ro_send_w_inv_ok		= RPCRDMA_CMP_F_SND_W_INV_OK,
};
+14 −4
Original line number Diff line number Diff line
@@ -231,7 +231,8 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
	bool reminv_expected)
{
	int len, n, p, page_base;
	struct page **ppages;
@@ -273,6 +274,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	if (type == rpcrdma_readch)
		return n;

	/* When encoding the Write list, some servers need to see an extra
	 * segment for odd-length Write chunks. The upper layer provides
	 * space in the tail iovec for this purpose.
	 */
	if (type == rpcrdma_writech && reminv_expected)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
@@ -329,7 +337,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

@@ -393,7 +401,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
				     wtype, seg,
				     r_xprt->rx_ia.ri_reminv_expected);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

@@ -458,7 +467,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
				     r_xprt->rx_ia.ri_reminv_expected);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

+3 −2
Original line number Diff line number Diff line
@@ -730,10 +730,11 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
		   r_xprt->rx_stats.failed_marshal_count,
		   r_xprt->rx_stats.bad_reply_count,
		   r_xprt->rx_stats.nomsg_call_count);
	seq_printf(seq, "%lu %lu %lu\n",
	seq_printf(seq, "%lu %lu %lu %lu\n",
		   r_xprt->rx_stats.mrs_recovered,
		   r_xprt->rx_stats.mrs_orphaned,
		   r_xprt->rx_stats.mrs_allocated);
		   r_xprt->rx_stats.mrs_allocated,
		   r_xprt->rx_stats.local_inv_needed);
}

static int
+7 −1
Original line number Diff line number Diff line
@@ -185,6 +185,9 @@ rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
		__func__, rep, wc->byte_len);

	rep->rr_len = wc->byte_len;
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rep->rr_device,
				   rdmab_addr(rep->rr_rdmabuf),
				   rep->rr_len, DMA_FROM_DEVICE);
@@ -212,12 +215,15 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}
@@ -568,7 +574,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags = 0;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
Loading