public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: Laurent Vivier <lvivier@redhat.com>
To: passt-dev@passt.top
Cc: Laurent Vivier <lvivier@redhat.com>
Subject: [PATCH 8/8] flow: Add rwlock and per-qpair filtering to flow table operations
Date: Tue, 16 Jun 2026 19:10:52 +0200	[thread overview]
Message-ID: <20260616171052.3785909-9-lvivier@redhat.com> (raw)
In-Reply-To: <20260616171052.3785909-1-lvivier@redhat.com>

The flow table free list, hash table, and flow_new_entry are global
shared state accessed from multiple threads.

Protect flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
flow_hash_remove(), and the free list rebuild in flow_defer_handler()
with a pthread_rwlock_t, taking the write lock for these mutations and
the read lock for hash table lookups in flowside_lookup().

Make flow_new_entry _Thread_local so each thread independently tracks
its own in-progress allocation.

Since the lock is released between flow_alloc() and flow_activate(),
other threads can observe intermediate flow states (NEW, INI, TGT,
TYPED) during traversal.  Adapt to this: flow_foreach() now skips such
entries silently instead of logging an error, and the free list rebuild
in flow_defer_handler() treats them as occupied slots that end the
current free cluster instead of asserting.

Filter flow_defer_handler()'s first loop by qpair, so each thread
only processes its own flows.  The second loop, which rebuilds the
free list, still walks the whole table, under the write lock.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
 flow.c       | 59 +++++++++++++++++++++++++++++++++++++++++++---------
 flow_table.h |  2 +-
 2 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/flow.c b/flow.c
index 08c7620c7b0f..149360c3ec87 100644
--- a/flow.c
+++ b/flow.c
@@ -12,6 +12,8 @@
 #include <sched.h>
 #include <string.h>
 
+#include <pthread.h>
+
 #include "util.h"
 #include "ip.h"
 #include "passt.h"
@@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
 
 unsigned flow_first_free;
 union flow flowtab[FLOW_MAX];
-static const union flow *flow_new_entry; /* = NULL */
+static _Thread_local const union flow *flow_new_entry; /* = NULL */
 int qpair_to_fd[FLOW_QPAIR_SIZE];
 
 /* Hash table to index it */
@@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
 static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
 "Safe linear probing requires hash table with more entries than the number of sides in the flow table");
 
+static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
+
 /** flowside_from_af() - Initialise flowside from addresses
  * @side:	flowside to initialise
  * @af:		Address family (AF_INET or AF_INET6)
@@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
  */
 union flow *flow_alloc(void)
 {
-	union flow *flow = &flowtab[flow_first_free];
+	union flow *flow;
+
+	pthread_rwlock_wrlock(&flow_lock);
+
+	flow = &flowtab[flow_first_free];
 
 	assert(!flow_new_entry);
 
-	if (flow_first_free >= FLOW_MAX)
+	if (flow_first_free >= FLOW_MAX) {
+		pthread_rwlock_unlock(&flow_lock);
 		return NULL;
+	}
 
 	assert(flow->f.state == FLOW_STATE_FREE);
 	assert(flow->f.type == FLOW_TYPE_NONE);
@@ -650,6 +660,8 @@ union flow *flow_alloc(void)
 	memset(flow, 0, sizeof(*flow));
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
+	pthread_rwlock_unlock(&flow_lock);
+
 	return flow;
 }
 
@@ -661,6 +673,8 @@ union flow *flow_alloc(void)
  */
 void flow_alloc_cancel(union flow *flow)
 {
+	pthread_rwlock_wrlock(&flow_lock);
+
 	assert(flow_new_entry == flow);
 	assert(flow->f.state == FLOW_STATE_NEW ||
 	       flow->f.state == FLOW_STATE_INI ||
@@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
 	flow->free.next = flow_first_free;
 	flow_first_free = FLOW_IDX(flow);
 	flow_new_entry = NULL;
+
+	pthread_rwlock_unlock(&flow_lock);
 }
 
 /**
@@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
 uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
 {
 	uint64_t hash = flow_sidx_hash(c, sidx);
-	unsigned b = flow_hash_probe_(hash, sidx);
+	unsigned b;
 
+	pthread_rwlock_wrlock(&flow_lock);
+	b = flow_hash_probe_(hash, sidx);
 	flow_hashtab[b] = sidx;
+	pthread_rwlock_unlock(&flow_lock);
+
 	flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
 		 sidx.sidei, b);
 
@@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
  */
 void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
 {
-	unsigned b = flow_hash_probe(c, sidx), s;
+	unsigned b, s;
 
-	if (!flow_sidx_valid(flow_hashtab[b]))
+	pthread_rwlock_wrlock(&flow_lock);
+
+	b = flow_hash_probe(c, sidx);
+
+	if (!flow_sidx_valid(flow_hashtab[b])) {
+		pthread_rwlock_unlock(&flow_lock);
 		return; /* Redundant remove */
+	}
 
 	flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
 		 sidx.sidei, b);
@@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
 	}
 
 	flow_hashtab[b] = FLOW_SIDX_NONE;
+
+	pthread_rwlock_unlock(&flow_lock);
 }
 
 /**
@@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
 static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
 				   uint8_t pif, const struct flowside *side)
 {
-	flow_sidx_t sidx;
+	flow_sidx_t sidx, ret;
 	union flow *flow;
 	unsigned b;
 
+	pthread_rwlock_rdlock(&flow_lock);
+
 	b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
 	while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
 	       !(FLOW_PROTO(&flow->f) == proto &&
@@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
 		 flowside_eq(&flow->f.side[sidx.sidei], side)))
 		b = mod_sub(b, 1, FLOW_HASH_SIZE);
 
-	return flow_hashtab[b];
+	ret = flow_hashtab[b];
+
+	pthread_rwlock_unlock(&flow_lock);
+
+	return ret;
 }
 
 /**
@@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 	flow_foreach(flow) {
 		bool closed = false;
 
+		if (flow->f.qpair != qpair)
+			continue;
+
 		switch (flow->f.type) {
 		case FLOW_TYPE_NONE:
 			assert(false);
@@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 	}
 
 	/* Second step: actually free the flows */
+	pthread_rwlock_wrlock(&flow_lock);
 	flow_foreach_slot(flow) {
 		switch (flow->f.state) {
 		case FLOW_STATE_FREE: {
@@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 		case FLOW_STATE_INI:
 		case FLOW_STATE_TGT:
 		case FLOW_STATE_TYPED:
-			/* Incomplete flow at end of cycle */
-			assert(false);
+			/* In-progress allocation on another thread */
+			free_head = NULL;
 			break;
 
 		case FLOW_STATE_ACTIVE:
@@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 	}
 
 	*last_next = FLOW_MAX;
+	pthread_rwlock_unlock(&flow_lock);
 }
 
 /**
diff --git a/flow_table.h b/flow_table.h
index e4ff6f73c35c..f2545390205a 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -72,7 +72,7 @@ extern union flow flowtab[];
 			(flow) += (flow)->free.n - 1;			\
 		/* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
 		else if ((flow)->f.state != FLOW_STATE_ACTIVE) {	\
-			flow_err((flow), "Bad flow state during traversal"); \
+			(void)0; /* Differs from bare continue */	\
 			continue;					\
 		} else
 
-- 
2.54.0


      parent reply	other threads:[~2026-06-16 17:11 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
2026-06-16 17:10 ` [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue Laurent Vivier
2026-06-16 17:10 ` [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety Laurent Vivier
2026-06-16 17:10 ` [PATCH 3/8] tcp: Make static buffers stack-local " Laurent Vivier
2026-06-16 17:10 ` [PATCH 4/8] udp_vu: Make virtqueue " Laurent Vivier
2026-06-16 17:10 ` [PATCH 5/8] flow: Make flow timer per-caller " Laurent Vivier
2026-06-16 17:10 ` [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks Laurent Vivier
2026-06-16 17:10 ` [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety Laurent Vivier
2026-06-16 17:10 ` Laurent Vivier [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260616171052.3785909-9-lvivier@redhat.com \
    --to=lvivier@redhat.com \
    --cc=passt-dev@passt.top \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).