From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH 8/8] flow: Add rwlock and per-qpair filtering to flow table operations
Date: Tue, 16 Jun 2026 19:10:52 +0200 [thread overview]
Message-ID: <20260616171052.3785909-9-lvivier@redhat.com> (raw)
In-Reply-To: <20260616171052.3785909-1-lvivier@redhat.com>
The flow table free list, hash table, and flow_new_entry are global
shared state accessed from multiple threads.
Protect flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
flow_hash_remove(), and the free list rebuild in flow_defer_handler()
with a pthread_rwlock_t: these mutations take it for writing, while
hash table lookups in flowside_lookup() take it for reading.
Make flow_new_entry _Thread_local so each thread independently tracks
its own in-progress allocation.
Since the lock is released between flow_alloc() and flow_activate(),
other threads can observe intermediate flow states (NEW, INI, TGT,
TYPED) during traversal. Adapt to this in two places:
flow_foreach() now skips such entries silently instead of reporting
them as errors, and flow_defer_handler() now ends the current free
cluster at such an entry instead of asserting, so free-list clusters
are never merged across an in-progress allocation.
Filter flow_defer_handler()'s first loop by qpair, so each thread
only processes its own flows.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
flow.c | 59 +++++++++++++++++++++++++++++++++++++++++++---------
flow_table.h | 2 +-
2 files changed, 50 insertions(+), 11 deletions(-)
diff --git a/flow.c b/flow.c
index 08c7620c7b0f..149360c3ec87 100644
--- a/flow.c
+++ b/flow.c
@@ -12,6 +12,8 @@
#include <sched.h>
#include <string.h>
+#include <pthread.h>
+
#include "util.h"
#include "ip.h"
#include "passt.h"
@@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
unsigned flow_first_free;
union flow flowtab[FLOW_MAX];
-static const union flow *flow_new_entry; /* = NULL */
+static _Thread_local const union flow *flow_new_entry; /* = NULL */
int qpair_to_fd[FLOW_QPAIR_SIZE];
/* Hash table to index it */
@@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
"Safe linear probing requires hash table with more entries than the number of sides in the flow table");
+static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
+
/** flowside_from_af() - Initialise flowside from addresses
* @side: flowside to initialise
* @af: Address family (AF_INET or AF_INET6)
@@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
*/
union flow *flow_alloc(void)
{
- union flow *flow = &flowtab[flow_first_free];
+ union flow *flow;
+
+ pthread_rwlock_wrlock(&flow_lock);
+
+ flow = &flowtab[flow_first_free];
assert(!flow_new_entry);
- if (flow_first_free >= FLOW_MAX)
+ if (flow_first_free >= FLOW_MAX) {
+ pthread_rwlock_unlock(&flow_lock);
return NULL;
+ }
assert(flow->f.state == FLOW_STATE_FREE);
assert(flow->f.type == FLOW_TYPE_NONE);
@@ -650,6 +660,8 @@ union flow *flow_alloc(void)
memset(flow, 0, sizeof(*flow));
flow_set_state(&flow->f, FLOW_STATE_NEW);
+ pthread_rwlock_unlock(&flow_lock);
+
return flow;
}
@@ -661,6 +673,8 @@ union flow *flow_alloc(void)
*/
void flow_alloc_cancel(union flow *flow)
{
+ pthread_rwlock_wrlock(&flow_lock);
+
assert(flow_new_entry == flow);
assert(flow->f.state == FLOW_STATE_NEW ||
flow->f.state == FLOW_STATE_INI ||
@@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
flow->free.next = flow_first_free;
flow_first_free = FLOW_IDX(flow);
flow_new_entry = NULL;
+
+ pthread_rwlock_unlock(&flow_lock);
}
/**
@@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
{
uint64_t hash = flow_sidx_hash(c, sidx);
- unsigned b = flow_hash_probe_(hash, sidx);
+ unsigned b;
+ pthread_rwlock_wrlock(&flow_lock);
+ b = flow_hash_probe_(hash, sidx);
flow_hashtab[b] = sidx;
+ pthread_rwlock_unlock(&flow_lock);
+
flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
sidx.sidei, b);
@@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
*/
void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
{
- unsigned b = flow_hash_probe(c, sidx), s;
+ unsigned b, s;
- if (!flow_sidx_valid(flow_hashtab[b]))
+ pthread_rwlock_wrlock(&flow_lock);
+
+ b = flow_hash_probe(c, sidx);
+
+ if (!flow_sidx_valid(flow_hashtab[b])) {
+ pthread_rwlock_unlock(&flow_lock);
return; /* Redundant remove */
+ }
flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
sidx.sidei, b);
@@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
}
flow_hashtab[b] = FLOW_SIDX_NONE;
+
+ pthread_rwlock_unlock(&flow_lock);
}
/**
@@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
uint8_t pif, const struct flowside *side)
{
- flow_sidx_t sidx;
+ flow_sidx_t sidx, ret;
union flow *flow;
unsigned b;
+ pthread_rwlock_rdlock(&flow_lock);
+
b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
!(FLOW_PROTO(&flow->f) == proto &&
@@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
flowside_eq(&flow->f.side[sidx.sidei], side)))
b = mod_sub(b, 1, FLOW_HASH_SIZE);
- return flow_hashtab[b];
+ ret = flow_hashtab[b];
+
+ pthread_rwlock_unlock(&flow_lock);
+
+ return ret;
}
/**
@@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
flow_foreach(flow) {
bool closed = false;
+ if (flow->f.qpair != qpair)
+ continue;
+
switch (flow->f.type) {
case FLOW_TYPE_NONE:
assert(false);
@@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
}
/* Second step: actually free the flows */
+ pthread_rwlock_wrlock(&flow_lock);
flow_foreach_slot(flow) {
switch (flow->f.state) {
case FLOW_STATE_FREE: {
@@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
case FLOW_STATE_INI:
case FLOW_STATE_TGT:
case FLOW_STATE_TYPED:
- /* Incomplete flow at end of cycle */
- assert(false);
+ /* In-progress allocation on another thread */
+ free_head = NULL;
break;
case FLOW_STATE_ACTIVE:
@@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
}
*last_next = FLOW_MAX;
+ pthread_rwlock_unlock(&flow_lock);
}
/**
diff --git a/flow_table.h b/flow_table.h
index e4ff6f73c35c..f2545390205a 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -72,7 +72,7 @@ extern union flow flowtab[];
(flow) += (flow)->free.n - 1; \
/* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
else if ((flow)->f.state != FLOW_STATE_ACTIVE) { \
- flow_err((flow), "Bad flow state during traversal"); \
+ (void)0; /* Differs from bare continue */ \
continue; \
} else
--
2.54.0
prev parent reply other threads:[~2026-06-16 17:11 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
2026-06-16 17:10 ` [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue Laurent Vivier
2026-06-16 17:10 ` [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety Laurent Vivier
2026-06-16 17:10 ` [PATCH 3/8] tcp: Make static buffers stack-local " Laurent Vivier
2026-06-16 17:10 ` [PATCH 4/8] udp_vu: Make virtqueue " Laurent Vivier
2026-06-16 17:10 ` [PATCH 5/8] flow: Make flow timer per-caller " Laurent Vivier
2026-06-16 17:10 ` [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks Laurent Vivier
2026-06-16 17:10 ` [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety Laurent Vivier
2026-06-16 17:10 ` Laurent Vivier [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260616171052.3785909-9-lvivier@redhat.com \
--to=lvivier@redhat.com \
--cc=passt-dev@passt.top \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://passt.top/passt
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).