Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 302d3deb authored by Chuck Lever, committed by Anna Schumaker
Browse files

xprtrdma: Prevent inline overflow



When deciding whether to send a Call inline, rpcrdma_marshal_req
doesn't take into account header bytes consumed by chunk lists.
This results in Call messages on the wire that are sometimes larger
than the inline threshold.

Likewise, when a Write list or Reply chunk is in play, the server's
reply has to emit an RDMA Send that includes a larger-than-minimal
RPC-over-RDMA header.

The actual size of a Call message cannot be estimated until after
the chunk lists have been registered. Thus the size of each
RPC-over-RDMA header can be estimated only after chunks are
registered; but the decision to register chunks is based on the size
of that header. Chicken, meet egg.

The best a client can do is estimate header size based on the
largest header that might occur, and then ensure that inline content
is always smaller than that.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 94931746
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -39,6 +39,9 @@ static int
fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
	    struct rpcrdma_create_data_internal *cdata)
{
	/* Segment count handed to the header-size estimator:
	 * RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, clamped so it
	 * is never less than one.
	 */
	unsigned int maxsegs = max_t(unsigned int, 1,
				     RPCRDMA_MAX_DATA_SEGS /
				     RPCRDMA_MAX_FMR_SGES);

	rpcrdma_set_max_header_sizes(ia, cdata, maxsegs);
	return 0;
}

+3 −0
Original line number Diff line number Diff line
@@ -231,6 +231,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
					       depth;
	}

	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
						      RPCRDMA_MAX_DATA_SEGS /
						      ia->ri_max_frmr_depth));
	return 0;
}

+4 −1
Original line number Diff line number Diff line
@@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
		       __func__, PTR_ERR(mr));
		return -ENOMEM;
	}

	ia->ri_dma_mr = mr;

	rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
						      RPCRDMA_MAX_DATA_SEGS,
						      RPCRDMA_MAX_HDR_SEGS));
	return 0;
}

+75 −10
Original line number Diff line number Diff line
@@ -61,7 +61,6 @@ enum rpcrdma_chunktype {
	rpcrdma_replych
};

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char transfertypes[][12] = {
	"pure inline",	/* no chunks */
	" read chunk",	/* some argument via rdma read */
@@ -69,18 +68,72 @@ static const char transfertypes[][12] = {
	"write chunk",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
#endif

/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The client marshals only one chunk list per Call message.
 * The largest list is the Read list.
 *
 * @maxsegs: largest number of data segments a single chunk list can
 *           carry; two more are added here for the head and tail
 *           buffers.
 *
 * The result is the fixed header (RPCRDMA_HDRLEN_MIN) plus the
 * space needed for a Read list of that many entries, in bytes.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size. This must be added to the fixed
	 * header size, not assigned over it; otherwise the estimate
	 * omits the fixed fields and the inline threshold ends up
	 * too generous.
	 */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	dprintk("RPC:       %s: max call header size = %u\n",
		__func__, size);
	return size;
}

/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message.  The larger list is the Write list.
 *
 * @maxsegs: largest number of data segments a single chunk list can
 *           carry; two more are added here for the head and tail
 *           buffers.
 *
 * The result is the fixed header (RPCRDMA_HDRLEN_MIN) plus the
 * space needed for a Write list with one chunk of that many
 * segments, in bytes.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size. Every term is accumulated on top
	 * of the fixed header; assigning here would silently drop
	 * RPCRDMA_HDRLEN_MIN from the estimate.
	 */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);		/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC:       %s: max reply header size = %u\n",
		__func__, size);
	return size;
}

/* Record, in @ia, how many bytes of RPC payload may travel inline in
 * each direction: the configured inline size from @cdata minus the
 * largest RPC-over-RDMA header estimated for @maxsegs segments.
 *
 * NOTE(review): ri_max_inline_write and ri_max_inline_read are
 * unsigned, so a header estimate larger than the configured inline
 * size would wrap rather than go negative -- confirm the callers'
 * limits rule that out.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
				  struct rpcrdma_create_data_internal *cdata,
				  unsigned int maxsegs)
{
	unsigned int call_hdr = rpcrdma_max_call_header_size(maxsegs);
	unsigned int reply_hdr = rpcrdma_max_reply_header_size(maxsegs);

	ia->ri_max_inline_write = cdata->inline_wsize - call_hdr;
	ia->ri_max_inline_read = cdata->inline_rsize - reply_hdr;
}

/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * the read chunk list for this operation.
 */
/* NOTE(review): this listing shows the diff with its +/- markers
 * stripped, so old and new lines are interleaved. The one-argument
 * signature, the callsize local and the
 * RPCRDMA_INLINE_WRITE_THRESHOLD() comparison look like the removed
 * lines -- confirm against the actual commit.
 */
static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
	/* Compares the send buffer length against ri_max_inline_write
	 * taken from the transport's rpcrdma_ia.
	 */
	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}

/* The client can't know how large the actual reply will be. Thus it
@@ -89,11 +142,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
/* NOTE(review): this listing shows the diff with its +/- markers
 * stripped, so old and new lines are interleaved. The one-argument
 * signature, the repsize local and the
 * RPCRDMA_INLINE_READ_THRESHOLD() comparison look like the removed
 * lines -- confirm against the actual commit.
 */
static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
	/* Compares the receive buffer's buflen against
	 * ri_max_inline_read taken from the transport's rpcrdma_ia.
	 */
	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}

static int
@@ -492,7 +546,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
	 */
	if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else if (rpcrdma_results_inline(rqst))
	else if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else
		wtype = rpcrdma_replych;
@@ -511,7 +565,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(rqst)) {
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
@@ -561,6 +615,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
	if (hdrlen < 0)
		return hdrlen;

	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		goto out_overflow;

	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd"
		" headerp 0x%p base 0x%p lkey 0x%x\n",
		__func__, transfertypes[wtype], hdrlen, rpclen,
@@ -587,6 +644,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)

	req->rl_niovs = 2;
	return 0;

out_overflow:
	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s\n",
		hdrlen, rpclen, transfertypes[wtype]);
	/* Terminate this RPC. Chunks registered above will be
	 * released by xprt_release -> xprt_rdma_free .
	 */
	return -EIO;
}

/*
+5 −0
Original line number Diff line number Diff line
@@ -73,6 +73,8 @@ struct rpcrdma_ia {
	struct completion	ri_done;
	int			ri_async_rc;
	unsigned int		ri_max_frmr_depth;
	unsigned int		ri_max_inline_write;
	unsigned int		ri_max_inline_read;
	struct ib_qp_attr	ri_qp_attr;
	struct ib_qp_init_attr	ri_qp_init_attr;
};
@@ -538,6 +540,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
 * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
 */
int rpcrdma_marshal_req(struct rpc_rqst *);
void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
				  struct rpcrdma_create_data_internal *,
				  unsigned int);

/* RPC/RDMA module init - xprtrdma/transport.c
 */