public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: Stefano Brivio <sbrivio@redhat.com>
To: passt-dev@passt.top
Cc: David Gibson <david@gibson.dropbear.id.au>
Subject: [PATCH v27 1/2] migrate: Migrate TCP flows
Date: Sun, 16 Feb 2025 23:22:26 +0100	[thread overview]
Message-ID: <20250216222227.2017788-2-sbrivio@redhat.com> (raw)
In-Reply-To: <20250216222227.2017788-1-sbrivio@redhat.com>

This implements flow preparation on the source, transfer of data with
a format roughly inspired by struct tcp_tap_conn, plus a specific
structure for parameters that don't fit in the flow table, and flow
insertion on the target, with all the appropriate window options,
window scaling, MSS, etc.

Contents of pending queues are transferred as well.

The target side is rather convoluted because we first need to create
sockets and switch them to repair mode, before we can apply options
that are *not* stored in the flow table. This also means that, if
we're testing this on the same machine, in the same namespace, we need
to close the listening socket on the source before we can start moving
data.

Further, we need to connect() the socket on the target before we can
restore data queues, but we can't do that (again, on the same machine)
as long as the matching source socket is open, which implies an
arbitrary limit on queue sizes we can transfer, because we can only
dump pending queues on the source as long as the socket is open, of
course.

Co-authored-by: David Gibson <david@gibson.dropbear.id.au>
Signed-off-by: Stefano Brivio <sbrivio@redhat.com>
---
 contrib/selinux/passt.te |   4 +-
 flow.c                   | 243 +++++++++++
 flow.h                   |   8 +
 migrate.c                |  10 +
 passt.c                  |   6 +-
 repair.c                 |   1 -
 tcp.c                    | 919 +++++++++++++++++++++++++++++++++++++++
 tcp_conn.h               | 103 +++++
 8 files changed, 1288 insertions(+), 6 deletions(-)

diff --git a/contrib/selinux/passt.te b/contrib/selinux/passt.te
index fc1320d..f595079 100644
--- a/contrib/selinux/passt.te
+++ b/contrib/selinux/passt.te
@@ -45,7 +45,7 @@ require {
 	type net_conf_t;
 	type proc_net_t;
 	type node_t;
-	class tcp_socket { create accept listen name_bind name_connect };
+	class tcp_socket { create accept listen name_bind name_connect getattr };
 	class udp_socket { create accept listen };
 	class icmp_socket { bind create name_bind node_bind setopt read write };
 	class sock_file { create unlink write };
@@ -129,7 +129,7 @@ corenet_udp_sendrecv_all_ports(passt_t)
 allow passt_t node_t:icmp_socket { name_bind node_bind };
 allow passt_t port_t:icmp_socket name_bind;
 
-allow passt_t self:tcp_socket { create getopt setopt connect bind listen accept shutdown read write };
+allow passt_t self:tcp_socket { create getopt setopt connect bind listen accept shutdown read write getattr };
 allow passt_t self:udp_socket { create getopt setopt connect bind read write };
 allow passt_t self:icmp_socket { bind create setopt read write };
 
diff --git a/flow.c b/flow.c
index 3ac551b..cc881e8 100644
--- a/flow.c
+++ b/flow.c
@@ -19,6 +19,7 @@
 #include "inany.h"
 #include "flow.h"
 #include "flow_table.h"
+#include "repair.h"
 
 const char *flow_state_str[] = {
 	[FLOW_STATE_FREE]	= "FREE",
@@ -52,6 +53,35 @@ const uint8_t flow_proto[] = {
 static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES,
 	      "flow_proto[] doesn't match enum flow_type");
 
+#define foreach_flow(i, flow, bound)					\
+	for ((i) = 0, (flow) = &flowtab[(i)];				\
+	     (i) < (bound);						\
+	     (i)++, (flow) = &flowtab[(i)])				\
+		if ((flow)->f.state == FLOW_STATE_FREE)			\
+			(i) += (flow)->free.n - 1;			\
+		else
+
+#define foreach_active_flow(i, flow, bound)				\
+	foreach_flow((i), (flow), (bound))				\
+		if ((flow)->f.state != FLOW_STATE_ACTIVE)		\
+			/* NOLINTNEXTLINE(bugprone-branch-clone) */	\
+			continue;					\
+		else
+
+#define foreach_tcp_flow(i, flow, bound)				\
+	foreach_active_flow((i), (flow), (bound))			\
+		if ((flow)->f.type != FLOW_TCP)				\
+			/* NOLINTNEXTLINE(bugprone-branch-clone) */	\
+			continue;					\
+		else
+
+#define foreach_established_tcp_flow(i, flow, bound)			\
+	foreach_tcp_flow((i), (flow), (bound))				\
+		if (!tcp_flow_is_established(&(flow)->tcp))		\
+			/* NOLINTNEXTLINE(bugprone-branch-clone) */	\
+			continue;					\
+		else
+
 /* Global Flow Table */
 
 /**
@@ -874,6 +904,219 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
 	*last_next = FLOW_MAX;
 }
 
+/**
+ * flow_migrate_source_rollback() - Disable repair mode, return failure
+ * @c:		Execution context
+ * @max_flow:	Maximum index of affected flows
+ * @ret:	Negative error code
+ *
+ * Return: @ret
+ */
+static int flow_migrate_source_rollback(struct ctx *c, unsigned max_flow,
+					int ret)
+{
+	union flow *flow;
+	unsigned i;
+
+	debug("...roll back migration");
+
+	foreach_established_tcp_flow(i, flow, max_flow)
+		if (tcp_flow_repair_off(c, &flow->tcp))
+			die("Failed to roll back TCP_REPAIR mode");
+
+	if (repair_flush(c))
+		die("Failed to roll back TCP_REPAIR mode");
+
+	return ret;
+}
+
+/**
+ * flow_migrate_repair_all() - Turn repair mode on or off for all flows
+ * @c:		Execution context
+ * @enable:	Switch repair mode on if set, off otherwise
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int flow_migrate_repair_all(struct ctx *c, bool enable)
+{
+	union flow *flow;
+	unsigned i;
+	int rc;
+
+	foreach_established_tcp_flow(i, flow, FLOW_MAX) {
+		if (enable)
+			rc = tcp_flow_repair_on(c, &flow->tcp);
+		else
+			rc = tcp_flow_repair_off(c, &flow->tcp);
+
+		if (rc) {
+			debug("Can't %s repair mode: %s",
+			      enable ? "enable" : "disable", strerror_(-rc));
+			return flow_migrate_source_rollback(c, i, rc);
+		}
+	}
+
+	if ((rc = repair_flush(c))) {
+		debug("Can't %s repair mode: %s",
+		      enable ? "enable" : "disable", strerror_(-rc));
+		return flow_migrate_source_rollback(c, i, rc);
+	}
+
+	return 0;
+}
+
+/**
+ * flow_migrate_source_pre() - Prepare flows for migration: enable repair mode
+ * @c:		Execution context
+ * @stage:	Migration stage information (unused)
+ * @fd:		Migration file descriptor (unused)
+ *
+ * Return: 0 on success, positive error code on failure
+ */
+int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage,
+			    int fd)
+{
+	int rc;
+
+	(void)stage;
+	(void)fd;
+
+	if ((rc = flow_migrate_repair_all(c, true)))
+		return -rc;
+
+	return 0;
+}
+
+/**
+ * flow_migrate_source() - Dump all the remaining information and send data
+ * @c:		Execution context (unused)
+ * @stage:	Migration stage information (unused)
+ * @fd:		Migration file descriptor
+ *
+ * Return: 0 on success, positive error code on failure
+ */
+int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
+			int fd)
+{
+	uint32_t count = 0;
+	bool first = true;
+	union flow *flow;
+	unsigned i;
+	int rc;
+
+	(void)c;
+	(void)stage;
+
+	foreach_established_tcp_flow(i, flow, FLOW_MAX)
+		count++;
+
+	count = htonl(count);
+	if (write_all_buf(fd, &count, sizeof(count))) {
+		rc = errno;
+		err_perror("Can't send flow count (%u)", ntohl(count));
+		return flow_migrate_source_rollback(c, FLOW_MAX, rc);
+	}
+
+	debug("Sending %u flows", ntohl(count));
+
+	/* Dump and send information that can be stored in the flow table.
+	 *
+	 * Limited rollback options here: if we fail to transfer any data (that
+	 * is, on the first flow), undo everything and resume. Otherwise, the
+	 * stream might now be inconsistent, and we might have closed listening
+	 * TCP sockets, so just terminate.
+	 */
+	foreach_established_tcp_flow(i, flow, FLOW_MAX) {
+		rc = tcp_flow_migrate_source(fd, &flow->tcp);
+		if (rc) {
+			err("Can't send data, flow %u: %s", i, strerror_(-rc));
+			if (!first)
+				die("Inconsistent migration state, exiting");
+
+			return flow_migrate_source_rollback(c, FLOW_MAX, -rc);
+		}
+
+		first = false;
+	}
+
+	/* And then "extended" data (including window data we saved previously):
+	 * the target needs to set repair mode on sockets before it can set
+	 * this stuff, but it needs sockets (and flows) for that.
+	 *
+	 * This also closes sockets so that the target can start connecting
+	 * theirs: you can't sendmsg() to queues (using the socket) if the
+	 * socket is not connected (EPIPE), not even in repair mode. And the
+	 * target needs to restore queues now because we're sending the data.
+	 *
+	 * So, no rollback here, just try as hard as we can. Tolerate per-flow
+	 * failures but not if the stream might be inconsistent (reported here
+	 * as EIO).
+	 */
+	foreach_established_tcp_flow(i, flow, FLOW_MAX) {
+		rc = tcp_flow_migrate_source_ext(fd, i, &flow->tcp);
+		if (rc) {
+			err("Extended data for flow %u: %s", i, strerror_(-rc));
+
+			if (rc == -EIO)
+				die("Inconsistent migration state, exiting");
+		}
+	}
+
+	return 0;
+}
+
+/**
+ * flow_migrate_target() - Receive flows and insert in flow table
+ * @c:		Execution context
+ * @stage:	Migration stage information (unused)
+ * @fd:		Migration file descriptor
+ *
+ * Return: 0 on success, positive error code on failure
+ */
+int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
+			int fd)
+{
+	uint32_t count;
+	unsigned i;
+	int rc;
+
+	(void)stage;
+
+	if (read_all_buf(fd, &count, sizeof(count)))
+		return errno;
+
+	count = ntohl(count);
+	debug("Receiving %u flows", count);
+
+	if ((rc = flow_migrate_repair_all(c, true)))
+		return -rc;
+
+	repair_flush(c);
+
+	/* TODO: flow header with type, instead? */
+	for (i = 0; i < count; i++) {
+		rc = tcp_flow_migrate_target(c, fd);
+		if (rc) {
+			debug("Migration data failure at flow %u: %s, abort",
+			      i, strerror_(-rc));
+			return -rc;
+		}
+	}
+
+	repair_flush(c);
+
+	for (i = 0; i < count; i++) {
+		rc = tcp_flow_migrate_target_ext(c, flowtab + i, fd);
+		if (rc) {
+			debug("Migration data failure at flow %u: %s, abort",
+			      i, strerror_(-rc));
+			return -rc;
+		}
+	}
+
+	return 0;
+}
+
 /**
  * flow_init() - Initialise flow related data structures
  */
diff --git a/flow.h b/flow.h
index 24ba3ef..675726e 100644
--- a/flow.h
+++ b/flow.h
@@ -249,6 +249,14 @@ union flow;
 
 void flow_init(void);
 void flow_defer_handler(const struct ctx *c, const struct timespec *now);
+int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
+			      int fd);
+int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage,
+			    int fd);
+int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
+			int fd);
+int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
+			int fd);
 
 void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
 	__attribute__((format(printf, 3, 4)));
diff --git a/migrate.c b/migrate.c
index 1c59016..0fca77b 100644
--- a/migrate.c
+++ b/migrate.c
@@ -103,6 +103,16 @@ static const struct migrate_stage stages_v1[] = {
 		.source = seen_addrs_source_v1,
 		.target = seen_addrs_target_v1,
 	},
+	{
+		.name = "prepare flows",
+		.source = flow_migrate_source_pre,
+		.target = NULL,
+	},
+	{
+		.name = "transfer flows",
+		.source = flow_migrate_source,
+		.target = flow_migrate_target,
+	},
 	{ 0 },
 };
 
diff --git a/passt.c b/passt.c
index 6f9fb4d..68d1a28 100644
--- a/passt.c
+++ b/passt.c
@@ -223,9 +223,6 @@ int main(int argc, char **argv)
 		if (sigaction(SIGCHLD, &sa, NULL))
 			die_perror("Couldn't install signal handlers");
 
-		if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
-			die_perror("Couldn't set disposition for SIGPIPE");
-
 		c.mode = MODE_PASTA;
 	} else if (strstr(name, "passt")) {
 		c.mode = MODE_PASST;
@@ -233,6 +230,9 @@ int main(int argc, char **argv)
 		_exit(EXIT_FAILURE);
 	}
 
+	if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+		die_perror("Couldn't set disposition for SIGPIPE");
+
 	madvise(pkt_buf, TAP_BUF_BYTES, MADV_HUGEPAGE);
 
 	c.epollfd = epoll_create1(EPOLL_CLOEXEC);
diff --git a/repair.c b/repair.c
index dac28a6..3ee089f 100644
--- a/repair.c
+++ b/repair.c
@@ -202,7 +202,6 @@ int repair_flush(struct ctx *c)
  *
  * Return: 0 on success, negative error code on failure
  */
-/* cppcheck-suppress unusedFunction */
 int repair_set(struct ctx *c, int s, int cmd)
 {
 	int rc;
diff --git a/tcp.c b/tcp.c
index b978b30..98e1c6a 100644
--- a/tcp.c
+++ b/tcp.c
@@ -280,6 +280,7 @@
 #include <stddef.h>
 #include <string.h>
 #include <sys/epoll.h>
+#include <sys/ioctl.h>
 #include <sys/socket.h>
 #include <sys/timerfd.h>
 #include <sys/types.h>
@@ -287,6 +288,8 @@
 #include <time.h>
 #include <arpa/inet.h>
 
+#include <linux/sockios.h>
+
 #include "checksum.h"
 #include "util.h"
 #include "iov.h"
@@ -299,6 +302,7 @@
 #include "log.h"
 #include "inany.h"
 #include "flow.h"
+#include "repair.h"
 #include "linux_dep.h"
 
 #include "flow_table.h"
@@ -306,6 +310,21 @@
 #include "tcp_buf.h"
 #include "tcp_vu.h"
 
+#ifndef __USE_MISC
+/* From Linux UAPI, missing in netinet/tcp.h provided by musl */
+struct tcp_repair_opt {
+	__u32	opt_code;
+	__u32	opt_val;
+};
+
+enum {
+	TCP_NO_QUEUE,
+	TCP_RECV_QUEUE,
+	TCP_SEND_QUEUE,
+	TCP_QUEUES_NR,
+};
+#endif
+
 /* MSS rounding: see SET_MSS() */
 #define MSS_DEFAULT			536
 #define WINDOW_DEFAULT			14600		/* RFC 6928 */
@@ -326,6 +345,19 @@
 	 ((conn)->events & (SOCK_FIN_RCVD | TAP_FIN_RCVD)))
 #define CONN_HAS(conn, set)	(((conn)->events & (set)) == (set))
 
+/* Buffers to migrate pending data from send and receive queues. No, they don't
+ * use memory if we don't use them. And we're going away after this, so splurge.
+ */
+#define TCP_MIGRATE_SND_QUEUE_MAX	(64 << 20)
+#define TCP_MIGRATE_RCV_QUEUE_MAX	(64 << 20)
+uint8_t tcp_migrate_snd_queue		[TCP_MIGRATE_SND_QUEUE_MAX];
+uint8_t tcp_migrate_rcv_queue		[TCP_MIGRATE_RCV_QUEUE_MAX];
+
+#define TCP_MIGRATE_RESTORE_CHUNK_MIN	1024 /* Try smaller when above this */
+
+/* "Extended" data (not stored in the flow table) for TCP flow migration */
+static struct tcp_tap_transfer_ext migrate_ext[FLOW_MAX];
+
 static const char *tcp_event_str[] __attribute((__unused__)) = {
 	"SOCK_ACCEPTED", "TAP_SYN_RCVD", "ESTABLISHED", "TAP_SYN_ACK_SENT",
 
@@ -1468,6 +1500,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
 
 	conn->sock = s;
 	conn->timer = -1;
+	conn->listening_sock = -1;
 	conn_event(c, conn, TAP_SYN_RCVD);
 
 	conn->wnd_to_tap = WINDOW_DEFAULT;
@@ -1968,10 +2001,27 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
 		ack_due = 1;
 
 	if ((conn->events & TAP_FIN_RCVD) && !(conn->events & SOCK_FIN_SENT)) {
+		socklen_t sl;
+		struct tcp_info tinfo;
+
 		shutdown(conn->sock, SHUT_WR);
 		conn_event(c, conn, SOCK_FIN_SENT);
 		tcp_send_flag(c, conn, ACK);
 		ack_due = 0;
+
+		/* If we received a FIN, but the socket is in TCP_ESTABLISHED
+		 * state, it must be a migrated socket. The kernel saw the FIN
+		 * on the source socket, but not on the target socket.
+		 *
+		 * Approximate the effect of that FIN: as we're sending a FIN
+		 * out ourselves, the socket is now in a state equivalent to
+		 * LAST_ACK. Now that we sent the FIN out, close it with a RST.
+		 */
+		sl = sizeof(tinfo);
+		getsockopt(conn->sock, SOL_TCP, TCP_INFO, &tinfo, &sl);
+		if (tinfo.tcpi_state == TCP_ESTABLISHED &&
+		    conn->events & SOCK_FIN_RCVD)
+			goto reset;
 	}
 
 	if (ack_due)
@@ -2054,6 +2104,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
 void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
 			const struct timespec *now)
 {
+	struct tcp_tap_conn *conn;
 	union sockaddr_inany sa;
 	socklen_t sl = sizeof(sa);
 	struct flowside *ini;
@@ -2069,6 +2120,9 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
 	if (s < 0)
 		goto cancel;
 
+	conn = (struct tcp_tap_conn *)flow;
+	conn->listening_sock = ref.fd;
+
 	tcp_sock_set_nodelay(s);
 
 	/* FIXME: If useful: when the listening port has a specific bound
@@ -2634,3 +2688,868 @@ void tcp_timer(struct ctx *c, const struct timespec *now)
 	if (c->mode == MODE_PASTA)
 		tcp_splice_refill(c);
 }
+
+/**
+ * tcp_flow_is_established() - Was the connection established? Includes closing
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: true if the connection was established, false otherwise
+ */
+bool tcp_flow_is_established(const struct tcp_tap_conn *conn)
+{
+	return conn->events & ESTABLISHED;
+}
+
+/**
+ * tcp_flow_repair_on() - Enable repair mode for a single TCP flow
+ * @c:		Execution context
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn)
+{
+	int rc = 0;
+
+	if ((rc = repair_set(c, conn->sock, TCP_REPAIR_ON)))
+		err("Failed to set TCP_REPAIR");
+
+	return rc;
+}
+
+/**
+ * tcp_flow_repair_off() - Clear repair mode for a single TCP flow
+ * @c:		Execution context
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn)
+{
+	int rc = 0;
+
+	if ((rc = repair_set(c, conn->sock, TCP_REPAIR_OFF)))
+		err("Failed to clear TCP_REPAIR");
+
+	return rc;
+}
+
+/**
+ * tcp_flow_dump_tinfo() - Dump window scale, tcpi_state, tcpi_options
+ * @c:		Execution context
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_tinfo(int s, struct tcp_tap_transfer_ext *t)
+{
+	struct tcp_info tinfo;
+	socklen_t sl;
+
+	sl = sizeof(tinfo);
+	if (getsockopt(s, SOL_TCP, TCP_INFO, &tinfo, &sl)) {
+		int rc = -errno;
+		err_perror("Querying TCP_INFO, socket %i", s);
+		return rc;
+	}
+
+	t->snd_ws		= tinfo.tcpi_snd_wscale;
+	t->rcv_ws		= tinfo.tcpi_rcv_wscale;
+	t->tcpi_state		= tinfo.tcpi_state;
+	t->tcpi_options		= tinfo.tcpi_options;
+
+	return 0;
+}
+
+/**
+ * tcp_flow_dump_mss() - Dump MSS clamp (not current MSS) via TCP_MAXSEG
+ * @c:		Execution context
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_mss(int s, struct tcp_tap_transfer_ext *t)
+{
+	socklen_t sl = sizeof(t->mss);
+
+	if (getsockopt(s, SOL_TCP, TCP_MAXSEG, &t->mss, &sl)) {
+		int rc = -errno;
+		err_perror("Getting MSS, socket %i", s);
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_dump_wnd() - Dump current tcp_repair_window parameters
+ * @c:		Execution context
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_wnd(int s, struct tcp_tap_transfer_ext *t)
+{
+	struct tcp_repair_window wnd;
+	socklen_t sl = sizeof(wnd);
+
+	if (getsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, &sl)) {
+		int rc = -errno;
+		err_perror("Getting window repair data, socket %i", s);
+		return rc;
+	}
+
+	t->snd_wl1	= wnd.snd_wl1;
+	t->snd_wnd	= wnd.snd_wnd;
+	t->max_window	= wnd.max_window;
+	t->rcv_wnd	= wnd.rcv_wnd;
+	t->rcv_wup	= wnd.rcv_wup;
+
+	/* If we received a FIN, we also need to adjust window parameters.
+	 *
+	 * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state.
+	 */
+	if (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK) {
+		t->rcv_wup--;
+		t->rcv_wnd++;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_repair_wnd() - Restore window parameters from extended data
+ * @c:		Execution context
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t)
+{
+	struct tcp_repair_window wnd;
+
+	wnd.snd_wl1	= t->snd_wl1;
+	wnd.snd_wnd	= t->snd_wnd;
+	wnd.max_window	= t->max_window;
+	wnd.rcv_wnd	= t->rcv_wnd;
+	wnd.rcv_wup	= t->rcv_wup;
+
+	if (setsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, sizeof(wnd))) {
+		int rc = -errno;
+		err_perror("Setting window data, socket %i", s);
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_select_queue() - Select queue (receive or send) for next operation
+ * @s:		Socket
+ * @queue:	TCP_RECV_QUEUE or TCP_SEND_QUEUE
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_select_queue(int s, int queue)
+{
+	if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &queue, sizeof(queue))) {
+		int rc = -errno;
+		err_perror("Selecting TCP_SEND_QUEUE, socket %i", s);
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_dump_sndqueue() - Dump send queue, length of sent and not sent data
+ * @s:		Socket
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ *
+ * #syscalls:vu ioctl
+ */
+static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t)
+{
+	ssize_t rc;
+
+	if (ioctl(s, SIOCOUTQ, &t->sndq) < 0) {
+		rc = -errno;
+		err_perror("Getting send queue size, socket %i", s);
+		return rc;
+	}
+
+	if (ioctl(s, SIOCOUTQNSD, &t->notsent) < 0) {
+		rc = -errno;
+		err_perror("Getting not sent count, socket %i", s);
+		return rc;
+	}
+
+	/* If we sent a FIN, SIOCOUTQ and SIOCOUTQNSD are one greater than the
+	 * actual pending queue length, because they are based on the sequence
+	 * numbers, not directly on the buffer contents.
+	 *
+	 * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state.
+	 */
+	if (t->tcpi_state == TCP_FIN_WAIT1 || t->tcpi_state == TCP_FIN_WAIT2 ||
+	    t->tcpi_state == TCP_LAST_ACK  || t->tcpi_state == TCP_CLOSING) {
+		if (t->sndq)
+			t->sndq--;
+		if (t->notsent)
+			t->notsent--;
+	}
+
+	if (t->notsent > t->sndq) {
+		err("Invalid notsent count socket %i, send: %u, not sent: %u",
+		    s, t->sndq, t->notsent);
+		return -EINVAL;
+	}
+
+	if (t->sndq > TCP_MIGRATE_SND_QUEUE_MAX) {
+		err("Send queue too large to migrate socket %i: %u bytes",
+		    s, t->sndq);
+		return -ENOBUFS;
+	}
+
+	rc = recv(s, tcp_migrate_snd_queue,
+		  MIN(t->sndq, TCP_MIGRATE_SND_QUEUE_MAX), MSG_PEEK);
+	if (rc < 0) {
+		if (errno == EAGAIN)  { /* EAGAIN means empty */
+			rc = 0;
+		} else {
+			rc = -errno;
+			err_perror("Can't read send queue, socket %i", s);
+			return rc;
+		}
+	}
+
+	if ((uint32_t)rc < t->sndq) {
+		err("Short read migrating send queue");
+		return -ENXIO;
+	}
+
+	t->notsent = MIN(t->notsent, t->sndq);
+
+	return 0;
+}
+
+/**
+ * tcp_flow_repair_queue() - Restore contents of a given (pre-selected) queue
+ * @s:		Socket
+ * @len:	Length of data to be restored
+ * @buf:	Buffer with content of pending data queue
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf)
+{
+	size_t chunk = len;
+	uint8_t *p = buf;
+
+	while (len > 0) {
+		ssize_t rc = send(s, p, MIN(len, chunk), 0);
+
+		if (rc < 0) {
+			if ((errno == ENOBUFS || errno == ENOMEM) &&
+			    chunk >= TCP_MIGRATE_RESTORE_CHUNK_MIN) {
+				chunk /= 2;
+				continue;
+			}
+
+			rc = -errno;
+			err_perror("Can't write queue, socket %i", s);
+			return rc;
+		}
+
+		len -= rc;
+		p += rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_dump_seq() - Dump current sequence of pre-selected queue
+ * @s:		Socket
+ * @v:		Sequence value, set on return
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_seq(int s, uint32_t *v)
+{
+	socklen_t sl = sizeof(*v);
+
+	if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, &sl)) {
+		int rc = -errno;
+		err_perror("Dumping sequence, socket %i", s);
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_repair_seq() - Restore sequence for pre-selected queue
+ * @s:		Socket
+ * @v:		Sequence value to be set
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_seq(int s, const uint32_t *v)
+{
+	if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, sizeof(*v))) {
+		int rc = -errno;
+		err_perror("Setting sequence, socket %i", s);
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_dump_rcvqueue() - Dump receive queue and its length, seal/block it
+ * @s:		Socket
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ *
+ * #syscalls:vu ioctl
+ */
+static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t)
+{
+	ssize_t rc;
+
+	if (ioctl(s, SIOCINQ, &t->rcvq) < 0) {
+		rc = -errno;
+		err_perror("Get receive queue size, socket %i", s);
+		return rc;
+	}
+
+	/* If we received a FIN, SIOCINQ is one greater than the actual number
+	 * of bytes on the queue, because it's based on the sequence number
+	 * rather than directly on the buffer contents.
+	 *
+	 * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state.
+	 */
+	if (t->rcvq &&
+	    (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK))
+		t->rcvq--;
+
+	if (t->rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) {
+		err("Receive queue too large to migrate socket %i: %u bytes",
+		    s, t->rcvq);
+		return -ENOBUFS;
+	}
+
+	rc = recv(s, tcp_migrate_rcv_queue, t->rcvq, MSG_PEEK);
+	if (rc < 0) {
+		if (errno == EAGAIN)  { /* EAGAIN means empty */
+			rc = 0;
+		} else {
+			rc = -errno;
+			err_perror("Can't read receive queue for socket %i", s);
+			return rc;
+		}
+	}
+
+	if ((uint32_t)rc < t->rcvq) {
+		err("Short read migrating receive queue");
+		return -ENXIO;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_repair_opt() - Set repair "options" (MSS, scale, SACK, timestamps)
+ * @s:		Socket
+ * @t:		Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_opt(int s, const struct tcp_tap_transfer_ext *t)
+{
+	const struct tcp_repair_opt opts[] = {
+		{ TCPOPT_WINDOW,		t->snd_ws + (t->rcv_ws << 16) },
+		{ TCPOPT_MAXSEG,		t->mss },
+		{ TCPOPT_SACK_PERMITTED,	0 },
+		{ TCPOPT_TIMESTAMP,		0 },
+	};
+	socklen_t sl;
+
+	sl = sizeof(opts[0]) * (2 +
+				!!(t->tcpi_options & TCPI_OPT_SACK) +
+				!!(t->tcpi_options & TCPI_OPT_TIMESTAMPS));
+
+	if (setsockopt(s, SOL_TCP, TCP_REPAIR_OPTIONS, opts, sl)) {
+		int rc = -errno;
+		err_perror("Setting repair options, socket %i", s);
+		return rc;
+	}
+
+	return 0;
+}
+
+/**
+ * tcp_flow_migrate_source() - Send data (flow table) for flow, close listening
+ * @fd:		Descriptor for state migration
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn)
+{
+	struct tcp_tap_transfer t = {
+		.retrans		= conn->retrans,
+		.ws_from_tap		= conn->ws_from_tap,
+		.ws_to_tap		= conn->ws_to_tap,
+		.events			= conn->events,
+
+		.tap_mss		= htonl(MSS_GET(conn)),
+
+		.sndbuf			= htonl(conn->sndbuf),
+
+		.flags			= conn->flags,
+		.seq_dup_ack_approx	= conn->seq_dup_ack_approx,
+
+		.wnd_from_tap		= htons(conn->wnd_from_tap),
+		.wnd_to_tap		= htons(conn->wnd_to_tap),
+
+		.seq_to_tap		= htonl(conn->seq_to_tap),
+		.seq_ack_from_tap	= htonl(conn->seq_ack_from_tap),
+		.seq_from_tap		= htonl(conn->seq_from_tap),
+		.seq_ack_to_tap		= htonl(conn->seq_ack_to_tap),
+		.seq_init_from_tap	= htonl(conn->seq_init_from_tap),
+	};
+
+	memcpy(&t.pif, conn->f.pif, sizeof(t.pif));
+	memcpy(&t.side, conn->f.side, sizeof(t.side));
+
+	if (write_all_buf(fd, &t, sizeof(t))) {
+		int rc = -errno;
+		err_perror("Can't write migration data, socket %i", conn->sock);
+		return rc;
+	}
+
+	if (conn->listening_sock != -1 && !fcntl(conn->listening_sock, F_GETFD))
+		close(conn->listening_sock);
+
+	return 0;
+}
+
+/**
+ * tcp_flow_migrate_source_ext() - Dump queues, close sockets, send final data
+ * @fd:		Descriptor for state migration
+ * @fidx:	Flow index
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative (not -EIO) on failure, -EIO on sending failure
+ */
+int tcp_flow_migrate_source_ext(int fd, int fidx,
+				const struct tcp_tap_conn *conn)
+{
+	uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
+	struct tcp_tap_transfer_ext *t = &migrate_ext[fidx];
+	int s = conn->sock;
+	int rc;
+
+	/* Disable SO_PEEK_OFF, it will make accessing the queues in repair mode
+	 * weird.
+	 */
+	if (tcp_set_peek_offset(s, -1)) {
+		rc = -errno;
+		goto fail;
+	}
+
+	if ((rc = tcp_flow_dump_tinfo(s, t)))
+		goto fail;
+
+	if ((rc = tcp_flow_dump_mss(s, t)))
+		goto fail;
+
+	if ((rc = tcp_flow_dump_wnd(s, t)))
+		goto fail;
+
+	if ((rc = tcp_flow_select_queue(s, TCP_SEND_QUEUE)))
+		goto fail;
+
+	if ((rc = tcp_flow_dump_sndqueue(s, t)))
+		goto fail;
+
+	if ((rc = tcp_flow_dump_seq(s, &t->seq_snd)))
+		goto fail;
+
+	if ((rc = tcp_flow_select_queue(s, TCP_RECV_QUEUE)))
+		goto fail;
+
+	if ((rc = tcp_flow_dump_rcvqueue(s, t)))
+		goto fail;
+
+	if ((rc = tcp_flow_dump_seq(s, &t->seq_rcv)))
+		goto fail;
+
+	close(s);
+
+	/* Adjustments unrelated to FIN segments: sequence numbers we dumped are
+	 * based on the end of the queues.
+	 */
+	t->seq_rcv	-= t->rcvq;
+	t->seq_snd	-= t->sndq;
+
+	debug("Extended migration data, socket %i sequences send %u receive %u",
+	      s, t->seq_snd, t->seq_rcv);
+	debug("  pending queues: send %u not sent %u receive %u",
+	      t->sndq, t->notsent, t->rcvq);
+	debug("  window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u",
+	      t->snd_wl1, t->snd_wnd, t->max_window, t->rcv_wnd, t->rcv_wup);
+	debug("  SO_PEEK_OFF %s  offset=%"PRIu32,
+	      peek_offset_cap ? "enabled" : "disabled", peek_offset);
+
+	/* Endianness fix-ups */
+	t->seq_snd	= htonl(t->seq_snd);
+	t->seq_rcv 	= htonl(t->seq_rcv);
+	t->sndq		= htonl(t->sndq);
+	t->notsent	= htonl(t->notsent);
+	t->rcvq		= htonl(t->rcvq);
+
+	t->snd_wl1	= htonl(t->snd_wl1);
+	t->snd_wnd	= htonl(t->snd_wnd);
+	t->max_window	= htonl(t->max_window);
+	t->rcv_wnd	= htonl(t->rcv_wnd);
+	t->rcv_wup	= htonl(t->rcv_wup);
+
+	if (write_all_buf(fd, t, sizeof(*t))) {
+		err_perror("Failed to write extended data, socket %i", s);
+		return -EIO;
+	}
+
+	if (write_all_buf(fd, tcp_migrate_snd_queue, ntohl(t->sndq))) {
+		err_perror("Failed to write send queue data, socket %i", s);
+		return -EIO;
+	}
+
+	if (write_all_buf(fd, tcp_migrate_rcv_queue, ntohl(t->rcvq))) {
+		err_perror("Failed to write receive queue data, socket %i", s);
+		return -EIO;
+	}
+
+	return 0;
+
+fail:
+	/* For any type of failure dumping data, write an invalid extended data
+	 * descriptor that allows us to keep the stream in sync, but tells the
+	 * target to skip the flow. If we fail to transfer data, that's fatal:
+	 * return -EIO in that case (and only in that case).
+	 */
+	t->tcpi_state = 0; /* Not defined: tell the target to skip this flow */
+
+	if (write_all_buf(fd, t, sizeof(*t))) {
+		err_perror("Failed to write extended data, socket %i", s);
+		return -EIO;
+	}
+
+	if (rc == -EIO) /* but not a migration data transfer failure */
+		return -ENODATA;
+
+	return rc;
+}
+
+/**
+ * tcp_flow_repair_socket() - Open and bind socket, request repair mode
+ * @c:		Execution context
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn)
+{
+	sa_family_t af = CONN_V4(conn) ? AF_INET : AF_INET6;
+	const struct flowside *sockside = HOSTFLOW(conn);
+	union sockaddr_inany a;
+	socklen_t sl;
+	int s, rc;
+
+	pif_sockaddr(c, &a, &sl, PIF_HOST, &sockside->oaddr, sockside->oport);
+
+	if ((conn->sock = socket(af, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
+				 IPPROTO_TCP)) < 0) {
+		rc = -errno;
+		err_perror("Failed to create socket for migrated flow");
+		return rc;
+	}
+	s = conn->sock;
+
+	if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int)))
+		debug_perror("Setting SO_REUSEADDR on socket %i", s);
+
+	tcp_sock_set_nodelay(s);
+
+	if ((rc = bind(s, &a.sa, sizeof(a)))) {
+		err_perror("Failed to bind socket for migrated flow");
+		goto err;
+	}
+
+	if ((rc = tcp_flow_repair_on(c, conn)))
+		goto err;
+
+	return 0;
+
+err:
+	close(s);
+	conn->sock = -1;
+	return rc;
+}
+
+/**
+ * tcp_flow_repair_connect() - Connect socket in repair mode, then turn it off
+ * @c:		Execution context
+ * @conn:	Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_connect(const struct ctx *c,
+				   struct tcp_tap_conn *conn)
+{
+	const struct flowside *tgt = HOSTFLOW(conn);
+	int rc;
+
+	rc = flowside_connect(c, conn->sock, PIF_HOST, tgt);
+	if (rc) {
+		rc = -errno;
+		err_perror("Failed to connect migrated socket %i", conn->sock);
+		return rc;
+	}
+
+	conn->in_epoll = 0;
+	conn->timer = -1;
+	conn->listening_sock = -1;
+
+	return 0;
+}
+
+/**
+ * tcp_flow_migrate_target() - Receive data (flow table part) for flow, insert
+ * @c:		Execution context
+ * @fd:		Descriptor for state migration
+ *
+ * Return: 0 on success, negative on fatal failure, but 0 on single flow failure
+ */
+int tcp_flow_migrate_target(struct ctx *c, int fd)
+{
+	struct tcp_tap_transfer t;
+	struct tcp_tap_conn *conn;
+	union flow *flow;
+	int rc;
+
+	if (!(flow = flow_alloc())) {
+		err("Flow table full on migration target");
+		return 0;
+	}
+
+	if (read_all_buf(fd, &t, sizeof(t))) {
+		flow_alloc_cancel(flow);
+		err_perror("Failed to receive migration data");
+		return -errno;
+	}
+
+	flow->f.state = FLOW_STATE_TGT;
+	memcpy(&flow->f.pif, &t.pif, sizeof(flow->f.pif));
+	memcpy(&flow->f.side, &t.side, sizeof(flow->f.side));
+	conn = FLOW_SET_TYPE(flow, FLOW_TCP, tcp);
+
+	conn->retrans			= t.retrans;
+	conn->ws_from_tap		= t.ws_from_tap;
+	conn->ws_to_tap			= t.ws_to_tap;
+	conn->events			= t.events;
+
+	conn->sndbuf			= htonl(t.sndbuf);
+
+	conn->flags			= t.flags;
+	conn->seq_dup_ack_approx	= t.seq_dup_ack_approx;
+
+	MSS_SET(conn,			  ntohl(t.tap_mss));
+
+	conn->wnd_from_tap		= ntohs(t.wnd_from_tap);
+	conn->wnd_to_tap		= ntohs(t.wnd_to_tap);
+
+	conn->seq_to_tap		= ntohl(t.seq_to_tap);
+	conn->seq_ack_from_tap		= ntohl(t.seq_ack_from_tap);
+	conn->seq_from_tap		= ntohl(t.seq_from_tap);
+	conn->seq_ack_to_tap		= ntohl(t.seq_ack_to_tap);
+	conn->seq_init_from_tap		= ntohl(t.seq_init_from_tap);
+
+	if ((rc = tcp_flow_repair_socket(c, conn))) {
+		flow_err(flow, "Can't set up socket: %s, drop", strerror_(rc));
+		flow_alloc_cancel(flow);
+		return 0;
+	}
+
+	flow_hash_insert(c, TAP_SIDX(conn));
+	FLOW_ACTIVATE(conn);
+
+	return 0;
+}
+
+/**
+ * tcp_flow_migrate_target_ext() - Receive extended data for flow, set, connect
+ * @c:		Execution context
+ * @flow:	Existing flow for this connection data
+ * @fd:		Descriptor for state migration
+ *
+ * Return: 0 on success, negative on fatal failure, but 0 on single flow failure
+ */
+int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd)
+{
+	struct tcp_tap_conn *conn = &flow->tcp;
+	uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
+	struct tcp_tap_transfer_ext t;
+	int s = conn->sock, rc;
+
+	if (read_all_buf(fd, &t, sizeof(t))) {
+		rc = -errno;
+		err_perror("Failed to read extended data for socket %i", s);
+		return rc;
+	}
+
+	if (!t.tcpi_state) { /* Source wants us to skip this flow */
+		flow_err(flow, "Dropping as requested by source");
+		goto fail;
+	}
+
+	/* Endianness fix-ups */
+	t.seq_snd	= ntohl(t.seq_snd);
+	t.seq_rcv 	= ntohl(t.seq_rcv);
+	t.sndq		= ntohl(t.sndq);
+	t.notsent	= ntohl(t.notsent);
+	t.rcvq		= ntohl(t.rcvq);
+
+	t.snd_wl1	= ntohl(t.snd_wl1);
+	t.snd_wnd	= ntohl(t.snd_wnd);
+	t.max_window	= ntohl(t.max_window);
+	t.rcv_wnd	= ntohl(t.rcv_wnd);
+	t.rcv_wup	= ntohl(t.rcv_wup);
+
+	debug("Extended migration data, socket %i sequences send %u receive %u",
+	      s, t.seq_snd, t.seq_rcv);
+	debug("  pending queues: send %u not sent %u receive %u",
+	      t.sndq, t.notsent, t.rcvq);
+	debug("  window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u",
+	      t.snd_wl1, t.snd_wnd, t.max_window, t.rcv_wnd, t.rcv_wup);
+	debug("  SO_PEEK_OFF %s  offset=%"PRIu32,
+	      peek_offset_cap ? "enabled" : "disabled", peek_offset);
+
+	if (t.sndq > TCP_MIGRATE_SND_QUEUE_MAX || t.notsent > t.sndq ||
+	    t.rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) {
+		err("Bad queues socket %i, send: %u, not sent: %u, receive: %u",
+		    s, t.sndq, t.notsent, t.rcvq);
+		return -EINVAL;
+	}
+
+	if (read_all_buf(fd, tcp_migrate_snd_queue, t.sndq)) {
+		rc = -errno;
+		err_perror("Failed to read send queue data, socket %i", s);
+		return rc;
+	}
+
+	if (read_all_buf(fd, tcp_migrate_rcv_queue, t.rcvq)) {
+		rc = -errno;
+		err_perror("Failed to read receive queue data, socket %i", s);
+		return rc;
+	}
+
+	if (tcp_flow_select_queue(s, TCP_SEND_QUEUE))
+		goto fail;
+
+	if (tcp_flow_repair_seq(s, &t.seq_snd))
+		goto fail;
+
+	if (tcp_flow_select_queue(s, TCP_RECV_QUEUE))
+		goto fail;
+
+	if (tcp_flow_repair_seq(s, &t.seq_rcv))
+		goto fail;
+
+	if (tcp_flow_repair_connect(c, conn))
+		goto fail;
+
+	if (tcp_flow_repair_queue(s, t.rcvq, tcp_migrate_rcv_queue))
+		goto fail;
+
+	if (tcp_flow_select_queue(s, TCP_SEND_QUEUE))
+		goto fail;
+
+	if (tcp_flow_repair_queue(s, t.sndq - t.notsent,
+				  tcp_migrate_snd_queue))
+		goto fail;
+
+	if (tcp_flow_repair_opt(s, &t))
+		goto fail;
+
+	/* If we sent a FIN sent and it was acknowledged (TCP_FIN_WAIT2), don't
+	 * send it out, because we already sent it for sure.
+	 *
+	 * Call shutdown(x, SHUT_WR) in repair mode, so that we move to
+	 * FIN_WAIT_1 (tcp_shutdown()) without sending anything
+	 * (goto in tcp_write_xmit()).
+	 */
+	if (t.tcpi_state == TCP_FIN_WAIT2) {
+		int v;
+
+		v = TCP_SEND_QUEUE;
+		if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v)))
+			debug_perror("Selecting repair queue, socket %i", s);
+		else
+			shutdown(s, SHUT_WR);
+	}
+
+	if (tcp_flow_repair_wnd(s, &t))
+		goto fail;
+
+	tcp_flow_repair_off(c, conn);
+	repair_flush(c);
+
+	if (t.notsent) {
+		if (tcp_flow_repair_queue(s, t.notsent,
+					  tcp_migrate_snd_queue +
+					  (t.sndq - t.notsent))) {
+			/* This sometimes seems to fail for unclear reasons.
+			 * Don't fail the whole migration, just reset the flow
+			 * and carry on to the next one.
+			 */
+			goto fail;
+		}
+	}
+
+	/* If we sent a FIN but it wasn't acknowledged yet (TCP_FIN_WAIT1), send
+	 * it out, because we don't know if we already sent it.
+	 *
+	 * Call shutdown(x, SHUT_WR) *not* in repair mode, which moves us to
+	 * TCP_FIN_WAIT1.
+	 */
+	if (t.tcpi_state == TCP_FIN_WAIT1)
+		shutdown(s, SHUT_WR);
+
+	if (tcp_set_peek_offset(conn->sock, peek_offset))
+		goto fail;
+
+	tcp_send_flag(c, conn, ACK);
+	tcp_data_from_sock(c, conn);
+
+	if ((rc = tcp_epoll_ctl(c, conn))) {
+		debug("Failed to subscribe to epoll for migrated socket %i: %s",
+		      conn->sock, strerror_(-rc));
+		goto fail;
+	}
+
+	return 0;
+
+fail:
+	tcp_flow_repair_off(c, conn);
+	repair_flush(c);
+
+	conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
+	tcp_rst(c, conn);
+
+	return 0;
+}
diff --git a/tcp_conn.h b/tcp_conn.h
index 8c20805..42dff48 100644
--- a/tcp_conn.h
+++ b/tcp_conn.h
@@ -19,6 +19,7 @@
  * @tap_mss:		MSS advertised by tap/guest, rounded to 2 ^ TCP_MSS_BITS
  * @sock:		Socket descriptor number
  * @events:		Connection events, implying connection states
+ * @listening_sock:	Listening socket this socket was accept()ed from, or -1
  * @timer:		timerfd descriptor for timeout events
  * @flags:		Connection flags representing internal attributes
  * @sndbuf:		Sending buffer in kernel, rounded to 2 ^ SNDBUF_BITS
@@ -68,6 +69,7 @@ struct tcp_tap_conn {
 #define	CONN_STATE_BITS		/* Setting these clears other flags */	\
 	(SOCK_ACCEPTED | TAP_SYN_RCVD | ESTABLISHED)
 
+	int		listening_sock;
 
 	int		timer		:FD_REF_BITS;
 
@@ -96,6 +98,93 @@ struct tcp_tap_conn {
 	uint32_t	seq_init_from_tap;
 };
 
+/**
+ * struct tcp_tap_transfer - Migrated TCP data, flow table part, network order
+ * @pif:		Interfaces for each side of the flow
+ * @side:		Addresses and ports for each side of the flow
+ * @retrans:		Number of retransmissions occurred due to ACK_TIMEOUT
+ * @ws_from_tap:	Window scaling factor advertised from tap/guest
+ * @ws_to_tap:		Window scaling factor advertised to tap/guest
+ * @events:		Connection events, implying connection states
+ * @tap_mss:		MSS advertised by tap/guest, rounded to 2 ^ TCP_MSS_BITS
+ * @sndbuf:		Sending buffer in kernel, rounded to 2 ^ SNDBUF_BITS
+ * @flags:		Connection flags representing internal attributes
+ * @seq_dup_ack_approx:	Last duplicate ACK number sent to tap
+ * @wnd_from_tap:	Last window size from tap, unscaled (as received)
+ * @wnd_to_tap:		Sending window advertised to tap, unscaled (as sent)
+ * @seq_to_tap:		Next sequence for packets to tap
+ * @seq_ack_from_tap:	Last ACK number received from tap
+ * @seq_from_tap:	Next sequence for packets from tap (not actually sent)
+ * @seq_ack_to_tap:	Last ACK number sent to tap
+ * @seq_init_from_tap:	Initial sequence number from tap
+*/
+struct tcp_tap_transfer {
+	uint8_t		pif[SIDES];
+	struct flowside	side[SIDES];
+
+	uint8_t		retrans;
+	uint8_t		ws_from_tap;
+	uint8_t		ws_to_tap;
+	uint8_t		events;
+
+	uint32_t	tap_mss;
+
+	uint32_t	sndbuf;
+
+	uint8_t		flags;
+	uint8_t		seq_dup_ack_approx;
+
+	uint16_t	wnd_from_tap;
+	uint16_t	wnd_to_tap;
+
+	uint32_t	seq_to_tap;
+	uint32_t	seq_ack_from_tap;
+	uint32_t	seq_from_tap;
+	uint32_t	seq_ack_to_tap;
+	uint32_t	seq_init_from_tap;
+} __attribute__((packed, aligned(__alignof__(uint32_t))));
+
+/**
+ * struct tcp_tap_transfer_ext - Migrated TCP data, outside flow, network order
+ * @seq_snd:		Socket-side send sequence
+ * @seq_rcv:		Socket-side receive sequence
+ * @sndq:		Length of pending send queue (unacknowledged / not sent)
+ * @notsent:		Part of pending send queue that wasn't sent out yet
+ * @rcvq:		Length of pending receive queue
+ * @mss:		Socket-side MSS clamp
+ * @snd_wl1:		Next sequence used in window probe (next sequence - 1)
+ * @snd_wnd:		Socket-side sending window
+ * @max_window:		Window clamp
+ * @rcv_wnd:		Socket-side receive window
+ * @rcv_wup:		rcv_nxt on last window update sent
+ * @snd_ws:		Window scaling factor, send
+ * @rcv_ws:		Window scaling factor, receive
+ * @tcpi_state:		Connection state in TCP_INFO style (enum, tcp_states.h)
+ * @tcpi_options:	TCPI_OPT_* constants (timestamps, selective ACK)
+ */
+struct tcp_tap_transfer_ext {
+	uint32_t	seq_snd;
+	uint32_t	seq_rcv;
+
+	uint32_t	sndq;
+	uint32_t	notsent;
+	uint32_t	rcvq;
+
+	uint32_t	mss;
+
+	/* We can't just use struct tcp_repair_window: we need network order */
+	uint32_t	snd_wl1;
+	uint32_t	snd_wnd;
+	uint32_t	max_window;
+	uint32_t	rcv_wnd;
+	uint32_t	rcv_wup;
+
+	uint8_t		snd_ws;
+	uint8_t		rcv_ws;
+	uint8_t		tcpi_state;
+	uint8_t		tcpi_options;
+} __attribute__((packed, aligned(__alignof__(uint32_t))));
+
 /**
  * struct tcp_splice_conn - Descriptor for a spliced TCP connection
  * @f:			Generic flow information
@@ -140,6 +229,20 @@ extern int init_sock_pool4	[TCP_SOCK_POOL_SIZE];
 extern int init_sock_pool6	[TCP_SOCK_POOL_SIZE];
 
 bool tcp_flow_defer(const struct tcp_tap_conn *conn);
+
+int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn);
+int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn);
+
+int tcp_flow_migrate_shrink_window(int fidx, const struct tcp_tap_conn *conn);
+int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn);
+int tcp_flow_migrate_source_ext(int fd, int fidx,
+				const struct tcp_tap_conn *conn);
+
+int tcp_flow_migrate_target(struct ctx *c, int fd);
+int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd);
+
+bool tcp_flow_is_established(const struct tcp_tap_conn *conn);
+
 bool tcp_splice_flow_defer(struct tcp_splice_conn *conn);
 void tcp_splice_timer(const struct ctx *c, struct tcp_splice_conn *conn);
 int tcp_conn_pool_sock(int pool[]);
-- 
@@ -19,6 +19,7 @@
  * @tap_mss:		MSS advertised by tap/guest, rounded to 2 ^ TCP_MSS_BITS
  * @sock:		Socket descriptor number
  * @events:		Connection events, implying connection states
+ * @listening_sock:	Listening socket this socket was accept()ed from, or -1
  * @timer:		timerfd descriptor for timeout events
  * @flags:		Connection flags representing internal attributes
  * @sndbuf:		Sending buffer in kernel, rounded to 2 ^ SNDBUF_BITS
@@ -68,6 +69,7 @@ struct tcp_tap_conn {
 #define	CONN_STATE_BITS		/* Setting these clears other flags */	\
 	(SOCK_ACCEPTED | TAP_SYN_RCVD | ESTABLISHED)
 
+	int		listening_sock;
 
 	int		timer		:FD_REF_BITS;
 
@@ -96,6 +98,93 @@ struct tcp_tap_conn {
 	uint32_t	seq_init_from_tap;
 };
 
+/**
+ * struct tcp_tap_transfer - Migrated TCP data, flow table part, network order
+ * @pif:		Interfaces for each side of the flow
+ * @side:		Addresses and ports for each side of the flow
+ * @retrans:		Number of retransmissions occurred due to ACK_TIMEOUT
+ * @ws_from_tap:	Window scaling factor advertised from tap/guest
+ * @ws_to_tap:		Window scaling factor advertised to tap/guest
+ * @events:		Connection events, implying connection states
+ * @tap_mss:		MSS advertised by tap/guest, rounded to 2 ^ TCP_MSS_BITS
+ * @sndbuf:		Sending buffer in kernel, rounded to 2 ^ SNDBUF_BITS
+ * @flags:		Connection flags representing internal attributes
+ * @seq_dup_ack_approx:	Last duplicate ACK number sent to tap
+ * @wnd_from_tap:	Last window size from tap, unscaled (as received)
+ * @wnd_to_tap:		Sending window advertised to tap, unscaled (as sent)
+ * @seq_to_tap:		Next sequence for packets to tap
+ * @seq_ack_from_tap:	Last ACK number received from tap
+ * @seq_from_tap:	Next sequence for packets from tap (not actually sent)
+ * @seq_ack_to_tap:	Last ACK number sent to tap
+ * @seq_init_from_tap:	Initial sequence number from tap
+*/
+struct tcp_tap_transfer {
+	uint8_t		pif[SIDES];
+	struct flowside	side[SIDES];
+
+	uint8_t		retrans;
+	uint8_t		ws_from_tap;
+	uint8_t		ws_to_tap;
+	uint8_t		events;
+
+	uint32_t	tap_mss;
+
+	uint32_t	sndbuf;
+
+	uint8_t		flags;
+	uint8_t		seq_dup_ack_approx;
+
+	uint16_t	wnd_from_tap;
+	uint16_t	wnd_to_tap;
+
+	uint32_t	seq_to_tap;
+	uint32_t	seq_ack_from_tap;
+	uint32_t	seq_from_tap;
+	uint32_t	seq_ack_to_tap;
+	uint32_t	seq_init_from_tap;
+} __attribute__((packed, aligned(__alignof__(uint32_t))));
+
+/**
+ * struct tcp_tap_transfer_ext - Migrated TCP data, outside flow, network order
+ * @seq_snd:		Socket-side send sequence
+ * @seq_rcv:		Socket-side receive sequence
+ * @sndq:		Length of pending send queue (unacknowledged / not sent)
+ * @notsent:		Part of pending send queue that wasn't sent out yet
+ * @rcvq:		Length of pending receive queue
+ * @mss:		Socket-side MSS clamp
+ * @snd_wl1:		Next sequence used in window probe (next sequence - 1)
+ * @snd_wnd:		Socket-side sending window
+ * @max_window:		Window clamp
+ * @rcv_wnd:		Socket-side receive window
+ * @rcv_wup:		rcv_nxt on last window update sent
+ * @snd_ws:		Window scaling factor, send
+ * @rcv_ws:		Window scaling factor, receive
+ * @tcpi_state:		Connection state in TCP_INFO style (enum, tcp_states.h)
+ * @tcpi_options:	TCPI_OPT_* constants (timestamps, selective ACK)
+ */
+struct tcp_tap_transfer_ext {
+	uint32_t	seq_snd;
+	uint32_t	seq_rcv;
+
+	uint32_t	sndq;
+	uint32_t	notsent;
+	uint32_t	rcvq;
+
+	uint32_t	mss;
+
+	/* We can't just use struct tcp_repair_window: we need network order */
+	uint32_t	snd_wl1;
+	uint32_t	snd_wnd;
+	uint32_t	max_window;
+	uint32_t	rcv_wnd;
+	uint32_t	rcv_wup;
+
+	uint8_t		snd_ws;
+	uint8_t		rcv_ws;
+	uint8_t		tcpi_state;
+	uint8_t		tcpi_options;
+} __attribute__((packed, aligned(__alignof__(uint32_t))));
+
 /**
  * struct tcp_splice_conn - Descriptor for a spliced TCP connection
  * @f:			Generic flow information
@@ -140,6 +229,20 @@ extern int init_sock_pool4	[TCP_SOCK_POOL_SIZE];
 extern int init_sock_pool6	[TCP_SOCK_POOL_SIZE];
 
 bool tcp_flow_defer(const struct tcp_tap_conn *conn);
+
+int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn);
+int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn);
+
+int tcp_flow_migrate_shrink_window(int fidx, const struct tcp_tap_conn *conn);
+int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn);
+int tcp_flow_migrate_source_ext(int fd, int fidx,
+				const struct tcp_tap_conn *conn);
+
+int tcp_flow_migrate_target(struct ctx *c, int fd);
+int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd);
+
+bool tcp_flow_is_established(const struct tcp_tap_conn *conn);
+
 bool tcp_splice_flow_defer(struct tcp_splice_conn *conn);
 void tcp_splice_timer(const struct ctx *c, struct tcp_splice_conn *conn);
 int tcp_conn_pool_sock(int pool[]);
-- 
2.43.0


  reply	other threads:[~2025-02-16 22:22 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-02-16 22:22 [PATCH v27 0/2] State migration, perhaps final (#2)? Stefano Brivio
2025-02-16 22:22 ` Stefano Brivio [this message]
2025-02-17  3:46   ` [PATCH v27 1/2] migrate: Migrate TCP flows David Gibson
2025-02-16 22:22 ` [PATCH v27 2/2] test: Add migration tests Stefano Brivio
2025-02-17  3:47   ` David Gibson

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20250216222227.2017788-2-sbrivio@redhat.com \
    --to=sbrivio@redhat.com \
    --cc=david@gibson.dropbear.id.au \
    --cc=passt-dev@passt.top \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).