* [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety Laurent Vivier
` (6 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
Convert the global pool_tap4 and pool_tap6 packet pools from single
pools to arrays of pools, one for each queue pair. This change is
necessary to support multiqueue operation in vhost-user mode, where
multiple queue pairs may be processing packets concurrently.
The pool storage structures (pool_tap4_storage and pool_tap6_storage)
are now arrays of VHOST_USER_MAX_VQS/2 elements, with corresponding
pointer arrays (pool_tap4 and pool_tap6) for accessing them.
Add a qpair parameter to tap_flush_pools() so it flushes the correct
pool. tap4_handler() and tap6_handler() now use the qpair they
already receive to index into the pool arrays. Add bounds checking
assertions in tap_handler() and tap_add_packet().
In passt and pasta modes, all operations use QPAIR_DEFAULT. In
vhost-user mode, the queue pair is derived from the virtqueue index
via QPAIR_FROM_QUEUE().
All pools within each array are initialised with the same buffer pointer:
- In vhost-user mode, it points to the vhost-user memory structure, which
is safe because packet data remains in guest memory and the pools only
track iovecs
- In passt/pasta mode, it points to pkt_buf, which is safe because only
queue pair 0 is used
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
tap.c | 77 ++++++++++++++++++++++++++++++-----------------------
tap.h | 2 +-
vu_common.c | 2 +-
3 files changed, 46 insertions(+), 35 deletions(-)
diff --git a/tap.c b/tap.c
index 5e9c7a1701bf..80912372e216 100644
--- a/tap.c
+++ b/tap.c
@@ -94,9 +94,13 @@ CHECK_FRAME_LEN(L2_MAX_LEN_VU);
DIV_ROUND_UP(sizeof(pkt_buf), \
ETH_HLEN + sizeof(struct ipv6hdr) + sizeof(struct udphdr))
-/* IPv4 (plus ARP) and IPv6 message batches from tap/guest to IP handlers */
-static PACKET_POOL_NOINIT(pool_tap4, TAP_MSGS_IP4);
-static PACKET_POOL_NOINIT(pool_tap6, TAP_MSGS_IP6);
+/* IPv4 (plus ARP) and IPv6 message batches from tap/guest to IP handlers
+ * One pool per queue pair for multiqueue support
+ */
+static PACKET_POOL_DECL(pool_tap4, TAP_MSGS_IP4) pool_tap4_storage[VHOST_USER_MAX_VQS / 2];
+static struct pool *pool_tap4[VHOST_USER_MAX_VQS / 2];
+static PACKET_POOL_DECL(pool_tap6, TAP_MSGS_IP6) pool_tap6_storage[VHOST_USER_MAX_VQS / 2];
+static struct pool *pool_tap6[VHOST_USER_MAX_VQS / 2];
#define TAP_SEQS 128 /* Different L4 tuples in one batch */
@@ -717,12 +721,12 @@ static int tap4_handler(struct ctx *c, unsigned int qpair,
unsigned int i, j, seq_count;
struct tap4_l4_t *seq;
- if (!c->ifi4 || !pool_tap4->count)
- return pool_tap4->count;
+ if (!c->ifi4 || !pool_tap4[qpair]->count)
+ return pool_tap4[qpair]->count;
i = 0;
resume:
- for (seq_count = 0, seq = NULL; i < pool_tap4->count; i++) {
+ for (seq_count = 0, seq = NULL; i < pool_tap4[qpair]->count; i++) {
size_t l3len, hlen, l4len;
struct ethhdr eh_storage;
struct iphdr iph_storage;
@@ -732,7 +736,7 @@ resume:
struct iov_tail data;
struct iphdr *iph;
- if (!packet_get(pool_tap4, i, &data))
+ if (!packet_get(pool_tap4[qpair], i, &data))
continue;
eh = IOV_PEEK_HEADER(&data, eh_storage);
@@ -804,7 +808,7 @@ resume:
if (iph->protocol == IPPROTO_UDP) {
struct iov_tail eh_data;
- packet_get(pool_tap4, i, &eh_data);
+ packet_get(pool_tap4[qpair], i, &eh_data);
if (dhcp(c, qpair, &eh_data))
continue;
}
@@ -835,7 +839,7 @@ resume:
goto append;
if (seq_count == TAP_SEQS)
- break; /* Resume after flushing if i < pool_tap4->count */
+ break; /* Resume after flushing if i < pool_tap4[qpair]->count */
for (seq = tap4_l4 + seq_count - 1; seq >= tap4_l4; seq--) {
if (L4_MATCH(iph, uh, seq)) {
@@ -881,10 +885,10 @@ append:
}
}
- if (i < pool_tap4->count)
+ if (i < pool_tap4[qpair]->count)
goto resume;
- return pool_tap4->count;
+ return pool_tap4[qpair]->count;
}
#define IPV6_NH_OPT(nh) \
@@ -953,12 +957,12 @@ static int tap6_handler(struct ctx *c, unsigned int qpair,
unsigned int i, j, seq_count = 0;
struct tap6_l4_t *seq;
- if (!c->ifi6 || !pool_tap6->count)
- return pool_tap6->count;
+ if (!c->ifi6 || !pool_tap6[qpair]->count)
+ return pool_tap6[qpair]->count;
i = 0;
resume:
- for (seq_count = 0, seq = NULL; i < pool_tap6->count; i++) {
+ for (seq_count = 0, seq = NULL; i < pool_tap6[qpair]->count; i++) {
size_t l4len, plen, check;
struct in6_addr *saddr, *daddr;
struct ipv6hdr ip6h_storage;
@@ -970,7 +974,7 @@ resume:
struct ipv6hdr *ip6h;
uint8_t proto;
- if (!packet_get(pool_tap6, i, &data))
+ if (!packet_get(pool_tap6[qpair], i, &data))
return -1;
eh = IOV_REMOVE_HEADER(&data, eh_storage);
@@ -1083,7 +1087,7 @@ resume:
goto append;
if (seq_count == TAP_SEQS)
- break; /* Resume after flushing if i < pool_tap6->count */
+ break; /* Resume after flushing if i < pool_tap6[qpair]->count */
for (seq = tap6_l4 + seq_count - 1; seq >= tap6_l4; seq--) {
if (L4_MATCH(ip6h, proto, uh, seq)) {
@@ -1130,19 +1134,19 @@ append:
}
}
- if (i < pool_tap6->count)
+ if (i < pool_tap6[qpair]->count)
goto resume;
- return pool_tap6->count;
+ return pool_tap6[qpair]->count;
}
/**
- * tap_flush_pools() - Flush both IPv4 and IPv6 packet pools
+ * tap_flush_pools() - Flush both IPv4 and IPv6 packet pools for a given qpair
*/
-void tap_flush_pools(void)
+void tap_flush_pools(unsigned int qpair)
{
- pool_flush(pool_tap4);
- pool_flush(pool_tap6);
+ pool_flush(pool_tap4[qpair]);
+ pool_flush(pool_tap6[qpair]);
}
/**
@@ -1153,6 +1157,7 @@ void tap_flush_pools(void)
*/
void tap_handler(struct ctx *c, unsigned int qpair, const struct timespec *now)
{
+ assert(qpair < VHOST_USER_MAX_VQS / 2);
tap4_handler(c, qpair, now);
tap6_handler(c, qpair, now);
}
@@ -1187,21 +1192,23 @@ void tap_add_packet(struct ctx *c, unsigned int qpair, struct iov_tail *data,
proto_update_l2_buf(c->guest_mac);
}
+ assert(qpair < VHOST_USER_MAX_VQS / 2);
+
switch (ntohs(eh->h_proto)) {
case ETH_P_ARP:
case ETH_P_IP:
- if (!pool_can_fit(pool_tap4, data)) {
+ if (!pool_can_fit(pool_tap4[qpair], data)) {
tap4_handler(c, qpair, now);
- pool_flush(pool_tap4);
+ pool_flush(pool_tap4[qpair]);
}
- packet_add(pool_tap4, data);
+ packet_add(pool_tap4[qpair], data);
break;
case ETH_P_IPV6:
- if (!pool_can_fit(pool_tap6, data)) {
+ if (!pool_can_fit(pool_tap6[qpair], data)) {
tap6_handler(c, qpair, now);
- pool_flush(pool_tap6);
+ pool_flush(pool_tap6[qpair]);
}
- packet_add(pool_tap6, data);
+ packet_add(pool_tap6[qpair], data);
break;
default:
break;
@@ -1239,7 +1246,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
ssize_t n;
char *p;
- tap_flush_pools();
+ tap_flush_pools(QPAIR_DEFAULT);
if (partial_len) {
/* We have a partial frame from an earlier pass. Move it to the
@@ -1322,7 +1329,7 @@ static void tap_pasta_input(struct ctx *c, const struct timespec *now)
{
ssize_t n, len;
- tap_flush_pools();
+ tap_flush_pools(QPAIR_DEFAULT);
for (n = 0;
n <= (ssize_t)(sizeof(pkt_buf) - L2_MAX_LEN_PASTA);
@@ -1591,10 +1598,14 @@ static void tap_sock_tun_init(struct ctx *c)
*/
static void tap_sock_update_pool(void *base, size_t size)
{
- int i;
+ unsigned int i;
- pool_tap4_storage = PACKET_INIT(pool_tap4, TAP_MSGS_IP4, base, size);
- pool_tap6_storage = PACKET_INIT(pool_tap6, TAP_MSGS_IP6, base, size);
+ for (i = 0; i < VHOST_USER_MAX_VQS / 2; i++) {
+ pool_tap4_storage[i] = PACKET_INIT(pool_tap4, TAP_MSGS_IP4, base, size);
+ pool_tap4[i] = (struct pool *)&pool_tap4_storage[i];
+ pool_tap6_storage[i] = PACKET_INIT(pool_tap6, TAP_MSGS_IP6, base, size);
+ pool_tap6[i] = (struct pool *)&pool_tap6_storage[i];
+ }
for (i = 0; i < TAP_SEQS; i++) {
tap4_l4[i].p = PACKET_INIT(pool_l4, UIO_MAXIOV, base, size);
diff --git a/tap.h b/tap.h
index ecb12de372b5..8a1d8f933a5b 100644
--- a/tap.h
+++ b/tap.h
@@ -123,7 +123,7 @@ void tap_handler_passt(struct ctx *c, uint32_t events,
int tap_sock_unix_open(char *sock_path);
void tap_sock_reset(struct ctx *c);
void tap_backend_init(struct ctx *c);
-void tap_flush_pools(void);
+void tap_flush_pools(unsigned int qpair);
void tap_handler(struct ctx *c, unsigned int qpair, const struct timespec *now);
void tap_add_packet(struct ctx *c, unsigned int qpair, struct iov_tail *data,
const struct timespec *now);
diff --git a/vu_common.c b/vu_common.c
index 6aa1ba768136..2e69c1945055 100644
--- a/vu_common.c
+++ b/vu_common.c
@@ -178,7 +178,7 @@ static void vu_handle_tx(struct vu_dev *vdev, int index,
assert(QPAIR_FROMGUEST_QUEUE(QPAIR_FROM_QUEUE(index)) ==
(unsigned int)index);
- tap_flush_pools();
+ tap_flush_pools(QPAIR_FROM_QUEUE(index));
count = 0;
out_sg_count = 0;
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
2026-06-16 17:10 ` [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 3/8] tcp: Make static buffers stack-local " Laurent Vivier
` (5 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
The L4 sequence arrays tap4_l4[] and tap6_l4[] are used to batch
packets with the same L4 tuple within a single tap_handler() call.
They are global, but tap_handler() can be called concurrently from
different worker threads with different qpairs in vhost-user mode.
Make these arrays per-qpair by adding a VHOST_USER_MAX_VQS/2 first
dimension, indexed by the qpair parameter already available in
tap4_handler() and tap6_handler().
Update tap_sock_update_pool() to initialize every entry of both arrays,
that is, all VHOST_USER_MAX_VQS / 2 x TAP_SEQS sequence pools.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
tap.c | 32 +++++++++++++++++++-------------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/tap.c b/tap.c
index 80912372e216..659df9d560d3 100644
--- a/tap.c
+++ b/tap.c
@@ -606,7 +606,7 @@ static struct tap4_l4_t {
struct in_addr daddr;
struct pool_l4_t p;
-} tap4_l4[TAP_SEQS /* Arbitrary: TAP_MSGS in theory, so limit in users */];
+} tap4_l4[VHOST_USER_MAX_VQS / 2][TAP_SEQS /* Arbitrary: TAP_MSGS in theory, so limit in users */];
/**
* struct l4_seq6_t - Message sequence for one protocol handler call, IPv6
@@ -633,7 +633,7 @@ static struct tap6_l4_t {
uint8_t hop_limit;
struct pool_l4_t p;
-} tap6_l4[TAP_SEQS /* Arbitrary: TAP_MSGS in theory, so limit in users */];
+} tap6_l4[VHOST_USER_MAX_VQS / 2][TAP_SEQS /* Arbitrary: TAP_MSGS in theory, so limit in users */];
/**
* tap_packet_debug() - Print debug message for packet(s) from guest/tap
@@ -841,7 +841,7 @@ resume:
if (seq_count == TAP_SEQS)
break; /* Resume after flushing if i < pool_tap4[qpair]->count */
- for (seq = tap4_l4 + seq_count - 1; seq >= tap4_l4; seq--) {
+ for (seq = tap4_l4[qpair] + seq_count - 1; seq >= tap4_l4[qpair]; seq--) {
if (L4_MATCH(iph, uh, seq)) {
if (seq->p.count >= UIO_MAXIOV)
seq = NULL;
@@ -849,8 +849,8 @@ resume:
}
}
- if (!seq || seq < tap4_l4) {
- seq = tap4_l4 + seq_count++;
+ if (!seq || seq < tap4_l4[qpair]) {
+ seq = tap4_l4[qpair] + seq_count++;
L4_SET(iph, uh, seq);
pool_flush((struct pool *)&seq->p);
}
@@ -862,7 +862,7 @@ append:
packet_add((struct pool *)&seq->p, &data);
}
- for (j = 0, seq = tap4_l4; j < seq_count; j++, seq++) {
+ for (j = 0, seq = tap4_l4[qpair]; j < seq_count; j++, seq++) {
const struct pool *p = (const struct pool *)&seq->p;
size_t k;
@@ -1089,7 +1089,7 @@ resume:
if (seq_count == TAP_SEQS)
break; /* Resume after flushing if i < pool_tap6[qpair]->count */
- for (seq = tap6_l4 + seq_count - 1; seq >= tap6_l4; seq--) {
+ for (seq = tap6_l4[qpair] + seq_count - 1; seq >= tap6_l4[qpair]; seq--) {
if (L4_MATCH(ip6h, proto, uh, seq)) {
if (seq->p.count >= UIO_MAXIOV)
seq = NULL;
@@ -1097,8 +1097,8 @@ resume:
}
}
- if (!seq || seq < tap6_l4) {
- seq = tap6_l4 + seq_count++;
+ if (!seq || seq < tap6_l4[qpair]) {
+ seq = tap6_l4[qpair] + seq_count++;
L4_SET(ip6h, proto, uh, seq);
pool_flush((struct pool *)&seq->p);
}
@@ -1110,7 +1110,7 @@ append:
packet_add((struct pool *)&seq->p, &data);
}
- for (j = 0, seq = tap6_l4; j < seq_count; j++, seq++) {
+ for (j = 0, seq = tap6_l4[qpair]; j < seq_count; j++, seq++) {
const struct pool *p = (const struct pool *)&seq->p;
size_t k;
@@ -1607,9 +1607,15 @@ static void tap_sock_update_pool(void *base, size_t size)
pool_tap6[i] = (struct pool *)&pool_tap6_storage[i];
}
- for (i = 0; i < TAP_SEQS; i++) {
- tap4_l4[i].p = PACKET_INIT(pool_l4, UIO_MAXIOV, base, size);
- tap6_l4[i].p = PACKET_INIT(pool_l4, UIO_MAXIOV, base, size);
+ for (i = 0; i < VHOST_USER_MAX_VQS / 2; i++) {
+ unsigned int j;
+
+ for (j = 0; j < TAP_SEQS; j++) {
+ tap4_l4[i][j].p = PACKET_INIT(pool_l4, UIO_MAXIOV,
+ base, size);
+ tap6_l4[i][j].p = PACKET_INIT(pool_l4, UIO_MAXIOV,
+ base, size);
+ }
}
}
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 3/8] tcp: Make static buffers stack-local for thread safety
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
2026-06-16 17:10 ` [PATCH 1/8] tap: Convert packet pools to per-queue-pair arrays for multiqueue Laurent Vivier
2026-06-16 17:10 ` [PATCH 2/8] tap: Make L4 sequence pools per-qpair for thread safety Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 4/8] udp_vu: Make virtqueue " Laurent Vivier
` (4 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
Static buffers shared across all call sites are not safe when multiple
worker threads handle TCP connections concurrently.
In tcp.c, move tcp_iov[] from file scope into tcp_data_from_tap() where
it is exclusively used. At UIO_MAXIOV (1024) entries of struct iovec
(16 bytes each), this adds 16 KiB to the stack frame.
In tcp_vu.c, move iov_vu[], elem[], and frame[] from file scope into
tcp_vu_data_from_sock() and pass them to tcp_vu_sock_recv() as
parameters. Also make iov_msg[] in tcp_vu_sock_recv() a local variable
instead of static, as it is only used within a single call. Combined,
these add roughly 80 KiB across the nested stack frames, which is
acceptable for per-thread stacks.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
tcp.c | 3 +--
tcp_vu.c | 33 ++++++++++++++++++++-------------
2 files changed, 21 insertions(+), 15 deletions(-)
diff --git a/tcp.c b/tcp.c
index 1549e14adaf4..f4fe866ba7c3 100644
--- a/tcp.c
+++ b/tcp.c
@@ -435,8 +435,6 @@ static socklen_t tcp_info_size;
/* Kernel reports delivery rate in TCP_INFO (kernel commit eb8329e0a04d) */
#define delivery_rate_cap tcp_info_cap(delivery_rate)
-/* sendmsg() to socket */
-static struct iovec tcp_iov [UIO_MAXIOV];
/* Pools for pre-opened sockets (in init) */
int init_sock_pool4 [TCP_SOCK_POOL_SIZE];
@@ -1900,6 +1898,7 @@ static int tcp_data_from_tap(const struct ctx *c, struct tcp_tap_conn *conn,
uint16_t max_ack_seq_wnd = conn->wnd_from_tap;
uint32_t max_ack_seq = conn->seq_ack_from_tap;
uint32_t seq_from_tap = conn->seq_from_tap;
+ struct iovec tcp_iov[UIO_MAXIOV];
struct msghdr mh = { .msg_iov = tcp_iov };
size_t len;
ssize_t n;
diff --git a/tcp_vu.c b/tcp_vu.c
index 4f76f599156f..9270ece43d17 100644
--- a/tcp_vu.c
+++ b/tcp_vu.c
@@ -35,9 +35,6 @@
#include "vu_common.h"
#include <time.h>
-static struct iovec iov_vu[VIRTQUEUE_MAX_SIZE];
-static struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
-
/**
* struct vu_frame - Descriptor for a TCP frame mapped to virtqueue elements
* @idx_element: Index of first element in elem[] for this frame
@@ -46,13 +43,13 @@ static struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
* @num_iovec: Number of iovecs covering this frame's buffers
* @size: Total frame size including all headers
*/
-static struct vu_frame {
+struct vu_frame {
int idx_element;
int num_element;
int idx_iovec;
int num_iovec;
size_t size;
-} frame[VIRTQUEUE_MAX_SIZE];
+};
/**
* tcp_vu_hdrlen() - Sum size of all headers, from TCP to virtio-net
@@ -224,6 +221,9 @@ int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags,
* @v6: Set for IPv6 connections
* @already_sent: Number of bytes already sent
* @fillsize: Maximum bytes to fill in guest-side receiving window
+ * @iov_vu: IO vector array for virtqueue buffers
+ * @elem: Virtqueue element array
+ * @frame: Frame descriptor array
* @elem_used: number of element (output)
* @frame_cnt: Pointer to store the number of frames (output)
*
@@ -233,9 +233,12 @@ int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags,
static ssize_t tcp_vu_sock_recv(const struct ctx *c, struct vu_virtq *vq,
const struct tcp_tap_conn *conn, bool v6,
uint32_t already_sent, size_t fillsize,
+ struct iovec *iov_vu,
+ struct vu_virtq_element *elem,
+ struct vu_frame *frame,
int *elem_used, int *frame_cnt)
{
- static struct iovec iov_msg[VIRTQUEUE_MAX_SIZE + DISCARD_IOV_NUM];
+ struct iovec iov_msg[VIRTQUEUE_MAX_SIZE + DISCARD_IOV_NUM];
const struct vu_dev *vdev = c->vdev;
struct msghdr mh_sock = { 0 };
uint16_t mss = MSS_GET(conn);
@@ -252,16 +255,16 @@ static ssize_t tcp_vu_sock_recv(const struct ctx *c, struct vu_virtq *vq,
iov_used = 0;
elem_cnt = 0;
*frame_cnt = 0;
- while (fillsize > 0 && elem_cnt < ARRAY_SIZE(elem) &&
- iov_used < ARRAY_SIZE(iov_vu) &&
- *frame_cnt < ARRAY_SIZE(frame)) {
+ while (fillsize > 0 && elem_cnt < VIRTQUEUE_MAX_SIZE &&
+ iov_used < VIRTQUEUE_MAX_SIZE &&
+ *frame_cnt < VIRTQUEUE_MAX_SIZE) {
size_t frame_size, in_total;
int cnt;
cnt = vu_collect(vdev, vq, &elem[elem_cnt],
- ARRAY_SIZE(elem) - elem_cnt,
+ VIRTQUEUE_MAX_SIZE - elem_cnt,
&iov_vu[iov_used],
- ARRAY_SIZE(iov_vu) - iov_used, &in_total,
+ VIRTQUEUE_MAX_SIZE - iov_used, &in_total,
MIN(mss, fillsize) + hdrlen,
&frame_size);
if (cnt == 0)
@@ -327,7 +330,8 @@ static ssize_t tcp_vu_sock_recv(const struct ctx *c, struct vu_virtq *vq,
if ((size_t)ret <= f->size - hdrlen) {
unsigned cnt;
- cnt = iov_skip_bytes(&iov_vu[f->idx_iovec], f->num_iovec,
+ cnt = iov_skip_bytes(&iov_vu[f->idx_iovec],
+ f->num_iovec,
MAX(hdrlen + ret, VNET_HLEN + ETH_ZLEN),
NULL);
if (cnt < (unsigned)f->num_iovec)
@@ -433,6 +437,9 @@ static void tcp_vu_prepare(const struct ctx *c, struct tcp_tap_conn *conn,
int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn,
unsigned int qpair)
{
+ struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
+ struct iovec iov_vu[VIRTQUEUE_MAX_SIZE];
+ struct vu_frame frame[VIRTQUEUE_MAX_SIZE];
uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
int rx_queue = QPAIR_TOGUEST_QUEUE(qpair);
struct vu_dev *vdev = c->vdev;
@@ -477,7 +484,7 @@ int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn,
* data from the socket
*/
len = tcp_vu_sock_recv(c, vq, conn, v6, already_sent, fillsize,
- &elem_cnt, &frame_cnt);
+ iov_vu, elem, frame, &elem_cnt, &frame_cnt);
if (len < 0) {
if (len != -EAGAIN && len != -EWOULDBLOCK) {
tcp_rst(c, conn, qpair);
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 4/8] udp_vu: Make virtqueue buffers stack-local for thread safety
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
` (2 preceding siblings ...)
2026-06-16 17:10 ` [PATCH 3/8] tcp: Make static buffers stack-local " Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 5/8] flow: Make flow timer per-caller " Laurent Vivier
` (3 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
The function-local static buffers elem[] and iov_vu[] in
udp_vu_sock_to_tap() are shared across all threads. When multiple
worker threads process UDP vhost-user data concurrently, they would
stomp on each other's buffers.
Remove the static qualifier so each call gets its own stack-allocated
arrays, eliminating cross-thread sharing.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
udp_vu.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/udp_vu.c b/udp_vu.c
index 864e7a99b8d9..342673dc7e6d 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -146,8 +146,8 @@ void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx,
{
const struct flowside *toside = flowside_at_sidx(tosidx);
bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
- static struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
- static struct iovec iov_vu[VIRTQUEUE_MAX_SIZE];
+ struct vu_virtq_element elem[VIRTQUEUE_MAX_SIZE];
+ struct iovec iov_vu[VIRTQUEUE_MAX_SIZE];
int rx_queue = QPAIR_TOGUEST_QUEUE(qpair);
struct vu_dev *vdev = c->vdev;
struct vu_virtq *vq = &vdev->vq[rx_queue];
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 5/8] flow: Make flow timer per-caller for thread safety
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
` (3 preceding siblings ...)
2026-06-16 17:10 ` [PATCH 4/8] udp_vu: Make virtqueue " Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks Laurent Vivier
` (2 subsequent siblings)
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
Move the static flow_timer_run variable out of flow.c and pass it as a
parameter to flow_defer_handler(). This allows each caller to maintain
its own timer state: passt_worker() now keeps it in a function-local
static variable, and each vhost-user queue pair worker can keep it in
its own per-qpair context.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
flow.c | 11 +++++------
flow.h | 2 +-
passt.c | 8 +++++---
3 files changed, 11 insertions(+), 10 deletions(-)
diff --git a/flow.c b/flow.c
index 787a7139cfc1..08c7620c7b0f 100644
--- a/flow.c
+++ b/flow.c
@@ -142,9 +142,6 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
"Safe linear probing requires hash table with more entries than the number of sides in the flow table");
-/* Last time the flow timers ran */
-static struct timespec flow_timer_run;
-
/** flowside_from_af() - Initialise flowside from addresses
* @side: flowside to initialise
* @af: Address family (AF_INET or AF_INET6)
@@ -898,9 +895,11 @@ flow_sidx_t flow_lookup_sa(const struct ctx *c, uint8_t proto, uint8_t pif,
* flow_defer_handler() - Handler for per-flow deferred and timed tasks
* @c: Execution context
* @now: Current timestamp
+ * @timer_run: Last time the flow timers ran
+ * @qpair: Queue pair to process
*/
void flow_defer_handler(const struct ctx *c, const struct timespec *now,
- unsigned int qpair)
+ struct timespec *timer_run, unsigned int qpair)
{
struct flow_free_cluster *free_head = NULL;
unsigned *last_next = &flow_first_free;
@@ -908,9 +907,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
bool timer = false;
union flow *flow;
- if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) {
+ if (timespec_diff_ms(now, timer_run) >= FLOW_TIMER_INTERVAL) {
timer = true;
- flow_timer_run = *now;
+ *timer_run = *now;
}
assert(!flow_new_entry); /* Incomplete flow at end of cycle */
diff --git a/flow.h b/flow.h
index 53e0408a9ee5..10634e64a7fc 100644
--- a/flow.h
+++ b/flow.h
@@ -280,7 +280,7 @@ void flow_migrate(struct flow_common *f, unsigned int qpair, uint32_t events,
(flow_migrate(&(flow_)->f, qpair_, events_, fd_, sidei_))
void flow_defer_handler(const struct ctx *c, const struct timespec *now,
- unsigned int qpair);
+ struct timespec *timer_run, unsigned int qpair);
int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
int fd);
int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage,
diff --git a/passt.c b/passt.c
index 3afc59b19120..bebc2b99f523 100644
--- a/passt.c
+++ b/passt.c
@@ -98,15 +98,16 @@ struct passt_stats {
* post_handler() - Run periodic and deferred tasks for L4 protocol handlers
* @c: Execution context
* @now: Current timestamp
+ * @timer_run: Last time the flow timers ran
* @qpair: Queue pair to process
*/
static void post_handler(struct ctx *c, const struct timespec *now,
- unsigned int qpair)
+ struct timespec *timer_run, unsigned int qpair)
{
if (!c->no_tcp)
tcp_defer_handler(c, now, qpair);
- flow_defer_handler(c, now, qpair);
+ flow_defer_handler(c, now, timer_run, qpair);
fwd_scan_ports_timer(c, now);
if (!c->no_ndp)
@@ -221,6 +222,7 @@ static void print_stats(const struct ctx *c, const struct passt_stats *stats,
static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
{
static struct passt_stats stats = { 0 };
+ static struct timespec flow_timer_run;
struct ctx *c = opaque;
struct timespec now;
int i;
@@ -304,7 +306,7 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
print_stats(c, &stats, &now);
}
- post_handler(c, &now, QPAIR_DEFAULT);
+ post_handler(c, &now, &flow_timer_run, QPAIR_DEFAULT);
migrate_handler(c);
}
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
` (4 preceding siblings ...)
2026-06-16 17:10 ` [PATCH 5/8] flow: Make flow timer per-caller " Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety Laurent Vivier
2026-06-16 17:10 ` [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations Laurent Vivier
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
tcp_defer_handler() uses c->tcp.timer_run, c->tcp.keepalive_run, and
c->tcp.inactivity_run as global timer gates shared across all callers.
In multiqueue mode, multiple qpair workers will call tcp_defer_handler()
concurrently, causing races on these fields. It also unconditionally
runs tcp_payload_flush(), tcp_sock_refill_init(), and tcp_splice_refill()
which operate on global state.
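Add timer_run, keepalive_run, and inactivity_run as parameters so each
caller provides its own per-qpair timer state. Remove the now-unused
fields from struct tcp_ctx and drop timer_init() which only initialised
c->tcp.timer_run. Note that, as a consequence, the TCP timer timestamp
is no longer set to the current time at startup: passt_worker() keeps it
in a zero-initialised static variable instead.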
Guard tcp_payload_flush() and socket pool refills with qpair == 0 since
they operate on global buffers shared across all queue pairs.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
passt.c | 37 +++++++++++++++++--------------------
tcp.c | 51 +++++++++++++++++++++++++++++++--------------------
tcp.h | 9 ++-------
3 files changed, 50 insertions(+), 47 deletions(-)
diff --git a/passt.c b/passt.c
index bebc2b99f523..ca5973e17317 100644
--- a/passt.c
+++ b/passt.c
@@ -96,16 +96,23 @@ struct passt_stats {
/**
* post_handler() - Run periodic and deferred tasks for L4 protocol handlers
- * @c: Execution context
- * @now: Current timestamp
- * @timer_run: Last time the flow timers ran
- * @qpair: Queue pair to process
+ * @c: Execution context
+ * @now: Current timestamp
+ * @timer_run: Last time the flow timers ran
+ * @tcp_timer_run: Last time TCP timers ran
+ * @keepalive_run: Last time keepalives ran
+ * @inactivity_run: Last time inactivity scan ran
+ * @qpair: Queue pair to process
*/
static void post_handler(struct ctx *c, const struct timespec *now,
- struct timespec *timer_run, unsigned int qpair)
+ struct timespec *timer_run,
+ struct timespec *tcp_timer_run,
+ time_t *keepalive_run,
+ time_t *inactivity_run, unsigned int qpair)
{
if (!c->no_tcp)
- tcp_defer_handler(c, now, qpair);
+ tcp_defer_handler(c, now, tcp_timer_run, keepalive_run,
+ inactivity_run, qpair);
flow_defer_handler(c, now, timer_run, qpair);
fwd_scan_ports_timer(c, now);
@@ -130,16 +137,6 @@ static void random_init(struct ctx *c)
srandom(seed);
}
-/**
- * timer_init() - Set initial timestamp for timer runs to current time
- * @c: Execution context
- * @now: Current timestamp
- */
-static void timer_init(struct ctx *c, const struct timespec *now)
-{
- c->tcp.timer_run = *now;
-}
-
/**
* proto_update_l2_buf() - Update scatter-gather L2 buffers in protocol handlers
* @eth_d: Ethernet destination address, NULL if unchanged
@@ -221,8 +218,9 @@ static void print_stats(const struct ctx *c, const struct passt_stats *stats,
*/
static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
{
+ static time_t keepalive_run, inactivity_run;
static struct passt_stats stats = { 0 };
- static struct timespec flow_timer_run;
+ static struct timespec flow_timer_run, tcp_timer_run;
struct ctx *c = opaque;
struct timespec now;
int i;
@@ -306,7 +304,8 @@ static void passt_worker(void *opaque, int nfds, struct epoll_event *events)
print_stats(c, &stats, &now);
}
- post_handler(c, &now, &flow_timer_run, QPAIR_DEFAULT);
+ post_handler(c, &now, &flow_timer_run, &tcp_timer_run,
+ &keepalive_run, &inactivity_run, QPAIR_DEFAULT);
migrate_handler(c);
}
@@ -433,8 +432,6 @@ int main(int argc, char **argv)
isolate_postfork(&c);
- timer_init(&c, &now);
-
loop:
/* NOLINTBEGIN(bugprone-branch-clone): intervals can be the same */
/* cppcheck-suppress [duplicateValueTernary, unmatchedSuppression] */
diff --git a/tcp.c b/tcp.c
index f4fe866ba7c3..955012355d69 100644
--- a/tcp.c
+++ b/tcp.c
@@ -2988,17 +2988,19 @@ int tcp_init(struct ctx *c)
/**
* tcp_keepalive() - Send keepalives for connections which need it
* @c: Execution context
+ * @now: Current timestamp
+ * @last_run: Last time keepalives ran, updated on run
* @qpair: Queue pair to process
*/
-static void tcp_keepalive(struct ctx *c, const struct timespec *now,
- unsigned int qpair)
+static void tcp_keepalive(const struct ctx *c, const struct timespec *now,
+ time_t *last_run, unsigned int qpair)
{
union flow *flow;
- if (now->tv_sec - c->tcp.keepalive_run < KEEPALIVE_INTERVAL)
+ if (now->tv_sec - *last_run < KEEPALIVE_INTERVAL)
return;
- c->tcp.keepalive_run = now->tv_sec;
+ *last_run = now->tv_sec;
flow_foreach_of_type(flow, FLOW_TCP) {
struct tcp_tap_conn *conn = &flow->tcp;
@@ -3021,18 +3023,20 @@ static void tcp_keepalive(struct ctx *c, const struct timespec *now,
/**
* tcp_inactivity() - Scan for and close long-inactive connections
* @c: Execution context
+ * @now: Current timestamp
+ * @last_run: Last time inactivity scan ran, updated on run
* @qpair: Queue pair to process
*/
-static void tcp_inactivity(struct ctx *c, const struct timespec *now,
- unsigned int qpair)
+static void tcp_inactivity(const struct ctx *c, const struct timespec *now,
+ time_t *last_run, unsigned int qpair)
{
union flow *flow;
- if (now->tv_sec - c->tcp.inactivity_run < INACTIVITY_INTERVAL)
+ if (now->tv_sec - *last_run < INACTIVITY_INTERVAL)
return;
debug("TCP inactivity scan");
- c->tcp.inactivity_run = now->tv_sec;
+ *last_run = now->tv_sec;
flow_foreach_of_type(flow, FLOW_TCP) {
struct tcp_tap_conn *conn = &flow->tcp;
@@ -3054,27 +3058,34 @@ static void tcp_inactivity(struct ctx *c, const struct timespec *now,
/**
* tcp_defer_handler() - Handler for TCP deferred tasks
- * @c: Execution context
- * @now: Current timestamp
- * @qpair: Queue pair to process
+ * @c: Execution context
+ * @now: Current timestamp
+ * @timer_run: Last time TCP timers ran
+ * @keepalive_run: Last time keepalives ran
+ * @inactivity_run: Last time inactivity scan ran
+ * @qpair: Queue pair to process
*/
/* cppcheck-suppress [constParameterPointer, unmatchedSuppression] */
void tcp_defer_handler(struct ctx *c, const struct timespec *now,
- unsigned int qpair)
+ struct timespec *timer_run, time_t *keepalive_run,
+ time_t *inactivity_run, unsigned int qpair)
{
- tcp_payload_flush(c);
+ if (qpair == 0)
+ tcp_payload_flush(c);
- if (timespec_diff_ms(now, &c->tcp.timer_run) < TCP_TIMER_INTERVAL)
+ if (timespec_diff_ms(now, timer_run) < TCP_TIMER_INTERVAL)
return;
- c->tcp.timer_run = *now;
+ *timer_run = *now;
- tcp_sock_refill_init(c);
- if (c->mode == MODE_PASTA)
- tcp_splice_refill(c);
+ if (qpair == 0) {
+ tcp_sock_refill_init(c);
+ if (c->mode == MODE_PASTA)
+ tcp_splice_refill(c);
+ }
- tcp_keepalive(c, now, qpair);
- tcp_inactivity(c, now, qpair);
+ tcp_keepalive(c, now, keepalive_run, qpair);
+ tcp_inactivity(c, now, inactivity_run, qpair);
}
/**
diff --git a/tcp.h b/tcp.h
index 490f1b140e44..64c75ba481bd 100644
--- a/tcp.h
+++ b/tcp.h
@@ -32,7 +32,8 @@ int tcp_listen(const struct ctx *c, uint8_t pif, unsigned rule,
const union inany_addr *addr, const char *ifname, in_port_t port);
int tcp_init(struct ctx *c);
void tcp_defer_handler(struct ctx *c, const struct timespec *now,
- unsigned int qpair);
+ struct timespec *timer_run, time_t *keepalive_run,
+ time_t *inactivity_run, unsigned int qpair);
void tcp_update_l2_buf(const unsigned char *eth_d);
@@ -42,24 +43,18 @@ extern bool peek_offset_cap;
* struct tcp_ctx - Execution context for TCP routines
* @scan_in: Port scanning state for inbound packets
* @scan_out: Port scanning state for outbound packets
- * @timer_run: Timestamp of most recent timer run
* @pipe_size: Size of pipes for spliced connections
* @rto_max: Maximum retry timeout (in s)
* @syn_retries: SYN retries using exponential backoff timeout
* @syn_linear_timeouts: SYN retries before using exponential backoff timeout
- * @keepalive_run: Time we last issued tap-side keepalives
- * @inactivity_run: Time we last scanned for inactive connections
*/
struct tcp_ctx {
struct fwd_scan scan_in;
struct fwd_scan scan_out;
- struct timespec timer_run;
size_t pipe_size;
int rto_max;
uint8_t syn_retries;
uint8_t syn_linear_timeouts;
- time_t keepalive_run;
- time_t inactivity_run;
};
#endif /* TCP_H */
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
` (5 preceding siblings ...)
2026-06-16 17:10 ` [PATCH 6/8] tcp: Make TCP timer state per-caller and guard global tasks Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
2026-06-16 17:10 ` [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations Laurent Vivier
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
The pre-opened socket pools init_sock_pool4/6 are consumed by
tcp_conn_pool_sock() when creating new connections from any worker
thread, and refilled by tcp_sock_refill_pool(), which
tcp_defer_handler() reaches through tcp_sock_refill_init() (the timer
path run from post_handler()). These can run concurrently on different
threads.
Add a mutex, sock_pool_lock, protecting both operations in
tcp_conn_sock() and tcp_sock_refill_init(), where init namespace pools
are accessed.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
tcp.c | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/tcp.c b/tcp.c
index 955012355d69..019340c1c348 100644
--- a/tcp.c
+++ b/tcp.c
@@ -293,6 +293,7 @@
#include <sys/uio.h>
#include <time.h>
#include <arpa/inet.h>
+#include <pthread.h>
#include <linux/sockios.h>
#include <linux/sock_diag.h>
@@ -439,6 +440,7 @@ static socklen_t tcp_info_size;
/* Pools for pre-opened sockets (in init) */
int init_sock_pool4 [TCP_SOCK_POOL_SIZE];
int init_sock_pool6 [TCP_SOCK_POOL_SIZE];
+static pthread_mutex_t sock_pool_lock = PTHREAD_MUTEX_INITIALIZER;
/**
* conn_at_sidx() - Get TCP connection specific flow at given sidx
@@ -1568,7 +1570,11 @@ int tcp_conn_sock(sa_family_t af)
int *pool = af == AF_INET6 ? init_sock_pool6 : init_sock_pool4;
int s;
- if ((s = tcp_conn_pool_sock(pool)) >= 0)
+ pthread_mutex_lock(&sock_pool_lock);
+ s = tcp_conn_pool_sock(pool);
+ pthread_mutex_unlock(&sock_pool_lock);
+
+ if (s >= 0)
return s;
/* If the pool is empty we just open a new one without refilling the
@@ -2857,6 +2863,7 @@ int tcp_sock_refill_pool(int pool[], sa_family_t af)
*/
static void tcp_sock_refill_init(const struct ctx *c)
{
+ pthread_mutex_lock(&sock_pool_lock);
if (c->ifi4) {
int rc = tcp_sock_refill_pool(init_sock_pool4, AF_INET);
if (rc < 0)
@@ -2869,6 +2876,7 @@ static void tcp_sock_refill_init(const struct ctx *c)
warn("TCP: Error refilling IPv6 host socket pool: %s",
strerror_(-rc));
}
+ pthread_mutex_unlock(&sock_pool_lock);
}
/**
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread* [PATCH 8/8] flow: Add mutex and per-qpair filtering to flow table operations
2026-06-16 17:10 [PATCH 0/8] multithreading: Prepare data structures for concurrent queue pair workers Laurent Vivier
` (6 preceding siblings ...)
2026-06-16 17:10 ` [PATCH 7/8] tcp: Protect init socket pools with mutex for thread safety Laurent Vivier
@ 2026-06-16 17:10 ` Laurent Vivier
7 siblings, 0 replies; 9+ messages in thread
From: Laurent Vivier @ 2026-06-16 17:10 UTC (permalink / raw)
To: passt-dev; +Cc: Laurent Vivier
The flow table free list, hash table, and flow_new_entry are global
shared state accessed from multiple threads.
Protect flow_alloc(), flow_alloc_cancel(), flow_hash_insert(),
flow_hash_remove(), and the free list rebuild in flow_defer_handler()
with the write side of a pthread_rwlock_t, and hash table lookups in
flowside_lookup() with the read side.
Make flow_new_entry _Thread_local so each thread independently tracks
its own in-progress allocation.
Since the lock is released between flow_alloc() and flow_activate(),
other threads can observe intermediate flow states (NEW, INI, TGT,
TYPED) during traversal. Adapt flow_foreach() and flow_defer_handler()
accordingly: skip these entries silently rather than treating them as
errors, and break free-list cluster merging across them.
Filter flow_defer_handler()'s first loop by qpair, so each thread
only processes its own flows.
Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---
flow.c | 59 +++++++++++++++++++++++++++++++++++++++++++---------
flow_table.h | 2 +-
2 files changed, 50 insertions(+), 11 deletions(-)
diff --git a/flow.c b/flow.c
index 08c7620c7b0f..149360c3ec87 100644
--- a/flow.c
+++ b/flow.c
@@ -12,6 +12,8 @@
#include <sched.h>
#include <string.h>
+#include <pthread.h>
+
#include "util.h"
#include "ip.h"
#include "passt.h"
@@ -129,7 +131,7 @@ static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES,
unsigned flow_first_free;
union flow flowtab[FLOW_MAX];
-static const union flow *flow_new_entry; /* = NULL */
+static _Thread_local const union flow *flow_new_entry; /* = NULL */
int qpair_to_fd[FLOW_QPAIR_SIZE];
/* Hash table to index it */
@@ -142,6 +144,8 @@ static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE];
static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX,
"Safe linear probing requires hash table with more entries than the number of sides in the flow table");
+static pthread_rwlock_t flow_lock = PTHREAD_RWLOCK_INITIALIZER;
+
/** flowside_from_af() - Initialise flowside from addresses
* @side: flowside to initialise
* @af: Address family (AF_INET or AF_INET6)
@@ -616,12 +620,18 @@ void flow_activate(struct flow_common *f)
*/
union flow *flow_alloc(void)
{
- union flow *flow = &flowtab[flow_first_free];
+ union flow *flow;
+
+ pthread_rwlock_wrlock(&flow_lock);
+
+ flow = &flowtab[flow_first_free];
assert(!flow_new_entry);
- if (flow_first_free >= FLOW_MAX)
+ if (flow_first_free >= FLOW_MAX) {
+ pthread_rwlock_unlock(&flow_lock);
return NULL;
+ }
assert(flow->f.state == FLOW_STATE_FREE);
assert(flow->f.type == FLOW_TYPE_NONE);
@@ -650,6 +660,8 @@ union flow *flow_alloc(void)
memset(flow, 0, sizeof(*flow));
flow_set_state(&flow->f, FLOW_STATE_NEW);
+ pthread_rwlock_unlock(&flow_lock);
+
return flow;
}
@@ -661,6 +673,8 @@ union flow *flow_alloc(void)
*/
void flow_alloc_cancel(union flow *flow)
{
+ pthread_rwlock_wrlock(&flow_lock);
+
assert(flow_new_entry == flow);
assert(flow->f.state == FLOW_STATE_NEW ||
flow->f.state == FLOW_STATE_INI ||
@@ -678,6 +692,8 @@ void flow_alloc_cancel(union flow *flow)
flow->free.next = flow_first_free;
flow_first_free = FLOW_IDX(flow);
flow_new_entry = NULL;
+
+ pthread_rwlock_unlock(&flow_lock);
}
/**
@@ -763,9 +779,13 @@ static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx)
uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
{
uint64_t hash = flow_sidx_hash(c, sidx);
- unsigned b = flow_hash_probe_(hash, sidx);
+ unsigned b;
+ pthread_rwlock_wrlock(&flow_lock);
+ b = flow_hash_probe_(hash, sidx);
flow_hashtab[b] = sidx;
+ pthread_rwlock_unlock(&flow_lock);
+
flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u",
sidx.sidei, b);
@@ -779,10 +799,16 @@ uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx)
*/
void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
{
- unsigned b = flow_hash_probe(c, sidx), s;
+ unsigned b, s;
- if (!flow_sidx_valid(flow_hashtab[b]))
+ pthread_rwlock_wrlock(&flow_lock);
+
+ b = flow_hash_probe(c, sidx);
+
+ if (!flow_sidx_valid(flow_hashtab[b])) {
+ pthread_rwlock_unlock(&flow_lock);
return; /* Redundant remove */
+ }
flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u",
sidx.sidei, b);
@@ -802,6 +828,8 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
}
flow_hashtab[b] = FLOW_SIDX_NONE;
+
+ pthread_rwlock_unlock(&flow_lock);
}
/**
@@ -816,10 +844,12 @@ void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx)
static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
uint8_t pif, const struct flowside *side)
{
- flow_sidx_t sidx;
+ flow_sidx_t sidx, ret;
union flow *flow;
unsigned b;
+ pthread_rwlock_rdlock(&flow_lock);
+
b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE;
while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) &&
!(FLOW_PROTO(&flow->f) == proto &&
@@ -827,7 +857,11 @@ static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto,
flowside_eq(&flow->f.side[sidx.sidei], side)))
b = mod_sub(b, 1, FLOW_HASH_SIZE);
- return flow_hashtab[b];
+ ret = flow_hashtab[b];
+
+ pthread_rwlock_unlock(&flow_lock);
+
+ return ret;
}
/**
@@ -920,6 +954,9 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
flow_foreach(flow) {
bool closed = false;
+ if (flow->f.qpair != qpair)
+ continue;
+
switch (flow->f.type) {
case FLOW_TYPE_NONE:
assert(false);
@@ -951,6 +988,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
}
/* Second step: actually free the flows */
+ pthread_rwlock_wrlock(&flow_lock);
flow_foreach_slot(flow) {
switch (flow->f.state) {
case FLOW_STATE_FREE: {
@@ -979,8 +1017,8 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
case FLOW_STATE_INI:
case FLOW_STATE_TGT:
case FLOW_STATE_TYPED:
- /* Incomplete flow at end of cycle */
- assert(false);
+ /* Allocation in progress on another thread: end free cluster */
+ free_head = NULL;
break;
case FLOW_STATE_ACTIVE:
@@ -1012,6 +1050,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now,
}
*last_next = FLOW_MAX;
+ pthread_rwlock_unlock(&flow_lock);
}
/**
diff --git a/flow_table.h b/flow_table.h
index e4ff6f73c35c..f2545390205a 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -72,7 +72,7 @@ extern union flow flowtab[];
(flow) += (flow)->free.n - 1; \
/* NOLINTNEXTLINE(readability-inconsistent-ifelse-braces) */\
else if ((flow)->f.state != FLOW_STATE_ACTIVE) { \
- flow_err((flow), "Bad flow state during traversal"); \
+ (void)0; /* Differs from bare continue */ \
continue; \
} else
--
2.54.0
^ permalink raw reply [flat|nested] 9+ messages in thread