From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH v2 5/5] flow: Introduce flow_epoll_set() to centralize epoll operations
Date: Thu, 8 Jan 2026 15:01:19 +0100 [thread overview]
Message-ID: <20260108140119.1204797-6-lvivier@redhat.com> (raw)
In-Reply-To: <20260108140119.1204797-1-lvivier@redhat.com>
Currently, each flow type (TCP, TCP_SPLICE, PING, UDP) has its own
code to add or modify file descriptors in epoll. This leads to
duplicated boilerplate code across icmp.c, tcp.c, tcp_splice.c, and
udp_flow.c, each setting up epoll_ref unions and calling epoll_ctl()
with flow-type-specific details.
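Introduce flow_epoll_set() in flow.c to handle epoll operations for
all flow types in a unified way. The function takes an explicit epoll
type parameter, allowing it to handle not only flow socket types but
also the TCP timer (EPOLL_TYPE_TCP_TIMER).
As flow_epoll_set() looks up the epoll file descriptor from the flow
itself, callers now set the epoll id of the flow before registering a
file descriptor, and clear it again if an EPOLL_CTL_ADD fails.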
This will be needed to migrate a queue pair from one epollfd to another.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
flow.c | 33 ++++++++++++++++++++
flow.h | 3 ++
icmp.c | 11 +++----
tcp.c | 64 ++++++++++++++++++++------------------
tcp_splice.c | 86 +++++++++++++++++++++++++---------------------------
udp_flow.c | 12 +++-----
6 files changed, 119 insertions(+), 90 deletions(-)
diff --git a/flow.c b/flow.c
index 4f53486586cd..73d24f9d3765 100644
--- a/flow.c
+++ b/flow.c
@@ -20,6 +20,7 @@
#include "flow.h"
#include "flow_table.h"
#include "repair.h"
+#include "epoll_ctl.h"
const char *flow_state_str[] = {
[FLOW_STATE_FREE] = "FREE",
@@ -390,6 +391,38 @@ void flow_epollid_clear(struct flow_common *f)
f->epollid = EPOLLFD_ID_INVALID;
}
+/**
+ * flow_epoll_set() - Add or modify epoll registration of a flow socket or timer
+ * @command: epoll_ctl() command: EPOLL_CTL_ADD or EPOLL_CTL_MOD
+ * @type: epoll reference type, stored in the event data
+ * @f: Flow owning @fd, selects the epoll instance via flow_epollfd()
+ * @events: epoll events to watch for
+ * @fd: File descriptor to register
+ * @sidei: Side index of the flow, unused for EPOLL_TYPE_TCP_TIMER
+ *
+ * Return: 0 on success, -1 on error with errno set by epoll_ctl()
+ */
+int flow_epoll_set(int command, enum epoll_type type,
+ const struct flow_common *f, uint32_t events, int fd,
+ unsigned int sidei)
+{
+ struct epoll_event ev;
+ union epoll_ref ref;
+
+ ref.type = type;
+ ref.fd = fd;
+
+ if (type == EPOLL_TYPE_TCP_TIMER)
+ ref.flow = flow_idx(f);
+ else
+ ref.flowside = flow_sidx(f, sidei);
+
+ ev.events = events;
+ ev.data.u64 = ref.u64;
+
+ return epoll_ctl(flow_epollfd(f), command, fd, &ev);
+}
+
/**
* flow_epollid_register() - Initialize the epoll id -> fd mapping
* @epollid: epoll id to associate to
diff --git a/flow.h b/flow.h
index b43b0b1dd7f2..bd5c6d90322b 100644
--- a/flow.h
+++ b/flow.h
@@ -265,6 +265,9 @@ bool flow_in_epoll(const struct flow_common *f);
int flow_epollfd(const struct flow_common *f);
void flow_epollid_set(struct flow_common *f, int epollid);
void flow_epollid_clear(struct flow_common *f);
+int flow_epoll_set(int command, enum epoll_type type,
+ const struct flow_common *f, uint32_t events, int fd,
+ unsigned int sidei);
void flow_epollid_register(int epollid, int epollfd);
void flow_defer_handler(const struct ctx *c, const struct timespec *now);
int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
diff --git a/icmp.c b/icmp.c
index 9564c4963f7b..b6bb36d80715 100644
--- a/icmp.c
+++ b/icmp.c
@@ -177,7 +177,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
union flow *flow = flow_alloc();
struct icmp_ping_flow *pingf;
const struct flowside *tgt;
- union epoll_ref ref;
if (!flow)
return NULL;
@@ -211,13 +210,11 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
goto cancel;
flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
-
- ref.type = EPOLL_TYPE_PING;
- ref.flowside = FLOW_SIDX(flow, TGTSIDE);
- ref.fd = pingf->sock;
-
- if (epoll_add(flow_epollfd(&pingf->f), EPOLLIN, ref) < 0) {
+ if (flow_epoll_set(EPOLL_CTL_ADD, EPOLL_TYPE_PING, &pingf->f,
+ EPOLLIN, pingf->sock,
+ TGTSIDE) < 0) {
close(pingf->sock);
+ flow_epollid_clear(&pingf->f);
goto cancel;
}
diff --git a/tcp.c b/tcp.c
index 5141cdc7e839..d8cc11b377de 100644
--- a/tcp.c
+++ b/tcp.c
@@ -523,34 +523,44 @@ static uint32_t tcp_conn_epoll_events(uint8_t events, uint8_t conn_flags)
/**
* tcp_epoll_ctl() - Add/modify/delete epoll state from connection events
- * @c: Execution context
* @conn: Connection pointer
*
* Return: 0 on success, negative error code on failure (not on deletion)
*/
-static int tcp_epoll_ctl(const struct ctx *c, struct tcp_tap_conn *conn)
+static int tcp_epoll_ctl(struct tcp_tap_conn *conn)
{
- int m = flow_in_epoll(&conn->f) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
- union epoll_ref ref = { .type = EPOLL_TYPE_TCP, .fd = conn->sock,
- .flowside = FLOW_SIDX(conn, !TAPSIDE(conn)), };
- struct epoll_event ev = { .data.u64 = ref.u64 };
- int epollfd = flow_in_epoll(&conn->f) ? flow_epollfd(&conn->f)
- : c->epollfd;
+ uint32_t events;
+ int m;
if (conn->events == CLOSED) {
- if (flow_in_epoll(&conn->f))
+ if (flow_in_epoll(&conn->f)) {
+ int epollfd = flow_epollfd(&conn->f);
+
epoll_del(epollfd, conn->sock);
- if (conn->timer != -1)
- epoll_del(epollfd, conn->timer);
+ if (conn->timer != -1)
+ epoll_del(epollfd, conn->timer);
+ }
+
return 0;
}
- ev.events = tcp_conn_epoll_events(conn->events, conn->flags);
+ events = tcp_conn_epoll_events(conn->events, conn->flags);
- if (epoll_ctl(epollfd, m, conn->sock, &ev))
- return -errno;
+ if (flow_in_epoll(&conn->f)) {
+ m = EPOLL_CTL_MOD;
+ } else {
+ flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
+ m = EPOLL_CTL_ADD;
+ }
+
+ if (flow_epoll_set(m, EPOLL_TYPE_TCP, &conn->f, events, conn->sock,
+ !TAPSIDE(conn)) < 0) {
+ int ret = -errno;
- flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
+ if (m == EPOLL_CTL_ADD)
+ flow_epollid_clear(&conn->f);
+ return ret;
+ }
return 0;
}
@@ -569,11 +579,6 @@ static void tcp_timer_ctl(const struct ctx *c, struct tcp_tap_conn *conn)
return;
if (conn->timer == -1) {
- union epoll_ref ref = { .type = EPOLL_TYPE_TCP_TIMER,
- .flow = FLOW_IDX(conn) };
- struct epoll_event ev = { .data.u64 = ref.u64,
- .events = EPOLLIN | EPOLLET };
- int epollfd = flow_epollfd(&conn->f);
int fd;
fd = timerfd_create(CLOCK_MONOTONIC, 0);
@@ -581,18 +586,17 @@ static void tcp_timer_ctl(const struct ctx *c, struct tcp_tap_conn *conn)
flow_dbg_perror(conn, "failed to get timer");
if (fd > -1)
close(fd);
- conn->timer = -1;
return;
}
- conn->timer = fd;
- ref.fd = conn->timer;
- if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn->timer, &ev)) {
+ if (flow_epoll_set(EPOLL_CTL_ADD, EPOLL_TYPE_TCP_TIMER,
+ &conn->f, EPOLLIN | EPOLLET, fd, 0) < 0) {
flow_dbg_perror(conn, "failed to add timer");
- close(conn->timer);
- conn->timer = -1;
+ close(fd);
return;
}
+
+ conn->timer = fd;
}
if (conn->flags & ACK_TO_TAP_DUE) {
@@ -669,7 +673,7 @@ void conn_flag_do(const struct ctx *c, struct tcp_tap_conn *conn,
}
if (flag == STALLED || flag == ~STALLED)
- tcp_epoll_ctl(c, conn);
+ tcp_epoll_ctl(conn);
if (flag == ACK_FROM_TAP_DUE || flag == ACK_TO_TAP_DUE ||
(flag == ~ACK_FROM_TAP_DUE && (conn->flags & ACK_TO_TAP_DUE)) ||
@@ -726,7 +730,7 @@ void conn_event_do(const struct ctx *c, struct tcp_tap_conn *conn,
} else {
if (event == CLOSED)
flow_hash_remove(c, TAP_SIDX(conn));
- tcp_epoll_ctl(c, conn);
+ tcp_epoll_ctl(conn);
}
if (CONN_HAS(conn, SOCK_FIN_SENT | TAP_FIN_ACKED))
@@ -1742,7 +1746,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
conn_event(c, conn, TAP_SYN_ACK_SENT);
}
- tcp_epoll_ctl(c, conn);
+ tcp_epoll_ctl(conn);
if (c->mode == MODE_VU) { /* To rebind to same oport after migration */
socklen_t sl = sizeof(sa);
@@ -3984,7 +3988,7 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
tcp_send_flag(c, conn, ACK);
tcp_data_from_sock(c, conn);
- if ((rc = tcp_epoll_ctl(c, conn))) {
+ if ((rc = tcp_epoll_ctl(conn))) {
flow_dbg(conn,
"Failed to subscribe to epoll for migrated socket: %s",
strerror_(-rc));
diff --git a/tcp_splice.c b/tcp_splice.c
index bf4ff466de07..26e9845c39ee 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -135,37 +135,35 @@ static uint32_t tcp_splice_conn_epoll_events(uint16_t events, unsigned sidei)
/**
* tcp_splice_epoll_ctl() - Add/modify/delete epoll state from connection events
- * @c: Execution context
* @conn: Connection pointer
*
* Return: 0 on success, negative error code on failure (not on deletion)
*/
-static int tcp_splice_epoll_ctl(const struct ctx *c,
- struct tcp_splice_conn *conn)
+static int tcp_splice_epoll_ctl(struct tcp_splice_conn *conn)
{
- int epollfd = flow_in_epoll(&conn->f) ? flow_epollfd(&conn->f)
- : c->epollfd;
- int m = flow_in_epoll(&conn->f) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
- const union epoll_ref ref[SIDES] = {
- { .type = EPOLL_TYPE_TCP_SPLICE, .fd = conn->s[0],
- .flowside = FLOW_SIDX(conn, 0) },
- { .type = EPOLL_TYPE_TCP_SPLICE, .fd = conn->s[1],
- .flowside = FLOW_SIDX(conn, 1) }
- };
- struct epoll_event ev[SIDES] = { { .data.u64 = ref[0].u64 },
- { .data.u64 = ref[1].u64 } };
-
- ev[0].events = tcp_splice_conn_epoll_events(conn->events, 0);
- ev[1].events = tcp_splice_conn_epoll_events(conn->events, 1);
-
-
- if (epoll_ctl(epollfd, m, conn->s[0], &ev[0]) ||
- epoll_ctl(epollfd, m, conn->s[1], &ev[1])) {
+ uint32_t events[2];
+ int m;
+
+ if (flow_in_epoll(&conn->f)) {
+ m = EPOLL_CTL_MOD;
+ } else {
+ flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
+ m = EPOLL_CTL_ADD;
+ }
+
+ events[0] = tcp_splice_conn_epoll_events(conn->events, 0);
+ events[1] = tcp_splice_conn_epoll_events(conn->events, 1);
+
+ if (flow_epoll_set(m, EPOLL_TYPE_TCP_SPLICE, &conn->f, events[0],
+ conn->s[0], 0) ||
+ flow_epoll_set(m, EPOLL_TYPE_TCP_SPLICE, &conn->f, events[1],
+ conn->s[1], 1)) {
int ret = -errno;
flow_perror(conn, "ERROR on epoll_ctl()");
+ if (m == EPOLL_CTL_ADD)
+ flow_epollid_clear(&conn->f);
return ret;
}
- flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
return 0;
}
@@ -205,7 +203,7 @@ static void conn_flag_do(struct tcp_splice_conn *conn,
}
}
-#define conn_flag(c, conn, flag) \
+#define conn_flag(conn, flag) \
do { \
flow_trace(conn, "flag at %s:%i", __func__, __LINE__); \
conn_flag_do(conn, flag); \
@@ -213,12 +211,10 @@ static void conn_flag_do(struct tcp_splice_conn *conn,
/**
* conn_event_do() - Set and log connection events, update epoll state
- * @c: Execution context
* @conn: Connection pointer
* @event: Connection event
*/
-static void conn_event_do(const struct ctx *c, struct tcp_splice_conn *conn,
- unsigned long event)
+static void conn_event_do(struct tcp_splice_conn *conn, unsigned long event)
{
if (event & (event - 1)) {
int flag_index = fls(~event);
@@ -240,14 +236,14 @@ static void conn_event_do(const struct ctx *c, struct tcp_splice_conn *conn,
flow_dbg(conn, "%s", tcp_splice_event_str[flag_index]);
}
- if (tcp_splice_epoll_ctl(c, conn))
- conn_flag(c, conn, CLOSING);
+ if (tcp_splice_epoll_ctl(conn))
+ conn_flag(conn, CLOSING);
}
-#define conn_event(c, conn, event) \
+#define conn_event(conn, event) \
do { \
flow_trace(conn, "event at %s:%i",__func__, __LINE__); \
- conn_event_do(c, conn, event); \
+ conn_event_do(conn, event); \
} while (0)
@@ -315,7 +311,7 @@ static int tcp_splice_connect_finish(const struct ctx *c,
if (pipe2(conn->pipe[sidei], O_NONBLOCK | O_CLOEXEC)) {
flow_perror(conn, "cannot create %d->%d pipe",
sidei, !sidei);
- conn_flag(c, conn, CLOSING);
+ conn_flag(conn, CLOSING);
return -EIO;
}
@@ -329,7 +325,7 @@ static int tcp_splice_connect_finish(const struct ctx *c,
}
if (!(conn->events & SPLICE_ESTABLISHED))
- conn_event(c, conn, SPLICE_ESTABLISHED);
+ conn_event(conn, SPLICE_ESTABLISHED);
return 0;
}
@@ -376,7 +372,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
- conn_event(c, conn, SPLICE_CONNECT);
+ conn_event(conn, SPLICE_CONNECT);
if (connect(conn->s[1], &sa.sa, socklen_inany(&sa))) {
if (errno != EINPROGRESS) {
@@ -385,7 +381,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
return -errno;
}
} else {
- conn_event(c, conn, SPLICE_ESTABLISHED);
+ conn_event(conn, SPLICE_ESTABLISHED);
return tcp_splice_connect_finish(c, conn);
}
@@ -445,7 +441,7 @@ void tcp_splice_conn_from_sock(const struct ctx *c, union flow *flow, int s0)
flow_trace(conn, "failed to set TCP_QUICKACK on %i", s0);
if (tcp_splice_connect(c, conn))
- conn_flag(c, conn, CLOSING);
+ conn_flag(conn, CLOSING);
FLOW_ACTIVATE(conn);
}
@@ -494,14 +490,14 @@ void tcp_splice_sock_handler(struct ctx *c, union epoll_ref ref,
if (events & EPOLLOUT) {
fromsidei = !evsidei;
- conn_event(c, conn, ~OUT_WAIT(evsidei));
+ conn_event(conn, ~OUT_WAIT(evsidei));
} else {
fromsidei = evsidei;
}
if (events & EPOLLRDHUP)
/* For side 0 this is fake, but implied */
- conn_event(c, conn, FIN_RCVD(evsidei));
+ conn_event(conn, FIN_RCVD(evsidei));
swap:
eof = 0;
@@ -536,7 +532,7 @@ retry:
more = SPLICE_F_MORE;
if (conn->flags & lowat_set_flag)
- conn_flag(c, conn, lowat_act_flag);
+ conn_flag(conn, lowat_act_flag);
}
do
@@ -568,8 +564,8 @@ retry:
"Setting SO_RCVLOWAT %i: %s",
lowat, strerror_(errno));
} else {
- conn_flag(c, conn, lowat_set_flag);
- conn_flag(c, conn, lowat_act_flag);
+ conn_flag(conn, lowat_set_flag);
+ conn_flag(conn, lowat_act_flag);
}
}
@@ -583,7 +579,7 @@ retry:
if (conn->read[fromsidei] == conn->written[fromsidei])
break;
- conn_event(c, conn, OUT_WAIT(!fromsidei));
+ conn_event(conn, OUT_WAIT(!fromsidei));
break;
}
@@ -605,7 +601,7 @@ retry:
if ((conn->events & FIN_RCVD(sidei)) &&
!(conn->events & FIN_SENT(!sidei))) {
shutdown(conn->s[!sidei], SHUT_WR);
- conn_event(c, conn, FIN_SENT(!sidei));
+ conn_event(conn, FIN_SENT(!sidei));
}
}
}
@@ -626,7 +622,7 @@ retry:
return;
close:
- conn_flag(c, conn, CLOSING);
+ conn_flag(conn, CLOSING);
}
/**
@@ -762,10 +758,10 @@ void tcp_splice_timer(struct tcp_splice_conn *conn)
flow_trace(conn, "can't set SO_RCVLOWAT on %d",
conn->s[sidei]);
}
- conn_flag(c, conn, ~RCVLOWAT_SET(sidei));
+ conn_flag(conn, ~RCVLOWAT_SET(sidei));
}
}
flow_foreach_sidei(sidei)
- conn_flag(c, conn, ~RCVLOWAT_ACT(sidei));
+ conn_flag(conn, ~RCVLOWAT_ACT(sidei));
}
diff --git a/udp_flow.c b/udp_flow.c
index c4cf35c2c89d..b016a8095ec6 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -74,7 +74,6 @@ static int udp_flow_sock(const struct ctx *c,
{
const struct flowside *side = &uflow->f.side[sidei];
uint8_t pif = uflow->f.pif[sidei];
- union epoll_ref ref;
int rc;
int s;
@@ -84,14 +83,11 @@ static int udp_flow_sock(const struct ctx *c,
return s;
}
- ref.type = EPOLL_TYPE_UDP;
- ref.flowside = FLOW_SIDX(uflow, sidei);
- ref.fd = s;
-
flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
-
- rc = epoll_add(flow_epollfd(&uflow->f), EPOLLIN, ref);
- if (rc < 0) {
+ if (flow_epoll_set(EPOLL_CTL_ADD, EPOLL_TYPE_UDP, &uflow->f,
+ EPOLLIN, s, sidei) < 0) {
+ rc = -errno;
+ flow_epollid_clear(&uflow->f);
close(s);
return rc;
}
--
2.52.0
next prev parent reply other threads:[~2026-01-08 14:01 UTC|newest]
Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-01-08 14:01 [PATCH v2 0/5] " Laurent Vivier
2026-01-08 14:01 ` [PATCH v2 1/5] tcp: remove timer update in tcp_epoll_ctl() Laurent Vivier
2026-01-08 23:26 ` David Gibson
2026-01-08 14:01 ` [PATCH v2 2/5] udp_flow: remove unneeded epoll_ref indirection Laurent Vivier
2026-01-08 23:26 ` David Gibson
2026-01-08 14:01 ` [PATCH v2 3/5] udp_flow: Assign socket to flow inside udp_flow_sock() Laurent Vivier
2026-01-08 14:01 ` [PATCH v2 4/5] tcp_splice: Refactor tcp_splice_conn_epoll_events() to per-side computation Laurent Vivier
2026-01-08 14:01 ` Laurent Vivier [this message]
2026-01-08 23:33 ` [PATCH v2 5/5] flow: Introduce flow_epoll_set() to centralize epoll operations David Gibson
2026-01-09 9:26 ` Laurent Vivier
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260108140119.1204797-6-lvivier@redhat.com \
--to=lvivier@redhat.com \
--cc=passt-dev@passt.top \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://passt.top/passt
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).