public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: David Gibson <david@gibson.dropbear.id.au>
To: Laurent Vivier <lvivier@redhat.com>
Cc: passt-dev@passt.top
Subject: Re: [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations
Date: Thu, 2 Jul 2026 13:03:41 +1000	[thread overview]
Message-ID: <akXVDbc4p_3P2LSl@zatzit> (raw)
In-Reply-To: <20260616171052.3785909-9-lvivier@redhat.com>

[-- Attachment #1: Type: text/plain, Size: 7800 bytes --]

On Tue, Jun 16, 2026 at 07:10:52PM +0200, Laurent Vivier wrote:
> The flow table free list, hash table, and flow_new_entry are global
> shared state accessed from multiple threads.
> 
> Protect flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
> flow_hash_remove(), and the free list rebuild in flow_defer_handler()
> with a pthread_rwlock_t: writers for mutations, readers for lookups.
> 
> Make flow_new_entry _Thread_local so each thread independently tracks
> its own in-progress allocation.
> 
> Since the lock is released between flow_alloc() and flow_activate(),
> other threads can observe intermediate flow states (NEW, INI, TGT,
> TYPED) during traversal.  Adapt flow_foreach() and flow_defer_handler()
> accordingly: skip these entries silently rather than treating them as
> errors, and break free-list cluster merging across them.
> 
> Filter flow_defer_handler()'s first loop by qpair, so each thread
> only processes its own flows.
> 
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>

I'm skipping review of this, since it sounds like you're reworking the
flow table locking significantly anyway.

> ---
>  flow.c       | 59 +++++++++++++++++++++++++++++++++++++++++++---------
>  flow_table.h |  2 +-
>  2 files changed, 50 insertions(+), 11 deletions(-)
> 
> diff --git a/flow.c b/flow.c
> index 08c7620c7b0f..149360c3ec87 100644
> --- a/flow.c
> +++ b/flow.c
> @@ -12,6 +12,8 @@
>  #include <sched.h>
>  #include <string.h>
>  
> +#include <pthread.h>
> +
>  #include "util.h"
>  #include "ip.h"
>  #include "passt.h"
> @@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
>  
>  unsigned flow_first_free;
>  union flow flowtab[FLOW_MAX];
> -static const union flow *flow_new_entry; /* = NULL */
> +static _Thread_local const union flow *flow_new_entry; /* = NULL */
>  int qpair_to_fd[FLOW_QPAIR_SIZE];
>  
>  /* Hash table to index it */
> @@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
>  static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
>  "Safe linear probing requires hash table with more entries than the number of sides in the flow table");
>  
> +static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
> +
>  /** flowside_from_af() - Initialise flowside from addresses
>   * @side:	flowside to initialise
>   * @af:		Address family (AF_INET or AF_INET6)
> @@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
>   */
>  union flow *flow_alloc(void)
>  {
> -	union flow *flow = &flowtab[flow_first_free];
> +	union flow *flow;
> +
> +	pthread_rwlock_wrlock(&flow_lock);
> +
> +	flow = &flowtab[flow_first_free];
>  
>  	assert(!flow_new_entry);
>  
> -	if (flow_first_free >= FLOW_MAX)
> +	if (flow_first_free >= FLOW_MAX) {
> +		pthread_rwlock_unlock(&flow_lock);
>  		return NULL;
> +	}
>  
>  	assert(flow->f.state == FLOW_STATE_FREE);
>  	assert(flow->f.type == FLOW_TYPE_NONE);
> @@ -650,6 +660,8 @@ union flow *flow_alloc(void)
>  	memset(flow, 0, sizeof(*flow));
>  	flow_set_state(&flow->f, FLOW_STATE_NEW);
>  
> +	pthread_rwlock_unlock(&flow_lock);
> +
>  	return flow;
>  }
>  
> @@ -661,6 +673,8 @@ union flow *flow_alloc(void)
>   */
>  void flow_alloc_cancel(union flow *flow)
>  {
> +	pthread_rwlock_wrlock(&flow_lock);
> +
>  	assert(flow_new_entry == flow);
>  	assert(flow->f.state == FLOW_STATE_NEW ||
>  	       flow->f.state == FLOW_STATE_INI ||
> @@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
>  	flow->free.next = flow_first_free;
>  	flow_first_free = FLOW_IDX(flow);
>  	flow_new_entry = NULL;
> +
> +	pthread_rwlock_unlock(&flow_lock);
>  }
>  
>  /**
> @@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
>  uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
>  {
>  	uint64_t hash = flow_sidx_hash(c, sidx);
> -	unsigned b = flow_hash_probe_(hash, sidx);
> +	unsigned b;
>  
> +	pthread_rwlock_wrlock(&flow_lock);
> +	b = flow_hash_probe_(hash, sidx);
>  	flow_hashtab[b] = sidx;
> +	pthread_rwlock_unlock(&flow_lock);
> +
>  	flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
>  		 sidx.sidei, b);
>  
> @@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
>   */
>  void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
>  {
> -	unsigned b = flow_hash_probe(c, sidx), s;
> +	unsigned b, s;
>  
> -	if (!flow_sidx_valid(flow_hashtab[b]))
> +	pthread_rwlock_wrlock(&flow_lock);
> +
> +	b = flow_hash_probe(c, sidx);
> +
> +	if (!flow_sidx_valid(flow_hashtab[b])) {
> +		pthread_rwlock_unlock(&flow_lock);
>  		return; /* Redundant remove */
> +	}
>  
>  	flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
>  		 sidx.sidei, b);
> @@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
>  	}
>  
>  	flow_hashtab[b] = FLOW_SIDX_NONE;
> +
> +	pthread_rwlock_unlock(&flow_lock);
>  }
>  
>  /**
> @@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
>  static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
>  				   uint8_t pif, const struct flowside *side)
>  {
> -	flow_sidx_t sidx;
> +	flow_sidx_t sidx, ret;
>  	union flow *flow;
>  	unsigned b;
>  
> +	pthread_rwlock_rdlock(&flow_lock);
> +
>  	b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
>  	while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
>  	       !(FLOW_PROTO(&flow->f) == proto &&
> @@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
>  		 flowside_eq(&flow->f.side[sidx.sidei], side)))
>  		b = mod_sub(b, 1, FLOW_HASH_SIZE);
>  
> -	return flow_hashtab[b];
> +	ret = flow_hashtab[b];
> +
> +	pthread_rwlock_unlock(&flow_lock);
> +
> +	return ret;
>  }
>  
>  /**
> @@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
>  	flow_foreach(flow) {
>  		bool closed = false;
>  
> +		if (flow->f.qpair != qpair)
> +			continue;
> +
>  		switch (flow->f.type) {
>  		case FLOW_TYPE_NONE:
>  			assert(false);
> @@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
>  	}
>  
>  	/* Second step: actually free the flows */
> +	pthread_rwlock_wrlock(&flow_lock);
>  	flow_foreach_slot(flow) {
>  		switch (flow->f.state) {
>  		case FLOW_STATE_FREE: {
> @@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
>  		case FLOW_STATE_INI:
>  		case FLOW_STATE_TGT:
>  		case FLOW_STATE_TYPED:
> -			/* Incomplete flow at end of cycle */
> -			assert(false);
> +			/* In-progress allocation on another thread */
> +			free_head = NULL;
>  			break;
>  
>  		case FLOW_STATE_ACTIVE:
> @@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
>  	}
>  
>  	*last_next = FLOW_MAX;
> +	pthread_rwlock_unlock(&flow_lock);
>  }
>  
>  /**
> diff --git a/flow_table.h b/flow_table.h
> index e4ff6f73c35c..f2545390205a 100644
> --- a/flow_table.h
> +++ b/flow_table.h
> @@ -72,7 +72,7 @@ extern union flow flowtab[];
>  			(flow) += (flow)->free.n - 1;			\
>  		/* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
>  		else if ((flow)->f.state != FLOW_STATE_ACTIVE) {	\
> -			flow_err((flow), "Bad flow state during traversal"); \
> +			(void)0; /* Differs from bare continue */	\
>  			continue;					\
>  		} else
>  
> -- 
> 2.54.0
> 

-- 
David Gibson (he or they)	| I'll have my music baroque, and my code
david AT gibson.dropbear.id.au	| minimalist, thank you, not the other way
				| around.
http://www.ozlabs.org/~dgibson

[-- Attachment #2: signature.asc --]
[-- Type: application/pgp-signature, Size: 833 bytes --]

      reply	other threads:[~2026-07-02  3:42 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
2026-06-16 17:10 ` [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue Laurent Vivier
2026-06-29  9:59   ` David Gibson
2026-06-16 17:10 ` [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety Laurent Vivier
2026-07-02  2:27   ` David Gibson
2026-06-16 17:10 ` [PATCH 3/8] tcp: Make static buffers stack-local " Laurent Vivier
2026-07-02  2:32   ` David Gibson
2026-06-16 17:10 ` [PATCH 4/8] udp_vu: Make virtqueue " Laurent Vivier
2026-07-02  2:37   ` David Gibson
2026-06-16 17:10 ` [PATCH 5/8] flow: Make flow timer per-caller " Laurent Vivier
2026-07-02  2:49   ` David Gibson
2026-06-16 17:10 ` [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks Laurent Vivier
2026-07-02  2:55   ` David Gibson
2026-06-16 17:10 ` [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety Laurent Vivier
2026-07-02  2:59   ` David Gibson
2026-06-16 17:10 ` [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations Laurent Vivier
2026-07-02  3:03   ` David Gibson [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=akXVDbc4p_3P2LSl@zatzit \
    --to=david@gibson.dropbear.id.au \
    --cc=lvivier@redhat.com \
    --cc=passt-dev@passt.top \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for IMAP folder(s).