public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: David Gibson <david@gibson.dropbear.id.au>
To: Stefano Brivio <sbrivio@redhat.com>, passt-dev@passt.top
Cc: jmaloy@redhat.com, David Gibson <david@gibson.dropbear.id.au>
Subject: [PATCH v7 21/27] udp: Handle "spliced" datagrams with per-flow sockets
Date: Fri,  5 Jul 2024 12:07:18 +1000	[thread overview]
Message-ID: <20240705020724.3447719-22-david@gibson.dropbear.id.au> (raw)
In-Reply-To: <20240705020724.3447719-1-david@gibson.dropbear.id.au>

When forwarding a datagram to a socket, we need to find a socket with a
suitable local address to send it.  Currently we keep track of such sockets
in an array indexed by local port, but this can't properly handle cases
where we have multiple local addresses in active use.

For "spliced" (socket to socket) cases, improve this by instead opening
a socket specifically for the target side of the flow.  We connect() as
well as bind()ing that socket, so that it will only receive the flow's
reply packets, not anything else.  We direct datagrams sent via that socket
using the addresses from the flow table, effectively replacing bespoke
addressing logic with the unified logic in fwd.c

When we create the flow, we also take a duplicate of the originating
socket, and use that to deliver reply datagrams back to the origin, again
using addresses from the flow table entry.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 epoll_type.h |   2 +
 flow.c       |  20 +++
 flow.h       |   2 +
 flow_table.h |  14 ++
 passt.c      |   4 +
 udp.c        | 403 ++++++++++++++++++---------------------------------
 udp.h        |   6 +-
 udp_flow.h   |   2 +
 util.c       |   1 +
 9 files changed, 194 insertions(+), 260 deletions(-)

diff --git a/epoll_type.h b/epoll_type.h
index b6c04199..7a752ed1 100644
--- a/epoll_type.h
+++ b/epoll_type.h
@@ -22,6 +22,8 @@ enum epoll_type {
 	EPOLL_TYPE_TCP_TIMER,
 	/* UDP sockets */
 	EPOLL_TYPE_UDP,
+	/* UDP socket for replies on a specific flow */
+	EPOLL_TYPE_UDP_REPLY,
 	/* ICMP/ICMPv6 ping sockets */
 	EPOLL_TYPE_PING,
 	/* inotify fd watching for end of netns (pasta) */
diff --git a/flow.c b/flow.c
index 0cb9495b..2e100ddb 100644
--- a/flow.c
+++ b/flow.c
@@ -236,6 +236,26 @@ int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif,
 	}
 }
 
+/** flowside_connect() - Connect a socket based on flowside
+ * @c:		Execution context
+ * @s:		Socket to connect
+ * @pif:	Target pif
+ * @tgt:	Target flowside
+ *
+ * Connect @s to the endpoint address and port from @tgt.
+ *
+ * Return: 0 on success, negative on error
+ */
+int flowside_connect(const struct ctx *c, int s,
+		     uint8_t pif, const struct flowside *tgt)
+{
+	union sockaddr_inany sa;
+	socklen_t sl;
+
+	pif_sockaddr(c, &sa, &sl, pif, &tgt->eaddr, tgt->eport);
+	return connect(s, &sa.sa, sl);
+}
+
 /** flow_log_ - Log flow-related message
  * @f:		flow the message is related to
  * @pri:	Log priority
diff --git a/flow.h b/flow.h
index 3752e5ee..3f65ceb9 100644
--- a/flow.h
+++ b/flow.h
@@ -168,6 +168,8 @@ static inline bool flowside_eq(const struct flowside *left,
 
 int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif,
 		     const struct flowside *tgt, uint32_t data);
+int flowside_connect(const struct ctx *c, int s,
+		     uint8_t pif, const struct flowside *tgt);
 
 /**
  * struct flow_common - Common fields for packet flows
diff --git a/flow_table.h b/flow_table.h
index 3fbc7c8d..1faac4a7 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -92,6 +92,20 @@ static inline flow_sidx_t flow_sidx_opposite(flow_sidx_t sidx)
 	return (flow_sidx_t){.flow = sidx.flow, .side = !sidx.side};
 }
 
+/** pif_at_sidx - Interface for a given flow and side
+ * @sidx:    Flow & side index
+ *
+ * Return: pif for the flow & side given by @sidx
+ */
+static inline uint8_t pif_at_sidx(flow_sidx_t sidx)
+{
+	const union flow *flow = flow_at_sidx(sidx);
+
+	if (!flow)
+		return PIF_NONE;
+	return flow->f.pif[sidx.side];
+}
+
 /** flow_sidx_t - Index of one side of a flow from common structure
  * @f:		Common flow fields pointer
  * @side:	Which side to refer to (0 or 1)
diff --git a/passt.c b/passt.c
index e4d45daa..f9405bee 100644
--- a/passt.c
+++ b/passt.c
@@ -67,6 +67,7 @@ char *epoll_type_str[] = {
 	[EPOLL_TYPE_TCP_LISTEN]		= "listening TCP socket",
 	[EPOLL_TYPE_TCP_TIMER]		= "TCP timer",
 	[EPOLL_TYPE_UDP]		= "UDP socket",
+	[EPOLL_TYPE_UDP_REPLY]		= "UDP reply socket",
 	[EPOLL_TYPE_PING]	= "ICMP/ICMPv6 ping socket",
 	[EPOLL_TYPE_NSQUIT_INOTIFY]	= "namespace inotify watch",
 	[EPOLL_TYPE_NSQUIT_TIMER]	= "namespace timer watch",
@@ -349,6 +350,9 @@ loop:
 		case EPOLL_TYPE_UDP:
 			udp_buf_sock_handler(&c, ref, eventmask, &now);
 			break;
+		case EPOLL_TYPE_UDP_REPLY:
+			udp_reply_sock_handler(&c, ref, eventmask, &now);
+			break;
 		case EPOLL_TYPE_PING:
 			icmp_sock_handler(&c, ref);
 			break;
diff --git a/udp.c b/udp.c
index daf4fe26..f4c696db 100644
--- a/udp.c
+++ b/udp.c
@@ -35,7 +35,31 @@
  * ===================
  *
  * UDP doesn't use listen(), but we consider long term sockets which are allowed
- * to create new flows "listening" by analogy with TCP.
+ * to create new flows "listening" by analogy with TCP. This listening socket
+ * could receive packets from multiple flows, so we use a hash table match to
+ * find the specific flow for a datagram.
+ *
+ * When a UDP flow is initiated from a listening socket we take a duplicate of
+ * the socket and store it in uflow->s[INISIDE].  This will last for the
+ * lifetime of the flow, even if the original listening socket is closed due to
+ * port auto-probing.  The duplicate is used to deliver replies back to the
+ * originating side.
+ *
+ * Reply sockets
+ * =============
+ *
+ * When a UDP flow targets a socket, we create a "reply" socket in
+ * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive
+ * replies on the target side.  This socket is both bound and connected and has
+ * EPOLL_TYPE_UDP_REPLY.  The connect() means it will only receive datagrams
+ * associated with this flow, so the epoll reference directly points to the flow
+ * and we don't need a hash lookup.
+ *
+ * NOTE: it's possible that the reply socket could have a bound address
+ * overlapping with an unrelated listening socket.  We assume datagrams for the
+ * flow will come to the reply socket in preference to a listening socket.  The
+ * sample program contrib/udp-reuseaddr/reuseaddr-priority.c documents and tests
+ * that assumption.
  *
  * Port tracking
  * =============
@@ -56,62 +80,6 @@
  *
  * Packets are forwarded back and forth, by prepending and stripping UDP headers
  * in the obvious way, with no port translation.
- *
- * In PASTA mode, the L2-L4 translation is skipped for connections to ports
- * bound between namespaces using the loopback interface, messages are directly
- * transferred between L4 sockets instead. These are called spliced connections
- * for consistency with the TCP implementation, but the splice() syscall isn't
- * actually used as it wouldn't make sense for datagram-based connections: a
- * pair of recvmmsg() and sendmmsg() deals with this case.
- *
- * The connection tracking for PASTA mode is slightly complicated by the absence
- * of actual connections, see struct udp_splice_port, and these examples:
- *
- * - from init to namespace:
- *
- *   - forward direction: 127.0.0.1:5000 -> 127.0.0.1:80 in init from socket s,
- *     with epoll reference: index = 80, splice = 1, orig = 1, ns = 0
- *     - if udp_splice_ns[V4][5000].sock:
- *       - send packet to udp_splice_ns[V4][5000].sock, with destination port
- *         80
- *     - otherwise:
- *       - create new socket udp_splice_ns[V4][5000].sock
- *       - bind in namespace to 127.0.0.1:5000
- *       - add to epoll with reference: index = 5000, splice = 1, orig = 0,
- *         ns = 1
- *     - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with
- *       current time
- *
- *   - reverse direction: 127.0.0.1:80 -> 127.0.0.1:5000 in namespace socket s,
- *     having epoll reference: index = 5000, splice = 1, orig = 0, ns = 1
- *     - if udp_splice_init[V4][80].sock:
- *       - send to udp_splice_init[V4][80].sock, with destination port 5000
- *       - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with
- *         current time
- *     - otherwise, discard
- *
- * - from namespace to init:
- *
- *   - forward direction: 127.0.0.1:2000 -> 127.0.0.1:22 in namespace from
- *     socket s, with epoll reference: index = 22, splice = 1, orig = 1, ns = 1
- *     - if udp4_splice_init[V4][2000].sock:
- *       - send packet to udp_splice_init[V4][2000].sock, with destination
- *         port 22
- *     - otherwise:
- *       - create new socket udp_splice_init[V4][2000].sock
- *       - bind in init to 127.0.0.1:2000
- *       - add to epoll with reference: index = 2000, splice = 1, orig = 0,
- *         ns = 0
- *     - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with
- *       current time
- *
- *   - reverse direction: 127.0.0.1:22 -> 127.0.0.1:2000 in init from socket s,
- *     having epoll reference: index = 2000, splice = 1, orig = 0, ns = 0
- *   - if udp_splice_ns[V4][22].sock:
- *     - send to udp_splice_ns[V4][22].sock, with destination port 2000
- *     - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with
- *       current time
- *   - otherwise, discard
  */
 
 #include <sched.h>
@@ -134,6 +102,7 @@
 #include <sys/socket.h>
 #include <sys/uio.h>
 #include <time.h>
+#include <fcntl.h>
 
 #include "checksum.h"
 #include "util.h"
@@ -223,7 +192,6 @@ static struct ethhdr udp6_eth_hdr;
  * @ip4h:	Pre-filled IPv4 header (except for tot_len and saddr)
  * @taph:	Tap backend specific header
  * @s_in:	Source socket address, filled in by recvmmsg()
- * @splicesrc:	Source port for splicing, or -1 if not spliceable
  * @tosidx:	sidx for the destination side of this datagram's flow
  */
 static struct udp_meta_t {
@@ -232,7 +200,6 @@ static struct udp_meta_t {
 	struct tap_hdr taph;
 
 	union sockaddr_inany s_in;
-	int splicesrc;
 	flow_sidx_t tosidx;
 }
 #ifdef __AVX2__
@@ -270,7 +237,6 @@ static struct mmsghdr	udp_mh_splice		[UDP_MAX_FRAMES];
 /* IOVs for L2 frames */
 static struct iovec	udp_l2_iov		[UDP_MAX_FRAMES][UDP_NUM_IOVS];
 
-
 /**
  * udp_portmap_clear() - Clear UDP port map before configuration
  */
@@ -383,140 +349,6 @@ static void udp_iov_init(const struct ctx *c)
 		udp_iov_init_one(c, i);
 }
 
-/**
- * udp_splice_new() - Create and prepare socket for "spliced" binding
- * @c:		Execution context
- * @v6:		Set for IPv6 sockets
- * @src:	Source port of original connection, host order
- * @ns:		Does the splice originate in the ns or not
- *
- * Return: prepared socket, negative error code on failure
- *
- * #syscalls:pasta getsockname
- */
-int udp_splice_new(const struct ctx *c, int v6, in_port_t src, bool ns)
-{
-	struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP | EPOLLHUP };
-	union epoll_ref ref = { .type = EPOLL_TYPE_UDP,
-				.udp = { .splice = true, .v6 = v6, .port = src }
-			      };
-	struct udp_splice_port *sp;
-	int act, s;
-
-	if (ns) {
-		ref.udp.pif = PIF_SPLICE;
-		sp = &udp_splice_ns[v6 ? V6 : V4][src];
-		act = UDP_ACT_SPLICE_NS;
-	} else {
-		ref.udp.pif = PIF_HOST;
-		sp = &udp_splice_init[v6 ? V6 : V4][src];
-		act = UDP_ACT_SPLICE_INIT;
-	}
-
-	s = socket(v6 ? AF_INET6 : AF_INET, SOCK_DGRAM | SOCK_NONBLOCK,
-		   IPPROTO_UDP);
-
-	if (s > FD_REF_MAX) {
-		close(s);
-		return -EIO;
-	}
-
-	if (s < 0)
-		return s;
-
-	ref.fd = s;
-
-	if (v6) {
-		struct sockaddr_in6 addr6 = {
-			.sin6_family = AF_INET6,
-			.sin6_port = htons(src),
-			.sin6_addr = IN6ADDR_LOOPBACK_INIT,
-		};
-		if (bind(s, (struct sockaddr *)&addr6, sizeof(addr6)))
-			goto fail;
-	} else {
-		struct sockaddr_in addr4 = {
-			.sin_family = AF_INET,
-			.sin_port = htons(src),
-			.sin_addr = IN4ADDR_LOOPBACK_INIT,
-		};
-		if (bind(s, (struct sockaddr *)&addr4, sizeof(addr4)))
-			goto fail;
-	}
-
-	sp->sock = s;
-	bitmap_set(udp_act[v6 ? V6 : V4][act], src);
-
-	ev.data.u64 = ref.u64;
-	epoll_ctl(c->epollfd, EPOLL_CTL_ADD, s, &ev);
-	return s;
-
-fail:
-	close(s);
-	return -1;
-}
-
-/**
- * struct udp_splice_new_ns_arg - Arguments for udp_splice_new_ns()
- * @c:		Execution context
- * @v6:		Set for IPv6
- * @src:	Source port of originating datagram, host order
- * @dst:	Destination port of originating datagram, host order
- * @s:		Newly created socket or negative error code
- */
-struct udp_splice_new_ns_arg {
-	const struct ctx *c;
-	int v6;
-	in_port_t src;
-	int s;
-};
-
-/**
- * udp_splice_new_ns() - Enter namespace and call udp_splice_new()
- * @arg:	See struct udp_splice_new_ns_arg
- *
- * Return: 0
- */
-static int udp_splice_new_ns(void *arg)
-{
-	struct udp_splice_new_ns_arg *a;
-
-	a = (struct udp_splice_new_ns_arg *)arg;
-
-	ns_enter(a->c);
-
-	a->s = udp_splice_new(a->c, a->v6, a->src, true);
-
-	return 0;
-}
-
-/**
- * udp_mmh_splice_port() - Is source address of message suitable for splicing?
- * @ref:	epoll reference for incoming message's origin socket
- * @mmh:	mmsghdr of incoming message
- *
- * Return: if source address of message in @mmh refers to localhost (127.0.0.1
- *         or ::1) its source port (host order), otherwise -1.
- */
-static int udp_mmh_splice_port(union epoll_ref ref, const struct mmsghdr *mmh)
-{
-	const struct sockaddr_in6 *sa6 = mmh->msg_hdr.msg_name;
-	const struct sockaddr_in *sa4 = mmh->msg_hdr.msg_name;
-
-	ASSERT(ref.type == EPOLL_TYPE_UDP);
-
-	if (!ref.udp.splice)
-		return -1;
-
-	if (ref.udp.v6 && IN6_IS_ADDR_LOOPBACK(&sa6->sin6_addr))
-		return ntohs(sa6->sin6_port);
-
-	if (!ref.udp.v6 && IN4_IS_ADDR_LOOPBACK(&sa4->sin_addr))
-		return ntohs(sa4->sin_port);
-
-	return -1;
-}
-
 /**
  * udp_at_sidx() - Get UDP specific flow at given sidx
  * @sidx:    Flow and side to retrieve
@@ -542,6 +374,16 @@ struct udp_flow *udp_at_sidx(flow_sidx_t sidx)
  */
 static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow)
 {
+	if (uflow->s[INISIDE] >= 0) {
+		/* The listening socket needs to stay in epoll */
+		close(uflow->s[INISIDE]);
+	}
+
+	if (uflow->s[TGTSIDE] >= 0) {
+		/* But the flow specific one needs to be removed */
+		epoll_ctl(c->epollfd, EPOLL_CTL_DEL, uflow->s[TGTSIDE], NULL);
+		close(uflow->s[TGTSIDE]);
+	}
 	flow_hash_remove(c, FLOW_SIDX(uflow, INISIDE));
 }
 
@@ -549,26 +391,80 @@ static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow)
  * udp_flow_new() - Common setup for a new UDP flow
  * @c:		Execution context
  * @flow:	Initiated flow
+ * @s_ini:	Initiating socket (or -1)
  * @now:	Timestamp
  *
  * Return: UDP specific flow, if successful, NULL on failure
  */
 static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow,
-				const struct timespec *now)
+				int s_ini, const struct timespec *now)
 {
 	const struct flowside *ini = &flow->f.side[INISIDE];
 	struct udp_flow *uflow = NULL;
+	const struct flowside *tgt;
+	uint8_t tgtpif;
 
 	if (!inany_is_unicast(&ini->eaddr) || ini->eport == 0) {
 		flow_dbg(flow, "Invalid endpoint to initiate UDP flow");
 		goto cancel;
 	}
 
-	if (!flow_target(c, flow, IPPROTO_UDP))
+	if (!(tgt = flow_target(c, flow, IPPROTO_UDP)))
 		goto cancel;
+	tgtpif = flow->f.pif[TGTSIDE];
 
 	uflow = FLOW_SET_TYPE(flow, FLOW_UDP, udp);
 	uflow->ts = now->tv_sec;
+	uflow->s[INISIDE] = uflow->s[TGTSIDE] = -1;
+
+	if (s_ini >= 0) {
+		/* When using auto port-scanning the listening port could go
+		 * away, so we need to duplicate it */
+		uflow->s[INISIDE] = fcntl(s_ini, F_DUPFD_CLOEXEC, 0);
+		if (uflow->s[INISIDE] < 0) {
+			flow_err(uflow,
+				 "Couldn't duplicate listening socket: %s",
+				 strerror(errno));
+			goto cancel;
+		}
+	}
+
+	if (pif_is_socket(tgtpif)) {
+		union {
+			flow_sidx_t sidx;
+			uint32_t data;
+		} fref = {
+			.sidx = FLOW_SIDX(flow, TGTSIDE),
+		};
+
+		uflow->s[TGTSIDE] = flowside_sock_l4(c, EPOLL_TYPE_UDP_REPLY,
+						     tgtpif, tgt, fref.data);
+		if (uflow->s[TGTSIDE] < 0) {
+			flow_dbg(uflow,
+				 "Couldn't open socket for spliced flow: %s",
+				 strerror(errno));
+			goto cancel;
+		}
+
+		if (flowside_connect(c, uflow->s[TGTSIDE], tgtpif, tgt) < 0) {
+			flow_dbg(uflow,
+				 "Couldn't connect flow socket: %s",
+				 strerror(errno));
+			goto cancel;
+		}
+
+		/* It's possible, if unlikely, that we could receive some
+		 * unrelated packets in between the bind() and connect() of this
+		 * socket.  For now we just discard these.  We could consider
+		 * trying to re-direct these to an appropriate handler, if we
+		 * need to.
+		 */
+		/* cppcheck-suppress nullPointer */
+		while (recv(uflow->s[TGTSIDE], NULL, 0, MSG_DONTWAIT) >= 0)
+			;
+		if (errno != EAGAIN)
+			warn_perror("Unexpected error discarding datagrams");
+	}
 
 	flow_hash_insert(c, FLOW_SIDX(uflow, INISIDE));
 	FLOW_ACTIVATE(uflow);
@@ -580,7 +476,6 @@ cancel:
 		udp_flow_close(c, uflow);
 	flow_alloc_cancel(flow);
 	return FLOW_SIDX_NONE;
-
 }
 
 /**
@@ -590,6 +485,8 @@ cancel:
  * @meta:	Metadata buffer for the datagram
  * @now:	Timestamp
  *
+ * #syscalls fcntl
+ *
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
  */
@@ -623,7 +520,7 @@ static flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
 	}
 
 	flow_initiate_sa(flow, ref.udp.pif, &meta->s_in, ref.udp.port);
-	return udp_flow_new(c, flow, now);
+	return udp_flow_new(c, flow, ref.fd, now);
 }
 
 /**
@@ -647,55 +544,16 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
  * @now:	Timestamp
  */
 static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
-			    in_port_t src, in_port_t dst,
-			    union epoll_ref ref,
-			    const struct timespec *now)
+			    flow_sidx_t tosidx)
 {
-	int s;
-
-	if (ref.udp.v6) {
-		udp_splice_to.sa6 = (struct sockaddr_in6) {
-			.sin6_family = AF_INET6,
-			.sin6_addr = in6addr_loopback,
-			.sin6_port = htons(dst),
-		};
-	} else {
-		udp_splice_to.sa4 = (struct sockaddr_in) {
-			.sin_family = AF_INET,
-			.sin_addr = in4addr_loopback,
-			.sin_port = htons(dst),
-		};
-	}
-
-	if (ref.udp.pif == PIF_SPLICE) {
-		src += c->udp.fwd_in.rdelta[src];
-		s = udp_splice_init[ref.udp.v6][src].sock;
-		if (s < 0 && ref.udp.orig)
-			s = udp_splice_new(c, ref.udp.v6, src, false);
-
-		if (s < 0)
-			return;
-
-		udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec;
-		udp_splice_init[ref.udp.v6][src].ts = now->tv_sec;
-	} else {
-		ASSERT(ref.udp.pif == PIF_HOST);
-		src += c->udp.fwd_out.rdelta[src];
-		s = udp_splice_ns[ref.udp.v6][src].sock;
-		if (s < 0 && ref.udp.orig) {
-			struct udp_splice_new_ns_arg arg = {
-				c, ref.udp.v6, src, -1,
-			};
-
-			NS_CALL(udp_splice_new_ns, &arg);
-			s = arg.s;
-		}
-		if (s < 0)
-			return;
+	const struct udp_flow *uflow = udp_at_sidx(tosidx);
+	const struct flowside *toside = &uflow->f.side[tosidx.side];
+	uint8_t topif = uflow->f.pif[tosidx.side];
+	int s = uflow->s[tosidx.side];
+	socklen_t sl;
 
-		udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec;
-		udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec;
-	}
+	pif_sockaddr(c, &udp_splice_to, &sl, topif,
+		     &toside->eaddr, toside->eport);
 
 	sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
 }
@@ -922,20 +780,18 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
 		dstport += c->udp.fwd_in.f.delta[dstport];
 
 	/* We divide datagrams into batches based on how we need to send them,
-	 * determined by udp_meta[i].splicesrc and udp_meta[i].tosidx.  To avoid
-	 * either two passes through the array, or recalculating splicesrc and
-	 * tosidxfor a single entry, we have to populate it one entry *ahead* of
-	 * the loop counter.
+	 * determined by udp_meta[i].tosidx.  To avoid either two passes through
+	 * the array, or recalculating tosidx for a single entry, we have to
+	 * populate it one entry *ahead* of the loop counter.
 	 */
-	udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv);
 	udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0], now);
 	for (i = 0; i < n; ) {
 		flow_sidx_t batchsidx = udp_meta[i].tosidx;
-		int batchsrc = udp_meta[i].splicesrc;
+		uint8_t batchpif = pif_at_sidx(batchsidx);
 		int batchstart = i;
 
 		do {
-			if (batchsrc >= 0)
+			if (pif_is_socket(batchpif))
 				udp_splice_prepare(mmh_recv, i);
 			else
 				udp_tap_prepare(c, mmh_recv, i, dstport,
@@ -944,23 +800,54 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
 			if (++i >= n)
 				break;
 
-			udp_meta[i].splicesrc = udp_mmh_splice_port(ref,
-								    &mmh_recv[i]);
 			udp_meta[i].tosidx = udp_flow_from_sock(c, ref,
 								&udp_meta[i],
 								now);
-		} while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx) &&
-			 udp_meta[i].splicesrc == batchsrc);
+		} while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx));
 
-		if (batchsrc >= 0)
+		if (pif_is_socket(batchpif))
 			udp_splice_send(c, batchstart, i - batchstart,
-					batchsrc, dstport, ref, now);
+					batchsidx);
 		else
 			tap_send_frames(c, &udp_l2_iov[batchstart][0],
 					UDP_NUM_IOVS, i - batchstart);
 	}
 }
 
+/**
+ * udp_reply_sock_handler() - Handle new data from flow specific socket
+ * @c:		Execution context
+ * @ref:	epoll reference
+ * @events:	epoll events bitmap
+ * @now:	Current timestamp
+ *
+ * #syscalls recvmmsg
+ */
+void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
+			    uint32_t events, const struct timespec *now)
+{
+	flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside);
+	struct udp_flow *uflow = udp_at_sidx(ref.flowside);
+	const struct flowside *fromside = &uflow->f.side[ref.flowside.side];
+	bool v6 = !inany_v4(&fromside->eaddr);
+	struct mmsghdr *mmh_recv = v6 ? udp6_mh_recv : udp4_mh_recv;
+	int from_s = uflow->s[ref.flowside.side];
+	int n, i;
+
+	ASSERT(!c->no_udp && uflow);
+
+	if ((n = udp_sock_recv(c, from_s, events, mmh_recv)) <= 0)
+		return;
+
+	flow_trace(uflow, "Received %d datagrams on reply socket", n);
+	uflow->ts = now->tv_sec;
+
+	for (i = 0; i < n; i++)
+		udp_splice_prepare(mmh_recv, i);
+
+	udp_splice_send(c, 0, n, tosidx);
+}
+
 /**
  * udp_tap_handler() - Handle packets from tap
  * @c:		Execution context
diff --git a/udp.h b/udp.h
index 5865def2..db5e546e 100644
--- a/udp.h
+++ b/udp.h
@@ -9,8 +9,10 @@
 #define UDP_TIMER_INTERVAL		1000 /* ms */
 
 void udp_portmap_clear(void);
-void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events,
-		      const struct timespec *now);
+void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref,
+			  uint32_t events, const struct timespec *now);
+void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
+			    uint32_t events, const struct timespec *now);
 int udp_tap_handler(struct ctx *c, uint8_t pif, sa_family_t af,
 		    const void *saddr, const void *daddr,
 		    const struct pool *p, int idx, const struct timespec *now);
diff --git a/udp_flow.h b/udp_flow.h
index 18af9ac4..d5b8784f 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -11,12 +11,14 @@
  * struct udp - Descriptor for a flow of UDP packets
  * @f:		Generic flow information
  * @ts:		Activity timestamp
+ * @s:		Socket fd (or -1) for each side of the flow
  */
 struct udp_flow {
 	/* Must be first element */
 	struct flow_common f;
 
 	time_t ts;
+	int s[SIDES];
 };
 
 bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow,
diff --git a/util.c b/util.c
index f2994a79..7c57ab15 100644
--- a/util.c
+++ b/util.c
@@ -61,6 +61,7 @@ int sock_l4_sa(const struct ctx *c, enum epoll_type type,
 		socktype = SOCK_STREAM | SOCK_NONBLOCK;
 		break;
 	case EPOLL_TYPE_UDP:
+	case EPOLL_TYPE_UDP_REPLY:
 		proto = IPPROTO_UDP;
 		socktype = SOCK_DGRAM | SOCK_NONBLOCK;
 		break;
-- 
@@ -61,6 +61,7 @@ int sock_l4_sa(const struct ctx *c, enum epoll_type type,
 		socktype = SOCK_STREAM | SOCK_NONBLOCK;
 		break;
 	case EPOLL_TYPE_UDP:
+	case EPOLL_TYPE_UDP_REPLY:
 		proto = IPPROTO_UDP;
 		socktype = SOCK_DGRAM | SOCK_NONBLOCK;
 		break;
-- 
2.45.2


  parent reply	other threads:[~2024-07-05  2:07 UTC|newest]

Thread overview: 59+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-07-05  2:06 [PATCH v7 00/27] Unified flow table David Gibson
2024-07-05  2:06 ` [PATCH v7 01/27] flow: Common address information for initiating side David Gibson
2024-07-05  2:06 ` [PATCH v7 02/27] flow: Common address information for target side David Gibson
2024-07-10 21:30   ` Stefano Brivio
2024-07-11  0:19     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 03/27] tcp, flow: Remove redundant information, repack connection structures David Gibson
2024-07-05  2:07 ` [PATCH v7 04/27] tcp: Obtain guest address from flowside David Gibson
2024-07-05  2:07 ` [PATCH v7 05/27] tcp: Manage outbound address via flow table David Gibson
2024-07-05  2:07 ` [PATCH v7 06/27] tcp: Simplify endpoint validation using flowside information David Gibson
2024-07-05  2:07 ` [PATCH v7 07/27] tcp_splice: Eliminate SPLICE_V6 flag David Gibson
2024-07-05  2:07 ` [PATCH v7 08/27] tcp, flow: Replace TCP specific hash function with general flow hash David Gibson
2024-07-05  2:07 ` [PATCH v7 09/27] flow, tcp: Generalise TCP hash table to general flow hash table David Gibson
2024-07-05  2:07 ` [PATCH v7 10/27] tcp: Re-use flow hash for initial sequence number generation David Gibson
2024-07-05  2:07 ` [PATCH v7 11/27] icmp: Remove redundant id field from flow table entry David Gibson
2024-07-05  2:07 ` [PATCH v7 12/27] icmp: Obtain destination addresses from the flowsides David Gibson
2024-07-05  2:07 ` [PATCH v7 13/27] icmp: Look up ping flows using flow hash David Gibson
2024-07-05  2:07 ` [PATCH v7 14/27] icmp: Eliminate icmp_id_map David Gibson
2024-07-05  2:07 ` [PATCH v7 15/27] flow: Helper to create sockets based on flowside David Gibson
2024-07-10 21:32   ` Stefano Brivio
2024-07-11  0:21     ` David Gibson
2024-07-11  0:27     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 16/27] icmp: Manage outbound socket address via flow table David Gibson
2024-07-05  2:07 ` [PATCH v7 17/27] flow, tcp: Flow based NAT and port forwarding for TCP David Gibson
2024-07-05  2:07 ` [PATCH v7 18/27] flow, icmp: Use general flow forwarding rules for ICMP David Gibson
2024-07-05  2:07 ` [PATCH v7 19/27] fwd: Update flow forwarding logic for UDP David Gibson
2024-07-08 21:26   ` Stefano Brivio
2024-07-09  0:19     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 20/27] udp: Create flows for datagrams from originating sockets David Gibson
2024-07-09 22:32   ` Stefano Brivio
2024-07-09 23:59     ` David Gibson
2024-07-10 21:35       ` Stefano Brivio
2024-07-11  4:26         ` David Gibson
2024-07-11  8:20           ` Stefano Brivio
2024-07-11 22:58             ` David Gibson
2024-07-12  8:21               ` Stefano Brivio
2024-07-15  4:06                 ` David Gibson
2024-07-15 16:37                   ` Stefano Brivio
2024-07-17  0:49                     ` David Gibson
2024-07-05  2:07 ` David Gibson [this message]
2024-07-09 22:32   ` [PATCH v7 21/27] udp: Handle "spliced" datagrams with per-flow sockets Stefano Brivio
2024-07-10  0:23     ` David Gibson
2024-07-10 17:13       ` Stefano Brivio
2024-07-11  1:30         ` David Gibson
2024-07-11  8:23           ` Stefano Brivio
2024-07-11  2:48         ` David Gibson
2024-07-12 13:34   ` Stefano Brivio
2024-07-15  4:32     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 22/27] udp: Remove obsolete splice tracking David Gibson
2024-07-10 21:36   ` Stefano Brivio
2024-07-11  0:43     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 23/27] udp: Find or create flows for datagrams from tap interface David Gibson
2024-07-10 21:36   ` Stefano Brivio
2024-07-11  0:45     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 24/27] udp: Direct datagrams from host to guest via flow table David Gibson
2024-07-10 21:37   ` Stefano Brivio
2024-07-11  0:46     ` David Gibson
2024-07-05  2:07 ` [PATCH v7 25/27] udp: Remove obsolete socket tracking David Gibson
2024-07-05  2:07 ` [PATCH v7 26/27] udp: Remove rdelta port forwarding maps David Gibson
2024-07-05  2:07 ` [PATCH v7 27/27] udp: Rename UDP listening sockets David Gibson

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240705020724.3447719-22-david@gibson.dropbear.id.au \
    --to=david@gibson.dropbear.id.au \
    --cc=jmaloy@redhat.com \
    --cc=passt-dev@passt.top \
    --cc=sbrivio@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).