From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH v5 11/12] flow: Add queue pair tracking to flow management
Date: Tue, 16 Jun 2026 14:51:29 +0200 [thread overview]
Message-ID: <20260616125130.1324274-12-lvivier@redhat.com> (raw)
In-Reply-To: <20260616125130.1324274-1-lvivier@redhat.com>
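Add a qpair field (5 bits) to struct flow_common, with
FLOW_QPAIR_INVALID (the all-ones value, 31) as sentinel for
unassigned flows, so queue pairs 0 to 30 can be assigned. Provide
flow_setqp()/FLOW_SETQP() to assign and flow_qp()/FLOW_QP() to
query the queue pair; flow_qp() falls back to QPAIR_DEFAULT for
unassigned flows.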
All protocol handlers (TCP, UDP, ICMP) set the queue pair on new
flows via FLOW_SETQP(), and update it on each packet received from
tap for existing flows, implementing virtio receive steering: return
traffic is directed to the RX queue matching the originating TX
queue.
tcp_keepalive() and tcp_inactivity() now filter by queue pair so
each worker only processes its own flows.
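tcp_buf.c and tcp_flow_migrate_target_ext() use conn->f.qpair
instead of hardcoding QPAIR_DEFAULT for consistency, though the
tcp_buf.c path is only used in non-vhost-user mode, where the queue
pair is always 0.
udp_flow_sock() no longer sets the epoll id itself: udp_flow_new()
does it once, right after assigning the queue pair.
Flows initiated from the host side, spliced TCP connections and
migrated TCP flows default to queue pair 0.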
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
flow.c | 34 ++++++++++++++++++++++++++++++++++
flow.h | 18 +++++++++++++++++-
icmp.c | 8 +++++---
tcp.c | 20 ++++++++++++++++----
tcp_buf.c | 10 +++++-----
tcp_splice.c | 1 +
udp_flow.c | 7 +++++--
7 files changed, 83 insertions(+), 15 deletions(-)
diff --git a/flow.c b/flow.c
index c93b73549c90..bf855fe0dfaf 100644
--- a/flow.c
+++ b/flow.c
@@ -415,6 +415,39 @@ void flow_epollid_register(int epollid, int epollfd)
epoll_id_to_fd[epollid] = epollfd;
}
+/**
+ * flow_qp() - Get the queue pair for a flow
+ * @f: Flow to query (may be NULL)
+ *
+ * Return: queue pair number for the flow, or QPAIR_DEFAULT if @f is NULL or
+ * has no valid queue pair assignment (FLOW_QPAIR_INVALID)
+ */
+/* cppcheck-suppress unusedFunction */
+unsigned int flow_qp(const struct flow_common *f)
+{
+ if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
+ return QPAIR_DEFAULT;
+ return f->qpair;
+}
+
+/**
+ * flow_setqp() - Set queue pair assignment for a flow
+ * @f: Flow to update
+ * @qpair: Queue pair number to assign, must be below FLOW_QPAIR_MAX
+ */
+void flow_setqp(struct flow_common *f, unsigned int qpair)
+{
+ assert(qpair < FLOW_QPAIR_MAX);
+
+ if (f->qpair == qpair)
+ return;
+
+ flow_trace((union flow *)f, "updating queue pair from %u to %u",
+ (unsigned int)f->qpair, qpair);
+
+ f->qpair = qpair;
+}
+
/**
* flow_initiate_() - Move flow to INI, setting pif[INISIDE]
* @flow: Flow to change state
@@ -636,6 +669,7 @@ union flow *flow_alloc(void)
flow_new_entry = flow;
memset(flow, 0, sizeof(*flow));
+ flow->f.qpair = FLOW_QPAIR_INVALID;
flow_set_state(&flow->f, FLOW_STATE_NEW);
return flow;
diff --git a/flow.h b/flow.h
index cae259fe7037..3c74dcbd95c4 100644
--- a/flow.h
+++ b/flow.h
@@ -184,7 +184,9 @@ int flowside_connect(const struct ctx *c, int s,
* @pif[]: Interface for each side of the flow
* @side[]: Information for each side of the flow
* @tap_omac: MAC address of remote endpoint as seen from the guest
* @epollid: epollfd identifier
+ * @qpair: Queue pair number assigned to this flow
+ * (FLOW_QPAIR_INVALID if not assigned)
*/
struct flow_common {
#ifdef __GNUC__
@@ -205,11 +207,19 @@ struct flow_common {
#define EPOLLFD_ID_BITS 8
unsigned int epollid:EPOLLFD_ID_BITS;
+#define FLOW_QPAIR_BITS 5
+ unsigned int qpair:FLOW_QPAIR_BITS;
};
#define EPOLLFD_ID_DEFAULT 0
#define EPOLLFD_ID_SIZE (1 << EPOLLFD_ID_BITS)
+#define FLOW_QPAIR_NUM (1 << FLOW_QPAIR_BITS)
+#define FLOW_QPAIR_MAX (FLOW_QPAIR_NUM - 1)
+#define FLOW_QPAIR_INVALID FLOW_QPAIR_MAX
+
+static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2);
+
#define FLOW_INDEX_BITS 17 /* 128k - 1 */
#define FLOW_MAX MAX_FROM_BITS(FLOW_INDEX_BITS)
@@ -270,6 +280,13 @@ void flow_epollid_set(struct flow_common *f, int epollid);
int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
int fd, unsigned int sidei);
void flow_epollid_register(int epollid, int epollfd);
+unsigned int flow_qp(const struct flow_common *f);
+#define FLOW_QP(flow_) \
+ (flow_qp(&(flow_)->f))
+void flow_setqp(struct flow_common *f, unsigned int qpair);
+#define FLOW_SETQP(flow_, qpair_) \
+ (flow_setqp(&(flow_)->f, (qpair_)))
+
void flow_defer_handler(const struct ctx *c, const struct timespec *now,
unsigned int qpair);
int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
diff --git a/icmp.c b/icmp.c
index 62038f977116..2558fe5beaab 100644
--- a/icmp.c
+++ b/icmp.c
@@ -184,7 +184,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
struct icmp_ping_flow *pingf;
const struct flowside *tgt;
- (void)qpair;
if (!flow)
return NULL;
@@ -216,6 +215,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
if (pingf->sock > FD_REF_MAX)
goto cancel;
+ FLOW_SETQP(pingf, qpair);
flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
TGTSIDE) < 0) {
@@ -305,10 +305,12 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
flow = flow_at_sidx(flow_lookup_af(c, proto, PIF_TAP,
af, saddr, daddr, id, id));
- if (flow)
+ if (flow) {
pingf = &flow->ping;
- else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr)))
+ FLOW_SETQP(pingf, qpair); /* XXX: if qpair changes, update epollfd */
+ } else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) {
return 1;
+ }
tgt = &pingf->f.side[TGTSIDE];
diff --git a/tcp.c b/tcp.c
index 7f8e68a31994..c0a4de33f068 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1735,6 +1735,7 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
conn->sock = s;
conn->timer = -1;
+ FLOW_SETQP(conn, qpair);
flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) {
flow_perror(flow, "Can't register with epoll");
@@ -2250,7 +2251,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
/**
* tcp_tap_handler() - Handle packets from tap and state transitions
* @c: Execution context
- * @qpair: Queue pair on which to send packets
+ * @qpair: Queue pair on which the packet is arriving
* @pif: pif on which the packet is arriving
* @af: Address family, AF_INET or AF_INET6
* @saddr: Source address
@@ -2314,6 +2315,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
assert(pif_at_sidx(sidx) == PIF_TAP);
conn = &flow->tcp;
+ /* Steer return traffic to the queue pair this packet arrived on */
+ FLOW_SETQP(flow, qpair);
+
flow_trace(conn, "packet length %zu from tap", l4len);
if (th->rst) {
@@ -2518,6 +2522,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
conn->timer = -1;
conn->ws_to_tap = conn->ws_from_tap = 0;
+ FLOW_SETQP(conn, QPAIR_DEFAULT);
flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) {
flow_perror(flow, "Can't register with epoll");
@@ -2980,6 +2985,9 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now,
flow_foreach_of_type(flow, FLOW_TCP) {
struct tcp_tap_conn *conn = &flow->tcp;
+ if (conn->f.qpair != qpair)
+ continue;
+
if (conn->tap_inactive) {
flow_dbg(conn, "No tap activity for least %us, send keepalive",
KEEPALIVE_INTERVAL);
@@ -3011,6 +3019,9 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now,
flow_foreach_of_type(flow, FLOW_TCP) {
struct tcp_tap_conn *conn = &flow->tcp;
+ if (conn->f.qpair != qpair)
+ continue;
+
if (conn->inactive) {
/* No activity in this interval, reset */
flow_dbg(conn, "Inactive for at least %us, resetting",
@@ -3841,6 +3852,7 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
goto out;
}
+ FLOW_SETQP(conn, QPAIR_DEFAULT);
flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock,
!TAPSIDE(conn)))
@@ -4019,10 +4031,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
if (tcp_set_peek_offset(conn, peek_offset))
goto fail;
- if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT))
+ if (tcp_send_flag(c, conn, ACK, conn->f.qpair))
goto fail;
- tcp_data_from_sock(c, conn, QPAIR_DEFAULT);
+ tcp_data_from_sock(c, conn, conn->f.qpair);
if ((rc = tcp_epoll_ctl(conn))) {
flow_dbg(conn,
@@ -4040,7 +4052,7 @@ fail:
}
conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
- tcp_rst(c, conn, QPAIR_DEFAULT);
+ tcp_rst(c, conn, conn->f.qpair);
return 0;
}
diff --git a/tcp_buf.c b/tcp_buf.c
index ae8bebca5107..647c17621963 100644
--- a/tcp_buf.c
+++ b/tcp_buf.c
@@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
conn->seq_to_tap = seq;
peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
if (tcp_set_peek_offset(conn, peek_offset))
- tcp_rst(c, conn, QPAIR_DEFAULT);
+ tcp_rst(c, conn, conn->f.qpair);
}
}
@@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
conn->seq_to_tap = conn->seq_ack_from_tap;
already_sent = 0;
if (tcp_set_peek_offset(conn, 0)) {
- tcp_rst(c, conn, QPAIR_DEFAULT);
+ tcp_rst(c, conn, conn->f.qpair);
return -1;
}
}
@@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
}
if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) {
- tcp_rst(c, conn, QPAIR_DEFAULT);
+ tcp_rst(c, conn, conn->f.qpair);
return -1;
}
@@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
if (len < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- tcp_rst(c, conn, QPAIR_DEFAULT);
+ tcp_rst(c, conn, conn->f.qpair);
return -errno;
}
@@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
ret = tcp_buf_send_flag(c, conn, FIN | ACK);
if (ret) {
- tcp_rst(c, conn, QPAIR_DEFAULT);
+ tcp_rst(c, conn, conn->f.qpair);
return ret;
}
diff --git a/tcp_splice.c b/tcp_splice.c
index 3fd33a10308e..1a77ac2e8a18 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -377,6 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
+ FLOW_SETQP(conn, QPAIR_DEFAULT);
flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) ||
flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) {
diff --git a/udp_flow.c b/udp_flow.c
index 143f265493fa..44e0c4c50ca9 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -81,7 +81,6 @@ static int udp_flow_sock(const struct ctx *c,
return s;
}
- flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) {
rc = -errno;
close(s);
@@ -154,7 +153,8 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0;
uflow->activity[INISIDE] = 1;
uflow->activity[TGTSIDE] = 0;
- (void)qpair;
+ FLOW_SETQP(uflow, qpair);
+ flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
flow_foreach_sidei(sidei) {
if (pif_is_socket(uflow->f.pif[sidei]))
@@ -270,6 +270,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
* @daddr: Destination address guest side
* @srcport: Source port on guest side
* @dstport: Destination port on guest side
+ * @now: Current timestamp
*
* Return: sidx for the destination side of the flow for this packet, or
* FLOW_SIDX_NONE if we couldn't find or create a flow.
@@ -291,6 +292,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
srcport, dstport);
if ((uflow = udp_at_sidx(sidx))) {
udp_flow_activity(uflow, sidx.sidei, now);
+ /* Steer return traffic to the queue pair this packet arrived on */
+ FLOW_SETQP(uflow, qpair); /* XXX: if qpair changes, update epollfd */
return flow_sidx_opposite(sidx);
}
--
2.54.0
next prev parent reply other threads:[~2026-06-16 12:51 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 01/12] tap: Remove pool parameter from tap4_handler() and tap6_handler() Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 02/12] vhost-user: Advertise multiqueue support Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 03/12] test: Add multiqueue support to vhost-user test infrastructure Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 04/12] tap: Thread queue pair through all remaining tap paths Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 05/12] arp: Pass queue pair explicitly through ARP send path Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 06/12] tcp: Pass queue pair explicitly through TCP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 07/12] udp: Pass queue pair explicitly through UDP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 08/12] dhcp/dhcpv6: Pass queue pair explicitly through DHCP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 09/12] icmp: Pass queue pair explicitly through ICMP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 10/12] ndp: Pass queue pair explicitly through NDP " Laurent Vivier
2026-06-16 12:51 ` Laurent Vivier [this message]
2026-06-16 12:51 ` [PATCH v5 12/12] flow: Derive epoll fd from queue pair, removing epollid field Laurent Vivier
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260616125130.1324274-12-lvivier@redhat.com \
--to=lvivier@redhat.com \
--cc=passt-dev@passt.top \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://passt.top/passt
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).