public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: David Gibson <david@gibson.dropbear.id.au>
To: passt-dev@passt.top, Stefano Brivio <sbrivio@redhat.com>
Cc: David Gibson <david@gibson.dropbear.id.au>
Subject: [PATCH v2 09/11] udp: Consolidate datagram batching
Date: Fri,  5 Jul 2024 20:44:07 +1000	[thread overview]
Message-ID: <20240705104409.3847002-10-david@gibson.dropbear.id.au> (raw)
In-Reply-To: <20240705104409.3847002-1-david@gibson.dropbear.id.au>

When we receive datagrams on a socket, we need to split them into batches
depending on how they need to be forwarded (either via a specific splice
socket, or via tap).  The logic to do this, is somewhat awkwardly split
between udp_buf_sock_handler() itself, udp_splice_send() and
udp_tap_send().

Move all the batching logic into udp_buf_sock_handler(), leaving
udp_splice_send() to just send the prepared batch.  udp_tap_send() reduces
to just a call to tap_send_frames() so open-code that call in
udp_buf_sock_handler().

This will allow separating the batching logic from the rest of the datagram
forwarding logic, which we'll need for upcoming flow table support.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c | 132 +++++++++++++++++++---------------------------------------
 1 file changed, 42 insertions(+), 90 deletions(-)

diff --git a/udp.c b/udp.c
index af5f23f0..dee402f7 100644
--- a/udp.c
+++ b/udp.c
@@ -501,42 +501,29 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
 }
 
 /**
- * udp_splice_send() - Send datagrams from socket to socket
+ * udp_splice_send() - Send a batch of datagrams from socket to socket
  * @c:		Execution context
- * @start:	Index of first datagram in udp[46]_l2_buf
- * @n:		Total number of datagrams in udp[46]_l2_buf pool
- * @dst:	Datagrams will be sent to this port (on destination side)
+ * @start:	Index of batch's first datagram in udp[46]_l2_buf
+ * @n:		Number of datagrams in batch
+ * @src:	Source port for datagram (target side)
+ * @dst:	Destination port for datagrams (target side)
  * @ref:	epoll reference for origin socket
  * @now:	Timestamp
- *
- * This consumes as many datagrams as are sendable via a single socket.  It
- * requires that udp_meta[@start].splicesrc is initialised, and will initialise
- * udp_meta[].splicesrc for each datagram it consumes *and one more* (if
- * present).
- *
- * Return: Number of datagrams forwarded
  */
-static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
-				in_port_t dst, union epoll_ref ref,
-				const struct timespec *now)
+static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
+			    in_port_t src, in_port_t dst,
+			    union epoll_ref ref,
+			    const struct timespec *now)
 {
-	in_port_t src = udp_meta[start].splicesrc;
-	struct mmsghdr *mmh_recv;
-	unsigned int i = start;
 	int s;
 
-	ASSERT(udp_meta[start].splicesrc >= 0);
-	ASSERT(ref.type == EPOLL_TYPE_UDP);
-
 	if (ref.udp.v6) {
-		mmh_recv = udp6_mh_recv;
 		udp_splice_to.sa6 = (struct sockaddr_in6) {
 			.sin6_family = AF_INET6,
 			.sin6_addr = in6addr_loopback,
 			.sin6_port = htons(dst),
 		};
 	} else {
-		mmh_recv = udp4_mh_recv;
 		udp_splice_to.sa4 = (struct sockaddr_in) {
 			.sin_family = AF_INET,
 			.sin_addr = in4addr_loopback,
@@ -544,15 +531,6 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
 		};
 	}
 
-	do {
-		udp_splice_prepare(mmh_recv, i);
-
-		if (++i >= n)
-			break;
-
-		udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]);
-	} while (udp_meta[i].splicesrc == src);
-
 	if (ref.udp.pif == PIF_SPLICE) {
 		src += c->udp.fwd_in.rdelta[src];
 		s = udp_splice_init[ref.udp.v6][src].sock;
@@ -560,7 +538,7 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
 			s = udp_splice_new(c, ref.udp.v6, src, false);
 
 		if (s < 0)
-			goto out;
+			return;
 
 		udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec;
 		udp_splice_init[ref.udp.v6][src].ts = now->tv_sec;
@@ -577,15 +555,13 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
 			s = arg.s;
 		}
 		if (s < 0)
-			goto out;
+			return;
 
 		udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec;
 		udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec;
 	}
 
-	sendmmsg(s, udp_mh_splice + start, i - start, MSG_NOSIGNAL);
-out:
-	return i - start;
+	sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
 }
 
 /**
@@ -725,7 +701,7 @@ static size_t udp_update_hdr6(const struct ctx *c,
  * @v6:		Prepare for IPv6?
  * @now:	Current timestamp
  */
-static void udp_tap_prepare(const struct ctx *c, struct mmsghdr *mmh,
+static void udp_tap_prepare(const struct ctx *c, const struct mmsghdr *mmh,
 			    unsigned idx, in_port_t dstport, bool v6,
 			    const struct timespec *now)
 {
@@ -752,49 +728,6 @@ static void udp_tap_prepare(const struct ctx *c, struct mmsghdr *mmh,
 	(*tap_iov)[UDP_IOV_PAYLOAD].iov_len = l4len;
 }
 
-/**
- * udp_tap_send() - Prepare UDP datagrams and send to tap interface
- * @c:		Execution context
- * @start:	Index of first datagram in udp[46]_l2_buf pool
- * @n:		Total number of datagrams in udp[46]_l2_buf pool
- * @dstport:	Destination port number on destination side
- * @ref:	epoll reference for origin socket
- * @now:	Current timestamp
- *
- * This consumes as many frames as are sendable via tap.  It requires that
- * udp_meta[@start].splicesrc is initialised, and will initialise
- * udp_meta[].splicesrc for each frame it consumes *and one more* (if present).
- *
- * Return: Number of frames sent via tap
- */
-static unsigned udp_tap_send(const struct ctx *c, size_t start, size_t n,
-			     in_port_t dstport, union epoll_ref ref,
-			     const struct timespec *now)
-{
-	struct mmsghdr *mmh_recv;
-	size_t i = start;
-
-	ASSERT(udp_meta[start].splicesrc == -1);
-	ASSERT(ref.type == EPOLL_TYPE_UDP);
-
-	if (ref.udp.v6)
-		mmh_recv = udp6_mh_recv;
-	else
-		mmh_recv = udp4_mh_recv;
-
-	do {
-		udp_tap_prepare(c, mmh_recv, i, dstport, ref.udp.v6, now);
-
-		if (++i >= n)
-			break;
-
-		udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]);
-	} while (udp_meta[i].splicesrc == -1);
-
-	tap_send_frames(c, &udp_l2_iov[start][0], UDP_NUM_IOVS, i - start);
-	return i - start;
-}
-
 /**
  * udp_sock_recv() - Receive datagrams from a socket
  * @c:		Execution context
@@ -842,7 +775,7 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
 {
 	struct mmsghdr *mmh_recv = ref.udp.v6 ? udp6_mh_recv : udp4_mh_recv;
 	in_port_t dstport = ref.udp.port;
-	int n, m, i;
+	int n, i;
 
 	if ((n = udp_sock_recv(c, ref.fd, events, mmh_recv)) <= 0)
 		return;
@@ -852,19 +785,38 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
 	else if (ref.udp.pif == PIF_HOST)
 		dstport += c->udp.fwd_in.f.delta[dstport];
 
-	/* We divide things into batches based on how we need to send them,
+	/* We divide datagrams into batches based on how we need to send them,
 	 * determined by udp_meta[i].splicesrc.  To avoid either two passes
 	 * through the array, or recalculating splicesrc for a single entry, we
-	 * have to populate it one entry *ahead* of the loop counter (if
-	 * present).  So we fill in entry 0 before the loop, then udp_*_send()
-	 * populate one entry past where they consume.
+	 * have to populate it one entry *ahead* of the loop counter.
 	 */
 	udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv);
-	for (i = 0; i < n; i += m) {
-		if (udp_meta[i].splicesrc >= 0)
-			m = udp_splice_send(c, i, n, dstport, ref, now);
-		else
-			m = udp_tap_send(c, i, n, dstport, ref, now);
+	for (i = 0; i < n; ) {
+		int batchsrc = udp_meta[i].splicesrc;
+		int batchstart = i;
+
+		do {
+			if (batchsrc >= 0) {
+				udp_splice_prepare(mmh_recv, i);
+			} else {
+				udp_tap_prepare(c, mmh_recv, i, dstport,
+						ref.udp.v6, now);
+			}
+
+			if (++i >= n)
+				break;
+
+			udp_meta[i].splicesrc = udp_mmh_splice_port(ref,
+								    &mmh_recv[i]);
+		} while (udp_meta[i].splicesrc == batchsrc);
+
+		if (batchsrc >= 0) {
+			udp_splice_send(c, batchstart, i - batchstart,
+					batchsrc, dstport, ref, now);
+		} else {
+			tap_send_frames(c, &udp_l2_iov[batchstart][0],
+					UDP_NUM_IOVS, i - batchstart);
+		}
 	}
 }
 
-- 
@@ -501,42 +501,29 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
 }
 
 /**
- * udp_splice_send() - Send datagrams from socket to socket
+ * udp_splice_send() - Send a batch of datagrams from socket to socket
  * @c:		Execution context
- * @start:	Index of first datagram in udp[46]_l2_buf
- * @n:		Total number of datagrams in udp[46]_l2_buf pool
- * @dst:	Datagrams will be sent to this port (on destination side)
+ * @start:	Index of batch's first datagram in udp[46]_l2_buf
+ * @n:		Number of datagrams in batch
+ * @src:	Source port for datagram (target side)
+ * @dst:	Destination port for datagrams (target side)
  * @ref:	epoll reference for origin socket
  * @now:	Timestamp
- *
- * This consumes as many datagrams as are sendable via a single socket.  It
- * requires that udp_meta[@start].splicesrc is initialised, and will initialise
- * udp_meta[].splicesrc for each datagram it consumes *and one more* (if
- * present).
- *
- * Return: Number of datagrams forwarded
  */
-static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
-				in_port_t dst, union epoll_ref ref,
-				const struct timespec *now)
+static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
+			    in_port_t src, in_port_t dst,
+			    union epoll_ref ref,
+			    const struct timespec *now)
 {
-	in_port_t src = udp_meta[start].splicesrc;
-	struct mmsghdr *mmh_recv;
-	unsigned int i = start;
 	int s;
 
-	ASSERT(udp_meta[start].splicesrc >= 0);
-	ASSERT(ref.type == EPOLL_TYPE_UDP);
-
 	if (ref.udp.v6) {
-		mmh_recv = udp6_mh_recv;
 		udp_splice_to.sa6 = (struct sockaddr_in6) {
 			.sin6_family = AF_INET6,
 			.sin6_addr = in6addr_loopback,
 			.sin6_port = htons(dst),
 		};
 	} else {
-		mmh_recv = udp4_mh_recv;
 		udp_splice_to.sa4 = (struct sockaddr_in) {
 			.sin_family = AF_INET,
 			.sin_addr = in4addr_loopback,
@@ -544,15 +531,6 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
 		};
 	}
 
-	do {
-		udp_splice_prepare(mmh_recv, i);
-
-		if (++i >= n)
-			break;
-
-		udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]);
-	} while (udp_meta[i].splicesrc == src);
-
 	if (ref.udp.pif == PIF_SPLICE) {
 		src += c->udp.fwd_in.rdelta[src];
 		s = udp_splice_init[ref.udp.v6][src].sock;
@@ -560,7 +538,7 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
 			s = udp_splice_new(c, ref.udp.v6, src, false);
 
 		if (s < 0)
-			goto out;
+			return;
 
 		udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec;
 		udp_splice_init[ref.udp.v6][src].ts = now->tv_sec;
@@ -577,15 +555,13 @@ static unsigned udp_splice_send(const struct ctx *c, size_t start, size_t n,
 			s = arg.s;
 		}
 		if (s < 0)
-			goto out;
+			return;
 
 		udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec;
 		udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec;
 	}
 
-	sendmmsg(s, udp_mh_splice + start, i - start, MSG_NOSIGNAL);
-out:
-	return i - start;
+	sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
 }
 
 /**
@@ -725,7 +701,7 @@ static size_t udp_update_hdr6(const struct ctx *c,
  * @v6:		Prepare for IPv6?
  * @now:	Current timestamp
  */
-static void udp_tap_prepare(const struct ctx *c, struct mmsghdr *mmh,
+static void udp_tap_prepare(const struct ctx *c, const struct mmsghdr *mmh,
 			    unsigned idx, in_port_t dstport, bool v6,
 			    const struct timespec *now)
 {
@@ -752,49 +728,6 @@ static void udp_tap_prepare(const struct ctx *c, struct mmsghdr *mmh,
 	(*tap_iov)[UDP_IOV_PAYLOAD].iov_len = l4len;
 }
 
-/**
- * udp_tap_send() - Prepare UDP datagrams and send to tap interface
- * @c:		Execution context
- * @start:	Index of first datagram in udp[46]_l2_buf pool
- * @n:		Total number of datagrams in udp[46]_l2_buf pool
- * @dstport:	Destination port number on destination side
- * @ref:	epoll reference for origin socket
- * @now:	Current timestamp
- *
- * This consumes as many frames as are sendable via tap.  It requires that
- * udp_meta[@start].splicesrc is initialised, and will initialise
- * udp_meta[].splicesrc for each frame it consumes *and one more* (if present).
- *
- * Return: Number of frames sent via tap
- */
-static unsigned udp_tap_send(const struct ctx *c, size_t start, size_t n,
-			     in_port_t dstport, union epoll_ref ref,
-			     const struct timespec *now)
-{
-	struct mmsghdr *mmh_recv;
-	size_t i = start;
-
-	ASSERT(udp_meta[start].splicesrc == -1);
-	ASSERT(ref.type == EPOLL_TYPE_UDP);
-
-	if (ref.udp.v6)
-		mmh_recv = udp6_mh_recv;
-	else
-		mmh_recv = udp4_mh_recv;
-
-	do {
-		udp_tap_prepare(c, mmh_recv, i, dstport, ref.udp.v6, now);
-
-		if (++i >= n)
-			break;
-
-		udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]);
-	} while (udp_meta[i].splicesrc == -1);
-
-	tap_send_frames(c, &udp_l2_iov[start][0], UDP_NUM_IOVS, i - start);
-	return i - start;
-}
-
 /**
  * udp_sock_recv() - Receive datagrams from a socket
  * @c:		Execution context
@@ -842,7 +775,7 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
 {
 	struct mmsghdr *mmh_recv = ref.udp.v6 ? udp6_mh_recv : udp4_mh_recv;
 	in_port_t dstport = ref.udp.port;
-	int n, m, i;
+	int n, i;
 
 	if ((n = udp_sock_recv(c, ref.fd, events, mmh_recv)) <= 0)
 		return;
@@ -852,19 +785,38 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
 	else if (ref.udp.pif == PIF_HOST)
 		dstport += c->udp.fwd_in.f.delta[dstport];
 
-	/* We divide things into batches based on how we need to send them,
+	/* We divide datagrams into batches based on how we need to send them,
 	 * determined by udp_meta[i].splicesrc.  To avoid either two passes
 	 * through the array, or recalculating splicesrc for a single entry, we
-	 * have to populate it one entry *ahead* of the loop counter (if
-	 * present).  So we fill in entry 0 before the loop, then udp_*_send()
-	 * populate one entry past where they consume.
+	 * have to populate it one entry *ahead* of the loop counter.
 	 */
 	udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv);
-	for (i = 0; i < n; i += m) {
-		if (udp_meta[i].splicesrc >= 0)
-			m = udp_splice_send(c, i, n, dstport, ref, now);
-		else
-			m = udp_tap_send(c, i, n, dstport, ref, now);
+	for (i = 0; i < n; ) {
+		int batchsrc = udp_meta[i].splicesrc;
+		int batchstart = i;
+
+		do {
+			if (batchsrc >= 0) {
+				udp_splice_prepare(mmh_recv, i);
+			} else {
+				udp_tap_prepare(c, mmh_recv, i, dstport,
+						ref.udp.v6, now);
+			}
+
+			if (++i >= n)
+				break;
+
+			udp_meta[i].splicesrc = udp_mmh_splice_port(ref,
+								    &mmh_recv[i]);
+		} while (udp_meta[i].splicesrc == batchsrc);
+
+		if (batchsrc >= 0) {
+			udp_splice_send(c, batchstart, i - batchstart,
+					batchsrc, dstport, ref, now);
+		} else {
+			tap_send_frames(c, &udp_l2_iov[batchstart][0],
+					UDP_NUM_IOVS, i - batchstart);
+		}
 	}
 }
 
-- 
2.45.2


  parent reply	other threads:[~2024-07-05 10:44 UTC|newest]

Thread overview: 15+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-07-05 10:43 [PATCH v2 00/11] Preliminaries for UDP flow support David Gibson
2024-07-05 10:43 ` [PATCH v2 01/11] util: sock_l4() determine protocol from epoll type rather than the reverse David Gibson
2024-07-05 10:44 ` [PATCH v2 02/11] flow: Add flow_sidx_valid() helper David Gibson
2024-07-05 10:44 ` [PATCH v2 03/11] udp: Pass full epoll reference through more of sock handler path David Gibson
2024-07-05 10:44 ` [PATCH v2 04/11] udp: Rename IOV and mmsghdr arrays David Gibson
2024-07-05 10:44 ` [PATCH v2 05/11] udp: Unify udp[46]_mh_splice David Gibson
2024-07-05 10:44 ` [PATCH v2 06/11] udp: Unify udp[46]_l2_iov David Gibson
2024-07-05 10:44 ` [PATCH v2 07/11] udp: Don't repeatedly initialise udp[46]_eth_hdr David Gibson
2024-07-05 10:44 ` [PATCH v2 08/11] udp: Move some more of sock_handler tasks into sub-functions David Gibson
2024-07-05 10:44 ` David Gibson [this message]
2024-07-05 10:44 ` [PATCH v2 10/11] doc: Add program to document and test assumptions about SO_REUSEADDR David Gibson
2024-07-12 11:42   ` David Taylor
2024-07-15  0:43     ` David Gibson
2024-07-05 10:44 ` [PATCH v2 11/11] doc: Test behaviour of zero length datagram recv()s David Gibson
2024-07-05 16:38 ` [PATCH v2 00/11] Preliminaries for UDP flow support Stefano Brivio

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240705104409.3847002-10-david@gibson.dropbear.id.au \
    --to=david@gibson.dropbear.id.au \
    --cc=passt-dev@passt.top \
    --cc=sbrivio@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).