public inbox for passt-dev@passt.top
From: David Gibson <david@gibson.dropbear.id.au>
To: Laurent Vivier <lvivier@redhat.com>
Cc: passt-dev@passt.top
Subject: Re: [PATCH 3/4] multiqueue: Add queue-aware flow management for multiqueue support
Date: Mon, 10 Nov 2025 16:54:14 +1100
Message-ID: <aRF-BiIO69FDaUSU@zatzit>
In-Reply-To: <20251107143901.89955-4-lvivier@redhat.com>

On Fri, Nov 07, 2025 at 03:39:00PM +0100, Laurent Vivier wrote:
> Packets are now routed to the correct RX queue based on which TX queue
> they arrived on, rather than always using queue 0.

Am I missing something, or is this not quite accurate?  The packets
are associated by flow with other packets that came on a particular
Tx queue, but AIUI the packet itself didn't arrive on a Tx queue; it
came from a socket.

> Note: Flows initiated from the host (via sockets, udp_flow_from_sock())
> currently default to queue 0, as they don't have an associated incoming
> queue.
> 
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
> ---
>  flow.c      | 32 ++++++++++++++++++++++
>  flow.h      | 10 +++++++
>  icmp.c      | 23 +++++++++-------
>  icmp.h      |  2 +-
>  tap.c       | 77 +++++++++++++++++++++++++++------------------------
>  tap.h       |  5 ++--
>  tcp.c       | 79 +++++++++++++++++++++++++++++------------------------
>  tcp.h       |  2 +-
>  tcp_vu.c    |  8 ++++--
>  udp.c       | 33 ++++++++++++----------
>  udp.h       | 12 ++++----
>  udp_flow.c  |  8 +++++-
>  udp_flow.h  |  2 +-
>  udp_vu.c    |  4 ++-
>  vu_common.c |  4 +--
>  15 files changed, 187 insertions(+), 114 deletions(-)
> 
> diff --git a/flow.c b/flow.c
> index 278a9cf0ac6d..8e9d7e5e1847 100644
> --- a/flow.c
> +++ b/flow.c
> @@ -405,6 +405,37 @@ void flow_epollid_register(int epollid, int epollfd)
>  	epoll_id_to_fd[epollid] = epollfd;
>  }
>  
> +/**
> + * flow_rx_virtqueue() - Get RX (receive) queue number for a flow

Maybe avoid the name "virtqueue".  I believe the tap device can also
do multiqueue, and we may want to reuse this queue infrastructure for
that.

> + * @f:		Flow to query (may be NULL)
> + *
> + * Return: RX queue number for the flow, or 0 if flow is NULL or has no
> + *         valid queue assignment
> + */
> +int flow_rx_virtqueue(const struct flow_common *f)
> +{
> +	if (f == NULL || f->queueid == FLOW_QUEUEID_INVALID)

Any reason to have the second special case here, rather than
initializing f->queueid to 0?

> +		return 0;
> +	return f->queueid << 1;

I generally think it's clearer to use * 2 rather than << 1 here.  I
expect the compiler to turn it into the same thing.
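
A minimal sketch of how the two points above could combine, assuming
the memset() in flow_alloc() is what leaves queueid at 0.  The struct
and function names here are hypothetical stand-ins, not the real
struct flow_common:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, cut-down stand-in for struct flow_common */
struct flow_sketch {
	unsigned int queueid:5;	/* queue pair index, 0 is the default pair */
};

/* Allocation zeroes the entry, so queueid starts out as pair 0 */
static void flow_sketch_init(struct flow_sketch *f)
{
	memset(f, 0, sizeof(*f));
}

/* To-guest queue for a flow: no sentinel value to check for */
static int flow_sketch_rx_queue(const struct flow_sketch *f)
{
	if (!f)
		return 0;

	return f->queueid * 2;	/* same result as << 1, reads as arithmetic */
}
```

With this shape a freshly allocated flow already maps to queue 0, and
FLOW_QUEUEID_INVALID would not be needed for the lookup.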

> +}
> +
> +/**
> + * flow_queue_set() - Set queue pair assignment for a flow
> + * @f:		Flow to update
> + * @queueid:	Queue pair number to assign (even number, RX queue; TX is RX+1)

In comments, using "from guest" / "to guest" instead of Rx/Tx is
probably a good idea, particularly in code that's not necessarily
specific to VU.

I think it is a bit confusing that in some places we have queue pair
IDs and in others absolute queue numbers.  I think we should
standardise on one throughout as much of the code as possible.
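
One possible shape, as a sketch only: keep queue pair indices
everywhere in the generic code and convert to absolute queue numbers
at the edge.  The helper names are hypothetical.  The even/odd layout
follows the "even number, RX queue; TX is RX+1" comment in the quoted
hunk:

```c
#include <assert.h>

/* Hypothetical helpers: generic code only ever sees a queue pair index */

/* Absolute number of the to-guest queue of a pair (even) */
static inline int qpair_to_guest(unsigned int qpair)
{
	return qpair * 2;
}

/* Absolute number of the from-guest queue of a pair (odd) */
static inline int qpair_from_guest(unsigned int qpair)
{
	return qpair * 2 + 1;
}

/* Queue pair index that an absolute queue number belongs to */
static inline unsigned int queue_to_qpair(int queue)
{
	assert(queue >= 0);
	return queue / 2;
}
```

Only the backend (vhost-user here) would then call the conversion
helpers, and the "& ~1" and ">> 1" scattered through the handlers
could go away.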

> + */
> +void flow_queue_set(struct flow_common *f, int queueid)
> +{
> +	queueid >>= 1;
> +
> +	ASSERT(queueid < FLOW_QUEUEID_MAX);
> +
> +	flow_trace((union flow *)f, "updating queue from %d to %d",
> +		   f->queueid, queueid);
> +
> +	f->queueid = queueid;
> +}
> +
>  /**
>   * flow_initiate_() - Move flow to INI, setting pif[INISIDE]
>   * @flow:	Flow to change state
> @@ -609,6 +640,7 @@ union flow *flow_alloc(void)
>  	flow_new_entry = flow;
>  	memset(flow, 0, sizeof(*flow));
>  	flow_epollid_clear(&flow->f);
> +	flow->f.queueid = FLOW_QUEUEID_INVALID;
>  	flow_set_state(&flow->f, FLOW_STATE_NEW);
>  
>  	return flow;
> diff --git a/flow.h b/flow.h
> index b43b0b1dd7f2..44ab4ae8fd6a 100644
> --- a/flow.h
> +++ b/flow.h
> @@ -179,6 +179,8 @@ int flowside_connect(const struct ctx *c, int s,
>   * @side[]:	Information for each side of the flow
>   * @tap_omac:	MAC address of remote endpoint as seen from the guest
>   * @epollid:	epollfd identifier, or EPOLLFD_ID_INVALID
> + * @queueid:	Queue pair number assigned to this flow
> + *		(FLOW_QUEUEID_INVALID if not assigned)
>   */
>  struct flow_common {
>  #ifdef __GNUC__
> @@ -199,6 +201,8 @@ struct flow_common {
>  
>  #define EPOLLFD_ID_BITS 8
>  	unsigned int	epollid:EPOLLFD_ID_BITS;
> +#define FLOW_QUEUEID_BITS 5
> +	unsigned int	queueid:FLOW_QUEUEID_BITS;
>  };
>  
>  #define EPOLLFD_ID_DEFAULT	0
> @@ -206,6 +210,10 @@ struct flow_common {
>  #define EPOLLFD_ID_MAX		(EPOLLFD_ID_SIZE - 1)
>  #define EPOLLFD_ID_INVALID	EPOLLFD_ID_MAX
>  
> +#define FLOW_QUEUEID_SIZE	(1 << FLOW_QUEUEID_BITS)

"SIZE" is a bit ambiguous here.  Maybe "NUM" instead?

> +#define FLOW_QUEUEID_MAX	(FLOW_QUEUEID_SIZE - 1)
> +#define FLOW_QUEUEID_INVALID	FLOW_QUEUEID_MAX
> +
>  #define FLOW_INDEX_BITS		17	/* 128k - 1 */
>  #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
>  
> @@ -266,6 +274,8 @@ int flow_epollfd(const struct flow_common *f);
>  void flow_epollid_set(struct flow_common *f, int epollid);
>  void flow_epollid_clear(struct flow_common *f);
>  void flow_epollid_register(int epollid, int epollfd);
> +int flow_rx_virtqueue(const struct flow_common *f);
> +void flow_queue_set(struct flow_common *f, int queueid);
>  void flow_defer_handler(const struct ctx *c, const struct timespec *now);
>  int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
>  			      int fd);
> diff --git a/icmp.c b/icmp.c
> index d58499c3bf5c..80e8753072fa 100644
> --- a/icmp.c
> +++ b/icmp.c
> @@ -132,13 +132,13 @@ void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
>  		const struct in_addr *daddr = inany_v4(&ini->eaddr);
>  
>  		ASSERT(saddr && daddr); /* Must have IPv4 addresses */
> -		tap_icmp4_send(c, VHOST_USER_RX_QUEUE, *saddr, *daddr, buf,
> +		tap_icmp4_send(c, flow_rx_virtqueue(&pingf->f), *saddr, *daddr, buf,
>  			       pingf->f.tap_omac, n);
>  	} else if (pingf->f.type == FLOW_PING6) {
>  		const struct in6_addr *saddr = &ini->oaddr.a6;
>  		const struct in6_addr *daddr = &ini->eaddr.a6;
>  
> -		tap_icmp6_send(c, VHOST_USER_RX_QUEUE, saddr, daddr, buf,
> +		tap_icmp6_send(c, flow_rx_virtqueue(&pingf->f), saddr, daddr, buf,
>  			       pingf->f.tap_omac, n);
>  	}
>  	return;
> @@ -238,17 +238,18 @@ cancel:
>  
>  /**
>   * icmp_tap_handler() - Handle packets from tap
> - * @c:		Execution context
> - * @pif:	pif on which the packet is arriving
> - * @af:		Address family, AF_INET or AF_INET6
> - * @saddr:	Source address
> - * @daddr:	Destination address
> - * @data:	Single packet with ICMP/ICMPv6 header
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @pif:		pif on which the packet is arriving
> + * @af:			Address family, AF_INET or AF_INET6
> + * @saddr:		Source address
> + * @daddr:		Destination address
> + * @data:		Single packet with ICMP/ICMPv6 header
> + * @now:		Current timestamp
>   *
>   * Return: count of consumed packets (always 1, even if malformed)
>   */
> -int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
> +int icmp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif, sa_family_t af,
>  		     const void *saddr, const void *daddr,
>  		     struct iov_tail *data, const struct timespec *now)
>  {
> @@ -309,6 +310,8 @@ int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
>  	else if (!(pingf = icmp_ping_new(c, af, id, saddr, daddr)))
>  		return 1;
>  
> +	flow_queue_set(&pingf->f, incoming_queue);
> +

I'd kind of like the initial setting of the flow's queue to be tied to
one of the existing flow state transitions, to ensure it can't be
forgotten.
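
A rough sketch of the idea, with made-up types and names (this is not
the real flow_initiate_() signature): the transition itself takes the
queue pair, so a caller cannot initiate a flow without choosing one.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, reduced flow states */
enum sk_state { SK_NEW, SK_INI };

/* Hypothetical, cut-down flow entry */
struct sk_flow {
	enum sk_state state;
	uint8_t pif;		/* interface of the initiating side */
	unsigned int qpair;	/* queue pair assigned at initiation */
};

/* Move a flow from NEW to INI, recording pif and queue pair together */
static void sk_flow_initiate(struct sk_flow *f, uint8_t pif,
			     unsigned int qpair)
{
	assert(f->state == SK_NEW);

	f->pif = pif;
	f->qpair = qpair;
	f->state = SK_INI;
}
```

Flows initiated from a socket would pass pair 0 explicitly, which
also documents the "defaults to queue 0" case from the commit
message.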

>  	tgt = &pingf->f.side[TGTSIDE];
>  
>  	ASSERT(flow_proto[pingf->f.type] == proto);
> diff --git a/icmp.h b/icmp.h
> index 1a0e6205f087..6d6d6358bb33 100644
> --- a/icmp.h
> +++ b/icmp.h
> @@ -10,7 +10,7 @@ struct ctx;
>  struct icmp_ping_flow;
>  
>  void icmp_sock_handler(const struct ctx *c, union epoll_ref ref);
> -int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
> +int icmp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif, sa_family_t af,
>  		     const void *saddr, const void *daddr,
>  		     struct iov_tail *data, const struct timespec *now);
>  void icmp_init(void);
> diff --git a/tap.c b/tap.c
> index 1308d49242e8..9a4399d947a3 100644
> --- a/tap.c
> +++ b/tap.c
> @@ -702,15 +702,17 @@ static bool tap4_is_fragment(const struct iphdr *iph,
>  
>  /**
>   * tap4_handler() - IPv4 and ARP packet handler for tap file descriptor
> - * @c:		Execution context
> - * @in:		Ingress packet pool, packets with Ethernet headers
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @in:			Ingress packet pool, packets with Ethernet headers
> + * @now:		Current timestamp
>   *
>   * Return: count of packets consumed by handlers
>   */
> -static int tap4_handler(struct ctx *c, const struct pool *in,
> -			const struct timespec *now)
> +static int tap4_handler(struct ctx *c, int incoming_queue,
> +			const struct pool *in, const struct timespec *now)
>  {
> +	int outgoing_queue = incoming_queue & ~1;
>  	unsigned int i, j, seq_count;
>  	struct tap4_l4_t *seq;
>  
> @@ -736,7 +738,7 @@ resume:
>  		if (!eh)
>  			continue;
>  		if (ntohs(eh->h_proto) == ETH_P_ARP) {
> -			arp(c, VHOST_USER_RX_QUEUE, &data);
> +			arp(c, outgoing_queue, &data);
>  			continue;
>  		}
>  
> @@ -783,7 +785,7 @@ resume:
>  
>  			tap_packet_debug(iph, NULL, NULL, 0, NULL, 1);
>  
> -			icmp_tap_handler(c, PIF_TAP, AF_INET,
> +			icmp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
>  					 &iph->saddr, &iph->daddr,
>  					 &data, now);
>  			continue;
> @@ -797,7 +799,7 @@ resume:
>  			struct iov_tail eh_data;
>  
>  			packet_get(in, i, &eh_data);
> -			if (dhcp(c, VHOST_USER_RX_QUEUE, &eh_data))
> +			if (dhcp(c, outgoing_queue, &eh_data))
>  				continue;
>  		}
>  
> @@ -860,14 +862,14 @@ append:
>  			if (c->no_tcp)
>  				continue;
>  			for (k = 0; k < p->count; )
> -				k += tcp_tap_handler(c, PIF_TAP, AF_INET,
> +				k += tcp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
>  						     &seq->saddr, &seq->daddr,
>  						     0, p, k, now);
>  		} else if (seq->protocol == IPPROTO_UDP) {
>  			if (c->no_udp)
>  				continue;
>  			for (k = 0; k < p->count; )
> -				k += udp_tap_handler(c, PIF_TAP, AF_INET,
> +				k += udp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
>  						     &seq->saddr, &seq->daddr,
>  						     seq->ttl, p, k, now);
>  		}
> @@ -881,15 +883,17 @@ append:
>  
>  /**
>   * tap6_handler() - IPv6 packet handler for tap file descriptor
> - * @c:		Execution context
> - * @in:		Ingress packet pool, packets with Ethernet headers
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @in:			Ingress packet pool, packets with Ethernet headers
> + * @now:		Current timestamp
>   *
>   * Return: count of packets consumed by handlers
>   */
> -static int tap6_handler(struct ctx *c, const struct pool *in,
> -			const struct timespec *now)
> +static int tap6_handler(struct ctx *c, int incoming_queue,
> +			const struct pool *in, const struct timespec *now)
>  {
> +	int outgoing_queue = incoming_queue & ~1;
>  	unsigned int i, j, seq_count = 0;
>  	struct tap6_l4_t *seq;
>  
> @@ -965,12 +969,12 @@ resume:
>  				continue;
>  
>  			ndp_data = data;
> -			if (ndp(c, VHOST_USER_RX_QUEUE, saddr, &ndp_data))
> +			if (ndp(c, outgoing_queue, saddr, &ndp_data))
>  				continue;
>  
>  			tap_packet_debug(NULL, ip6h, NULL, proto, NULL, 1);
>  
> -			icmp_tap_handler(c, PIF_TAP, AF_INET6,
> +			icmp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
>  					 saddr, daddr, &data, now);
>  			continue;
>  		}
> @@ -984,8 +988,7 @@ resume:
>  		if (proto == IPPROTO_UDP) {
>  			struct iov_tail uh_data = data;
>  
> -			if (dhcpv6(c, VHOST_USER_RX_QUEUE, &uh_data, saddr,
> -				   daddr))
> +			if (dhcpv6(c, outgoing_queue, &uh_data, saddr, daddr))
>  				continue;
>  		}
>  
> @@ -1053,14 +1056,14 @@ append:
>  			if (c->no_tcp)
>  				continue;
>  			for (k = 0; k < p->count; )
> -				k += tcp_tap_handler(c, PIF_TAP, AF_INET6,
> +				k += tcp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
>  						     &seq->saddr, &seq->daddr,
>  						     seq->flow_lbl, p, k, now);
>  		} else if (seq->protocol == IPPROTO_UDP) {
>  			if (c->no_udp)
>  				continue;
>  			for (k = 0; k < p->count; )
> -				k += udp_tap_handler(c, PIF_TAP, AF_INET6,
> +				k += udp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
>  						     &seq->saddr, &seq->daddr,
>  						     seq->hop_limit, p, k, now);
>  		}
> @@ -1083,22 +1086,24 @@ void tap_flush_pools(void)
>  
>  /**
>   * tap_handler() - IPv4/IPv6 and ARP packet handler for tap file descriptor
> - * @c:		Execution context
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @now:		Current timestamp
>   */
> -void tap_handler(struct ctx *c, const struct timespec *now)
> +void tap_handler(struct ctx *c, int incoming_queue, const struct timespec *now)
>  {
> -	tap4_handler(c, pool_tap4, now);
> -	tap6_handler(c, pool_tap6, now);
> +	tap4_handler(c, incoming_queue, pool_tap4, now);
> +	tap6_handler(c, incoming_queue, pool_tap6, now);
>  }
>  
>  /**
>   * tap_add_packet() - Queue/capture packet, update notion of guest MAC address
> - * @c:		Execution context
> - * @data:	Packet to add to the pool
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @data:		Packet to add to the pool
> + * @now:		Current timestamp
>   */
> -void tap_add_packet(struct ctx *c, struct iov_tail *data,
> +void tap_add_packet(struct ctx *c, int incoming_queue, struct iov_tail *data,
>  		    const struct timespec *now)
>  {
>  	struct ethhdr eh_storage;
> @@ -1123,14 +1128,14 @@ void tap_add_packet(struct ctx *c, struct iov_tail *data,
>  	case ETH_P_ARP:
>  	case ETH_P_IP:
>  		if (!pool_can_fit(pool_tap4, data)) {
> -			tap4_handler(c, pool_tap4, now);
> +			tap4_handler(c, incoming_queue, pool_tap4, now);

This is a bit murky.  You're using the incoming_queue for the new
packet to process all the existing packets in the pool.  I guess that
will work out because you'll have a separate pool for each
thread/queue, but that's unclear at this point.  Maybe the queue
number should be more explicitly part of the pool metadata?
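
For illustration, a sketch of a pool that carries its queue number.
The struct and functions are hypothetical and much simpler than the
real struct pool:

```c
#include <assert.h>
#include <stddef.h>

#define POOL_SKETCH_MAX 4

/* Hypothetical pool that records which queue its packets came from */
struct pool_sketch {
	int queue;	/* from-guest queue of every packet in the pool */
	size_t count;	/* packets currently stored */
	const void *pkt[POOL_SKETCH_MAX];
};

/* Add a packet, refusing to mix queues within one pool
 * Return: 0 on success, -1 if the pool is full or the queue differs
 */
static int pool_sketch_add(struct pool_sketch *p, int queue,
			   const void *pkt)
{
	if (p->count == 0)
		p->queue = queue;	/* first packet decides the queue */
	else if (p->queue != queue)
		return -1;

	if (p->count >= POOL_SKETCH_MAX)
		return -1;

	p->pkt[p->count++] = pkt;
	return 0;
}

/* Empty the pool; the next packet added may come from any queue */
static void pool_sketch_flush(struct pool_sketch *p)
{
	p->count = 0;
}
```

The handlers could then read the queue from the pool instead of
taking incoming_queue as a separate argument.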

>  			pool_flush(pool_tap4);
>  		}
>  		packet_add(pool_tap4, data);
>  		break;
>  	case ETH_P_IPV6:
>  		if (!pool_can_fit(pool_tap6, data)) {
> -			tap6_handler(c, pool_tap6, now);
> +			tap6_handler(c, incoming_queue, pool_tap6, now);
>  			pool_flush(pool_tap6);
>  		}
>  		packet_add(pool_tap6, data);
> @@ -1217,7 +1222,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
>  		n -= sizeof(uint32_t);
>  
>  		data = IOV_TAIL_FROM_BUF(p, l2len, 0);
> -		tap_add_packet(c, &data, now);
> +		tap_add_packet(c, 0, &data, now);
>  
>  		p += l2len;
>  		n -= l2len;
> @@ -1226,7 +1231,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
>  	partial_len = n;
>  	partial_frame = p;
>  
> -	tap_handler(c, now);
> +	tap_handler(c, 0, now);
>  }
>  
>  /**
> @@ -1285,10 +1290,10 @@ static void tap_pasta_input(struct ctx *c, const struct timespec *now)
>  			continue;
>  
>  		data = IOV_TAIL_FROM_BUF(pkt_buf + n, len, 0);
> -		tap_add_packet(c, &data, now);
> +		tap_add_packet(c, 0, &data, now);
>  	}
>  
> -	tap_handler(c, now);
> +	tap_handler(c, 0, now);
>  }
>  
>  /**
> diff --git a/tap.h b/tap.h
> index 76403a43edbc..fe9455ffcf4b 100644
> --- a/tap.h
> +++ b/tap.h
> @@ -119,7 +119,8 @@ int tap_sock_unix_open(char *sock_path);
>  void tap_sock_reset(struct ctx *c);
>  void tap_backend_init(struct ctx *c);
>  void tap_flush_pools(void);
> -void tap_handler(struct ctx *c, const struct timespec *now);
> -void tap_add_packet(struct ctx *c, struct iov_tail *data,
> +void tap_handler(struct ctx *c, int incoming_queue,
> +		 const struct timespec *now);
> +void tap_add_packet(struct ctx *c, int incoming_queue, struct iov_tail *data,
>  		    const struct timespec *now);
>  #endif /* TAP_H */
> diff --git a/tcp.c b/tcp.c
> index 5ce34baa8a5a..78494a2dc69f 100644
> --- a/tcp.c
> +++ b/tcp.c
> @@ -1496,21 +1496,23 @@ static void tcp_bind_outbound(const struct ctx *c,
>  
>  /**
>   * tcp_conn_from_tap() - Handle connection request (SYN segment) from tap
> - * @c:		Execution context
> - * @af:		Address family, AF_INET or AF_INET6
> - * @saddr:	Source address, pointer to in_addr or in6_addr
> - * @daddr:	Destination address, pointer to in_addr or in6_addr
> - * @th:		TCP header from tap: caller MUST ensure it's there
> - * @opts:	Pointer to start of options
> - * @optlen:	Bytes in options: caller MUST ensure available length
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	TX queue number for the flow
> + * @af:			Address family, AF_INET or AF_INET6
> + * @saddr:		Source address, pointer to in_addr or in6_addr
> + * @daddr:		Destination address, pointer to in_addr or in6_addr
> + * @th:			TCP header from tap: caller MUST ensure it's there
> + * @opts:		Pointer to start of options
> + * @optlen:		Bytes in options: caller MUST ensure available length
> + * @now:		Current timestamp
>   *
>   * #syscalls:vu getsockname
>   */
> -static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
> -			      const void *saddr, const void *daddr,
> -			      const struct tcphdr *th, const char *opts,
> -			      size_t optlen, const struct timespec *now)
> +static void tcp_conn_from_tap(const struct ctx *c, int incoming_queue,
> +			      sa_family_t af, const void *saddr,
> +			      const void *daddr, const struct tcphdr *th,
> +			      const char *opts, size_t optlen,
> +			      const struct timespec *now)
>  {
>  	in_port_t srcport = ntohs(th->source);
>  	in_port_t dstport = ntohs(th->dest);
> @@ -1622,6 +1624,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
>  		conn_event(c, conn, TAP_SYN_ACK_SENT);
>  	}
>  
> +	flow_queue_set(&conn->f, incoming_queue);
>  	tcp_epoll_ctl(c, conn);
>  
>  	if (c->mode == MODE_VU) { /* To rebind to same oport after migration */
> @@ -1983,16 +1986,16 @@ static void tcp_conn_from_sock_finish(const struct ctx *c,
>  
>  /**
>   * tcp_rst_no_conn() - Send RST in response to a packet with no connection
> - * @c:		Execution context
> - * @queue:	Queue to use to send the reply
> - * @af:		Address family, AF_INET or AF_INET6
> - * @saddr:	Source address of the packet we're responding to
> - * @daddr:	Destination address of the packet we're responding to
> - * @flow_lbl:	IPv6 flow label (ignored for IPv4)
> - * @th:		TCP header of the packet we're responding to
> - * @l4len:	Packet length, including TCP header
> - */
> -static void tcp_rst_no_conn(const struct ctx *c, int queue, int af,
> + * @c:			Execution context
> + * @outgoing_queue:	Queue to use to send the reply
> + * @af:			Address family, AF_INET or AF_INET6
> + * @saddr:		Source address of the packet we're responding to
> + * @daddr:		Destination address of the packet we're responding to
> + * @flow_lbl:		IPv6 flow label (ignored for IPv4)
> + * @th:			TCP header of the packet we're responding to
> + * @l4len:		Packet length, including TCP header
> + */
> +static void tcp_rst_no_conn(const struct ctx *c, int outgoing_queue, int af,
>  			    const void *saddr, const void *daddr,
>  			    uint32_t flow_lbl,
>  			    const struct tcphdr *th, size_t l4len)
> @@ -2050,24 +2053,25 @@ static void tcp_rst_no_conn(const struct ctx *c, int queue, int af,
>  
>  	tcp_update_csum(psum, rsth, &payload);
>  	rst_l2len = ((char *)rsth - buf) + sizeof(*rsth);
> -	tap_send_single(c, queue, buf, rst_l2len);
> +	tap_send_single(c, outgoing_queue, buf, rst_l2len);
>  }
>  
>  /**
>   * tcp_tap_handler() - Handle packets from tap and state transitions
> - * @c:		Execution context
> - * @pif:	pif on which the packet is arriving
> - * @af:		Address family, AF_INET or AF_INET6
> - * @saddr:	Source address
> - * @daddr:	Destination address
> - * @flow_lbl:	IPv6 flow label (ignored for IPv4)
> - * @p:		Pool of TCP packets, with TCP headers
> - * @idx:	Index of first packet in pool to process
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @pif:		pif on which the packet is arriving
> + * @af:			Address family, AF_INET or AF_INET6
> + * @saddr:		Source address
> + * @daddr:		Destination address
> + * @flow_lbl:		IPv6 flow label (ignored for IPv4)
> + * @p:			Pool of TCP packets, with TCP headers
> + * @idx:		Index of first packet in pool to process
> + * @now:		Current timestamp
>   *
>   * Return: count of consumed packets
>   */
> -int tcp_tap_handler(const struct ctx *c, uint8_t pif,
> +int tcp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
>  		    sa_family_t af, const void *saddr, const void *daddr,
>  		    uint32_t flow_lbl, const struct pool *p, int idx,
>  		    const struct timespec *now)
> @@ -2107,11 +2111,11 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif,
>  	/* New connection from tap */
>  	if (!flow) {
>  		if (opts && th->syn && !th->ack)
> -			tcp_conn_from_tap(c, af, saddr, daddr, th,
> +			tcp_conn_from_tap(c, incoming_queue, af, saddr, daddr, th,
>  					  opts, optlen, now);
>  		else
> -			tcp_rst_no_conn(c, VHOST_USER_RX_QUEUE, af, saddr,
> -					daddr, flow_lbl, th, l4len);
> +			tcp_rst_no_conn(c, incoming_queue & ~1, af, saddr, daddr,
> +					flow_lbl, th, l4len);
>  		return 1;
>  	}
>  
> @@ -2119,6 +2123,9 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif,
>  	ASSERT(pif_at_sidx(sidx) == PIF_TAP);
>  	conn = &flow->tcp;
>  
> +	/* update queue */
> +	flow_queue_set(&flow->f, incoming_queue);
> +
>  	flow_trace(conn, "packet length %zu from tap", l4len);
>  
>  	if (th->rst) {
> diff --git a/tcp.h b/tcp.h
> index 320683ce5679..cddd36cadc97 100644
> --- a/tcp.h
> +++ b/tcp.h
> @@ -15,7 +15,7 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
>  			const struct timespec *now);
>  void tcp_sock_handler(const struct ctx *c,
>  		      union epoll_ref ref, uint32_t events);
> -int tcp_tap_handler(const struct ctx *c, uint8_t pif,
> +int tcp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
>  		    sa_family_t af, const void *saddr, const void *daddr,
>  		    uint32_t flow_lbl, const struct pool *p, int idx,
>  		    const struct timespec *now);
> diff --git a/tcp_vu.c b/tcp_vu.c
> index 1c81ce376dad..40f552087bc5 100644
> --- a/tcp_vu.c
> +++ b/tcp_vu.c
> @@ -70,15 +70,16 @@ static size_t tcp_vu_hdrlen(bool v6)
>   */
>  int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags)
>  {
> +	int rx_queue = flow_rx_virtqueue(&conn->f);
>  	struct vu_dev *vdev = c->vdev;
> -	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
> -	size_t optlen, hdrlen;
> +	struct vu_virtq *vq = &vdev->vq[rx_queue];
>  	struct vu_virtq_element flags_elem[2];
>  	struct ipv6hdr *ip6h = NULL;
>  	struct iphdr *ip4h = NULL;
>  	struct iovec flags_iov[2];
>  	struct tcp_syn_opts *opts;
>  	struct iov_tail payload;
> +	size_t optlen, hdrlen;
>  	struct tcphdr *th;
>  	struct ethhdr *eh;
>  	uint32_t seq;
> @@ -348,8 +349,9 @@ static void tcp_vu_prepare(const struct ctx *c, struct tcp_tap_conn *conn,
>  int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
>  {
>  	uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
> +	int rx_queue = flow_rx_virtqueue(&conn->f);
>  	struct vu_dev *vdev = c->vdev;
> -	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
> +	struct vu_virtq *vq = &vdev->vq[rx_queue];
>  	ssize_t len, previous_dlen;
>  	int i, iov_cnt, head_cnt;
>  	size_t hdrlen, fillsize;
> diff --git a/udp.c b/udp.c
> index 868ffebb5802..a0eb719888cd 100644
> --- a/udp.c
> +++ b/udp.c
> @@ -636,14 +636,15 @@ static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
>  	if (hdr->cmsg_level == IPPROTO_IP &&
>  	    (o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) {
>  		dlen = MIN(dlen, ICMP4_MAX_DLEN);
> -		udp_send_tap_icmp4(c, VHOST_USER_RX_QUEUE, ee, toside, *o4,
> -				   data, dlen);
> +		udp_send_tap_icmp4(c, flow_rx_virtqueue(&uflow->f), ee, toside,

This is really something for an earlier patch, but given how
frequently and where it's used, it's worth having a shorter name for
flow_rx_virtqueue(), and probably a wrapper macro so it can be called
on uflow directly as well.
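
A sketch of such a wrapper, assuming every flow type embeds the
common part as a member named 'f' (as uflow->f and conn->f do in the
quoted code).  The names FLOW_RXQ and flow_rxq_ and the cut-down
structs are hypothetical:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for struct flow_common and struct udp_flow */
struct flow_common_sketch {
	unsigned int qpair:5;	/* queue pair index */
};

struct udp_flow_sketch {
	struct flow_common_sketch f;	/* common part, must be named 'f' */
	int other;
};

/* To-guest queue number for the common part of a flow */
static int flow_rxq_(const struct flow_common_sketch *f)
{
	return f ? f->qpair * 2 : 0;
}

/* Wrapper taking a pointer to any flow type with a member 'f'.
 * Note: the argument must not be NULL, since it is dereferenced here.
 */
#define FLOW_RXQ(flow_)	(flow_rxq_(&(flow_)->f))
```

Call sites would shrink from flow_rx_virtqueue(&uflow->f) to
FLOW_RXQ(uflow).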

> +				   *o4, data, dlen);
>  		return 1;
>  	}
>  
>  	if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) {
> -		udp_send_tap_icmp6(c, VHOST_USER_RX_QUEUE, ee, toside,
> -				   &otap.a6, data, dlen, FLOW_IDX(uflow));
> +		udp_send_tap_icmp6(c, flow_rx_virtqueue(&uflow->f), ee,
> +				   toside, &otap.a6, data, dlen,
> +				   FLOW_IDX(uflow));
>  		return 1;
>  	}
>  
> @@ -971,25 +972,27 @@ fail:
>  
>  /**
>   * udp_tap_handler() - Handle packets from tap
> - * @c:		Execution context
> - * @pif:	pif on which the packet is arriving
> - * @af:		Address family, AF_INET or AF_INET6
> - * @saddr:	Source address
> - * @daddr:	Destination address
> - * @ttl:	TTL or hop limit for packets to be sent in this call
> - * @p:		Pool of UDP packets, with UDP headers
> - * @idx:	Index of first packet to process
> - * @now:	Current timestamp
> + * @c:			Execution context
> + * @incoming_queue:	Incoming queue number
> + * @pif:		pif on which the packet is arriving
> + * @af:			Address family, AF_INET or AF_INET6
> + * @saddr:		Source address
> + * @daddr:		Destination address
> + * @ttl:		TTL or hop limit for packets to be sent in this call
> + * @p:			Pool of UDP packets, with UDP headers
> + * @idx:		Index of first packet to process
> + * @now:		Current timestamp
>   *
>   * Return: count of consumed packets
>   *
>   * #syscalls sendmmsg
>   */
> -int udp_tap_handler(const struct ctx *c, uint8_t pif,
> +int udp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
>  		    sa_family_t af, const void *saddr, const void *daddr,
>  		    uint8_t ttl, const struct pool *p, int idx,
>  		    const struct timespec *now)
>  {
> +	int outgoing_queue = incoming_queue & ~1;
>  	const struct flowside *toside;
>  	struct mmsghdr mm[UIO_MAXIOV];
>  	union sockaddr_inany to_sa;
> @@ -1019,7 +1022,7 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
>  	src = ntohs(uh->source);
>  	dst = ntohs(uh->dest);
>  
> -	tosidx = udp_flow_from_tap(c, pif, af, saddr, daddr, src, dst, now);
> +	tosidx = udp_flow_from_tap(c, outgoing_queue, pif, af, saddr, daddr, src, dst, now);
>  	if (!(uflow = udp_at_sidx(tosidx))) {
>  		char sstr[INET6_ADDRSTRLEN], dstr[INET6_ADDRSTRLEN];
>  
> diff --git a/udp.h b/udp.h
> index f1d83f380b3f..8ba4ccfe646a 100644
> --- a/udp.h
> +++ b/udp.h
> @@ -7,11 +7,13 @@
>  #define UDP_H
>  
>  void udp_portmap_clear(void);
> -void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref,
> -			     uint32_t events, const struct timespec *now);
> -void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
> -		      uint32_t events, const struct timespec *now);
> -int udp_tap_handler(const struct ctx *c, uint8_t pif,
> +void udp_listen_sock_handler(const struct ctx *c,
> +			     union epoll_ref ref, uint32_t events,
> +			     const struct timespec *now);
> +void udp_sock_handler(const struct ctx *c,
> +		      union epoll_ref ref, uint32_t events,
> +		      const struct timespec *now);
> +int udp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
>  		    sa_family_t af, const void *saddr, const void *daddr,
>  		    uint8_t ttl, const struct pool *p, int idx,
>  		    const struct timespec *now);
> diff --git a/udp_flow.c b/udp_flow.c
> index 8907f2f72741..b4a709b8d976 100644
> --- a/udp_flow.c
> +++ b/udp_flow.c
> @@ -266,17 +266,19 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
>  /**
>   * udp_flow_from_tap() - Find or create UDP flow for tap packets
>   * @c:		Execution context
> + * @queue:	RX queue number for the flow
>   * @pif:	pif on which the packet is arriving
>   * @af:		Address family, AF_INET or AF_INET6
>   * @saddr:	Source address on guest side
>   * @daddr:	Destination address guest side
>   * @srcport:	Source port on guest side
>   * @dstport:	Destination port on guest side
> + * @now:	Current timestamp
>   *
>   * Return: sidx for the destination side of the flow for this packet, or
>   *         FLOW_SIDX_NONE if we couldn't find or create a flow.
>   */
> -flow_sidx_t udp_flow_from_tap(const struct ctx *c,
> +flow_sidx_t udp_flow_from_tap(const struct ctx *c, int queue,
>  			      uint8_t pif, sa_family_t af,
>  			      const void *saddr, const void *daddr,
>  			      in_port_t srcport, in_port_t dstport,
> @@ -293,6 +295,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
>  			      srcport, dstport);
>  	if ((uflow = udp_at_sidx(sidx))) {
>  		uflow->ts = now->tv_sec;
> +		/* update queue */
> +		flow_queue_set(&uflow->f, queue);
>  		return flow_sidx_opposite(sidx);
>  	}
>  
> @@ -316,6 +320,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
>  		return FLOW_SIDX_NONE;
>  	}
>  
> +	flow_queue_set(&flow->f, queue);
> +
>  	return udp_flow_new(c, flow, now);
>  }
>  
> diff --git a/udp_flow.h b/udp_flow.h
> index 4c528e95ca66..4a057a9d44a8 100644
> --- a/udp_flow.h
> +++ b/udp_flow.h
> @@ -36,7 +36,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
>  			       const union inany_addr *dst, in_port_t port,
>  			       const union sockaddr_inany *s_in,
>  			       const struct timespec *now);
> -flow_sidx_t udp_flow_from_tap(const struct ctx *c,
> +flow_sidx_t udp_flow_from_tap(const struct ctx *c, int queue,
>  			      uint8_t pif, sa_family_t af,
>  			      const void *saddr, const void *daddr,
>  			      in_port_t srcport, in_port_t dstport,
> diff --git a/udp_vu.c b/udp_vu.c
> index 099677f914e7..cd2c9c516d44 100644
> --- a/udp_vu.c
> +++ b/udp_vu.c
> @@ -202,9 +202,11 @@ static void udp_vu_csum(const struct flowside *toside, int iov_used)
>  void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
>  {
>  	const struct flowside *toside = flowside_at_sidx(tosidx);
> +	const struct udp_flow *uflow = udp_at_sidx(tosidx);
>  	bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
> +	int rx_queue = flow_rx_virtqueue(&uflow->f);
>  	struct vu_dev *vdev = c->vdev;
> -	struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
> +	struct vu_virtq *vq = &vdev->vq[rx_queue];
>  	int i;
>  
>  	for (i = 0; i < n; i++) {
> diff --git a/vu_common.c b/vu_common.c
> index 8904403e66af..56f26317b192 100644
> --- a/vu_common.c
> +++ b/vu_common.c
> @@ -196,11 +196,11 @@ static void vu_handle_tx(struct vu_dev *vdev, int index,
>  
>  		data = IOV_TAIL(elem[count].out_sg, elem[count].out_num, 0);
>  		if (IOV_DROP_HEADER(&data, struct virtio_net_hdr_mrg_rxbuf))
> -			tap_add_packet(vdev->context, &data, now);
> +			tap_add_packet(vdev->context, index, &data, now);
>  
>  		count++;
>  	}
> -	tap_handler(vdev->context, now);
> +	tap_handler(vdev->context, index, now);
>  
>  	if (count) {
>  		int i;
> -- 
> 2.51.0
> 

-- 
David Gibson (he or they)	| I'll have my music baroque, and my code
david AT gibson.dropbear.id.au	| minimalist, thank you, not the other way
				| around.
http://www.ozlabs.org/~dgibson


Thread overview: 13+ messages
2025-11-07 14:38 [PATCH 0/4] vhost-user: Add " Laurent Vivier
2025-11-07 14:38 ` [PATCH 1/4] vhost-user: Enable multiqueue Laurent Vivier
2025-11-10  4:48   ` David Gibson
2025-11-17 15:26     ` Laurent Vivier
2025-11-18  0:16       ` David Gibson
2025-11-07 14:38 ` [PATCH 2/4] vhost-user: Add queue parameter throughout the network stack Laurent Vivier
2025-11-10  5:19   ` David Gibson
2025-11-07 14:39 ` [PATCH 3/4] multiqueue: Add queue-aware flow management for multiqueue support Laurent Vivier
2025-11-10  5:54   ` David Gibson [this message]
2025-11-07 14:39 ` [PATCH 4/4] test: Add multiqueue support to vhost-user test infrastructure Laurent Vivier
2025-11-10  5:57   ` David Gibson
2025-11-10  4:40 ` [PATCH 0/4] vhost-user: Add multiqueue support David Gibson
2025-11-10  6:00   ` David Gibson
