From: Laurent Vivier
To: passt-dev@passt.top
Cc: Laurent Vivier
Subject: [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations
Date: Tue, 16 Jun 2026 19:10:52 +0200
Message-ID: <20260616171052.3785909-9-lvivier@redhat.com>
In-Reply-To: <20260616171052.3785909-1-lvivier@redhat.com>

The flow table free list, hash table, and flow_new_entry are global
state shared between threads. Protect flow_alloc(), flow_alloc_cancel(),
flow_hash_insert(), flow_hash_remove(), and the free list rebuild in
flow_defer_handler() with a pthread_rwlock_t, taking the write lock for
mutations and the read lock for lookups in flowside_lookup().

Make flow_new_entry _Thread_local so each thread independently tracks
its own in-progress allocation.
Since the lock is released between flow_alloc() and flow_activate(),
other threads can observe intermediate flow states (NEW, INI, TGT,
TYPED) during traversal. Adapt flow_foreach() and flow_defer_handler()
accordingly: skip these entries silently rather than treating them as
errors, and break free-list cluster merging across them.

Filter flow_defer_handler()'s first loop by qpair, so each thread only
processes its own flows.

Signed-off-by: Laurent Vivier
---
 flow.c       | 59 +++++++++++++++++++++++++++++++++++++++++++---------
 flow_table.h |  2 +-
 2 files changed, 50 insertions(+), 11 deletions(-)

diff --git a/flow.c b/flow.c
index 08c7620c7b0f..149360c3ec87 100644
--- a/flow.c
+++ b/flow.c
@@ -12,6 +12,8 @@
 #include
 #include
 
+#include
+
 #include "util.h"
 #include "ip.h"
 #include "passt.h"
@@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
 
 unsigned flow_first_free;
 union flow flowtab[FLOW_MAX];
-static const union flow *flow_new_entry; /* = NULL */
+static _Thread_local const union flow *flow_new_entry; /* = NULL */
 int qpair_to_fd[FLOW_QPAIR_SIZE];
 
 /* Hash table to index it */
@@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
 static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
 	"Safe linear probing requires hash table with more entries than the number of sides in the flow table");
 
+static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
+
 /** flowside_from_af() - Initialise flowside from addresses
  * @side:	flowside to initialise
  * @af:		Address family (AF_INET or AF_INET6)
@@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
  */
 union flow *flow_alloc(void)
 {
-	union flow *flow = &flowtab[flow_first_free];
+	union flow *flow;
+
+	pthread_rwlock_wrlock(&flow_lock);
+
+	flow = &flowtab[flow_first_free];
 
 	assert(!flow_new_entry);
 
-	if (flow_first_free >= FLOW_MAX)
+	if (flow_first_free >= FLOW_MAX) {
+		pthread_rwlock_unlock(&flow_lock);
 		return NULL;
+	}
 
 	assert(flow->f.state == FLOW_STATE_FREE);
 	assert(flow->f.type == FLOW_TYPE_NONE);
@@ -650,6 +660,8 @@ union flow *flow_alloc(void)
 	memset(flow, 0, sizeof(*flow));
 	flow_set_state(&flow->f, FLOW_STATE_NEW);
 
+	pthread_rwlock_unlock(&flow_lock);
+
 	return flow;
 }
 
@@ -661,6 +673,8 @@ union flow *flow_alloc(void)
  */
 void flow_alloc_cancel(union flow *flow)
 {
+	pthread_rwlock_wrlock(&flow_lock);
+
 	assert(flow_new_entry == flow);
 	assert(flow->f.state == FLOW_STATE_NEW ||
 	       flow->f.state == FLOW_STATE_INI ||
@@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
 	flow->free.next = flow_first_free;
 	flow_first_free = FLOW_IDX(flow);
 	flow_new_entry = NULL;
+
+	pthread_rwlock_unlock(&flow_lock);
 }
 
 /**
@@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
 uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
 {
 	uint64_t hash = flow_sidx_hash(c, sidx);
-	unsigned b = flow_hash_probe_(hash, sidx);
+	unsigned b;
 
+	pthread_rwlock_wrlock(&flow_lock);
+	b = flow_hash_probe_(hash, sidx);
 	flow_hashtab[b] = sidx;
+	pthread_rwlock_unlock(&flow_lock);
+
 	flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
 		 sidx.sidei, b);
 
@@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
  */
 void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
 {
-	unsigned b = flow_hash_probe(c, sidx), s;
+	unsigned b, s;
 
-	if (!flow_sidx_valid(flow_hashtab[b]))
+	pthread_rwlock_wrlock(&flow_lock);
+
+	b = flow_hash_probe(c, sidx);
+
+	if (!flow_sidx_valid(flow_hashtab[b])) {
+		pthread_rwlock_unlock(&flow_lock);
 		return; /* Redundant remove */
+	}
 
 	flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
 		 sidx.sidei, b);
@@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
 	}
 
 	flow_hashtab[b] = FLOW_SIDX_NONE;
+
+	pthread_rwlock_unlock(&flow_lock);
 }
 
 /**
@@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
 static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
 				   uint8_t pif, const struct flowside *side)
 {
-	flow_sidx_t sidx;
+	flow_sidx_t sidx, ret;
 	union flow *flow;
 	unsigned b;
 
+	pthread_rwlock_rdlock(&flow_lock);
+
 	b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
 	while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
 	       !(FLOW_PROTO(&flow->f) == proto &&
@@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
 		 flowside_eq(&flow->f.side[sidx.sidei], side)))
 		b = mod_sub(b, 1, FLOW_HASH_SIZE);
 
-	return flow_hashtab[b];
+	ret = flow_hashtab[b];
+
+	pthread_rwlock_unlock(&flow_lock);
+
+	return ret;
 }
 
 /**
@@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 	flow_foreach(flow) {
 		bool closed = false;
 
+		if (flow->f.qpair != qpair)
+			continue;
+
 		switch (flow->f.type) {
 		case FLOW_TYPE_NONE:
 			assert(false);
@@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 	}
 
 	/* Second step: actually free the flows */
+	pthread_rwlock_wrlock(&flow_lock);
 	flow_foreach_slot(flow) {
 		switch (flow->f.state) {
 		case FLOW_STATE_FREE: {
@@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 		case FLOW_STATE_INI:
 		case FLOW_STATE_TGT:
 		case FLOW_STATE_TYPED:
-			/* Incomplete flow at end of cycle */
-			assert(false);
+			/* In-progress allocation on another thread */
+			free_head = NULL;
 			break;
 
 		case FLOW_STATE_ACTIVE:
@@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
 	}
 
 	*last_next = FLOW_MAX;
+	pthread_rwlock_unlock(&flow_lock);
 }
 
 /**
diff --git a/flow_table.h b/flow_table.h
index e4ff6f73c35c..f2545390205a 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -72,7 +72,7 @@ extern union flow flowtab[];
 			(flow) += (flow)->free.n - 1;			\
 		/* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
 		else if ((flow)->f.state != FLOW_STATE_ACTIVE) {	\
-			flow_err((flow), "Bad flow state during traversal"); \
+			(void)0; /* Differs from bare continue */ \
 			continue;					\
 		} else
 
-- 
2.54.0
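A minimal sketch of why the second loop of flow_defer_handler() in this
patch resets free_head on in-progress states instead of asserting. It
assumes a simplified model (struct slot, enum state and
rebuild_free_list() are hypothetical, with only three states): the
first slot of a free cluster stores the cluster length n and the index
of the next cluster, and a cluster is the contiguous run of n slots
starting at its head. Treating a NEW slot like an ACTIVE one ends the
current cluster, so the rebuild never merges free slots on either side
of it into a single range that would include it:

```c
#include <assert.h>
#include <stddef.h>

#define NSLOTS 8

/* Hypothetical, simplified stand-ins for flow states */
enum state { ST_FREE, ST_NEW, ST_ACTIVE };

struct slot {
	enum state state;
	unsigned n;	/* cluster head only: number of contiguous free slots */
	unsigned next;	/* cluster head only: index of next cluster head */
};

static struct slot slots[NSLOTS];
static unsigned first_free;

/* Rebuild the free-cluster chain; caller assumed to hold the write lock */
static void rebuild_free_list(void)
{
	struct slot *free_head = NULL;
	unsigned *last_next = &first_free;

	for (unsigned i = 0; i < NSLOTS; i++) {
		struct slot *s = &slots[i];

		switch (s->state) {
		case ST_FREE: {
			unsigned skip = s->n;

			assert(skip);	/* cluster heads have n >= 1 */
			if (free_head) {
				/* Directly follows previous cluster: merge */
				free_head->n += skip;
				s->n = s->next = 0;
			} else {
				/* Start a new cluster and link it in */
				free_head = s;
				*last_next = i;
				last_next = &s->next;
			}
			i += skip - 1;	/* jump over the rest of this cluster */
			break;
		}
		case ST_NEW:	/* allocated by another thread, not yet active */
		case ST_ACTIVE:
			free_head = NULL;	/* end the current cluster */
			break;
		}
	}

	*last_next = NSLOTS;	/* terminate the chain */
}
```

With slots 0 and 1 free, slot 2 NEW, slots 3 and 4 forming a two-slot
cluster, slot 5 ACTIVE and slots 6 and 7 free, the rebuild yields three
clusters headed at 0, 3 and 6 (each of length 2). Without the ST_NEW
case, slot 3 would be merged into the cluster at slot 0, whose range
would then cover the in-progress slot 2.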