Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 15133f6e authored by Andy Grover's avatar Andy Grover
Browse files

RDS: Implement atomic operations



Implement a CMSG-based interface to do FADD and CSWP ops.

Alter send routines to handle atomic ops.

Add atomic counters to stats.

Add xmit_atomic() to struct rds_transport

Inline rds_ib_send_unmap_rdma into unmap_rm

Signed-off-by: default avatarAndy Grover <andy.grover@oracle.com>
parent a63273d4
Loading
Loading
Loading
Loading
+19 −0
Original line number Diff line number Diff line
@@ -73,6 +73,8 @@
#define RDS_CMSG_RDMA_MAP		3
#define RDS_CMSG_RDMA_STATUS		4
#define RDS_CMSG_CONG_UPDATE		5
#define RDS_CMSG_ATOMIC_FADD		6
#define RDS_CMSG_ATOMIC_CSWP		7

#define RDS_INFO_FIRST			10000
#define RDS_INFO_COUNTERS		10000
@@ -237,6 +239,23 @@ struct rds_rdma_args {
	u_int64_t	user_token;
};

struct rds_atomic_args {
	rds_rdma_cookie_t cookie;
	uint64_t 	local_addr;
	uint64_t 	remote_addr;
	union {
		struct {
			uint64_t	compare;
			uint64_t	swap;
		} cswp;
		struct {
			uint64_t	add;
		} fadd;
	};
	uint64_t	flags;
	uint64_t	user_token;
};

struct rds_rdma_notify {
	u_int64_t	user_token;
	int32_t		status;
+1 −0
Original line number Diff line number Diff line
@@ -264,6 +264,7 @@ struct rds_transport rds_ib_transport = {
	.xmit			= rds_ib_xmit,
	.xmit_cong_map		= NULL,
	.xmit_rdma		= rds_ib_xmit_rdma,
	.xmit_atomic		= rds_ib_xmit_atomic,
	.recv			= rds_ib_recv,
	.conn_alloc		= rds_ib_conn_alloc,
	.conn_free		= rds_ib_conn_free,
+1 −0
Original line number Diff line number Diff line
@@ -336,6 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
			     u32 *adv_credits, int need_posted, int max_posted);
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);

/* ib_stats.c */
DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
+3 −1
Original line number Diff line number Diff line
@@ -298,7 +298,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
			(IB_ACCESS_LOCAL_WRITE |
			 IB_ACCESS_REMOTE_READ |
			 IB_ACCESS_REMOTE_WRITE),
			 IB_ACCESS_REMOTE_WRITE|
			 IB_ACCESS_REMOTE_ATOMIC),

			&pool->fmr_attr);
	if (IS_ERR(ibmr->fmr)) {
		err = PTR_ERR(ibmr->fmr);
+124 −16
Original line number Diff line number Diff line
@@ -62,15 +62,17 @@ static void rds_ib_send_rdma_complete(struct rds_message *rm,
	rds_rdma_send_complete(rm, notify_status);
}

static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
				   struct rds_rdma_op *op)
static void rds_ib_send_atomic_complete(struct rds_message *rm,
				      int wc_status)
{
	if (op->r_mapped) {
		ib_dma_unmap_sg(ic->i_cm_id->device,
			op->r_sg, op->r_nents,
			op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
		op->r_mapped = 0;
	}
	int notify_status;

	if (wc_status != IB_WC_SUCCESS)
		notify_status = RDS_RDMA_OTHER_ERROR;
	else
		notify_status = RDS_RDMA_SUCCESS;

	rds_atomic_send_complete(rm, notify_status);
}

static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
@@ -86,7 +88,14 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
			DMA_TO_DEVICE);

	if (rm->rdma.m_rdma_op.r_active) {
		rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op);
		struct rds_rdma_op *op = &rm->rdma.m_rdma_op;

		if (op->r_mapped) {
			ib_dma_unmap_sg(ic->i_cm_id->device,
					op->r_sg, op->r_nents,
					op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
			op->r_mapped = 0;
		}

		/* If the user asked for a completion notification on this
		 * message, we can implement three different semantics:
@@ -116,6 +125,24 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
			rds_stats_add(s_recv_rdma_bytes, rm->rdma.m_rdma_op.r_bytes);
	}

	if (rm->atomic.op_active) {
		struct rm_atomic_op *op = &rm->atomic;

		/* unmap atomic recvbuf */
		if (op->op_mapped) {
			ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
					DMA_FROM_DEVICE);
			op->op_mapped = 0;
		}

		rds_ib_send_atomic_complete(rm, wc_status);

		if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
			rds_stats_inc(s_atomic_cswp);
		else
			rds_stats_inc(s_atomic_fadd);
	}

	/* If anyone waited for this message to get flushed out, wake
	 * them up now */
	rds_message_unmapped(rm);
@@ -158,12 +185,9 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
	u32 i;

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		if (send->s_wr.opcode == 0xdead)
		if (!send->s_rm || send->s_wr.opcode == 0xdead)
			continue;
		if (send->s_rm)
		rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
		if (send->s_op)
			rds_ib_send_unmap_rdma(ic, send->s_op);
	}
}

@@ -218,6 +242,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
				break;
			case IB_WR_RDMA_WRITE:
			case IB_WR_RDMA_READ:
			case IB_WR_ATOMIC_FETCH_AND_ADD:
			case IB_WR_ATOMIC_CMP_AND_SWP:
				/* Nothing to be done - the SG list will be unmapped
				 * when the SEND completes. */
				break;
@@ -243,8 +269,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)

				rm = rds_send_get_message(conn, send->s_op);
				if (rm) {
					if (rm->rdma.m_rdma_op.r_active)
						rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op);
					rds_ib_send_unmap_rm(ic, send, wc.status);
					rds_ib_send_rdma_complete(rm, wc.status);
					rds_message_put(rm);
				}
@@ -736,6 +761,89 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
	return ret;
}

/*
 * Issue atomic operation.
 * A simplified version of the rdma case, we always map 1 SG, and
 * only 8 bytes, for the return value from the atomic operation.
 */
int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
{
	struct rds_ib_connection *ic = conn->c_transport_data;
	struct rds_ib_send_work *send = NULL;
	struct ib_send_wr *failed_wr;
	struct rds_ib_device *rds_ibdev;
	u32 pos;
	u32 work_alloc;
	int ret;

	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);

	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
	if (work_alloc != 1) {
		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		rds_ib_stats_inc(s_ib_tx_ring_full);
		ret = -ENOMEM;
		goto out;
	}

	/* address of send request in ring */
	send = &ic->i_sends[pos];
	send->s_queued = jiffies;

	if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
		send->s_wr.opcode = IB_WR_ATOMIC_CMP_AND_SWP;
		send->s_wr.wr.atomic.compare_add = op->op_compare;
		send->s_wr.wr.atomic.swap = op->op_swap_add;
	} else { /* FADD */
		send->s_wr.opcode = IB_WR_ATOMIC_FETCH_AND_ADD;
		send->s_wr.wr.atomic.compare_add = op->op_swap_add;
		send->s_wr.wr.atomic.swap = 0;
	}
	send->s_wr.send_flags = IB_SEND_SIGNALED;
	send->s_wr.num_sge = 1;
	send->s_wr.next = NULL;
	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
	send->s_wr.wr.atomic.rkey = op->op_rkey;

	/* map 8 byte retval buffer to the device */
	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
	if (ret != 1) {
		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
		ret = -ENOMEM; /* XXX ? */
		goto out;
	}

	/* Convert our struct scatterlist to struct ib_sge */
	send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
	send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
	send->s_sge[0].lkey = ic->i_mr->lkey;

	rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
		 send->s_sge[0].addr, send->s_sge[0].length);

	failed_wr = &send->s_wr;
	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
		 send, &send->s_wr, ret, failed_wr);
	BUG_ON(failed_wr != &send->s_wr);
	if (ret) {
		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
		       "returned %d\n", &conn->c_faddr, ret);
		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		goto out;
	}

	if (unlikely(failed_wr != &send->s_wr)) {
		printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
		BUG_ON(failed_wr != &send->s_wr);
	}

out:
	return ret;
}

int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
{
	struct rds_ib_connection *ic = conn->c_transport_data;
Loading