On Tue, Jun 16, 2026 at 07:10:52PM +0200, Laurent Vivier wrote:
> The flow table free list, hash table, and flow_new_entry are global
> shared state accessed from multiple threads.
>
> Protect flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
> flow_hash_remove(), and the free list rebuild in flow_defer_handler()
> with a pthread_rwlock_t: writers for mutations, readers for lookups.
>
> Make flow_new_entry _Thread_local so each thread independently tracks
> its own in-progress allocation.
>
> Since the lock is released between flow_alloc() and flow_activate(),
> other threads can observe intermediate flow states (NEW, INI, TGT,
> TYPED) during traversal. Adapt flow_foreach() and flow_defer_handler()
> accordingly: skip these entries silently rather than treating them as
> errors, and break free-list cluster merging across them.
>
> Filter flow_defer_handler()'s first loop by qpair, so each thread
> only processes its own flows.
>
> Signed-off-by: Laurent Vivier

I'm skipping review of this, since it sounds like you're reworking the
flow table locking significantly anyway.

> --- > flow.c | 59 +++++++++++++++++++++++++++++++++++++++++++--------- > flow_table.h | 2 +- > 2 files changed, 50 insertions(+), 11 deletions(-) > > diff --git a/flow.c b/flow.c > index 08c7620c7b0f..149360c3ec87 100644 > --- a/flow.c > +++ b/flow.c > @@ -12,6 +12,8 @@ > #include > #include > > +#include > + > #include "util.h" > #include "ip.h" > #include "passt.h" > @@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES, > > unsigned flow_first_free; > union flow flowtab[FLOW_MAX]; > -static const union flow *flow_new_entry; /* = NULL */ > +static _Thread_local const union flow *flow_new_entry; /* = NULL */ > int qpair_to_fd[FLOW_QPAIR_SIZE]; > > /* Hash table to index it */ > @@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE]; > static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX, > "Safe linear probing requires hash table with more entries than the number of sides in the flow table"); > > +static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER; > + > /** flowside_from_af() - Initialise flowside from addresses > * @side: flowside to initialise > * @af: Address family (AF_INET or AF_INET6) > @@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f) > */ > union flow *flow_alloc(void) > { > - union flow *flow = &flowtab[flow_first_free]; > + union flow *flow; > + > + pthread_rwlock_wrlock(&flow_lock); > + > + flow = &flowtab[flow_first_free]; > > assert(!flow_new_entry); > > - if (flow_first_free >= FLOW_MAX) > + if (flow_first_free >= FLOW_MAX) { > + pthread_rwlock_unlock(&flow_lock); > return NULL; > + } > > assert(flow->f.state == FLOW_STATE_FREE); > assert(flow->f.type == FLOW_TYPE_NONE); > @@ -650,6 +660,8 @@ union flow *flow_alloc(void) > memset(flow, 0, sizeof(*flow)); > flow_set_state(&flow->f, FLOW_STATE_NEW); > > + pthread_rwlock_unlock(&flow_lock); > + > return flow; > } > > @@ -661,6 +673,8 @@ union flow *flow_alloc(void) > */ > void flow_alloc_cancel(union flow *flow) > { > + 
pthread_rwlock_wrlock(&flow_lock); > + > assert(flow_new_entry == flow); > assert(flow->f.state == FLOW_STATE_NEW || > flow->f.state == FLOW_STATE_INI || > @@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow) > flow->free.next = flow_first_free; > flow_first_free = FLOW_IDX(flow); > flow_new_entry = NULL; > + > + pthread_rwlock_unlock(&flow_lock); > } > > /** > @@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx) > uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx) > { > uint64_t hash = flow_sidx_hash(c, sidx); > - unsigned b = flow_hash_probe_(hash, sidx); > + unsigned b; > > + pthread_rwlock_wrlock(&flow_lock); > + b = flow_hash_probe_(hash, sidx); > flow_hashtab[b] = sidx; > + pthread_rwlock_unlock(&flow_lock); > + > flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u", > sidx.sidei, b); > > @@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx) > */ > void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx) > { > - unsigned b = flow_hash_probe(c, sidx), s; > + unsigned b, s; > > - if (!flow_sidx_valid(flow_hashtab[b])) > + pthread_rwlock_wrlock(&flow_lock); > + > + b = flow_hash_probe(c, sidx); > + > + if (!flow_sidx_valid(flow_hashtab[b])) { > + pthread_rwlock_unlock(&flow_lock); > return; /* Redundant remove */ > + } > > flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u", > sidx.sidei, b); > @@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx) > } > > flow_hashtab[b] = FLOW_SIDX_NONE; > + > + pthread_rwlock_unlock(&flow_lock); > } > > /** > @@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx) > static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto, > uint8_t pif, const struct flowside *side) > { > - flow_sidx_t sidx; > + flow_sidx_t sidx, ret; > union flow *flow; > unsigned b; > > + pthread_rwlock_rdlock(&flow_lock); > + > b = flow_hash(c, proto, pif, 
side) % FLOW_HASH_SIZE; > while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) && > !(FLOW_PROTO(&flow->f) == proto && > @@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto, > flowside_eq(&flow->f.side[sidx.sidei], side))) > b = mod_sub(b, 1, FLOW_HASH_SIZE); > > - return flow_hashtab[b]; > + ret = flow_hashtab[b]; > + > + pthread_rwlock_unlock(&flow_lock); > + > + return ret; > } > > /** > @@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now, > flow_foreach(flow) { > bool closed = false; > > + if (flow->f.qpair != qpair) > + continue; > + > switch (flow->f.type) { > case FLOW_TYPE_NONE: > assert(false); > @@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now, > } > > /* Second step: actually free the flows */ > + pthread_rwlock_wrlock(&flow_lock); > flow_foreach_slot(flow) { > switch (flow->f.state) { > case FLOW_STATE_FREE: { > @@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now, > case FLOW_STATE_INI: > case FLOW_STATE_TGT: > case FLOW_STATE_TYPED: > - /* Incomplete flow at end of cycle */ > - assert(false); > + /* In-progress allocation on another thread */ > + free_head = NULL; > break; > > case FLOW_STATE_ACTIVE: > @@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now, > } > > *last_next = FLOW_MAX; > + pthread_rwlock_unlock(&flow_lock); > } > > /** > diff --git a/flow_table.h b/flow_table.h > index e4ff6f73c35c..f2545390205a 100644 > --- a/flow_table.h > +++ b/flow_table.h > @@ -72,7 +72,7 @@ extern union flow flowtab[]; > (flow) += (flow)->free.n - 1; \ > /* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\ > else if ((flow)->f.state != FLOW_STATE_ACTIVE) { \ > - flow_err((flow), "Bad flow state during traversal"); \ > + (void)0; /* Differs from bare continue */ \ > continue; \ > } else > > -- > 2.54.0 > -- David Gibson (he or they) | I'll have my 
music baroque, and my code
david AT gibson.dropbear.id.au  | minimalist, thank you, not the other way
                                | around.
http://www.ozlabs.org/~dgibson