public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH v5 11/12] flow: Add queue pair tracking to flow management
Date: Tue, 16 Jun 2026 14:51:29 +0200	[thread overview]
Message-ID: <20260616125130.1324274-12-lvivier@redhat.com> (raw)
In-Reply-To: <20260616125130.1324274-1-lvivier@redhat.com>

Add a qpair field (5 bits) to struct flow_common, with
FLOW_QPAIR_INVALID as sentinel for unassigned flows.  Provide
flow_setqp()/FLOW_SETQP() to assign and flow_qp()/FLOW_QP() to
query the queue pair.

All protocol handlers (TCP, UDP, ICMP) set the queue pair on new
flows via FLOW_SETQP(), and update it on each packet received from
tap for existing flows, implementing virtio receive steering: return
traffic is directed to the RX queue matching the originating TX
queue.

tcp_keepalive() and tcp_inactivity() now filter by queue pair so
each worker only processes its own flows.

tcp_buf.c and the TCP migration target path now use the flow's own
queue pair instead of hardcoding QPAIR_DEFAULT, for consistency.
tcp_buf.c is only used in non-vhost-user mode, where the queue pair
is always 0.

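Flows initiated from the host side, spliced TCP connections, and TCP
flows restored on a migration target default to queue pair 0.

In udp_flow.c, flow_epollid_set() moves from udp_flow_sock() to
udp_flow_new(), next to the queue pair assignment, so it runs once
per flow rather than once per socket.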

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 flow.c       | 34 ++++++++++++++++++++++++++++++++++
 flow.h       | 18 +++++++++++++++++-
 icmp.c       |  8 +++++---
 tcp.c        | 20 ++++++++++++++++----
 tcp_buf.c    | 10 +++++-----
 tcp_splice.c |  1 +
 udp_flow.c   |  7 +++++--
 7 files changed, 83 insertions(+), 15 deletions(-)

diff --git a/flow.c b/flow.c
index c93b73549c90..bf855fe0dfaf 100644
--- a/flow.c
+++ b/flow.c
@@ -415,6 +415,39 @@ void flow_epollid_register(int epollid, int epollfd)
 	epoll_id_to_fd[epollid] = epollfd;
 }
 
+/**
+ * flow_qp() - Get the queue pair for a flow
+ * @f:		Flow to query (may be NULL)
+ *
+ * Return: queue pair number for the flow, or QPAIR_DEFAULT if @f is NULL
+ *         or has no valid queue pair assignment
+ */
+/* cppcheck-suppress unusedFunction */
+unsigned int flow_qp(const struct flow_common *f)
+{
+	if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
+		return QPAIR_DEFAULT;
+	return f->qpair;
+}
+
+/**
+ * flow_setqp() - Set queue pair assignment for a flow
+ * @f:		Flow to update
+ * @qpair:	Queue pair number to assign, must be below FLOW_QPAIR_MAX
+ */
+void flow_setqp(struct flow_common *f, unsigned int qpair)
+{
+	assert(qpair < FLOW_QPAIR_MAX);
+
+	if (f->qpair == qpair)
+		return;
+
+	flow_trace((union flow *)f, "updating queue pair from %u to %u",
+		   (unsigned int)f->qpair, qpair);
+
+	f->qpair = qpair;
+}
+
 /**
  * flow_initiate_() - Move flow to INI, setting pif[INISIDE]
  * @flow:	Flow to change state
@@ -636,6 +669,7 @@ union flow *flow_alloc(void)
 
 	flow_new_entry = flow;
 	memset(flow, 0, sizeof(*flow));
+	flow->f.qpair = FLOW_QPAIR_INVALID;
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
 	return flow;
diff --git a/flow.h b/flow.h
index cae259fe7037..3c74dcbd95c4 100644
--- a/flow.h
+++ b/flow.h
@@ -184,7 +184,8 @@ int flowside_connect(const struct ctx *c, int s,
  * @pif[]:	Interface for each side of the flow
  * @side[]:	Information for each side of the flow
  * @tap_omac:	MAC address of remote endpoint as seen from the guest
  * @epollid:	epollfd identifier
+ * @qpair:	Assigned queue pair, or FLOW_QPAIR_INVALID if none
  */
 struct flow_common {
 #ifdef __GNUC__
@@ -205,11 +206,19 @@ struct flow_common {
 
 #define EPOLLFD_ID_BITS 8
 	unsigned int	epollid:EPOLLFD_ID_BITS;
+#define FLOW_QPAIR_BITS 5
+	unsigned int	qpair:FLOW_QPAIR_BITS;
 };
 
 #define EPOLLFD_ID_DEFAULT	0
 #define EPOLLFD_ID_SIZE		(1 << EPOLLFD_ID_BITS)
 
+#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)
+#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)
+#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX	/* valid qpairs are < this */
+static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2,
+	      "FLOW_QPAIR_BITS too small for VHOST_USER_MAX_VQS");
+
 #define FLOW_INDEX_BITS		17	/* 128k - 1 */
 #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
 
@@ -270,6 +279,13 @@ void flow_epollid_set(struct flow_common *f, int epollid);
 int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
 		   int fd, unsigned int sidei);
 void flow_epollid_register(int epollid, int epollfd);
+unsigned int flow_qp(const struct flow_common *f);
+#define FLOW_QP(flow_)				\
+	(flow_qp(&(flow_)->f))
+void flow_setqp(struct flow_common *f, unsigned int qpair);
+#define FLOW_SETQP(flow_, qpair_)		\
+	(flow_setqp(&(flow_)->f, (qpair_)))
+
 void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 			unsigned int qpair);
 int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
diff --git a/icmp.c b/icmp.c
index 62038f977116..2558fe5beaab 100644
--- a/icmp.c
+++ b/icmp.c
@@ -184,7 +184,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	struct icmp_ping_flow *pingf;
 	const struct flowside *tgt;
 
-	(void)qpair;
 	if (!flow)
 		return NULL;
 
@@ -216,6 +215,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	if (pingf->sock > FD_REF_MAX)
 		goto cancel;
 
+	FLOW_SETQP(pingf, qpair);
 	flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
 			   TGTSIDE) < 0) {
@@ -305,10 +305,12 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	flow = flow_at_sidx(flow_lookup_af(c, proto, PIF_TAP,
 					   af, saddr, daddr, id, id));
 
-	if (flow)
+	if (flow) {
 		pingf = &flow->ping;
-	else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr)))
+		FLOW_SETQP(pingf, qpair); /* XXX: epollfd not moved on change */
+	} else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) {
 		return 1;
+	}
 
 	tgt = &pingf->f.side[TGTSIDE];
 
diff --git a/tcp.c b/tcp.c
index 7f8e68a31994..c0a4de33f068 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1735,6 +1735,7 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
 
 	conn->sock = s;
 	conn->timer = -1;
+	FLOW_SETQP(conn, qpair);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
@@ -2250,7 +2251,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
 /**
  * tcp_tap_handler() - Handle packets from tap and state transitions
  * @c:		Execution context
- * @qpair:	Queue pair on which to send packets
+ * @qpair:	Queue pair the packet was received on
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address
@@ -2314,6 +2315,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	assert(pif_at_sidx(sidx) == PIF_TAP);
 	conn = &flow->tcp;
 
+	/* Steer return traffic to the queue this packet came from */
+	FLOW_SETQP(flow, qpair);
+
 	flow_trace(conn, "packet length %zu from tap", l4len);
 
 	if (th->rst) {
@@ -2518,6 +2522,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
 	conn->timer = -1;
 	conn->ws_to_tap = conn->ws_from_tap = 0;
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
@@ -2980,6 +2985,9 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now,
 	flow_foreach_of_type(flow, FLOW_TCP) {
 		struct tcp_tap_conn *conn = &flow->tcp;
 
+		if (FLOW_QP(conn) != qpair)
+			continue;
+
 		if (conn->tap_inactive) {
 			flow_dbg(conn, "No tap activity for least %us, send keepalive",
 				 KEEPALIVE_INTERVAL);
@@ -3011,6 +3019,9 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now,
 	flow_foreach_of_type(flow, FLOW_TCP) {
 		struct tcp_tap_conn *conn = &flow->tcp;
 
+		if (FLOW_QP(conn) != qpair)
+			continue;
+
 		if (conn->inactive) {
 			/* No activity in this interval, reset */
 			flow_dbg(conn, "Inactive for at least %us, resetting",
@@ -3841,6 +3852,7 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
 		goto out;
 	}
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock,
 			   !TAPSIDE(conn)))
@@ -4019,10 +4031,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
 	if (tcp_set_peek_offset(conn, peek_offset))
 		goto fail;
 
-	if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT))
+	if (tcp_send_flag(c, conn, ACK, FLOW_QP(conn)))
 		goto fail;
 
-	tcp_data_from_sock(c, conn, QPAIR_DEFAULT);
+	tcp_data_from_sock(c, conn, FLOW_QP(conn));
 
 	if ((rc = tcp_epoll_ctl(conn))) {
 		flow_dbg(conn,
@@ -4040,7 +4052,7 @@ fail:
 	}
 
 	conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
-	tcp_rst(c, conn, QPAIR_DEFAULT);
+	tcp_rst(c, conn, FLOW_QP(conn));
 
 	return 0;
 }
diff --git a/tcp_buf.c b/tcp_buf.c
index ae8bebca5107..647c17621963 100644
--- a/tcp_buf.c
+++ b/tcp_buf.c
@@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
 		conn->seq_to_tap = seq;
 		peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
 		if (tcp_set_peek_offset(conn, peek_offset))
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, FLOW_QP(conn));
 	}
 }
 
@@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 		conn->seq_to_tap = conn->seq_ack_from_tap;
 		already_sent = 0;
 		if (tcp_set_peek_offset(conn, 0)) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, FLOW_QP(conn));
 			return -1;
 		}
 	}
@@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 	}
 
 	if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) {
-		tcp_rst(c, conn, QPAIR_DEFAULT);
+		tcp_rst(c, conn, FLOW_QP(conn));
 		return -1;
 	}
 
@@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 	if (len < 0) {
 		if (errno != EAGAIN && errno != EWOULDBLOCK) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, FLOW_QP(conn));
 			return -errno;
 		}
 
@@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 			ret = tcp_buf_send_flag(c, conn, FIN | ACK);
 			if (ret) {
-				tcp_rst(c, conn, QPAIR_DEFAULT);
+				tcp_rst(c, conn, FLOW_QP(conn));
 				return ret;
 			}
 
diff --git a/tcp_splice.c b/tcp_splice.c
index 3fd33a10308e..1a77ac2e8a18 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -377,6 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
 
 	pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT); /* no tap side to steer */
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) ||
 	    flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) {
diff --git a/udp_flow.c b/udp_flow.c
index 143f265493fa..44e0c4c50ca9 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -81,7 +81,6 @@ static int udp_flow_sock(const struct ctx *c,
 		return s;
 	}
 
-	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) {
 		rc = -errno;
 		close(s);
@@ -154,7 +153,8 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
 	uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0;
 	uflow->activity[INISIDE] = 1;
 	uflow->activity[TGTSIDE] = 0;
-	(void)qpair;
+	FLOW_SETQP(uflow, qpair);
+	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 
 	flow_foreach_sidei(sidei) {
 		if (pif_is_socket(uflow->f.pif[sidei]))
@@ -270,6 +270,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
  * @daddr:	Destination address guest side
  * @srcport:	Source port on guest side
  * @dstport:	Destination port on guest side
+ * @now:	Current timestamp
  *
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
@@ -291,6 +292,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
 			      srcport, dstport);
 	if ((uflow = udp_at_sidx(sidx))) {
 		udp_flow_activity(uflow, sidx.sidei, now);
+		/* Steer replies to the queue pair this packet came from */
+		FLOW_SETQP(uflow, qpair); /* XXX: epollfd not moved on change */
 		return flow_sidx_opposite(sidx);
 	}
 
-- 
2.54.0


  parent reply	other threads:[~2026-06-16 12:51 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 01/12] tap: Remove pool parameter from tap4_handler() and tap6_handler() Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 02/12] vhost-user: Advertise multiqueue support Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 03/12] test: Add multiqueue support to vhost-user test infrastructure Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 04/12] tap: Thread queue pair through all remaining tap paths Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 05/12] arp: Pass queue pair explicitly through ARP send path Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 06/12] tcp: Pass queue pair explicitly through TCP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 07/12] udp: Pass queue pair explicitly through UDP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 08/12] dhcp/dhcpv6: Pass queue pair explicitly through DHCP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 09/12] icmp: Pass queue pair explicitly through ICMP " Laurent Vivier
2026-06-16 12:51 ` [PATCH v5 10/12] ndp: Pass queue pair explicitly through NDP " Laurent Vivier
2026-06-16 12:51 ` Laurent Vivier [this message]
2026-06-16 12:51 ` [PATCH v5 12/12] flow: Derive epoll fd from queue pair, removing epollid field Laurent Vivier

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260616125130.1324274-12-lvivier@redhat.com \
    --to=lvivier@redhat.com \
    --cc=passt-dev@passt.top \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).