#! /usr/bin/env python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson

"""
Test A Simple Socket Transport

transfer.py - Helpers for testing data transfers
"""

import dataclasses
import ipaddress
import time
from typing import Iterator, Optional

import exeter

from . import cmdsite, ip, unshare

# HACK: how long to wait for the server to be ready and listening (s)
SERVER_READY_DELAY = 0.1  # 1/10th of a second


def socat_ip(ip: ip.Addr) -> str:
    """Format an address for use inside a socat address string.

    socat needs IPv6 addresses in square brackets; IPv4 addresses are
    used unchanged.  Raises TypeError for anything that is neither an
    IPv4Address nor an IPv6Address.
    """
    if isinstance(ip, ipaddress.IPv4Address):
        return f'{ip}'
    if isinstance(ip, ipaddress.IPv6Address):
        return f'[{ip}]'
    raise TypeError


def socat_upload(datafile: str, csite: cmdsite.CmdSite,
                 ssite: cmdsite.CmdSite, connect: str, listen: str) -> None:
    """Send @datafile from the client site to the server site with socat.

    A background socat on @ssite receives on the @listen address and
    writes to its (captured) stdout, while a socat on @csite reads
    @datafile and sends it to the @connect address.  The data the
    server captured is then compared with 'cat @datafile' run on
    @csite.
    """
    expected = csite.output('cat', f'{datafile}')

    with ssite.bg('socat', '-u', f'{listen}', 'STDOUT',
                  capture=cmdsite.Capture.STDOUT) as receiver:
        # Give the server a moment to start listening (see the HACK
        # note on SERVER_READY_DELAY)
        time.sleep(SERVER_READY_DELAY)

        # The client has to run in the background too: csite.fg()
        # would block until the client finished, and in the meantime
        # nobody would drain the server's output pipe, so the server
        # would stall as soon as the pipe buffers filled up
        sender_job = csite.bg('socat', '-u', f'OPEN:{datafile}',
                              f'{connect}')
        with sender_job as sender:
            received = receiver.run()
            sender.run()

    exeter.assert_eq(expected, received.stdout)


def socat_download(datafile: str, csite: cmdsite.CmdSite,
                   ssite: cmdsite.CmdSite, connect: str, listen: str) -> None:
    """Fetch @datafile from the server site to the client site with socat.

    A background socat on @ssite reads @datafile and serves it on the
    @listen address; a socat on @csite connects to @connect and
    writes what it receives to stdout.  That output is compared with
    'cat @datafile' run on @ssite.
    """
    expected = ssite.output('cat', f'{datafile}')

    with ssite.bg('socat', '-u', f'OPEN:{datafile}', f'{listen}'):
        # Give the server a moment to start listening (see the HACK
        # note on SERVER_READY_DELAY)
        time.sleep(SERVER_READY_DELAY)
        received = csite.output('socat', '-u', f'{connect}', 'STDOUT')

    exeter.assert_eq(expected, received)


def _tcp_socat(connectip: ip.Addr, connectport: int,
               listenip: Optional[ip.Addr], listenport: Optional[int],
               fromip: Optional[ip.Addr]) -> tuple[str, str]:
    """Build the socat address strings for a TCP transfer.

    Returns a (connect, listen) pair.  The address family is chosen
    from the type of @connectip.  @listenport defaults to
    @connectport.  @listenip and @fromip, when given, are added as
    'bind=' options to the listening and connecting side
    respectively.
    """
    is_v6 = isinstance(connectip, ipaddress.IPv6Address)
    if listenport is None:
        listenport = connectport

    connect = (f'TCP6:[{connectip}]:{connectport},ipv6only' if is_v6
               else f'TCP4:{connectip}:{connectport}')
    listen = (f'TCP6-LISTEN:{listenport},ipv6only' if is_v6
              else f'TCP4-LISTEN:{listenport}')

    if listenip is not None:
        listen += f',bind={socat_ip(listenip)}'
    if fromip is not None:
        connect += f',bind={socat_ip(fromip)}'

    return (connect, listen)


def tcp_upload(datafile: str, cs: cmdsite.CmdSite, ss: cmdsite.CmdSite,
               connectip: ip.Addr, connectport: int,
               listenip: Optional[ip.Addr] = None,
               listenport: Optional[int] = None,
               fromip: Optional[ip.Addr] = None) -> None:
    """Upload @datafile from @cs to @ss over TCP and verify the contents."""
    addrs = _tcp_socat(connectip, connectport, listenip, listenport, fromip)
    socat_upload(datafile, cs, ss, *addrs)


def tcp_download(datafile: str, cs: cmdsite.CmdSite, ss: cmdsite.CmdSite,
                 connectip: ip.Addr, connectport: int,
                 listenip: Optional[ip.Addr] = None,
                 listenport: Optional[int] = None,
                 fromip: Optional[ip.Addr] = None) -> None:
    """Download @datafile from @ss to @cs over TCP and verify the contents."""
    addrs = _tcp_socat(connectip, connectport, listenip, listenport, fromip)
    socat_download(datafile, cs, ss, *addrs)


def udp_transfer(datafile: str, cs: cmdsite.CmdSite, ss: cmdsite.CmdSite,
                 connectip: ip.Addr, connectport: int,
                 listenip: Optional[ip.Addr] = None,
                 listenport: Optional[int] = None,
                 fromip: Optional[ip.Addr] = None) -> None:
    """Send @datafile from @cs to @ss over UDP and verify the contents.

    The address family is chosen from the type of @connectip and
    @listenport defaults to @connectport.  The sender is given the
    socat 'shut-null' option and the receiver 'null-eof'.  @listenip
    and @fromip, when given, become 'bind=' options on the receiving
    and sending side respectively.
    """
    is_v6 = isinstance(connectip, ipaddress.IPv6Address)
    if listenport is None:
        listenport = connectport

    connect = (f'UDP6:[{connectip}]:{connectport},ipv6only,shut-null'
               if is_v6
               else f'UDP4:{connectip}:{connectport},shut-null')
    listen = (f'UDP6-LISTEN:{listenport},ipv6only,null-eof' if is_v6
              else f'UDP4-LISTEN:{listenport},null-eof')

    if listenip is not None:
        listen += f',bind={socat_ip(listenip)}'
    if fromip is not None:
        connect += f',bind={socat_ip(fromip)}'

    socat_upload(datafile, cs, ss, connect, listen)


# Names of the data files used by the scenario tests below
SMALL_DATA = 'small.bin'
BIG_DATA = 'big.bin'
UDP_DATA = 'medium.bin'


@dataclasses.dataclass
class TransferScenario(exeter.Scenario):
    """Scenario bundling two command sites and the addresses used to
    run the TCP and UDP transfer tests between them."""

    # Site running the connecting socat
    client: cmdsite.CmdSite
    # Site running the listening socat
    server: cmdsite.CmdSite
    # Address and port the client connects to
    connect_ip: ip.Addr
    connect_port: int
    # Optional address for the server to bind to
    listen_ip: Optional[ip.Addr] = None
    # Optional source address for the client to bind to
    from_ip: Optional[ip.Addr] = None
    # Optional port for the server to listen on; the transfer helpers
    # fall back to connect_port when this is None
    listen_port: Optional[int] = None

    def tcp_upload(self, datafile: str) -> None:
        """Run a TCP upload of @datafile with this scenario's settings."""
        tcp_upload(datafile, self.client, self.server,
                   connectip=self.connect_ip,
                   connectport=self.connect_port,
                   listenip=self.listen_ip,
                   listenport=self.listen_port,
                   fromip=self.from_ip)

    @exeter.scenariotest
    def tcp_small_upload(self) -> None:
        self.tcp_upload(SMALL_DATA)

    @exeter.scenariotest
    def tcp_big_upload(self) -> None:
        self.tcp_upload(BIG_DATA)

    def tcp_download(self, datafile: str) -> None:
        """Run a TCP download of @datafile with this scenario's settings."""
        tcp_download(datafile, self.client, self.server,
                     connectip=self.connect_ip,
                     connectport=self.connect_port,
                     listenip=self.listen_ip,
                     listenport=self.listen_port,
                     fromip=self.from_ip)

    @exeter.scenariotest
    def tcp_small_download(self) -> None:
        self.tcp_download(SMALL_DATA)

    @exeter.scenariotest
    def tcp_big_download(self) -> None:
        self.tcp_download(BIG_DATA)

    @exeter.scenariotest
    def udp_transfer(self, datafile: str = UDP_DATA) -> None:
        """Run a UDP transfer of @datafile with this scenario's settings."""
        udp_transfer(datafile, self.client, self.server,
                     connectip=self.connect_ip,
                     connectport=self.connect_port,
                     listenip=self.listen_ip,
                     listenport=self.listen_port,
                     fromip=self.from_ip)


def local4() -> Iterator[TransferScenario]:
    """Yield a scenario where client and server share one unshare()d
    site with 'lo' brought up, connecting to the IPv4 loopback address."""
    with unshare.unshare('ns', '-Un') as site:
        ip.ifup(site, 'lo')
        scenario = TransferScenario(client=site, server=site,
                                    connect_ip=ip.LOOPBACK4,
                                    connect_port=10000)
        yield scenario


def local6() -> Iterator[TransferScenario]:
    """Yield a scenario where client and server share one unshare()d
    site with 'lo' brought up, connecting to the IPv6 loopback address."""
    with unshare.unshare('ns', '-Un') as site:
        ip.ifup(site, 'lo')
        scenario = TransferScenario(client=site, server=site,
                                    connect_ip=ip.LOOPBACK6,
                                    connect_port=10000)
        yield scenario


def selftests() -> None:
    """Register the transfer scenario tests against both loopback setups."""
    for setup in (local4, local6):
        TransferScenario.test(setup)