On Tue, Jun 16, 2026 at 02:51:29PM +0200, Laurent Vivier wrote: > Add a qpair field (5 bits) to struct flow_common, with > FLOW_QPAIR_INVALID as sentinel for unassigned flows. Provide > flow_setqp()/FLOW_SETQP() to assign and flow_qp()/FLOW_QP() to > query the queue pair. This seems like it ought to be earlier in the series, since getting the qp from the flow seems like it should obviate some of the plumbing the last half dozen patches have been doing. > All protocol handlers (TCP, UDP, ICMP) set the queue pair on new > flows via FLOW_SETQP(), and update it on each packet received from > tap for existing flows, implementing virtio receive steering: return > traffic is directed to the RX queue matching the originating TX > queue. > > tcp_keepalive() and tcp_inactivity() now filter by queue pair so > each worker only processes its own flows. > > tcp_buf.c uses conn->f.qpair instead of hardcoding QPAIR_DEFAULT > for consistency, though this path is only used in non-vhost-user > mode where the queue pair is always 0. That's not unreasonable, except by reading the qpair from the flow later in the to-guest path, I think we should avoid some of the plumbing and therefore this oddity. > Flows initiated from the host side default to queue pair 0. "default" implies that under some conditions they will go somewhere else, but I don't think that's (yet) the case, right? > > Signed-off-by: Laurent Vivier > --- > flow.c | 34 ++++++++++++++++++++++++++++++++++ > flow.h | 18 +++++++++++++++++- > icmp.c | 8 +++++--- > tcp.c | 20 ++++++++++++++++---- > tcp_buf.c | 10 +++++----- > tcp_splice.c | 1 + > udp_flow.c | 7 +++++-- > 7 files changed, 83 insertions(+), 15 deletions(-) > > diff --git a/flow.c b/flow.c > index c93b73549c90..bf855fe0dfaf 100644 > --- a/flow.c > +++ b/flow.c > @@ -415,6 +415,39 @@ void flow_epollid_register(int epollid, int epollfd) > epoll_id_to_fd[epollid] = epollfd; > } > > +/** > + * flow_qp() - Get the queue pair for a flow > + * @f: Flow to query (may be NULL) > + * > + * Return: queue pair number for the flow, or 0 if flow is NULL or has no > + * valid queue pair assignment > + */ > +/* cppcheck-suppress unusedFunction */ > +unsigned int flow_qp(const struct flow_common *f) > +{ > + if (f == NULL || f->qpair == FLOW_QPAIR_INVALID) > + return QPAIR_DEFAULT; > + return f->qpair; > +} > + > +/** > + * flow_setqp() - Set queue pair assignment for a flow > + * @f: Flow to update > + * @qpair: Queue pair number to assign > + */ > +void flow_setqp(struct flow_common *f, unsigned int qpair) > +{ > + assert(qpair < FLOW_QPAIR_MAX); > + > + if (f->qpair == qpair) > + return; > + > + flow_trace((union flow *)f, "updating queue pair from %d to %d", > + f->qpair, qpair); > + > + f->qpair = qpair; > +} > + > /** > * flow_initiate_() - Move flow to INI, setting pif[INISIDE] > * @flow: Flow to change state > @@ -636,6 +669,7 @@ union flow *flow_alloc(void) > > flow_new_entry = flow; > memset(flow, 0, sizeof(*flow)); > + flow->f.qpair = FLOW_QPAIR_INVALID; > flow_set_state(&flow->f, FLOW_STATE_NEW); > > return flow; > diff --git a/flow.h b/flow.h > index cae259fe7037..3c74dcbd95c4 100644 > --- a/flow.h > +++ b/flow.h > @@ -184,7 +184,8 @@ int flowside_connect(const struct ctx *c, int s, > * @pif[]: Interface for each side of the flow > * @side[]: Information for each side of the flow > * @tap_omac: MAC address of remote endpoint as seen from the guest > - * @epollid: epollfd identifier > + * @qpair: Queue pair number assigned to this flow > + * (FLOW_QPAIR_INVALID if not assigned) > */ > struct flow_common { > #ifdef __GNUC__ > @@ -205,11 +206,19 @@ struct flow_common { > > #define EPOLLFD_ID_BITS 8 > unsigned int epollid:EPOLLFD_ID_BITS; > +#define FLOW_QPAIR_BITS 5 > + unsigned int qpair:FLOW_QPAIR_BITS; > }; > > #define EPOLLFD_ID_DEFAULT 0 > #define EPOLLFD_ID_SIZE (1 << EPOLLFD_ID_BITS) > > +#define FLOW_QPAIR_NUM (1 << FLOW_QPAIR_BITS) > +#define FLOW_QPAIR_MAX (FLOW_QPAIR_NUM - 1) > +#define FLOW_QPAIR_INVALID FLOW_QPAIR_MAX > + > +static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2); Should be strictly <, shouldn't it, since FLOW_QPAIR_MAX is reserved for INVALID and can't be used for a real queue, right? > #define FLOW_INDEX_BITS 17 /* 128k - 1 */ > #define FLOW_MAX MAX_FROM_BITS(FLOW_INDEX_BITS) > > @@ -270,6 +279,13 @@ void flow_epollid_set(struct flow_common *f, int epollid); > int flow_epoll_set(const struct flow_common *f, int command, uint32_t events, > int fd, unsigned int sidei); > void flow_epollid_register(int epollid, int epollfd); > +unsigned int flow_qp(const struct flow_common *f); > +#define FLOW_QP(flow_) \ > + (flow_qp(&(flow_)->f)) > +void flow_setqp(struct flow_common *f, unsigned int qpair); > +#define FLOW_SETQP(flow_, _qpair) \ > + (flow_setqp(&(flow_)->f, _qpair)) > + > void flow_defer_handler(const struct ctx *c, const struct timespec *now, > unsigned int qpair); > int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage, > diff --git a/icmp.c b/icmp.c > index 62038f977116..2558fe5beaab 100644 > --- a/icmp.c > +++ b/icmp.c > @@ -184,7 +184,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c, > struct icmp_ping_flow *pingf; > const struct flowside *tgt; > > - (void)qpair; > if (!flow) > return NULL; > > @@ -216,6 +215,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c, > if (pingf->sock > FD_REF_MAX) > goto cancel; > > + FLOW_SETQP(pingf, qpair); > flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT); > if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock, > TGTSIDE) < 0) { > @@ -305,10 +305,12 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif, > flow = flow_at_sidx(flow_lookup_af(c, proto, PIF_TAP, > af, saddr, daddr, id, id)); > > - if (flow) > + if (flow) { > pingf = &flow->ping; > - else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) > + FLOW_SETQP(pingf, qpair); /* XXX if qpair change, update epollfd */ > + } else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) { > return 1; > + } > > tgt = &pingf->f.side[TGTSIDE]; > > diff --git a/tcp.c b/tcp.c > index 7f8e68a31994..c0a4de33f068 100644 > --- a/tcp.c > +++ b/tcp.c > @@ -1735,6 +1735,7 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair, > > conn->sock = s; > conn->timer = -1; > + FLOW_SETQP(conn, qpair); We obviously still need FLOW_SETQP() for guest triggered migrations, but I wonder if we should add the qpair as a parameter to flow_initiate_(). > flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT); > if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) { > flow_perror(flow, "Can't register with epoll"); > @@ -2250,7 +2251,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af, > /** > * tcp_tap_handler() - Handle packets from tap and state transitions > * @c: Execution context > - * @qpair: Queue pair on which to send packets > + * @qpair: Queue pair to process The old version seems more accurate to me, not to mention that the change could be pushed back into the patch that introduced this line. > * @pif: pif on which the packet is arriving > * @af: Address family, AF_INET or AF_INET6 > * @saddr: Source address > @@ -2314,6 +2315,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif, > assert(pif_at_sidx(sidx) == PIF_TAP); > conn = &flow->tcp; > > + /* update queue pair */ > + FLOW_SETQP(flow, qpair); > + > flow_trace(conn, "packet length %zu from tap", l4len); > > if (th->rst) { > @@ -2518,6 +2522,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow, > conn->timer = -1; > conn->ws_to_tap = conn->ws_from_tap = 0; > > + FLOW_SETQP(conn, QPAIR_DEFAULT); > flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT); > if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) { > flow_perror(flow, "Can't register with epoll"); > @@ -2980,6 +2985,9 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now, > flow_foreach_of_type(flow, FLOW_TCP) { > struct tcp_tap_conn *conn = &flow->tcp; > > + if (conn->f.qpair != qpair) > + continue; > + Why filter like this, rather than using the flow's qpair to send the keepalive? I mean, obviously the answer is looking forward to the multi-threading stuff, still I think that change belongs there. > if (conn->tap_inactive) { > flow_dbg(conn, "No tap activity for least %us, send keepalive", > KEEPALIVE_INTERVAL); > @@ -3011,6 +3019,9 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now, > flow_foreach_of_type(flow, FLOW_TCP) { > struct tcp_tap_conn *conn = &flow->tcp; > > + if (conn->f.qpair != qpair) > + continue; > + > if (conn->inactive) { > /* No activity in this interval, reset */ > flow_dbg(conn, "Inactive for at least %us, resetting", > @@ -3841,6 +3852,7 @@ int tcp_flow_migrate_target(struct ctx *c, int fd) > goto out; > } > > + FLOW_SETQP(conn, QPAIR_DEFAULT); > flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT); > if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock, > !TAPSIDE(conn))) > @@ -4019,10 +4031,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd > if (tcp_set_peek_offset(conn, peek_offset)) > goto fail; > > - if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT)) > + if (tcp_send_flag(c, conn, ACK, conn->f.qpair)) > goto fail; > > - tcp_data_from_sock(c, conn, QPAIR_DEFAULT); > + tcp_data_from_sock(c, conn, conn->f.qpair); > > if ((rc = tcp_epoll_ctl(conn))) { > flow_dbg(conn, > @@ -4040,7 +4052,7 @@ fail: > } > > conn->flags = 0; /* Not waiting for ACK, don't schedule timer */ > - tcp_rst(c, conn, QPAIR_DEFAULT); > + tcp_rst(c, conn, conn->f.qpair); > > return 0; > } > diff --git a/tcp_buf.c b/tcp_buf.c > index ae8bebca5107..647c17621963 100644 > --- a/tcp_buf.c > +++ b/tcp_buf.c > @@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns, > conn->seq_to_tap = seq; > peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap; > if (tcp_set_peek_offset(conn, peek_offset)) > - tcp_rst(c, conn, QPAIR_DEFAULT); > + tcp_rst(c, conn, conn->f.qpair); Apropos my comments on earlier patches, why not use conn->f.qpair within tcp_rst(), instead of in this whole batch of callers? > } > } > > @@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn) > conn->seq_to_tap = conn->seq_ack_from_tap; > already_sent = 0; > if (tcp_set_peek_offset(conn, 0)) { > - tcp_rst(c, conn, QPAIR_DEFAULT); > + tcp_rst(c, conn, conn->f.qpair); > return -1; > } > } > @@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn) > } > > if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) { > - tcp_rst(c, conn, QPAIR_DEFAULT); > + tcp_rst(c, conn, conn->f.qpair); > return -1; > } > > @@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn) > > if (len < 0) { > if (errno != EAGAIN && errno != EWOULDBLOCK) { > - tcp_rst(c, conn, QPAIR_DEFAULT); > + tcp_rst(c, conn, conn->f.qpair); > return -errno; > } > > @@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn) > > ret = tcp_buf_send_flag(c, conn, FIN | ACK); > if (ret) { > - tcp_rst(c, conn, QPAIR_DEFAULT); > + tcp_rst(c, conn, conn->f.qpair); > return ret; > } > > diff --git a/tcp_splice.c b/tcp_splice.c > index 3fd33a10308e..1a77ac2e8a18 100644 > --- a/tcp_splice.c > +++ b/tcp_splice.c > @@ -377,6 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn) > > pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport); > > + FLOW_SETQP(conn, QPAIR_DEFAULT); qpair will never be relevant for spliced connections. So, it would be much nicer if we can handle this dummy initialisation within one of the existing flow lifecycle helpers, rather than requiring it be done explicitly in the flow-type-specific code. > flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT); > if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) || > flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) { > diff --git a/udp_flow.c b/udp_flow.c > index 143f265493fa..44e0c4c50ca9 100644 > --- a/udp_flow.c > +++ b/udp_flow.c > @@ -81,7 +81,6 @@ static int udp_flow_sock(const struct ctx *c, > return s; > } > > - flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT); > if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) { > rc = -errno; > close(s); > @@ -154,7 +153,8 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair, > uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0; > uflow->activity[INISIDE] = 1; > uflow->activity[TGTSIDE] = 0; > - (void)qpair; > + FLOW_SETQP(uflow, qpair); > + flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT); > > flow_foreach_sidei(sidei) { > if (pif_is_socket(uflow->f.pif[sidei])) > @@ -270,6 +270,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif, > * @daddr: Destination address guest side > * @srcport: Source port on guest side > * @dstport: Destination port on guest side > + * @now: Current timestamp Unrelated change? > * > * Return: sidx for the destination side of the flow for this packet, or > * FLOW_SIDX_NONE if we couldn't find or create a flow. > @@ -291,6 +292,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair, > srcport, dstport); > if ((uflow = udp_at_sidx(sidx))) { > udp_flow_activity(uflow, sidx.sidei, now); > + /* update qpair */ > + FLOW_SETQP(uflow, qpair); /* if qpair changes, update epollfd */ > return flow_sidx_opposite(sidx); > } > > -- > 2.54.0 > -- David Gibson (he or they) | I'll have my music baroque, and my code david AT gibson.dropbear.id.au | minimalist, thank you, not the other way | around. http://www.ozlabs.org/~dgibson