#! /usr/bin/env python3 # SPDX-License-Identifier: GPL-2.0-or-later # # Copyright Red Hat # Author: David Gibson """ Test A Simple Socket Transport transfer.py - Helpers for testing data transfers """ import contextlib from ipaddress import IPv4Address, IPv6Address import time import exeter from . import nstool, snh # HACK: how long to wait for the server to be ready and listening (s) SERVER_READY_DELAY = 0.1 # 1/10th of a second # socat needs IPv6 addresses in square brackets def socat_ip(ip): if isinstance(ip, IPv6Address): return f'[{ip}]' if isinstance(ip, IPv4Address): return f'{ip}' raise TypeError def socat_upload(datafile, csnh, ssnh, connect, listen): srcdata = csnh.output('cat', f'{datafile}') with ssnh.bg('socat', '-u', f'{listen}', 'STDOUT', capture=snh.STDOUT) as server: time.sleep(SERVER_READY_DELAY) # Can't use csnh.fg() here, because while we wait for the # client to complete we won't be reading from the output pipe # of the server, meaning it will freeze once the buffers fill with csnh.bg('socat', '-u', f'OPEN:{datafile}', f'{connect}') \ as client: res = server.run() client.run() exeter.assert_eq(srcdata, res.stdout) def socat_download(datafile, csnh, ssnh, connect, listen): srcdata = ssnh.output('cat', f'{datafile}') with ssnh.bg('socat', '-u', f'OPEN:{datafile}', f'{listen}'): time.sleep(SERVER_READY_DELAY) dstdata = csnh.output('socat', '-u', f'{connect}', 'STDOUT') exeter.assert_eq(srcdata, dstdata) def _tcp_socat(connectip, connectport, listenip, listenport, fromip): v6 = isinstance(connectip, IPv6Address) if listenport is None: listenport = connectport if v6: connect = f'TCP6:[{connectip}]:{connectport},ipv6only' listen = f'TCP6-LISTEN:{listenport},ipv6only' else: connect = f'TCP4:{connectip}:{connectport}' listen = f'TCP4-LISTEN:{listenport}' if listenip is not None: listen += f',bind={socat_ip(listenip)}' if fromip is not None: connect += f',bind={socat_ip(fromip)}' return (connect, listen) def tcp_upload(datafile, cs, ss, connectip, connectport, listenip=None, listenport=None, fromip=None): connect, listen = _tcp_socat(connectip, connectport, listenip, listenport, fromip) socat_upload(datafile, cs, ss, connect, listen) def tcp_download(datafile, cs, ss, connectip, connectport, listenip=None, listenport=None, fromip=None): connect, listen = _tcp_socat(connectip, connectport, listenip, listenport, fromip) socat_download(datafile, cs, ss, connect, listen) def udp_transfer(datafile, cs, ss, connectip, connectport, listenip=None, listenport=None, fromip=None): v6 = isinstance(connectip, IPv6Address) if listenport is None: listenport = connectport if v6: connect = f'UDP6:[{connectip}]:{connectport},ipv6only,shut-null' listen = f'UDP6-LISTEN:{listenport},ipv6only,null-eof' else: connect = f'UDP4:{connectip}:{connectport},shut-null' listen = f'UDP4-LISTEN:{listenport},null-eof' if listenip is not None: listen += f',bind={socat_ip(listenip)}' if fromip is not None: connect += f',bind={socat_ip(fromip)}' socat_upload(datafile, cs, ss, connect, listen) SMALL_DATA = 'test/small.bin' BIG_DATA = 'test/big.bin' UDP_DATA = 'test/medium.bin' class TransferTestScenario: def __init__(self, *, client, server, connect_ip, connect_port, listen_ip=None, listen_port=None, from_ip=None): self.client = client self.server = server if isinstance(connect_ip, IPv4Address): self.ip = connect_ip self.listen_ip = listen_ip self.from_ip = from_ip elif isinstance(connect_ip, IPv6Address): self.ip = connect_ip self.listen_ip = listen_ip self.from_ip = from_ip self.port = connect_port self.listen_port = listen_port def test_tcp_upload(setup, datafile=SMALL_DATA): with setup as scn: tcp_upload(datafile, scn.client, scn.server, scn.ip, scn.port, listenip=scn.listen_ip, listenport=scn.listen_port, fromip=scn.from_ip) def test_tcp_big_upload(setup): return test_tcp_upload(setup, datafile=BIG_DATA) def test_tcp_download(setup, datafile=SMALL_DATA): with setup as scn: tcp_download(datafile, scn.client, scn.server, scn.ip, scn.port, listenip=scn.listen_ip, listenport=scn.listen_port, fromip=scn.from_ip) def test_tcp_big_download(setup): return test_tcp_download(setup, datafile=BIG_DATA) def test_udp_transfer(setup, datafile=UDP_DATA): with setup as scn: udp_transfer(datafile, scn.client, scn.server, scn.ip, scn.port, listenip=scn.listen_ip, listenport=scn.listen_port, fromip=scn.from_ip) TRANSFER_TESTS = [test_tcp_upload, test_tcp_big_upload, test_tcp_download, test_tcp_big_download, test_udp_transfer] def transfer_tests(setup): for t in TRANSFER_TESTS: testid = f'{setup.__qualname__}|{t.__qualname__}' exeter.register_pipe(testid, setup, t) @contextlib.contextmanager def local_transfer4(): with nstool.unshare_snh('ns', '-Un') as ns: ns.ifup('lo') yield TransferTestScenario(client=ns, server=ns, connect_ip=IPv4Address('127.0.0.1'), connect_port=10000) transfer_tests(local_transfer4) @contextlib.contextmanager def local_transfer6(): with nstool.unshare_snh('ns', '-Un') as ns: ns.ifup('lo') yield TransferTestScenario(client=ns, server=ns, connect_ip=IPv6Address('::1'), connect_port=10000) transfer_tests(local_transfer6)