public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH 3/4] multiqueue: Add queue-aware flow management for multiqueue support
Date: Fri,  7 Nov 2025 15:39:00 +0100	[thread overview]
Message-ID: <20251107143901.89955-4-lvivier@redhat.com> (raw)
In-Reply-To: <20251107143901.89955-1-lvivier@redhat.com>

Packets are now routed to the correct RX queue based on which TX queue
they arrived on, rather than always using queue 0.

Note: Flows initiated from the host (via sockets, udp_flow_from_sock())
currently default to queue 0, as they don't have an associated incoming
queue.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 flow.c      | 32 ++++++++++++++++++++++
 flow.h      | 10 +++++++
 icmp.c      | 23 +++++++++-------
 icmp.h      |  2 +-
 tap.c       | 77 +++++++++++++++++++++++++++------------------------
 tap.h       |  5 ++--
 tcp.c       | 79 +++++++++++++++++++++++++++++------------------------
 tcp.h       |  2 +-
 tcp_vu.c    |  8 ++++--
 udp.c       | 33 ++++++++++++----------
 udp.h       | 12 ++++----
 udp_flow.c  |  8 +++++-
 udp_flow.h  |  2 +-
 udp_vu.c    |  4 ++-
 vu_common.c |  4 +--
 15 files changed, 187 insertions(+), 114 deletions(-)

diff --git a/flow.c b/flow.c
index 278a9cf0ac6d..8e9d7e5e1847 100644
--- a/flow.c
+++ b/flow.c
@@ -405,6 +405,37 @@ void flow_epollid_register(int epollid, int epollfd)
 	epoll_id_to_fd[epollid] = epollfd;
 }
 
+/**
+ * flow_rx_virtqueue() - Get RX (receive) queue number for a flow
+ * @f:		Flow to query (may be NULL)
+ *
+ * Return: RX queue number for the flow, or 0 if flow is NULL or has no
+ *         valid queue assignment
+ */
+int flow_rx_virtqueue(const struct flow_common *f)
+{
+	if (f == NULL || f->queueid == FLOW_QUEUEID_INVALID)
+		return 0;
+	return f->queueid << 1;
+}
+
+/**
+ * flow_queue_set() - Set queue pair assignment for a flow
+ * @f:		Flow to update
+ * @queueid:	Virtqueue index, RX or TX; its queue pair is assigned to @f
+ */
+void flow_queue_set(struct flow_common *f, int queueid)
+{
+	queueid >>= 1;
+
+	ASSERT(queueid < FLOW_QUEUEID_MAX);
+
+	flow_trace((union flow *)f, "updating queue from %d to %d",
+		   f->queueid, queueid);
+
+	f->queueid = queueid;
+}
+
 /**
  * flow_initiate_() - Move flow to INI, setting pif[INISIDE]
  * @flow:	Flow to change state
@@ -609,6 +640,7 @@ union flow *flow_alloc(void)
 	flow_new_entry = flow;
 	memset(flow, 0, sizeof(*flow));
 	flow_epollid_clear(&flow->f);
+	flow->f.queueid = FLOW_QUEUEID_INVALID;
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
 	return flow;
diff --git a/flow.h b/flow.h
index b43b0b1dd7f2..44ab4ae8fd6a 100644
--- a/flow.h
+++ b/flow.h
@@ -179,6 +179,8 @@ int flowside_connect(const struct ctx *c, int s,
  * @side[]:	Information for each side of the flow
  * @tap_omac:	MAC address of remote endpoint as seen from the guest
  * @epollid:	epollfd identifier, or EPOLLFD_ID_INVALID
+ * @queueid:	Queue pair number assigned to this flow
+ *		(FLOW_QUEUEID_INVALID if not assigned)
  */
 struct flow_common {
 #ifdef __GNUC__
@@ -199,6 +201,8 @@ struct flow_common {
 
 #define EPOLLFD_ID_BITS 8
 	unsigned int	epollid:EPOLLFD_ID_BITS;
+#define FLOW_QUEUEID_BITS 5
+	unsigned int	queueid:FLOW_QUEUEID_BITS;
 };
 
 #define EPOLLFD_ID_DEFAULT	0
@@ -206,6 +210,10 @@ struct flow_common {
 #define EPOLLFD_ID_MAX		(EPOLLFD_ID_SIZE - 1)
 #define EPOLLFD_ID_INVALID	EPOLLFD_ID_MAX
 
+#define FLOW_QUEUEID_SIZE	(1 << FLOW_QUEUEID_BITS)
+#define FLOW_QUEUEID_MAX	(FLOW_QUEUEID_SIZE - 1)
+#define FLOW_QUEUEID_INVALID	FLOW_QUEUEID_MAX
+
 #define FLOW_INDEX_BITS		17	/* 128k - 1 */
 #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
 
@@ -266,6 +274,8 @@ int flow_epollfd(const struct flow_common *f);
 void flow_epollid_set(struct flow_common *f, int epollid);
 void flow_epollid_clear(struct flow_common *f);
 void flow_epollid_register(int epollid, int epollfd);
+int flow_rx_virtqueue(const struct flow_common *f);
+void flow_queue_set(struct flow_common *f, int queueid);
 void flow_defer_handler(const struct ctx *c, const struct timespec *now);
 int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
 			      int fd);
diff --git a/icmp.c b/icmp.c
index d58499c3bf5c..80e8753072fa 100644
--- a/icmp.c
+++ b/icmp.c
@@ -132,13 +132,13 @@ void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
 		const struct in_addr *daddr = inany_v4(&ini->eaddr);
 
 		ASSERT(saddr && daddr); /* Must have IPv4 addresses */
-		tap_icmp4_send(c, VHOST_USER_RX_QUEUE, *saddr, *daddr, buf,
+		tap_icmp4_send(c, flow_rx_virtqueue(&pingf->f), *saddr, *daddr, buf,
 			       pingf->f.tap_omac, n);
 	} else if (pingf->f.type == FLOW_PING6) {
 		const struct in6_addr *saddr = &ini->oaddr.a6;
 		const struct in6_addr *daddr = &ini->eaddr.a6;
 
-		tap_icmp6_send(c, VHOST_USER_RX_QUEUE, saddr, daddr, buf,
+		tap_icmp6_send(c, flow_rx_virtqueue(&pingf->f), saddr, daddr, buf,
 			       pingf->f.tap_omac, n);
 	}
 	return;
@@ -238,17 +238,18 @@ cancel:
 
 /**
  * icmp_tap_handler() - Handle packets from tap
- * @c:		Execution context
- * @pif:	pif on which the packet is arriving
- * @af:		Address family, AF_INET or AF_INET6
- * @saddr:	Source address
- * @daddr:	Destination address
- * @data:	Single packet with ICMP/ICMPv6 header
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @pif:		pif on which the packet is arriving
+ * @af:			Address family, AF_INET or AF_INET6
+ * @saddr:		Source address
+ * @daddr:		Destination address
+ * @data:		Single packet with ICMP/ICMPv6 header
+ * @now:		Current timestamp
  *
  * Return: count of consumed packets (always 1, even if malformed)
  */
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
+int icmp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif, sa_family_t af,
 		     const void *saddr, const void *daddr,
 		     struct iov_tail *data, const struct timespec *now)
 {
@@ -309,6 +310,8 @@ int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 	else if (!(pingf = icmp_ping_new(c, af, id, saddr, daddr)))
 		return 1;
 
+	flow_queue_set(&pingf->f, incoming_queue);
+
 	tgt = &pingf->f.side[TGTSIDE];
 
 	ASSERT(flow_proto[pingf->f.type] == proto);
diff --git a/icmp.h b/icmp.h
index 1a0e6205f087..6d6d6358bb33 100644
--- a/icmp.h
+++ b/icmp.h
@@ -10,7 +10,7 @@ struct ctx;
 struct icmp_ping_flow;
 
 void icmp_sock_handler(const struct ctx *c, union epoll_ref ref);
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
+int icmp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif, sa_family_t af,
 		     const void *saddr, const void *daddr,
 		     struct iov_tail *data, const struct timespec *now);
 void icmp_init(void);
diff --git a/tap.c b/tap.c
index 1308d49242e8..9a4399d947a3 100644
--- a/tap.c
+++ b/tap.c
@@ -702,15 +702,17 @@ static bool tap4_is_fragment(const struct iphdr *iph,
 
 /**
  * tap4_handler() - IPv4 and ARP packet handler for tap file descriptor
- * @c:		Execution context
- * @in:		Ingress packet pool, packets with Ethernet headers
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @in:			Ingress packet pool, packets with Ethernet headers
+ * @now:		Current timestamp
  *
  * Return: count of packets consumed by handlers
  */
-static int tap4_handler(struct ctx *c, const struct pool *in,
-			const struct timespec *now)
+static int tap4_handler(struct ctx *c, int incoming_queue,
+			const struct pool *in, const struct timespec *now)
 {
+	int outgoing_queue = incoming_queue & ~1;
 	unsigned int i, j, seq_count;
 	struct tap4_l4_t *seq;
 
@@ -736,7 +738,7 @@ resume:
 		if (!eh)
 			continue;
 		if (ntohs(eh->h_proto) == ETH_P_ARP) {
-			arp(c, VHOST_USER_RX_QUEUE, &data);
+			arp(c, outgoing_queue, &data);
 			continue;
 		}
 
@@ -783,7 +785,7 @@ resume:
 
 			tap_packet_debug(iph, NULL, NULL, 0, NULL, 1);
 
-			icmp_tap_handler(c, PIF_TAP, AF_INET,
+			icmp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
 					 &iph->saddr, &iph->daddr,
 					 &data, now);
 			continue;
@@ -797,7 +799,7 @@ resume:
 			struct iov_tail eh_data;
 
 			packet_get(in, i, &eh_data);
-			if (dhcp(c, VHOST_USER_RX_QUEUE, &eh_data))
+			if (dhcp(c, outgoing_queue, &eh_data))
 				continue;
 		}
 
@@ -860,14 +862,14 @@ append:
 			if (c->no_tcp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += tcp_tap_handler(c, PIF_TAP, AF_INET,
+				k += tcp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
 						     &seq->saddr, &seq->daddr,
 						     0, p, k, now);
 		} else if (seq->protocol == IPPROTO_UDP) {
 			if (c->no_udp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += udp_tap_handler(c, PIF_TAP, AF_INET,
+				k += udp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
 						     &seq->saddr, &seq->daddr,
 						     seq->ttl, p, k, now);
 		}
@@ -881,15 +883,17 @@ append:
 
 /**
  * tap6_handler() - IPv6 packet handler for tap file descriptor
- * @c:		Execution context
- * @in:		Ingress packet pool, packets with Ethernet headers
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @in:			Ingress packet pool, packets with Ethernet headers
+ * @now:		Current timestamp
  *
  * Return: count of packets consumed by handlers
  */
-static int tap6_handler(struct ctx *c, const struct pool *in,
-			const struct timespec *now)
+static int tap6_handler(struct ctx *c, int incoming_queue,
+			const struct pool *in, const struct timespec *now)
 {
+	int outgoing_queue = incoming_queue & ~1;
 	unsigned int i, j, seq_count = 0;
 	struct tap6_l4_t *seq;
 
@@ -965,12 +969,12 @@ resume:
 				continue;
 
 			ndp_data = data;
-			if (ndp(c, VHOST_USER_RX_QUEUE, saddr, &ndp_data))
+			if (ndp(c, outgoing_queue, saddr, &ndp_data))
 				continue;
 
 			tap_packet_debug(NULL, ip6h, NULL, proto, NULL, 1);
 
-			icmp_tap_handler(c, PIF_TAP, AF_INET6,
+			icmp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
 					 saddr, daddr, &data, now);
 			continue;
 		}
@@ -984,8 +988,7 @@ resume:
 		if (proto == IPPROTO_UDP) {
 			struct iov_tail uh_data = data;
 
-			if (dhcpv6(c, VHOST_USER_RX_QUEUE, &uh_data, saddr,
-				   daddr))
+			if (dhcpv6(c, outgoing_queue, &uh_data, saddr, daddr))
 				continue;
 		}
 
@@ -1053,14 +1056,14 @@ append:
 			if (c->no_tcp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += tcp_tap_handler(c, PIF_TAP, AF_INET6,
+				k += tcp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
 						     &seq->saddr, &seq->daddr,
 						     seq->flow_lbl, p, k, now);
 		} else if (seq->protocol == IPPROTO_UDP) {
 			if (c->no_udp)
 				continue;
 			for (k = 0; k < p->count; )
-				k += udp_tap_handler(c, PIF_TAP, AF_INET6,
+				k += udp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
 						     &seq->saddr, &seq->daddr,
 						     seq->hop_limit, p, k, now);
 		}
@@ -1083,22 +1086,24 @@ void tap_flush_pools(void)
 
 /**
  * tap_handler() - IPv4/IPv6 and ARP packet handler for tap file descriptor
- * @c:		Execution context
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @now:		Current timestamp
  */
-void tap_handler(struct ctx *c, const struct timespec *now)
+void tap_handler(struct ctx *c, int incoming_queue, const struct timespec *now)
 {
-	tap4_handler(c, pool_tap4, now);
-	tap6_handler(c, pool_tap6, now);
+	tap4_handler(c, incoming_queue, pool_tap4, now);
+	tap6_handler(c, incoming_queue, pool_tap6, now);
 }
 
 /**
  * tap_add_packet() - Queue/capture packet, update notion of guest MAC address
- * @c:		Execution context
- * @data:	Packet to add to the pool
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @data:		Packet to add to the pool
+ * @now:		Current timestamp
  */
-void tap_add_packet(struct ctx *c, struct iov_tail *data,
+void tap_add_packet(struct ctx *c, int incoming_queue, struct iov_tail *data,
 		    const struct timespec *now)
 {
 	struct ethhdr eh_storage;
@@ -1123,14 +1128,14 @@ void tap_add_packet(struct ctx *c, struct iov_tail *data,
 	case ETH_P_ARP:
 	case ETH_P_IP:
 		if (!pool_can_fit(pool_tap4, data)) {
-			tap4_handler(c, pool_tap4, now);
+			tap4_handler(c, incoming_queue, pool_tap4, now);
 			pool_flush(pool_tap4);
 		}
 		packet_add(pool_tap4, data);
 		break;
 	case ETH_P_IPV6:
 		if (!pool_can_fit(pool_tap6, data)) {
-			tap6_handler(c, pool_tap6, now);
+			tap6_handler(c, incoming_queue, pool_tap6, now);
 			pool_flush(pool_tap6);
 		}
 		packet_add(pool_tap6, data);
@@ -1217,7 +1222,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
 		n -= sizeof(uint32_t);
 
 		data = IOV_TAIL_FROM_BUF(p, l2len, 0);
-		tap_add_packet(c, &data, now);
+		tap_add_packet(c, 0, &data, now);
 
 		p += l2len;
 		n -= l2len;
@@ -1226,7 +1231,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
 	partial_len = n;
 	partial_frame = p;
 
-	tap_handler(c, now);
+	tap_handler(c, 0, now);
 }
 
 /**
@@ -1285,10 +1290,10 @@ static void tap_pasta_input(struct ctx *c, const struct timespec *now)
 			continue;
 
 		data = IOV_TAIL_FROM_BUF(pkt_buf + n, len, 0);
-		tap_add_packet(c, &data, now);
+		tap_add_packet(c, 0, &data, now);
 	}
 
-	tap_handler(c, now);
+	tap_handler(c, 0, now);
 }
 
 /**
diff --git a/tap.h b/tap.h
index 76403a43edbc..fe9455ffcf4b 100644
--- a/tap.h
+++ b/tap.h
@@ -119,7 +119,8 @@ int tap_sock_unix_open(char *sock_path);
 void tap_sock_reset(struct ctx *c);
 void tap_backend_init(struct ctx *c);
 void tap_flush_pools(void);
-void tap_handler(struct ctx *c, const struct timespec *now);
-void tap_add_packet(struct ctx *c, struct iov_tail *data,
+void tap_handler(struct ctx *c, int incoming_queue,
+		 const struct timespec *now);
+void tap_add_packet(struct ctx *c, int incoming_queue, struct iov_tail *data,
 		    const struct timespec *now);
 #endif /* TAP_H */
diff --git a/tcp.c b/tcp.c
index 5ce34baa8a5a..78494a2dc69f 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1496,21 +1496,23 @@ static void tcp_bind_outbound(const struct ctx *c,
 
 /**
  * tcp_conn_from_tap() - Handle connection request (SYN segment) from tap
- * @c:		Execution context
- * @af:		Address family, AF_INET or AF_INET6
- * @saddr:	Source address, pointer to in_addr or in6_addr
- * @daddr:	Destination address, pointer to in_addr or in6_addr
- * @th:		TCP header from tap: caller MUST ensure it's there
- * @opts:	Pointer to start of options
- * @optlen:	Bytes in options: caller MUST ensure available length
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @af:			Address family, AF_INET or AF_INET6
+ * @saddr:		Source address, pointer to in_addr or in6_addr
+ * @daddr:		Destination address, pointer to in_addr or in6_addr
+ * @th:			TCP header from tap: caller MUST ensure it's there
+ * @opts:		Pointer to start of options
+ * @optlen:		Bytes in options: caller MUST ensure available length
+ * @now:		Current timestamp
  *
  * #syscalls:vu getsockname
  */
-static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
-			      const void *saddr, const void *daddr,
-			      const struct tcphdr *th, const char *opts,
-			      size_t optlen, const struct timespec *now)
+static void tcp_conn_from_tap(const struct ctx *c, int incoming_queue,
+			      sa_family_t af, const void *saddr,
+			      const void *daddr, const struct tcphdr *th,
+			      const char *opts, size_t optlen,
+			      const struct timespec *now)
 {
 	in_port_t srcport = ntohs(th->source);
 	in_port_t dstport = ntohs(th->dest);
@@ -1622,6 +1624,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
 		conn_event(c, conn, TAP_SYN_ACK_SENT);
 	}
 
+	flow_queue_set(&conn->f, incoming_queue);
 	tcp_epoll_ctl(c, conn);
 
 	if (c->mode == MODE_VU) { /* To rebind to same oport after migration */
@@ -1983,16 +1986,16 @@ static void tcp_conn_from_sock_finish(const struct ctx *c,
 
 /**
  * tcp_rst_no_conn() - Send RST in response to a packet with no connection
- * @c:		Execution context
- * @queue:	Queue to use to send the reply
- * @af:		Address family, AF_INET or AF_INET6
- * @saddr:	Source address of the packet we're responding to
- * @daddr:	Destination address of the packet we're responding to
- * @flow_lbl:	IPv6 flow label (ignored for IPv4)
- * @th:		TCP header of the packet we're responding to
- * @l4len:	Packet length, including TCP header
- */
-static void tcp_rst_no_conn(const struct ctx *c, int queue, int af,
+ * @c:			Execution context
+ * @outgoing_queue:	Queue to use to send the reply
+ * @af:			Address family, AF_INET or AF_INET6
+ * @saddr:		Source address of the packet we're responding to
+ * @daddr:		Destination address of the packet we're responding to
+ * @flow_lbl:		IPv6 flow label (ignored for IPv4)
+ * @th:			TCP header of the packet we're responding to
+ * @l4len:		Packet length, including TCP header
+ */
+static void tcp_rst_no_conn(const struct ctx *c, int outgoing_queue, int af,
 			    const void *saddr, const void *daddr,
 			    uint32_t flow_lbl,
 			    const struct tcphdr *th, size_t l4len)
@@ -2050,24 +2053,25 @@ static void tcp_rst_no_conn(const struct ctx *c, int queue, int af,
 
 	tcp_update_csum(psum, rsth, &payload);
 	rst_l2len = ((char *)rsth - buf) + sizeof(*rsth);
-	tap_send_single(c, queue, buf, rst_l2len);
+	tap_send_single(c, outgoing_queue, buf, rst_l2len);
 }
 
 /**
  * tcp_tap_handler() - Handle packets from tap and state transitions
- * @c:		Execution context
- * @pif:	pif on which the packet is arriving
- * @af:		Address family, AF_INET or AF_INET6
- * @saddr:	Source address
- * @daddr:	Destination address
- * @flow_lbl:	IPv6 flow label (ignored for IPv4)
- * @p:		Pool of TCP packets, with TCP headers
- * @idx:	Index of first packet in pool to process
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @pif:		pif on which the packet is arriving
+ * @af:			Address family, AF_INET or AF_INET6
+ * @saddr:		Source address
+ * @daddr:		Destination address
+ * @flow_lbl:		IPv6 flow label (ignored for IPv4)
+ * @p:			Pool of TCP packets, with TCP headers
+ * @idx:		Index of first packet in pool to process
+ * @now:		Current timestamp
  *
  * Return: count of consumed packets
  */
-int tcp_tap_handler(const struct ctx *c, uint8_t pif,
+int tcp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    uint32_t flow_lbl, const struct pool *p, int idx,
 		    const struct timespec *now)
@@ -2107,11 +2111,11 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif,
 	/* New connection from tap */
 	if (!flow) {
 		if (opts && th->syn && !th->ack)
-			tcp_conn_from_tap(c, af, saddr, daddr, th,
+			tcp_conn_from_tap(c, incoming_queue, af, saddr, daddr, th,
 					  opts, optlen, now);
 		else
-			tcp_rst_no_conn(c, VHOST_USER_RX_QUEUE, af, saddr,
-					daddr, flow_lbl, th, l4len);
+			tcp_rst_no_conn(c, incoming_queue & ~1, af, saddr, daddr,
+					flow_lbl, th, l4len);
 		return 1;
 	}
 
@@ -2119,6 +2123,9 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif,
 	ASSERT(pif_at_sidx(sidx) == PIF_TAP);
 	conn = &flow->tcp;
 
+	/* Remember the queue this packet arrived on for replies */
+	flow_queue_set(&flow->f, incoming_queue);
+
 	flow_trace(conn, "packet length %zu from tap", l4len);
 
 	if (th->rst) {
diff --git a/tcp.h b/tcp.h
index 320683ce5679..cddd36cadc97 100644
--- a/tcp.h
+++ b/tcp.h
@@ -15,7 +15,7 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
 			const struct timespec *now);
 void tcp_sock_handler(const struct ctx *c,
 		      union epoll_ref ref, uint32_t events);
-int tcp_tap_handler(const struct ctx *c, uint8_t pif,
+int tcp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    uint32_t flow_lbl, const struct pool *p, int idx,
 		    const struct timespec *now);
diff --git a/tcp_vu.c b/tcp_vu.c
index 1c81ce376dad..40f552087bc5 100644
--- a/tcp_vu.c
+++ b/tcp_vu.c
@@ -70,15 +70,16 @@ static size_t tcp_vu_hdrlen(bool v6)
  */
 int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags)
 {
+	int rx_queue = flow_rx_virtqueue(&conn->f);
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
-	size_t optlen, hdrlen;
+	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	struct vu_virtq_element flags_elem[2];
 	struct ipv6hdr *ip6h = NULL;
 	struct iphdr *ip4h = NULL;
 	struct iovec flags_iov[2];
 	struct tcp_syn_opts *opts;
 	struct iov_tail payload;
+	size_t optlen, hdrlen;
 	struct tcphdr *th;
 	struct ethhdr *eh;
 	uint32_t seq;
@@ -348,8 +349,9 @@ static void tcp_vu_prepare(const struct ctx *c, struct tcp_tap_conn *conn,
 int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 {
 	uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
+	int rx_queue = flow_rx_virtqueue(&conn->f);
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	ssize_t len, previous_dlen;
 	int i, iov_cnt, head_cnt;
 	size_t hdrlen, fillsize;
diff --git a/udp.c b/udp.c
index 868ffebb5802..a0eb719888cd 100644
--- a/udp.c
+++ b/udp.c
@@ -636,14 +636,15 @@ static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
 	if (hdr->cmsg_level == IPPROTO_IP &&
 	    (o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) {
 		dlen = MIN(dlen, ICMP4_MAX_DLEN);
-		udp_send_tap_icmp4(c, VHOST_USER_RX_QUEUE, ee, toside, *o4,
-				   data, dlen);
+		udp_send_tap_icmp4(c, flow_rx_virtqueue(&uflow->f), ee, toside,
+				   *o4, data, dlen);
 		return 1;
 	}
 
 	if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) {
-		udp_send_tap_icmp6(c, VHOST_USER_RX_QUEUE, ee, toside,
-				   &otap.a6, data, dlen, FLOW_IDX(uflow));
+		udp_send_tap_icmp6(c, flow_rx_virtqueue(&uflow->f), ee,
+				   toside, &otap.a6, data, dlen,
+				   FLOW_IDX(uflow));
 		return 1;
 	}
 
@@ -971,25 +972,27 @@ fail:
 
 /**
  * udp_tap_handler() - Handle packets from tap
- * @c:		Execution context
- * @pif:	pif on which the packet is arriving
- * @af:		Address family, AF_INET or AF_INET6
- * @saddr:	Source address
- * @daddr:	Destination address
- * @ttl:	TTL or hop limit for packets to be sent in this call
- * @p:		Pool of UDP packets, with UDP headers
- * @idx:	Index of first packet to process
- * @now:	Current timestamp
+ * @c:			Execution context
+ * @incoming_queue:	Incoming queue number
+ * @pif:		pif on which the packet is arriving
+ * @af:			Address family, AF_INET or AF_INET6
+ * @saddr:		Source address
+ * @daddr:		Destination address
+ * @ttl:		TTL or hop limit for packets to be sent in this call
+ * @p:			Pool of UDP packets, with UDP headers
+ * @idx:		Index of first packet to process
+ * @now:		Current timestamp
  *
  * Return: count of consumed packets
  *
  * #syscalls sendmmsg
  */
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+int udp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    uint8_t ttl, const struct pool *p, int idx,
 		    const struct timespec *now)
 {
+	int outgoing_queue = incoming_queue & ~1;
 	const struct flowside *toside;
 	struct mmsghdr mm[UIO_MAXIOV];
 	union sockaddr_inany to_sa;
@@ -1019,7 +1022,7 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
 	src = ntohs(uh->source);
 	dst = ntohs(uh->dest);
 
-	tosidx = udp_flow_from_tap(c, pif, af, saddr, daddr, src, dst, now);
+	tosidx = udp_flow_from_tap(c, outgoing_queue, pif, af, saddr, daddr, src, dst, now);
 	if (!(uflow = udp_at_sidx(tosidx))) {
 		char sstr[INET6_ADDRSTRLEN], dstr[INET6_ADDRSTRLEN];
 
diff --git a/udp.h b/udp.h
index f1d83f380b3f..8ba4ccfe646a 100644
--- a/udp.h
+++ b/udp.h
@@ -7,11 +7,13 @@
 #define UDP_H
 
 void udp_portmap_clear(void);
-void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref,
-			     uint32_t events, const struct timespec *now);
-void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
-		      uint32_t events, const struct timespec *now);
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+void udp_listen_sock_handler(const struct ctx *c,
+			     union epoll_ref ref, uint32_t events,
+			     const struct timespec *now);
+void udp_sock_handler(const struct ctx *c,
+		      union epoll_ref ref, uint32_t events,
+		      const struct timespec *now);
+int udp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
 		    sa_family_t af, const void *saddr, const void *daddr,
 		    uint8_t ttl, const struct pool *p, int idx,
 		    const struct timespec *now);
diff --git a/udp_flow.c b/udp_flow.c
index 8907f2f72741..b4a709b8d976 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -266,17 +266,19 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
 /**
  * udp_flow_from_tap() - Find or create UDP flow for tap packets
  * @c:		Execution context
+ * @queue:	RX queue number for the flow
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address on guest side
  * @daddr:	Destination address guest side
  * @srcport:	Source port on guest side
  * @dstport:	Destination port on guest side
+ * @now:	Current timestamp
  *
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
  */
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, int queue,
 			      uint8_t pif, sa_family_t af,
 			      const void *saddr, const void *daddr,
 			      in_port_t srcport, in_port_t dstport,
@@ -293,6 +295,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
 			      srcport, dstport);
 	if ((uflow = udp_at_sidx(sidx))) {
 		uflow->ts = now->tv_sec;
+		/* Existing flow: refresh its queue assignment */
+		flow_queue_set(&uflow->f, queue);
 		return flow_sidx_opposite(sidx);
 	}
 
@@ -316,6 +320,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
 		return FLOW_SIDX_NONE;
 	}
 
+	flow_queue_set(&flow->f, queue);
+
 	return udp_flow_new(c, flow, now);
 }
 
diff --git a/udp_flow.h b/udp_flow.h
index 4c528e95ca66..4a057a9d44a8 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -36,7 +36,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
 			       const union inany_addr *dst, in_port_t port,
 			       const union sockaddr_inany *s_in,
 			       const struct timespec *now);
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, int queue,
 			      uint8_t pif, sa_family_t af,
 			      const void *saddr, const void *daddr,
 			      in_port_t srcport, in_port_t dstport,
diff --git a/udp_vu.c b/udp_vu.c
index 099677f914e7..cd2c9c516d44 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -202,9 +202,11 @@ static void udp_vu_csum(const struct flowside *toside, int iov_used)
 void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
 {
 	const struct flowside *toside = flowside_at_sidx(tosidx);
+	const struct udp_flow *uflow = udp_at_sidx(tosidx);
 	bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
+	int rx_queue = flow_rx_virtqueue(&uflow->f);
 	struct vu_dev *vdev = c->vdev;
-	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+	struct vu_virtq *vq = &vdev->vq[rx_queue];
 	int i;
 
 	for (i = 0; i < n; i++) {
diff --git a/vu_common.c b/vu_common.c
index 8904403e66af..56f26317b192 100644
--- a/vu_common.c
+++ b/vu_common.c
@@ -196,11 +196,11 @@ static void vu_handle_tx(struct vu_dev *vdev, int index,
 
 		data = IOV_TAIL(elem[count].out_sg, elem[count].out_num, 0);
 		if (IOV_DROP_HEADER(&data, struct virtio_net_hdr_mrg_rxbuf))
-			tap_add_packet(vdev->context, &data, now);
+			tap_add_packet(vdev->context, index, &data, now);
 
 		count++;
 	}
-	tap_handler(vdev->context, now);
+	tap_handler(vdev->context, index, now);
 
 	if (count) {
 		int i;
-- 
2.51.0


  parent reply	other threads:[~2025-11-07 14:39 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-11-07 14:38 [PATCH 0/4] vhost-user: Add " Laurent Vivier
2025-11-07 14:38 ` [PATCH 1/4] vhost-user: Enable multiqueue Laurent Vivier
2025-11-10  4:48   ` David Gibson
2025-11-17 15:26     ` Laurent Vivier
2025-11-18  0:16       ` David Gibson
2025-11-07 14:38 ` [PATCH 2/4] vhost-user: Add queue parameter throughout the network stack Laurent Vivier
2025-11-10  5:19   ` David Gibson
2025-11-07 14:39 ` Laurent Vivier [this message]
2025-11-10  5:54   ` [PATCH 3/4] multiqueue: Add queue-aware flow management for multiqueue support David Gibson
2025-11-07 14:39 ` [PATCH 4/4] test: Add multiqueue support to vhost-user test infrastructure Laurent Vivier
2025-11-10  5:57   ` David Gibson
2025-11-10  4:40 ` [PATCH 0/4] vhost-user: Add multiqueue support David Gibson
2025-11-10  6:00   ` David Gibson

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20251107143901.89955-4-lvivier@redhat.com \
    --to=lvivier@redhat.com \
    --cc=passt-dev@passt.top \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).