From: Laurent Vivier
To: passt-dev@passt.top
CC: Laurent Vivier
Subject: [PATCH v5 11/12] flow: Add queue pair tracking to flow management
Date: Tue, 16 Jun 2026 14:51:29 +0200
Message-ID: <20260616125130.1324274-12-lvivier@redhat.com>
In-Reply-To: <20260616125130.1324274-1-lvivier@redhat.com>

Add a qpair field (5 bits) to struct flow_common, with FLOW_QPAIR_INVALID
as the sentinel for unassigned flows. Provide flow_setqp()/FLOW_SETQP() to
assign and flow_qp()/FLOW_QP() to query the queue pair.

All protocol handlers (TCP, UDP, ICMP) set the queue pair on new flows
via FLOW_SETQP(), and update it on each packet received from tap for
existing flows, implementing virtio receive steering: return traffic is
directed to the RX queue matching the originating TX queue.
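To make the encoding concrete: a 5-bit field holds 0..31, and 31 (FLOW_QPAIR_MAX) doubles as FLOW_QPAIR_INVALID. Since flow_setqp() asserts qpair < FLOW_QPAIR_MAX, queue pairs 0..30 are assignable, which matches the static_assert capping VHOST_USER_MAX_VQS at 31 * 2 = 62 virtqueues (one RX and one TX queue per pair). The standalone sketch below mirrors that logic; struct demo_flow, demo_alloc(), demo_qp() and demo_setqp() are hypothetical stand-ins for struct flow_common, flow_alloc(), flow_qp() and flow_setqp(), and QPAIR_DEFAULT == 0 is an assumption based on host-initiated flows defaulting to queue pair 0.

```c
/* Sketch only: demo_* names are hypothetical stand-ins, and
 * QPAIR_DEFAULT == 0 is assumed, not taken from passt headers. */
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define QPAIR_DEFAULT		0
#define FLOW_QPAIR_BITS		5
#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)	/* 32 encodings */
#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)	/* 31 */
#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX		/* 31 means "unassigned" */

struct demo_flow {
	unsigned int epollid:8;
	unsigned int qpair:FLOW_QPAIR_BITS;
};

/* Allocation: zero the entry, then mark it unassigned (as flow_alloc() does) */
static void demo_alloc(struct demo_flow *f)
{
	memset(f, 0, sizeof(*f));
	f->qpair = FLOW_QPAIR_INVALID;
}

/* Query: NULL or unassigned flows fall back to the default queue pair */
static unsigned int demo_qp(const struct demo_flow *f)
{
	if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
		return QPAIR_DEFAULT;
	return f->qpair;
}

/* Assign: the sentinel value itself is rejected, leaving 0..30 usable */
static void demo_setqp(struct demo_flow *f, unsigned int qpair)
{
	assert(qpair < FLOW_QPAIR_MAX);
	f->qpair = qpair;
}
```

Because the sentinel is an ordinary in-range value rather than a separate "valid" bit, the whole assignment fits in 5 bits next to epollid, at the cost of one unusable queue pair number.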
tcp_keepalive() and tcp_inactivity() now filter by queue pair so each
worker only processes its own flows.

tcp_buf.c uses conn->f.qpair instead of hardcoding QPAIR_DEFAULT for
consistency, though this path is only used in non-vhost-user mode where
the queue pair is always 0.

Flows initiated from the host side default to queue pair 0.

Signed-off-by: Laurent Vivier
---
 flow.c       | 34 ++++++++++++++++++++++++++++++++++
 flow.h       | 18 +++++++++++++++++-
 icmp.c       |  8 +++++---
 tcp.c        | 20 ++++++++++++++++----
 tcp_buf.c    | 10 +++++-----
 tcp_splice.c |  1 +
 udp_flow.c   |  7 +++++--
 7 files changed, 83 insertions(+), 15 deletions(-)

diff --git a/flow.c b/flow.c
index c93b73549c90..bf855fe0dfaf 100644
--- a/flow.c
+++ b/flow.c
@@ -415,6 +415,39 @@ void flow_epollid_register(int epollid, int epollfd)
 	epoll_id_to_fd[epollid] = epollfd;
 }
 
+/**
+ * flow_qp() - Get the queue pair for a flow
+ * @f:		Flow to query (may be NULL)
+ *
+ * Return: queue pair number for the flow, or 0 if flow is NULL or has no
+ *         valid queue pair assignment
+ */
+/* cppcheck-suppress unusedFunction */
+unsigned int flow_qp(const struct flow_common *f)
+{
+	if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
+		return QPAIR_DEFAULT;
+	return f->qpair;
+}
+
+/**
+ * flow_setqp() - Set queue pair assignment for a flow
+ * @f:		Flow to update
+ * @qpair:	Queue pair number to assign
+ */
+void flow_setqp(struct flow_common *f, unsigned int qpair)
+{
+	assert(qpair < FLOW_QPAIR_MAX);
+
+	if (f->qpair == qpair)
+		return;
+
+	flow_trace((union flow *)f, "updating queue pair from %d to %d",
+		   f->qpair, qpair);
+
+	f->qpair = qpair;
+}
+
 /**
  * flow_initiate_() - Move flow to INI, setting pif[INISIDE]
  * @flow:	Flow to change state
@@ -636,6 +669,7 @@ union flow *flow_alloc(void)
 
 	flow_new_entry = flow;
 	memset(flow, 0, sizeof(*flow));
+	flow->f.qpair = FLOW_QPAIR_INVALID;
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
 	return flow;
diff --git a/flow.h b/flow.h
index cae259fe7037..3c74dcbd95c4 100644
--- a/flow.h
+++ b/flow.h
@@ -184,7 +184,8 @@ int flowside_connect(const struct ctx *c, int s,
  * @pif[]:	Interface for each side of the flow
  * @side[]:	Information for each side of the flow
  * @tap_omac:	MAC address of remote endpoint as seen from the guest
- * @epollid:	epollfd identifier
+ * @qpair:	Queue pair number assigned to this flow
+ *		(FLOW_QPAIR_INVALID if not assigned)
  */
 struct flow_common {
 #ifdef __GNUC__
@@ -205,11 +206,19 @@ struct flow_common {
 
 #define EPOLLFD_ID_BITS		8
 	unsigned int	epollid:EPOLLFD_ID_BITS;
+#define FLOW_QPAIR_BITS		5
+	unsigned int	qpair:FLOW_QPAIR_BITS;
 };
 
 #define EPOLLFD_ID_DEFAULT	0
 #define EPOLLFD_ID_SIZE		(1 << EPOLLFD_ID_BITS)
 
+#define FLOW_QPAIR_NUM		(1 << FLOW_QPAIR_BITS)
+#define FLOW_QPAIR_MAX		(FLOW_QPAIR_NUM - 1)
+#define FLOW_QPAIR_INVALID	FLOW_QPAIR_MAX
+
+static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2);
+
 #define FLOW_INDEX_BITS		17	/* 128k - 1 */
 #define FLOW_MAX		MAX_FROM_BITS(FLOW_INDEX_BITS)
 
@@ -270,6 +279,13 @@ void flow_epollid_set(struct flow_common *f, int epollid);
 int flow_epoll_set(const struct flow_common *f, int command, uint32_t events,
 		   int fd, unsigned int sidei);
 void flow_epollid_register(int epollid, int epollfd);
+unsigned int flow_qp(const struct flow_common *f);
+#define FLOW_QP(flow_)				\
+	(flow_qp(&(flow_)->f))
+void flow_setqp(struct flow_common *f, unsigned int qpair);
+#define FLOW_SETQP(flow_, _qpair)		\
+	(flow_setqp(&(flow_)->f, _qpair))
+
 void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 			unsigned int qpair);
 int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
diff --git a/icmp.c b/icmp.c
index 62038f977116..2558fe5beaab 100644
--- a/icmp.c
+++ b/icmp.c
@@ -184,7 +184,6 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	struct icmp_ping_flow *pingf;
 	const struct flowside *tgt;
 
-	(void)qpair;
 	if (!flow)
 		return NULL;
 
@@ -216,6 +215,7 @@ static struct icmp_ping_flow *icmp_ping_new(const struct ctx *c,
 	if (pingf->sock > FD_REF_MAX)
 		goto cancel;
 
+	FLOW_SETQP(pingf, qpair);
 	flow_epollid_set(&pingf->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&pingf->f, EPOLL_CTL_ADD, EPOLLIN, pingf->sock,
 			   TGTSIDE) < 0) {
@@ -305,10 +305,12 @@ int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	flow = flow_at_sidx(flow_lookup_af(c, proto, PIF_TAP,
 					   af, saddr, daddr, id, id));
 
-	if (flow)
+	if (flow) {
 		pingf = &flow->ping;
-	else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr)))
+		FLOW_SETQP(pingf, qpair); /* XXX if qpair change, update epollfd */
+	} else if (!(pingf = icmp_ping_new(c, qpair, af, id, saddr, daddr))) {
 		return 1;
+	}
 
 	tgt = &pingf->f.side[TGTSIDE];
 
diff --git a/tcp.c b/tcp.c
index 7f8e68a31994..c0a4de33f068 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1735,6 +1735,7 @@ static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
 	conn->sock = s;
 	conn->timer = -1;
 
+	FLOW_SETQP(conn, qpair);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, TGTSIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
@@ -2250,7 +2251,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
 /**
  * tcp_tap_handler() - Handle packets from tap and state transitions
  * @c:		Execution context
- * @qpair:	Queue pair on which to send packets
+ * @qpair:	Queue pair to process
  * @pif:	pif on which the packet is arriving
  * @af:		Address family, AF_INET or AF_INET6
  * @saddr:	Source address
@@ -2314,6 +2315,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
 	assert(pif_at_sidx(sidx) == PIF_TAP);
 	conn = &flow->tcp;
 
+	/* update queue pair */
+	FLOW_SETQP(flow, qpair);
+
 	flow_trace(conn, "packet length %zu from tap", l4len);
 
 	if (th->rst) {
@@ -2518,6 +2522,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
 	conn->timer = -1;
 	conn->ws_to_tap = conn->ws_from_tap = 0;
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, s, INISIDE) < 0) {
 		flow_perror(flow, "Can't register with epoll");
@@ -2980,6 +2985,9 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now,
 	flow_foreach_of_type(flow, FLOW_TCP) {
 		struct tcp_tap_conn *conn = &flow->tcp;
 
+		if (conn->f.qpair != qpair)
+			continue;
+
 		if (conn->tap_inactive) {
 			flow_dbg(conn, "No tap activity for least %us, send keepalive",
 				 KEEPALIVE_INTERVAL);
@@ -3011,6 +3019,9 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now,
 	flow_foreach_of_type(flow, FLOW_TCP) {
 		struct tcp_tap_conn *conn = &flow->tcp;
 
+		if (conn->f.qpair != qpair)
+			continue;
+
 		if (conn->inactive) {
 			/* No activity in this interval, reset */
 			flow_dbg(conn, "Inactive for at least %us, resetting",
@@ -3841,6 +3852,7 @@ int tcp_flow_migrate_target(struct ctx *c, int fd)
 		goto out;
 	}
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->sock,
 			   !TAPSIDE(conn)))
@@ -4019,10 +4031,10 @@ int tcp_flow_migrate_target_ext(struct ctx *c, struct tcp_tap_conn *conn, int fd
 	if (tcp_set_peek_offset(conn, peek_offset))
 		goto fail;
 
-	if (tcp_send_flag(c, conn, ACK, QPAIR_DEFAULT))
+	if (tcp_send_flag(c, conn, ACK, conn->f.qpair))
 		goto fail;
 
-	tcp_data_from_sock(c, conn, QPAIR_DEFAULT);
+	tcp_data_from_sock(c, conn, conn->f.qpair);
 
 	if ((rc = tcp_epoll_ctl(conn))) {
 		flow_dbg(conn,
@@ -4040,7 +4052,7 @@ fail:
 	}
 
 	conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
-	tcp_rst(c, conn, QPAIR_DEFAULT);
+	tcp_rst(c, conn, conn->f.qpair);
 
 	return 0;
 }
diff --git a/tcp_buf.c b/tcp_buf.c
index ae8bebca5107..647c17621963 100644
--- a/tcp_buf.c
+++ b/tcp_buf.c
@@ -124,7 +124,7 @@ static void tcp_revert_seq(const struct ctx *c, struct tcp_tap_conn **conns,
 		conn->seq_to_tap = seq;
 		peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
 		if (tcp_set_peek_offset(conn, peek_offset))
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 	}
 }
 
@@ -334,7 +334,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 		conn->seq_to_tap = conn->seq_ack_from_tap;
 		already_sent = 0;
 		if (tcp_set_peek_offset(conn, 0)) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 			return -1;
 		}
 	}
@@ -356,7 +356,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 	}
 
 	if (tcp_prepare_iov(&mh_sock, iov_sock, already_sent, fill_bufs)) {
-		tcp_rst(c, conn, QPAIR_DEFAULT);
+		tcp_rst(c, conn, conn->f.qpair);
 		return -1;
 	}
 
@@ -381,7 +381,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 	if (len < 0) {
 		if (errno != EAGAIN && errno != EWOULDBLOCK) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 			return -errno;
 		}
 
@@ -410,7 +410,7 @@ int tcp_buf_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
 
 		ret = tcp_buf_send_flag(c, conn, FIN | ACK);
 		if (ret) {
-			tcp_rst(c, conn, QPAIR_DEFAULT);
+			tcp_rst(c, conn, conn->f.qpair);
 			return ret;
 		}
 
diff --git a/tcp_splice.c b/tcp_splice.c
index 3fd33a10308e..1a77ac2e8a18 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -377,6 +377,7 @@ static int tcp_splice_connect(const struct ctx *c, struct tcp_splice_conn *conn)
 
 	pif_sockaddr(c, &sa, tgtpif, &tgt->eaddr, tgt->eport);
 
+	FLOW_SETQP(conn, QPAIR_DEFAULT);
 	flow_epollid_set(&conn->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[0], 0) ||
 	    flow_epoll_set(&conn->f, EPOLL_CTL_ADD, 0, conn->s[1], 1)) {
diff --git a/udp_flow.c b/udp_flow.c
index 143f265493fa..44e0c4c50ca9 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -81,7 +81,6 @@ static int udp_flow_sock(const struct ctx *c,
 		return s;
 	}
 
-	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 	if (flow_epoll_set(&uflow->f, EPOLL_CTL_ADD, EPOLLIN, s, sidei) < 0) {
 		rc = -errno;
 		close(s);
@@ -154,7 +153,8 @@ static flow_sidx_t udp_flow_new(const struct ctx *c, unsigned int qpair,
 	uflow->ttl[INISIDE] = uflow->ttl[TGTSIDE] = 0;
 	uflow->activity[INISIDE] = 1;
 	uflow->activity[TGTSIDE] = 0;
-	(void)qpair;
+	FLOW_SETQP(uflow, qpair);
+	flow_epollid_set(&uflow->f, EPOLLFD_ID_DEFAULT);
 
 	flow_foreach_sidei(sidei) {
 		if (pif_is_socket(uflow->f.pif[sidei]))
@@ -270,6 +270,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
  * @daddr:	Destination address guest side
  * @srcport:	Source port on guest side
  * @dstport:	Destination port on guest side
+ * @now:	Current timestamp
  *
  * Return: sidx for the destination side of the flow for this packet, or
  *         FLOW_SIDX_NONE if we couldn't find or create a flow.
@@ -291,6 +292,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
 				 srcport, dstport);
 	if ((uflow = udp_at_sidx(sidx))) {
 		udp_flow_activity(uflow, sidx.sidei, now);
+		/* update qpair */
+		FLOW_SETQP(uflow, qpair); /* if qpair changes, update epollfd */
 		return flow_sidx_opposite(sidx);
 	}
 
-- 
2.54.0
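The per-worker filter added to tcp_keepalive() and tcp_inactivity(), combined with the FLOW_SETQP() call in tcp_tap_handler(), means a TCP flow is handled by whichever worker last received a tap packet for it. The simplified sketch below illustrates that interaction; struct demo_conn, demo_scan() and demo_tap_packet() are hypothetical, a plain array stands in for flow_foreach_of_type(), and the per-flow keepalive/reset work is reduced to recording which worker scanned the flow.

```c
/* Sketch only: demo_* names are hypothetical; real timer work omitted */
#include <assert.h>
#include <stddef.h>

#define FLOW_QPAIR_BITS		5
#define FLOW_QPAIR_MAX		((1 << FLOW_QPAIR_BITS) - 1)

/* Hypothetical stand-in for struct tcp_tap_conn */
struct demo_conn {
	unsigned int qpair:FLOW_QPAIR_BITS;
	unsigned int last_worker;	/* queue pair of the last worker to scan it */
};

/* Timer pass for one worker: like the filter in tcp_keepalive() and
 * tcp_inactivity(), skip flows owned by another queue pair.
 * Returns the number of flows this worker handled. */
static unsigned int demo_scan(struct demo_conn *conns, size_t n,
			      unsigned int qpair)
{
	unsigned int handled = 0;
	size_t i;

	for (i = 0; i < n; i++) {
		if (conns[i].qpair != qpair)
			continue;

		conns[i].last_worker = qpair;	/* keepalive/reset work goes here */
		handled++;
	}

	return handled;
}

/* Packet from tap on @qpair: re-home the flow, as FLOW_SETQP() does in
 * tcp_tap_handler(), so later timer passes run on that worker */
static void demo_tap_packet(struct demo_conn *conn, unsigned int qpair)
{
	assert(qpair < FLOW_QPAIR_MAX);
	conn->qpair = qpair;
}
```

Note that the filter compares the raw field rather than calling flow_qp(), so a TCP flow left at FLOW_QPAIR_INVALID would be skipped by every worker; the patch calls FLOW_SETQP() in tcp_conn_from_tap(), tcp_tap_conn_from_sock() and tcp_flow_migrate_target(), so each of those creation paths assigns a valid queue pair.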