public inbox for passt-dev@passt.top
* [PATCH v5 00/12] vhost-user: Add multiqueue support
@ 2026-06-16 12:51 Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 01/12] tap: Remove pool parameter from tap4_handler() and tap6_handler() Laurent Vivier
                   ` (11 more replies)
  0 siblings, 12 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC
  To: passt-dev; +Cc: Laurent Vivier

This series implements multiqueue support for vhost-user mode, allowing passt
to utilize multiple queue pairs for improved network throughput when used with
multi-CPU guest VMs. While this version uses a single thread for packet
processing, it enables the guest kernel to distribute network traffic across
multiple queues and vCPUs.

The implementation advertises support for up to 16 queue pairs (32 virtqueues)
by setting VIRTIO_NET_F_MQ and VHOST_USER_PROTOCOL_F_MQ feature flags.
Packets are routed to the appropriate RX queue based on which TX queue they
originated from, following the virtio specification's automatic receive
steering requirements.
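
As a rough illustration of the numbering this implies, the sketch below maps
TX virtqueues to the RX virtqueue of the same pair, using the QPAIR_* macros
that patch 02 adds to passt.h. MAX_QPAIRS and the two helper functions are
assumptions made for this example only; they are not identifiers from passt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Macros as added to passt.h by patch 02 of this series */
#define QPAIR_DEFAULT			0
#define QPAIR_FROMGUEST_QUEUE(qpair)	((size_t)(qpair) * 2 + 1)	/* TX */
#define QPAIR_TOGUEST_QUEUE(qpair)	((size_t)(qpair) * 2)		/* RX */
#define QPAIR_FROM_QUEUE(queue)		((queue) / 2)

/* Hypothetical constant for this example: 16 pairs, i.e. 32 virtqueues */
#define MAX_QPAIRS			16U

/* Hypothetical helper: RX virtqueue paired with TX virtqueue @tx_queue */
static size_t rx_queue_for_tx(size_t tx_queue)
{
	assert(tx_queue % 2 == 1);	/* TX virtqueues have odd indices */

	return QPAIR_TOGUEST_QUEUE(QPAIR_FROM_QUEUE(tx_queue));
}

/* Hypothetical helper: true if @queue is within the advertised range */
static bool queue_in_range(size_t queue)
{
	return queue < MAX_QPAIRS * 2;
}
```

With these definitions, a frame arriving on TX virtqueue 3 (pair 1) would be
answered on RX virtqueue 2, and index 31 is the last valid virtqueue.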

This series adds:
- Multiqueue capability advertisement (VIRTIO_NET_F_MQ and
  VHOST_USER_PROTOCOL_F_MQ features)
- Queue pair parameter throughout the network stack, propagated through all
  protocol handlers (TCP, UDP, ICMP, ARP, DHCP, DHCPv6, NDP)
- Flow-aware queue routing that tracks the originating TX queue for each flow
  and routes return packets to the corresponding RX queue
- Epoll fd derived directly from queue pair, removing the separate epollid
  field and adding flow_migrate() for moving flows between queue pairs
- Test coverage with VHOST_USER_MQ environment variable to validate multiqueue
  functionality across all protocols (TCP, UDP, ICMP) and services (DHCP, NDP)

Current behavior:
- TX queue selection is controlled by the guest kernel's networking stack
- RX packets are routed to queues based on their associated flows, with the
  queue assignment updated on each packet the guest sends (TX) to maintain
  affinity
- Host-initiated flows (e.g., from socket-side connections) currently default
  to queue pair 0
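
The behavior listed above can be sketched roughly as follows. This is a
simplified model, not the code from the series: struct example_flow and the
example_flow_*() functions are hypothetical names, and the real flow table in
flow.c carries much more state.

```c
#include <assert.h>
#include <stddef.h>

#define QPAIR_DEFAULT	0	/* as in passt.h (patch 02) */

/* Hypothetical, simplified stand-in for per-flow state */
struct example_flow {
	unsigned int qpair;	/* Queue pair last used by the guest */
};

/* Flow initiated from the host side: no guest TX queue is known yet */
static void example_flow_init_from_host(struct example_flow *f)
{
	f->qpair = QPAIR_DEFAULT;
}

/* Called for each frame the guest sends on @tx_queue: refresh affinity */
static void example_flow_from_guest(struct example_flow *f, size_t tx_queue)
{
	assert(tx_queue % 2 == 1);	/* TX virtqueues have odd indices */

	f->qpair = (unsigned int)(tx_queue / 2);
}

/* RX virtqueue index on which return traffic for this flow is delivered */
static size_t example_flow_rx_queue(const struct example_flow *f)
{
	return (size_t)f->qpair * 2;
}
```

In this model a host-initiated flow starts on RX virtqueue 0 and moves to the
guest's chosen pair as soon as the guest transmits on a different TX queue.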

The changes are transparent to single-queue operation - passt/pasta modes and
single-queue vhost-user configurations continue to work unchanged, always
using queue pair 0.

v5:
- Rebase
- Dropped "tap: Convert packet pools to per-queue-pair arrays for multiqueue"
- Split "vhost-user: Add queue pair parameter throughout the network stack"
  into separate per-protocol patches (tap, arp, tcp, udp, dhcp/dhcpv6,
  icmp, ndp) for easier review
- New patch: "flow: Derive epoll fd from queue pair, removing epollid field"
  derives epoll fd directly from queue pair, removing the redundant epollid
  field and adding flow_migrate()/FLOW_MIGRATE() for existing flows
- tcp_keepalive() and tcp_inactivity() now filter flows by queue pair
- Renamed "vhost-user: Enable multiqueue" to "vhost-user: Advertise
  multiqueue support"

v4:
- Rebase
- Added QPAIR_DEFAULT, QPAIR_FROMGUEST_QUEUE(), QPAIR_TOGUEST_QUEUE(), and
  QPAIR_FROM_QUEUE() macros to centralize queue pair calculations
- Replaced hardcoded 0 with QPAIR_DEFAULT throughout for clarity
- Set queue pair at flow creation time in icmp_ping_new() and udp_flow_new()
  rather than after creation
- Removed vhost-user-specific queue macros (VHOST_USER_TX_QUEUE,
  VHOST_USER_IS_QUEUE_TX, VHOST_USER_IS_QUEUE_RX) in favor of generic
  QPAIR_* macros for better consistency

v3:
- Removed --max-qpairs configuration option - multiqueue support is now
  always enabled up to 16 queue pairs without requiring explicit configuration
- Replaced "tap: Add queue pair parameter throughout the packet processing
  path" with "tap: Convert packet pools to per-queue-pair arrays for
  multiqueue" - simplified implementation by converting global pools to arrays
  rather than passing pool parameters throughout
- Changed qpair parameter type from int to unsigned int throughout the codebase
- Simplified test infrastructure - queues parameter is always set on netdev,
  mq=true added to virtio-net only when VHOST_USER_MQ > 1
- Updated QEMU usage hints to always show multiqueue-capable command line

v2:
- New patch: "tap: Remove pool parameter from tap4_handler() and tap6_handler()"
  to clean up unused parameters before adding queue pair parameter
- Changed to one packet pool per queue pair instead of shared pools across
  all queue pairs
- Split "multiqueue: Add queue-aware flow management..." into two patches:
  - "tap: Add queue pair parameter throughout the packet processing path"
  - "flow: Add queue pair tracking to flow management"
- Updated test infrastructure patch with refined implementation

Laurent Vivier (12):
  tap: Remove pool parameter from tap4_handler() and tap6_handler()
  vhost-user: Advertise multiqueue support
  test: Add multiqueue support to vhost-user test infrastructure
  tap: Thread queue pair through all remaining tap paths
  arp: Pass queue pair explicitly through ARP send path
  tcp: Pass queue pair explicitly through TCP send path
  udp: Pass queue pair explicitly through UDP send path
  dhcp/dhcpv6: Pass queue pair explicitly through DHCP send path
  icmp: Pass queue pair explicitly through ICMP send path
  ndp: Pass queue pair explicitly through NDP send path
  flow: Add queue pair tracking to flow management
  flow: Derive epoll fd from queue pair, removing epollid field

 arp.c          |  15 +--
 arp.h          |   6 +-
 dhcp.c         |   5 +-
 dhcp.h         |   2 +-
 dhcpv6.c       |  12 ++-
 dhcpv6.h       |   2 +-
 flow.c         |  66 ++++++++-----
 flow.h         |  28 ++++--
 fwd.c          |   4 +-
 icmp.c         |  29 ++++--
 icmp.h         |   7 +-
 ndp.c          |  40 ++++----
 ndp.h          |   7 +-
 passt.c        |  27 +++---
 passt.h        |   8 ++
 tap.c          | 134 ++++++++++++++------------
 tap.h          |  21 +++--
 tcp.c          | 249 ++++++++++++++++++++++++++++++-------------------
 tcp.h          |  15 +--
 tcp_buf.c      |  10 +-
 tcp_internal.h |   7 +-
 tcp_splice.c   |   2 +-
 tcp_vu.c       |  22 +++--
 tcp_vu.h       |   6 +-
 test/lib/setup |  21 +++--
 test/run       |  23 +++++
 udp.c          |  45 +++++----
 udp.h          |   9 +-
 udp_flow.c     |  27 +++---
 udp_flow.h     |   4 +-
 udp_internal.h |   3 +-
 udp_vu.c       |   6 +-
 udp_vu.h       |   3 +-
 vhost_user.c   |  14 ++-
 vhost_user.h   |   9 --
 virtio.h       |   2 +-
 vu_common.c    |  22 +++--
 vu_common.h    |   3 +-
 38 files changed, 557 insertions(+), 358 deletions(-)

-- 
2.54.0



* [PATCH v5 01/12] tap: Remove pool parameter from tap4_handler() and tap6_handler()
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 02/12] vhost-user: Advertise multiqueue support Laurent Vivier
                   ` (10 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC
  To: passt-dev; +Cc: Laurent Vivier, David Gibson

These handlers only ever operate on their respective global pools
(pool_tap4 and pool_tap6). The pool parameter was always passed the
same value, making it an unnecessary indirection.

Access the global pools directly instead, simplifying the function
signatures.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
Reviewed-by: David Gibson <david@gibson.dropbear.id.au>
---
 tap.c | 46 +++++++++++++++++++++-------------------------
 1 file changed, 21 insertions(+), 25 deletions(-)

diff --git a/tap.c b/tap.c
index 4cba4c785617..4699c5ef9177 100644
--- a/tap.c
+++ b/tap.c
@@ -699,23 +699,21 @@ static bool tap4_is_fragment(const struct iphdr *iph,
 /**
  * tap4_handler() - IPv4 and ARP packet handler for tap file descriptor
  * @c:		Execution context
- * @in:		Ingress packet pool, packets with Ethernet headers
  * @now:	Current timestamp
  *
  * Return: count of packets consumed by handlers
  */
-static int tap4_handler(struct ctx *c, const struct pool *in,
-			const struct timespec *now)
+static int tap4_handler(struct ctx *c, const struct timespec *now)
 {
 	unsigned int i, j, seq_count;
 	struct tap4_l4_t *seq;
 
-	if (!c->ifi4 || !in->count)
-		return in->count;
+	if (!c->ifi4 || !pool_tap4->count)
+		return pool_tap4->count;
 
 	i = 0;
 resume:
-	for (seq_count = 0, seq = NULL; i < in->count; i++) {
+	for (seq_count = 0, seq = NULL; i < pool_tap4->count; i++) {
 		size_t l3len, hlen, l4len;
 		struct ethhdr eh_storage;
 		struct iphdr iph_storage;
@@ -725,7 +723,7 @@ resume:
 		struct iov_tail data;
 		struct iphdr *iph;
 
-		if (!packet_get(in, i, &data))
+		if (!packet_get(pool_tap4, i, &data))
 			continue;
 
 		eh = IOV_PEEK_HEADER(&data, eh_storage);
@@ -797,7 +795,7 @@ resume:
 		if (iph->protocol == IPPROTO_UDP) {
 			struct iov_tail eh_data;
 
-			packet_get(in, i, &eh_data);
+			packet_get(pool_tap4, i, &eh_data);
 			if (dhcp(c, &eh_data))
 				continue;
 		}
@@ -828,7 +826,7 @@ resume:
 			goto append;
 
 		if (seq_count == TAP_SEQS)
-			break;	/* Resume after flushing if i < in->count */
+			break;	/* Resume after flushing if i < pool_tap4->count */
 
 		for (seq = tap4_l4 + seq_count - 1; seq >= tap4_l4; seq--) {
 			if (L4_MATCH(iph, uh, seq)) {
@@ -874,10 +872,10 @@ append:
 		}
 	}
 
-	if (i < in->count)
+	if (i < pool_tap4->count)
 		goto resume;
 
-	return in->count;
+	return pool_tap4->count;
 }
 
 #define IPV6_NH_OPT(nh)							\
@@ -935,23 +933,21 @@ found:
 /**
  * tap6_handler() - IPv6 packet handler for tap file descriptor
  * @c:		Execution context
- * @in:		Ingress packet pool, packets with Ethernet headers
  * @now:	Current timestamp
  *
  * Return: count of packets consumed by handlers
  */
-static int tap6_handler(struct ctx *c, const struct pool *in,
-			const struct timespec *now)
+static int tap6_handler(struct ctx *c, const struct timespec *now)
 {
 	unsigned int i, j, seq_count = 0;
 	struct tap6_l4_t *seq;
 
-	if (!c->ifi6 || !in->count)
-		return in->count;
+	if (!c->ifi6 || !pool_tap6->count)
+		return pool_tap6->count;
 
 	i = 0;
 resume:
-	for (seq_count = 0, seq = NULL; i < in->count; i++) {
+	for (seq_count = 0, seq = NULL; i < pool_tap6->count; i++) {
 		size_t l4len, plen, check;
 		struct in6_addr *saddr, *daddr;
 		struct ipv6hdr ip6h_storage;
@@ -963,7 +959,7 @@ resume:
 		struct ipv6hdr *ip6h;
 		uint8_t proto;
 
-		if (!packet_get(in, i, &data))
+		if (!packet_get(pool_tap6, i, &data))
 			return -1;
 
 		eh = IOV_REMOVE_HEADER(&data, eh_storage);
@@ -1076,7 +1072,7 @@ resume:
 			goto append;
 
 		if (seq_count == TAP_SEQS)
-			break;	/* Resume after flushing if i < in->count */
+			break;	/* Resume after flushing if i < pool_tap6->count */
 
 		for (seq = tap6_l4 + seq_count - 1; seq >= tap6_l4; seq--) {
 			if (L4_MATCH(ip6h, proto, uh, seq)) {
@@ -1123,10 +1119,10 @@ append:
 		}
 	}
 
-	if (i < in->count)
+	if (i < pool_tap6->count)
 		goto resume;
 
-	return in->count;
+	return pool_tap6->count;
 }
 
 /**
@@ -1145,8 +1141,8 @@ void tap_flush_pools(void)
  */
 void tap_handler(struct ctx *c, const struct timespec *now)
 {
-	tap4_handler(c, pool_tap4, now);
-	tap6_handler(c, pool_tap6, now);
+	tap4_handler(c, now);
+	tap6_handler(c, now);
 }
 
 /**
@@ -1182,14 +1178,14 @@ void tap_add_packet(struct ctx *c, struct iov_tail *data,
 	case ETH_P_ARP:
 	case ETH_P_IP:
 		if (!pool_can_fit(pool_tap4, data)) {
-			tap4_handler(c, pool_tap4, now);
+			tap4_handler(c, now);
 			pool_flush(pool_tap4);
 		}
 		packet_add(pool_tap4, data);
 		break;
 	case ETH_P_IPV6:
 		if (!pool_can_fit(pool_tap6, data)) {
-			tap6_handler(c, pool_tap6, now);
+			tap6_handler(c, now);
 			pool_flush(pool_tap6);
 		}
 		packet_add(pool_tap6, data);
-- 
2.54.0



* [PATCH v5 02/12] vhost-user: Advertise multiqueue support
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 01/12] tap: Remove pool parameter from tap4_handler() and tap6_handler() Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 03/12] test: Add multiqueue support to vhost-user test infrastructure Laurent Vivier
                   ` (9 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC
  To: passt-dev; +Cc: Laurent Vivier

Allow the guest to negotiate multiple virtqueue pairs by advertising
VIRTIO_NET_F_MQ and VHOST_USER_PROTOCOL_F_MQ feature flags, and
increase VHOST_USER_MAX_VQS from 2 to 32, supporting up to 16 queue
pairs.

Replace the VHOST_USER_RX_QUEUE, VHOST_USER_TX_QUEUE,
VHOST_USER_IS_QUEUE_TX(), and VHOST_USER_IS_QUEUE_RX() macros with a
general set of QPAIR_* macros in passt.h that translate between queue
pair numbers and virtqueue indices.  These are needed now that queue
indices are no longer limited to 0 and 1.

Add a queue pair parameter to vu_send_single(), propagating it to the
virtqueue selection.  All callers currently pass QPAIR_DEFAULT (0):
only the first RX queue is used for receiving.  The guest kernel
selects which TX queue to use for transmission.  Full multi-RX-queue
load balancing will be implemented separately.
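
For reference, the open-coded test that replaces VHOST_USER_IS_QUEUE_TX() in
this patch can be read as "does this index round-trip through its queue pair
as a TX queue?". The sketch below wraps it in a helper to show that it agrees
with the removed macro for non-negative indices; queue_is_fromguest() is a
hypothetical name used only here, not a function added by the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* As added to passt.h by this patch */
#define QPAIR_FROMGUEST_QUEUE(qpair)	((size_t)(qpair) * 2 + 1)
#define QPAIR_FROM_QUEUE(queue)		((queue) / 2)

/* Removed by this patch, reproduced here only for comparison */
#define VHOST_USER_IS_QUEUE_TX(n)	(n % 2)

/* Hypothetical helper wrapping the check open-coded in the patch */
static bool queue_is_fromguest(int idx)
{
	assert(idx >= 0);

	return QPAIR_FROMGUEST_QUEUE(QPAIR_FROM_QUEUE(idx)) ==
	       (unsigned int)idx;
}
```

An even index maps to a pair whose TX queue is one above it, so the comparison
fails; an odd index maps back to itself, so it succeeds.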

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 passt.h      |  8 ++++++++
 tap.c        |  9 ++++++---
 tcp_vu.c     |  7 +++++--
 udp_vu.c     |  3 ++-
 vhost_user.c | 14 +++++++++-----
 vhost_user.h |  9 ---------
 virtio.h     |  2 +-
 vu_common.c  | 17 +++++++++++------
 vu_common.h  |  3 ++-
 9 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/passt.h b/passt.h
index 07126a969551..bf8a1e037317 100644
--- a/passt.h
+++ b/passt.h
@@ -28,6 +28,14 @@ union epoll_ref;
 #include "udp.h"
 #include "vhost_user.h"
 
+/* Queue pairs consist of one RX queue (even index) and one TX queue (odd index).
+ * Example: pair 0 has RX queue 0 and TX queue 1; pair 1 has RX queue 2 and TX queue 3.
+ */
+#define QPAIR_DEFAULT			0		/* Default queue pair */
+#define QPAIR_FROMGUEST_QUEUE(qpair)	((size_t)(qpair) * 2 + 1)	/* TX queue index from pair */
+#define QPAIR_TOGUEST_QUEUE(qpair)	((size_t)(qpair) * 2)		/* RX queue index from pair */
+#define QPAIR_FROM_QUEUE(queue)		((queue) / 2)			/* Extract pair from queue */
+
 /* Default address for our end on the tap interface.  Bit 0 of byte 0 must be 0
  * (unicast) and bit 1 of byte 1 must be 1 (locally administered).  Otherwise
  * it's arbitrary.
diff --git a/tap.c b/tap.c
index 4699c5ef9177..a5d22088424f 100644
--- a/tap.c
+++ b/tap.c
@@ -155,7 +155,7 @@ void tap_send_single(const struct ctx *c, const void *data, size_t l2len)
 		tap_send_frames(c, iov, iovcnt, 1);
 		break;
 	case MODE_VU:
-		vu_send_single(c, data, l2len);
+		vu_send_single(c, QPAIR_DEFAULT, data, l2len);
 		break;
 	}
 }
@@ -1379,8 +1379,11 @@ static void tap_backend_show_hints(const struct ctx *c)
 		break;
 	case MODE_VU:
 		info("You can start qemu with:");
-		info("    kvm ... -chardev socket,id=chr0,path=%s -netdev vhost-user,id=netdev0,chardev=chr0 -device virtio-net,netdev=netdev0 -object memory-backend-memfd,id=memfd0,share=on,size=$RAMSIZE -numa node,memdev=memfd0\n",
-		     c->sock_path);
+		info("    kvm ... -chardev socket,id=chr0,path=%s "
+		     "-netdev vhost-user,id=netdev0,chardev=chr0,queues=$QUEUES "
+		     "-device virtio-net,netdev=netdev0,mq=true "
+		     "-object memory-backend-memfd,id=memfd0,share=on,size=$RAMSIZE "
+		     "-numa node,memdev=memfd0\n", c->sock_path);
 		break;
 	}
 }
diff --git a/tcp_vu.c b/tcp_vu.c
index 7e2a7dbc81e1..9ef6b5242c9c 100644
--- a/tcp_vu.c
+++ b/tcp_vu.c
@@ -124,7 +124,8 @@ static int tcp_vu_send_dup(const struct ctx *c, struct vu_virtq *vq,
 int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags)
 {
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+	int rx_queue = QPAIR_TOGUEST_QUEUE(QPAIR_DEFAULT);
+	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	size_t optlen, hdrlen, iov_cnt, iov_used;
 	struct vu_virtq_element flags_elem[2];
 	struct iov_tail payload, l2frame;
@@ -429,8 +430,10 @@ static void tcp_vu_prepare(const struct ctx *c, struct tcp_tap_conn *conn,
 int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 {
 	uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
+	unsigned int qpair = QPAIR_DEFAULT;
+	int rx_queue = QPAIR_TOGUEST_QUEUE(qpair);
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	uint32_t already_sent, check;
 	ssize_t len, previous_dlen;
 	int i, elem_cnt, frame_cnt;
diff --git a/udp_vu.c b/udp_vu.c
index e4fb105730bf..b1a8ad76a691 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -147,8 +147,9 @@ void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
 	bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
 	static struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
 	static struct iovec iov_vu[VIRTQUEUE_MAX_SIZE];
+	int rx_queue = QPAIR_TOGUEST_QUEUE(QPAIR_DEFAULT);
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	size_t hdrlen = udp_vu_hdrlen(v6);
 	int i;
 
diff --git a/vhost_user.c b/vhost_user.c
index a1259c2624c0..fa13c66d5f0d 100644
--- a/vhost_user.c
+++ b/vhost_user.c
@@ -324,6 +324,7 @@ static bool vu_get_features_exec(struct vu_dev *vdev,
 		1ULL << VIRTIO_F_VERSION_1 |
 		1ULL << VIRTIO_NET_F_GUEST_CSUM |
 		1ULL << VIRTIO_NET_F_MRG_RXBUF |
+		1ULL << VIRTIO_NET_F_MQ |
 		1ULL << VHOST_F_LOG_ALL |
 		1ULL << VHOST_USER_F_PROTOCOL_FEATURES;
 
@@ -770,7 +771,8 @@ static void vu_check_queue_msg_file(struct vhost_user_msg *vmsg)
 	bool nofd = vmsg->payload.u64 & VHOST_USER_VRING_NOFD_MASK;
 
 	if (idx >= VHOST_USER_MAX_VQS)
-		die("Invalid vhost-user queue index: %u", idx);
+		die("Invalid vhost-user queue index: %u (maximum %u)", idx,
+		    VHOST_USER_MAX_VQS);
 
 	if (nofd) {
 		vmsg_close_fds(vmsg);
@@ -812,7 +814,9 @@ static bool vu_set_vring_kick_exec(struct vu_dev *vdev,
 
 	vdev->vq[idx].started = true;
 
-	if (vdev->vq[idx].kick_fd != -1 && VHOST_USER_IS_QUEUE_TX(idx)) {
+	if (vdev->vq[idx].kick_fd != -1 &&
+	    QPAIR_FROMGUEST_QUEUE(QPAIR_FROM_QUEUE(idx)) ==
+	    (unsigned int)idx) {
 		vu_set_watch(vdev, idx);
 		debug("Waiting for kicks on fd: %d for vq: %d",
 		      vdev->vq[idx].kick_fd, idx);
@@ -899,7 +903,8 @@ static bool vu_get_protocol_features_exec(struct vu_dev *vdev,
 	uint64_t features = 1ULL << VHOST_USER_PROTOCOL_F_REPLY_ACK |
 			    1ULL << VHOST_USER_PROTOCOL_F_LOG_SHMFD |
 			    1ULL << VHOST_USER_PROTOCOL_F_DEVICE_STATE |
-			    1ULL << VHOST_USER_PROTOCOL_F_RARP;
+			    1ULL << VHOST_USER_PROTOCOL_F_RARP |
+			    1ULL << VHOST_USER_PROTOCOL_F_MQ;
 
 	(void)vdev;
 	vmsg_set_reply_u64(vmsg, features);
@@ -938,10 +943,9 @@ static bool vu_get_queue_num_exec(struct vu_dev *vdev,
 {
 	(void)vdev;
 
-	/* NOLINTNEXTLINE(misc-redundant-expression) */
 	vmsg_set_reply_u64(vmsg, VHOST_USER_MAX_VQS / 2);
 
-	debug("VHOST_USER_MAX_VQS  %u", VHOST_USER_MAX_VQS / 2);
+	debug("Using up to %u vhost-user queue pairs", VHOST_USER_MAX_VQS / 2);
 
 	return true;
 }
diff --git a/vhost_user.h b/vhost_user.h
index e806a9e54e00..d2e51d3e86c3 100644
--- a/vhost_user.h
+++ b/vhost_user.h
@@ -201,15 +201,6 @@ struct vhost_user_msg {
 } __attribute__ ((__packed__));
 #define VHOST_USER_HDR_SIZE sizeof(struct vhost_user_header)
 
-/* index of the RX virtqueue */
-#define VHOST_USER_RX_QUEUE 0
-/* index of the TX virtqueue */
-#define VHOST_USER_TX_QUEUE 1
-
-/* in case of multiqueue, the RX and TX queues are interleaved */
-#define VHOST_USER_IS_QUEUE_TX(n)	(n % 2)
-#define VHOST_USER_IS_QUEUE_RX(n)	(!(n % 2))
-
 /* Default virtio-net header for passt */
 #define VU_HEADER ((struct virtio_net_hdr){	\
 	.flags = VIRTIO_NET_HDR_F_DATA_VALID,	\
diff --git a/virtio.h b/virtio.h
index 8f2ae068d5ba..eb7570e63cd7 100644
--- a/virtio.h
+++ b/virtio.h
@@ -90,7 +90,7 @@ struct vu_dev_region {
 	uint64_t mmap_addr;
 };
 
-#define VHOST_USER_MAX_VQS 2U
+#define VHOST_USER_MAX_VQS 32U
 
 /*
  * Set a reasonable maximum number of ram slots, which will be supported by
diff --git a/vu_common.c b/vu_common.c
index d07f584f228a..8b555ea9a8b1 100644
--- a/vu_common.c
+++ b/vu_common.c
@@ -175,7 +175,8 @@ static void vu_handle_tx(struct vu_dev *vdev, int index,
 	int out_sg_count;
 	int count;
 
-	assert(VHOST_USER_IS_QUEUE_TX(index));
+	assert(QPAIR_FROMGUEST_QUEUE(QPAIR_FROM_QUEUE(index)) ==
+	       (unsigned int)index);
 
 	tap_flush_pools();
 
@@ -233,28 +234,32 @@ void vu_kick_cb(struct vu_dev *vdev, union epoll_ref ref,
 
 	trace("vhost-user: got kick_data: %016"PRIx64" idx: %d",
 	      kick_data, ref.queue);
-	if (VHOST_USER_IS_QUEUE_TX(ref.queue))
+	if (QPAIR_FROMGUEST_QUEUE(QPAIR_FROM_QUEUE(ref.queue)) ==
+	    (unsigned int)ref.queue)
 		vu_handle_tx(vdev, ref.queue, now);
 }
 
 /**
- * vu_send_single() - Send a buffer to the front-end using the RX virtqueue
- * @c:		execution context
+ * vu_send_single() - Send a buffer to the front-end using a specified virtqueue
+ * @c:		Execution context
+ * @qpair:	Queue pair on which to send the buffer
  * @buf:	address of the buffer
  * @size:	size of the buffer
  *
  * Return: number of bytes sent, -1 if there is an error
  */
-int vu_send_single(const struct ctx *c, const void *buf, size_t size)
+int vu_send_single(const struct ctx *c, unsigned int qpair, const void *buf, size_t size)
 {
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
 	struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
 	struct iovec in_sg[VIRTQUEUE_MAX_SIZE];
+	struct vu_virtq *vq;
 	size_t total, in_total;
 	int elem_cnt;
 	int i;
 
+	vq = &vdev->vq[QPAIR_TOGUEST_QUEUE(qpair)];
+
 	trace("vu_send_single size %zu", size);
 
 	if (!vu_queue_enabled(vq) || !vu_queue_started(vq)) {
diff --git a/vu_common.h b/vu_common.h
index 817384175a1d..f5603d9ddeb6 100644
--- a/vu_common.h
+++ b/vu_common.h
@@ -23,7 +23,8 @@ void vu_flush(const struct vu_dev *vdev, struct vu_virtq *vq,
 	      struct vu_virtq_element *elem, int elem_cnt, size_t frame_len);
 void vu_kick_cb(struct vu_dev *vdev, union epoll_ref ref,
 		const struct timespec *now);
-int vu_send_single(const struct ctx *c, const void *buf, size_t size);
+int vu_send_single(const struct ctx *c, unsigned int qpair, const void *buf,
+		   size_t size);
 void vu_pad(const struct iovec *iov, size_t cnt, size_t frame_len);
 
 #endif /* VU_COMMON_H */
-- 
2.54.0



* [PATCH v5 03/12] test: Add multiqueue support to vhost-user test infrastructure
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 01/12] tap: Remove pool parameter from tap4_handler() and tap6_handler() Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 02/12] vhost-user: Advertise multiqueue support Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 04/12] tap: Thread queue pair through all remaining tap paths Laurent Vivier
                   ` (8 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC
  To: passt-dev; +Cc: Laurent Vivier, David Gibson

With the recent addition of multiqueue support to passt's vhost-user
implementation, we need test coverage to validate the functionality. The
test infrastructure previously only tested single queue configurations.

Add a VHOST_USER_MQ environment variable to control the number of queue
pairs. The queues parameter on the netdev is always set to this value
(defaulting to 1 for single queue). When it is set to a value greater
than 1, the setup scripts add mq=true to the virtio-net device to enable
multiqueue support.

The test suite now runs an additional set of tests with 8 queue pairs to
exercise the multiqueue paths across all protocols (TCP, UDP, ICMP) and
services (DHCP, NDP). Note that the guest kernel will only enable as many
queues as there are vCPUs.
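
To make the vCPU note concrete, here is a small sketch of how many queue pairs
a test run might actually exercise. example_vcpus() mirrors the VCPUS
computation in test/lib/setup; example_active_qpairs() encodes the assumption
stated above, that the guest enables no more queue pairs than it has vCPUs.
Both function names are hypothetical.

```c
#include <assert.h>

/* Mirrors the VCPUS computation in test/lib/setup */
static unsigned int example_vcpus(unsigned int nproc)
{
	return nproc >= 8 ? 6 : nproc / 2 + 1;
}

/* Assumed guest behaviour: no more active queue pairs than vCPUs */
static unsigned int example_active_qpairs(unsigned int queues,
					  unsigned int vcpus)
{
	assert(queues >= 1 && vcpus >= 1);

	return queues < vcpus ? queues : vcpus;
}
```

Under that assumption, VHOST_USER_MQ=8 on a 16-CPU host would result in at
most 6 active queue pairs, because the tests start the guest with 6 vCPUs.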

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
Reviewed-by: David Gibson <david@gibson.dropbear.id.au>
---
 test/lib/setup | 21 +++++++++++++--------
 test/run       | 23 +++++++++++++++++++++++
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/test/lib/setup b/test/lib/setup
index 5994598744a3..3872a02b109b 100755
--- a/test/lib/setup
+++ b/test/lib/setup
@@ -18,6 +18,8 @@ VCPUS="$( [ $(nproc) -ge 8 ] && echo 6 || echo $(( $(nproc) / 2 + 1 )) )"
 MEM_KIB="$(sed -n 's/MemTotal:[ ]*\([0-9]*\) kB/\1/p' /proc/meminfo)"
 QEMU_ARCH="$(uname -m)"
 [ "${QEMU_ARCH}" = "i686" ] && QEMU_ARCH=i386
+VHOST_USER=0
+VHOST_USER_MQ=1
 
 # setup_build() - Set up pane layout for build tests
 setup_build() {
@@ -46,6 +48,7 @@ setup_passt() {
 	[ ${DEBUG} -eq 1 ] && __opts="${__opts} -d"
 	[ ${TRACE} -eq 1 ] && __opts="${__opts} --trace"
 	[ ${VHOST_USER} -eq 1 ] && __opts="${__opts} --vhost-user"
+	[ ${VHOST_USER_MQ} -gt 1 ] && __virtio_opts="${__virtio_opts},mq=true"
 
 	context_run passt "make clean"
 	context_run passt "make valgrind"
@@ -59,8 +62,8 @@ setup_passt() {
 		__vmem="$(((${__vmem} + 500) / 1000))G"
 		__qemu_netdev="						       \
 			-chardev socket,id=c,path=${STATESETUP}/passt.socket   \
-			-netdev vhost-user,id=v,chardev=c		       \
-			-device virtio-net,netdev=v			       \
+			-netdev vhost-user,id=v,chardev=c,queues=${VHOST_USER_MQ} \
+			-device virtio-net,netdev=v${__virtio_opts}            \
 			-object memory-backend-memfd,id=m,share=on,size=${__vmem} \
 			-numa node,memdev=m"
 	else
@@ -156,6 +159,7 @@ setup_passt_in_ns() {
 	[ ${DEBUG} -eq 1 ] && __opts="${__opts} -d"
 	[ ${TRACE} -eq 1 ] && __opts="${__opts} --trace"
 	[ ${VHOST_USER} -eq 1 ] && __opts="${__opts} --vhost-user"
+	[ ${VHOST_USER_MQ} -gt 1 ] && __virtio_opts="${__virtio_opts},mq=true"
 
 	if [ ${VALGRIND} -eq 1 ]; then
 		context_run passt "make clean"
@@ -173,8 +177,8 @@ setup_passt_in_ns() {
 		__vmem="$(((${__vmem} + 500) / 1000))G"
 		__qemu_netdev="						       \
 			-chardev socket,id=c,path=${STATESETUP}/passt.socket   \
-			-netdev vhost-user,id=v,chardev=c		       \
-			-device virtio-net,netdev=v			       \
+			-netdev vhost-user,id=v,chardev=c,queues=${VHOST_USER_MQ} \
+			-device virtio-net,netdev=v${__virtio_opts}            \
 			-object memory-backend-memfd,id=m,share=on,size=${__vmem} \
 			-numa node,memdev=m"
 	else
@@ -251,6 +255,7 @@ setup_two_guests() {
 	[ ${DEBUG} -eq 1 ] && __opts="${__opts} -d"
 	[ ${TRACE} -eq 1 ] && __opts="${__opts} --trace"
 	[ ${VHOST_USER} -eq 1 ] && __opts="${__opts} --vhost-user"
+	[ ${VHOST_USER_MQ} -gt 1 ] && __virtio_opts="${__virtio_opts},mq=true"
 
 	context_run_bg passt_2 "./passt -s ${STATESETUP}/passt_2.socket -P ${STATESETUP}/passt_2.pid -f ${__opts} --hostname hostname2 --fqdn fqdn2 -t 10004 -u 10004"
 	wait_for [ -f "${STATESETUP}/passt_2.pid" ]
@@ -260,14 +265,14 @@ setup_two_guests() {
 		__vmem="$(((${__vmem} + 500) / 1000))G"
 		__qemu_netdev1="					       \
 			-chardev socket,id=c,path=${STATESETUP}/passt_1.socket \
-			-netdev vhost-user,id=v,chardev=c		       \
-			-device virtio-net,netdev=v			       \
+			-netdev vhost-user,id=v,chardev=c,queues=${VHOST_USER_MQ} \
+			-device virtio-net,netdev=v${__virtio_opts}            \
 			-object memory-backend-memfd,id=m,share=on,size=${__vmem} \
 			-numa node,memdev=m"
 		__qemu_netdev2="					       \
 			-chardev socket,id=c,path=${STATESETUP}/passt_2.socket \
-			-netdev vhost-user,id=v,chardev=c		       \
-			-device virtio-net,netdev=v			       \
+			-netdev vhost-user,id=v,chardev=c,queues=${VHOST_USER_MQ} \
+			-device virtio-net,netdev=v${__virtio_opts}            \
 			-object memory-backend-memfd,id=m,share=on,size=${__vmem} \
 			-numa node,memdev=m"
 	else
diff --git a/test/run b/test/run
index f858e5586847..652cc12b1234 100755
--- a/test/run
+++ b/test/run
@@ -190,6 +190,29 @@ run() {
 	test passt_vu_in_ns/shutdown
 	teardown passt_in_ns
 
+	VHOST_USER=1
+	VHOST_USER_MQ=8
+	setup passt_in_ns
+	test passt_vu/ndp
+	test passt_vu_in_ns/dhcp
+	test passt_vu_in_ns/icmp
+	test passt_vu_in_ns/tcp
+	test passt_vu_in_ns/udp
+	test passt_vu_in_ns/shutdown
+	teardown passt_in_ns
+
+	setup two_guests
+	test two_guests_vu/basic
+	teardown two_guests
+
+	setup passt_in_ns
+	test passt_vu/ndp
+	test passt_vu_in_ns/dhcp
+	test perf/passt_vu_tcp
+	test perf/passt_vu_udp
+	test passt_vu_in_ns/shutdown
+	teardown passt_in_ns
+
 	# TODO: Make those faster by at least pre-installing gcc and make on
 	# non-x86 images, then re-enable.
 skip_distro() {
-- 
2.54.0



* [PATCH v5 04/12] tap: Thread queue pair through all remaining tap paths
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (2 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 03/12] test: Add multiqueue support to vhost-user test infrastructure Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 05/12] arp: Pass queue pair explicitly through ARP send path Laurent Vivier
                   ` (7 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC
  To: passt-dev; +Cc: Laurent Vivier

An earlier commit in this series threaded the queue pair through
vu_send_single(). Extend this to every other tap entry point:
tap_send_single(), the receive handlers (tap_add_packet, tap_handler,
tap4/6_handler), the send helpers (tap_udp4/6_send, tap_icmp4/6_send),
and connection setup (tap_listen_handler, tap_start_connection).

All callers pass QPAIR_DEFAULT except vu_handle_tx(), which derives
the queue pair from the virtqueue index with QPAIR_FROM_QUEUE().

The parameter is plumbed but not yet consumed.  Subsequent patches will
use it to direct traffic to the correct queue pair.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 arp.c       |  6 ++--
 dhcp.c      |  3 +-
 dhcpv6.c    |  4 +--
 icmp.c      |  6 ++--
 ndp.c       |  2 +-
 passt.c     |  2 +-
 tap.c       | 79 +++++++++++++++++++++++++++++++++--------------------
 tap.h       | 21 +++++++-------
 tcp.c       |  2 +-
 udp.c       |  4 +--
 vu_common.c |  5 ++--
 11 files changed, 80 insertions(+), 54 deletions(-)

diff --git a/arp.c b/arp.c
index bb042e9585a3..e97c4de86a99 100644
--- a/arp.c
+++ b/arp.c
@@ -112,7 +112,7 @@ int arp(const struct ctx *c, struct iov_tail *data)
 	memcpy(resp.am.tha,		am->sha,	sizeof(resp.am.tha));
 	memcpy(resp.am.tip,		am->sip,	sizeof(resp.am.tip));
 
-	tap_send_single(c, &resp, sizeof(resp));
+	tap_send_single(c, QPAIR_DEFAULT, &resp, sizeof(resp));
 
 	return 1;
 }
@@ -148,7 +148,7 @@ void arp_send_init_req(const struct ctx *c)
 	memcpy(req.am.tip,	&c->ip4.addr,		sizeof(req.am.tip));
 
 	debug("Sending initial ARP request for guest MAC address");
-	tap_send_single(c, &req, sizeof(req));
+	tap_send_single(c, QPAIR_DEFAULT, &req, sizeof(req));
 }
 
 /**
@@ -202,5 +202,5 @@ void arp_announce(const struct ctx *c, struct in_addr *ip,
 	eth_ntop(mac, mac_str, sizeof(mac_str));
 	debug("ARP announcement for %s / %s", ip_str, mac_str);
 
-	tap_send_single(c, &msg, sizeof(msg));
+	tap_send_single(c, QPAIR_DEFAULT, &msg, sizeof(msg));
 }
diff --git a/dhcp.c b/dhcp.c
index 1ff8cba9f93d..e652659138b8 100644
--- a/dhcp.c
+++ b/dhcp.c
@@ -473,7 +473,8 @@ int dhcp(const struct ctx *c, struct iov_tail *data)
 	else
 		dst = c->ip4.addr;
 
-	tap_udp4_send(c, c->ip4.our_tap_addr, 67, dst, 68, &reply, dlen);
+	tap_udp4_send(c, QPAIR_DEFAULT, c->ip4.our_tap_addr, 67, dst, 68,
+		      &reply, dlen);
 
 	return 1;
 }
diff --git a/dhcpv6.c b/dhcpv6.c
index 97c04e2cb846..f4ebeccf912c 100644
--- a/dhcpv6.c
+++ b/dhcpv6.c
@@ -405,7 +405,7 @@ static void dhcpv6_send_ia_notonlink(struct ctx *c,
 
 	resp_not_on_link.hdr.xid = xid;
 
-	tap_udp6_send(c, src, 547, tap_ip6_daddr(c, src), 546,
+	tap_udp6_send(c, QPAIR_DEFAULT, src, 547, tap_ip6_daddr(c, src), 546,
 		      xid, &resp_not_on_link, n);
 }
 
@@ -680,7 +680,7 @@ int dhcpv6(struct ctx *c, struct iov_tail *data,
 
 	resp.hdr.xid = mh->xid;
 
-	tap_udp6_send(c, src, 547, tap_ip6_daddr(c, src), 546,
+	tap_udp6_send(c, QPAIR_DEFAULT, src, 547, tap_ip6_daddr(c, src), 546,
 		      mh->xid, &resp, n);
 	c->ip6.addr_seen = c->ip6.addr;
 
diff --git a/icmp.c b/icmp.c
index 18b6106a393e..3705f5ce0c9b 100644
--- a/icmp.c
+++ b/icmp.c
@@ -132,12 +132,14 @@ void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
 		const struct in_addr *daddr = inany_v4(&ini->eaddr);
 
 		assert(saddr && daddr); /* Must have IPv4 addresses */
-		tap_icmp4_send(c, *saddr, *daddr, buf, pingf->f.tap_omac, n);
+		tap_icmp4_send(c, QPAIR_DEFAULT, *saddr, *daddr, buf,
+			       pingf->f.tap_omac, n);
 	} else if (pingf->f.type == FLOW_PING6) {
 		const struct in6_addr *saddr = &ini->oaddr.a6;
 		const struct in6_addr *daddr = &ini->eaddr.a6;
 
-		tap_icmp6_send(c, saddr, daddr, buf, pingf->f.tap_omac, n);
+		tap_icmp6_send(c, QPAIR_DEFAULT, saddr, daddr, buf,
+			       pingf->f.tap_omac, n);
 	}
 	return;
 
diff --git a/ndp.c b/ndp.c
index 1f2bcb0cc7ea..6269cb38d93f 100644
--- a/ndp.c
+++ b/ndp.c
@@ -184,7 +184,7 @@ static void ndp_send(const struct ctx *c, const struct in6_addr *dst,
 {
 	const struct in6_addr *src = &c->ip6.our_tap_ll;
 
-	tap_icmp6_send(c, src, dst, buf, c->our_tap_mac, l4len);
+	tap_icmp6_send(c, QPAIR_DEFAULT, src, dst, buf, c->our_tap_mac, l4len);
 }
 
 /**
diff --git a/passt.c b/passt.c
index 06f9ddb9b459..71eb4f0192e2 100644
--- a/passt.c
+++ b/passt.c
@@ -242,7 +242,7 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
 			tap_handler_passt(c, eventmask, &now);
 			break;
 		case EPOLL_TYPE_TAP_LISTEN:
-			tap_listen_handler(c, eventmask);
+			tap_listen_handler(c, QPAIR_DEFAULT, eventmask);
 			break;
 		case EPOLL_TYPE_NSQUIT_INOTIFY:
 			pasta_netns_quit_inotify_handler(c, ref.fd);
diff --git a/tap.c b/tap.c
index a5d22088424f..521ccd6d47e7 100644
--- a/tap.c
+++ b/tap.c
@@ -124,10 +124,12 @@ unsigned long tap_l2_max_len(const struct ctx *c)
 /**
  * tap_send_single() - Send a single frame
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the frame
  * @data:	Packet buffer
  * @l2len:	Total L2 packet length
  */
-void tap_send_single(const struct ctx *c, const void *data, size_t l2len)
+void tap_send_single(const struct ctx *c, unsigned int qpair, const void *data,
+		     size_t l2len)
 {
 	uint8_t padded[ETH_ZLEN] = { 0 };
 	struct iovec iov[2];
@@ -155,7 +157,7 @@ void tap_send_single(const struct ctx *c, const void *data, size_t l2len)
 		tap_send_frames(c, iov, iovcnt, 1);
 		break;
 	case MODE_VU:
-		vu_send_single(c, QPAIR_DEFAULT, data, l2len);
+		vu_send_single(c, qpair, data, l2len);
 		break;
 	}
 }
@@ -258,6 +260,7 @@ void *tap_push_uh4(struct udphdr *uh, struct in_addr src, in_port_t sport,
 /**
  * tap_udp4_send() - Send UDP over IPv4 packet
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packet
  * @src:	IPv4 source address
  * @sport:	UDP source port
  * @dst:	IPv4 destination address
@@ -265,8 +268,8 @@ void *tap_push_uh4(struct udphdr *uh, struct in_addr src, in_port_t sport,
  * @in:	UDP payload contents (not including UDP header)
  * @dlen:	UDP payload length (not including UDP header)
  */
-void tap_udp4_send(const struct ctx *c, struct in_addr src, in_port_t sport,
-		   struct in_addr dst, in_port_t dport,
+void tap_udp4_send(const struct ctx *c, unsigned int qpair, struct in_addr src,
+		   in_port_t sport, struct in_addr dst, in_port_t dport,
 		   const void *in, size_t dlen)
 {
 	size_t l4len = dlen + sizeof(struct udphdr);
@@ -276,20 +279,22 @@ void tap_udp4_send(const struct ctx *c, struct in_addr src, in_port_t sport,
 	char *data = tap_push_uh4(uh, src, sport, dst, dport, in, dlen);
 
 	memcpy(data, in, dlen);
-	tap_send_single(c, buf, dlen + (data - buf));
+	tap_send_single(c, qpair, buf, dlen + (data - buf));
 }
 
 /**
  * tap_icmp4_send() - Send ICMPv4 packet
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packet
  * @src:	IPv4 source address
  * @dst:	IPv4 destination address
  * @in:		ICMP packet, including ICMP header
  * @src_mac:	MAC address to be used as source for message
  * @l4len:	ICMP packet length, including ICMP header
  */
-void tap_icmp4_send(const struct ctx *c, struct in_addr src, struct in_addr dst,
-		    const void *in, const void *src_mac, size_t l4len)
+void tap_icmp4_send(const struct ctx *c, unsigned int qpair, struct in_addr src,
+		    struct in_addr dst, const void *in, const void *src_mac,
+		    size_t l4len)
 {
 	char buf[USHRT_MAX];
 	struct iphdr *ip4h = tap_push_l2h(c, buf, src_mac, ETH_P_IP);
@@ -299,7 +304,7 @@ void tap_icmp4_send(const struct ctx *c, struct in_addr src, struct in_addr dst,
 	memcpy(icmp4h, in, l4len);
 	csum_icmp4(icmp4h, icmp4h + 1, l4len - sizeof(*icmp4h));
 
-	tap_send_single(c, buf, l4len + ((char *)icmp4h - buf));
+	tap_send_single(c, qpair, buf, l4len + ((char *)icmp4h - buf));
 }
 
 /**
@@ -363,6 +368,7 @@ void *tap_push_uh6(struct udphdr *uh,
 /**
  * tap_udp6_send() - Send UDP over IPv6 packet
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packet
  * @src:	IPv6 source address
  * @sport:	UDP source port
  * @dst:	IPv6 destination address
@@ -371,7 +377,7 @@ void *tap_push_uh6(struct udphdr *uh,
  * @in:	UDP payload contents (not including UDP header)
  * @dlen:	UDP payload length (not including UDP header)
  */
-void tap_udp6_send(const struct ctx *c,
+void tap_udp6_send(const struct ctx *c, unsigned int qpair,
 		   const struct in6_addr *src, in_port_t sport,
 		   const struct in6_addr *dst, in_port_t dport,
 		   uint32_t flow, void *in, size_t dlen)
@@ -384,19 +390,20 @@ void tap_udp6_send(const struct ctx *c,
 	char *data = tap_push_uh6(uh, src, sport, dst, dport, in, dlen);
 
 	memcpy(data, in, dlen);
-	tap_send_single(c, buf, dlen + (data - buf));
+	tap_send_single(c, qpair, buf, dlen + (data - buf));
 }
 
 /**
  * tap_icmp6_send() - Send ICMPv6 packet
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packet
  * @src:	IPv6 source address
  * @dst:	IPv6 destination address
  * @in:		ICMP packet, including ICMP header
  * @src_mac:	MAC address to be used as source for message
  * @l4len:	ICMP packet length, including ICMP header
  */
-void tap_icmp6_send(const struct ctx *c,
+void tap_icmp6_send(const struct ctx *c, unsigned int qpair,
 		    const struct in6_addr *src, const struct in6_addr *dst,
 		    const void *in, const void *src_mac, size_t l4len)
 {
@@ -408,7 +415,7 @@ void tap_icmp6_send(const struct ctx *c,
 	memcpy(icmp6h, in, l4len);
 	csum_icmp6(icmp6h, src, dst, icmp6h + 1, l4len - sizeof(*icmp6h));
 
-	tap_send_single(c, buf, l4len + ((char *)icmp6h - buf));
+	tap_send_single(c, qpair, buf, l4len + ((char *)icmp6h - buf));
 }
 
 /**
@@ -699,15 +706,19 @@ static bool tap4_is_fragment(const struct iphdr *iph,
 /**
  * tap4_handler() - IPv4 and ARP packet handler for tap file descriptor
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packets
  * @now:	Current timestamp
  *
  * Return: count of packets consumed by handlers
  */
-static int tap4_handler(struct ctx *c, const struct timespec *now)
+static int tap4_handler(struct ctx *c, unsigned int qpair,
+			const struct timespec *now)
 {
 	unsigned int i, j, seq_count;
 	struct tap4_l4_t *seq;
 
+	(void)qpair;
+
 	if (!c->ifi4 || !pool_tap4->count)
 		return pool_tap4->count;
 
@@ -933,15 +944,19 @@ found:
 /**
  * tap6_handler() - IPv6 packet handler for tap file descriptor
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packets
  * @now:	Current timestamp
  *
  * Return: count of packets consumed by handlers
  */
-static int tap6_handler(struct ctx *c, const struct timespec *now)
+static int tap6_handler(struct ctx *c, unsigned int qpair,
+			const struct timespec *now)
 {
 	unsigned int i, j, seq_count = 0;
 	struct tap6_l4_t *seq;
 
+	(void)qpair;
+
 	if (!c->ifi6 || !pool_tap6->count)
 		return pool_tap6->count;
 
@@ -1137,21 +1152,23 @@ void tap_flush_pools(void)
 /**
  * tap_handler() - IPv4/IPv6 and ARP packet handler for tap file descriptor
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packets
  * @now:	Current timestamp
  */
-void tap_handler(struct ctx *c, const struct timespec *now)
+void tap_handler(struct ctx *c, unsigned int qpair, const struct timespec *now)
 {
-	tap4_handler(c, now);
-	tap6_handler(c, now);
+	tap4_handler(c, qpair, now);
+	tap6_handler(c, qpair, now);
 }
 
 /**
  * tap_add_packet() - Queue/capture packet, update notion of guest MAC address
  * @c:		Execution context
+ * @qpair:	Queue pair associated with the packet
  * @data:	Packet to add to the pool
  * @now:	Current timestamp
  */
-void tap_add_packet(struct ctx *c, struct iov_tail *data,
+void tap_add_packet(struct ctx *c, unsigned int qpair, struct iov_tail *data,
 		    const struct timespec *now)
 {
 	size_t l2len = iov_tail_size(data);
@@ -1178,14 +1195,14 @@ void tap_add_packet(struct ctx *c, struct iov_tail *data,
 	case ETH_P_ARP:
 	case ETH_P_IP:
 		if (!pool_can_fit(pool_tap4, data)) {
-			tap4_handler(c, now);
+			tap4_handler(c, qpair, now);
 			pool_flush(pool_tap4);
 		}
 		packet_add(pool_tap4, data);
 		break;
 	case ETH_P_IPV6:
 		if (!pool_can_fit(pool_tap6, data)) {
-			tap6_handler(c, now);
+			tap6_handler(c, qpair, now);
 			pool_flush(pool_tap6);
 		}
 		packet_add(pool_tap6, data);
@@ -1270,7 +1287,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
 		n -= sizeof(uint32_t);
 
 		data = IOV_TAIL_FROM_BUF(p, l2len, 0);
-		tap_add_packet(c, &data, now);
+		tap_add_packet(c, QPAIR_DEFAULT, &data, now);
 
 		p += l2len;
 		n -= l2len;
@@ -1279,7 +1296,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
 	partial_len = n;
 	partial_frame = p;
 
-	tap_handler(c, now);
+	tap_handler(c, QPAIR_DEFAULT, now);
 }
 
 /**
@@ -1338,10 +1355,10 @@ static void tap_pasta_input(struct ctx *c, const struct timespec *now)
 			continue;
 
 		data = IOV_TAIL_FROM_BUF(pkt_buf + n, len, 0);
-		tap_add_packet(c, &data, now);
+		tap_add_packet(c, QPAIR_DEFAULT, &data, now);
 	}
 
-	tap_handler(c, now);
+	tap_handler(c, QPAIR_DEFAULT, now);
 }
 
 /**
@@ -1430,11 +1447,14 @@ bool tap_is_ready(const struct ctx *c)
 /**
  * tap_start_connection() - start a new connection
  * @c:		Execution context
+ * @qpair:	Queue pair to use for the connection
  */
-static void tap_start_connection(const struct ctx *c)
+static void tap_start_connection(const struct ctx *c, unsigned int qpair)
 {
 	union epoll_ref ref = { 0 };
 
+	(void)qpair;
+
 	ref.fd = c->fd_tap;
 	switch (c->mode) {
 	case MODE_PASST:
@@ -1462,9 +1482,10 @@ static void tap_start_connection(const struct ctx *c)
 /**
  * tap_listen_handler() - Handle new connection on listening socket
  * @c:		Execution context
+ * @qpair:	Queue pair to use for the connection
  * @events:	epoll events
  */
-void tap_listen_handler(struct ctx *c, uint32_t events)
+void tap_listen_handler(struct ctx *c, unsigned int qpair, uint32_t events)
 {
 	int v = INT_MAX / 2;
 	struct ucred ucred;
@@ -1513,7 +1534,7 @@ void tap_listen_handler(struct ctx *c, uint32_t events)
 	    setsockopt(c->fd_tap, SOL_SOCKET, SO_SNDBUF, &v, sizeof(v)))
 		trace("tap: failed to set SO_SNDBUF to %i", v);
 
-	tap_start_connection(c);
+	tap_start_connection(c, qpair);
 }
 
 /**
@@ -1566,7 +1587,7 @@ static void tap_sock_tun_init(struct ctx *c)
 	pasta_ns_conf(c);
 
 	if (!c->splice_only)
-		tap_start_connection(c);
+		tap_start_connection(c, QPAIR_DEFAULT);
 }
 
 /**
@@ -1603,7 +1624,7 @@ void tap_backend_init(struct ctx *c)
 
 	if (c->fd_tap != -1) { /* Passed as --fd */
 		assert(c->one_off);
-		tap_start_connection(c);
+		tap_start_connection(c, QPAIR_DEFAULT);
 		return;
 	}
 
diff --git a/tap.h b/tap.h
index 07ca0965215c..ecb12de372b5 100644
--- a/tap.h
+++ b/tap.h
@@ -91,30 +91,31 @@ void *tap_push_ip6h(struct ipv6hdr *ip6h,
 		    const struct in6_addr *src,
 		    const struct in6_addr *dst,
 		    size_t l4len, uint8_t proto, uint32_t flow);
-void tap_udp4_send(const struct ctx *c, struct in_addr src, in_port_t sport,
-		   struct in_addr dst, in_port_t dport,
+void tap_udp4_send(const struct ctx *c, unsigned int qpair, struct in_addr src,
+		   in_port_t sport, struct in_addr dst, in_port_t dport,
 		   const void *in, size_t dlen);
-void tap_icmp4_send(const struct ctx *c, struct in_addr src, struct in_addr dst,
-		    const void *in, const void *src_mac, size_t l4len);
+void tap_icmp4_send(const struct ctx *c, unsigned int qpair, struct in_addr src,
+		    struct in_addr dst, const void *in, const void *src_mac,
+		    size_t l4len);
 const struct in6_addr *tap_ip6_daddr(const struct ctx *c,
 				     const struct in6_addr *src);
 void *tap_push_ip6h(struct ipv6hdr *ip6h,
 		    const struct in6_addr *src, const struct in6_addr *dst,
 		    size_t l4len, uint8_t proto, uint32_t flow);
-void tap_udp6_send(const struct ctx *c,
+void tap_udp6_send(const struct ctx *c, unsigned int qpair,
 		   const struct in6_addr *src, in_port_t sport,
 		   const struct in6_addr *dst, in_port_t dport,
 		   uint32_t flow, void *in, size_t dlen);
-void tap_icmp6_send(const struct ctx *c,
+void tap_icmp6_send(const struct ctx *c, unsigned int qpair,
 		    const struct in6_addr *src, const struct in6_addr *dst,
 		    const void *in, const void *src_mac, size_t l4len);
-void tap_send_single(const struct ctx *c, const void *data, size_t l2len);
+void tap_send_single(const struct ctx *c, unsigned int qpair, const void *data, size_t l2len);
 size_t tap_send_frames(const struct ctx *c, const struct iovec *iov,
 		       size_t bufs_per_frame, size_t nframes);
 void eth_update_mac(struct ethhdr *eh,
 		    const unsigned char *eth_d, const unsigned char *eth_s);
 bool tap_is_ready(const struct ctx *c);
-void tap_listen_handler(struct ctx *c, uint32_t events);
+void tap_listen_handler(struct ctx *c, unsigned int qpair, uint32_t events);
 void tap_handler_pasta(struct ctx *c, uint32_t events,
 		       const struct timespec *now);
 void tap_handler_passt(struct ctx *c, uint32_t events,
@@ -123,7 +124,7 @@ int tap_sock_unix_open(char *sock_path);
 void tap_sock_reset(struct ctx *c);
 void tap_backend_init(struct ctx *c);
 void tap_flush_pools(void);
-void tap_handler(struct ctx *c, const struct timespec *now);
-void tap_add_packet(struct ctx *c, struct iov_tail *data,
+void tap_handler(struct ctx *c, unsigned int qpair, const struct timespec *now);
+void tap_add_packet(struct ctx *c, unsigned int qpair, struct iov_tail *data,
 		    const struct timespec *now);
 #endif /* TAP_H */
diff --git a/tcp.c b/tcp.c
index c400075440c9..c127b3132e5a 100644
--- a/tcp.c
+++ b/tcp.c
@@ -2227,7 +2227,7 @@ static void tcp_rst_no_conn(const struct ctx *c, int af,
 
 	tcp_update_csum(psum, rsth, &payload, 0);
 	rst_l2len = ((char *)rsth - buf) + sizeof(*rsth);
-	tap_send_single(c, buf, rst_l2len);
+	tap_send_single(c, QPAIR_DEFAULT, buf, rst_l2len);
 }
 
 /**
diff --git a/udp.c b/udp.c
index c28d6ee2b691..a295cb0e97cf 100644
--- a/udp.c
+++ b/udp.c
@@ -445,7 +445,7 @@ static void udp_send_tap_icmp4(const struct ctx *c,
 	/* Try to obtain the MAC address of the generating node */
 	saddr_any = inany_from_v4(saddr);
 	fwd_neigh_mac_get(c, &saddr_any, tap_omac);
-	tap_icmp4_send(c, saddr, eaddr, &msg, tap_omac, msglen);
+	tap_icmp4_send(c, QPAIR_DEFAULT, saddr, eaddr, &msg, tap_omac, msglen);
 }
 
 
@@ -493,7 +493,7 @@ static void udp_send_tap_icmp6(const struct ctx *c,
 
 	/* Try to obtain the MAC address of the generating node */
 	fwd_neigh_mac_get(c, (union inany_addr *) saddr, tap_omac);
-	tap_icmp6_send(c, saddr, eaddr, &msg, tap_omac, msglen);
+	tap_icmp6_send(c, QPAIR_DEFAULT, saddr, eaddr, &msg, tap_omac, msglen);
 }
 
 /**
diff --git a/vu_common.c b/vu_common.c
index 8b555ea9a8b1..6aa1ba768136 100644
--- a/vu_common.c
+++ b/vu_common.c
@@ -200,11 +200,12 @@ static void vu_handle_tx(struct vu_dev *vdev, int index,
 
 		data = IOV_TAIL(elem[count].out_sg, elem[count].out_num, 0);
 		if (IOV_DROP_HEADER(&data, struct virtio_net_hdr_mrg_rxbuf))
-			tap_add_packet(vdev->context, &data, now);
+			tap_add_packet(vdev->context, QPAIR_FROM_QUEUE(index),
+				       &data, now);
 
 		count++;
 	}
-	tap_handler(vdev->context, now);
+	tap_handler(vdev->context, QPAIR_FROM_QUEUE(index), now);
 
 	if (count) {
 		int i;
-- 
2.54.0



* [PATCH v5 05/12] arp: Pass queue pair explicitly through ARP send path
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (3 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 04/12] tap: Thread queue pair through all remaining tap paths Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 06/12] tcp: Pass queue pair explicitly through TCP " Laurent Vivier
                   ` (6 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Add a qpair parameter to arp(), arp_send_init_req(), and
arp_announce(), forwarding it to tap_send_single() instead of
hardcoding QPAIR_DEFAULT.

tap_start_connection() now uses the qpair parameter added in the
previous commit, so its (void)qpair placeholder is dropped.

fwd_neigh_table_update() has no queue pair context and keeps using
QPAIR_DEFAULT.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
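Note for readers: the rule described above (replies go out on the queue
pair the request arrived on, while senders without queue context fall
back to the default pair) can be sketched as follows. This is an
illustration only, not passt code: the sketch_* functions, the counter
array, and the value 0 for the default pair are assumptions.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_MAX_QPAIR	16u	/* assumed limit, from the cover letter */
#define SKETCH_QPAIR_DEFAULT	0u	/* assumed value of the default pair */

/* Frames "sent" per queue pair, standing in for the real send path */
static unsigned int sketch_sent[SKETCH_MAX_QPAIR];

/* Stand-in for a single-frame send: only records the target pair */
static void sketch_send_single(unsigned int qpair, const void *data,
			       size_t l2len)
{
	(void)data;
	(void)l2len;

	assert(qpair < SKETCH_MAX_QPAIR);
	sketch_sent[qpair]++;
}

/* Request handler: the reply uses the pair the request came in on */
static int sketch_arp(unsigned int qpair, const void *req, size_t l2len)
{
	sketch_send_single(qpair, req, l2len);
	return 1;
}

/* Unsolicited sender with no queue context: falls back to the default */
static void sketch_neigh_update(const void *msg, size_t l2len)
{
	sketch_send_single(SKETCH_QPAIR_DEFAULT, msg, l2len);
}
```

Calling sketch_arp() with qpair 3 increments only sketch_sent[3], and
sketch_neigh_update() always increments the default pair's counter.
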
 arp.c | 15 +++++++++------
 arp.h |  6 +++---
 fwd.c |  2 +-
 tap.c |  6 ++----
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/arp.c b/arp.c
index e97c4de86a99..1dc8b87cd993 100644
--- a/arp.c
+++ b/arp.c
@@ -63,11 +63,12 @@ static bool ignore_arp(const struct ctx *c,
 /**
  * arp() - Check if this is a supported ARP message, reply as needed
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the reply
  * @data:	Single packet with Ethernet buffer
  *
  * Return: 1 if handled, -1 on failure
  */
-int arp(const struct ctx *c, struct iov_tail *data)
+int arp(const struct ctx *c, unsigned int qpair, struct iov_tail *data)
 {
 	union inany_addr tgt;
 	struct {
@@ -112,7 +113,7 @@ int arp(const struct ctx *c, struct iov_tail *data)
 	memcpy(resp.am.tha,		am->sha,	sizeof(resp.am.tha));
 	memcpy(resp.am.tip,		am->sip,	sizeof(resp.am.tip));
 
-	tap_send_single(c, QPAIR_DEFAULT, &resp, sizeof(resp));
+	tap_send_single(c, qpair, &resp, sizeof(resp));
 
 	return 1;
 }
@@ -120,8 +121,9 @@ int arp(const struct ctx *c, struct iov_tail *data)
 /**
  * arp_send_init_req() - Send initial ARP request to retrieve guest MAC address
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the request
  */
-void arp_send_init_req(const struct ctx *c)
+void arp_send_init_req(const struct ctx *c, unsigned int qpair)
 {
 	struct {
 		struct ethhdr eh;
@@ -148,16 +150,17 @@ void arp_send_init_req(const struct ctx *c)
 	memcpy(req.am.tip,	&c->ip4.addr,		sizeof(req.am.tip));
 
 	debug("Sending initial ARP request for guest MAC address");
-	tap_send_single(c, QPAIR_DEFAULT, &req, sizeof(req));
+	tap_send_single(c, qpair, &req, sizeof(req));
 }
 
 /**
  * arp_announce() - Send an ARP announcement for an IPv4 host
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the announcement
  * @ip:	IPv4 address we announce as owned by @mac
  * @mac:	MAC address to advertise for @ip
  */
-void arp_announce(const struct ctx *c, struct in_addr *ip,
+void arp_announce(const struct ctx *c, unsigned int qpair, struct in_addr *ip,
 		  const unsigned char *mac)
 {
 	char ip_str[INET_ADDRSTRLEN];
@@ -202,5 +205,5 @@ void arp_announce(const struct ctx *c, struct in_addr *ip,
 	eth_ntop(mac, mac_str, sizeof(mac_str));
 	debug("ARP announcement for %s / %s", ip_str, mac_str);
 
-	tap_send_single(c, QPAIR_DEFAULT, &msg, sizeof(msg));
+	tap_send_single(c, qpair, &msg, sizeof(msg));
 }
diff --git a/arp.h b/arp.h
index 4b1f38bcec9b..501760393a74 100644
--- a/arp.h
+++ b/arp.h
@@ -22,9 +22,9 @@ struct arpmsg {
 	unsigned char tip[4];
 } __attribute__((__packed__));
 
-int arp(const struct ctx *c, struct iov_tail *data);
-void arp_send_init_req(const struct ctx *c);
-void arp_announce(const struct ctx *c, struct in_addr *ip,
+int arp(const struct ctx *c, unsigned int qpair, struct iov_tail *data);
+void arp_send_init_req(const struct ctx *c, unsigned int qpair);
+void arp_announce(const struct ctx *c, unsigned int qpair, struct in_addr *ip,
 		  const unsigned char *mac);
 
 #endif /* ARP_H */
diff --git a/fwd.c b/fwd.c
index c0a6adacd294..0d0e265b7dc0 100644
--- a/fwd.c
+++ b/fwd.c
@@ -145,7 +145,7 @@ void fwd_neigh_table_update(const struct ctx *c, const union inany_addr *addr,
 		return;
 
 	if (inany_v4(addr))
-		arp_announce(c, inany_v4(addr), e->mac);
+		arp_announce(c, QPAIR_DEFAULT, inany_v4(addr), e->mac);
 	else
 		ndp_unsolicited_na(c, &addr->a6);
 }
diff --git a/tap.c b/tap.c
index 521ccd6d47e7..66dcb83665a7 100644
--- a/tap.c
+++ b/tap.c
@@ -741,7 +741,7 @@ resume:
 		if (!eh)
 			continue;
 		if (ntohs(eh->h_proto) == ETH_P_ARP) {
-			arp(c, &data);
+			arp(c, qpair, &data);
 			continue;
 		}
 
@@ -1453,8 +1453,6 @@ static void tap_start_connection(const struct ctx *c, unsigned int qpair)
 {
 	union epoll_ref ref = { 0 };
 
-	(void)qpair;
-
 	ref.fd = c->fd_tap;
 	switch (c->mode) {
 	case MODE_PASST:
@@ -1474,7 +1472,7 @@ static void tap_start_connection(const struct ctx *c, unsigned int qpair)
 		return;
 
 	if (c->ifi4)
-		arp_send_init_req(c);
+		arp_send_init_req(c, qpair);
 	if (c->ifi6 && !c->no_ndp)
 		ndp_send_init_req(c);
 }
-- 
2.54.0



* [PATCH v5 06/12] tcp: Pass queue pair explicitly through TCP send path
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (4 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 05/12] arp: Pass queue pair explicitly through ARP send path Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 07/12] udp: Pass queue pair explicitly through UDP " Laurent Vivier
                   ` (5 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Thread a qpair parameter from the entry points (tcp_sock_handler,
tcp_timer_handler, tcp_tap_handler, tcp_defer_handler) through every
intermediate function down to the vhost-user send functions, so callers
explicitly select the target RX virtqueue instead of hardcoding
QPAIR_DEFAULT.

Add a qpair parameter to tcp_send_flag(), tcp_data_from_sock(),
tcp_rst_do() and its tcp_rst() macro, tcp_rewind_seq(),
tcp_data_from_tap(), tcp_conn_from_sock_finish(), tcp_connect_finish(),
tcp_tap_window_update(), tcp_conn_from_tap(), tcp_rst_no_conn(),
tcp_keepalive(), and tcp_inactivity().

tcp_vu_send_flag() and tcp_vu_data_from_sock() now use the passed
qpair to select the RX virtqueue instead of always using
QPAIR_DEFAULT.

The buffer-based path (tcp_buf.c) does not thread qpair since it is
only used in non-vhost-user mode.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
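Note for readers: the dispatch pattern used by tcp_send_flag() and
tcp_data_from_sock() in this patch (forward qpair to the vhost-user
variant only, leave the buffer-based variant unchanged) can be sketched
as below. The sketch_* names, the enum, and the recorded state are
hypothetical and exist only to make the example self-contained:

```c
#include <assert.h>

#define SKETCH_MAX_QPAIR	16u	/* assumed limit, from the cover letter */

enum sketch_mode { SKETCH_MODE_PASST, SKETCH_MODE_VU };

static unsigned int sketch_vu_last_qpair;	/* pair used by last VU send */
static unsigned int sketch_buf_calls;		/* buffer-path call count */

/* vhost-user variant: the queue pair selects the RX virtqueue */
static int sketch_vu_send_flag(int flags, unsigned int qpair)
{
	(void)flags;

	assert(qpair < SKETCH_MAX_QPAIR);
	sketch_vu_last_qpair = qpair;
	return 0;
}

/* Buffer-based variant: single queue, so no qpair parameter */
static int sketch_buf_send_flag(int flags)
{
	(void)flags;

	sketch_buf_calls++;
	return 0;
}

/* Common entry point: qpair is only meaningful in vhost-user mode */
static int sketch_send_flag(enum sketch_mode mode, int flags,
			    unsigned int qpair)
{
	if (mode == SKETCH_MODE_VU)
		return sketch_vu_send_flag(flags, qpair);

	return sketch_buf_send_flag(flags);
}
```

In this shape only the vhost-user variant sees the queue pair, which is
how a new parameter can be added without changing behaviour in the
other modes.
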
 passt.c        |  12 +--
 tap.c          |   8 +-
 tcp.c          | 199 ++++++++++++++++++++++++++++---------------------
 tcp.h          |  15 ++--
 tcp_buf.c      |  10 +--
 tcp_internal.h |   7 +-
 tcp_vu.c       |  19 +++--
 tcp_vu.h       |   6 +-
 8 files changed, 157 insertions(+), 119 deletions(-)

diff --git a/passt.c b/passt.c
index 71eb4f0192e2..9569f920ee28 100644
--- a/passt.c
+++ b/passt.c
@@ -98,11 +98,13 @@ struct passt_stats {
  * post_handler() - Run periodic and deferred tasks for L4 protocol handlers
  * @c:		Execution context
  * @now:	Current timestamp
+ * @qpair:	Queue pair to process
  */
-static void post_handler(struct ctx *c, const struct timespec *now)
+static void post_handler(struct ctx *c, const struct timespec *now,
+			 unsigned int qpair)
 {
 	if (!c->no_tcp)
-		tcp_defer_handler(c, now);
+		tcp_defer_handler(c, now, qpair);
 
 	flow_defer_handler(c, now);
 	fwd_scan_ports_timer(c, now);
@@ -251,7 +253,7 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
 			pasta_netns_quit_timer_handler(c, ref);
 			break;
 		case EPOLL_TYPE_TCP:
-			tcp_sock_handler(c, ref, eventmask);
+			tcp_sock_handler(c, ref, eventmask, QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_TCP_SPLICE:
 			tcp_splice_sock_handler(c, ref, eventmask, &now);
@@ -260,7 +262,7 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
 			tcp_listen_handler(c, ref, &now);
 			break;
 		case EPOLL_TYPE_TCP_TIMER:
-			tcp_timer_handler(c, ref);
+			tcp_timer_handler(c, ref, QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_UDP_LISTEN:
 			udp_listen_sock_handler(c, ref, eventmask, &now);
@@ -300,7 +302,7 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
 		print_stats(c, &stats, &now);
 	}
 
-	post_handler(c, &now);
+	post_handler(c, &now, QPAIR_DEFAULT);
 
 	migrate_handler(c);
 }
diff --git a/tap.c b/tap.c
index 66dcb83665a7..ba2a573fa630 100644
--- a/tap.c
+++ b/tap.c
@@ -717,8 +717,6 @@ static int tap4_handler(struct ctx *c, unsigned int qpair,
 	unsigned int i, j, seq_count;
 	struct tap4_l4_t *seq;
 
-	(void)qpair;
-
 	if (!c->ifi4 || !pool_tap4->count)
 		return pool_tap4->count;
 
@@ -870,7 +868,7 @@ append:
 			if (c->no_tcp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += tcp_tap_handler(c, PIF_TAP, AF_INET,
+				k += tcp_tap_handler(c, qpair, PIF_TAP, AF_INET,
 						     &seq->saddr, &seq->daddr,
 						     0, p, k, now);
 		} else if (seq->protocol == IPPROTO_UDP) {
@@ -955,8 +953,6 @@ static int tap6_handler(struct ctx *c, unsigned int qpair,
 	unsigned int i, j, seq_count = 0;
 	struct tap6_l4_t *seq;
 
-	(void)qpair;
-
 	if (!c->ifi6 || !pool_tap6->count)
 		return pool_tap6->count;
 
@@ -1121,7 +1117,7 @@ append:
 			if (c->no_tcp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += tcp_tap_handler(c, PIF_TAP, AF_INET6,
+				k += tcp_tap_handler(c, qpair, PIF_TAP, AF_INET6,
 						     &seq->saddr, &seq->daddr,
 						     seq->flow_lbl, p, k, now);
 		} else if (seq->protocol == IPPROTO_UDP) {
diff --git a/tcp.c b/tcp.c
index c127b3132e5a..7f8e68a31994 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1258,16 +1258,18 @@ static void tcp_update_seqack_from_tap(const struct ctx *c,
  * tcp_rewind_seq() - Rewind sequence to tap and socket offset to current ACK
  * @c:		Execution context
  * @conn:	Connection pointer
+ * @qpair:	Queue pair to process
  *
  * Return: 0 on success, -1 on failure, with connection reset
  */
-static int tcp_rewind_seq(const struct ctx *c, struct tcp_tap_conn *conn)
+static int tcp_rewind_seq(const struct ctx *c, struct tcp_tap_conn *conn,
+			  unsigned int qpair)
 {
 	conn->seq_to_tap = conn->seq_ack_from_tap;
 	conn->events &= ~TAP_FIN_SENT;
 
 	if (tcp_set_peek_offset(conn, 0)) {
-		tcp_rst(c, conn);
+		tcp_rst(c, conn, qpair);
 		return -1;
 	}
 
@@ -1371,16 +1373,17 @@ int tcp_prepare_flags(const struct ctx *c, struct tcp_tap_conn *conn,
  * @c:         Execution context
  * @conn:      Connection pointer
  * @flags:     TCP flags: if not set, send segment only if ACK is due
+ * @qpair:     Queue pair to process
  *
  * Return: negative error code on fatal connection failure, 0 otherwise
  */
 static int tcp_send_flag(const struct ctx *c, struct tcp_tap_conn *conn,
-			 int flags)
+			 int flags, unsigned int qpair)
 {
 	int ret;
 
 	if (c->mode == MODE_VU)
-		ret = tcp_vu_send_flag(c, conn, flags);
+		ret = tcp_vu_send_flag(c, conn, flags, qpair);
 	else
 		ret = tcp_buf_send_flag(c, conn, flags);
 
@@ -1425,14 +1428,16 @@ static void tcp_sock_rst(const struct ctx *c, struct tcp_tap_conn *conn)
  * tcp_rst_do() - Reset a tap connection: send RST segment on both sides, close
  * @c:		Execution context
  * @conn:	Connection pointer
+ * @qpair:	Queue pair to process
  */
-void tcp_rst_do(const struct ctx *c, struct tcp_tap_conn *conn)
+void tcp_rst_do(const struct ctx *c, struct tcp_tap_conn *conn,
+		unsigned int qpair)
 {
 	if (conn->events == CLOSED)
 		return;
 
 	/* Send RST on tap */
-	tcp_send_flag(c, conn, RST);
+	tcp_send_flag(c, conn, RST, qpair);
 
 	tcp_sock_rst(c, conn);
 }
@@ -1459,11 +1464,13 @@ static void tcp_get_tap_ws(struct tcp_tap_conn *conn,
  * @c:		Execution context
  * @conn:	Connection pointer
  * @wnd:	Window value, host order, unscaled
+ * @qpair:	Queue pair to process
  *
  * Return: false on zero window (not stored to wnd_from_tap), true otherwise
  */
 static bool tcp_tap_window_update(const struct ctx *c,
-				  struct tcp_tap_conn *conn, unsigned wnd)
+				  struct tcp_tap_conn *conn, unsigned wnd,
+				  unsigned int qpair)
 {
 	wnd = MIN(MAX_WINDOW, wnd << conn->ws_from_tap);
 
@@ -1474,7 +1481,7 @@ static bool tcp_tap_window_update(const struct ctx *c,
 	 * that no data beyond the updated window will be acknowledged.
 	 */
 	if (!wnd && SEQ_LT(conn->seq_ack_from_tap, conn->seq_to_tap)) {
-		tcp_rewind_seq(c, conn);
+		tcp_rewind_seq(c, conn, qpair);
 		return false;
 	}
 
@@ -1646,6 +1653,7 @@ static void tcp_bind_outbound(const struct ctx *c,
 /**
  * tcp_conn_from_tap() - Handle connection request (SYN segment) from tap
  * @c:		Execution context
+ * @qpair:	Queue pair for the flow
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address, pointer to in_addr or in6_addr
  * @daddr:	Destination address, pointer to in_addr or in6_addr
@@ -1656,10 +1664,11 @@ static void tcp_bind_outbound(const struct ctx *c,
  *
  * #syscalls:vu getsockname
  */
-static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
-			      const void *saddr, const void *daddr,
-			      const struct tcphdr *th, const char *opts,
-			      size_t optlen, const struct timespec *now)
+static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
+			      sa_family_t af, const void *saddr,
+			      const void *daddr, const struct tcphdr *th,
+			      const char *opts, size_t optlen,
+			      const struct timespec *now)
 {
 	in_port_t srcport = ntohs(th->source);
 	in_port_t dstport = ntohs(th->dest);
@@ -1760,7 +1769,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
 
 	if (connect(s, &sa.sa, socklen_inany(&sa))) {
 		if (errno != EINPROGRESS) {
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, qpair);
 			goto cancel;
 		}
 
@@ -1768,7 +1777,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
 	} else {
 		tcp_get_sndbuf(conn);
 
-		if (tcp_send_flag(c, conn, SYN | ACK))
+		if (tcp_send_flag(c, conn, SYN | ACK, qpair))
 			goto cancel;
 
 		conn_event(c, conn, TAP_SYN_ACK_SENT);
@@ -1830,15 +1839,17 @@ static int tcp_sock_consume(const struct tcp_tap_conn *conn, uint32_t ack_seq)
  * tcp_data_from_sock() - Handle new data from socket, queue to tap, in window
  * @c:		Execution context
  * @conn:	Connection pointer
+ * @qpair:	Queue pair to process
  *
  * Return: negative on connection reset, 0 otherwise
  *
  * #syscalls recvmsg
  */
-static int tcp_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
+static int tcp_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn,
+			      unsigned int qpair)
 {
 	if (c->mode == MODE_VU)
-		return tcp_vu_data_from_sock(c, conn);
+		return tcp_vu_data_from_sock(c, conn, qpair);
 
 	return tcp_buf_data_from_sock(c, conn);
 }
@@ -1866,13 +1877,15 @@ static ssize_t tcp_packet_data_len(const struct tcphdr *th, size_t l4len)
  * @conn:	Connection pointer
  * @p:		Pool of TCP packets, with TCP headers
  * @idx:	Index of first data packet in pool
+ * @qpair:	Queue pair to process
  *
  * #syscalls sendmsg
  *
  * Return: count of consumed packets
  */
 static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
-			     const struct pool *p, int idx)
+			     const struct pool *p, int idx,
+			     unsigned int qpair)
 {
 	int i, iov_i, ack = 0, fin = 0, retr = 0, keep = -1, partial_send = 0;
 	uint16_t max_ack_seq_wnd = conn->wnd_from_tap;
@@ -1922,7 +1935,7 @@ static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
 				   "keep-alive sequence: %u, previous: %u",
 				   seq, conn->seq_from_tap);
 
-			if (tcp_send_flag(c, conn, ACK))
+			if (tcp_send_flag(c, conn, ACK, qpair))
 				return -1;
 
 			tcp_timer_ctl(c, conn);
@@ -1933,7 +1946,8 @@ static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
 
 			if (p->count == 1) {
 				tcp_tap_window_update(c, conn,
-						      ntohs(th->window));
+						      ntohs(th->window),
+						      qpair);
 				return 1;
 			}
 
@@ -1959,7 +1973,7 @@ static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
 				 * well.
 				 */
 				if (!ntohs(th->window))
-					tcp_rewind_seq(c, conn);
+					tcp_rewind_seq(c, conn, qpair);
 
 				max_ack_seq_wnd = ntohs(th->window);
 				max_ack_seq = ack_seq;
@@ -2024,17 +2038,17 @@ static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
 	if (ack && !tcp_sock_consume(conn, max_ack_seq))
 		tcp_update_seqack_from_tap(c, conn, max_ack_seq);
 
-	tcp_tap_window_update(c, conn, max_ack_seq_wnd);
+	tcp_tap_window_update(c, conn, max_ack_seq_wnd, qpair);
 
 	if (retr) {
 		flow_trace(conn,
 			   "fast re-transmit, ACK: %u, previous sequence: %u",
 			   conn->seq_ack_from_tap, conn->seq_to_tap);
 
-		if (tcp_rewind_seq(c, conn))
+		if (tcp_rewind_seq(c, conn, qpair))
 			return -1;
 
-		tcp_data_from_sock(c, conn);
+		tcp_data_from_sock(c, conn, qpair);
 	}
 
 	if (!iov_i)
@@ -2050,7 +2064,7 @@ eintr:
 			 *   Then swiftly looked away and left.
 			 */
 			conn->seq_from_tap = seq_from_tap;
-			if (tcp_send_flag(c, conn, ACK))
+			if (tcp_send_flag(c, conn, ACK, qpair))
 				return -1;
 		}
 
@@ -2058,7 +2072,7 @@ eintr:
 			goto eintr;
 
 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
-			if (tcp_send_flag(c, conn, ACK | DUP_ACK))
+			if (tcp_send_flag(c, conn, ACK | DUP_ACK, qpair))
 				return -1;
 
 			uint32_t events = tcp_conn_epoll_events(conn->events,
@@ -2094,7 +2108,7 @@ out:
 		 */
 		if (conn->seq_dup_ack_approx != (conn->seq_from_tap & 0xff)) {
 			conn->seq_dup_ack_approx = conn->seq_from_tap & 0xff;
-			if (tcp_send_flag(c, conn, ACK | DUP_ACK))
+			if (tcp_send_flag(c, conn, ACK | DUP_ACK, qpair))
 				return -1;
 		}
 		return p->count - idx;
@@ -2109,7 +2123,7 @@ out:
 
 		conn_event(c, conn, TAP_FIN_RCVD);
 	} else {
-		if (tcp_send_flag(c, conn, ACK_IF_NEEDED))
+		if (tcp_send_flag(c, conn, ACK_IF_NEEDED, qpair))
 			return -1;
 	}
 
@@ -2123,13 +2137,15 @@ out:
  * @th:		TCP header of SYN, ACK segment: caller MUST ensure it's there
  * @opts:	Pointer to start of options
  * @optlen:	Bytes in options: caller MUST ensure available length
+ * @qpair:	Queue pair to process
  */
 static void tcp_conn_from_sock_finish(const struct ctx *c,
 				      struct tcp_tap_conn *conn,
 				      const struct tcphdr *th,
-				      const char *opts, size_t optlen)
+				      const char *opts, size_t optlen,
+				      unsigned int qpair)
 {
-	tcp_tap_window_update(c, conn, ntohs(th->window));
+	tcp_tap_window_update(c, conn, ntohs(th->window), qpair);
 	tcp_get_tap_ws(conn, opts, optlen);
 
 	/* First value is not scaled */
@@ -2144,24 +2160,25 @@ static void tcp_conn_from_sock_finish(const struct ctx *c,
 
 	conn_event(c, conn, ESTABLISHED);
 	if (tcp_set_peek_offset(conn, 0)) {
-		tcp_rst(c, conn);
+		tcp_rst(c, conn, qpair);
 		return;
 	}
 
-	if (tcp_send_flag(c, conn, ACK)) {
-		tcp_rst(c, conn);
+	if (tcp_send_flag(c, conn, ACK, qpair)) {
+		tcp_rst(c, conn, qpair);
 		return;
 	}
 
 	/* The client might have sent data already, which we didn't
 	 * dequeue waiting for SYN,ACK from tap -- check now.
 	 */
-	tcp_data_from_sock(c, conn);
+	tcp_data_from_sock(c, conn, qpair);
 }
 
 /**
  * tcp_rst_no_conn() - Send RST in response to a packet with no connection
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the reply
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address of the packet we're responding to
  * @daddr:	Destination address of the packet we're responding to
@@ -2169,7 +2186,7 @@ static void tcp_conn_from_sock_finish(const struct ctx *c,
  * @th:		TCP header of the packet we're responding to
  * @l4len:	Packet length, including TCP header
  */
-static void tcp_rst_no_conn(const struct ctx *c, int af,
+static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
 			    const void *saddr, const void *daddr,
 			    uint32_t flow_lbl,
 			    const struct tcphdr *th, size_t l4len)
@@ -2227,12 +2244,13 @@ static void tcp_rst_no_conn(const struct ctx *c, int af,
 
 	tcp_update_csum(psum, rsth, &payload, 0);
 	rst_l2len = ((char *)rsth - buf) + sizeof(*rsth);
-	tap_send_single(c, QPAIR_DEFAULT, buf, rst_l2len);
+	tap_send_single(c, qpair, buf, rst_l2len);
 }
 
 /**
  * tcp_tap_handler() - Handle packets from tap and state transitions
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send packets
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address
@@ -2244,9 +2262,10 @@ static void tcp_rst_no_conn(const struct ctx *c, int af,
  *
  * Return: count of consumed packets
  */
-int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
-		    const void *saddr, const void *daddr, uint32_t flow_lbl,
-		    const struct pool *p, int idx, const struct timespec *now)
+int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
+		    sa_family_t af, const void *saddr, const void *daddr,
+		    uint32_t flow_lbl, const struct pool *p, int idx,
+		    const struct timespec *now)
 {
 	struct tcp_tap_conn *conn;
 	struct tcphdr th_storage;
@@ -2283,10 +2302,11 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 	/* New connection from tap */
 	if (!flow) {
 		if (opts && th->syn && !th->ack)
-			tcp_conn_from_tap(c, af, saddr, daddr, th,
+			tcp_conn_from_tap(c, qpair, af, saddr, daddr, th,
 					  opts, optlen, now);
 		else
-			tcp_rst_no_conn(c, af, saddr, daddr, flow_lbl, th, l4len);
+			tcp_rst_no_conn(c, qpair, af, saddr, daddr, flow_lbl, th,
+					l4len);
 		return 1;
 	}
 
@@ -2310,7 +2330,7 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 	/* Establishing connection from socket */
 	if (conn->events & SOCK_ACCEPTED) {
 		if (th->syn && th->ack && !th->fin) {
-			tcp_conn_from_sock_finish(c, conn, th, opts, optlen);
+			tcp_conn_from_sock_finish(c, conn, th, opts, optlen, qpair);
 			return 1;
 		}
 
@@ -2337,7 +2357,7 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 				goto reset;
 			}
 
-			if (tcp_send_flag(c, conn, ACK))
+			if (tcp_send_flag(c, conn, ACK, qpair))
 				goto reset;
 
 			conn_event(c, conn, SOCK_FIN_SENT);
@@ -2348,8 +2368,8 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 		if (!th->ack)
 			goto reset;
 
-		if (tcp_tap_window_update(c, conn, ntohs(th->window)))
-			tcp_data_from_sock(c, conn);
+		if (tcp_tap_window_update(c, conn, ntohs(th->window), qpair))
+			tcp_data_from_sock(c, conn, qpair);
 
 		if (p->count - idx == 1)
 			return 1;
@@ -2380,12 +2400,12 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 				   "fast re-transmit, ACK: %u, previous sequence: %u",
 				   ntohl(th->ack_seq), conn->seq_to_tap);
 
-			if (tcp_rewind_seq(c, conn))
+			if (tcp_rewind_seq(c, conn, qpair))
 				return -1;
 		}
 
-		if (tcp_tap_window_update(c, conn, ntohs(th->window)) || retr)
-			tcp_data_from_sock(c, conn);
+		if (tcp_tap_window_update(c, conn, ntohs(th->window), qpair) || retr)
+			tcp_data_from_sock(c, conn, qpair);
 
 		if (conn->seq_ack_from_tap == conn->seq_to_tap) {
 			if (th->ack && conn->events & TAP_FIN_SENT)
@@ -2400,7 +2420,7 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 	}
 
 	/* Established connections accepting data from tap */
-	count = tcp_data_from_tap(c, conn, p, idx);
+	count = tcp_data_from_tap(c, conn, p, idx, qpair);
 	if (count == -1)
 		goto reset;
 
@@ -2419,7 +2439,7 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 		}
 
 		conn_event(c, conn, SOCK_FIN_SENT);
-		if (tcp_send_flag(c, conn, ACK))
+		if (tcp_send_flag(c, conn, ACK, qpair))
 			goto reset;
 
 		ack_due = 0;
@@ -2449,7 +2469,7 @@ reset:
 	 * remaining packets in the batch, since they'd be invalidated when our
 	 * RST is received, even if otherwise good.
 	 */
-	tcp_rst(c, conn);
+	tcp_rst(c, conn, qpair);
 	return p->count - idx;
 }
 
@@ -2457,20 +2477,22 @@ reset:
  * tcp_connect_finish() - Handle completion of connect() from EPOLLOUT event
  * @c:		Execution context
  * @conn:	Connection pointer
+ * @qpair:	Queue pair to process
  */
-static void tcp_connect_finish(const struct ctx *c, struct tcp_tap_conn *conn)
+static void tcp_connect_finish(const struct ctx *c, struct tcp_tap_conn *conn,
+			       unsigned int qpair)
 {
 	socklen_t sl;
 	int so;
 
 	sl = sizeof(so);
 	if (getsockopt(conn->sock, SOL_SOCKET, SO_ERROR, &so, &sl) || so) {
-		tcp_rst(c, conn);
+		tcp_rst(c, conn, qpair);
 		return;
 	}
 
-	if (tcp_send_flag(c, conn, SYN | ACK)) {
-		tcp_rst(c, conn);
+	if (tcp_send_flag(c, conn, SYN | ACK, qpair)) {
+		tcp_rst(c, conn, qpair);
 		return;
 	}
 
@@ -2513,7 +2535,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
 
 	conn->wnd_from_tap = WINDOW_DEFAULT;
 
-	if (tcp_send_flag(c, conn, SYN)) {
+	if (tcp_send_flag(c, conn, SYN, QPAIR_DEFAULT)) {
 		conn_flag(c, conn, CLOSING);
 		FLOW_ACTIVATE(conn);
 		return;
@@ -2604,12 +2626,14 @@ cancel:
  * tcp_timer_handler() - timerfd events: close, send ACK, retransmit, or reset
  * @c:		Execution context
  * @ref:	epoll reference of timer (not connection)
+ * @qpair:	Queue pair to process
  *
  * #syscalls timerfd_gettime|timerfd_gettime64
  * #syscalls arm:timerfd_gettime64 i686:timerfd_gettime64
  * #syscalls arm:timerfd_settime64 i686:timerfd_settime64
  */
-void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
+void tcp_timer_handler(const struct ctx *c, union epoll_ref ref,
+		       unsigned int qpair)
 {
 	struct itimerspec check_armed = { { 0 }, { 0 } };
 	struct tcp_tap_conn *conn = &FLOW(ref.flow)->tcp;
@@ -2628,8 +2652,8 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
 		return;
 
 	if (conn->flags & ACK_TO_TAP_DUE) {
-		if (tcp_send_flag(c, conn, ACK_IF_NEEDED)) {
-			tcp_rst(c, conn);
+		if (tcp_send_flag(c, conn, ACK_IF_NEEDED, qpair)) {
+			tcp_rst(c, conn, qpair);
 			return;
 		}
 		tcp_timer_ctl(c, conn);
@@ -2641,11 +2665,11 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
 			max = MIN(TCP_MAX_RETRIES, max);
 			if (conn->retries >= max) {
 				flow_dbg(conn, "handshake timeout");
-				tcp_rst(c, conn);
+				tcp_rst(c, conn, qpair);
 			} else {
 				flow_trace(conn, "SYN timeout, retry");
-				if (tcp_send_flag(c, conn, SYN)) {
-					tcp_rst(c, conn);
+				if (tcp_send_flag(c, conn, SYN, qpair)) {
+					tcp_rst(c, conn, qpair);
 					return;
 				}
 				conn->retries++;
@@ -2654,7 +2678,7 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
 			}
 		} else if (conn->retries == TCP_MAX_RETRIES) {
 			flow_dbg(conn, "retransmissions count exceeded");
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, qpair);
 		} else {
 			flow_dbg(conn, "ACK timeout, retry");
 
@@ -2662,10 +2686,10 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
 				conn->wnd_from_tap = 1; /* Zero-window probe */
 
 			conn->retries++;
-			if (tcp_rewind_seq(c, conn))
+			if (tcp_rewind_seq(c, conn, qpair))
 				return;
 
-			tcp_data_from_sock(c, conn);
+			tcp_data_from_sock(c, conn, qpair);
 			tcp_timer_ctl(c, conn);
 		}
 	}
@@ -2676,9 +2700,10 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref)
  * @c:		Execution context
  * @ref:	epoll reference
  * @events:	epoll events bitmap
+ * @qpair:	Queue pair to process
  */
 void tcp_sock_handler(const struct ctx *c, union epoll_ref ref,
-		      uint32_t events)
+		      uint32_t events, unsigned int qpair)
 {
 	struct tcp_tap_conn *conn = conn_at_sidx(ref.flowside);
 
@@ -2689,7 +2714,7 @@ void tcp_sock_handler(const struct ctx *c, union epoll_ref ref,
 		return;
 
 	if (events & EPOLLERR) {
-		tcp_rst(c, conn);
+		tcp_rst(c, conn, qpair);
 		return;
 	}
 
@@ -2708,13 +2733,13 @@ void tcp_sock_handler(const struct ctx *c, union epoll_ref ref,
 			conn_event(c, conn, SOCK_FIN_RCVD);
 
 		if (events & EPOLLIN)
-			tcp_data_from_sock(c, conn);
+			tcp_data_from_sock(c, conn, qpair);
 
 		if (events & EPOLLOUT) {
 			tcp_epoll_ctl(conn);
 			if (tcp_update_seqack_wnd(c, conn, false, NULL) &&
-			    tcp_send_flag(c, conn, ACK)) {
-				tcp_rst(c, conn);
+			    tcp_send_flag(c, conn, ACK, qpair)) {
+				tcp_rst(c, conn, qpair);
 				return;
 			}
 		}
@@ -2724,7 +2749,7 @@ void tcp_sock_handler(const struct ctx *c, union epoll_ref ref,
 
 	/* EPOLLHUP during handshake: reset */
 	if (events & EPOLLHUP) {
-		tcp_rst(c, conn);
+		tcp_rst(c, conn, qpair);
 		return;
 	}
 
@@ -2734,7 +2759,7 @@ void tcp_sock_handler(const struct ctx *c, union epoll_ref ref,
 
 	if (conn->events == TAP_SYN_RCVD) {
 		if (events & EPOLLOUT)
-			tcp_connect_finish(c, conn);
+			tcp_connect_finish(c, conn, qpair);
 		/* Data? Check later */
 	}
 }
@@ -2939,9 +2964,11 @@ int tcp_init(struct ctx *c)
 
 /**
  * tcp_keepalive() - Send keepalives for connections which need it
- * @:	Execution context
+ * @c:		Execution context
+ * @qpair:	Queue pair to process
  */
-static void tcp_keepalive(struct ctx *c, const struct timespec *now)
+static void tcp_keepalive(struct ctx *c, const struct timespec *now,
+			  unsigned int qpair)
 {
 	union flow *flow;
 
@@ -2956,8 +2983,8 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now)
 		if (conn->tap_inactive) {
 			flow_dbg(conn, "No tap activity for least %us, send keepalive",
 				 KEEPALIVE_INTERVAL);
-			if (tcp_send_flag(c, conn, KEEPALIVE))
-				tcp_rst(c, conn);
+			if (tcp_send_flag(c, conn, KEEPALIVE, qpair))
+				tcp_rst(c, conn, qpair);
 		}
 
 		/* Ready to check fot next interval */
@@ -2967,9 +2994,11 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now)
 
 /**
  * tcp_inactivity() - Scan for and close long-inactive connections
- * @:	Execution context
+ * @c:		Execution context
+ * @qpair:	Queue pair to process
  */
-static void tcp_inactivity(struct ctx *c, const struct timespec *now)
+static void tcp_inactivity(struct ctx *c, const struct timespec *now,
+			   unsigned int qpair)
 {
 	union flow *flow;
 
@@ -2986,7 +3015,7 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now)
 			/* No activity in this interval, reset */
 			flow_dbg(conn, "Inactive for at least %us, resetting",
 				 INACTIVITY_INTERVAL);
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, qpair);
 		}
 
 		/* Ready to check fot next interval */
@@ -2998,9 +3027,11 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now)
  * tcp_defer_handler() - Handler for TCP deferred tasks
  * @c:		Execution context
  * @now:	Current timestamp
+ * @qpair:	Queue pair to process
  */
 /* cppcheck-suppress [constParameterPointer, unmatchedSuppression] */
-void tcp_defer_handler(struct ctx *c, const struct timespec *now)
+void tcp_defer_handler(struct ctx *c, const struct timespec *now,
+		       unsigned int qpair)
 {
 	tcp_payload_flush(c);
 
@@ -3013,8 +3044,8 @@ void tcp_defer_handler(struct ctx *c, const struct timespec *now)
 	if (c->mode == MODE_PASTA)
 		tcp_splice_refill(c);
 
-	tcp_keepalive(c, now);
-	tcp_inactivity(c, now);
+	tcp_keepalive(c, now, qpair);
+	tcp_inactivity(c, now, qpair);
 }
 
 /**
@@ -3988,10 +4019,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
 	if (tcp_set_peek_offset(conn, peek_offset))
 		goto fail;
 
-	if (tcp_send_flag(c, conn, ACK))
+	if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT))
 		goto fail;
 
-	tcp_data_from_sock(c, conn);
+	tcp_data_from_sock(c, conn, QPAIR_DEFAULT);
 
 	if ((rc = tcp_epoll_ctl(conn))) {
 		flow_dbg(conn,
@@ -4009,7 +4040,7 @@ fail:
 	}
 
 	conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
-	tcp_rst(c, conn);
+	tcp_rst(c, conn, QPAIR_DEFAULT);
 
 	return 0;
 }
diff --git a/tcp.h b/tcp.h
index 3262a807e5d4..490f1b140e44 100644
--- a/tcp.h
+++ b/tcp.h
@@ -18,18 +18,21 @@
 
 struct ctx;
 
-void tcp_timer_handler(const struct ctx *c, union epoll_ref ref);
+void tcp_timer_handler(const struct ctx *c, union epoll_ref ref,
+		       unsigned int qpair);
 void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
 			const struct timespec *now);
 void tcp_sock_handler(const struct ctx *c, union epoll_ref ref,
-		      uint32_t events);
-int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
-		    const void *saddr, const void *daddr, uint32_t flow_lbl,
-		    const struct pool *p, int idx, const struct timespec *now);
+		      uint32_t events, unsigned int qpair);
+int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
+		    sa_family_t af, const void *saddr, const void *daddr,
+		    uint32_t flow_lbl, const struct pool *p, int idx,
+		    const struct timespec *now);
 int tcp_listen(const struct ctx *c, uint8_t pif, unsigned rule,
 	       const union inany_addr *addr, const char *ifname, in_port_t port);
 int tcp_init(struct ctx *c);
-void tcp_defer_handler(struct ctx *c, const struct timespec *now);
+void tcp_defer_handler(struct ctx *c, const struct timespec *now,
+		       unsigned int qpair);
 
 void tcp_update_l2_buf(const unsigned char *eth_d);
 
diff --git a/tcp_buf.c b/tcp_buf.c
index ca356089dc0b..ae8bebca5107 100644
--- a/tcp_buf.c
+++ b/tcp_buf.c
@@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
 		conn->seq_to_tap = seq;
 		peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
 		if (tcp_set_peek_offset(conn, peek_offset))
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, QPAIR_DEFAULT);
 	}
 }
 
@@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 		conn->seq_to_tap = conn->seq_ack_from_tap;
 		already_sent = 0;
 		if (tcp_set_peek_offset(conn, 0)) {
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, QPAIR_DEFAULT);
 			return -1;
 		}
 	}
@@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 	}
 
 	if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) {
-		tcp_rst(c, conn);
+		tcp_rst(c, conn, QPAIR_DEFAULT);
 		return -1;
 	}
 
@@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 	if (len < 0) {
 		if (errno != EAGAIN && errno != EWOULDBLOCK) {
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, QPAIR_DEFAULT);
 			return -errno;
 		}
 
@@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 			ret = tcp_buf_send_flag(c, conn, FIN | ACK);
 			if (ret) {
-				tcp_rst(c, conn);
+				tcp_rst(c, conn, QPAIR_DEFAULT);
 				return ret;
 			}
 
diff --git a/tcp_internal.h b/tcp_internal.h
index 40472c9973c8..22f8825adccc 100644
--- a/tcp_internal.h
+++ b/tcp_internal.h
@@ -174,11 +174,12 @@ void conn_event_do(const struct ctx *c, struct tcp_tap_conn *conn,
 		conn_event_do(c, conn, event);				\
 	} while (0)
 
-void tcp_rst_do(const struct ctx *c, struct tcp_tap_conn *conn);
-#define tcp_rst(c, conn)						\
+void tcp_rst_do(const struct ctx *c, struct tcp_tap_conn *conn,
+		unsigned int qpair);
+#define tcp_rst(c, conn, qpair)						\
 	do {								\
 		flow_dbg((conn), "TCP reset at %s:%i", __func__, __LINE__); \
-		tcp_rst_do(c, conn);					\
+		tcp_rst_do(c, conn, qpair);				\
 	} while (0)
 
 struct tcp_info_linux;
diff --git a/tcp_vu.c b/tcp_vu.c
index 9ef6b5242c9c..4f76f599156f 100644
--- a/tcp_vu.c
+++ b/tcp_vu.c
@@ -116,15 +116,17 @@ static int tcp_vu_send_dup(const struct ctx *c, struct vu_virtq *vq,
  * @c:		Execution context
  * @conn:	Connection pointer
  * @flags:	TCP flags: if not set, send segment only if ACK is due
+ * @qpair:	Queue pair to process
  *
  * Return: -ECONNRESET on fatal connection error,
  *         -EAGAIN if vhost-user buffers are unavailable,
  *         0 otherwise
  */
-int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags)
+int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags,
+		     unsigned int qpair)
 {
 	struct vu_dev *vdev = c->vdev;
-	int rx_queue = QPAIR_TOGUEST_QUEUE(QPAIR_DEFAULT);
+	int rx_queue = QPAIR_TOGUEST_QUEUE(qpair);
 	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	size_t optlen, hdrlen, iov_cnt, iov_used;
 	struct vu_virtq_element flags_elem[2];
@@ -424,13 +426,14 @@ static void tcp_vu_prepare(const struct ctx *c, struct tcp_tap_conn *conn,
  *			     in window
  * @c:		Execution context
  * @conn:	Connection pointer
+ * @qpair:	Queue pair to process
  *
  * Return: negative on connection reset, 0 otherwise
  */
-int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
+int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn,
+			  unsigned int qpair)
 {
 	uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
-	unsigned int qpair = QPAIR_DEFAULT;
 	int rx_queue = QPAIR_TOGUEST_QUEUE(qpair);
 	struct vu_dev *vdev = c->vdev;
 	struct vu_virtq *vq = &vdev->vq[rx_queue];
@@ -454,7 +457,7 @@ int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 		conn->seq_to_tap = conn->seq_ack_from_tap;
 		already_sent = 0;
 		if (tcp_set_peek_offset(conn, 0)) {
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, qpair);
 			return -1;
 		}
 	}
@@ -477,7 +480,7 @@ int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 			       &elem_cnt, &frame_cnt);
 	if (len < 0) {
 		if (len != -EAGAIN && len != -EWOULDBLOCK) {
-			tcp_rst(c, conn);
+			tcp_rst(c, conn, qpair);
 			return len;
 		}
 
@@ -498,9 +501,9 @@ int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 			/* See tcp_buf_data_from_sock() */
 			conn->seq_ack_to_tap = conn->seq_from_tap;
 
-			ret = tcp_vu_send_flag(c, conn, FIN | ACK);
+			ret = tcp_vu_send_flag(c, conn, FIN | ACK, qpair);
 			if (ret) {
-				tcp_rst(c, conn);
+				tcp_rst(c, conn, qpair);
 				return ret;
 			}
 
diff --git a/tcp_vu.h b/tcp_vu.h
index 6ab6057f352a..ae48420f4906 100644
--- a/tcp_vu.h
+++ b/tcp_vu.h
@@ -6,7 +6,9 @@
 #ifndef TCP_VU_H
 #define TCP_VU_H
 
-int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags);
-int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn);
+int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags,
+		     unsigned int qpair);
+int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn,
+			  unsigned int qpair);
 
 #endif  /*TCP_VU_H */
-- 
2.54.0


^ permalink raw reply	[flat|nested] 13+ messages in thread

* [PATCH v5 07/12] udp: Pass queue pair explicitly through UDP send path
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (5 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 06/12] tcp: Pass queue pair explicitly through TCP " Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 08/12] dhcp/dhcpv6: Pass queue pair explicitly through DHCP " Laurent Vivier
                   ` (4 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Thread the queue pair parameter through the UDP socket handler and send
path, replacing hardcoded QPAIR_DEFAULT values.  This is the UDP
counterpart to the equivalent TCP and ICMP changes.

The queue pair is passed from passt_worker() through
udp_listen_sock_handler(), udp_sock_handler(), udp_sock_fwd(), and down
into udp_sock_errs()/udp_sock_recverr() for ICMP error generation, as
well as udp_vu_sock_to_tap() for vhost-user delivery.

On the flow handling side, flow_defer_handler() receives the queue pair
and passes it to udp_flow_defer() and udp_flush_flow(), so that deferred
UDP datagrams are forwarded on the correct queue.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
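Note (not part of the commit message): the threading pattern described
above can be sketched in isolation as below.  This is a minimal sketch
under stated assumptions, not the code in this patch: every sketch_* and
SKETCH_* name is hypothetical, the three-level call chain only mirrors
the shape of udp_sock_handler() -> udp_sock_fwd() -> udp_vu_sock_to_tap(),
and the mapping from queue pair to guest-bound virtqueue assumes the
virtio-net layout where receive queues sit at even indices.  The real
QPAIR_TOGUEST_QUEUE() definition is not shown in this message and may
differ.

```c
#include <assert.h>

/* Hypothetical stand-ins for illustration only, not passt definitions */
#define SKETCH_QPAIR_DEFAULT	0
#define SKETCH_MAX_QPAIRS	16	/* cover letter: up to 16 queue pairs */

/* Assumption: virtio-net places receiveq of 0-based pair N at index 2 * N */
#define SKETCH_QPAIR_TOGUEST_QUEUE(qpair)	((qpair) * 2)

/* Innermost sender: picks the virtqueue from the qpair it was handed,
 * instead of a hardcoded default
 */
static unsigned int sketch_vu_sock_to_tap(unsigned int qpair)
{
	assert(qpair < SKETCH_MAX_QPAIRS);
	return SKETCH_QPAIR_TOGUEST_QUEUE(qpair);
}

/* Intermediate layer: forwards qpair unchanged */
static unsigned int sketch_udp_sock_fwd(unsigned int qpair)
{
	return sketch_vu_sock_to_tap(qpair);
}

/* Entry point called from the event loop: the caller chooses the qpair */
static unsigned int sketch_udp_sock_handler(unsigned int qpair)
{
	return sketch_udp_sock_fwd(qpair);
}
```

Calling sketch_udp_sock_handler(SKETCH_QPAIR_DEFAULT) selects the same
virtqueue a hardcoded default would, which is why passing QPAIR_DEFAULT
from passt_worker() keeps this patch free of functional change while
leaving the caller able to pick another pair later.
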
 flow.c         |  5 +++--
 flow.h         |  3 ++-
 passt.c        |  8 +++++---
 tap.c          |  4 ++--
 udp.c          | 45 +++++++++++++++++++++++++--------------------
 udp.h          |  9 +++++----
 udp_flow.c     | 24 ++++++++++++++----------
 udp_flow.h     |  4 ++--
 udp_internal.h |  3 ++-
 udp_vu.c       |  5 +++--
 udp_vu.h       |  3 ++-
 11 files changed, 65 insertions(+), 48 deletions(-)

diff --git a/flow.c b/flow.c
index 565ed2b2f7e7..c93b73549c90 100644
--- a/flow.c
+++ b/flow.c
@@ -884,7 +884,8 @@ flow_sidx_t flow_lookup_sa(const struct ctx *c, uint8_t proto, uint8_t pif,
  * @c:		Execution context
  * @now:	Current timestamp
  */
-void flow_defer_handler(const struct ctx *c, const struct timespec *now)
+void flow_defer_handler(const struct ctx *c, const struct timespec *now,
+			unsigned int qpair)
 {
 	struct flow_free_cluster *free_head = NULL;
 	unsigned *last_next = &flow_first_free;
@@ -923,7 +924,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
 				closed = icmp_ping_timer(c, &flow->ping, now);
 			break;
 		case FLOW_UDP:
-			closed = udp_flow_defer(c, &flow->udp, now);
+			closed = udp_flow_defer(c, &flow->udp, now, qpair);
 			if (!closed && timer)
 				closed = udp_flow_timer(c, &flow->udp, now);
 			break;
diff --git a/flow.h b/flow.h
index 6c6a9260aa23..cae259fe7037 100644
--- a/flow.h
+++ b/flow.h
@@ -270,7 +270,8 @@ void flow_epollid_set(struct flow_common *f, int epollid);
 int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
 		   int fd, unsigned int sidei);
 void flow_epollid_register(int epollid, int epollfd);
-void flow_defer_handler(const struct ctx *c, const struct timespec *now);
+void flow_defer_handler(const struct ctx *c, const struct timespec *now,
+			unsigned int qpair);
 int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
 			      int fd);
 int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage,
diff --git a/passt.c b/passt.c
index 9569f920ee28..41239991451f 100644
--- a/passt.c
+++ b/passt.c
@@ -106,7 +106,7 @@ static void post_handler(struct ctx *c, const struct timespec *now,
 	if (!c->no_tcp)
 		tcp_defer_handler(c, now, qpair);
 
-	flow_defer_handler(c, now);
+	flow_defer_handler(c, now, qpair);
 	fwd_scan_ports_timer(c, now);
 
 	if (!c->no_ndp)
@@ -265,10 +265,12 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
 			tcp_timer_handler(c, ref, QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_UDP_LISTEN:
-			udp_listen_sock_handler(c, ref, eventmask, &now);
+			udp_listen_sock_handler(c, ref, eventmask, &now,
+						QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_UDP:
-			udp_sock_handler(c, ref, eventmask, &now);
+			udp_sock_handler(c, ref, eventmask, &now,
+					 QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_PING:
 			icmp_sock_handler(c, ref);
diff --git a/tap.c b/tap.c
index ba2a573fa630..e8fd3661ebb5 100644
--- a/tap.c
+++ b/tap.c
@@ -875,7 +875,7 @@ append:
 			if (c->no_udp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += udp_tap_handler(c, PIF_TAP, AF_INET,
+				k += udp_tap_handler(c, qpair, PIF_TAP, AF_INET,
 						     &seq->saddr, &seq->daddr,
 						     seq->ttl, p, k, now);
 		}
@@ -1124,7 +1124,7 @@ append:
 			if (c->no_udp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += udp_tap_handler(c, PIF_TAP, AF_INET6,
+				k += udp_tap_handler(c, qpair, PIF_TAP, AF_INET6,
 						     &seq->saddr, &seq->daddr,
 						     seq->hop_limit, p, k, now);
 		}
diff --git a/udp.c b/udp.c
index a295cb0e97cf..e91d44aa33d6 100644
--- a/udp.c
+++ b/udp.c
@@ -403,13 +403,14 @@ static void udp_tap_prepare(const struct mmsghdr *mmh,
 /**
  * udp_send_tap_icmp4() - Construct and send ICMPv4 to local peer
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the ICMPv4 packet
  * @ee:	Extended error descriptor
  * @toside:	Destination side of flow
  * @saddr:	Address of ICMP generating node
  * @in:	First bytes (max 8) of original UDP message body
  * @dlen:	Length of the read part of original UDP message body
  */
-static void udp_send_tap_icmp4(const struct ctx *c,
+static void udp_send_tap_icmp4(const struct ctx *c, unsigned int qpair,
 			       const struct sock_extended_err *ee,
 			       const struct flowside *toside,
 			       struct in_addr saddr,
@@ -445,13 +446,14 @@ static void udp_send_tap_icmp4(const struct ctx *c,
 	/* Try to obtain the MAC address of the generating node */
 	saddr_any = inany_from_v4(saddr);
 	fwd_neigh_mac_get(c, &saddr_any, tap_omac);
-	tap_icmp4_send(c, QPAIR_DEFAULT, saddr, eaddr, &msg, tap_omac, msglen);
+	tap_icmp4_send(c, qpair, saddr, eaddr, &msg, tap_omac, msglen);
 }
 
 
 /**
  * udp_send_tap_icmp6() - Construct and send ICMPv6 to local peer
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the ICMPv6 packet
  * @ee:	Extended error descriptor
  * @toside:	Destination side of flow
  * @saddr:	Address of ICMP generating node
@@ -459,7 +461,7 @@ static void udp_send_tap_icmp4(const struct ctx *c,
  * @dlen:	Length of the read part of original UDP message body
  * @flow:	IPv6 flow identifier
  */
-static void udp_send_tap_icmp6(const struct ctx *c,
+static void udp_send_tap_icmp6(const struct ctx *c, unsigned int qpair,
 			       const struct sock_extended_err *ee,
 			       const struct flowside *toside,
 			       const struct in6_addr *saddr,
@@ -493,7 +495,7 @@ static void udp_send_tap_icmp6(const struct ctx *c,
 
 	/* Try to obtain the MAC address of the generating node */
 	fwd_neigh_mac_get(c, (union inany_addr *) saddr, tap_omac);
-	tap_icmp6_send(c, QPAIR_DEFAULT, saddr, eaddr, &msg, tap_omac, msglen);
+	tap_icmp6_send(c, qpair, saddr, eaddr, &msg, tap_omac, msglen);
 }
 
 /**
@@ -546,7 +548,7 @@ static int udp_pktinfo(struct msghdr *msg, union inany_addr *dst)
  * #syscalls recvmsg
  */
 static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
-			    uint8_t pif, in_port_t port)
+			    uint8_t pif, in_port_t port, unsigned int qpair)
 {
 	char buf[PKTINFO_SPACE + RECVERR_SPACE];
 	const struct sock_extended_err *ee;
@@ -653,12 +655,12 @@ static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
 	if (hdr->cmsg_level == IPPROTO_IP &&
 	    (o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) {
 		dlen = MIN(dlen, ICMP4_MAX_DLEN);
-		udp_send_tap_icmp4(c, ee, toside, *o4, data, dlen);
+		udp_send_tap_icmp4(c, qpair, ee, toside, *o4, data, dlen);
 		return 1;
 	}
 
 	if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) {
-		udp_send_tap_icmp6(c, ee, toside, &otap.a6, data, dlen,
+		udp_send_tap_icmp6(c, qpair, ee, toside, &otap.a6, data, dlen,
 				   FLOW_IDX(uflow));
 		return 1;
 	}
@@ -685,7 +687,7 @@ fail:
  * Return: number of errors handled, or < 0 if we have an unrecoverable error
  */
 static int udp_sock_errs(const struct ctx *c, int s, flow_sidx_t sidx,
-			 uint8_t pif, in_port_t port)
+			 uint8_t pif, in_port_t port, unsigned int qpair)
 {
 	unsigned n_err = 0;
 	socklen_t errlen;
@@ -694,7 +696,7 @@ static int udp_sock_errs(const struct ctx *c, int s, flow_sidx_t sidx,
 	assert(!c->no_udp);
 
 	/* Empty the error queue */
-	while ((rc = udp_sock_recverr(c, s, sidx, pif, port)) > 0)
+	while ((rc = udp_sock_recverr(c, s, sidx, pif, port, qpair)) > 0)
 		n_err += rc;
 
 	if (rc < 0)
@@ -853,7 +855,8 @@ static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n,
  * @now:	Current timestamp
  */
 void udp_sock_fwd(const struct ctx *c, int s, int rule_hint,
-		  uint8_t frompif, in_port_t port, const struct timespec *now)
+		  uint8_t frompif, in_port_t port, const struct timespec *now,
+		  unsigned int qpair)
 {
 	union sockaddr_inany src;
 	union inany_addr dst;
@@ -869,7 +872,7 @@ void udp_sock_fwd(const struct ctx *c, int s, int rule_hint,
 			      strerror_(-rc));
 			/* Clear errors & carry on */
 			if (udp_sock_errs(c, s, FLOW_SIDX_NONE,
-					  frompif, port) < 0) {
+					  frompif, port, qpair) < 0) {
 				err_ratelimit(now,
 "UDP: Unrecoverable error on listening socket: (%s port %hu)",
 				    pif_name(frompif), port);
@@ -886,7 +889,7 @@ void udp_sock_fwd(const struct ctx *c, int s, int rule_hint,
 			udp_sock_to_sock(c, s, 1, tosidx);
 		} else if (topif == PIF_TAP) {
 			if (c->mode == MODE_VU)
-				udp_vu_sock_to_tap(c, s, 1, tosidx);
+				udp_vu_sock_to_tap(c, s, 1, tosidx, qpair);
 			else
 				udp_buf_sock_to_tap(c, s, 1, tosidx);
 		} else if (flow_sidx_valid(tosidx)) {
@@ -919,11 +922,11 @@ void udp_sock_fwd(const struct ctx *c, int s, int rule_hint,
  */
 void udp_listen_sock_handler(const struct ctx *c,
 			     union epoll_ref ref, uint32_t events,
-			     const struct timespec *now)
+			     const struct timespec *now, unsigned int qpair)
 {
 	if (events & (EPOLLERR | EPOLLIN)) {
 		udp_sock_fwd(c, ref.fd, ref.listen.rule,
-			     ref.listen.pif, ref.listen.port, now);
+			     ref.listen.pif, ref.listen.port, now, qpair);
 	}
 }
 
@@ -933,16 +936,17 @@ void udp_listen_sock_handler(const struct ctx *c,
  * @ref:	epoll reference
  * @events:	epoll events bitmap
  * @now:	Current timestamp
+ * @qpair:	Queue pair to process
  */
-void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
-		      uint32_t events, const struct timespec *now)
+void udp_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events,
+		      const struct timespec *now, unsigned int qpair)
 {
 	struct udp_flow *uflow = udp_at_sidx(ref.flowside);
 
 	assert(!c->no_udp && uflow);
 
 	if (events & EPOLLERR) {
-		if (udp_sock_errs(c, ref.fd, ref.flowside, PIF_NONE, 0) < 0) {
+		if (udp_sock_errs(c, ref.fd, ref.flowside, PIF_NONE, 0, qpair) < 0) {
 			flow_err(uflow, "Unrecoverable error on flow socket");
 			goto fail;
 		}
@@ -969,7 +973,7 @@ void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
 		} else if (topif == PIF_TAP) {
 			if (c->mode == MODE_VU) {
 				udp_vu_sock_to_tap(c, s, UDP_MAX_FRAMES,
-						   tosidx);
+						   tosidx, qpair);
 			} else {
 				udp_buf_sock_to_tap(c, s, n, tosidx);
 			}
@@ -991,6 +995,7 @@ fail:
 /**
  * udp_tap_handler() - Handle packets from tap
  * @c:		Execution context
+ * @qpair:	Queue pair to process
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address
@@ -1004,7 +1009,7 @@ fail:
  *
  * #syscalls sendmmsg
  */
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+int udp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    uint8_t ttl, const struct pool *p, int idx,
 		    const struct timespec *now)
@@ -1037,7 +1042,7 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
 	src = ntohs(uh->source);
 	dst = ntohs(uh->dest);
 
-	tosidx = udp_flow_from_tap(c, pif, af, saddr, daddr, src, dst, now);
+	tosidx = udp_flow_from_tap(c, qpair, pif, af, saddr, daddr, src, dst, now);
 	if (!(uflow = udp_at_sidx(tosidx))) {
 		char sstr[INET6_ADDRSTRLEN], dstr[INET6_ADDRSTRLEN];
 
diff --git a/udp.h b/udp.h
index 42d7a1c708cc..35b12ea2c9a6 100644
--- a/udp.h
+++ b/udp.h
@@ -12,10 +12,11 @@
 #include "fwd.h"
 
 void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref,
-			     uint32_t events, const struct timespec *now);
-void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
-		      uint32_t events, const struct timespec *now);
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+			     uint32_t events, const struct timespec *now,
+			     unsigned int qpair);
+void udp_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events,
+		      const struct timespec *now, unsigned int qpair);
+int udp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    uint8_t ttl, const struct pool *p, int idx,
 		    const struct timespec *now);
diff --git a/udp_flow.c b/udp_flow.c
index 35417bc48a39..143f265493fa 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -127,6 +127,7 @@ static int udp_flow_sock(const struct ctx *c,
 /**
  * udp_flow_new() - Common setup for a new UDP flow
  * @c:		Execution context
+ * @qpair:	Queue pair for the flow
  * @flow:	Initiated flow
  * @rule_hint:	Index of forwarding rule, or -1 if unknown
  * @now:	Timestamp
@@ -136,8 +137,9 @@ static int udp_flow_sock(const struct ctx *c,
  *
  * #syscalls getsockname
  */
-static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow,
-				int rule_hint, const struct timespec *now)
+static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
+				union flow *flow, int rule_hint,
+				const struct timespec *now)
 {
 	struct udp_flow *uflow = NULL;
 	const struct flowside *tgt;
@@ -152,6 +154,7 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow,
 	uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0;
 	uflow->activity[INISIDE] = 1;
 	uflow->activity[TGTSIDE] = 0;
+	(void)qpair;
 
 	flow_foreach_sidei(sidei) {
 		if (pif_is_socket(uflow->f.pif[sidei]))
@@ -254,12 +257,13 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
 		return FLOW_SIDX_NONE;
 	}
 
-	return udp_flow_new(c, flow, rule_hint, now);
+	return udp_flow_new(c, QPAIR_DEFAULT, flow, rule_hint, now);
 }
 
 /**
  * udp_flow_from_tap() - Find or create UDP flow for tap packets
  * @c:		Execution context
+ * @qpair:	Queue pair for the flow
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address on guest side
@@ -270,7 +274,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
  */
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
 			      uint8_t pif, sa_family_t af,
 			      const void *saddr, const void *daddr,
 			      in_port_t srcport, in_port_t dstport,
@@ -310,7 +314,7 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
 		return FLOW_SIDX_NONE;
 	}
 
-	return udp_flow_new(c, flow, FWD_NO_HINT, now);
+	return udp_flow_new(c, qpair, flow, FWD_NO_HINT, now);
 }
 
 /**
@@ -322,12 +326,12 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
  */
 static void udp_flush_flow(const struct ctx *c,
 			   const struct udp_flow *uflow, unsigned sidei,
-			   const struct timespec *now)
+			   const struct timespec *now, unsigned int qpair)
 {
 	/* We don't know exactly where the datagrams will come from, but we know
 	 * they'll have an interface and oport matching this flow */
 	udp_sock_fwd(c, uflow->s[sidei], -1, uflow->f.pif[sidei],
-		     uflow->f.side[sidei].oport, now);
+		     uflow->f.side[sidei].oport, now, qpair);
 }
 
 /**
@@ -339,14 +343,14 @@ static void udp_flush_flow(const struct ctx *c,
  * Return: true if the connection is ready to free, false otherwise
  */
 bool udp_flow_defer(const struct ctx *c, struct udp_flow *uflow,
-		    const struct timespec *now)
+		    const struct timespec *now, unsigned int qpair)
 {
 	if (uflow->flush0) {
-		udp_flush_flow(c, uflow, INISIDE, now);
+		udp_flush_flow(c, uflow, INISIDE, now, qpair);
 		uflow->flush0 = false;
 	}
 	if (uflow->flush1) {
-		udp_flush_flow(c, uflow, TGTSIDE, now);
+		udp_flush_flow(c, uflow, TGTSIDE, now, qpair);
 		uflow->flush1 = false;
 	}
 	return uflow->closed;
diff --git a/udp_flow.h b/udp_flow.h
index 62cc9b3aae1f..5a297c61646a 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -44,14 +44,14 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
 			       const union inany_addr *dst, in_port_t port,
 			       const union sockaddr_inany *s_in,
 			       int rule_hint, const struct timespec *now);
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
 			      uint8_t pif, sa_family_t af,
 			      const void *saddr, const void *daddr,
 			      in_port_t srcport, in_port_t dstport,
 			      const struct timespec *now);
 void udp_flow_close(const struct ctx *c, struct udp_flow *uflow);
 bool udp_flow_defer(const struct ctx *c, struct udp_flow *uflow,
-		    const struct timespec *now);
+		    const struct timespec *now, unsigned int qpair);
 bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow,
 		    const struct timespec *now);
 void udp_flow_activity(struct udp_flow *uflow, unsigned int sidei,
diff --git a/udp_internal.h b/udp_internal.h
index 361cc7495a01..0cd6da49fc05 100644
--- a/udp_internal.h
+++ b/udp_internal.h
@@ -34,6 +34,7 @@ size_t udp_update_hdr6(struct ipv6hdr *ip6h, struct udphdr *uh,
 		       const struct flowside *toside, size_t dlen,
 		       bool no_udp_csum);
 void udp_sock_fwd(const struct ctx *c, int s, int rule_hint,
-		  uint8_t frompif, in_port_t port, const struct timespec *now);
+		  uint8_t frompif, in_port_t port, const struct timespec *now,
+		  unsigned int qpair);
 
 #endif /* UDP_INTERNAL_H */
diff --git a/udp_vu.c b/udp_vu.c
index b1a8ad76a691..864e7a99b8d9 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -141,13 +141,14 @@ static void udp_vu_prepare(const struct ctx *c, struct iov_tail *data,
  * @n:		Maximum number of datagrams to forward
  * @tosidx:	Flow & side to forward data from @s to
  */
-void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
+void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx,
+			unsigned int qpair)
 {
 	const struct flowside *toside = flowside_at_sidx(tosidx);
 	bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
 	static struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
 	static struct iovec iov_vu[VIRTQUEUE_MAX_SIZE];
-	int rx_queue = QPAIR_TOGUEST_QUEUE(QPAIR_DEFAULT);
+	int rx_queue = QPAIR_TOGUEST_QUEUE(qpair);
 	struct vu_dev *vdev = c->vdev;
 	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	size_t hdrlen = udp_vu_hdrlen(v6);
diff --git a/udp_vu.h b/udp_vu.h
index 1e38af35ad4e..40ab28119b10 100644
--- a/udp_vu.h
+++ b/udp_vu.h
@@ -10,6 +10,7 @@
 
 void udp_vu_listen_sock_data(const struct ctx *c, union epoll_ref ref,
 			     const struct timespec *now);
-void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx);
+void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx,
+			unsigned int qpair);
 
 #endif /* UDP_VU_H */
-- 
2.54.0



* [PATCH v5 08/12] dhcp/dhcpv6: Pass queue pair explicitly through DHCP send path
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (6 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 07/12] udp: Pass queue pair explicitly through UDP " Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 09/12] icmp: Pass queue pair explicitly through ICMP " Laurent Vivier
                   ` (3 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Add a qpair parameter to dhcp(), dhcpv6(), and
dhcpv6_send_ia_notonlink(), forwarding it to tap_udp4_send() and
tap_udp6_send() instead of hardcoding QPAIR_DEFAULT.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
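Note, not part of the commit message: a minimal sketch of the queue
numbering that a qpair value ultimately selects. It assumes the
virtio-net layout, where queue pair N (counting from 0) owns virtqueue
2N for guest receive and virtqueue 2N + 1 for guest transmit. The
sketch_* names are hypothetical and are not passt functions; passt has
its own macros for this, such as QPAIR_TOGUEST_QUEUE(), whose
definition does not appear in this patch.

```c
#include <assert.h>

/* Hypothetical helper: queue pair owning a given virtqueue index */
static unsigned int sketch_qpair_of_queue(unsigned int vq_index)
{
	return vq_index / 2;
}

/* Hypothetical helper: guest receive virtqueue of a queue pair */
static unsigned int sketch_rx_queue(unsigned int qpair)
{
	return 2 * qpair;
}

/* A reply goes to the receive queue paired with the transmit queue
 * the request arrived on
 */
static unsigned int sketch_reply_queue(unsigned int request_vq)
{
	return sketch_rx_queue(sketch_qpair_of_queue(request_vq));
}
```

With qpair passed down to dhcp() and dhcpv6(), a reply can follow the
queue pair of the request instead of always using QPAIR_DEFAULT.
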
 dhcp.c   |  6 +++---
 dhcp.h   |  2 +-
 dhcpv6.c | 12 +++++++-----
 dhcpv6.h |  2 +-
 tap.c    |  4 ++--
 5 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/dhcp.c b/dhcp.c
index e652659138b8..6ff094c9179e 100644
--- a/dhcp.c
+++ b/dhcp.c
@@ -296,11 +296,12 @@ static void opt_set_dns_search(const struct ctx *c, size_t max_len)
 /**
  * dhcp() - Check if this is a DHCP message, reply as needed
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the reply
  * @data:	Single packet with Ethernet buffer
  *
  * Return: 0 if it's not a DHCP message, 1 if handled, -1 on failure
  */
-int dhcp(const struct ctx *c, struct iov_tail *data)
+int dhcp(const struct ctx *c, unsigned int qpair, struct iov_tail *data)
 {
 	char macstr[ETH_ADDRSTRLEN];
 	size_t mlen, dlen, opt_len;
@@ -473,8 +474,7 @@ int dhcp(const struct ctx *c, struct iov_tail *data)
 	else
 		dst = c->ip4.addr;
 
-	tap_udp4_send(c, QPAIR_DEFAULT, c->ip4.our_tap_addr, 67, dst, 68,
-		      &reply, dlen);
+	tap_udp4_send(c, qpair, c->ip4.our_tap_addr, 67, dst, 68, &reply, dlen);
 
 	return 1;
 }
diff --git a/dhcp.h b/dhcp.h
index cd50c99b8856..6d034f0c58af 100644
--- a/dhcp.h
+++ b/dhcp.h
@@ -6,7 +6,7 @@
 #ifndef DHCP_H
 #define DHCP_H
 
-int dhcp(const struct ctx *c, struct iov_tail *data);
+int dhcp(const struct ctx *c, unsigned int qpair, struct iov_tail *data);
 void dhcp_init(void);
 
 #endif /* DHCP_H */
diff --git a/dhcpv6.c b/dhcpv6.c
index f4ebeccf912c..3c97c921a849 100644
--- a/dhcpv6.c
+++ b/dhcpv6.c
@@ -370,12 +370,13 @@ notonlink:
 /**
  * dhcpv6_send_ia_notonlink() - Send NotOnLink status
  * @c:			Execution context
+ * @qpair:		Queue pair on which to send the reply
  * @ia_base:		Non-appropriate IA_NA or IA_TA base
  * @client_id_base:	Client ID message option base
  * @len:		Client ID length
  * @xid:		Transaction ID for message exchange
  */
-static void dhcpv6_send_ia_notonlink(struct ctx *c,
+static void dhcpv6_send_ia_notonlink(struct ctx *c, unsigned int qpair,
 				     const struct iov_tail *ia_base,
 				     const struct iov_tail *client_id_base,
 				     int len, uint32_t xid)
@@ -405,7 +406,7 @@ static void dhcpv6_send_ia_notonlink(struct ctx *c,
 
 	resp_not_on_link.hdr.xid = xid;
 
-	tap_udp6_send(c, QPAIR_DEFAULT, src, 547, tap_ip6_daddr(c, src), 546,
+	tap_udp6_send(c, qpair, src, 547, tap_ip6_daddr(c, src), 546,
 		      xid, &resp_not_on_link, n);
 }
 
@@ -542,13 +543,14 @@ static size_t dhcpv6_client_fqdn_fill(const struct iov_tail *data,
 /**
  * dhcpv6() - Check if this is a DHCPv6 message, reply as needed
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the reply
  * @data:	Single packet starting from UDP header
  * @saddr:	Source IPv6 address of original message
  * @daddr:	Destination IPv6 address of original message
  *
  * Return: 0 if it's not a DHCPv6 message, 1 if handled, -1 on failure
  */
-int dhcpv6(struct ctx *c, struct iov_tail *data,
+int dhcpv6(struct ctx *c, unsigned int qpair, struct iov_tail *data,
 	   const struct in6_addr *saddr, const struct in6_addr *daddr)
 {
 	const struct opt_server_id *server_id = NULL;
@@ -630,7 +632,7 @@ int dhcpv6(struct ctx *c, struct iov_tail *data,
 
 		if (dhcpv6_ia_notonlink(data, &c->ip6.addr)) {
 
-			dhcpv6_send_ia_notonlink(c, data, &client_id_base,
+			dhcpv6_send_ia_notonlink(c, qpair, data, &client_id_base,
 						 ntohs(client_id->l), mh->xid);
 
 			return 1;
@@ -680,7 +682,7 @@ int dhcpv6(struct ctx *c, struct iov_tail *data,
 
 	resp.hdr.xid = mh->xid;
 
-	tap_udp6_send(c, QPAIR_DEFAULT, src, 547, tap_ip6_daddr(c, src), 546,
+	tap_udp6_send(c, qpair, src, 547, tap_ip6_daddr(c, src), 546,
 		      mh->xid, &resp, n);
 	c->ip6.addr_seen = c->ip6.addr;
 
diff --git a/dhcpv6.h b/dhcpv6.h
index c706dfdbb2ac..3a249b39e6c7 100644
--- a/dhcpv6.h
+++ b/dhcpv6.h
@@ -6,7 +6,7 @@
 #ifndef DHCPV6_H
 #define DHCPV6_H
 
-int dhcpv6(struct ctx *c, struct iov_tail *data,
+int dhcpv6(struct ctx *c, unsigned int qpair, struct iov_tail *data,
 	   struct in6_addr *saddr, struct in6_addr *daddr);
 void dhcpv6_init(const struct ctx *c);
 
diff --git a/tap.c b/tap.c
index e8fd3661ebb5..8e19390f7273 100644
--- a/tap.c
+++ b/tap.c
@@ -805,7 +805,7 @@ resume:
 			struct iov_tail eh_data;
 
 			packet_get(pool_tap4, i, &eh_data);
-			if (dhcp(c, &eh_data))
+			if (dhcp(c, qpair, &eh_data))
 				continue;
 		}
 
@@ -1049,7 +1049,7 @@ resume:
 		if (proto == IPPROTO_UDP) {
 			struct iov_tail uh_data = data;
 
-			if (dhcpv6(c, &uh_data, saddr, daddr))
+			if (dhcpv6(c, qpair, &uh_data, saddr, daddr))
 				continue;
 		}
 
-- 
2.54.0



* [PATCH v5 09/12] icmp: Pass queue pair explicitly through ICMP send path
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (7 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 08/12] dhcp/dhcpv6: Pass queue pair explicitly through DHCP " Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 10/12] ndp: Pass queue pair explicitly through NDP " Laurent Vivier
                   ` (2 subsequent siblings)
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

icmp_sock_handler() hardcodes QPAIR_DEFAULT when calling
tap_icmp4_send() and tap_icmp6_send().  Add a qpair parameter and pass
it from the caller, passt_worker(), so the queue pair is selected at
the entry point rather than inside the handler.

Also add a qpair parameter to icmp_tap_handler() and icmp_ping_new().
icmp_ping_new() does not use it yet.

passt_worker() currently passes QPAIR_DEFAULT, preserving existing
single-queue behavior.  This is a preparatory step for per-queue
worker threads in vhost-user mode.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 icmp.c  | 22 ++++++++++++++--------
 icmp.h  |  7 ++++---
 passt.c |  2 +-
 tap.c   |  4 ++--
 4 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/icmp.c b/icmp.c
index 3705f5ce0c9b..62038f977116 100644
--- a/icmp.c
+++ b/icmp.c
@@ -66,8 +66,10 @@ static struct icmp_ping_flow *ping_at_sidx(flow_sidx_t sidx)
  * icmp_sock_handler() - Handle new data from ICMP or ICMPv6 socket
  * @c:		Execution context
  * @ref:	epoll reference
+ * @qpair:	Queue pair to process
  */
-void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
+void icmp_sock_handler(const struct ctx *c, union epoll_ref ref,
+		       unsigned int qpair)
 {
 	struct icmp_ping_flow *pingf = ping_at_sidx(ref.flowside);
 	const struct flowside *ini = &pingf->f.side[INISIDE];
@@ -132,13 +134,13 @@ void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
 		const struct in_addr *daddr = inany_v4(&ini->eaddr);
 
 		assert(saddr && daddr); /* Must have IPv4 addresses */
-		tap_icmp4_send(c, QPAIR_DEFAULT, *saddr, *daddr, buf,
+		tap_icmp4_send(c, qpair, *saddr, *daddr, buf,
 			       pingf->f.tap_omac, n);
 	} else if (pingf->f.type == FLOW_PING6) {
 		const struct in6_addr *saddr = &ini->oaddr.a6;
 		const struct in6_addr *daddr = &ini->eaddr.a6;
 
-		tap_icmp6_send(c, QPAIR_DEFAULT, saddr, daddr, buf,
+		tap_icmp6_send(c, qpair, saddr, daddr, buf,
 			       pingf->f.tap_omac, n);
 	}
 	return;
@@ -163,6 +165,7 @@ static void icmp_ping_close(const struct ctx *c,
 /**
  * icmp_ping_new() - Prepare a new ping socket for a new id
  * @c:		Execution context
+ * @qpair:	Queue pair of the flow
  * @af:		Address family, AF_INET or AF_INET6
  * @id:		ICMP id for the new socket
  * @saddr:	Source address
@@ -171,8 +174,9 @@ static void icmp_ping_close(const struct ctx *c,
  * Return: newly opened ping flow, or NULL on failure
  */
 static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
-					    sa_family_t af, uint16_t id,
-					    const void *saddr, const void *daddr)
+					    unsigned int qpair, sa_family_t af,
+					    uint16_t id, const void *saddr,
+					    const void *daddr)
 {
 	uint8_t proto = af == AF_INET ? IPPROTO_ICMP : IPPROTO_ICMPV6;
 	uint8_t flowtype = af == AF_INET ? FLOW_PING4 : FLOW_PING6;
@@ -180,6 +184,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	struct icmp_ping_flow *pingf;
 	const struct flowside *tgt;
 
+	(void)qpair;
 	if (!flow)
 		return NULL;
 
@@ -234,6 +239,7 @@ cancel:
 /**
  * icmp_tap_handler() - Handle packets from tap
  * @c:		Execution context
+ * @qpair:	Queue pair to process
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address
@@ -243,8 +249,8 @@ cancel:
  *
  * Return: count of consumed packets (always 1, even if malformed)
  */
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
-		     const void *saddr, const void *daddr,
+int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
+		     sa_family_t af, const void *saddr, const void *daddr,
 		     struct iov_tail *data, const struct timespec *now)
 {
 	struct iovec iov[MAX_IOV_ICMP];
@@ -301,7 +307,7 @@ int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 
 	if (flow)
 		pingf = &flow->ping;
-	else if (!(pingf = icmp_ping_new(c, af, id, saddr, daddr)))
+	else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr)))
 		return 1;
 
 	tgt = &pingf->f.side[TGTSIDE];
diff --git a/icmp.h b/icmp.h
index 556260461995..5cc067e57a9c 100644
--- a/icmp.h
+++ b/icmp.h
@@ -13,9 +13,10 @@
 struct ctx;
 struct icmp_ping_flow;
 
-void icmp_sock_handler(const struct ctx *c, union epoll_ref ref);
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
-		     const void *saddr, const void *daddr,
+void icmp_sock_handler(const struct ctx *c, union epoll_ref ref,
+		       unsigned int qpair);
+int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
+		     sa_family_t af, const void *saddr, const void *daddr,
 		     struct iov_tail *data, const struct timespec *now);
 void icmp_init(void);
 
diff --git a/passt.c b/passt.c
index 41239991451f..c9e456641e85 100644
--- a/passt.c
+++ b/passt.c
@@ -273,7 +273,7 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
 					 QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_PING:
-			icmp_sock_handler(c, ref);
+			icmp_sock_handler(c, ref, QPAIR_DEFAULT);
 			break;
 		case EPOLL_TYPE_VHOST_CMD:
 			vu_control_handler(c->vdev, c->fd_tap, eventmask);
diff --git a/tap.c b/tap.c
index 8e19390f7273..de1e5269526c 100644
--- a/tap.c
+++ b/tap.c
@@ -791,7 +791,7 @@ resume:
 
 			tap_packet_debug(iph, NULL, NULL, 0, NULL, 1);
 
-			icmp_tap_handler(c, PIF_TAP, AF_INET,
+			icmp_tap_handler(c, qpair, PIF_TAP, AF_INET,
 					 &iph->saddr, &iph->daddr,
 					 &data, now);
 			continue;
@@ -1035,7 +1035,7 @@ resume:
 
 			tap_packet_debug(NULL, ip6h, NULL, proto, NULL, 1);
 
-			icmp_tap_handler(c, PIF_TAP, AF_INET6,
+			icmp_tap_handler(c, qpair, PIF_TAP, AF_INET6,
 					 saddr, daddr, &data, now);
 			continue;
 		}
-- 
2.54.0



* [PATCH v5 10/12] ndp: Pass queue pair explicitly through NDP send path
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (8 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 09/12] icmp: Pass queue pair explicitly through ICMP " Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 11/12] flow: Add queue pair tracking to flow management Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 12/12] flow: Derive epoll fd from queue pair, removing epollid field Laurent Vivier
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Add a qpair parameter to ndp(), ndp_send(), ndp_na(),
ndp_unsolicited_na(), ndp_ra(), and ndp_send_init_req(), forwarding
it to tap_icmp6_send() instead of hardcoding QPAIR_DEFAULT.

ndp_timer() and fwd_neigh_table_update() have no queue pair context
and keep using QPAIR_DEFAULT.

No functional change.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 fwd.c |  2 +-
 ndp.c | 40 ++++++++++++++++++++++++----------------
 ndp.h |  7 ++++---
 tap.c |  4 ++--
 4 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/fwd.c b/fwd.c
index 0d0e265b7dc0..1bfe024ada26 100644
--- a/fwd.c
+++ b/fwd.c
@@ -147,7 +147,7 @@ void fwd_neigh_table_update(const struct ctx *c, const union inany_addr *addr,
 	if (inany_v4(addr))
 		arp_announce(c, QPAIR_DEFAULT, inany_v4(addr), e->mac);
 	else
-		ndp_unsolicited_na(c, &addr->a6);
+		ndp_unsolicited_na(c, QPAIR_DEFAULT, &addr->a6);
 }
 
 /**
diff --git a/ndp.c b/ndp.c
index 6269cb38d93f..00d1751ba100 100644
--- a/ndp.c
+++ b/ndp.c
@@ -175,26 +175,28 @@ struct ndp_ns {
 /**
  * ndp_send() - Send an NDP message
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the message
  * @dst:	IPv6 address to send the message to
  * @buf:	ICMPv6 header + message payload
  * @l4len:	Length of message, including ICMPv6 header
  */
-static void ndp_send(const struct ctx *c, const struct in6_addr *dst,
-		     const void *buf, size_t l4len)
+static void ndp_send(const struct ctx *c, unsigned int qpair,
+		     const struct in6_addr *dst, const void *buf, size_t l4len)
 {
 	const struct in6_addr *src = &c->ip6.our_tap_ll;
 
-	tap_icmp6_send(c, QPAIR_DEFAULT, src, dst, buf, c->our_tap_mac, l4len);
+	tap_icmp6_send(c, qpair, src, dst, buf, c->our_tap_mac, l4len);
 }
 
 /**
  * ndp_na() - Send an NDP Neighbour Advertisement (NA) message
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the NA
  * @dst:	IPv6 address to send the NA to
  * @addr:	IPv6 address to advertise
  */
-static void ndp_na(const struct ctx *c, const struct in6_addr *dst,
-	    const struct in6_addr *addr)
+static void ndp_na(const struct ctx *c, unsigned int qpair,
+		   const struct in6_addr *dst, const struct in6_addr *addr)
 {
 	union inany_addr tgt;
 	struct ndp_na na = {
@@ -217,26 +219,30 @@ static void ndp_na(const struct ctx *c, const struct in6_addr *dst,
 	inany_from_af(&tgt, AF_INET6, addr);
 	fwd_neigh_mac_get(c, &tgt, na.target_l2_addr.mac);
 
-	ndp_send(c, dst, &na, sizeof(na));
+	ndp_send(c, qpair, dst, &na, sizeof(na));
 }
 
 /**
  * ndp_unsolicited_na() - Send unsolicited NA
  * @c:         Execution context
+ * @qpair:     Queue pair on which to send the NA
  * @addr:      IPv6 address to advertise
  */
-void ndp_unsolicited_na(const struct ctx *c, const struct in6_addr *addr)
+void ndp_unsolicited_na(const struct ctx *c, unsigned int qpair,
+			const struct in6_addr *addr)
 {
 	if (tap_is_ready(c))
-		ndp_na(c, &in6addr_ll_all_nodes, addr);
+		ndp_na(c, qpair, &in6addr_ll_all_nodes, addr);
 }
 
 /**
  * ndp_ra() - Send an NDP Router Advertisement (RA) message
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the RA
  * @dst:	IPv6 address to send the RA to
  */
-static void ndp_ra(const struct ctx *c, const struct in6_addr *dst)
+static void ndp_ra(const struct ctx *c, unsigned int qpair,
+		   const struct in6_addr *dst)
 {
 	struct ndp_ra ra = {
 		.ih = {
@@ -344,18 +350,19 @@ static void ndp_ra(const struct ctx *c, const struct in6_addr *dst)
 	memcpy(&ra.source_ll.mac, c->our_tap_mac, ETH_ALEN);
 
 	/* NOLINTNEXTLINE(clang-analyzer-security.PointerSub) */
-	ndp_send(c, dst, &ra, ptr - (unsigned char *)&ra);
+	ndp_send(c, qpair, dst, &ra, ptr - (unsigned char *)&ra);
 }
 
 /**
  * ndp() - Check for NDP solicitations, reply as needed
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send replies
  * @saddr:	Source IPv6 address
  * @data:	Single packet with ICMPv6 header
  *
  * Return: 0 if not handled here, 1 if handled, -1 on failure
  */
-int ndp(const struct ctx *c, const struct in6_addr *saddr,
+int ndp(const struct ctx *c, unsigned int qpair, const struct in6_addr *saddr,
 	struct iov_tail *data)
 {
 	struct icmp6hdr ih_storage;
@@ -384,13 +391,13 @@ int ndp(const struct ctx *c, const struct in6_addr *saddr,
 
 		info("NDP: received NS, sending NA");
 
-		ndp_na(c, saddr, &ns->target_addr);
+		ndp_na(c, qpair, saddr, &ns->target_addr);
 	} else if (ih->icmp6_type == RS) {
 		if (c->no_ra)
 			return 1;
 
 		info("NDP: received RS, sending RA");
-		ndp_ra(c, saddr);
+		ndp_ra(c, qpair, saddr);
 	}
 
 	return 1;
@@ -448,7 +455,7 @@ void ndp_timer(const struct ctx *c, const struct timespec *now)
 
 	info("NDP: sending unsolicited RA, next in %llds", (long long)interval);
 
-	ndp_ra(c, &in6addr_ll_all_nodes);
+	ndp_ra(c, QPAIR_DEFAULT, &in6addr_ll_all_nodes);
 
 first:
 	next_ra = now->tv_sec + interval;
@@ -457,8 +464,9 @@ first:
 /**
  * ndp_send_init_req() - Send initial NDP NS to retrieve guest MAC address
  * @c:		Execution context
+ * @qpair:	Queue pair on which to send the request
  */
-void ndp_send_init_req(const struct ctx *c)
+void ndp_send_init_req(const struct ctx *c, unsigned int qpair)
 {
 	struct ndp_ns ns = {
 		.ih = {
@@ -471,5 +479,5 @@ void ndp_send_init_req(const struct ctx *c)
 		.target_addr = c->ip6.addr
 	};
 	debug("Sending initial NDP NS request for guest MAC address");
-	ndp_send(c, &c->ip6.addr, &ns, sizeof(ns));
+	ndp_send(c, qpair, &c->ip6.addr, &ns, sizeof(ns));
 }
diff --git a/ndp.h b/ndp.h
index 56b756d8400b..8c168fc199fe 100644
--- a/ndp.h
+++ b/ndp.h
@@ -8,10 +8,11 @@
 
 struct icmp6hdr;
 
-int ndp(const struct ctx *c, const struct in6_addr *saddr,
+int ndp(const struct ctx *c, unsigned int qpair, const struct in6_addr *saddr,
 	struct iov_tail *data);
 void ndp_timer(const struct ctx *c, const struct timespec *now);
-void ndp_send_init_req(const struct ctx *c);
-void ndp_unsolicited_na(const struct ctx *c, const struct in6_addr *addr);
+void ndp_send_init_req(const struct ctx *c, unsigned int qpair);
+void ndp_unsolicited_na(const struct ctx *c, unsigned int qpair,
+			const struct in6_addr *addr);
 
 #endif /* NDP_H */
diff --git a/tap.c b/tap.c
index de1e5269526c..5e9c7a1701bf 100644
--- a/tap.c
+++ b/tap.c
@@ -1030,7 +1030,7 @@ resume:
 				continue;
 
 			ndp_data = data;
-			if (ndp(c, saddr, &ndp_data))
+			if (ndp(c, qpair, saddr, &ndp_data))
 				continue;
 
 			tap_packet_debug(NULL, ip6h, NULL, proto, NULL, 1);
@@ -1470,7 +1470,7 @@ static void tap_start_connection(const struct ctx *c, unsigned int qpair)
 	if (c->ifi4)
 		arp_send_init_req(c, qpair);
 	if (c->ifi6 && !c->no_ndp)
-		ndp_send_init_req(c);
+		ndp_send_init_req(c, qpair);
 }
 
 /**
-- 
2.54.0



* [PATCH v5 11/12] flow: Add queue pair tracking to flow management
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (9 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 10/12] ndp: Pass queue pair explicitly through NDP " Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  2026-06-16 12:51 ` [PATCH v5 12/12] flow: Derive epoll fd from queue pair, removing epollid field Laurent Vivier
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Add a qpair field (5 bits) to struct flow_common, with
FLOW_QPAIR_INVALID as sentinel for unassigned flows.  Provide
flow_setqp()/FLOW_SETQP() to assign and flow_qp()/FLOW_QP() to
query the queue pair.

All protocol handlers (TCP, UDP, ICMP) set the queue pair on new
flows via FLOW_SETQP(), and update it on each packet received from
tap for existing flows, implementing virtio receive steering: return
traffic is directed to the RX queue matching the originating TX
queue.

tcp_keepalive() and tcp_inactivity() now filter by queue pair so
each worker only processes its own flows.

tcp_buf.c uses conn->f.qpair instead of hardcoding QPAIR_DEFAULT
for consistency, though this path is only used in non-vhost-user
mode where the queue pair is always 0.

Flows initiated from the host side default to queue pair 0.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
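Note, not part of the commit message: a minimal sketch of the
sentinel-based tracking described above. It assumes a 5-bit field
whose all-ones value marks an unassigned flow; the SKETCH_* constants
and sketch_* functions are hypothetical stand-ins, not the
FLOW_QPAIR_* values or flow_qp()/flow_setqp() implementations from
this patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical values, chosen only for this sketch */
#define SKETCH_QPAIR_BITS	5
#define SKETCH_QPAIR_INVALID	((1U << SKETCH_QPAIR_BITS) - 1)
#define SKETCH_QPAIR_MAX	SKETCH_QPAIR_INVALID
#define SKETCH_QPAIR_DEFAULT	0

struct sketch_flow {
	unsigned int qpair:SKETCH_QPAIR_BITS;
};

/* New flows start unassigned */
static void sketch_flow_init(struct sketch_flow *f)
{
	f->qpair = SKETCH_QPAIR_INVALID;
}

/* Unassigned (or missing) flows fall back to the default queue pair */
static unsigned int sketch_flow_qp(const struct sketch_flow *f)
{
	if (f == NULL || f->qpair == SKETCH_QPAIR_INVALID)
		return SKETCH_QPAIR_DEFAULT;
	return f->qpair;
}

/* Called for each packet from tap: remember the originating pair */
static void sketch_flow_setqp(struct sketch_flow *f, unsigned int qpair)
{
	assert(qpair < SKETCH_QPAIR_MAX);
	f->qpair = qpair;
}
```

In this sketch, return traffic for a flow would be sent on
sketch_flow_qp(), so it follows the most recent transmit queue pair,
and a flow that never saw a packet from tap uses the default.
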
 flow.c       | 34 ++++++++++++++++++++++++++++++++++
 flow.h       | 18 +++++++++++++++++-
 icmp.c       |  8 +++++---
 tcp.c        | 20 ++++++++++++++++----
 tcp_buf.c    | 10 +++++-----
 tcp_splice.c |  1 +
 udp_flow.c   |  7 +++++--
 7 files changed, 83 insertions(+), 15 deletions(-)

diff --git a/flow.c b/flow.c
index c93b73549c90..bf855fe0dfaf 100644
--- a/flow.c
+++ b/flow.c
@@ -415,6 +415,39 @@ void flow_epollid_register(int epollid, int epollfd)
 	epoll_id_to_fd[epollid] = epollfd;
 }
 
+/**
+ * flow_qp() - Get the queue pair for a flow
+ * @f:		Flow to query (may be NULL)
+ *
+ * Return: queue pair number for the flow, or 0 if flow is NULL or has no
+ *         valid queue pair assignment
+ */
+/* cppcheck-suppress unusedFunction */
+unsigned int flow_qp(const struct flow_common *f)
+{
+	if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
+		return QPAIR_DEFAULT;
+	return f->qpair;
+}
+
+/**
+ * flow_setqp() - Set queue pair assignment for a flow
+ * @f:		Flow to update
+ * @qpair:	Queue pair number to assign
+ */
+void flow_setqp(struct flow_common *f, unsigned int qpair)
+{
+	assert(qpair < FLOW_QPAIR_MAX);
+
+	if (f->qpair == qpair)
+		return;
+
+	flow_trace((union flow *)f, "updating queue pair from %d to %d",
+		   f->qpair, qpair);
+
+	f->qpair = qpair;
+}
+
 /**
  * flow_initiate_() - Move flow to INI, setting pif[INISIDE]
  * @flow:	Flow to change state
@@ -636,6 +669,7 @@ union flow *flow_alloc(void)
 
 	flow_new_entry = flow;
 	memset(flow, 0, sizeof(*flow));
+	flow->f.qpair = FLOW_QPAIR_INVALID;
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
 	return flow;
diff --git a/flow.h b/flow.h
index cae259fe7037..3c74dcbd95c4 100644
--- a/flow.h
+++ b/flow.h
@@ -184,7 +184,8 @@ int flowside_connect(const struct ctx *c, int s,
  * @pif[]:	Interface for each side of the flow
  * @side[]:	Information for each side of the flow
  * @tap_omac:	MAC address of remote endpoint as seen from the guest
- * @epollid:	epollfd identifier
+ * @qpair:	Queue pair number assigned to this flow
+ *		(FLOW_QPAIR_INVALID if not assigned)
  */
 struct flow_common {
 #ifdef __GNUC__
@@ -205,11 +206,19 @@ struct flow_common {
 
 #define EPOLLFD_ID_BITS 8
 	unsigned int	epollid:EPOLLFD_ID_BITS;
+#define FLOW_QPAIR_BITS 5
+	unsigned int	qpair:FLOW_QPAIR_BITS;
 };
 
 #define EPOLLFD_ID_DEFAULT	0
 #define EPOLLFD_ID_SIZE		(1 << EPOLLFD_ID_BITS)
 
+#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)
+#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)
+#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX
+
+static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2);
+
 #define FLOW_INDEX_BITS		17	/* 128k - 1 */
 #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
 
@@ -270,6 +279,13 @@ void flow_epollid_set(struct flow_common *f, int epollid);
 int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
 		   int fd, unsigned int sidei);
 void flow_epollid_register(int epollid, int epollfd);
+unsigned int flow_qp(const struct flow_common *f);
+#define FLOW_QP(flow_)				\
+	(flow_qp(&(flow_)->f))
+void flow_setqp(struct flow_common *f, unsigned int qpair);
+#define FLOW_SETQP(flow_, _qpair)		\
+	(flow_setqp(&(flow_)->f, _qpair))
+
 void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 			unsigned int qpair);
 int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
diff --git a/icmp.c b/icmp.c
index 62038f977116..2558fe5beaab 100644
--- a/icmp.c
+++ b/icmp.c
@@ -184,7 +184,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	struct icmp_ping_flow *pingf;
 	const struct flowside *tgt;
 
-	(void)qpair;
 	if (!flow)
 		return NULL;
 
@@ -216,6 +215,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	if (pingf->sock > FD_REF_MAX)
 		goto cancel;
 
+	FLOW_SETQP(pingf, qpair);
 	flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
 			   TGTSIDE) < 0) {
@@ -305,10 +305,12 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	flow = flow_at_sidx(flow_lookup_af(c, proto, PIF_TAP,
 					   af, saddr, daddr, id, id));
 
-	if (flow)
+	if (flow) {
 		pingf = &flow->ping;
-	else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr)))
+		FLOW_SETQP(pingf, qpair); /* XXX if qpair change, update epollfd */
+	} else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) {
 		return 1;
+	}
 
 	tgt = &pingf->f.side[TGTSIDE];
 
diff --git a/tcp.c b/tcp.c
index 7f8e68a31994..c0a4de33f068 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1735,6 +1735,7 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
 
 	conn->sock = s;
 	conn->timer = -1;
+	FLOW_SETQP(conn, qpair);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
@@ -2250,7 +2251,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
 /**
  * tcp_tap_handler() - Handle packets from tap and state transitions
  * @c:		Execution context
- * @qpair:	Queue pair on which to send packets
+ * @qpair:	Queue pair to process
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address
@@ -2314,6 +2315,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	assert(pif_at_sidx(sidx) == PIF_TAP);
 	conn = &flow->tcp;
 
+	/* update queue pair */
+	FLOW_SETQP(flow, qpair);
+
 	flow_trace(conn, "packet length %zu from tap", l4len);
 
 	if (th->rst) {
@@ -2518,6 +2522,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
 	conn->timer = -1;
 	conn->ws_to_tap = conn->ws_from_tap = 0;
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
@@ -2980,6 +2985,9 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now,
 	flow_foreach_of_type(flow, FLOW_TCP) {
 		struct tcp_tap_conn *conn = &flow->tcp;
 
+		if (conn->f.qpair != qpair)
+			continue;
+
 		if (conn->tap_inactive) {
 			flow_dbg(conn, "No tap activity for least %us, send keepalive",
 				 KEEPALIVE_INTERVAL);
@@ -3011,6 +3019,9 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now,
 	flow_foreach_of_type(flow, FLOW_TCP) {
 		struct tcp_tap_conn *conn = &flow->tcp;
 
+		if (conn->f.qpair != qpair)
+			continue;
+
 		if (conn->inactive) {
 			/* No activity in this interval, reset */
 			flow_dbg(conn, "Inactive for at least %us, resetting",
@@ -3841,6 +3852,7 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
 		goto out;
 	}
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock,
 			   !TAPSIDE(conn)))
@@ -4019,10 +4031,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
 	if (tcp_set_peek_offset(conn, peek_offset))
 		goto fail;
 
-	if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT))
+	if (tcp_send_flag(c, conn, ACK, conn->f.qpair))
 		goto fail;
 
-	tcp_data_from_sock(c, conn, QPAIR_DEFAULT);
+	tcp_data_from_sock(c, conn, conn->f.qpair);
 
 	if ((rc = tcp_epoll_ctl(conn))) {
 		flow_dbg(conn,
@@ -4040,7 +4052,7 @@ fail:
 	}
 
 	conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
-	tcp_rst(c, conn, QPAIR_DEFAULT);
+	tcp_rst(c, conn, conn->f.qpair);
 
 	return 0;
 }
diff --git a/tcp_buf.c b/tcp_buf.c
index ae8bebca5107..647c17621963 100644
--- a/tcp_buf.c
+++ b/tcp_buf.c
@@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
 		conn->seq_to_tap = seq;
 		peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
 		if (tcp_set_peek_offset(conn, peek_offset))
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 	}
 }
 
@@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 		conn->seq_to_tap = conn->seq_ack_from_tap;
 		already_sent = 0;
 		if (tcp_set_peek_offset(conn, 0)) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 			return -1;
 		}
 	}
@@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 	}
 
 	if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) {
-		tcp_rst(c, conn, QPAIR_DEFAULT);
+		tcp_rst(c, conn, conn->f.qpair);
 		return -1;
 	}
 
@@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 	if (len < 0) {
 		if (errno != EAGAIN && errno != EWOULDBLOCK) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 			return -errno;
 		}
 
@@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 			ret = tcp_buf_send_flag(c, conn, FIN | ACK);
 			if (ret) {
-				tcp_rst(c, conn, QPAIR_DEFAULT);
+				tcp_rst(c, conn, conn->f.qpair);
 				return ret;
 			}
 
diff --git a/tcp_splice.c b/tcp_splice.c
index 3fd33a10308e..1a77ac2e8a18 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -377,6 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
 
 	pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) ||
 	    flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) {
diff --git a/udp_flow.c b/udp_flow.c
index 143f265493fa..44e0c4c50ca9 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -81,7 +81,6 @@ static int udp_flow_sock(const struct ctx *c,
 		return s;
 	}
 
-	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) {
 		rc = -errno;
 		close(s);
@@ -154,7 +153,8 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
 	uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0;
 	uflow->activity[INISIDE] = 1;
 	uflow->activity[TGTSIDE] = 0;
-	(void)qpair;
+	FLOW_SETQP(uflow, qpair);
+	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 
 	flow_foreach_sidei(sidei) {
 		if (pif_is_socket(uflow->f.pif[sidei]))
@@ -270,6 +270,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
  * @daddr:	Destination address guest side
  * @srcport:	Source port on guest side
  * @dstport:	Destination port on guest side
+ * @now:	Current timestamp
  *
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
@@ -291,6 +292,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
 			      srcport, dstport);
 	if ((uflow = udp_at_sidx(sidx))) {
 		udp_flow_activity(uflow, sidx.sidei, now);
+		/* update qpair */
+		FLOW_SETQP(uflow, qpair); /* if qpair changes, update epollfd */
 		return flow_sidx_opposite(sidx);
 	}
 
-- 
2.54.0



* [PATCH v5 12/12] flow: Derive epoll fd from queue pair, removing epollid field
  2026-06-16 12:51 [PATCH v5 00/12] vhost-user: Add multiqueue support Laurent Vivier
                   ` (10 preceding siblings ...)
  2026-06-16 12:51 ` [PATCH v5 11/12] flow: Add queue pair tracking to flow management Laurent Vivier
@ 2026-06-16 12:51 ` Laurent Vivier
  11 siblings, 0 replies; 13+ messages in thread
From: Laurent Vivier @ 2026-06-16 12:51 UTC (permalink / raw)
  To: passt-dev; +Cc: Laurent Vivier

Since each queue pair maps to exactly one epoll instance, the epoll
file descriptor can be looked up directly from the qpair field.  This
makes the separate epollid field in flow_common redundant.

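Replace epoll_id_to_fd[] with qpair_to_fd[], remove
flow_epollid_set(), flow_epollid_register(), flow_qp()/FLOW_QP(),
and the epollid field from flow_common.  FLOW_QPAIR_INVALID is no
longer needed: newly allocated flows get qpair 0 from memset.
FLOW_QPAIR_NUM and FLOW_QPAIR_MAX are replaced by FLOW_QPAIR_SIZE,
so all 32 values of the 5-bit field are now valid queue pair numbers.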

For new flows, FLOW_SETQP() sets the queue pair during creation, and
the socket is registered with epoll separately via flow_epoll_set().

For existing flows that may move between queue pairs, add
flow_migrate()/FLOW_MIGRATE(), which removes the socket from the old
epoll instance and re-registers it on the new one.

TCP timers are migrated lazily: tcp_timer_handler() detects a qpair
mismatch when a timer fires, moves the timerfd to the correct epoll
instance, and returns without further processing.

flow_init() now takes the execution context and initialises every
entry of qpair_to_fd[] to c->epollfd, as all queue pairs currently
share the single epoll instance.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
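Note for readers: the remove-then-re-register step performed by
flow_migrate() can be tried in isolation with the sketch below.  It
is not passt code: all demo_ names are hypothetical, and, unlike this
patch (where every qpair_to_fd[] entry still points at the same epoll
instance), it assumes one distinct epoll instance per queue pair so
that the move is observable.  An eventfd stands in for a flow's
socket.

```c
#include <assert.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

#define DEMO_QPAIRS	2

/* Hypothetical counterpart of qpair_to_fd[]: one epoll fd per queue pair */
static int demo_qpair_to_fd[DEMO_QPAIRS];

struct demo_flow {
	unsigned int qpair;	/* queue pair currently owning the flow */
	int fd;			/* descriptor watched on behalf of the flow */
};

static int demo_epoll_add(int epollfd, int fd, uint32_t events)
{
	struct epoll_event ev = { .events = events, .data.fd = fd };

	return epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Move the flow's descriptor from its current epoll instance to @qpair's */
static int demo_migrate(struct demo_flow *f, unsigned int qpair,
			uint32_t events)
{
	if (f->qpair == qpair)
		return 0;	/* nothing to do */

	epoll_ctl(demo_qpair_to_fd[f->qpair], EPOLL_CTL_DEL, f->fd, NULL);
	f->qpair = qpair;

	return demo_epoll_add(demo_qpair_to_fd[qpair], f->fd, events);
}

/* Number of ready descriptors on @qpair's epoll instance, without blocking */
static int demo_ready(unsigned int qpair)
{
	struct epoll_event ev;

	return epoll_wait(demo_qpair_to_fd[qpair], &ev, 1, 0);
}

/* Create the epoll instances and a readable eventfd owned by queue pair 0 */
static int demo_setup(struct demo_flow *f)
{
	unsigned int i;

	for (i = 0; i < DEMO_QPAIRS; i++) {
		demo_qpair_to_fd[i] = epoll_create1(EPOLL_CLOEXEC);
		if (demo_qpair_to_fd[i] < 0)
			return -1;
	}

	f->qpair = 0;
	f->fd = eventfd(1, EFD_NONBLOCK);	/* counter 1: readable at once */
	if (f->fd < 0)
		return -1;

	return demo_epoll_add(demo_qpair_to_fd[0], f->fd, EPOLLIN);
}
```

After demo_setup(), the descriptor is reported only by queue pair 0;
after demo_migrate(&f, 1, EPOLLIN), only by queue pair 1.  The lazy
timer migration in tcp_timer_handler() follows the same pattern, but
is triggered from the handler running on the old queue pair.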
 flow.c       | 63 ++++++++++++++++++++--------------------------------
 flow.h       | 25 ++++++++-------------
 icmp.c       |  3 +--
 passt.c      |  3 +--
 tcp.c        | 42 +++++++++++++++++++++++++----------
 tcp_splice.c |  1 -
 udp_flow.c   |  4 +---
 7 files changed, 66 insertions(+), 75 deletions(-)

diff --git a/flow.c b/flow.c
index bf855fe0dfaf..787a7139cfc1 100644
--- a/flow.c
+++ b/flow.c
@@ -130,7 +130,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
 unsigned flow_first_free;
 union flow flowtab[FLOW_MAX];
 static const union flow *flow_new_entry; /* = NULL */
-static int epoll_id_to_fd[EPOLLFD_ID_SIZE];
+int qpair_to_fd[FLOW_QPAIR_SIZE];
 
 /* Hash table to index it */
 #define FLOW_HASH_LOAD		70		/* % */
@@ -362,19 +362,7 @@ static void flow_set_state(struct flow_common *f, enum flow_state state)
  */
 int flow_epollfd(const struct flow_common *f)
 {
-	return epoll_id_to_fd[f->epollid];
-}
-
-/**
- * flow_epollid_set() - Associate a flow with an epoll id
- * @f:		Flow to update
- * @epollid:	epoll id to associate with this flow
- */
-void flow_epollid_set(struct flow_common *f, int epollid)
-{
-	assert(epollid < EPOLLFD_ID_SIZE);
-
-	f->epollid = epollid;
+	return qpair_to_fd[f->qpair];
 }
 
 /**
@@ -404,40 +392,31 @@ int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
 }
 
 /**
- * flow_epollid_register() - Initialize the epoll id -> fd mapping
- * @epollid:	epoll id to associate to
- * @epollfd:	epoll file descriptor for this epoll id
+ * flow_setqp() - Set queue pair assignment for a flow
+ * @f:		Flow to update
+ * @qpair:	Queue pair number to assign
  */
-void flow_epollid_register(int epollid, int epollfd)
+void flow_setqp(struct flow_common *f, unsigned int qpair)
 {
-	assert(epollid < EPOLLFD_ID_SIZE);
+	assert(qpair < FLOW_QPAIR_SIZE);
 
-	epoll_id_to_fd[epollid] = epollfd;
-}
+	flow_trace((union flow *)f, "setting queue pair to %u", qpair);
 
-/**
- * flow_qp() - Get the queue pair for a flow
- * @f:		Flow to query (may be NULL)
- *
- * Return: queue pair number for the flow, or 0 if flow is NULL or has no
- *         valid queue pair assignment
- */
-/* cppcheck-suppress unusedFunction */
-unsigned int flow_qp(const struct flow_common *f)
-{
-	if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
-		return QPAIR_DEFAULT;
-	return f->qpair;
+	f->qpair = qpair;
 }
 
 /**
- * flow_setqp() - Set queue pair assignment for a flow
+ * flow_migrate() - Migrate a flow to a different queue pair
  * @f:		Flow to update
  * @qpair:	Queue pair number to assign
+ * @events:	epoll events to watch for
+ * @fd:		File descriptor to register
+ * @sidei:	Side index of the flow
  */
-void flow_setqp(struct flow_common *f, unsigned int qpair)
+void flow_migrate(struct flow_common *f, unsigned int qpair, uint32_t events,
+		  int fd, unsigned int sidei)
 {
-	assert(qpair < FLOW_QPAIR_MAX);
+	assert(qpair < FLOW_QPAIR_SIZE);
 
 	if (f->qpair == qpair)
 		return;
@@ -445,7 +424,10 @@ void flow_setqp(struct flow_common *f, unsigned int qpair)
 	flow_trace((union flow *)f, "updating queue pair from %d to %d",
 		   f->qpair, qpair);
 
+	epoll_del(qpair_to_fd[f->qpair], fd);
+
 	f->qpair = qpair;
+	flow_epoll_set(f, EPOLL_CTL_ADD, events, fd, sidei);
 }
 
 /**
@@ -669,7 +651,6 @@ union flow *flow_alloc(void)
 
 	flow_new_entry = flow;
 	memset(flow, 0, sizeof(*flow));
-	flow->f.qpair = FLOW_QPAIR_INVALID;
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
 	return flow;
@@ -1295,8 +1276,9 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
 
 /**
  * flow_init() - Initialise flow related data structures
+ * @c:		Execution context
  */
-void flow_init(void)
+void flow_init(const struct ctx *c)
 {
 	unsigned b;
 
@@ -1306,4 +1288,7 @@ void flow_init(void)
 
 	for (b = 0; b < FLOW_HASH_SIZE; b++)
 		flow_hashtab[b] = FLOW_SIDX_NONE;
+
+	for (b = 0; b < FLOW_QPAIR_SIZE; b++)
+		qpair_to_fd[b] = c->epollfd;
 }
diff --git a/flow.h b/flow.h
index 3c74dcbd95c4..53e0408a9ee5 100644
--- a/flow.h
+++ b/flow.h
@@ -157,6 +157,8 @@ struct flowside {
 	in_port_t		eport;
 };
 
+extern int qpair_to_fd[];
+
 /**
  * flowside_eq() - Check if two flowsides are equal
  * @left, @right:	Flowsides to compare
@@ -204,20 +206,12 @@ struct flow_common {
 
 	uint8_t		tap_omac[6];
 
-#define EPOLLFD_ID_BITS 8
-	unsigned int	epollid:EPOLLFD_ID_BITS;
 #define FLOW_QPAIR_BITS 5
 	unsigned int	qpair:FLOW_QPAIR_BITS;
 };
 
-#define EPOLLFD_ID_DEFAULT	0
-#define EPOLLFD_ID_SIZE		(1 << EPOLLFD_ID_BITS)
-
-#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)
-#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)
-#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX
-
-static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2);
+#define FLOW_QPAIR_SIZE		(1 << FLOW_QPAIR_BITS)
+static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_SIZE * 2);
 
 #define FLOW_INDEX_BITS		17	/* 128k - 1 */
 #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
@@ -273,18 +267,17 @@ flow_sidx_t flow_lookup_sa(const struct ctx *c, uint8_t proto, uint8_t pif,
 
 union flow;
 
-void flow_init(void);
+void flow_init(const struct ctx *c);
 int flow_epollfd(const struct flow_common *f);
-void flow_epollid_set(struct flow_common *f, int epollid);
 int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
 		   int fd, unsigned int sidei);
-void flow_epollid_register(int epollid, int epollfd);
-unsigned int flow_qp(const struct flow_common *f);
-#define FLOW_QP(flow_)				\
-	(flow_qp(&(flow_)->f))
 void flow_setqp(struct flow_common *f, unsigned int qpair);
 #define FLOW_SETQP(flow_, _qpair)		\
 	(flow_setqp(&(flow_)->f, _qpair))
+void flow_migrate(struct flow_common *f, unsigned int qpair, uint32_t events,
+		  int fd, unsigned int sidei);
+#define FLOW_MIGRATE(flow_, qpair_, events_, fd_, sidei_)	\
+	(flow_migrate(&(flow_)->f, qpair_, events_, fd_, sidei_))
 
 void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 			unsigned int qpair);
diff --git a/icmp.c b/icmp.c
index 2558fe5beaab..98ce55a8aff0 100644
--- a/icmp.c
+++ b/icmp.c
@@ -216,7 +216,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 		goto cancel;
 
 	FLOW_SETQP(pingf, qpair);
-	flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
 			   TGTSIDE) < 0) {
 		close(pingf->sock);
@@ -307,7 +306,7 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 
 	if (flow) {
 		pingf = &flow->ping;
-		FLOW_SETQP(pingf, qpair); /* XXX if qpair change, update epollfd */
+		FLOW_MIGRATE(pingf, qpair, EPOLLIN, pingf->sock, TGTSIDE);
 	} else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) {
 		return 1;
 	}
diff --git a/passt.c b/passt.c
index c9e456641e85..3afc59b19120 100644
--- a/passt.c
+++ b/passt.c
@@ -365,7 +365,6 @@ int main(int argc, char **argv)
 	c.epollfd = epoll_create1(EPOLL_CLOEXEC);
 	if (c.epollfd == -1)
 		die_perror("Failed to create epoll file descriptor");
-	flow_epollid_register(EPOLLFD_ID_DEFAULT, c.epollfd);
 
 	if (getrlimit(RLIMIT_NOFILE, &limit))
 		die_perror("Failed to get maximum value of open files limit");
@@ -388,7 +387,7 @@ int main(int argc, char **argv)
 	if (clock_gettime(CLOCK_MONOTONIC, &now))
 		die_perror("Failed to get CLOCK_MONOTONIC time");
 
-	flow_init();
+	flow_init(&c);
 	fwd_scan_ports_init(&c);
 
 	if ((!c.no_udp && udp_init(&c)) || (!c.no_tcp && tcp_init(&c)))
diff --git a/tcp.c b/tcp.c
index c0a4de33f068..1549e14adaf4 100644
--- a/tcp.c
+++ b/tcp.c
@@ -541,6 +541,21 @@ static int tcp_epoll_ctl(struct tcp_tap_conn *conn)
 	return 0;
 }
 
+static int tcp_timer_epoll_add(struct tcp_tap_conn *conn, int fd)
+{
+	union epoll_ref ref;
+
+	ref.type = EPOLL_TYPE_TCP_TIMER;
+	ref.flow = FLOW_IDX(conn);
+	ref.fd = fd;
+	if (epoll_add(flow_epollfd(&conn->f), EPOLLIN | EPOLLET, ref) < 0) {
+		flow_dbg(conn, "failed to add timer");
+		return -1;
+	}
+
+	return 0;
+}
+
 /**
  * tcp_timer_ctl() - Set timerfd based on flags/events, create timerfd if needed
  * @c:		Execution context
@@ -555,7 +570,6 @@ static void tcp_timer_ctl(const struct ctx *c, struct tcp_tap_conn *conn)
 		return;
 
 	if (conn->timer == -1) {
-		union epoll_ref ref;
 		int fd;
 
 		fd = timerfd_create(CLOCK_MONOTONIC, 0);
@@ -570,12 +584,7 @@ static void tcp_timer_ctl(const struct ctx *c, struct tcp_tap_conn *conn)
 			return;
 		}
 
-		ref.type = EPOLL_TYPE_TCP_TIMER;
-		ref.flow = FLOW_IDX(conn);
-		ref.fd = fd;
-		if (epoll_add(flow_epollfd(&conn->f), EPOLLIN | EPOLLET,
-			      ref) < 0) {
-			flow_dbg(conn, "failed to add timer");
+		if (tcp_timer_epoll_add(conn, fd) < 0) {
 			close(fd);
 			return;
 		}
@@ -1736,7 +1745,6 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
 	conn->sock = s;
 	conn->timer = -1;
 	FLOW_SETQP(conn, qpair);
-	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
 		goto cancel;
@@ -2315,8 +2323,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	assert(pif_at_sidx(sidx) == PIF_TAP);
 	conn = &flow->tcp;
 
-	/* update queue pair */
-	FLOW_SETQP(flow, qpair);
+	FLOW_MIGRATE(flow, qpair,
+		     tcp_conn_epoll_events(conn->events, conn->flags),
+		     conn->sock, !TAPSIDE(conn));
 
 	flow_trace(conn, "packet length %zu from tap", l4len);
 
@@ -2523,7 +2532,6 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
 	conn->ws_to_tap = conn->ws_from_tap = 0;
 
 	FLOW_SETQP(conn, QPAIR_DEFAULT);
-	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
 		conn_flag(c, conn, CLOSING);
@@ -2646,6 +2654,17 @@ void tcp_timer_handler(const struct ctx *c, union epoll_ref ref,
 	assert(!c->no_tcp);
 	assert(conn->f.type == FLOW_TCP);
 
+	if (conn->f.qpair != qpair) {
+		int old_epollfd = qpair_to_fd[qpair];
+
+		epoll_del(old_epollfd, conn->timer);
+		if (tcp_timer_epoll_add(conn, conn->timer) < 0) {
+			close(conn->timer);
+			conn->timer = -1;
+		}
+		return;
+	}
+
 	/* We don't reset timers on ~ACK_FROM_TAP_DUE, ~ACK_TO_TAP_DUE. If the
 	 * timer is currently armed, this event came from a previous setting,
 	 * and we just set the timer to a new point in the future: discard it.
@@ -3853,7 +3872,6 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
 	}
 
 	FLOW_SETQP(conn, QPAIR_DEFAULT);
-	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock,
 			   !TAPSIDE(conn)))
 		goto out; /* tcp_flow_migrate_target_ext() will clean this up */
diff --git a/tcp_splice.c b/tcp_splice.c
index 1a77ac2e8a18..3215337d3e62 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -378,7 +378,6 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
 	pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
 
 	FLOW_SETQP(conn, QPAIR_DEFAULT);
-	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) ||
 	    flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) {
 		int ret = -errno;
diff --git a/udp_flow.c b/udp_flow.c
index 44e0c4c50ca9..27dc24ffb2ae 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -154,7 +154,6 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
 	uflow->activity[INISIDE] = 1;
 	uflow->activity[TGTSIDE] = 0;
 	FLOW_SETQP(uflow, qpair);
-	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 
 	flow_foreach_sidei(sidei) {
 		if (pif_is_socket(uflow->f.pif[sidei]))
@@ -292,8 +291,7 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
 			      srcport, dstport);
 	if ((uflow = udp_at_sidx(sidx))) {
 		udp_flow_activity(uflow, sidx.sidei, now);
-		/* update qpair */
-		FLOW_SETQP(uflow, qpair); /* if qpair changes, update epollfd */
+		FLOW_MIGRATE(uflow, qpair, EPOLLIN, uflow->s[TGTSIDE], TGTSIDE);
 		return flow_sidx_opposite(sidx);
 	}
 
-- 
2.54.0

