public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH v3 6/6] flow: Introduce flow_epoll_set() to centralize epoll operations
Date: Fri,  9 Jan 2026 17:54:38 +0100	[thread overview]
Message-ID: <20260109165438.2492285-7-lvivier@redhat.com> (raw)
In-Reply-To: <20260109165438.2492285-1-lvivier@redhat.com>

Currently, each flow type (TCP, TCP_SPLICE, PING, UDP) has its own
code to add or modify file descriptors in epoll. This leads to
duplicated boilerplate code across icmp.c, tcp.c, tcp_splice.c, and
udp_flow.c, each setting up epoll_ref unions and calling epoll_ctl()
with flow-type-specific details.

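Introduce flow_epoll_set() in flow.c to handle epoll operations for
all flow types in a unified way. It takes the epoll fd from the flow's
epoll id, so callers now set the epoll id before registering a socket,
and tcp_epoll_ctl(), tcp_splice_epoll_ctl() and the tcp_splice
conn_flag()/conn_event() helpers no longer need the execution context
parameter.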

This will be needed to migrate a queue pair from one epollfd to another.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 flow.c       | 37 ++++++++++++++++++++++++
 flow.h       |  2 ++
 icmp.c       | 10 ++-----
 tcp.c        | 48 ++++++++++++++++++------------
 tcp_splice.c | 82 ++++++++++++++++++++++++----------------------------
 udp_flow.c   | 11 ++-----
 6 files changed, 111 insertions(+), 79 deletions(-)

diff --git a/flow.c b/flow.c
index 4f53486586cd..cefe6c8b5b24 100644
--- a/flow.c
+++ b/flow.c
@@ -20,6 +20,7 @@
 #include "flow.h"
 #include "flow_table.h"
 #include "repair.h"
+#include "epoll_ctl.h"
 
 const char *flow_state_str[] = {
 	[FLOW_STATE_FREE]	= "FREE",
@@ -53,6 +54,17 @@ const uint8_t flow_proto[] = {
 static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES,
 	      "flow_proto[] doesn't match enum flow_type");
 
+/* epoll reference type used for the sockets of each flow type */
+static const enum epoll_type flow_epoll[] = {
+	[FLOW_TCP]		= EPOLL_TYPE_TCP,
+	[FLOW_TCP_SPLICE]	= EPOLL_TYPE_TCP_SPLICE,
+	[FLOW_PING4]		= EPOLL_TYPE_PING,
+	[FLOW_PING6]		= EPOLL_TYPE_PING,
+	[FLOW_UDP]		= EPOLL_TYPE_UDP,
+};
+static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
+	      "flow_epoll[] doesn't match enum flow_type");
+
 #define foreach_established_tcp_flow(flow)				\
 	flow_foreach_of_type((flow), FLOW_TCP)				\
 		if (!tcp_flow_is_established(&(flow)->tcp))		\
@@ -390,6 +402,31 @@ void flow_epollid_clear(struct flow_common *f)
 	f->epollid = EPOLLFD_ID_INVALID;
 }
 
+/**
+ * flow_epoll_set() - Add or modify epoll registration for a flow socket
+ * @f:		Flow to register socket for
+ * @command:	epoll_ctl() command: EPOLL_CTL_ADD or EPOLL_CTL_MOD
+ * @events:	epoll events to watch for
+ * @fd:		File descriptor to register
+ * @sidei:	Side index of the flow
+ *
+ * The epoll instance used is flow_epollfd(@f), so the caller sets the epoll
+ * id of @f first (flow_epollid_set()), and is expected to clear it again if
+ * EPOLL_CTL_ADD fails.
+ *
+ * Return: 0 on success, -1 on error (from epoll_ctl(), errno is set)
+ */
+int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
+		   int fd, unsigned int sidei)
+{
+	/* Initializer zeroes bits of ref.u64 not set via the fields below */
+	union epoll_ref ref = { .type = flow_epoll[f->type], .fd = fd,
+				.flowside = flow_sidx(f, sidei) };
+	struct epoll_event ev = { .events = events, .data.u64 = ref.u64 };
+
+	return epoll_ctl(flow_epollfd(f), command, fd, &ev);
+}
+
 /**
  * flow_epollid_register() - Initialize the epoll id -> fd mapping
  * @epollid:	epoll id to associate to
diff --git a/flow.h b/flow.h
index b43b0b1dd7f2..1b78d596fb17 100644
--- a/flow.h
+++ b/flow.h
@@ -265,6 +265,8 @@ bool flow_in_epoll(const struct flow_common *f);
 int flow_epollfd(const struct flow_common *f);
 void flow_epollid_set(struct flow_common *f, int epollid);
 void flow_epollid_clear(struct flow_common *f);
+int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
+		   int fd, unsigned int sidei);
 void flow_epollid_register(int epollid, int epollfd);
 void flow_defer_handler(const struct ctx *c, const struct timespec *now);
 int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
diff --git a/icmp.c b/icmp.c
index 9564c4963f7b..eb7f11be5dad 100644
--- a/icmp.c
+++ b/icmp.c
@@ -177,7 +177,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	union flow *flow = flow_alloc();
 	struct icmp_ping_flow *pingf;
 	const struct flowside *tgt;
-	union epoll_ref ref;
 
 	if (!flow)
 		return NULL;
@@ -211,13 +210,10 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 		goto cancel;
 
 	flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
-
-	ref.type = EPOLL_TYPE_PING;
-	ref.flowside = FLOW_SIDX(flow, TGTSIDE);
-	ref.fd = pingf->sock;
-
-	if (epoll_add(flow_epollfd(&pingf->f), EPOLLIN, ref) < 0) {
+	if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
+			   TGTSIDE) < 0) {
 		close(pingf->sock);
+		flow_epollid_clear(&pingf->f);
 		goto cancel;
 	}
 
diff --git a/tcp.c b/tcp.c
index 4b746b4ca16c..e7f284be6a52 100644
--- a/tcp.c
+++ b/tcp.c
@@ -523,34 +523,44 @@ static uint32_t tcp_conn_epoll_events(uint8_t events, uint8_t conn_flags)
 
 /**
  * tcp_epoll_ctl() - Add/modify/delete epoll state from connection events
- * @c:		Execution context
  * @conn:	Connection pointer
  *
  * Return: 0 on success, negative error code on failure (not on deletion)
  */
-static int tcp_epoll_ctl(const struct ctx *c, struct tcp_tap_conn *conn)
+static int tcp_epoll_ctl(struct tcp_tap_conn *conn)
 {
-	int m = flow_in_epoll(&conn->f) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-	union epoll_ref ref = { .type = EPOLL_TYPE_TCP, .fd = conn->sock,
-		                .flowside = FLOW_SIDX(conn, !TAPSIDE(conn)), };
-	struct epoll_event ev = { .data.u64 = ref.u64 };
-	int epollfd = flow_in_epoll(&conn->f) ? flow_epollfd(&conn->f)
-					      : c->epollfd;
+	uint32_t events;
+	int m;
 
 	if (conn->events == CLOSED) {
-		if (flow_in_epoll(&conn->f))
+		if (flow_in_epoll(&conn->f)) {
+			int epollfd = flow_epollfd(&conn->f);
+
 			epoll_del(epollfd, conn->sock);
-		if (conn->timer != -1)
-			epoll_del(epollfd, conn->timer);
+			if (conn->timer != -1)
+				epoll_del(epollfd, conn->timer);
+		}
+
 		return 0;
 	}
 
-	ev.events = tcp_conn_epoll_events(conn->events, conn->flags);
+	events = tcp_conn_epoll_events(conn->events, conn->flags);
 
-	if (epoll_ctl(epollfd, m, conn->sock, &ev))
-		return -errno;
+	if (flow_in_epoll(&conn->f)) {
+		m = EPOLL_CTL_MOD;
+	} else {	/* flow_epoll_set() needs an epoll id */
+		flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
+		m = EPOLL_CTL_ADD;
+	}
 
-	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
+	if (flow_epoll_set(&conn->f, m, events, conn->sock,
+			   !TAPSIDE(conn)) < 0) {
+		int ret = -errno;
+
+		if (m == EPOLL_CTL_ADD)	/* undo flow_epollid_set() */
+			flow_epollid_clear(&conn->f);
+		return ret;
+	}
 
 	return 0;
 }
@@ -671,7 +681,7 @@ void conn_flag_do(const struct ctx *c, struct tcp_tap_conn *conn,
 	}
 
 	if (flag == STALLED || flag == ~STALLED)
-		tcp_epoll_ctl(c, conn);
+		tcp_epoll_ctl(conn);
 
 	if (flag == ACK_FROM_TAP_DUE || flag == ACK_TO_TAP_DUE		  ||
 	    (flag == ~ACK_FROM_TAP_DUE && (conn->flags & ACK_TO_TAP_DUE)) ||
@@ -728,7 +738,7 @@ void conn_event_do(const struct ctx *c, struct tcp_tap_conn *conn,
 	} else {
 		if (event == CLOSED)
 			flow_hash_remove(c, TAP_SIDX(conn));
-		tcp_epoll_ctl(c, conn);
+		tcp_epoll_ctl(conn);
 	}
 
 	if (CONN_HAS(conn, SOCK_FIN_SENT | TAP_FIN_ACKED))
@@ -1744,7 +1754,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
 		conn_event(c, conn, TAP_SYN_ACK_SENT);
 	}
 
-	tcp_epoll_ctl(c, conn);
+	tcp_epoll_ctl(conn);
 
 	if (c->mode == MODE_VU) { /* To rebind to same oport after migration */
 		socklen_t sl = sizeof(sa);
@@ -3986,7 +3996,7 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
 	tcp_send_flag(c, conn, ACK);
 	tcp_data_from_sock(c, conn);
 
-	if ((rc = tcp_epoll_ctl(c, conn))) {
+	if ((rc = tcp_epoll_ctl(conn))) {
 		flow_dbg(conn,
 			 "Failed to subscribe to epoll for migrated socket: %s",
 			 strerror_(-rc));
diff --git a/tcp_splice.c b/tcp_splice.c
index bf4ff466de07..a7c04ca8652a 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -135,37 +135,31 @@ static uint32_t tcp_splice_conn_epoll_events(uint16_t events, unsigned sidei)
 
 /**
  * tcp_splice_epoll_ctl() - Add/modify/delete epoll state from connection events
- * @c:		Execution context
  * @conn:	Connection pointer
  *
  * Return: 0 on success, negative error code on failure (not on deletion)
  */
-static int tcp_splice_epoll_ctl(const struct ctx *c,
-				struct tcp_splice_conn *conn)
+static int tcp_splice_epoll_ctl(struct tcp_splice_conn *conn)
 {
-	int epollfd = flow_in_epoll(&conn->f) ? flow_epollfd(&conn->f)
-					      : c->epollfd;
-	int m = flow_in_epoll(&conn->f) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
-	const union epoll_ref ref[SIDES] = {
-		{ .type = EPOLL_TYPE_TCP_SPLICE, .fd = conn->s[0],
-		  .flowside = FLOW_SIDX(conn, 0) },
-		{ .type = EPOLL_TYPE_TCP_SPLICE, .fd = conn->s[1],
-		  .flowside = FLOW_SIDX(conn, 1) }
-	};
-	struct epoll_event ev[SIDES] = { { .data.u64 = ref[0].u64 },
-					 { .data.u64 = ref[1].u64 } };
-
-	ev[0].events = tcp_splice_conn_epoll_events(conn->events, 0);
-	ev[1].events = tcp_splice_conn_epoll_events(conn->events, 1);
-
-
-	if (epoll_ctl(epollfd, m, conn->s[0], &ev[0]) ||
-	    epoll_ctl(epollfd, m, conn->s[1], &ev[1])) {
+	uint32_t events[SIDES];
+	int m = EPOLL_CTL_MOD;
+
+	if (!flow_in_epoll(&conn->f)) {
+		flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
+		m = EPOLL_CTL_ADD;
+	}
+
+	events[0] = tcp_splice_conn_epoll_events(conn->events, 0);
+	events[1] = tcp_splice_conn_epoll_events(conn->events, 1);
+
+	if (flow_epoll_set(&conn->f, m, events[0], conn->s[0], 0) ||
+	    flow_epoll_set(&conn->f, m, events[1], conn->s[1], 1)) {
 		int ret = -errno;
 		flow_perror(conn, "ERROR on epoll_ctl()");
+		if (m == EPOLL_CTL_ADD)	/* undo flow_epollid_set() above */
+			flow_epollid_clear(&conn->f);
 		return ret;
 	}
-	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 
 	return 0;
 }
@@ -205,7 +199,7 @@ static void conn_flag_do(struct tcp_splice_conn *conn,
 	}
 }
 
-#define conn_flag(c, conn, flag)					\
+#define conn_flag(conn, flag)						\
 	do {								\
 		flow_trace(conn, "flag at %s:%i", __func__, __LINE__);	\
 		conn_flag_do(conn, flag);				\
@@ -213,12 +207,10 @@ static void conn_flag_do(struct tcp_splice_conn *conn,
 
 /**
  * conn_event_do() - Set and log connection events, update epoll state
- * @c:		Execution context
  * @conn:	Connection pointer
  * @event:	Connection event
  */
-static void conn_event_do(const struct ctx *c, struct tcp_splice_conn *conn,
-			  unsigned long event)
+static void conn_event_do(struct tcp_splice_conn *conn, unsigned long event)
 {
 	if (event & (event - 1)) {
 		int flag_index = fls(~event);
@@ -240,14 +232,14 @@ static void conn_event_do(const struct ctx *c, struct tcp_splice_conn *conn,
 			flow_dbg(conn, "%s", tcp_splice_event_str[flag_index]);
 	}
 
-	if (tcp_splice_epoll_ctl(c, conn))
-		conn_flag(c, conn, CLOSING);
+	if (tcp_splice_epoll_ctl(conn))
+		conn_flag(conn, CLOSING);
 }
 
-#define conn_event(c, conn, event)					\
+#define conn_event(conn, event)						\
 	do {								\
 		flow_trace(conn, "event at %s:%i",__func__, __LINE__);	\
-		conn_event_do(c, conn, event);				\
+		conn_event_do(conn, event);				\
 	} while (0)
 
 
@@ -315,7 +307,7 @@ static int tcp_splice_connect_finish(const struct ctx *c,
 			if (pipe2(conn->pipe[sidei], O_NONBLOCK | O_CLOEXEC)) {
 				flow_perror(conn, "cannot create %d->%d pipe",
 					    sidei, !sidei);
-				conn_flag(c, conn, CLOSING);
+				conn_flag(conn, CLOSING);
 				return -EIO;
 			}
 
@@ -329,7 +321,7 @@ static int tcp_splice_connect_finish(const struct ctx *c,
 	}
 
 	if (!(conn->events & SPLICE_ESTABLISHED))
-		conn_event(c, conn, SPLICE_ESTABLISHED);
+		conn_event(conn, SPLICE_ESTABLISHED);
 
 	return 0;
 }
@@ -376,7 +368,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
 
 	pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
 
-	conn_event(c, conn, SPLICE_CONNECT);
+	conn_event(conn, SPLICE_CONNECT);
 
 	if (connect(conn->s[1], &sa.sa, socklen_inany(&sa))) {
 		if (errno != EINPROGRESS) {
@@ -385,7 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
 			return -errno;
 		}
 	} else {
-		conn_event(c, conn, SPLICE_ESTABLISHED);
+		conn_event(conn, SPLICE_ESTABLISHED);
 		return tcp_splice_connect_finish(c, conn);
 	}
 
@@ -445,7 +437,7 @@ void tcp_splice_conn_from_sock(const struct ctx *c, union flow *flow, int s0)
 		flow_trace(conn, "failed to set TCP_QUICKACK on %i", s0);
 
 	if (tcp_splice_connect(c, conn))
-		conn_flag(c, conn, CLOSING);
+		conn_flag(conn, CLOSING);
 
 	FLOW_ACTIVATE(conn);
 }
@@ -494,14 +486,14 @@ void tcp_splice_sock_handler(struct ctx *c, union epoll_ref ref,
 
 	if (events & EPOLLOUT) {
 		fromsidei = !evsidei;
-		conn_event(c, conn, ~OUT_WAIT(evsidei));
+		conn_event(conn, ~OUT_WAIT(evsidei));
 	} else {
 		fromsidei = evsidei;
 	}
 
 	if (events & EPOLLRDHUP)
 		/* For side 0 this is fake, but implied */
-		conn_event(c, conn, FIN_RCVD(evsidei));
+		conn_event(conn, FIN_RCVD(evsidei));
 
 swap:
 	eof = 0;
@@ -536,7 +528,7 @@ retry:
 				more = SPLICE_F_MORE;
 
 			if (conn->flags & lowat_set_flag)
-				conn_flag(c, conn, lowat_act_flag);
+				conn_flag(conn, lowat_act_flag);
 		}
 
 		do
@@ -568,8 +560,8 @@ retry:
 						   "Setting SO_RCVLOWAT %i: %s",
 						   lowat, strerror_(errno));
 				} else {
-					conn_flag(c, conn, lowat_set_flag);
-					conn_flag(c, conn, lowat_act_flag);
+					conn_flag(conn, lowat_set_flag);
+					conn_flag(conn, lowat_act_flag);
 				}
 			}
 
@@ -583,7 +575,7 @@ retry:
 			if (conn->read[fromsidei] == conn->written[fromsidei])
 				break;
 
-			conn_event(c, conn, OUT_WAIT(!fromsidei));
+			conn_event(conn, OUT_WAIT(!fromsidei));
 			break;
 		}
 
@@ -605,7 +597,7 @@ retry:
 			if ((conn->events & FIN_RCVD(sidei)) &&
 			    !(conn->events & FIN_SENT(!sidei))) {
 				shutdown(conn->s[!sidei], SHUT_WR);
-				conn_event(c, conn, FIN_SENT(!sidei));
+				conn_event(conn, FIN_SENT(!sidei));
 			}
 		}
 	}
@@ -626,7 +618,7 @@ retry:
 	return;
 
 close:
-	conn_flag(c, conn, CLOSING);
+	conn_flag(conn, CLOSING);
 }
 
 /**
@@ -762,10 +754,10 @@ void tcp_splice_timer(struct tcp_splice_conn *conn)
 				flow_trace(conn, "can't set SO_RCVLOWAT on %d",
 					   conn->s[sidei]);
 			}
-			conn_flag(c, conn, ~RCVLOWAT_SET(sidei));
+			conn_flag(conn, ~RCVLOWAT_SET(sidei));
 		}
 	}
 
 	flow_foreach_sidei(sidei)
-		conn_flag(c, conn, ~RCVLOWAT_ACT(sidei));
+		conn_flag(conn, ~RCVLOWAT_ACT(sidei));
 }
diff --git a/udp_flow.c b/udp_flow.c
index c4cf35c2c89d..80b15433f0ac 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -74,7 +74,6 @@ static int udp_flow_sock(const struct ctx *c,
 {
 	const struct flowside *side = &uflow->f.side[sidei];
 	uint8_t pif = uflow->f.pif[sidei];
-	union epoll_ref ref;
 	int rc;
 	int s;
 
@@ -84,14 +83,10 @@ static int udp_flow_sock(const struct ctx *c,
 		return s;
 	}
 
-	ref.type = EPOLL_TYPE_UDP;
-	ref.flowside = FLOW_SIDX(uflow, sidei);
-	ref.fd = s;
-
 	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
-
-	rc = epoll_add(flow_epollfd(&uflow->f), EPOLLIN, ref);
-	if (rc < 0) {
+	if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) {
+		rc = -errno;
+		flow_epollid_clear(&uflow->f);
 		close(s);
 		return rc;
 	}
-- 
2.52.0


      parent reply	other threads:[~2026-01-09 16:54 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-01-09 16:54 [PATCH v3 0/6] " Laurent Vivier
2026-01-09 16:54 ` [PATCH v3 1/6] tcp: remove timer update in tcp_epoll_ctl() Laurent Vivier
2026-01-09 16:54 ` [PATCH v3 2/6] tcp: cleanup timer creation Laurent Vivier
2026-01-09 16:54 ` [PATCH v3 3/6] udp_flow: remove unneeded epoll_ref indirection Laurent Vivier
2026-01-09 16:54 ` [PATCH v3 4/6] udp_flow: Assign socket to flow inside udp_flow_sock() Laurent Vivier
2026-01-09 16:54 ` [PATCH v3 5/6] tcp_splice: Refactor tcp_splice_conn_epoll_events() to per-side computation Laurent Vivier
2026-01-09 16:54 ` Laurent Vivier [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260109165438.2492285-7-lvivier@redhat.com \
    --to=lvivier@redhat.com \
    --cc=passt-dev@passt.top \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).