Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5b2366bd authored by Andy Grover's avatar Andy Grover
Browse files

RDS: Rewrite rds_send_xmit



Simplify rds_send_xmit().

Send a congestion map (via xmit_cong_map) without
decrementing send_quota.

Move resetting of conn xmit variables to end of loop.

Update comments.

Implement a special case to turn off sending an rds header
when there is an atomic op and no other data.

Signed-off-by: default avatarAndy Grover <andy.grover@oracle.com>
parent 6c7cc6e4
Loading
Loading
Loading
Loading
+2 −3
Original line number Diff line number Diff line
@@ -96,8 +96,9 @@ struct rds_connection {
	unsigned long		c_xmit_sg;
	unsigned int		c_xmit_hdr_off;
	unsigned int		c_xmit_data_off;
	unsigned int		c_xmit_rdma_sent;
	unsigned int		c_xmit_atomic_sent;
	unsigned int		c_xmit_rdma_sent;
	unsigned int		c_xmit_data_sent;

	spinlock_t		c_lock;		/* protect msg queues */
	u64			c_next_tx_seq;
@@ -120,8 +121,6 @@ struct rds_connection {

	struct list_head	c_map_item;
	unsigned long		c_map_queued;
	unsigned long		c_map_offset;
	unsigned long		c_map_bytes;

	unsigned int		c_unacked_packets;
	unsigned int		c_unacked_bytes;
+71 −60
Original line number Diff line number Diff line
@@ -72,8 +72,9 @@ void rds_send_reset(struct rds_connection *conn)
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_xmit_atomic_sent = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_xmit_data_sent = 0;

	conn->c_map_queued = 0;

@@ -137,69 +138,54 @@ int rds_send_xmit(struct rds_connection *conn)

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doens't make forward progress.
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {
		/*
		 * See if need to send a congestion map update if we're
		 * between sending messages.  The send_sem protects our sole
		 * use of c_map_offset and _bytes.
		 * Note this is used only by transports that define a special
		 * xmit_cong_map function. For all others, we create allocate
		 * a cong_map message and treat it just like any other send.
		 */
		if (conn->c_map_bytes) {
			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
							   conn->c_map_offset);
			if (ret <= 0)
				break;

			conn->c_map_offset += ret;
			conn->c_map_bytes -= ret;
			if (conn->c_map_bytes)
				continue;
		}

		/* If we're done sending the current message, clear the
		 * offset and S/G temporaries.
		 */
		rm = conn->c_xmit_rm;
		if (rm &&
		    conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
		    conn->c_xmit_sg == rm->data.op_nents) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;
			conn->c_xmit_atomic_sent = 0;

			/* Release the reference to the previous message. */
			rds_message_put(rm);
			rm = NULL;
		}

		/* If we're asked to send a cong map update, do so.
		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 *
		 * Transports either define a special xmit_cong_map function,
		 * or we allocate a cong_map message and treat it just like any
		 * other send.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map) {
				conn->c_map_offset = 0;
				conn->c_map_bytes = sizeof(struct rds_header) +
				unsigned long map_offset = 0;
				unsigned long map_bytes = sizeof(struct rds_header) +
					RDS_CONG_MAP_BYTES;
				continue;

				while (map_bytes) {
					ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
									   map_offset);
					if (ret <= 0) {
						/* too far down the rabbithole! */
						mutex_unlock(&conn->c_send_lock);
						rds_conn_error(conn, "Cong map xmit failed\n");
						goto out;
					}

					map_offset += ret;
					map_bytes -= ret;
				}
			} else {
				/* send cong update like a normal rm */
				rm = rds_cong_update_alloc(conn);
				if (IS_ERR(rm)) {
					ret = PTR_ERR(rm);
					break;
				}
				rm->data.op_active = 1;

				conn->c_xmit_rm = rm;
			}
		}

		/*
		 * Grab the next message from the send queue, if there is one.
		 * If not already working on one, grab the next message.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connction.  We can use this ref while holding the
@@ -264,7 +250,6 @@ int rds_send_xmit(struct rds_connection *conn)
			conn->c_xmit_rm = rm;
		}


		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
			ret = conn->c_trans->xmit_atomic(conn, rm);
			if (ret)
@@ -273,13 +258,20 @@ int rds_send_xmit(struct rds_connection *conn)
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
		}

			/*
		 * Try and send an rdma message.  Let's see if we can
		 * keep this simple and require that the transport either
		 * send the whole rdma or none of it.
			 * This is evil, muahaha.
			 * We permit 0-byte sends. (rds-ping depends on this.)
			 * BUT if there is an atomic op and no sent data,
			 * we turn off sending the header, to achieve
			 * "silent" atomics.
			 * But see below; RDMA op might toggle this back on!
			 */
			if (rm->data.op_nents == 0)
				rm->data.op_active = 0;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret)
@@ -294,9 +286,7 @@ int rds_send_xmit(struct rds_connection *conn)
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
		}

		if (rm->data.op_active
		    && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
			conn->c_xmit_sg < rm->data.op_nents)) {
		if (rm->data.op_active && !conn->c_xmit_data_sent) {
			ret = conn->c_trans->xmit(conn, rm,
						  conn->c_xmit_hdr_off,
						  conn->c_xmit_sg,
@@ -326,6 +316,27 @@ int rds_send_xmit(struct rds_connection *conn)
					       conn->c_xmit_sg == rm->data.op_nents);
				}
			}

			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
			    (conn->c_xmit_sg == rm->data.op_nents))
				conn->c_xmit_data_sent = 1;
		}

		/*
		 * A rm will only take multiple times through this loop
		 * if there is a data op. Thus, if the data is sent (or there was
		 * none), then we're done with the rm.
		 */
		if (!rm->data.op_active || conn->c_xmit_data_sent) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;
			conn->c_xmit_atomic_sent = 0;
			conn->c_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

@@ -350,7 +361,7 @@ int rds_send_xmit(struct rds_connection *conn)
	 */
	mutex_unlock(&conn->c_send_lock);

	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
	if (send_quota == 0 && !was_empty) {
		/* We exhausted the send quota, but there's work left to
		 * do. Return and (re-)schedule the send worker.
		 */