public inbox for passt-dev@passt.top
* [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows
@ 2025-04-04 10:15 David Gibson
  2025-04-04 10:15 ` [PATCH 01/12] udp: Use connect()ed sockets for initiating side David Gibson
                   ` (11 more replies)
  0 siblings, 12 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

As discussed, I've been working on using connect()ed sockets, rather
than dups of the listening sockets, for handling traffic on the
initiating side of UDP flows.  This improves consistency, avoids some
problems (bug 103) and will allow for some useful future improvements.

It has the nice side effect of allowing some more code to be shared
between various paths, resulting in a pretty nice negative diffstat.

David Gibson (12):
  udp: Use connect()ed sockets for initiating side
  udp: Make udp_sock_recv() take max number of frames as a parameter
  udp: Polish udp_vu_sock_info() and remove from vu specific code
  udp: Don't bother to batch datagrams from "listening" socket
  udp: Parameterize number of datagrams handled by
    udp_*_reply_sock_data()
  udp: Split spliced forwarding path from udp_buf_reply_sock_data()
  udp: Merge vhost-user and "buf" listening socket paths
  udp: Move UDP_MAX_FRAMES to udp.c
  udp_flow: Take pif and port as explicit parameters to
    udp_flow_from_sock()
  udp: Rework udp_listen_sock_data() into udp_sock_fwd()
  udp: Fold udp_splice_prepare and udp_splice_send into udp_sock_to_sock
  udp_flow: Don't discard packets that arrive between bind() and
    connect()

 epoll_type.h   |   4 +-
 flow.c         |   2 +-
 passt.c        |   6 +-
 udp.c          | 332 +++++++++++++++++++++++--------------------------
 udp.h          |   4 +-
 udp_flow.c     | 120 ++++++++++--------
 udp_flow.h     |   8 +-
 udp_internal.h |   4 +-
 udp_vu.c       |  93 +-------------
 udp_vu.h       |   2 +-
 util.c         |   2 +-
 11 files changed, 242 insertions(+), 335 deletions(-)

-- 
2.49.0



* [PATCH 01/12] udp: Use connect()ed sockets for initiating side
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 02/12] udp: Make udp_sock_recv() take max number of frames as a parameter David Gibson
                   ` (10 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

Currently we have an asymmetry in how we handle UDP sockets.  For flows
where the target side is a socket, we create a new connect()ed socket
- the "reply socket" specifically for that flow used for sending and
receiving datagrams on that flow and only that flow.  For flows where the
initiating side is a socket, we continue to use the "listening" socket (or
rather, a dup() of it).  This has some disadvantages:

 * We need a hash lookup for every datagram on the listening socket in
   order to work out what flow it belongs to
 * The dup() keeps the socket alive even if automatic forwarding removes
   the listening socket.  However, the epoll data remains the same,
   still containing the now stale original fd.  This causes bug 103.
 * We can't (easily) set flow-specific options on an initiating side
   socket, because that could affect other flows as well

Alter the code to use a connect()ed socket on the initiating side as well
as the target side.  There's no way to "clone and connect" the listening
socket (a loose equivalent of accept() for UDP), so we have to create a
new socket.  We have to bind() this socket before we connect() it, which
is allowed thanks to SO_REUSEADDR, but does leave a small window where it
could receive datagrams not intended for this flow.  For now we handle this
by simply discarding any datagrams received between bind() and connect(),
but I intend to improve this in a later patch.
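The bind()-then-connect() dance described above can be sketched as below.  This
is a simplified stand-in for the flow socket setup, not the actual passt code;
flow_sock() is a hypothetical name, and real passt goes through its own socket
helpers rather than calling socket()/bind()/connect() directly:

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

/* flow_sock() is a hypothetical stand-in for a flow specific socket setup */
static int flow_sock(const struct sockaddr_in *local,
		     const struct sockaddr_in *peer)
{
	char buf[65536];
	int one = 1;
	int s;

	s = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, 0);
	if (s < 0)
		return -1;

	/* SO_REUSEADDR allows binding the same address as the "listening"
	 * socket */
	if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0 ||
	    bind(s, (const struct sockaddr *)local, sizeof(*local)) < 0 ||
	    connect(s, (const struct sockaddr *)peer, sizeof(*peer)) < 0) {
		close(s);
		return -1;
	}

	/* Drain anything queued between bind() and connect(): those
	 * datagrams may belong to a different flow */
	while (recv(s, buf, sizeof(buf), MSG_DONTWAIT) >= 0)
		;

	return s;
}
```

After connect(), the kernel filters out datagrams from other peers, so the
per-datagram hash lookup is no longer needed on this socket.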

Link: https://bugs.passt.top/show_bug.cgi?id=103

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 epoll_type.h |  4 ++--
 passt.c      |  6 +++---
 udp.c        | 46 ++++++++++++++++++++++++----------------------
 udp.h        |  4 ++--
 udp_flow.c   | 32 +++++++++-----------------------
 util.c       |  2 +-
 6 files changed, 41 insertions(+), 53 deletions(-)

diff --git a/epoll_type.h b/epoll_type.h
index 7f2a1217..12ac59b1 100644
--- a/epoll_type.h
+++ b/epoll_type.h
@@ -22,8 +22,8 @@ enum epoll_type {
 	EPOLL_TYPE_TCP_TIMER,
 	/* UDP "listening" sockets */
 	EPOLL_TYPE_UDP_LISTEN,
-	/* UDP socket for replies on a specific flow */
-	EPOLL_TYPE_UDP_REPLY,
+	/* UDP socket for a specific flow */
+	EPOLL_TYPE_UDP,
 	/* ICMP/ICMPv6 ping sockets */
 	EPOLL_TYPE_PING,
 	/* inotify fd watching for end of netns (pasta) */
diff --git a/passt.c b/passt.c
index cd067728..388d10f1 100644
--- a/passt.c
+++ b/passt.c
@@ -68,7 +68,7 @@ char *epoll_type_str[] = {
 	[EPOLL_TYPE_TCP_LISTEN]		= "listening TCP socket",
 	[EPOLL_TYPE_TCP_TIMER]		= "TCP timer",
 	[EPOLL_TYPE_UDP_LISTEN]		= "listening UDP socket",
-	[EPOLL_TYPE_UDP_REPLY]		= "UDP reply socket",
+	[EPOLL_TYPE_UDP]		= "UDP flow socket",
 	[EPOLL_TYPE_PING]	= "ICMP/ICMPv6 ping socket",
 	[EPOLL_TYPE_NSQUIT_INOTIFY]	= "namespace inotify watch",
 	[EPOLL_TYPE_NSQUIT_TIMER]	= "namespace timer watch",
@@ -339,8 +339,8 @@ loop:
 		case EPOLL_TYPE_UDP_LISTEN:
 			udp_listen_sock_handler(&c, ref, eventmask, &now);
 			break;
-		case EPOLL_TYPE_UDP_REPLY:
-			udp_reply_sock_handler(&c, ref, eventmask, &now);
+		case EPOLL_TYPE_UDP:
+			udp_sock_handler(&c, ref, eventmask, &now);
 			break;
 		case EPOLL_TYPE_PING:
 			icmp_sock_handler(&c, ref);
diff --git a/udp.c b/udp.c
index ab3e9d20..fa6fccdc 100644
--- a/udp.c
+++ b/udp.c
@@ -39,27 +39,30 @@
  * could receive packets from multiple flows, so we use a hash table match to
  * find the specific flow for a datagram.
  *
- * When a UDP flow is initiated from a listening socket we take a duplicate of
- * the socket and store it in uflow->s[INISIDE].  This will last for the
- * lifetime of the flow, even if the original listening socket is closed due to
- * port auto-probing.  The duplicate is used to deliver replies back to the
- * originating side.
- *
- * Reply sockets
- * =============
+ * Flow sockets
+ * ============
  *
- * When a UDP flow targets a socket, we create a "reply" socket in
+ * When a UDP flow targets a socket, we create a "flow" socket in
  * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive
  * replies on the target side.  This socket is both bound and connected and has
- * EPOLL_TYPE_UDP_REPLY.  The connect() means it will only receive datagrams
+ * EPOLL_TYPE_UDP.  The connect() means it will only receive datagrams
  * associated with this flow, so the epoll reference directly points to the flow
  * and we don't need a hash lookup.
  *
- * NOTE: it's possible that the reply socket could have a bound address
- * overlapping with an unrelated listening socket.  We assume datagrams for the
- * flow will come to the reply socket in preference to a listening socket.  The
- * sample program doc/platform-requirements/reuseaddr-priority.c documents and
- * tests that assumption.
+ * When a flow is initiated from a listening socket, we create a "flow" socket
+ * with the same bound address as the listening socket, but also connect()ed to
+ * the flow's peer.  This is stored in uflow->s[INISIDE] and will last for the
+ * lifetime of the flow, even if the original listening socket is closed due to
+ * port auto-probing.  The duplicate is used to deliver replies back to the
+ * originating side.
+ *
+ * NOTE: A flow socket can have a bound address overlapping with a listening
+ * socket.  That will happen naturally for flows initiated from a socket, but is
+ * also possible (though unlikely) for tap initiated flows, depending on the
+ * source port.  We assume datagrams for the flow will come to a connect()ed
+ * socket in preference to a listening socket.  The sample program
+ * doc/platform-requirements/reuseaddr-priority.c documents and tests that
+ * assumption.
  *
  * "Spliced" flows
  * ===============
@@ -71,8 +74,7 @@
  * actually used; it doesn't make sense for datagrams and instead a pair of
  * recvmmsg() and sendmmsg() is used to forward the datagrams.
  *
- * Note that a spliced flow will have *both* a duplicated listening socket and a
- * reply socket (see above).
+ * Note that a spliced flow will have two flow sockets (see above).
  */
 
 #include <sched.h>
@@ -557,7 +559,7 @@ static int udp_sock_recverr(const struct ctx *c, union epoll_ref ref)
 	}
 
 	eh = (const struct errhdr *)CMSG_DATA(hdr);
-	if (ref.type == EPOLL_TYPE_UDP_REPLY) {
+	if (ref.type == EPOLL_TYPE_UDP) {
 		flow_sidx_t sidx = flow_sidx_opposite(ref.flowside);
 		const struct flowside *toside = flowside_at_sidx(sidx);
 		size_t dlen = rc;
@@ -792,14 +794,14 @@ static bool udp_buf_reply_sock_data(const struct ctx *c,
 }
 
 /**
- * udp_reply_sock_handler() - Handle new data from flow specific socket
+ * udp_sock_handler() - Handle new data from flow specific socket
  * @c:		Execution context
  * @ref:	epoll reference
  * @events:	epoll events bitmap
  * @now:	Current timestamp
  */
-void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
-			    uint32_t events, const struct timespec *now)
+void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
+		      uint32_t events, const struct timespec *now)
 {
 	struct udp_flow *uflow = udp_at_sidx(ref.flowside);
 
@@ -807,7 +809,7 @@ void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
 
 	if (events & EPOLLERR) {
 		if (udp_sock_errs(c, ref) < 0) {
-			flow_err(uflow, "Unrecoverable error on reply socket");
+			flow_err(uflow, "Unrecoverable error on flow socket");
 			goto fail;
 		}
 	}
diff --git a/udp.h b/udp.h
index de2df6de..8fc42838 100644
--- a/udp.h
+++ b/udp.h
@@ -11,8 +11,8 @@
 void udp_portmap_clear(void);
 void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref,
 			     uint32_t events, const struct timespec *now);
-void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
-			    uint32_t events, const struct timespec *now);
+void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
+		      uint32_t events, const struct timespec *now);
 int udp_tap_handler(const struct ctx *c, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    const struct pool *p, int idx, const struct timespec *now);
diff --git a/udp_flow.c b/udp_flow.c
index bf4b8965..d50bddb2 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -49,10 +49,7 @@ void udp_flow_close(const struct ctx *c, struct udp_flow *uflow)
 	flow_foreach_sidei(sidei) {
 		flow_hash_remove(c, FLOW_SIDX(uflow, sidei));
 		if (uflow->s[sidei] >= 0) {
-			/* The listening socket needs to stay in epoll, but the
-			 * flow specific one needs to be removed */
-			if (sidei == TGTSIDE)
-				epoll_del(c, uflow->s[sidei]);
+			epoll_del(c, uflow->s[sidei]);
 			close(uflow->s[sidei]);
 			uflow->s[sidei] = -1;
 		}
@@ -81,7 +78,7 @@ static int udp_flow_sock(const struct ctx *c,
 	} fref = { .sidx = FLOW_SIDX(uflow, sidei) };
 	int rc, s;
 
-	s = flowside_sock_l4(c, EPOLL_TYPE_UDP_REPLY, pif, side, fref.data);
+	s = flowside_sock_l4(c, EPOLL_TYPE_UDP, pif, side, fref.data);
 	if (s < 0) {
 		flow_dbg_perror(uflow, "Couldn't open flow specific socket");
 		return s;
@@ -120,13 +117,12 @@ static int udp_flow_sock(const struct ctx *c,
  * udp_flow_new() - Common setup for a new UDP flow
  * @c:		Execution context
  * @flow:	Initiated flow
- * @s_ini:	Initiating socket (or -1)
  * @now:	Timestamp
  *
  * Return: UDP specific flow, if successful, NULL on failure
  */
 static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow,
-				int s_ini, const struct timespec *now)
+				const struct timespec *now)
 {
 	struct udp_flow *uflow = NULL;
 	unsigned sidei;
@@ -138,22 +134,12 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow,
 	uflow->ts = now->tv_sec;
 	uflow->s[INISIDE] = uflow->s[TGTSIDE] = -1;
 
-	if (s_ini >= 0) {
-		/* When using auto port-scanning the listening port could go
-		 * away, so we need to duplicate the socket
-		 */
-		uflow->s[INISIDE] = fcntl(s_ini, F_DUPFD_CLOEXEC, 0);
-		if (uflow->s[INISIDE] < 0) {
-			flow_perror(uflow,
-				    "Couldn't duplicate listening socket");
-			goto cancel;
-		}
+	flow_foreach_sidei(sidei) {
+		if (pif_is_socket(uflow->f.pif[sidei]))
+			if ((uflow->s[sidei] = udp_flow_sock(c, uflow, sidei)) < 0)
+				goto cancel;
 	}
 
-	if (pif_is_socket(flow->f.pif[TGTSIDE]))
-		if ((uflow->s[TGTSIDE] = udp_flow_sock(c, uflow, TGTSIDE)) < 0)
-			goto cancel;
-
 	/* Tap sides always need to be looked up by hash.  Socket sides don't
 	 * always, but sometimes do (receiving packets on a socket not specific
 	 * to one flow).  Unconditionally hash both sides so all our bases are
@@ -224,7 +210,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
 		return FLOW_SIDX_NONE;
 	}
 
-	return udp_flow_new(c, flow, ref.fd, now);
+	return udp_flow_new(c, flow, now);
 }
 
 /**
@@ -280,7 +266,7 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
 		return FLOW_SIDX_NONE;
 	}
 
-	return udp_flow_new(c, flow, -1, now);
+	return udp_flow_new(c, flow, now);
 }
 
 /**
diff --git a/util.c b/util.c
index b9a3d434..0f68cf57 100644
--- a/util.c
+++ b/util.c
@@ -71,7 +71,7 @@ int sock_l4_sa(const struct ctx *c, enum epoll_type type,
 	case EPOLL_TYPE_UDP_LISTEN:
 		freebind = c->freebind;
 		/* fallthrough */
-	case EPOLL_TYPE_UDP_REPLY:
+	case EPOLL_TYPE_UDP:
 		proto = IPPROTO_UDP;
 		socktype = SOCK_DGRAM | SOCK_NONBLOCK;
 		break;
-- 
2.49.0



* [PATCH 02/12] udp: Make udp_sock_recv() take max number of frames as a parameter
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
  2025-04-04 10:15 ` [PATCH 01/12] udp: Use connect()ed sockets for initiating side David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 03/12] udp: Polish udp_vu_sock_info() and remove from vu specific code David Gibson
                   ` (9 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

Currently udp_sock_recv() decides the maximum number of frames it is
willing to receive based on the mode.  However, we have upcoming use cases
where we will have different criteria for how many frames we want with
information that's not naturally available here but is in the caller.  So
make the maximum number of frames a parameter.
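The pattern is a thin wrapper over recvmmsg() whose batch size is chosen by the
caller.  A self-contained sketch (hypothetical names, fixed-size buffers; the
real code reuses preallocated mmsghdr arrays rather than building them per
call):

```c
#define _GNU_SOURCE		/* for recvmmsg() */
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define MAX_FRAMES 32		/* hypothetical cap, like UDP_MAX_FRAMES */

static int sock_recv_batch(int s, char bufs[][2048], int n)
{
	struct mmsghdr mmh[MAX_FRAMES];
	struct iovec iov[MAX_FRAMES];
	int i;

	if (n > MAX_FRAMES)
		n = MAX_FRAMES;

	for (i = 0; i < n; i++) {
		iov[i].iov_base = bufs[i];
		iov[i].iov_len = sizeof(bufs[i]);
		memset(&mmh[i].msg_hdr, 0, sizeof(mmh[i].msg_hdr));
		mmh[i].msg_hdr.msg_iov = &iov[i];
		mmh[i].msg_hdr.msg_iovlen = 1;
	}

	/* The caller picks the batch size (e.g. 1 for pasta mode,
	 * MAX_FRAMES for passt mode) */
	return recvmmsg(s, mmh, n, MSG_DONTWAIT, NULL);
}
```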

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c | 27 +++++++++++++--------------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/udp.c b/udp.c
index fa6fccdc..8125cfcb 100644
--- a/udp.c
+++ b/udp.c
@@ -634,22 +634,14 @@ static int udp_sock_errs(const struct ctx *c, union epoll_ref ref)
  * @c:		Execution context
  * @s:		Socket to receive from
  * @mmh		mmsghdr array to receive into
+ * @n:		Maximum number of datagrams to receive
  *
  * Return: Number of datagrams received
  *
  * #syscalls recvmmsg arm:recvmmsg_time64 i686:recvmmsg_time64
  */
-static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh)
+static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh, int n)
 {
-	/* For not entirely clear reasons (data locality?) pasta gets better
-	 * throughput if we receive tap datagrams one at a atime.  For small
-	 * splice datagrams throughput is slightly better if we do batch, but
-	 * it's slightly worse for large splice datagrams.  Since we don't know
-	 * before we receive whether we'll use tap or splice, always go one at a
-	 * time for pasta mode.
-	 */
-	int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES);
-
 	ASSERT(!c->no_udp);
 
 	n = recvmmsg(s, mmh, n, 0, NULL);
@@ -671,9 +663,10 @@ static void udp_buf_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 				     const struct timespec *now)
 {
 	const socklen_t sasize = sizeof(udp_meta[0].s_in);
-	int n, i;
+	/* See udp_buf_sock_data() comment */
+	int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES), i;
 
-	if ((n = udp_sock_recv(c, ref.fd, udp_mh_recv)) <= 0)
+	if ((n = udp_sock_recv(c, ref.fd, udp_mh_recv, n)) <= 0)
 		return;
 
 	/* We divide datagrams into batches based on how we need to send them,
@@ -768,9 +761,15 @@ static bool udp_buf_reply_sock_data(const struct ctx *c,
 {
 	const struct flowside *toside = flowside_at_sidx(tosidx);
 	uint8_t topif = pif_at_sidx(tosidx);
-	int n, i;
+	/* For not entirely clear reasons (data locality?) pasta gets better
+	 * throughput if we receive tap datagrams one at a time.  For small
+	 * splice datagrams throughput is slightly better if we do batch, but
+	 * it's slightly worse for large splice datagrams.  Since we don't know
+	 * the size before we receive, always go one at a time for pasta mode.
+	 */
+	int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES), i;
 
-	if ((n = udp_sock_recv(c, s, udp_mh_recv)) <= 0)
+	if ((n = udp_sock_recv(c, s, udp_mh_recv, n)) <= 0)
 		return true;
 
 	for (i = 0; i < n; i++) {
-- 
2.49.0



* [PATCH 03/12] udp: Polish udp_vu_sock_info() and remove from vu specific code
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
  2025-04-04 10:15 ` [PATCH 01/12] udp: Use connect()ed sockets for initiating side David Gibson
  2025-04-04 10:15 ` [PATCH 02/12] udp: Make udp_sock_recv() take max number of frames as a parameter David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 04/12] udp: Don't bother to batch datagrams from "listening" socket David Gibson
                   ` (8 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

udp_vu_sock_info() uses MSG_PEEK to look ahead at the next datagram to be
received and gets its source address.  Currently we only use it in the
vhost-user path, but there's nothing inherently vhost-user specific about
it.  We have upcoming uses for it elsewhere so rename and move to udp.c.

While we're there, polish its error reporting a little.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c          | 25 +++++++++++++++++++++++++
 udp_internal.h |  1 +
 udp_vu.c       | 19 +------------------
 3 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/udp.c b/udp.c
index 8125cfcb..6b72c30f 100644
--- a/udp.c
+++ b/udp.c
@@ -629,6 +629,31 @@ static int udp_sock_errs(const struct ctx *c, union epoll_ref ref)
 	return n_err;
 }
 
+/**
+ * udp_peek_addr() - Get source address for next packet
+ * @s:		Socket to get information from
+ * @src:	Socket address (output)
+ *
+ * Return: 0 on success, -1 otherwise
+ */
+int udp_peek_addr(int s, union sockaddr_inany *src)
+{
+	struct msghdr msg = {
+		.msg_name = src,
+		.msg_namelen = sizeof(*src),
+	};
+	int rc;
+
+	rc = recvmsg(s, &msg, MSG_PEEK | MSG_DONTWAIT);
+	if (rc < 0) {
+		if (errno != EAGAIN && errno != EWOULDBLOCK)
+			warn_perror("Error peeking at socket address");
+		return rc;
+	}
+	return 0;
+}
+
+
 /**
  * udp_sock_recv() - Receive datagrams from a socket
  * @c:		Execution context
diff --git a/udp_internal.h b/udp_internal.h
index 02724e59..43a61094 100644
--- a/udp_internal.h
+++ b/udp_internal.h
@@ -30,5 +30,6 @@ size_t udp_update_hdr4(struct iphdr *ip4h, struct udp_payload_t *bp,
 size_t udp_update_hdr6(struct ipv6hdr *ip6h, struct udp_payload_t *bp,
                        const struct flowside *toside, size_t dlen,
 		       bool no_udp_csum);
+int udp_peek_addr(int s, union sockaddr_inany *src);
 
 #endif /* UDP_INTERNAL_H */
diff --git a/udp_vu.c b/udp_vu.c
index 4153b6c1..5faf1e1d 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -57,23 +57,6 @@ static size_t udp_vu_hdrlen(bool v6)
 	return hdrlen;
 }
 
-/**
- * udp_vu_sock_info() - get socket information
- * @s:		Socket to get information from
- * @s_in:	Socket address (output)
- *
- * Return: 0 if socket address can be read, -1 otherwise
- */
-static int udp_vu_sock_info(int s, union sockaddr_inany *s_in)
-{
-	struct msghdr msg = {
-		.msg_name = s_in,
-		.msg_namelen = sizeof(union sockaddr_inany),
-	};
-
-	return recvmsg(s, &msg, MSG_PEEK | MSG_DONTWAIT);
-}
-
 /**
  * udp_vu_sock_recv() - Receive datagrams from socket into vhost-user buffers
  * @c:		Execution context
@@ -230,7 +213,7 @@ void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 		int iov_used;
 		bool v6;
 
-		if (udp_vu_sock_info(ref.fd, &s_in) < 0)
+		if (udp_peek_addr(ref.fd, &s_in) < 0)
 			break;
 
 		sidx = udp_flow_from_sock(c, ref, &s_in, now);
-- 
2.49.0



* [PATCH 04/12] udp: Don't bother to batch datagrams from "listening" socket
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (2 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 03/12] udp: Polish udp_vu_sock_info() and remove from vu specific code David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 05/12] udp: Parameterize number of datagrams handled by udp_*_reply_sock_data() David Gibson
                   ` (7 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

A "listening" UDP socket can receive datagrams from multiple flows.  So,
we currently have some quite subtle and complex code in
udp_buf_listen_sock_data() to group contiguously received packets for the
same flow into batches for forwarding.

However, since we are now always using flow specific connect()ed sockets
once a flow is established, handling of datagrams on listening sockets is
essentially a slow path.  Given that, it's not worth the complexity.
Substantially simplify the code by using an approach more like vhost-user,
and "peeking" at the address of the next datagram, one at a time to
determine the correct flow before we actually receive the data,

This removes all meaningful use of the s_in and tosidx fields in
udp_meta_t, so they too can be removed, along with setting of msg_name and
msg_namelen in the msghdr arrays which referenced them.
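The peek-then-receive loop the patch adopts can be sketched in isolation.  This
is a toy stand-in, not the new udp_buf_listen_sock_data(): where passt would use
the peeked source address for the flow lookup (udp_flow_from_sock()) and then
forward via tap or splice, this sketch just counts datagrams:

```c
#include <sys/socket.h>

/* Drain a socket one datagram at a time.  In passt, the peek step is
 * where the flow lookup by source address would happen; here we only
 * count the datagrams. */
static int drain_one_at_a_time(int s)
{
	struct sockaddr_storage src;
	char buf[2048];
	int count = 0;

	for (;;) {
		struct msghdr msg = {
			.msg_name = &src,
			.msg_namelen = sizeof(src),
		};

		/* Peek: learn where the next datagram comes from */
		if (recvmsg(s, &msg, MSG_PEEK | MSG_DONTWAIT) < 0)
			break;

		/* Receive: consume the datagram we just looked at */
		if (recv(s, buf, sizeof(buf), MSG_DONTWAIT) < 0)
			break;
		count++;
	}

	return count;
}
```

One peek plus one receive per datagram costs an extra syscall, which is
acceptable here precisely because the listening socket is a slow path once
established flows use their own connect()ed sockets.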

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c | 81 ++++++++++++++++-------------------------------------------
 1 file changed, 22 insertions(+), 59 deletions(-)

diff --git a/udp.c b/udp.c
index 6b72c30f..4444d762 100644
--- a/udp.c
+++ b/udp.c
@@ -138,20 +138,15 @@ static struct ethhdr udp4_eth_hdr;
 static struct ethhdr udp6_eth_hdr;
 
 /**
- * struct udp_meta_t - Pre-cooked headers and metadata for UDP packets
+ * struct udp_meta_t - Pre-cooked headers for UDP packets
  * @ip6h:	Pre-filled IPv6 header (except for payload_len and addresses)
  * @ip4h:	Pre-filled IPv4 header (except for tot_len and saddr)
  * @taph:	Tap backend specific header
- * @s_in:	Source socket address, filled in by recvmmsg()
- * @tosidx:	sidx for the destination side of this datagram's flow
  */
 static struct udp_meta_t {
 	struct ipv6hdr ip6h;
 	struct iphdr ip4h;
 	struct tap_hdr taph;
-
-	union sockaddr_inany s_in;
-	flow_sidx_t tosidx;
 }
 #ifdef __AVX2__
 __attribute__ ((aligned(32)))
@@ -234,8 +229,6 @@ static void udp_iov_init_one(const struct ctx *c, size_t i)
 	tiov[UDP_IOV_TAP] = tap_hdr_iov(c, &meta->taph);
 	tiov[UDP_IOV_PAYLOAD].iov_base = payload;
 
-	mh->msg_name	= &meta->s_in;
-	mh->msg_namelen	= sizeof(meta->s_in);
 	mh->msg_iov	= siov;
 	mh->msg_iovlen	= 1;
 }
@@ -687,60 +680,32 @@ static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh, int n)
 static void udp_buf_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 				     const struct timespec *now)
 {
-	const socklen_t sasize = sizeof(udp_meta[0].s_in);
-	/* See udp_buf_sock_data() comment */
-	int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES), i;
-
-	if ((n = udp_sock_recv(c, ref.fd, udp_mh_recv, n)) <= 0)
-		return;
-
-	/* We divide datagrams into batches based on how we need to send them,
-	 * determined by udp_meta[i].tosidx.  To avoid either two passes through
-	 * the array, or recalculating tosidx for a single entry, we have to
-	 * populate it one entry *ahead* of the loop counter.
-	 */
-	udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0].s_in, now);
-	udp_mh_recv[0].msg_hdr.msg_namelen = sasize;
-	for (i = 0; i < n; ) {
-		flow_sidx_t batchsidx = udp_meta[i].tosidx;
-		uint8_t batchpif = pif_at_sidx(batchsidx);
-		int batchstart = i;
-
-		do {
-			if (pif_is_socket(batchpif)) {
-				udp_splice_prepare(udp_mh_recv, i);
-			} else if (batchpif == PIF_TAP) {
-				udp_tap_prepare(udp_mh_recv, i,
-						flowside_at_sidx(batchsidx),
-						false);
-			}
-
-			if (++i >= n)
-				break;
-
-			udp_meta[i].tosidx = udp_flow_from_sock(c, ref,
-								&udp_meta[i].s_in,
-								now);
-			udp_mh_recv[i].msg_hdr.msg_namelen = sasize;
-		} while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx));
-
-		if (pif_is_socket(batchpif)) {
-			udp_splice_send(c, batchstart, i - batchstart,
-					batchsidx);
-		} else if (batchpif == PIF_TAP) {
-			tap_send_frames(c, &udp_l2_iov[batchstart][0],
-					UDP_NUM_IOVS, i - batchstart);
-		} else if (flow_sidx_valid(batchsidx)) {
-			flow_sidx_t fromsidx = flow_sidx_opposite(batchsidx);
-			struct udp_flow *uflow = udp_at_sidx(batchsidx);
+	union sockaddr_inany src;
+
+	while (udp_peek_addr(ref.fd, &src) == 0) {
+		flow_sidx_t tosidx = udp_flow_from_sock(c, ref, &src, now);
+		uint8_t topif = pif_at_sidx(tosidx);
+
+		if (udp_sock_recv(c, ref.fd, udp_mh_recv, 1) <= 0)
+			break;
+
+		if (pif_is_socket(topif)) {
+			udp_splice_prepare(udp_mh_recv, 0);
+			udp_splice_send(c, 0, 1, tosidx);
+		} else if (topif == PIF_TAP) {
+			udp_tap_prepare(udp_mh_recv, 0, flowside_at_sidx(tosidx),
+					false);
+			tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, 1);
+		} else if (flow_sidx_valid(tosidx)) {
+			flow_sidx_t fromsidx = flow_sidx_opposite(tosidx);
+			struct udp_flow *uflow = udp_at_sidx(tosidx);
 
 			flow_err(uflow,
 				 "No support for forwarding UDP from %s to %s",
 				 pif_name(pif_at_sidx(fromsidx)),
-				 pif_name(batchpif));
+				 pif_name(topif));
 		} else {
-			debug("Discarding %d datagrams without flow",
-			      i - batchstart);
+			debug("Discarding datagram without flow");
 		}
 	}
 }
@@ -802,8 +767,6 @@ static bool udp_buf_reply_sock_data(const struct ctx *c,
 			udp_splice_prepare(udp_mh_recv, i);
 		else if (topif == PIF_TAP)
 			udp_tap_prepare(udp_mh_recv, i, toside, false);
-		/* Restore sockaddr length clobbered by recvmsg() */
-		udp_mh_recv[i].msg_hdr.msg_namelen = sizeof(udp_meta[i].s_in);
 	}
 
 	if (pif_is_socket(topif)) {
-- 
2.49.0
+					false);
+			tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, 1);
+		} else if (flow_sidx_valid(tosidx)) {
+			flow_sidx_t fromsidx = flow_sidx_opposite(tosidx);
+			struct udp_flow *uflow = udp_at_sidx(tosidx);
 
 			flow_err(uflow,
 				 "No support for forwarding UDP from %s to %s",
 				 pif_name(pif_at_sidx(fromsidx)),
-				 pif_name(batchpif));
+				 pif_name(topif));
 		} else {
-			debug("Discarding %d datagrams without flow",
-			      i - batchstart);
+			debug("Discarding datagram without flow");
 		}
 	}
 }
@@ -802,8 +767,6 @@ static bool udp_buf_reply_sock_data(const struct ctx *c,
 			udp_splice_prepare(udp_mh_recv, i);
 		else if (topif == PIF_TAP)
 			udp_tap_prepare(udp_mh_recv, i, toside, false);
-		/* Restore sockaddr length clobbered by recvmsg() */
-		udp_mh_recv[i].msg_hdr.msg_namelen = sizeof(udp_meta[i].s_in);
 	}
 
 	if (pif_is_socket(topif)) {
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 05/12] udp: Parameterize number of datagrams handled by udp_*_reply_sock_data()
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (3 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 04/12] udp: Don't bother to batch datagrams from "listening" socket David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 06/12] udp: Split spliced forwarding path from udp_buf_reply_sock_data() David Gibson
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

Both udp_buf_reply_sock_data() and udp_vu_reply_sock_data() internally
decide what the maximum number of datagrams they will forward is.  We have
some upcoming reasons to allow the caller to decide that instead, so make
the maximum number of datagrams a parameter for both of them.
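As a standalone illustration (hypothetical names, not the passt API), the
effect of the change is that the caller picks the batch limit passed to
recvmmsg(), rather than the receive helper hard-coding it:

```c
#define _GNU_SOURCE		/* for recvmmsg() */
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

#define MAX_FRAMES	32	/* upper bound, analogous to UDP_MAX_FRAMES */
#define DGRAM_LEN	1500

static char bufs[MAX_FRAMES][DGRAM_LEN];
static struct iovec iovs[MAX_FRAMES];
static struct mmsghdr mmh[MAX_FRAMES];

/* Receive up to @n datagrams from @s in one recvmmsg() call.  @n is
 * now the caller's choice (e.g. 1 for pasta, MAX_FRAMES otherwise).
 * Returns the number received, or -1 with errno set (EAGAIN if the
 * receive queue is empty).
 */
static int recv_batch(int s, int n)
{
	int i;

	if (n > MAX_FRAMES)
		n = MAX_FRAMES;

	for (i = 0; i < n; i++) {
		iovs[i].iov_base = bufs[i];
		iovs[i].iov_len = DGRAM_LEN;
		memset(&mmh[i].msg_hdr, 0, sizeof(mmh[i].msg_hdr));
		mmh[i].msg_hdr.msg_iov = &iovs[i];
		mmh[i].msg_hdr.msg_iovlen = 1;
	}

	return recvmmsg(s, mmh, n, MSG_DONTWAIT, NULL);
}
```

With three datagrams queued, recv_batch(s, 2) returns two of them and
leaves one behind; a later call with a larger limit drains the rest.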

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c    | 31 ++++++++++++++++++-------------
 udp_vu.c |  6 ++++--
 udp_vu.h |  3 ++-
 3 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/udp.c b/udp.c
index 4444d762..d81f1213 100644
--- a/udp.c
+++ b/udp.c
@@ -742,22 +742,17 @@ void udp_listen_sock_handler(const struct ctx *c,
  * udp_buf_reply_sock_data() - Handle new data from flow specific socket
  * @c:		Execution context
  * @s:		Socket to read data from
+ * @n:		Maximum number of datagrams to forward
  * @tosidx:	Flow & side to forward data from @s to
  *
  * Return: true on success, false if can't forward from socket to flow's pif
  */
-static bool udp_buf_reply_sock_data(const struct ctx *c,
-				    int s, flow_sidx_t tosidx)
+static bool udp_buf_reply_sock_data(const struct ctx *c, int s, int n,
+				    flow_sidx_t tosidx)
 {
 	const struct flowside *toside = flowside_at_sidx(tosidx);
 	uint8_t topif = pif_at_sidx(tosidx);
-	/* For not entirely clear reasons (data locality?) pasta gets better
-	 * throughput if we receive tap datagrams one at a a time.  For small
-	 * splice datagrams throughput is slightly better if we do batch, but
-	 * it's slightly worse for large splice datagrams.  Since we don't know
-	 * the size before we receive, always go one at a time for pasta mode.
-	 */
-	int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES), i;
+	int i;
 
 	if ((n = udp_sock_recv(c, s, udp_mh_recv, n)) <= 0)
 		return true;
@@ -802,6 +797,14 @@ void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
 	}
 
 	if (events & EPOLLIN) {
+		/* For not entirely clear reasons (data locality?) pasta gets
+		 * better throughput if we receive tap datagrams one at a
+		 * time.  For small splice datagrams throughput is slightly
+		 * better if we do batch, but it's slightly worse for large
+		 * splice datagrams.  Since we don't know the size before we
+		 * receive, always go one at a time for pasta mode.
+		 */
+		size_t n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES);
 		flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside);
 		int s = ref.fd;
 		bool ret;
@@ -809,10 +812,12 @@ void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
 		flow_trace(uflow, "Received data on reply socket");
 		uflow->ts = now->tv_sec;
 
-		if (c->mode == MODE_VU)
-			ret = udp_vu_reply_sock_data(c, s, tosidx);
-		else
-			ret = udp_buf_reply_sock_data(c, s, tosidx);
+		if (c->mode == MODE_VU) {
+			ret = udp_vu_reply_sock_data(c, s, UDP_MAX_FRAMES,
+						     tosidx);
+		} else {
+			ret = udp_buf_reply_sock_data(c, s, n, tosidx);
+		}
 
 		if (!ret) {
 			flow_err(uflow, "Unable to forward UDP");
diff --git a/udp_vu.c b/udp_vu.c
index 5faf1e1d..b2618b39 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -257,11 +257,13 @@ void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
  * udp_vu_reply_sock_data() - Handle new data from flow specific socket
  * @c:		Execution context
  * @s:		Socket to read data from
+ * @n:		Maximum number of datagrams to forward
  * @tosidx:	Flow & side to forward data from @s to
  *
  * Return: true on success, false if can't forward from socket to flow's pif
  */
-bool udp_vu_reply_sock_data(const struct ctx *c, int s, flow_sidx_t tosidx)
+bool udp_vu_reply_sock_data(const struct ctx *c, int s, int n,
+			    flow_sidx_t tosidx)
 {
 	const struct flowside *toside = flowside_at_sidx(tosidx);
 	bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
@@ -272,7 +274,7 @@ bool udp_vu_reply_sock_data(const struct ctx *c, int s, flow_sidx_t tosidx)
 	if (pif_at_sidx(tosidx) != PIF_TAP)
 		return false;
 
-	for (i = 0; i < UDP_MAX_FRAMES; i++) {
+	for (i = 0; i < n; i++) {
 		ssize_t dlen;
 		int iov_used;
 
diff --git a/udp_vu.h b/udp_vu.h
index 6d541a4e..c897c36f 100644
--- a/udp_vu.h
+++ b/udp_vu.h
@@ -8,6 +8,7 @@
 
 void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 			     const struct timespec *now);
-bool udp_vu_reply_sock_data(const struct ctx *c, int s, flow_sidx_t tosidx);
+bool udp_vu_reply_sock_data(const struct ctx *c, int s, int n,
+			    flow_sidx_t tosidx);
 
 #endif /* UDP_VU_H */
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 06/12] udp: Split spliced forwarding path from udp_buf_reply_sock_data()
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (4 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 05/12] udp: Parameterize number of datagrams handled by udp_*_reply_sock_data() David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 07/12] udp: Merge vhost-user and "buf" listening socket paths David Gibson
                   ` (5 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

udp_buf_reply_sock_data() can forward data either from socket to socket
("splicing") or from socket to tap.  It tests which case applies for
each datagram, even though the answer is the same for every datagram in
the batch.

Split out the spliced path into a separate udp_sock_to_sock() function.
This leaves udp_{buf,vu}_reply_sock_data() handling only forwards from
socket to tap, so rename and simplify them accordingly.

This makes the code slightly longer for now, but will allow future cleanups
to shrink it back down again.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c    | 103 ++++++++++++++++++++++++++++++-------------------------
 udp_vu.c |  12 ++-----
 udp_vu.h |   3 +-
 3 files changed, 60 insertions(+), 58 deletions(-)

diff --git a/udp.c b/udp.c
index d81f1213..144e625f 100644
--- a/udp.c
+++ b/udp.c
@@ -671,6 +671,49 @@ static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh, int n)
 	return n;
 }
 
+/**
+ * udp_sock_to_sock() - Forward datagrams from socket to socket
+ * @c:		Execution context
+ * @from_s:	Socket to receive datagrams from
+ * @n:		Maximum number of datagrams to forward
+ * @tosidx:	Flow & side to forward datagrams to
+ */
+static void udp_sock_to_sock(const struct ctx *c, int from_s, int n,
+			     flow_sidx_t tosidx)
+{
+	int i;
+
+	if ((n = udp_sock_recv(c, from_s, udp_mh_recv, n)) <= 0)
+		return;
+
+	for (i = 0; i < n; i++)
+		udp_splice_prepare(udp_mh_recv, i);
+
+	udp_splice_send(c, 0, n, tosidx);
+}
+
+/**
+ * udp_buf_sock_to_tap() - Forward datagrams from socket to tap
+ * @c:		Execution context
+ * @s:		Socket to read data from
+ * @n:		Maximum number of datagrams to forward
+ * @tosidx:	Flow & side to forward data from @s to
+ */
+static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n,
+				flow_sidx_t tosidx)
+{
+	const struct flowside *toside = flowside_at_sidx(tosidx);
+	int i;
+
+	if ((n = udp_sock_recv(c, s, udp_mh_recv, n)) <= 0)
+		return;
+
+	for (i = 0; i < n; i++)
+		udp_tap_prepare(udp_mh_recv, i, toside, false);
+
+	tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, n);
+}
+
 /**
  * udp_buf_listen_sock_data() - Handle new data from socket
  * @c:		Execution context
@@ -738,43 +781,6 @@ void udp_listen_sock_handler(const struct ctx *c,
 	}
 }
 
-/**
- * udp_buf_reply_sock_data() - Handle new data from flow specific socket
- * @c:		Execution context
- * @s:		Socket to read data from
- * @n:		Maximum number of datagrams to forward
- * @tosidx:	Flow & side to forward data from @s to
- *
- * Return: true on success, false if can't forward from socket to flow's pif
- */
-static bool udp_buf_reply_sock_data(const struct ctx *c, int s, int n,
-				    flow_sidx_t tosidx)
-{
-	const struct flowside *toside = flowside_at_sidx(tosidx);
-	uint8_t topif = pif_at_sidx(tosidx);
-	int i;
-
-	if ((n = udp_sock_recv(c, s, udp_mh_recv, n)) <= 0)
-		return true;
-
-	for (i = 0; i < n; i++) {
-		if (pif_is_socket(topif))
-			udp_splice_prepare(udp_mh_recv, i);
-		else if (topif == PIF_TAP)
-			udp_tap_prepare(udp_mh_recv, i, toside, false);
-	}
-
-	if (pif_is_socket(topif)) {
-		udp_splice_send(c, 0, n, tosidx);
-	} else if (topif == PIF_TAP) {
-		tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, n);
-	} else {
-		return false;
-	}
-
-	return true;
-}
-
 /**
  * udp_sock_handler() - Handle new data from flow specific socket
  * @c:		Execution context
@@ -806,21 +812,26 @@ void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
 		 */
 		size_t n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES);
 		flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside);
+		uint8_t topif = pif_at_sidx(tosidx);
 		int s = ref.fd;
-		bool ret;
 
 		flow_trace(uflow, "Received data on reply socket");
 		uflow->ts = now->tv_sec;
 
-		if (c->mode == MODE_VU) {
-			ret = udp_vu_reply_sock_data(c, s, UDP_MAX_FRAMES,
-						     tosidx);
+		if (pif_is_socket(topif)) {
+			udp_sock_to_sock(c, ref.fd, n, tosidx);
+		} else if (topif == PIF_TAP) {
+			if (c->mode == MODE_VU) {
+				udp_vu_sock_to_tap(c, s, UDP_MAX_FRAMES,
+						   tosidx);
+			} else {
+				udp_buf_sock_to_tap(c, s, n, tosidx);
+			}
 		} else {
-			ret = udp_buf_reply_sock_data(c, s, n, tosidx);
-		}
-
-		if (!ret) {
-			flow_err(uflow, "Unable to forward UDP");
+			flow_err(uflow,
+				 "No support for forwarding UDP from %s to %s",
+				 pif_name(pif_at_sidx(ref.flowside)),
+				 pif_name(topif));
 			goto fail;
 		}
 	}
diff --git a/udp_vu.c b/udp_vu.c
index b2618b39..fcccef65 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -254,16 +254,13 @@ void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 }
 
 /**
- * udp_vu_reply_sock_data() - Handle new data from flow specific socket
+ * udp_vu_sock_to_tap() - Forward datagrams from socket to tap
  * @c:		Execution context
  * @s:		Socket to read data from
  * @n:		Maximum number of datagrams to forward
  * @tosidx:	Flow & side to forward data from @s to
- *
- * Return: true on success, false if can't forward from socket to flow's pif
  */
-bool udp_vu_reply_sock_data(const struct ctx *c, int s, int n,
-			    flow_sidx_t tosidx)
+void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
 {
 	const struct flowside *toside = flowside_at_sidx(tosidx);
 	bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
@@ -271,9 +268,6 @@ bool udp_vu_reply_sock_data(const struct ctx *c, int s, int n,
 	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
 	int i;
 
-	if (pif_at_sidx(tosidx) != PIF_TAP)
-		return false;
-
 	for (i = 0; i < n; i++) {
 		ssize_t dlen;
 		int iov_used;
@@ -290,6 +284,4 @@ bool udp_vu_reply_sock_data(const struct ctx *c, int s, int n,
 		}
 		vu_flush(vdev, vq, elem, iov_used);
 	}
-
-	return true;
 }
diff --git a/udp_vu.h b/udp_vu.h
index c897c36f..576b0e71 100644
--- a/udp_vu.h
+++ b/udp_vu.h
@@ -8,7 +8,6 @@
 
 void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 			     const struct timespec *now);
-bool udp_vu_reply_sock_data(const struct ctx *c, int s, int n,
-			    flow_sidx_t tosidx);
+void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx);
 
 #endif /* UDP_VU_H */
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 07/12] udp: Merge vhost-user and "buf" listening socket paths
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (5 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 06/12] udp: Split spliced forwarding path from udp_buf_reply_sock_data() David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 08/12] udp: Move UDP_MAX_FRAMES to udp.c David Gibson
                   ` (4 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

udp_buf_listen_sock_data() and udp_vu_listen_sock_data() now have
effectively identical structure.  The forwarding functions used for flow
specific sockets (udp_buf_sock_to_tap(), udp_vu_sock_to_tap() and
udp_sock_to_sock()) also now take a number of datagrams.  This means we
can re-use them for the listening socket path, just passing '1' so they
handle a single datagram at a time.

This allows us to merge both the vhost-user and flow specific paths into
a single, simpler udp_listen_sock_data().

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c          | 27 ++++++++--------------
 udp_internal.h |  1 -
 udp_vu.c       | 62 --------------------------------------------------
 3 files changed, 10 insertions(+), 80 deletions(-)

diff --git a/udp.c b/udp.c
index 144e625f..f724b100 100644
--- a/udp.c
+++ b/udp.c
@@ -629,7 +629,7 @@ static int udp_sock_errs(const struct ctx *c, union epoll_ref ref)
  *
  * Return: 0 on success, -1 otherwise
  */
-int udp_peek_addr(int s, union sockaddr_inany *src)
+static int udp_peek_addr(int s, union sockaddr_inany *src)
 {
 	struct msghdr msg = {
 		.msg_name = src,
@@ -715,12 +715,12 @@ static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n,
 }
 
 /**
- * udp_buf_listen_sock_data() - Handle new data from socket
+ * udp_listen_sock_data() - Handle new data from listening socket
  * @c:		Execution context
  * @ref:	epoll reference
  * @now:	Current timestamp
  */
-static void udp_buf_listen_sock_data(const struct ctx *c, union epoll_ref ref,
+static void udp_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 				     const struct timespec *now)
 {
 	union sockaddr_inany src;
@@ -729,16 +729,13 @@ static void udp_buf_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 		flow_sidx_t tosidx = udp_flow_from_sock(c, ref, &src, now);
 		uint8_t topif = pif_at_sidx(tosidx);
 
-		if (udp_sock_recv(c, ref.fd, udp_mh_recv, 1) <= 0)
-			break;
-
 		if (pif_is_socket(topif)) {
-			udp_splice_prepare(udp_mh_recv, 0);
-			udp_splice_send(c, 0, 1, tosidx);
+			udp_sock_to_sock(c, ref.fd, 1, tosidx);
 		} else if (topif == PIF_TAP) {
-			udp_tap_prepare(udp_mh_recv, 0, flowside_at_sidx(tosidx),
-					false);
-			tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, 1);
+			if (c->mode == MODE_VU)
+				udp_vu_sock_to_tap(c, ref.fd, 1, tosidx);
+			else
+				udp_buf_sock_to_tap(c, ref.fd, 1, tosidx);
 		} else if (flow_sidx_valid(tosidx)) {
 			flow_sidx_t fromsidx = flow_sidx_opposite(tosidx);
 			struct udp_flow *uflow = udp_at_sidx(tosidx);
@@ -773,12 +770,8 @@ void udp_listen_sock_handler(const struct ctx *c,
 		}
 	}
 
-	if (events & EPOLLIN) {
-		if (c->mode == MODE_VU)
-			udp_vu_listen_sock_data(c, ref, now);
-		else
-			udp_buf_listen_sock_data(c, ref, now);
-	}
+	if (events & EPOLLIN)
+		udp_listen_sock_data(c, ref, now);
 }
 
 /**
diff --git a/udp_internal.h b/udp_internal.h
index 43a61094..02724e59 100644
--- a/udp_internal.h
+++ b/udp_internal.h
@@ -30,6 +30,5 @@ size_t udp_update_hdr4(struct iphdr *ip4h, struct udp_payload_t *bp,
 size_t udp_update_hdr6(struct ipv6hdr *ip6h, struct udp_payload_t *bp,
                        const struct flowside *toside, size_t dlen,
 		       bool no_udp_csum);
-int udp_peek_addr(int s, union sockaddr_inany *src);
 
 #endif /* UDP_INTERNAL_H */
diff --git a/udp_vu.c b/udp_vu.c
index fcccef65..665176b3 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -191,68 +191,6 @@ static void udp_vu_csum(const struct flowside *toside, int iov_used)
 	}
 }
 
-/**
- * udp_vu_listen_sock_data() - Handle new data from socket
- * @c:		Execution context
- * @ref:	epoll reference
- * @now:	Current timestamp
- */
-void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
-			     const struct timespec *now)
-{
-	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
-	int i;
-
-	for (i = 0; i < UDP_MAX_FRAMES; i++) {
-		const struct flowside *toside;
-		union sockaddr_inany s_in;
-		flow_sidx_t sidx;
-		uint8_t pif;
-		ssize_t dlen;
-		int iov_used;
-		bool v6;
-
-		if (udp_peek_addr(ref.fd, &s_in) < 0)
-			break;
-
-		sidx = udp_flow_from_sock(c, ref, &s_in, now);
-		pif = pif_at_sidx(sidx);
-
-		if (pif != PIF_TAP) {
-			if (flow_sidx_valid(sidx)) {
-				flow_sidx_t fromsidx = flow_sidx_opposite(sidx);
-				struct udp_flow *uflow = udp_at_sidx(sidx);
-
-				flow_err(uflow,
-					"No support for forwarding UDP from %s to %s",
-					pif_name(pif_at_sidx(fromsidx)),
-					pif_name(pif));
-			} else {
-				debug("Discarding 1 datagram without flow");
-			}
-
-			continue;
-		}
-
-		toside = flowside_at_sidx(sidx);
-
-		v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
-
-		iov_used = udp_vu_sock_recv(c, ref.fd, v6, &dlen);
-		if (iov_used <= 0)
-			break;
-
-		udp_vu_prepare(c, toside, dlen);
-		if (*c->pcap) {
-			udp_vu_csum(toside, iov_used);
-			pcap_iov(iov_vu, iov_used,
-				 sizeof(struct virtio_net_hdr_mrg_rxbuf));
-		}
-		vu_flush(vdev, vq, elem, iov_used);
-	}
-}
-
 /**
  * udp_vu_sock_to_tap() - Forward datagrams from socket to tap
  * @c:		Execution context
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 08/12] udp: Move UDP_MAX_FRAMES to udp.c
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (6 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 07/12] udp: Merge vhost-user and "buf" listening socket paths David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 09/12] udp_flow: Take pif and port as explicit parameters to udp_flow_from_sock() David Gibson
                   ` (3 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

Recent changes mean that this define is no longer used anywhere except in
udp.c.  Move it back into udp.c from udp_internal.h.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c          | 2 ++
 udp_internal.h | 2 --
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/udp.c b/udp.c
index f724b100..f6de2924 100644
--- a/udp.c
+++ b/udp.c
@@ -116,6 +116,8 @@
 #include "udp_internal.h"
 #include "udp_vu.h"
 
+#define UDP_MAX_FRAMES		32  /* max # of frames to receive at once */
+
 /* Maximum UDP data to be returned in ICMP messages */
 #define ICMP4_MAX_DLEN 8
 #define ICMP6_MAX_DLEN (IPV6_MIN_MTU			\
diff --git a/udp_internal.h b/udp_internal.h
index 02724e59..f7d84267 100644
--- a/udp_internal.h
+++ b/udp_internal.h
@@ -8,8 +8,6 @@
 
 #include "tap.h" /* needed by udp_meta_t */
 
-#define UDP_MAX_FRAMES		32  /* max # of frames to receive at once */
-
 /**
  * struct udp_payload_t - UDP header and data for inbound messages
  * @uh:		UDP header
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 09/12] udp_flow: Take pif and port as explicit parameters to udp_flow_from_sock()
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (7 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 08/12] udp: Move UDP_MAX_FRAMES to udp.c David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 10/12] udp: Rework udp_listen_sock_data() into udp_sock_fwd() David Gibson
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

Currently udp_flow_from_sock() is only used when receiving a datagram
from a "listening" socket.  It takes the listening socket's epoll
reference to get the interface and port on which the datagram arrived.

We have some upcoming cases where we want to use this in different
contexts, so make it take the pif and port as direct parameters instead.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c      |  4 +++-
 udp_flow.c | 15 +++++++--------
 udp_flow.h |  2 +-
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/udp.c b/udp.c
index f6de2924..dbb33f2a 100644
--- a/udp.c
+++ b/udp.c
@@ -728,7 +728,9 @@ static void udp_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 	union sockaddr_inany src;
 
 	while (udp_peek_addr(ref.fd, &src) == 0) {
-		flow_sidx_t tosidx = udp_flow_from_sock(c, ref, &src, now);
+		flow_sidx_t tosidx = udp_flow_from_sock(c, ref.udp.pif,
+							ref.udp.port, &src,
+							now);
 		uint8_t topif = pif_at_sidx(tosidx);
 
 		if (pif_is_socket(topif)) {
diff --git a/udp_flow.c b/udp_flow.c
index d50bddb2..b95c3176 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -160,8 +160,10 @@ cancel:
 }
 
 /**
- * udp_flow_from_sock() - Find or create UDP flow for "listening" socket
+ * udp_flow_from_sock() - Find or create UDP flow for incoming datagram
  * @c:		Execution context
+ * @pif:	Interface the datagram is arriving from
+ * @port:	Our (local) port number to which the datagram is arriving
  * @ref:	epoll reference of the receiving socket
  * @s_in:	Source socket address, filled in by recvmmsg()
  * @now:	Timestamp
@@ -171,7 +173,7 @@ cancel:
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
  */
-flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
+flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif, in_port_t port,
 			       const union sockaddr_inany *s_in,
 			       const struct timespec *now)
 {
@@ -180,9 +182,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
 	union flow *flow;
 	flow_sidx_t sidx;
 
-	ASSERT(ref.type == EPOLL_TYPE_UDP_LISTEN);
-
-	sidx = flow_lookup_sa(c, IPPROTO_UDP, ref.udp.pif, s_in, ref.udp.port);
+	sidx = flow_lookup_sa(c, IPPROTO_UDP, pif, s_in, port);
 	if ((uflow = udp_at_sidx(sidx))) {
 		uflow->ts = now->tv_sec;
 		return flow_sidx_opposite(sidx);
@@ -192,12 +192,11 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
 		char sastr[SOCKADDR_STRLEN];
 
 		debug("Couldn't allocate flow for UDP datagram from %s %s",
-		      pif_name(ref.udp.pif),
-		      sockaddr_ntop(s_in, sastr, sizeof(sastr)));
+		      pif_name(pif), sockaddr_ntop(s_in, sastr, sizeof(sastr)));
 		return FLOW_SIDX_NONE;
 	}
 
-	ini = flow_initiate_sa(flow, ref.udp.pif, s_in, ref.udp.port);
+	ini = flow_initiate_sa(flow, pif, s_in, port);
 
 	if (!inany_is_unicast(&ini->eaddr) ||
 	    ini->eport == 0 || ini->oport == 0) {
diff --git a/udp_flow.h b/udp_flow.h
index 9a1b059c..d4e4c8b9 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -24,7 +24,7 @@ struct udp_flow {
 };
 
 struct udp_flow *udp_at_sidx(flow_sidx_t sidx);
-flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
+flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif, in_port_t port,
 			       const union sockaddr_inany *s_in,
 			       const struct timespec *now);
 flow_sidx_t udp_flow_from_tap(const struct ctx *c,
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 10/12] udp: Rework udp_listen_sock_data() into udp_sock_fwd()
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (8 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 09/12] udp_flow: Take pif and port as explicit parameters to udp_flow_from_sock() David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 11/12] udp: Fold udp_splice_prepare and udp_splice_send into udp_sock_to_sock David Gibson
  2025-04-04 10:15 ` [PATCH 12/12] udp_flow: Don't discard packets that arrive between bind() and connect() David Gibson
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

udp_listen_sock_data() forwards datagrams from a "listening" socket until
there are no more (for now).  We have an upcoming use case where we want
to do that for a socket that's not a "listening" socket, and uses a
different epoll reference.  So, adjust the function to take the pieces it
needs from the reference as direct parameters and rename to udp_sock_fwd().
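The peek step this loop depends on can be shown standalone (hypothetical
name; the real udp_peek_addr() differs in detail): recvmsg() with
MSG_PEEK and no data buffer fills in the source address without
dequeueing the datagram.

```c
#include <errno.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* Fill @src with the source address of the next queued datagram on @s
 * without consuming it.  Returns 0 on success, -errno (typically
 * -EAGAIN) if nothing is queued.
 */
static int peek_src_addr(int s, struct sockaddr_storage *src)
{
	struct msghdr msg = {
		.msg_name = src,
		.msg_namelen = sizeof(*src),
	};

	if (recvmsg(s, &msg, MSG_PEEK | MSG_DONTWAIT) < 0)
		return -errno;

	return 0;
}
```

Because MSG_PEEK leaves the datagram queued, a subsequent receive call
still gets the data; once the queue is drained the peek fails with
EAGAIN, which is what terminates the forwarding loop.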

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/udp.c b/udp.c
index dbb33f2a..22e74b48 100644
--- a/udp.c
+++ b/udp.c
@@ -717,37 +717,36 @@ static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n,
 }
 
 /**
- * udp_listen_sock_data() - Handle new data from listening socket
+ * udp_sock_fwd() - Forward datagrams from a possibly unconnected socket
  * @c:		Execution context
- * @ref:	epoll reference
+ * @s:		Socket to forward from
+ * @frompif:	Interface to which @s belongs
+ * @port:	Our (local) port number of @s
  * @now:	Current timestamp
  */
-static void udp_listen_sock_data(const struct ctx *c, union epoll_ref ref,
-				     const struct timespec *now)
+static void udp_sock_fwd(const struct ctx *c, int s, uint8_t frompif,
+			 in_port_t port, const struct timespec *now)
 {
 	union sockaddr_inany src;
 
-	while (udp_peek_addr(ref.fd, &src) == 0) {
-		flow_sidx_t tosidx = udp_flow_from_sock(c, ref.udp.pif,
-							ref.udp.port, &src,
-							now);
+	while (udp_peek_addr(s, &src) == 0) {
+		flow_sidx_t tosidx = udp_flow_from_sock(c, frompif, port,
+							&src, now);
 		uint8_t topif = pif_at_sidx(tosidx);
 
 		if (pif_is_socket(topif)) {
-			udp_sock_to_sock(c, ref.fd, 1, tosidx);
+			udp_sock_to_sock(c, s, 1, tosidx);
 		} else if (topif == PIF_TAP) {
 			if (c->mode == MODE_VU)
-				udp_vu_sock_to_tap(c, ref.fd, 1, tosidx);
+				udp_vu_sock_to_tap(c, s, 1, tosidx);
 			else
-				udp_buf_sock_to_tap(c, ref.fd, 1, tosidx);
+				udp_buf_sock_to_tap(c, s, 1, tosidx);
 		} else if (flow_sidx_valid(tosidx)) {
-			flow_sidx_t fromsidx = flow_sidx_opposite(tosidx);
 			struct udp_flow *uflow = udp_at_sidx(tosidx);
 
 			flow_err(uflow,
 				 "No support for forwarding UDP from %s to %s",
-				 pif_name(pif_at_sidx(fromsidx)),
-				 pif_name(topif));
+				 pif_name(frompif), pif_name(topif));
 		} else {
 			debug("Discarding datagram without flow");
 		}
@@ -775,7 +774,7 @@ void udp_listen_sock_handler(const struct ctx *c,
 	}
 
 	if (events & EPOLLIN)
-		udp_listen_sock_data(c, ref, now);
+		udp_sock_fwd(c, ref.fd, ref.udp.pif, ref.udp.port, now);
 }
 
 /**
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 11/12] udp: Fold udp_splice_prepare and udp_splice_send into udp_sock_to_sock
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (9 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 10/12] udp: Rework udp_listen_sock_data() into udp_sock_fwd() David Gibson
@ 2025-04-04 10:15 ` David Gibson
  2025-04-04 10:15 ` [PATCH 12/12] udp_flow: Don't discard packets that arrive between bind() and connect() David Gibson
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

udp_splice_prepare() and udp_splice_send() are both quite simple functions
that now have only one caller: udp_sock_to_sock().  Fold them both into
that caller.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 udp.c | 53 +++++++++++++----------------------------------------
 1 file changed, 13 insertions(+), 40 deletions(-)

diff --git a/udp.c b/udp.c
index 22e74b48..7c8b7a2c 100644
--- a/udp.c
+++ b/udp.c
@@ -250,43 +250,6 @@ static void udp_iov_init(const struct ctx *c)
 		udp_iov_init_one(c, i);
 }
 
-/**
- * udp_splice_prepare() - Prepare one datagram for splicing
- * @mmh:	Receiving mmsghdr array
- * @idx:	Index of the datagram to prepare
- */
-static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
-{
-	udp_mh_splice[idx].msg_hdr.msg_iov->iov_len = mmh[idx].msg_len;
-}
-
-/**
- * udp_splice_send() - Send a batch of datagrams from socket to socket
- * @c:		Execution context
- * @start:	Index of batch's first datagram in udp[46]_l2_buf
- * @n:		Number of datagrams in batch
- * @src:	Source port for datagram (target side)
- * @dst:	Destination port for datagrams (target side)
- * @ref:	epoll reference for origin socket
- * @now:	Timestamp
- *
- * #syscalls sendmmsg
- */
-static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
-			    flow_sidx_t tosidx)
-{
-	const struct flowside *toside = flowside_at_sidx(tosidx);
-	const struct udp_flow *uflow = udp_at_sidx(tosidx);
-	uint8_t topif = pif_at_sidx(tosidx);
-	int s = uflow->s[tosidx.sidei];
-	socklen_t sl;
-
-	pif_sockaddr(c, &udp_splice_to, &sl, topif,
-		     &toside->eaddr, toside->eport);
-
-	sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
-}
-
 /**
  * udp_update_hdr4() - Update headers for one IPv4 datagram
  * @ip4h:		Pre-filled IPv4 header (except for tot_len and saddr)
@@ -683,15 +646,25 @@ static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh, int n)
 static void udp_sock_to_sock(const struct ctx *c, int from_s, int n,
 			     flow_sidx_t tosidx)
 {
+	const struct flowside *toside = flowside_at_sidx(tosidx);
+	const struct udp_flow *uflow = udp_at_sidx(tosidx);
+	uint8_t topif = pif_at_sidx(tosidx);
+	int to_s = uflow->s[tosidx.sidei];
+	socklen_t sl;
 	int i;
 
 	if ((n = udp_sock_recv(c, from_s, udp_mh_recv, n)) <= 0)
 		return;
 
-	for (i = 0; i < n; i++)
-		udp_splice_prepare(udp_mh_recv, i);
+	for (i = 0; i < n; i++) {
+		udp_mh_splice[i].msg_hdr.msg_iov->iov_len
+			= udp_mh_recv[i].msg_len;
+	}
+
+	pif_sockaddr(c, &udp_splice_to, &sl, topif,
+		     &toside->eaddr, toside->eport);
 
-	udp_splice_send(c, 0, n, tosidx);
+	sendmmsg(to_s, udp_mh_splice, n, MSG_NOSIGNAL);
 }
 
 /**
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

* [PATCH 12/12] udp_flow: Don't discard packets that arrive between bind() and connect()
  2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
                   ` (10 preceding siblings ...)
  2025-04-04 10:15 ` [PATCH 11/12] udp: Fold udp_splice_prepare and udp_splice_send into udp_sock_to_sock David Gibson
@ 2025-04-04 10:15 ` David Gibson
  11 siblings, 0 replies; 13+ messages in thread
From: David Gibson @ 2025-04-04 10:15 UTC (permalink / raw)
  To: Stefano Brivio, passt-dev; +Cc: David Gibson

When we establish a new UDP flow we create connect()ed sockets that will
only handle datagrams for this flow.  However, there is a race between
bind() and connect() where they might get some packets queued for a
different flow.  Currently we handle this by simply discarding any
queued datagrams after the connect().  UDP protocols should be able to handle
such packet loss, but it's not ideal.

We now have the tools we need to handle this better, by redirecting any
datagrams received during that race to the appropriate flow.  We need to
use a deferred handler for this to avoid unexpectedly re-ordering datagrams
in some edge cases.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 flow.c         |  2 +-
 udp.c          |  4 +--
 udp_flow.c     | 73 +++++++++++++++++++++++++++++++++++---------------
 udp_flow.h     |  6 ++++-
 udp_internal.h |  2 ++
 5 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/flow.c b/flow.c
index 86222426..29a83e18 100644
--- a/flow.c
+++ b/flow.c
@@ -850,7 +850,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
 				closed = icmp_ping_timer(c, &flow->ping, now);
 			break;
 		case FLOW_UDP:
-			closed = udp_flow_defer(&flow->udp);
+			closed = udp_flow_defer(c, &flow->udp, now);
 			if (!closed && timer)
 				closed = udp_flow_timer(c, &flow->udp, now);
 			break;
diff --git a/udp.c b/udp.c
index 7c8b7a2c..b275db3d 100644
--- a/udp.c
+++ b/udp.c
@@ -697,8 +697,8 @@ static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n,
  * @port:	Our (local) port number of @s
  * @now:	Current timestamp
  */
-static void udp_sock_fwd(const struct ctx *c, int s, uint8_t frompif,
-			 in_port_t port, const struct timespec *now)
+void udp_sock_fwd(const struct ctx *c, int s, uint8_t frompif,
+		  in_port_t port, const struct timespec *now)
 {
 	union sockaddr_inany src;
 
diff --git a/udp_flow.c b/udp_flow.c
index b95c3176..af15d7f2 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -9,10 +9,12 @@
 #include <fcntl.h>
 #include <sys/uio.h>
 #include <unistd.h>
+#include <netinet/udp.h>
 
 #include "util.h"
 #include "passt.h"
 #include "flow_table.h"
+#include "udp_internal.h"
 
 #define UDP_CONN_TIMEOUT	180 /* s, timeout for ephemeral or local bind */
 
@@ -67,16 +69,15 @@ void udp_flow_close(const struct ctx *c, struct udp_flow *uflow)
  * Return: fd of new socket on success, -ve error code on failure
  */
 static int udp_flow_sock(const struct ctx *c,
-			 const struct udp_flow *uflow, unsigned sidei)
+			 struct udp_flow *uflow, unsigned sidei)
 {
 	const struct flowside *side = &uflow->f.side[sidei];
-	struct mmsghdr discard[UIO_MAXIOV] = { 0 };
 	uint8_t pif = uflow->f.pif[sidei];
 	union {
 		flow_sidx_t sidx;
 		uint32_t data;
 	} fref = { .sidx = FLOW_SIDX(uflow, sidei) };
-	int rc, s;
+	int s;
 
 	s = flowside_sock_l4(c, EPOLL_TYPE_UDP, pif, side, fref.data);
 	if (s < 0) {
@@ -85,30 +86,32 @@ static int udp_flow_sock(const struct ctx *c,
 	}
 
 	if (flowside_connect(c, s, pif, side) < 0) {
-		rc = -errno;
+		int rc = -errno;
 		flow_dbg_perror(uflow, "Couldn't connect flow socket");
 		return rc;
 	}
 
-	/* It's possible, if unlikely, that we could receive some unrelated
-	 * packets in between the bind() and connect() of this socket.  For now
-	 * we just discard these.
+	/* It's possible, if unlikely, that we could receive some packets in
+	 * between the bind() and connect() which may or may not be for this
+	 * flow.  Being UDP we could just discard them, but it's not ideal.
 	 *
-	 * FIXME: Redirect these to an appropriate handler
+	 * There's also a tricky case if a bunch of datagrams for a new flow
+	 * arrive in rapid succession, the first going to the original listening
+	 * socket and later ones going to this new socket.  If we forwarded the
+	 * datagrams from the new socket immediately here they would go before
+	 * the datagram which established the flow.  Again, not strictly wrong
+	 * for UDP, but not ideal.
+	 *
+	 * So, we flag that the new socket is in a transient state where it
+	 * might have datagrams for a different flow queued.  Before the next
+	 * epoll cycle, udp_flow_defer() will flush out any such datagrams, and
+	 * thereafter everything on the new socket should be strictly for this
+	 * flow.
 	 */
-	rc = recvmmsg(s, discard, ARRAY_SIZE(discard), MSG_DONTWAIT, NULL);
-	if (rc >= ARRAY_SIZE(discard)) {
-		flow_dbg(uflow, "Too many (%d) spurious reply datagrams", rc);
-		return -E2BIG;
-	}
-
-	if (rc > 0) {
-		flow_trace(uflow, "Discarded %d spurious reply datagrams", rc);
-	} else if (errno != EAGAIN) {
-		rc = -errno;
-		flow_perror(uflow, "Unexpected error discarding datagrams");
-		return rc;
-	}
+	if (sidei)
+		uflow->flush1 = true;
+	else
+		uflow->flush0 = true;
 
 	return s;
 }
@@ -268,14 +271,40 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
 	return udp_flow_new(c, flow, now);
 }
 
+/**
+ * udp_flush_flow() - Flush datagrams that might not be for this flow
+ * @c:		Execution context
+ * @uflow:	Flow to handle
+ * @sidei:	Side of the flow to flush
+ * @now:	Current timestamp
+ */
+static void udp_flush_flow(const struct ctx *c,
+			   const struct udp_flow *uflow, unsigned sidei,
+			   const struct timespec *now)
+{
+	/* We don't know exactly where the datagrams will come from, but we know
+	 * they'll have an interface and oport matching this flow */
+	udp_sock_fwd(c, uflow->s[sidei], uflow->f.pif[sidei],
+		     uflow->f.side[sidei].oport, now);
+}
+
 /**
  * udp_flow_defer() - Deferred per-flow handling (clean up aborted flows)
  * @uflow:	Flow to handle
  *
  * Return: true if the connection is ready to free, false otherwise
  */
-bool udp_flow_defer(const struct udp_flow *uflow)
+bool udp_flow_defer(const struct ctx *c, struct udp_flow *uflow,
+		    const struct timespec *now)
 {
+	if (uflow->flush0) {
+		udp_flush_flow(c, uflow, INISIDE, now);
+		uflow->flush0 = false;
+	}
+	if (uflow->flush1) {
+		udp_flush_flow(c, uflow, TGTSIDE, now);
+		uflow->flush1 = false;
+	}
 	return uflow->closed;
 }
 
diff --git a/udp_flow.h b/udp_flow.h
index d4e4c8b9..d518737e 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -11,6 +11,8 @@
  * struct udp - Descriptor for a flow of UDP packets
  * @f:		Generic flow information
  * @closed:	Flow is already closed
+ * @flush0:	@s[0] may have datagrams queued for other flows
+ * @flush1:	@s[1] may have datagrams queued for other flows
  * @ts:		Activity timestamp
  * @s:		Socket fd (or -1) for each side of the flow
  */
@@ -19,6 +21,7 @@ struct udp_flow {
 	struct flow_common f;
 
 	bool closed :1;
+	bool flush0 :1, flush1 :1;
 	time_t ts;
 	int s[SIDES];
 };
@@ -33,7 +36,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
 			      in_port_t srcport, in_port_t dstport,
 			      const struct timespec *now);
 void udp_flow_close(const struct ctx *c, struct udp_flow *uflow);
-bool udp_flow_defer(const struct udp_flow *uflow);
+bool udp_flow_defer(const struct ctx *c, struct udp_flow *uflow,
+		    const struct timespec *now);
 bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow,
 		    const struct timespec *now);
 
diff --git a/udp_internal.h b/udp_internal.h
index f7d84267..96d11cff 100644
--- a/udp_internal.h
+++ b/udp_internal.h
@@ -28,5 +28,7 @@ size_t udp_update_hdr4(struct iphdr *ip4h, struct udp_payload_t *bp,
 size_t udp_update_hdr6(struct ipv6hdr *ip6h, struct udp_payload_t *bp,
                        const struct flowside *toside, size_t dlen,
 		       bool no_udp_csum);
+void udp_sock_fwd(const struct ctx *c, int s, uint8_t frompif,
+		  in_port_t port, const struct timespec *now);
 
 #endif /* UDP_INTERNAL_H */
-- 
2.49.0


^ permalink raw reply related	[flat|nested] 13+ messages in thread

end of thread, other threads:[~2025-04-04 10:16 UTC | newest]

Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-04-04 10:15 [PATCH 00/12] Use connect()ed sockets for both sides of UDP flows David Gibson
2025-04-04 10:15 ` [PATCH 01/12] udp: Use connect()ed sockets for initiating side David Gibson
2025-04-04 10:15 ` [PATCH 02/12] udp: Make udp_sock_recv() take max number of frames as a parameter David Gibson
2025-04-04 10:15 ` [PATCH 03/12] udp: Polish udp_vu_sock_info() and remove from vu specific code David Gibson
2025-04-04 10:15 ` [PATCH 04/12] udp: Don't bother to batch datagrams from "listening" socket David Gibson
2025-04-04 10:15 ` [PATCH 05/12] udp: Parameterize number of datagrams handled by udp_*_reply_sock_data() David Gibson
2025-04-04 10:15 ` [PATCH 06/12] udp: Split spliced forwarding path from udp_buf_reply_sock_data() David Gibson
2025-04-04 10:15 ` [PATCH 07/12] udp: Merge vhost-user and "buf" listening socket paths David Gibson
2025-04-04 10:15 ` [PATCH 08/12] udp: Move UDP_MAX_FRAMES to udp.c David Gibson
2025-04-04 10:15 ` [PATCH 09/12] udp_flow: Take pif and port as explicit parameters to udp_flow_from_sock() David Gibson
2025-04-04 10:15 ` [PATCH 10/12] udp: Rework udp_listen_sock_data() into udp_sock_fwd() David Gibson
2025-04-04 10:15 ` [PATCH 11/12] udp: Fold udp_splice_prepare and udp_splice_send into udp_sock_to_sock David Gibson
2025-04-04 10:15 ` [PATCH 12/12] udp_flow: Don't discard packets that arrive between bind() and connect() David Gibson

Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).