From: David Gibson <david@gibson.dropbear.id.au>
To: Laurent Vivier <lvivier@redhat.com>
Cc: passt-dev@passt.top
Subject: Re: [PATCH v5 11/12] flow: Add queue pair tracking to flow management
Date: Fri, 19 Jun 2026 16:36:07 +1000
Message-ID: <ajTjVwvVHM69nU1T@zatzit>
In-Reply-To: <20260616125130.1324274-12-lvivier@redhat.com>


On Tue, Jun 16, 2026 at 02:51:29PM +0200, Laurent Vivier wrote:
> Add a qpair field (5 bits) to struct flow_common, with
> FLOW_QPAIR_INVALID as sentinel for unassigned flows.  Provide
> flow_setqp()/FLOW_SETQP() to assign and flow_qp()/FLOW_QP() to
> query the queue pair.

This probably belongs earlier in the series: getting the qp from the
flow should obviate some of the plumbing that the last half dozen
patches have been adding.

> All protocol handlers (TCP, UDP, ICMP) set the queue pair on new
> flows via FLOW_SETQP(), and update it on each packet received from
> tap for existing flows, implementing virtio receive steering: return
> traffic is directed to the RX queue matching the originating TX
> queue.
> 
> tcp_keepalive() and tcp_inactivity() now filter by queue pair so
> each worker only processes its own flows.
> 
> tcp_buf.c uses conn->f.qpair instead of hardcoding QPAIR_DEFAULT
> for consistency, though this path is only used in non-vhost-user
> mode where the queue pair is always 0.

That's not unreasonable, but if we read the qpair from the flow later
in the to-guest path, I think we can avoid some of the plumbing, and
therefore this oddity.

> Flows initiated from the host side default to queue pair 0.

"default" implies that under some conditions they will go somewhere
else, but I don't think that's (yet) the case, right?

> 
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
> ---
>  flow.c       | 34 ++++++++++++++++++++++++++++++++++
>  flow.h       | 18 +++++++++++++++++-
>  icmp.c       |  8 +++++---
>  tcp.c        | 20 ++++++++++++++++----
>  tcp_buf.c    | 10 +++++-----
>  tcp_splice.c |  1 +
>  udp_flow.c   |  7 +++++--
>  7 files changed, 83 insertions(+), 15 deletions(-)
> 
> diff --git a/flow.c b/flow.c
> index c93b73549c90..bf855fe0dfaf 100644
> --- a/flow.c
> +++ b/flow.c
> @@ -415,6 +415,39 @@ void flow_epollid_register(int epollid, int epollfd)
>  	epoll_id_to_fd[epollid] = epollfd;
>  }
>  
> +/**
> + * flow_qp() - Get the queue pair for a flow
> + * @f:		Flow to query (may be NULL)
> + *
> + * Return: queue pair number for the flow, or 0 if flow is NULL or has no
> + *         valid queue pair assignment
> + */
> +/* cppcheck-suppress unusedFunction */
> +unsigned int flow_qp(const struct flow_common *f)
> +{
> +	if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
> +		return QPAIR_DEFAULT;
> +	return f->qpair;
> +}
> +
> +/**
> + * flow_setqp() - Set queue pair assignment for a flow
> + * @f:		Flow to update
> + * @qpair:	Queue pair number to assign
> + */
> +void flow_setqp(struct flow_common *f, unsigned int qpair)
> +{
> +	assert(qpair < FLOW_QPAIR_MAX);
> +
> +	if (f->qpair == qpair)
> +		return;
> +
> +	flow_trace((union flow *)f, "updating queue pair from %d to %d",
> +		   f->qpair, qpair);
> +
> +	f->qpair = qpair;
> +}
> +
>  /**
>   * flow_initiate_() - Move flow to INI, setting pif[INISIDE]
>   * @flow:	Flow to change state
> @@ -636,6 +669,7 @@ union flow *flow_alloc(void)
>  
>  	flow_new_entry = flow;
>  	memset(flow, 0, sizeof(*flow));
> +	flow->f.qpair = FLOW_QPAIR_INVALID;
>  	flow_set_state(&flow->f, FLOW_STATE_NEW);
>  
>  	return flow;
> diff --git a/flow.h b/flow.h
> index cae259fe7037..3c74dcbd95c4 100644
> --- a/flow.h
> +++ b/flow.h
> @@ -184,7 +184,8 @@ int flowside_connect(const struct ctx *c, int s,
>   * @pif[]:	Interface for each side of the flow
>   * @side[]:	Information for each side of the flow
>   * @tap_omac:	MAC address of remote endpoint as seen from the guest
> - * @epollid:	epollfd identifier
> + * @qpair:	Queue pair number assigned to this flow
> + *		(FLOW_QPAIR_INVALID if not assigned)
>   */
>  struct flow_common {
>  #ifdef __GNUC__
> @@ -205,11 +206,19 @@ struct flow_common {
>  
>  #define EPOLLFD_ID_BITS 8
>  	unsigned int	epollid:EPOLLFD_ID_BITS;
> +#define FLOW_QPAIR_BITS 5
> +	unsigned int	qpair:FLOW_QPAIR_BITS;
>  };
>  
>  #define EPOLLFD_ID_DEFAULT	0
>  #define EPOLLFD_ID_SIZE		(1 << EPOLLFD_ID_BITS)
>  
> +#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)
> +#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)
> +#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX
> +
> +static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2);

Should be strictly <, shouldn't it, since FLOW_QPAIR_MAX is reserved
for INVALID and can't be used for a real queue, right?
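
To make the bound concrete, here is the arithmetic as a small sketch.
It assumes two virtqueues per queue pair and queue pairs numbered from
0; sketch_highest_qpair() is a hypothetical helper for illustration,
not something in the patch.

```c
#include <assert.h>

/* Values as in the quoted hunk */
#define FLOW_QPAIR_BITS		5
#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)	/* 32 encodable values */
#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)	/* 31 */
#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX		/* 31, the sentinel */

/* Hypothetical helper, not in the patch.  Assumes two virtqueues per queue
 * pair and pairs numbered from 0, so this is the highest index in use. */
static unsigned int sketch_highest_qpair(unsigned int max_vqs)
{
	assert(max_vqs >= 2);
	return max_vqs / 2 - 1;
}
```

Under those assumptions, a VHOST_USER_MAX_VQS equal to
FLOW_QPAIR_MAX * 2 means 31 queue pairs indexed 0 to 30, so the answer
hinges on whether the limit is read as a count or as a highest index.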

>  #define FLOW_INDEX_BITS		17	/* 128k - 1 */
>  #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
>  
> @@ -270,6 +279,13 @@ void flow_epollid_set(struct flow_common *f, int epollid);
>  int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
>  		   int fd, unsigned int sidei);
>  void flow_epollid_register(int epollid, int epollfd);
> +unsigned int flow_qp(const struct flow_common *f);
> +#define FLOW_QP(flow_)				\
> +	(flow_qp(&(flow_)->f))
> +void flow_setqp(struct flow_common *f, unsigned int qpair);
> +#define FLOW_SETQP(flow_, _qpair)		\
> +	(flow_setqp(&(flow_)->f, _qpair))
> +
>  void flow_defer_handler(const struct ctx *c, const struct timespec *now,
>  			unsigned int qpair);
>  int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
> diff --git a/icmp.c b/icmp.c
> index 62038f977116..2558fe5beaab 100644
> --- a/icmp.c
> +++ b/icmp.c
> @@ -184,7 +184,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
>  	struct icmp_ping_flow *pingf;
>  	const struct flowside *tgt;
>  
> -	(void)qpair;
>  	if (!flow)
>  		return NULL;
>  
> @@ -216,6 +215,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
>  	if (pingf->sock > FD_REF_MAX)
>  		goto cancel;
>  
> +	FLOW_SETQP(pingf, qpair);
>  	flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
>  	if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
>  			   TGTSIDE) < 0) {
> @@ -305,10 +305,12 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
>  	flow = flow_at_sidx(flow_lookup_af(c, proto, PIF_TAP,
>  					   af, saddr, daddr, id, id));
>  
> -	if (flow)
> +	if (flow) {
>  		pingf = &flow->ping;
> -	else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr)))
> +		FLOW_SETQP(pingf, qpair); /* XXX if qpair change, update epollfd */
> +	} else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) {
>  		return 1;
> +	}
>  
>  	tgt = &pingf->f.side[TGTSIDE];
>  
> diff --git a/tcp.c b/tcp.c
> index 7f8e68a31994..c0a4de33f068 100644
> --- a/tcp.c
> +++ b/tcp.c
> @@ -1735,6 +1735,7 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
>  
>  	conn->sock = s;
>  	conn->timer = -1;
> +	FLOW_SETQP(conn, qpair);

We obviously still need FLOW_SETQP() for guest-triggered migrations of
a flow between queue pairs, but I wonder if we should add the qpair as
a parameter to flow_initiate_().
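
A minimal sketch of that shape, using simplified stand-in types.  The
struct layout, the SKETCH_* names and sketch_flow_initiate() are all
assumptions for illustration, not the real flow_initiate_() signature.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_QPAIR_BITS	5
#define SKETCH_QPAIR_INVALID	((1U << SKETCH_QPAIR_BITS) - 1)
#define SKETCH_INISIDE		0

/* Simplified stand-in for struct flow_common */
struct sketch_flow {
	uint8_t		pif[2];
	unsigned int	qpair:SKETCH_QPAIR_BITS;
};

/* Hypothetical variant of flow_initiate_(): records the initiating
 * interface and the queue pair in a single step, so callers can't
 * forget the queue pair assignment. */
static void sketch_flow_initiate(struct sketch_flow *f, uint8_t pif,
				 unsigned int qpair)
{
	assert(qpair < SKETCH_QPAIR_INVALID);

	f->pif[SKETCH_INISIDE] = pif;
	f->qpair = qpair;
}
```

Host-initiated flows would pass the default queue pair here, and the
separate setter would remain only for later reassignment.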

>  	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
>  	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) {
>  		flow_perror(flow, "Can't register with epoll");
> @@ -2250,7 +2251,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
>  /**
>   * tcp_tap_handler() - Handle packets from tap and state transitions
>   * @c:		Execution context
> - * @qpair:	Queue pair on which to send packets
> + * @qpair:	Queue pair to process

The old version seems more accurate to me, not to mention that the
change could be pushed back into the patch that introduced this line.

>   * @pif:	pif on which the packet is arriving
>   * @af:		Address family, AF_INET or AF_INET6
>   * @saddr:	Source address
> @@ -2314,6 +2315,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
>  	assert(pif_at_sidx(sidx) == PIF_TAP);
>  	conn = &flow->tcp;
>  
> +	/* update queue pair */
> +	FLOW_SETQP(flow, qpair);
> +
>  	flow_trace(conn, "packet length %zu from tap", l4len);
>  
>  	if (th->rst) {
> @@ -2518,6 +2522,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
>  	conn->timer = -1;
>  	conn->ws_to_tap = conn->ws_from_tap = 0;
>  
> +	FLOW_SETQP(conn, QPAIR_DEFAULT);
>  	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
>  	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) {
>  		flow_perror(flow, "Can't register with epoll");
> @@ -2980,6 +2985,9 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now,
>  	flow_foreach_of_type(flow, FLOW_TCP) {
>  		struct tcp_tap_conn *conn = &flow->tcp;
>  
> +		if (conn->f.qpair != qpair)
> +			continue;
> +

Why filter like this, rather than using the flow's qpair to send the
keepalive?  Presumably the answer is that this anticipates the
multi-threading work, but I still think that change belongs there.
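
Roughly what I mean by the alternative, as a sketch with stand-in
types: one pass over every connection, with each keepalive sent on the
flow's own queue pair.  struct sketch_conn, sketch_send_keepalive() and
the per-queue counters are hypothetical, and the real timer logic is
left out.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_NQPAIRS	4

/* Simplified stand-in for struct tcp_tap_conn */
struct sketch_conn {
	unsigned int	qpair;
	int		tap_inactive;
};

/* Hypothetical stand-in for the send path: counts keepalives per queue */
static unsigned int sketch_sent[SKETCH_NQPAIRS];

static void sketch_send_keepalive(const struct sketch_conn *conn,
				  unsigned int qpair)
{
	(void)conn;
	assert(qpair < SKETCH_NQPAIRS);
	sketch_sent[qpair]++;
}

/* No filtering by queue pair: every inactive connection gets a keepalive,
 * sent on the queue pair recorded in the connection itself. */
static void sketch_keepalive(const struct sketch_conn *conns, size_t n)
{
	size_t i;

	for (i = 0; i < n; i++) {
		if (conns[i].tap_inactive)
			sketch_send_keepalive(&conns[i], conns[i].qpair);
	}
}
```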

>  		if (conn->tap_inactive) {
>  			flow_dbg(conn, "No tap activity for least %us, send keepalive",
>  				 KEEPALIVE_INTERVAL);
> @@ -3011,6 +3019,9 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now,
>  	flow_foreach_of_type(flow, FLOW_TCP) {
>  		struct tcp_tap_conn *conn = &flow->tcp;
>  
> +		if (conn->f.qpair != qpair)
> +			continue;
> +
>  		if (conn->inactive) {
>  			/* No activity in this interval, reset */
>  			flow_dbg(conn, "Inactive for at least %us, resetting",
> @@ -3841,6 +3852,7 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
>  		goto out;
>  	}
>  
> +	FLOW_SETQP(conn, QPAIR_DEFAULT);
>  	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
>  	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock,
>  			   !TAPSIDE(conn)))
> @@ -4019,10 +4031,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
>  	if (tcp_set_peek_offset(conn, peek_offset))
>  		goto fail;
>  
> -	if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT))
> +	if (tcp_send_flag(c, conn, ACK, conn->f.qpair))
>  		goto fail;
>  
> -	tcp_data_from_sock(c, conn, QPAIR_DEFAULT);
> +	tcp_data_from_sock(c, conn, conn->f.qpair);
>  
>  	if ((rc = tcp_epoll_ctl(conn))) {
>  		flow_dbg(conn,
> @@ -4040,7 +4052,7 @@ fail:
>  	}
>  
>  	conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
> -	tcp_rst(c, conn, QPAIR_DEFAULT);
> +	tcp_rst(c, conn, conn->f.qpair);
>  
>  	return 0;
>  }
> diff --git a/tcp_buf.c b/tcp_buf.c
> index ae8bebca5107..647c17621963 100644
> --- a/tcp_buf.c
> +++ b/tcp_buf.c
> @@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
>  		conn->seq_to_tap = seq;
>  		peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
>  		if (tcp_set_peek_offset(conn, peek_offset))
> -			tcp_rst(c, conn, QPAIR_DEFAULT);
> +			tcp_rst(c, conn, conn->f.qpair);

Apropos my comments on earlier patches, why not use conn->f.qpair
within tcp_rst(), instead of in this whole batch of callers?
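
As a sketch of the idea, with simplified stand-in types: the reset
helper takes no qpair argument and reads it from the connection.  All
sketch_* names are hypothetical, and the real tcp_rst() does more than
this.

```c
#include <assert.h>

/* Simplified stand-ins for struct flow_common and struct tcp_tap_conn */
struct sketch_flow_common {
	unsigned int	qpair:5;
};

struct sketch_tcp_conn {
	struct sketch_flow_common	f;
	int				closed;
};

/* Hypothetical stand-in for the send path: remembers the queue pair used */
static unsigned int sketch_last_rst_qpair;

static void sketch_send_rst(unsigned int qpair)
{
	sketch_last_rst_qpair = qpair;
}

/* Hypothetical tcp_rst() without a qpair parameter: the queue pair comes
 * from the flow, so callers don't have to supply it. */
static void sketch_tcp_rst(struct sketch_tcp_conn *conn)
{
	sketch_send_rst(conn->f.qpair);
	conn->closed = 1;
}
```

With that, the tcp_buf.c hunks in this patch would not be needed.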

>  	}
>  }
>  
> @@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
>  		conn->seq_to_tap = conn->seq_ack_from_tap;
>  		already_sent = 0;
>  		if (tcp_set_peek_offset(conn, 0)) {
> -			tcp_rst(c, conn, QPAIR_DEFAULT);
> +			tcp_rst(c, conn, conn->f.qpair);
>  			return -1;
>  		}
>  	}
> @@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
>  	}
>  
>  	if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) {
> -		tcp_rst(c, conn, QPAIR_DEFAULT);
> +		tcp_rst(c, conn, conn->f.qpair);
>  		return -1;
>  	}
>  
> @@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
>  
>  	if (len < 0) {
>  		if (errno != EAGAIN && errno != EWOULDBLOCK) {
> -			tcp_rst(c, conn, QPAIR_DEFAULT);
> +			tcp_rst(c, conn, conn->f.qpair);
>  			return -errno;
>  		}
>  
> @@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
>  
>  			ret = tcp_buf_send_flag(c, conn, FIN | ACK);
>  			if (ret) {
> -				tcp_rst(c, conn, QPAIR_DEFAULT);
> +				tcp_rst(c, conn, conn->f.qpair);
>  				return ret;
>  			}
>  
> diff --git a/tcp_splice.c b/tcp_splice.c
> index 3fd33a10308e..1a77ac2e8a18 100644
> --- a/tcp_splice.c
> +++ b/tcp_splice.c
> @@ -377,6 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
>  
>  	pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
>  
> +	FLOW_SETQP(conn, QPAIR_DEFAULT);

qpair will never be relevant for spliced connections.  So, it would be
much nicer if we can handle this dummy initialisation within one of
the existing flow lifecycle helpers, rather than requiring it be done
explicitly in the flow-type-specific code.
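
One possible shape, as a sketch only: a common lifecycle step fills in
the default when nothing has assigned a queue pair.  Which helper would
host this is an open question; sketch_flow_activate() and the SKETCH_*
names are hypothetical stand-ins.

```c
#include <assert.h>

#define SKETCH_QPAIR_INVALID	31
#define SKETCH_QPAIR_DEFAULT	0

/* Simplified stand-in for struct flow_common */
struct sketch_flow {
	unsigned int	qpair:5;
	int		active;
};

/* Hypothetical lifecycle step shared by all flow types: flows that never
 * had a queue pair assigned (e.g. spliced ones) get the default here. */
static void sketch_flow_activate(struct sketch_flow *f)
{
	if (f->qpair == SKETCH_QPAIR_INVALID)
		f->qpair = SKETCH_QPAIR_DEFAULT;

	f->active = 1;
}
```

Flow types that do care about the queue pair keep whatever they set
earlier; the others need no explicit call at all.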

>  	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
>  	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) ||
>  	    flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) {
> diff --git a/udp_flow.c b/udp_flow.c
> index 143f265493fa..44e0c4c50ca9 100644
> --- a/udp_flow.c
> +++ b/udp_flow.c
> @@ -81,7 +81,6 @@ static int udp_flow_sock(const struct ctx *c,
>  		return s;
>  	}
>  
> -	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
>  	if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) {
>  		rc = -errno;
>  		close(s);
> @@ -154,7 +153,8 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
>  	uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0;
>  	uflow->activity[INISIDE] = 1;
>  	uflow->activity[TGTSIDE] = 0;
> -	(void)qpair;
> +	FLOW_SETQP(uflow, qpair);
> +	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
>  
>  	flow_foreach_sidei(sidei) {
>  		if (pif_is_socket(uflow->f.pif[sidei]))
> @@ -270,6 +270,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
>   * @daddr:	Destination address guest side
>   * @srcport:	Source port on guest side
>   * @dstport:	Destination port on guest side
> + * @now:	Current timestamp

Unrelated change?

>   *
>   * Return: sidx for the destination side of the flow for this packet, or
>   *         FLOW_SIDX_NONE if we couldn't find or create a flow.
> @@ -291,6 +292,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
>  			      srcport, dstport);
>  	if ((uflow = udp_at_sidx(sidx))) {
>  		udp_flow_activity(uflow, sidx.sidei, now);
> +		/* update qpair */
> +		FLOW_SETQP(uflow, qpair); /* if qpair changes, update epollfd */
>  		return flow_sidx_opposite(sidx);
>  	}
>  
> -- 
> 2.54.0
> 

-- 
David Gibson (he or they)	| I'll have my music baroque, and my code
david AT gibson.dropbear.id.au	| minimalist, thank you, not the other way
				| around.
http://www.ozlabs.org/~dgibson


