From: David Gibson <david@gibson.dropbear.id.au>
To: Laurent Vivier <lvivier@redhat.com>
Cc: passt-dev@passt.top
Subject: Re: [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations
Date: Thu, 2 Jul 2026 13:03:41 +1000 [thread overview]
Message-ID: <akXVDbc4p_3P2LSl@zatzit> (raw)
In-Reply-To: <20260616171052.3785909-9-lvivier@redhat.com>
[-- Attachment #1: Type: text/plain, Size: 7800 bytes --]
On Tue, Jun 16, 2026 at 07:10:52PM +0200, Laurent Vivier wrote:
> The flow table free list, hash table, and flow_new_entry are global
> shared state accessed from multiple threads.
>
> Protect flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
> flow_hash_remove(), and the free list rebuild in flow_defer_handler()
> with a pthread_rwlock_t: writers for mutations, readers for lookups.
>
> Make flow_new_entry _Thread_local so each thread independently tracks
> its own in-progress allocation.
>
> Since the lock is released between flow_alloc() and flow_activate(),
> other threads can observe intermediate flow states (NEW, INI, TGT,
> TYPED) during traversal. Adapt flow_foreach() and flow_defer_handler()
> accordingly: skip these entries silently rather than treating them as
> errors, and break free-list cluster merging across them.
>
> Filter flow_defer_handler()'s first loop by qpair, so each thread
> only processes its own flows.
>
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
I'm skipping review of this, since it sounds like you're reworking the
flow table locking significantly anyway.
> ---
> flow.c | 59 +++++++++++++++++++++++++++++++++++++++++++---------
> flow_table.h | 2 +-
> 2 files changed, 50 insertions(+), 11 deletions(-)
>
> diff --git a/flow.c b/flow.c
> index 08c7620c7b0f..149360c3ec87 100644
> --- a/flow.c
> +++ b/flow.c
> @@ -12,6 +12,8 @@
> #include <sched.h>
> #include <string.h>
>
> +#include <pthread.h>
> +
> #include "util.h"
> #include "ip.h"
> #include "passt.h"
> @@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
>
> unsigned flow_first_free;
> union flow flowtab[FLOW_MAX];
> -static const union flow *flow_new_entry; /* = NULL */
> +static _Thread_local const union flow *flow_new_entry; /* = NULL */
> int qpair_to_fd[FLOW_QPAIR_SIZE];
>
> /* Hash table to index it */
> @@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
> static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
> "Safe linear probing requires hash table with more entries than the number of sides in the flow table");
>
> +static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
> +
> /** flowside_from_af() - Initialise flowside from addresses
> * @side: flowside to initialise
> * @af: Address family (AF_INET or AF_INET6)
> @@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
> */
> union flow *flow_alloc(void)
> {
> - union flow *flow = &flowtab[flow_first_free];
> + union flow *flow;
> +
> + pthread_rwlock_wrlock(&flow_lock);
> +
> + flow = &flowtab[flow_first_free];
>
> assert(!flow_new_entry);
>
> - if (flow_first_free >= FLOW_MAX)
> + if (flow_first_free >= FLOW_MAX) {
> + pthread_rwlock_unlock(&flow_lock);
> return NULL;
> + }
>
> assert(flow->f.state == FLOW_STATE_FREE);
> assert(flow->f.type == FLOW_TYPE_NONE);
> @@ -650,6 +660,8 @@ union flow *flow_alloc(void)
> memset(flow, 0, sizeof(*flow));
> flow_set_state(&flow->f, FLOW_STATE_NEW);
>
> + pthread_rwlock_unlock(&flow_lock);
> +
> return flow;
> }
>
> @@ -661,6 +673,8 @@ union flow *flow_alloc(void)
> */
> void flow_alloc_cancel(union flow *flow)
> {
> + pthread_rwlock_wrlock(&flow_lock);
> +
> assert(flow_new_entry == flow);
> assert(flow->f.state == FLOW_STATE_NEW ||
> flow->f.state == FLOW_STATE_INI ||
> @@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
> flow->free.next = flow_first_free;
> flow_first_free = FLOW_IDX(flow);
> flow_new_entry = NULL;
> +
> + pthread_rwlock_unlock(&flow_lock);
> }
>
> /**
> @@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
> uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
> {
> uint64_t hash = flow_sidx_hash(c, sidx);
> - unsigned b = flow_hash_probe_(hash, sidx);
> + unsigned b;
>
> + pthread_rwlock_wrlock(&flow_lock);
> + b = flow_hash_probe_(hash, sidx);
> flow_hashtab[b] = sidx;
> + pthread_rwlock_unlock(&flow_lock);
> +
> flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
> sidx.sidei, b);
>
> @@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
> */
> void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
> {
> - unsigned b = flow_hash_probe(c, sidx), s;
> + unsigned b, s;
>
> - if (!flow_sidx_valid(flow_hashtab[b]))
> + pthread_rwlock_wrlock(&flow_lock);
> +
> + b = flow_hash_probe(c, sidx);
> +
> + if (!flow_sidx_valid(flow_hashtab[b])) {
> + pthread_rwlock_unlock(&flow_lock);
> return; /* Redundant remove */
> + }
>
> flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
> sidx.sidei, b);
> @@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
> }
>
> flow_hashtab[b] = FLOW_SIDX_NONE;
> +
> + pthread_rwlock_unlock(&flow_lock);
> }
>
> /**
> @@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
> static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
> uint8_t pif, const struct flowside *side)
> {
> - flow_sidx_t sidx;
> + flow_sidx_t sidx, ret;
> union flow *flow;
> unsigned b;
>
> + pthread_rwlock_rdlock(&flow_lock);
> +
> b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
> while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
> !(FLOW_PROTO(&flow->f) == proto &&
> @@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
> flowside_eq(&flow->f.side[sidx.sidei], side)))
> b = mod_sub(b, 1, FLOW_HASH_SIZE);
>
> - return flow_hashtab[b];
> + ret = flow_hashtab[b];
> +
> + pthread_rwlock_unlock(&flow_lock);
> +
> + return ret;
> }
>
> /**
> @@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
> flow_foreach(flow) {
> bool closed = false;
>
> + if (flow->f.qpair != qpair)
> + continue;
> +
> switch (flow->f.type) {
> case FLOW_TYPE_NONE:
> assert(false);
> @@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
> }
>
> /* Second step: actually free the flows */
> + pthread_rwlock_wrlock(&flow_lock);
> flow_foreach_slot(flow) {
> switch (flow->f.state) {
> case FLOW_STATE_FREE: {
> @@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
> case FLOW_STATE_INI:
> case FLOW_STATE_TGT:
> case FLOW_STATE_TYPED:
> - /* Incomplete flow at end of cycle */
> - assert(false);
> + /* In-progress allocation on another thread */
> + free_head = NULL;
> break;
>
> case FLOW_STATE_ACTIVE:
> @@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
> }
>
> *last_next = FLOW_MAX;
> + pthread_rwlock_unlock(&flow_lock);
> }
>
> /**
> diff --git a/flow_table.h b/flow_table.h
> index e4ff6f73c35c..f2545390205a 100644
> --- a/flow_table.h
> +++ b/flow_table.h
> @@ -72,7 +72,7 @@ extern union flow flowtab[];
> (flow) += (flow)->free.n - 1; \
> /* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
> else if ((flow)->f.state != FLOW_STATE_ACTIVE) { \
> - flow_err((flow), "Bad flow state during traversal"); \
> + (void)0; /* Differs from bare continue */ \
> continue; \
> } else
>
> --
> 2.54.0
>
--
David Gibson (he or they) | I'll have my music baroque, and my code
david AT gibson.dropbear.id.au | minimalist, thank you, not the other way
| around.
http://www.ozlabs.org/~dgibson
[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 833 bytes --]
prev parent reply other threads:[~2026-07-02 3:42 UTC|newest]
Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
2026-06-16 17:10 ` [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue Laurent Vivier
2026-06-29 9:59 ` David Gibson
2026-06-16 17:10 ` [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety Laurent Vivier
2026-07-02 2:27 ` David Gibson
2026-06-16 17:10 ` [PATCH 3/8] tcp: Make static buffers stack-local " Laurent Vivier
2026-07-02 2:32 ` David Gibson
2026-06-16 17:10 ` [PATCH 4/8] udp_vu: Make virtqueue " Laurent Vivier
2026-07-02 2:37 ` David Gibson
2026-06-16 17:10 ` [PATCH 5/8] flow: Make flow timer per-caller " Laurent Vivier
2026-07-02 2:49 ` David Gibson
2026-06-16 17:10 ` [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks Laurent Vivier
2026-07-02 2:55 ` David Gibson
2026-06-16 17:10 ` [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety Laurent Vivier
2026-07-02 2:59 ` David Gibson
2026-06-16 17:10 ` [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations Laurent Vivier
2026-07-02 3:03 ` David Gibson [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=akXVDbc4p_3P2LSl@zatzit \
--to=david@gibson.dropbear.id.au \
--cc=lvivier@redhat.com \
--cc=passt-dev@passt.top \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://passt.top/passt
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).