Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0bf48289 authored by Steve Wise's avatar Steve Wise Committed by J. Bruce Fields
Browse files

svcrdma: refactor marshalling logic



This patch refactors the NFSRDMA server marshalling logic to
remove the intermediary map structures.  It also fixes an existing bug
where the NFSRDMA server was not minding the device fast register page
list length limitations.

Signed-off-by: default avatarTom Tucker <tom@opengridcomputing.com>
Signed-off-by: default avatarSteve Wise <swise@opengridcomputing.com>
parent 1b19453d
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -115,14 +115,13 @@ struct svc_rdma_fastreg_mr {
	struct list_head frmr_list;
};
struct svc_rdma_req_map {
	struct svc_rdma_fastreg_mr *frmr;
	unsigned long count;
	union {
		struct kvec sge[RPCSVC_MAXPAGES];
		struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
		unsigned long lkey[RPCSVC_MAXPAGES];
	};
};
#define RDMACTXT_F_FAST_UNREG	1
#define RDMACTXT_F_LAST_CTXT	2

#define	SVCRDMA_DEVCAP_FAST_REG		1	/* fast mr registration */
+273 −370
Original line number Diff line number Diff line
/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
@@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,

	/* Set up the XDR head */
	rqstp->rq_arg.head[0].iov_base = page_address(page);
	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
	rqstp->rq_arg.head[0].iov_len =
		min_t(size_t, byte_count, ctxt->sge[0].length);
	rqstp->rq_arg.len = byte_count;
	rqstp->rq_arg.buflen = byte_count;

@@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
		page = ctxt->pages[sge_no];
		put_page(rqstp->rq_pages[sge_no]);
		rqstp->rq_pages[sge_no] = page;
		bc -= min(bc, ctxt->sge[sge_no].length);
		bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
		sge_no++;
	}
@@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
	rqstp->rq_arg.tail[0].iov_len = 0;
}

/* Encode a read-chunk-list as an array of IB SGE
 *
 * Assumptions:
 * - chunk[0]->position points to pages[0] at an offset of 0
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 *
 */
static int map_read_chunks(struct svcxprt_rdma *xprt,
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
	if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
	     RDMA_TRANSPORT_IWARP)
		return 1;
	else
		return min_t(int, sge_count, xprt->sc_max_sge);
}

typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
			      struct svc_rqst *rqstp,
			      struct svc_rdma_op_ctxt *head,
			   struct rpcrdma_msg *rmsgp,
			   struct svc_rdma_req_map *rpl_map,
			   struct svc_rdma_req_map *chl_map,
			   int ch_count,
			   int byte_count)
			      int *page_no,
			      u32 *page_offset,
			      u32 rs_handle,
			      u32 rs_length,
			      u64 rs_offset,
			      int last);

/* Issue an RDMA_READ using the local lkey to map the data sink */
static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_op_ctxt *head,
			       int *page_no,
			       u32 *page_offset,
			       u32 rs_handle,
			       u32 rs_length,
			       u64 rs_offset,
			       int last)
{
	int sge_no;
	int sge_bytes;
	int page_off;
	int page_no;
	int ch_bytes;
	int ch_no;
	struct rpcrdma_read_chunk *ch;
	struct ib_send_wr read_wr;
	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
	int ret, read, pno;
	u32 pg_off = *page_offset;
	u32 pg_no = *page_no;

	sge_no = 0;
	page_no = 0;
	page_off = 0;
	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	ch_no = 0;
	ch_bytes = ntohl(ch->rc_target.rs_length);
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->hdr_count = head->count; /* save count of hdr pages */
	head->arg.page_base = 0;
	head->arg.page_len = ch_bytes;
	head->arg.len = rqstp->rq_arg.len + ch_bytes;
	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->read_hdr = head;
	pages_needed =
		min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

	for (pno = 0; pno < pages_needed; pno++) {
		int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
		head->arg.page_len += len;
		head->arg.len += len;
		if (!pg_off)
			head->count++;
	chl_map->ch[0].start = 0;
	while (byte_count) {
		rpl_map->sge[sge_no].iov_base =
			page_address(rqstp->rq_arg.pages[page_no]) + page_off;
		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
		rpl_map->sge[sge_no].iov_len = sge_bytes;
		/*
		 * Don't bump head->count here because the same page
		 * may be used by multiple SGE.
		 */
		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
		rqstp->rq_next_page = rqstp->rq_respages + 1;
		ctxt->sge[pno].addr =
			ib_dma_map_page(xprt->sc_cm_id->device,
					head->arg.pages[pg_no], pg_off,
					PAGE_SIZE - pg_off,
					DMA_FROM_DEVICE);
		ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
					   ctxt->sge[pno].addr);
		if (ret)
			goto err;
		atomic_inc(&xprt->sc_dma_used);

		byte_count -= sge_bytes;
		ch_bytes -= sge_bytes;
		sge_no++;
		/*
		 * If all bytes for this chunk have been mapped to an
		 * SGE, move to the next SGE
		 */
		if (ch_bytes == 0) {
			chl_map->ch[ch_no].count =
				sge_no - chl_map->ch[ch_no].start;
			ch_no++;
			ch++;
			chl_map->ch[ch_no].start = sge_no;
			ch_bytes = ntohl(ch->rc_target.rs_length);
			/* If bytes remaining account for next chunk */
			if (byte_count) {
				head->arg.page_len += ch_bytes;
				head->arg.len += ch_bytes;
				head->arg.buflen += ch_bytes;
		/* The lkey here is either a local dma lkey or a dma_mr lkey */
		ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
		ctxt->sge[pno].length = len;
		ctxt->count++;

		/* adjust offset and wrap to next page if needed */
		pg_off += len;
		if (pg_off == PAGE_SIZE) {
			pg_off = 0;
			pg_no++;
		}
		rs_length -= len;
	}
		/*
		 * If this SGE consumed all of the page, move to the
		 * next page
		 */
		if ((sge_bytes + page_off) == PAGE_SIZE) {
			page_no++;
			page_off = 0;
			/*
			 * If there are still bytes left to map, bump
			 * the page count
			 */
			if (byte_count)
				head->count++;
		} else
			page_off += sge_bytes;

	if (last && rs_length == 0)
		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
	else
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

	memset(&read_wr, 0, sizeof(read_wr));
	read_wr.wr_id = (unsigned long)ctxt;
	read_wr.opcode = IB_WR_RDMA_READ;
	ctxt->wr_op = read_wr.opcode;
	read_wr.send_flags = IB_SEND_SIGNALED;
	read_wr.wr.rdma.rkey = rs_handle;
	read_wr.wr.rdma.remote_addr = rs_offset;
	read_wr.sg_list = ctxt->sge;
	read_wr.num_sge = pages_needed;

	ret = svc_rdma_send(xprt, &read_wr);
	if (ret) {
		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		goto err;
	}
	BUG_ON(byte_count != 0);
	return sge_no;

	/* return current location in page array */
	*page_no = pg_no;
	*page_offset = pg_off;
	ret = read;
	atomic_inc(&rdma_stat_read);
	return ret;
 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
	return ret;
}

/* Map a read-chunk-list to an XDR and fast register the page-list.
 *
 * Assumptions:
 * - chunk[0]	position points to pages[0] at an offset of 0
 * - pages[]	will be made physically contiguous by creating a one-off memory
 *		region using the fastreg verb.
 * - byte_count is # of bytes in read-chunk-list
 * - ch_count	is # of chunks in read-chunk-list
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 */
static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
/* Issue an RDMA_READ using an FRMR to map the data sink */
static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
				struct svc_rqst *rqstp,
				struct svc_rdma_op_ctxt *head,
				struct rpcrdma_msg *rmsgp,
				struct svc_rdma_req_map *rpl_map,
				struct svc_rdma_req_map *chl_map,
				int ch_count,
				int byte_count)
				int *page_no,
				u32 *page_offset,
				u32 rs_handle,
				u32 rs_length,
				u64 rs_offset,
				int last)
{
	int page_no;
	int ch_no;
	u32 offset;
	struct rpcrdma_read_chunk *ch;
	struct svc_rdma_fastreg_mr *frmr;
	int ret = 0;
	struct ib_send_wr read_wr;
	struct ib_send_wr inv_wr;
	struct ib_send_wr fastreg_wr;
	u8 key;
	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
	struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
	int ret, read, pno;
	u32 pg_off = *page_offset;
	u32 pg_no = *page_no;

	frmr = svc_rdma_get_frmr(xprt);
	if (IS_ERR(frmr))
		return -ENOMEM;

	head->frmr = frmr;
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->hdr_count = head->count; /* save count of hdr pages */
	head->arg.page_base = 0;
	head->arg.page_len = byte_count;
	head->arg.len = rqstp->rq_arg.len + byte_count;
	head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->frmr = frmr;
	pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

	/* Fast register the page list */
	frmr->kva = page_address(rqstp->rq_arg.pages[0]);
	frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
	frmr->direction = DMA_FROM_DEVICE;
	frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
	frmr->map_len = byte_count;
	frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
	for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
		frmr->page_list->page_list[page_no] =
	frmr->map_len = pages_needed << PAGE_SHIFT;
	frmr->page_list_len = pages_needed;

	for (pno = 0; pno < pages_needed; pno++) {
		int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
		head->arg.page_len += len;
		head->arg.len += len;
		if (!pg_off)
			head->count++;
		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
		rqstp->rq_next_page = rqstp->rq_respages + 1;
		frmr->page_list->page_list[pno] =
			ib_dma_map_page(xprt->sc_cm_id->device,
					rqstp->rq_arg.pages[page_no], 0,
					head->arg.pages[pg_no], 0,
					PAGE_SIZE, DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
					 frmr->page_list->page_list[page_no]))
			goto fatal_err;
		ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
					   frmr->page_list->page_list[pno]);
		if (ret)
			goto err;
		atomic_inc(&xprt->sc_dma_used);
		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
	}
	head->count += page_no;

	/* rq_respages points one past arg pages */
	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Create the reply and chunk maps */
	offset = 0;
	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	for (ch_no = 0; ch_no < ch_count; ch_no++) {
		int len = ntohl(ch->rc_target.rs_length);
		rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
		rpl_map->sge[ch_no].iov_len = len;
		chl_map->ch[ch_no].count = 1;
		chl_map->ch[ch_no].start = ch_no;
		offset += len;
		ch++;
		/* adjust offset and wrap to next page if needed */
		pg_off += len;
		if (pg_off == PAGE_SIZE) {
			pg_off = 0;
			pg_no++;
		}

	ret = svc_rdma_fastreg(xprt, frmr);
	if (ret)
		goto fatal_err;

	return ch_no;

 fatal_err:
	printk("svcrdma: error fast registering xdr for xprt %p", xprt);
	svc_rdma_put_frmr(xprt, frmr);
	return -EIO;
		rs_length -= len;
	}

static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
			     struct svc_rdma_op_ctxt *ctxt,
			     struct svc_rdma_fastreg_mr *frmr,
			     struct kvec *vec,
			     u64 *sgl_offset,
			     int count)
{
	int i;
	unsigned long off;
	if (last && rs_length == 0)
		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
	else
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

	ctxt->count = count;
	ctxt->direction = DMA_FROM_DEVICE;
	for (i = 0; i < count; i++) {
		ctxt->sge[i].length = 0; /* in case map fails */
		if (!frmr) {
			BUG_ON(!virt_to_page(vec[i].iov_base));
			off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
			ctxt->sge[i].addr =
				ib_dma_map_page(xprt->sc_cm_id->device,
						virt_to_page(vec[i].iov_base),
						off,
						vec[i].iov_len,
						DMA_FROM_DEVICE);
			if (ib_dma_mapping_error(xprt->sc_cm_id->device,
						 ctxt->sge[i].addr))
				return -EINVAL;
			ctxt->sge[i].lkey = xprt->sc_dma_lkey;
			atomic_inc(&xprt->sc_dma_used);
	/* Bump the key */
	key = (u8)(frmr->mr->lkey & 0x000000FF);
	ib_update_fast_reg_key(frmr->mr, ++key);

	ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
	ctxt->sge[0].lkey = frmr->mr->lkey;
	ctxt->sge[0].length = read;
	ctxt->count = 1;
	ctxt->read_hdr = head;

	/* Prepare FASTREG WR */
	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
	fastreg_wr.send_flags = IB_SEND_SIGNALED;
	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
	fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
	fastreg_wr.wr.fast_reg.length = frmr->map_len;
	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
	fastreg_wr.next = &read_wr;

	/* Prepare RDMA_READ */
	memset(&read_wr, 0, sizeof(read_wr));
	read_wr.send_flags = IB_SEND_SIGNALED;
	read_wr.wr.rdma.rkey = rs_handle;
	read_wr.wr.rdma.remote_addr = rs_offset;
	read_wr.sg_list = ctxt->sge;
	read_wr.num_sge = 1;
	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
		read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
	} else {
			ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
			ctxt->sge[i].lkey = frmr->mr->lkey;
		}
		ctxt->sge[i].length = vec[i].iov_len;
		*sgl_offset = *sgl_offset + vec[i].iov_len;
		read_wr.opcode = IB_WR_RDMA_READ;
		read_wr.next = &inv_wr;
		/* Prepare invalidate */
		memset(&inv_wr, 0, sizeof(inv_wr));
		inv_wr.wr_id = (unsigned long)ctxt;
		inv_wr.opcode = IB_WR_LOCAL_INV;
		inv_wr.send_flags = IB_SEND_SIGNALED;
		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
	}
	return 0;
	ctxt->wr_op = read_wr.opcode;

	/* Post the chain */
	ret = svc_rdma_send(xprt, &fastreg_wr);
	if (ret) {
		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		goto err;
	}

static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
	if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
	     RDMA_TRANSPORT_IWARP) &&
	    sge_count > 1)
		return 1;
	else
		return min_t(int, sge_count, xprt->sc_max_sge);
	/* return current location in page array */
	*page_no = pg_no;
	*page_offset = pg_off;
	ret = read;
	atomic_inc(&rdma_stat_read);
	return ret;
 err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
	svc_rdma_put_frmr(xprt, frmr);
	return ret;
}

/*
 * Use RDMA_READ to read data from the advertised client buffer into the
 * XDR stream starting at rq_arg.head[0].iov_base.
 * Each chunk in the array
 * contains the following fields:
 * discrim      - '1', This isn't used for data placement
 * position     - The xdr stream offset (the same for every chunk)
 * handle       - RMR for client memory region
 * length       - data transfer length
 * offset       - 64 bit tagged offset in remote memory region
 *
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
 * This function returns:
 * 0 - No error and no read-list found.
 *
 * 1 - Successful read-list processing. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
 * <0 - Error processing/posting read-list.
 *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Ne touche pas!
 */
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
			    struct rpcrdma_msg *rmsgp,
			    struct svc_rqst *rqstp,
			 struct svc_rdma_op_ctxt *hdr_ctxt)
			    struct svc_rdma_op_ctxt *head)
{
	struct ib_send_wr read_wr;
	struct ib_send_wr inv_wr;
	int err = 0;
	int ch_no;
	int ch_count;
	int byte_count;
	int sge_count;
	u64 sgl_offset;
	int page_no, ch_count, ret;
	struct rpcrdma_read_chunk *ch;
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct svc_rdma_req_map *rpl_map;
	struct svc_rdma_req_map *chl_map;
	u32 page_offset, byte_count;
	u64 rs_offset;
	rdma_reader_fn reader;

	/* If no read list is present, return 0 */
	ch = svc_rdma_get_read_chunk(rmsgp);
@@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
	if (ch_count > RPCSVC_MAXPAGES)
		return -EINVAL;

	/* Allocate temporary reply and chunk maps */
	rpl_map = svc_rdma_get_req_map();
	chl_map = svc_rdma_get_req_map();
	/* The request is completed when the RDMA_READs complete. The
	 * head context keeps all the pages that comprise the
	 * request.
	 */
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->hdr_count = head->count;
	head->arg.page_base = 0;
	head->arg.page_len = 0;
	head->arg.len = rqstp->rq_arg.len;
	head->arg.buflen = rqstp->rq_arg.buflen;

	if (!xprt->sc_frmr_pg_list_len)
		sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
					    rpl_map, chl_map, ch_count,
					    byte_count);
	/* Use FRMR if supported */
	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
		reader = rdma_read_chunk_frmr;
	else
		sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
						 rpl_map, chl_map, ch_count,
						 byte_count);
	if (sge_count < 0) {
		err = -EIO;
		goto out;
	}

	sgl_offset = 0;
	ch_no = 0;
		reader = rdma_read_chunk_lcl;

	page_no = 0; page_offset = 0;
	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	     ch->rc_discrim != 0; ch++, ch_no++) {
		u64 rs_offset;
next_sge:
		ctxt = svc_rdma_get_context(xprt);
		ctxt->direction = DMA_FROM_DEVICE;
		ctxt->frmr = hdr_ctxt->frmr;
		ctxt->read_hdr = NULL;
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
	     ch->rc_discrim != 0; ch++) {

		/* Prepare READ WR */
		memset(&read_wr, 0, sizeof read_wr);
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.opcode = IB_WR_RDMA_READ;
		ctxt->wr_op = read_wr.opcode;
		read_wr.send_flags = IB_SEND_SIGNALED;
		read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
				 &rs_offset);
		read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
		read_wr.sg_list = ctxt->sge;
		read_wr.num_sge =
			rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
		err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
					&rpl_map->sge[chl_map->ch[ch_no].start],
					&sgl_offset,
					read_wr.num_sge);
		if (err) {
			svc_rdma_unmap_dma(ctxt);
			svc_rdma_put_context(ctxt, 0);
			goto out;
		}
		if (((ch+1)->rc_discrim == 0) &&
		    (read_wr.num_sge == chl_map->ch[ch_no].count)) {
			/*
			 * Mark the last RDMA_READ with a bit to
			 * indicate all RPC data has been fetched from
			 * the client and the RPC needs to be enqueued.
			 */
			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
			if (hdr_ctxt->frmr) {
				set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
				/*
				 * Invalidate the local MR used to map the data
				 * sink.
				 */
				if (xprt->sc_dev_caps &
				    SVCRDMA_DEVCAP_READ_W_INV) {
					read_wr.opcode =
						IB_WR_RDMA_READ_WITH_INV;
					ctxt->wr_op = read_wr.opcode;
					read_wr.ex.invalidate_rkey =
						ctxt->frmr->mr->lkey;
				} else {
					/* Prepare INVALIDATE WR */
					memset(&inv_wr, 0, sizeof inv_wr);
					inv_wr.opcode = IB_WR_LOCAL_INV;
					inv_wr.send_flags = IB_SEND_SIGNALED;
					inv_wr.ex.invalidate_rkey =
						hdr_ctxt->frmr->mr->lkey;
					read_wr.next = &inv_wr;
				}
			}
			ctxt->read_hdr = hdr_ctxt;
		}
		/* Post the read */
		err = svc_rdma_send(xprt, &read_wr);
		if (err) {
			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
			       err);
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_rdma_unmap_dma(ctxt);
			svc_rdma_put_context(ctxt, 0);
			goto out;
		}
		atomic_inc(&rdma_stat_read);

		if (read_wr.num_sge < chl_map->ch[ch_no].count) {
			chl_map->ch[ch_no].count -= read_wr.num_sge;
			chl_map->ch[ch_no].start += read_wr.num_sge;
			goto next_sge;
		}
		sgl_offset = 0;
		err = 1;
	}

 out:
	svc_rdma_put_req_map(rpl_map);
	svc_rdma_put_req_map(chl_map);

		byte_count = ntohl(ch->rc_target.rs_length);

		while (byte_count > 0) {
			ret = reader(xprt, rqstp, head,
				     &page_no, &page_offset,
				     ntohl(ch->rc_target.rs_handle),
				     byte_count, rs_offset,
				     ((ch+1)->rc_discrim == 0) /* last */
				     );
			if (ret < 0)
				goto err;
			byte_count -= ret;
			rs_offset += ret;
			head->arg.buflen += ret;
		}
	}
	ret = 1;
 err:
	/* Detach arg pages. svc_recv will replenish them */
	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
		rqstp->rq_pages[ch_no] = NULL;
	for (page_no = 0;
	     &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
		rqstp->rq_pages[page_no] = NULL;

	return err;
	return ret;
}

static int rdma_read_complete(struct svc_rqst *rqstp,
@@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	}
	if (ctxt) {
		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
		return rdma_read_complete(rqstp, ctxt);
	}

	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
@@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
			goto close_out;

		BUG_ON(ret);
		goto out;
	}
	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
	}

	/* Read read-list data. */
	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
	ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
	if (ret > 0) {
		/* read-list posted, defer until data received from client. */
		goto defer;
	}
	if (ret < 0) {
	} else if (ret < 0) {
		/* Post of read-list failed, free context. */
		svc_rdma_put_context(ctxt, 1);
		return 0;