Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit cc3ea893 authored by Trond Myklebust's avatar Trond Myklebust
Browse files

Merge tag 'nfs-rdma-for-3.20' of git://git.linux-nfs.org/projects/anna/nfs-rdma



NFS: Client side changes for RDMA

These patches improve the scalability of the NFSoRDMA client and take large
variables off of the stack.  Additionally, the GFP_* flags are updated to
match what TCP uses.

Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>

* tag 'nfs-rdma-for-3.20' of git://git.linux-nfs.org/projects/anna/nfs-rdma: (21 commits)
  xprtrdma: Update the GFP flags used in xprt_rdma_allocate()
  xprtrdma: Clean up after adding regbuf management
  xprtrdma: Allocate zero pad separately from rpcrdma_buffer
  xprtrdma: Allocate RPC/RDMA receive buffer separately from struct rpcrdma_rep
  xprtrdma: Allocate RPC/RDMA send buffer separately from struct rpcrdma_req
  xprtrdma: Allocate RPC send buffer separately from struct rpcrdma_req
  xprtrdma: Add struct rpcrdma_regbuf and helpers
  xprtrdma: Refactor rpcrdma_buffer_create() and rpcrdma_buffer_destroy()
  xprtrdma: Simplify synopsis of rpcrdma_buffer_create()
  xprtrdma: Take struct ib_qp_attr and ib_qp_init_attr off the stack
  xprtrdma: Take struct ib_device_attr off the stack
  xprtrdma: Free the pd if ib_query_qp() fails
  xprtrdma: Remove rpcrdma_ep::rep_func and ::rep_xprt
  xprtrdma: Move credit update to RPC reply handler
  xprtrdma: Remove rl_mr field, and the mr_chunk union
  xprtrdma: Remove rpcrdma_ep::rep_ia
  xprtrdma: Rename "xprt" and "rdma_connect" fields in struct rpcrdma_xprt
  xprtrdma: Clean up hdrlen
  xprtrdma: Display XIDs in host byte order
  xprtrdma: Modernize htonl and ntohl
  ...
parents c7c545d4 a0a1d50c
Loading
Loading
Loading
Loading
+13 −1
Original line number Diff line number Diff line
@@ -42,6 +42,9 @@

#include <linux/types.h>

#define RPCRDMA_VERSION		1
#define rpcrdma_version		cpu_to_be32(RPCRDMA_VERSION)

struct rpcrdma_segment {
	__be32 rs_handle;	/* Registered memory handle */
	__be32 rs_length;	/* Length of the chunk in bytes */
@@ -95,7 +98,10 @@ struct rpcrdma_msg {
	} rm_body;
};

#define RPCRDMA_HDRLEN_MIN	28
/*
 * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
 */
#define RPCRDMA_HDRLEN_MIN	(sizeof(__be32) * 7)

enum rpcrdma_errcode {
	ERR_VERS = 1,
@@ -115,4 +121,10 @@ enum rpcrdma_proc {
	RDMA_ERROR = 4		/* An RPC RDMA encoding error */
};

#define rdma_msg	cpu_to_be32(RDMA_MSG)
#define rdma_nomsg	cpu_to_be32(RDMA_NOMSG)
#define rdma_msgp	cpu_to_be32(RDMA_MSGP)
#define rdma_done	cpu_to_be32(RDMA_DONE)
#define rdma_error	cpu_to_be32(RDMA_ERROR)

#endif				/* _LINUX_SUNRPC_RPC_RDMA_H */
+0 −2
Original line number Diff line number Diff line
@@ -63,8 +63,6 @@ extern atomic_t rdma_stat_rq_prod;
extern atomic_t rdma_stat_sq_poll;
extern atomic_t rdma_stat_sq_prod;

#define RPCRDMA_VERSION 1

/*
 * Contexts are built when an RDMA request is created and are a
 * record of the resources that can be recovered when the request
+62 −46
Original line number Diff line number Diff line
@@ -209,9 +209,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
		if (cur_rchunk) {	/* read */
			cur_rchunk->rc_discrim = xdr_one;
			/* all read chunks have the same "position" */
			cur_rchunk->rc_position = htonl(pos);
			cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
			cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
			cur_rchunk->rc_position = cpu_to_be32(pos);
			cur_rchunk->rc_target.rs_handle =
						cpu_to_be32(seg->mr_rkey);
			cur_rchunk->rc_target.rs_length =
						cpu_to_be32(seg->mr_len);
			xdr_encode_hyper(
					(__be32 *)&cur_rchunk->rc_target.rs_offset,
					seg->mr_base);
@@ -222,8 +224,10 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
			cur_rchunk++;
			r_xprt->rx_stats.read_chunk_count++;
		} else {		/* write/reply */
			cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
			cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
			cur_wchunk->wc_target.rs_handle =
						cpu_to_be32(seg->mr_rkey);
			cur_wchunk->wc_target.rs_length =
						cpu_to_be32(seg->mr_len);
			xdr_encode_hyper(
					(__be32 *)&cur_wchunk->wc_target.rs_offset,
					seg->mr_base);
@@ -257,7 +261,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
	} else {
		warray->wc_discrim = xdr_one;
		warray->wc_nchunks = htonl(nchunks);
		warray->wc_nchunks = cpu_to_be32(nchunks);
		iptr = (__be32 *) cur_wchunk;
		if (type == rpcrdma_writech) {
			*iptr++ = xdr_zero; /* finish the write chunk list */
@@ -290,7 +294,7 @@ ssize_t
rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
	struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);

	if (req->rl_rtype != rpcrdma_noch)
		result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
@@ -402,13 +406,12 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
	base = rqst->rq_svec[0].iov_base;
	rpclen = rqst->rq_svec[0].iov_len;

	/* build RDMA header in private area at front */
	headerp = (struct rpcrdma_msg *) req->rl_base;
	/* don't htonl XID, it's already done in request */
	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = xdr_one;
	headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = htonl(RDMA_MSG);
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/*
	 * Chunks needed for results?
@@ -468,7 +471,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
		return -EIO;
	}

	hdrlen = 28; /*sizeof *headerp;*/
	hdrlen = RPCRDMA_HDRLEN_MIN;
	padlen = 0;

	/*
@@ -482,11 +485,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
						RPCRDMA_INLINE_PAD_VALUE(rqst));

		if (padlen) {
			headerp->rm_type = htonl(RDMA_MSGP);
			headerp->rm_type = rdma_msgp;
			headerp->rm_body.rm_padded.rm_align =
				htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
				cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
			headerp->rm_body.rm_padded.rm_thresh =
				htonl(RPCRDMA_INLINE_PAD_THRESH);
				cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
			headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
@@ -524,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
		" headerp 0x%p base 0x%p lkey 0x%x\n",
		__func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
		headerp, base, req->rl_iov.lkey);
		headerp, base, rdmab_lkey(req->rl_rdmabuf));

	/*
	 * initialize send_iov's - normally only two: rdma chunk header and
@@ -533,26 +536,26 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
	 * header and any write data. In all non-rdma cases, any following
	 * data has been copied into the RPC header buffer.
	 */
	req->rl_send_iov[0].addr = req->rl_iov.addr;
	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = req->rl_iov.lkey;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = req->rl_iov.lkey;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;

	if (padlen) {
		struct rpcrdma_ep *ep = &r_xprt->rx_ep;

		req->rl_send_iov[2].addr = ep->rep_pad.addr;
		req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
		req->rl_send_iov[2].length = padlen;
		req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
		req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);

		req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
		req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
		req->rl_send_iov[3].lkey = req->rl_iov.lkey;
		req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);

		req->rl_niovs = 4;
	}
@@ -569,8 +572,9 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = ntohl(**iptrp);	/* get array count */
	i = be32_to_cpu(**iptrp);
	if (i > max)
		return -1;
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
@@ -582,11 +586,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
				__func__,
				ntohl(seg->rs_length),
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				ntohl(seg->rs_handle));
				be32_to_cpu(seg->rs_handle));
		}
		total_len += ntohl(seg->rs_length);
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
@@ -596,7 +600,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
@@ -691,7 +695,9 @@ rpcrdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep =
		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
	struct rpc_xprt *xprt = ep->rep_xprt;
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock_bh(&xprt->transport_lock);
	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
@@ -732,7 +738,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
	struct rpc_xprt *xprt = rep->rr_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	__be32 *iptr;
	int rdmalen, status;
	int credits, rdmalen, status;
	unsigned long cwnd;

	/* Check status. If bad, signal disconnect and return rep to pool */
@@ -744,14 +750,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
		}
		return;
	}
	if (rep->rr_len < 28) {
	if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
		dprintk("RPC:       %s: short/invalid reply\n", __func__);
		goto repost;
	}
	headerp = (struct rpcrdma_msg *) rep->rr_base;
	if (headerp->rm_vers != xdr_one) {
	headerp = rdmab_to_msg(rep->rr_rdmabuf);
	if (headerp->rm_vers != rpcrdma_version) {
		dprintk("RPC:       %s: invalid version %d\n",
			__func__, ntohl(headerp->rm_vers));
			__func__, be32_to_cpu(headerp->rm_vers));
		goto repost;
	}

@@ -762,7 +768,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
		spin_unlock(&xprt->transport_lock);
		dprintk("RPC:       %s: reply 0x%p failed "
			"to match any request xid 0x%08x len %d\n",
			__func__, rep, headerp->rm_xid, rep->rr_len);
			__func__, rep, be32_to_cpu(headerp->rm_xid),
			rep->rr_len);
repost:
		r_xprt->rx_stats.bad_reply_count++;
		rep->rr_func = rpcrdma_reply_handler;
@@ -778,13 +785,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
		spin_unlock(&xprt->transport_lock);
		dprintk("RPC:       %s: duplicate reply 0x%p to RPC "
			"request 0x%p: xid 0x%08x\n", __func__, rep, req,
			headerp->rm_xid);
			be32_to_cpu(headerp->rm_xid));
		goto repost;
	}

	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
		"                   RPC request 0x%p xid 0x%08x\n",
			__func__, rep, req, rqst, headerp->rm_xid);
			__func__, rep, req, rqst,
			be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
@@ -793,7 +801,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case htonl(RDMA_MSG):
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
@@ -824,22 +832,24 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp + 28);
			rep->rr_len -= 28; /*sizeof *headerp;*/
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}
		/* Fix up the rpc results for upper layer */
		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
		break;

	case htonl(RDMA_NOMSG):
	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    req->rl_nchunks == 0)
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp + 28);
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
@@ -853,7 +863,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
		dprintk("%s: invalid rpcrdma reply header (type %d):"
				" chunks[012] == %d %d %d"
				" expected chunks <= %d\n",
				__func__, ntohl(headerp->rm_type),
				__func__, be32_to_cpu(headerp->rm_type),
				headerp->rm_body.rm_chunks[0],
				headerp->rm_body.rm_chunks[1],
				headerp->rm_body.rm_chunks[2],
@@ -863,8 +873,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
		break;
	}

	credits = be32_to_cpu(headerp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_max_requests)
		credits = r_xprt->rx_buf.rb_max_requests;

	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

+82 −100
Original line number Diff line number Diff line
@@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(work, struct rpcrdma_xprt, rdma_connect.work);
	struct rpc_xprt *xprt = &r_xprt->xprt;
	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
						   rx_connect_worker.work);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	int rc = 0;

	xprt_clear_connected(xprt);
@@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)

	dprintk("RPC:       %s: called\n", __func__);

	cancel_delayed_work_sync(&r_xprt->rdma_connect);
	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);

	xprt_clear_connected(xprt);

@@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args)
	 * any inline data. Also specify any padding which will be provided
	 * from a preregistered zero buffer.
	 */
	rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
				&new_xprt->rx_data);
	rc = rpcrdma_buffer_create(new_xprt);
	if (rc)
		goto out3;

@@ -374,9 +373,8 @@ xprt_setup_rdma(struct xprt_create *args)
	 * connection loss notification is async. We also catch connection loss
	 * when reaping receives.
	 */
	INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
	new_ep->rep_func = rpcrdma_conn_func;
	new_ep->rep_xprt = xprt;
	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
			  xprt_rdma_connect_worker);

	xprt_rdma_format_addresses(xprt);
	xprt->max_payload = rpcrdma_max_payload(new_xprt);
@@ -434,7 +432,7 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)

	if (r_xprt->rx_ep.rep_connected != 0) {
		/* Reconnect */
		schedule_delayed_work(&r_xprt->rdma_connect,
		schedule_delayed_work(&r_xprt->rx_connect_worker,
				      xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
@@ -442,86 +440,93 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	} else {
		schedule_delayed_work(&r_xprt->rdma_connect, 0);
		schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
		if (!RPC_IS_ASYNC(task))
			flush_delayed_work(&r_xprt->rdma_connect);
			flush_delayed_work(&r_xprt->rx_connect_worker);
	}
}

/*
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
 * sequence. For this reason, the recv buffers are attached to send
 * buffers for portions of the RPC. Note that the RPC layer allocates
 * both send and receive buffers in the same call. We may register
 * the receive buffer portion when using reply chunks.
 * sequence.
 *
 * The RPC layer allocates both send and receive buffers in the same call
 * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
 * We may register rq_rcv_buf when using reply chunks.
 */
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpcrdma_req *req, *nreq;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	size_t min_size;
	gfp_t flags;

	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
	if (req == NULL)
		return NULL;

	if (size > req->rl_size) {
		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
			"prog %d vers %d proc %d\n",
			__func__, size, req->rl_size,
			task->tk_client->cl_prog, task->tk_client->cl_vers,
			task->tk_msg.rpc_proc->p_proc);
		/*
		 * Outgoing length shortage. Our inline write max must have
		 * been configured to perform direct i/o.
	flags = GFP_NOIO | __GFP_NOWARN;
	if (RPC_IS_SWAPPER(task))
		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;

	if (req->rl_rdmabuf == NULL)
		goto out_rdmabuf;
	if (req->rl_sendbuf == NULL)
		goto out_sendbuf;
	if (size > req->rl_sendbuf->rg_size)
		goto out_sendbuf;

out:
	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
	req->rl_connect_cookie = 0;	/* our reserved value */
	return req->rl_sendbuf->rg_base;

out_rdmabuf:
	min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;

out_sendbuf:
	/* XDR encoding and RPC/RDMA marshaling of this request has not
	 * yet occurred. Thus a lower bound is needed to prevent buffer
	 * overrun during marshaling.
	 *
		 * This is therefore a large metadata operation, and the
		 * allocate call was made on the maximum possible message,
		 * e.g. containing long filename(s) or symlink data. In
		 * fact, while these metadata operations *might* carry
		 * large outgoing payloads, they rarely *do*. However, we
		 * have to commit to the request here, so reallocate and
		 * register it now. The data path will never require this
		 * reallocation.
	 * RPC/RDMA marshaling may choose to send payload bearing ops
	 * inline, if the result is smaller than the inline threshold.
	 * The value of the "size" argument accounts for header
	 * requirements but not for the payload in these cases.
	 *
		 * If the allocation or registration fails, the RPC framework
		 * will (doggedly) retry.
	 * Likewise, allocate enough space to receive a reply up to the
	 * size of the inline threshold.
	 *
	 * It's unlikely that both the send header and the received
	 * reply will be large, but slush is provided here to allow
	 * flexibility when marshaling.
	 */
		if (task->tk_flags & RPC_TASK_SWAPPER)
			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
		else
			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
		if (nreq == NULL)
			goto outfail;

		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
				nreq->rl_base, size + sizeof(struct rpcrdma_req)
				- offsetof(struct rpcrdma_req, rl_base),
				&nreq->rl_handle, &nreq->rl_iov)) {
			kfree(nreq);
			goto outfail;
		}
		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
		nreq->rl_size = size;
		nreq->rl_niovs = 0;
		nreq->rl_nchunks = 0;
		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
		nreq->rl_reply = req->rl_reply;
		memcpy(nreq->rl_segments,
			req->rl_segments, sizeof nreq->rl_segments);
		/* flag the swap with an unused field */
		nreq->rl_iov.length = 0;
		req->rl_reply = NULL;
		req = nreq;
	}
	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
	req->rl_connect_cookie = 0;	/* our reserved value */
	return req->rl_xdr_buf;

outfail:
	min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
	min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
	if (size < min_size)
		size = min_size;

	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
	if (IS_ERR(rb))
		goto out_fail;
	rb->rg_owner = req;

	r_xprt->rx_stats.hardway_register_count += size;
	rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
	req->rl_sendbuf = rb;
	goto out;

out_fail:
	rpcrdma_buffer_put(req);
	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
	r_xprt->rx_stats.failed_marshal_count++;
	return NULL;
}

@@ -533,47 +538,24 @@ xprt_rdma_free(void *buffer)
{
	struct rpcrdma_req *req;
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_rep *rep;
	struct rpcrdma_regbuf *rb;
	int i;

	if (buffer == NULL)
		return;

	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
	if (req->rl_iov.length == 0) {	/* see allocate above */
		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
				      struct rpcrdma_xprt, rx_buf);
	} else
	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
	req = rb->rg_owner;
	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
	rep = req->rl_reply;

	dprintk("RPC:       %s: called on 0x%p%s\n",
		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);

	/*
	 * Finish the deregistration.  The process is considered
	 * complete when the rr_func vector becomes NULL - this
	 * was put in place during rpcrdma_reply_handler() - the wait
	 * call below will not block if the dereg is "done". If
	 * interrupted, our framework will clean up.
	 */
	for (i = 0; req->rl_nchunks;) {
		--req->rl_nchunks;
		i += rpcrdma_deregister_external(
			&req->rl_segments[i], r_xprt);
	}

	if (req->rl_iov.length == 0) {	/* see allocate above */
		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
		oreq->rl_reply = req->rl_reply;
		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
						   req->rl_handle,
						   &req->rl_iov);
		kfree(req);
		req = oreq;
	}

	/* Put back request+reply buffers */
	rpcrdma_buffer_put(req);
}

+254 −157

File changed.

Preview size limit exceeded, changes collapsed.

Loading