
Commit 99722fe4 authored by Chuck Lever, committed by J. Bruce Fields

svcrdma: Persistently allocate and DMA-map Send buffers

While sending each RPC Reply, svc_rdma_sendto allocates and DMA-
maps a separate buffer where the RPC/RDMA transport header is
constructed. The buffer is unmapped and released in the Send
completion handler. This is significant per-RPC overhead,
especially for small RPCs.

Instead, allocate and DMA-map a buffer, and cache it in each
svc_rdma_send_ctxt. This buffer and its mapping can be re-used
for each RPC, saving the cost of memory allocation and DMA
mapping.
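
Concretely, the patch turns a per-send allocate/map/unmap/free cycle into an
allocate-and-map-once lifecycle with a cheap per-send sync. Below is a minimal
sketch of that pattern, independent of the svcrdma code: the my_send_ctxt
struct and helper names are invented for illustration, while kmalloc and the
ib_dma_* verbs helpers are the real kernel APIs (from <linux/slab.h> and
<rdma/ib_verbs.h>) that the patch itself uses.

/* Illustrative sketch only -- not part of this patch. */
struct my_send_ctxt {
	void	*xprt_buf;	/* header buffer, mapped for the ctxt's lifetime */
	u64	dma_addr;
};

static struct my_send_ctxt *my_send_ctxt_alloc(struct ib_device *dev,
					       size_t size)
{
	struct my_send_ctxt *ctxt;

	ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
	if (!ctxt)
		return NULL;
	ctxt->xprt_buf = kmalloc(size, GFP_KERNEL);
	if (!ctxt->xprt_buf)
		goto out_free_ctxt;

	/* DMA-map once; the mapping is reused for every Send. */
	ctxt->dma_addr = ib_dma_map_single(dev, ctxt->xprt_buf, size,
					   DMA_TO_DEVICE);
	if (ib_dma_mapping_error(dev, ctxt->dma_addr))
		goto out_free_buf;
	return ctxt;

out_free_buf:
	kfree(ctxt->xprt_buf);
out_free_ctxt:
	kfree(ctxt);
	return NULL;
}

/* Per-send cost drops to a dma_sync instead of a map/unmap pair. */
static void my_send_ctxt_sync(struct ib_device *dev,
			      struct my_send_ctxt *ctxt, unsigned int len)
{
	ib_dma_sync_single_for_device(dev, ctxt->dma_addr, len,
				      DMA_TO_DEVICE);
}

With this shape, the unmap (ib_dma_unmap_single) and kfree move out of the
Send completion path and into ctxt teardown; the svc_rdma_send_ctxt_alloc and
svc_rdma_send_ctxts_destroy changes below make exactly that trade.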

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent 3abb03fa
include/linux/sunrpc/svc_rdma.h  +6 −2
@@ -162,6 +162,7 @@ struct svc_rdma_send_ctxt {
 	struct list_head	sc_list;
 	struct ib_send_wr	sc_send_wr;
 	struct ib_cqe		sc_cqe;
+	void			*sc_xprt_buf;
 	int			sc_page_count;
 	int			sc_cur_sge_no;
 	struct page		*sc_pages[RPCSVC_MAXPAGES];
@@ -199,9 +200,12 @@ extern struct svc_rdma_send_ctxt *
 extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
 				   struct svc_rdma_send_ctxt *ctxt);
 extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
-extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
+				    struct svc_rdma_send_ctxt *ctxt,
+				    unsigned int len);
+extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *ctxt,
-				  __be32 *rdma_resp, unsigned int len);
+				  struct xdr_buf *xdr, __be32 *wr_lst);
 extern int svc_rdma_sendto(struct svc_rqst *);
 
 /* svc_rdma_transport.c */
net/sunrpc/xprtrdma/svc_rdma_backchannel.c  +17 −34
@@ -115,43 +115,21 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
  * the adapter has a small maximum SQ depth.
  */
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
-			      struct rpc_rqst *rqst)
+			      struct rpc_rqst *rqst,
+			      struct svc_rdma_send_ctxt *ctxt)
 {
-	struct svc_rdma_send_ctxt *ctxt;
 	int ret;
 
-	ctxt = svc_rdma_send_ctxt_get(rdma);
-	if (!ctxt) {
-		ret = -ENOMEM;
-		goto out_err;
-	}
-
-	/* rpcrdma_bc_send_request builds the transport header and
-	 * the backchannel RPC message in the same buffer. Thus only
-	 * one SGE is needed to send both.
-	 */
-	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
-				     rqst->rq_snd_buf.len);
+	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
 	if (ret < 0)
-		goto out_err;
+		return -EIO;
 
 	/* Bump page refcnt so Send completion doesn't release
 	 * the rq_buffer before all retransmits are complete.
 	 */
 	get_page(virt_to_page(rqst->rq_buffer));
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret)
-		goto out_unmap;
-
-out_err:
-	dprintk("svcrdma: %s returns %d\n", __func__, ret);
-	return ret;
-
-out_unmap:
-	svc_rdma_send_ctxt_put(rdma, ctxt);
-	ret = -EIO;
-	goto out_err;
+	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Server-side transport endpoint wants a whole page for its send
@@ -198,13 +176,15 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
 {
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p;
 	int rc;
 
-	/* Space in the send buffer for an RPC/RDMA header is reserved
-	 * via xprt->tsh_size.
-	 */
-	p = rqst->rq_buffer;
+	ctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!ctxt)
+		goto drop_connection;
+
+	p = ctxt->sc_xprt_buf;
 	*p++ = rqst->rq_xid;
 	*p++ = rpcrdma_version;
 	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
@@ -212,14 +192,17 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p   = xdr_zero;
+	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN);
 
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
 	pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
 #endif
 
-	rc = svc_rdma_bc_sendto(rdma, rqst);
-	if (rc)
+	rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
+	if (rc) {
+		svc_rdma_send_ctxt_put(rdma, ctxt);
 		goto drop_connection;
+	}
 	return rc;
 
 drop_connection:

drop_connection:
@@ -327,7 +310,7 @@ xprt_setup_rdma_bc(struct xprt_create *args)
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
-	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+	xprt->tsh_size = 0;
 	xprt->ops = &xprt_rdma_bc_procs;
 
 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
net/sunrpc/xprtrdma/svc_rdma_transport.c  +6 −19
@@ -602,17 +602,15 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 				__be32 *rdma_argp, int status)
 {
 	struct svc_rdma_send_ctxt *ctxt;
-	__be32 *p, *err_msgp;
+	__be32 *p;
 	unsigned int length;
-	struct page *page;
 	int ret;
 
-	page = alloc_page(GFP_KERNEL);
-	if (!page)
+	ctxt = svc_rdma_send_ctxt_get(xprt);
+	if (!ctxt)
 		return;
-	err_msgp = page_address(page);
 
-	p = err_msgp;
+	p = ctxt->sc_xprt_buf;
 	*p++ = *rdma_argp;
 	*p++ = *(rdma_argp + 1);
 	*p++ = xprt->sc_fc_credits;
@@ -628,19 +626,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
 		*p++ = err_chunk;
 		trace_svcrdma_err_chunk(*rdma_argp);
 	}
-	length = (unsigned long)p - (unsigned long)err_msgp;
-
-	/* Map transport header; no RPC message payload */
-	ctxt = svc_rdma_send_ctxt_get(xprt);
-	if (!ctxt)
-		return;
-
-	ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
-	if (ret) {
-		dprintk("svcrdma: Error %d mapping send for protocol error\n",
-			ret);
-		return;
-	}
+	length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf;
+	svc_rdma_sync_reply_hdr(xprt, ctxt, length);
 
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	ret = svc_rdma_send(xprt, &ctxt->sc_send_wr);
net/sunrpc/xprtrdma/svc_rdma_sendto.c  +76 −73
@@ -127,6 +127,8 @@ static struct svc_rdma_send_ctxt *
 svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
 {
 	struct svc_rdma_send_ctxt *ctxt;
+	dma_addr_t addr;
+	void *buffer;
 	size_t size;
 	int i;

@@ -134,16 +136,33 @@ svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma)
 	size += rdma->sc_max_send_sges * sizeof(struct ib_sge);
 	ctxt = kmalloc(size, GFP_KERNEL);
 	if (!ctxt)
-		return NULL;
+		goto fail0;
+	buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
+	if (!buffer)
+		goto fail1;
+	addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
+				 rdma->sc_max_req_size, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
+		goto fail2;
 
-	ctxt->sc_cqe.done = svc_rdma_wc_send;
 	ctxt->sc_send_wr.next = NULL;
 	ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe;
 	ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
 	ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
+	ctxt->sc_cqe.done = svc_rdma_wc_send;
+	ctxt->sc_xprt_buf = buffer;
+	ctxt->sc_sges[0].addr = addr;
+
 	for (i = 0; i < rdma->sc_max_send_sges; i++)
 		ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey;
 	return ctxt;
+
+fail2:
+	kfree(buffer);
+fail1:
+	kfree(ctxt);
+fail0:
+	return NULL;
 }
 
 /**
@@ -157,6 +176,11 @@ void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma)

 	while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) {
 		list_del(&ctxt->sc_list);
+		ib_dma_unmap_single(rdma->sc_pd->device,
+				    ctxt->sc_sges[0].addr,
+				    rdma->sc_max_req_size,
+				    DMA_TO_DEVICE);
+		kfree(ctxt->sc_xprt_buf);
 		kfree(ctxt);
 	}
 }
@@ -181,6 +205,7 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)

 out:
 	ctxt->sc_send_wr.num_sge = 0;
+	ctxt->sc_cur_sge_no = 0;
 	ctxt->sc_page_count = 0;
 	return ctxt;

@@ -205,7 +230,10 @@ void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
 	struct ib_device *device = rdma->sc_cm_id->device;
 	unsigned int i;
 
-	for (i = 0; i < ctxt->sc_send_wr.num_sge; i++)
+	/* The first SGE contains the transport header, which
+	 * remains mapped until @ctxt is destroyed.
+	 */
+	for (i = 1; i < ctxt->sc_send_wr.num_sge; i++)
 		ib_dma_unmap_page(device,
 				  ctxt->sc_sges[i].addr,
 				  ctxt->sc_sges[i].length,
@@ -519,33 +547,35 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
 }
 
 /**
- * svc_rdma_map_reply_hdr - DMA map the transport header buffer
+ * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer
  * @rdma: controlling transport
- * @ctxt: op_ctxt for the Send WR
- * @rdma_resp: buffer containing transport header
+ * @ctxt: send_ctxt for the Send WR
  * @len: length of transport header
  *
- * Returns:
- *	%0 if the header is DMA mapped,
- *	%-EIO if DMA mapping failed.
  */
-int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
-			   struct svc_rdma_send_ctxt *ctxt,
-			   __be32 *rdma_resp,
-			   unsigned int len)
+void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
+			     struct svc_rdma_send_ctxt *ctxt,
+			     unsigned int len)
 {
-	ctxt->sc_pages[0] = virt_to_page(rdma_resp);
-	ctxt->sc_page_count++;
-	ctxt->sc_cur_sge_no = 0;
-	return svc_rdma_dma_map_page(rdma, ctxt, ctxt->sc_pages[0], 0, len);
+	ctxt->sc_sges[0].length = len;
+	ctxt->sc_send_wr.num_sge++;
+	ib_dma_sync_single_for_device(rdma->sc_pd->device,
+				      ctxt->sc_sges[0].addr, len,
+				      DMA_TO_DEVICE);
 }
 
-/* Load the xdr_buf into the ctxt's sge array, and DMA map each
+/* svc_rdma_map_reply_msg - Map the buffer holding RPC message
+ * @rdma: controlling transport
+ * @ctxt: send_ctxt for the Send WR
+ * @xdr: prepared xdr_buf containing RPC message
+ * @wr_lst: pointer to Call header's Write list, or NULL
+ *
+ * Load the xdr_buf into the ctxt's sge array, and DMA map each
  * element as it is added.
  *
  * Returns zero on success, or a negative errno on failure.
  */
-static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
-				  struct svc_rdma_send_ctxt *ctxt,
-				  struct xdr_buf *xdr, __be32 *wr_lst)
+int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+			   struct svc_rdma_send_ctxt *ctxt,
+			   struct xdr_buf *xdr, __be32 *wr_lst)
 {
@@ -624,7 +654,7 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,

 	ctxt->sc_page_count += pages;
 	for (i = 0; i < pages; i++) {
-		ctxt->sc_pages[i + 1] = rqstp->rq_respages[i];
+		ctxt->sc_pages[i] = rqstp->rq_respages[i];
 		rqstp->rq_respages[i] = NULL;
 	}
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
@@ -649,27 +679,18 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  * - The Reply's transport header will never be larger than a page.
  */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
-				   __be32 *rdma_argp, __be32 *rdma_resp,
+				   struct svc_rdma_send_ctxt *ctxt,
+				   __be32 *rdma_argp,
 				   struct svc_rqst *rqstp,
 				   __be32 *wr_lst, __be32 *rp_ch)
 {
-	struct svc_rdma_send_ctxt *ctxt;
 	int ret;
 
-	ctxt = svc_rdma_send_ctxt_get(rdma);
-	if (!ctxt)
-		return -ENOMEM;
-
-	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
-				     svc_rdma_reply_hdr_len(rdma_resp));
-	if (ret < 0)
-		goto err;
-
 	if (!rp_ch) {
 		ret = svc_rdma_map_reply_msg(rdma, ctxt,
 					     &rqstp->rq_res, wr_lst);
 		if (ret < 0)
-			goto err;
+			return ret;
 	}
 
 	svc_rdma_save_io_pages(rqstp, ctxt);
@@ -683,15 +704,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 	}
 	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
 		ctxt->sc_send_wr.num_sge);
-	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret)
-		goto err;
-
-	return 0;
-
-err:
-	svc_rdma_send_ctxt_put(rdma, ctxt);
-	return ret;
+	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
 }
 
 /* Given the client-provided Write and Reply chunks, the server was not
@@ -702,42 +715,31 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
  * Remote Invalidation is skipped for simplicity.
  */
 static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
-				   __be32 *rdma_resp, struct svc_rqst *rqstp)
+				   struct svc_rdma_send_ctxt *ctxt,
+				   struct svc_rqst *rqstp)
 {
-	struct svc_rdma_send_ctxt *ctxt;
 	__be32 *p;
 	int ret;
 
-	ctxt = svc_rdma_send_ctxt_get(rdma);
-	if (!ctxt)
-		return -ENOMEM;
-
-	/* Replace the original transport header with an
-	 * RDMA_ERROR response. XID etc are preserved.
-	 */
-	trace_svcrdma_err_chunk(*rdma_resp);
-	p = rdma_resp + 3;
+	p = ctxt->sc_xprt_buf;
+	trace_svcrdma_err_chunk(*p);
+	p += 3;
 	*p++ = rdma_error;
 	*p   = err_chunk;
-
-	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
-	if (ret < 0)
-		goto err;
+	svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR);
 
 	svc_rdma_save_io_pages(rqstp, ctxt);
 
 	ctxt->sc_send_wr.opcode = IB_WR_SEND;
 	ret = svc_rdma_send(rdma, &ctxt->sc_send_wr);
-	if (ret)
-		goto err;
-
-	return 0;
-
-err:
-	svc_rdma_send_ctxt_put(rdma, ctxt);
-	return ret;
+	if (ret) {
+		svc_rdma_send_ctxt_put(rdma, ctxt);
+		return ret;
+	}
+
+	return 0;
 }
 
 void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
 {
 }
@@ -762,7 +764,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
 	struct xdr_buf *xdr = &rqstp->rq_res;
-	struct page *res_page;
+	struct svc_rdma_send_ctxt *sctxt;
 	int ret;
 
 	rdma_argp = rctxt->rc_recv_buf;
@@ -775,10 +777,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	 * critical section.
 	 */
 	ret = -ENOMEM;
-	res_page = alloc_page(GFP_KERNEL);
-	if (!res_page)
+	sctxt = svc_rdma_send_ctxt_get(rdma);
+	if (!sctxt)
 		goto err0;
-	rdma_resp = page_address(res_page);
+	rdma_resp = sctxt->sc_xprt_buf;
 
 	p = rdma_resp;
 	*p++ = *rdma_argp;
@@ -805,10 +807,11 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
 	}
 
-	ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
 				      wr_lst, rp_ch);
 	if (ret < 0)
-		goto err0;
+		goto err1;
 	ret = 0;
 
 out:
@@ -820,14 +823,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	if (ret != -E2BIG && ret != -EINVAL)
 		goto err1;
 
-	ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
+	ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp);
 	if (ret < 0)
-		goto err0;
+		goto err1;
 	ret = 0;
 	goto out;
 
 err1:
-	put_page(res_page);
+	svc_rdma_send_ctxt_put(rdma, sctxt);
 err0:
 	trace_svcrdma_send_failed(rqstp, ret);
 	set_bit(XPT_CLOSE, &xprt->xpt_flags);