#! /usr/bin/env avocado-runner-avocado-classless

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson

"""
Test A Simple Socket Transport

transfer.py - Helpers for testing data transfers
"""

import contextlib
import ipaddress
import time

from avocado_classless.test import assert_eq, test_output

from tasst.nstool import unshare_site

# HACK: how long to wait for the server to be ready and listening (s)
SERVER_READY_DELAY = 0.05  # 1/20th of a second


def socat_fmt(ip):
    """Format an IP address object for use inside a socat address.

    socat needs IPv6 addresses in square brackets; IPv4 addresses are
    used as they are.

    Raises:
        TypeError: if ip is neither an ipaddress.IPv4Address nor an
            ipaddress.IPv6Address (plain strings are rejected).
    """
    if isinstance(ip, ipaddress.IPv6Address):
        return f'[{ip}]'
    if isinstance(ip, ipaddress.IPv4Address):
        return f'{ip}'
    raise TypeError(
        f'expected an IPv4Address or IPv6Address, got {type(ip).__name__}'
    )


def socat_upload(datafile, cs, ss, connect, listen):
    """Send datafile from cs to ss with socat and check it arrived intact.

    Args:
        datafile: path of the file to send, as seen from cs.
        cs: site on which the connecting (sending) socat is run.
        ss: site on which the listening (receiving) socat is run.
        connect: socat address the sender connects to.
        listen: socat address the receiver listens on.

    The receiver's stdout is compared with `cat datafile` on cs using
    assert_eq().
    """
    cs.require_cmds('socat', 'cat')
    ss.require_cmds('socat')

    with ss.bg(f'socat -u {listen} STDOUT', verbose=False) as server:
        # No readiness notification is available, so just give the
        # listener a moment to come up before connecting
        time.sleep(SERVER_READY_DELAY)
        cs.fg(f'socat -u OPEN:{datafile} {connect}')
        # NOTE(review): collecting the result while the background
        # process is still managed by the 'with' block - confirm this
        # matches the intended scope of server.run()
        res = server.run()

    srcdata = cs.output(f'cat {datafile}', verbose=False)
    assert_eq(srcdata, res.stdout)


def socat_download(datafile, cs, ss, connect, listen):
    """Fetch datafile from ss to cs with socat and check it arrived intact.

    Args:
        datafile: path of the file to send, as seen from ss.
        cs: site on which the connecting (receiving) socat is run.
        ss: site on which the listening (sending) socat is run.
        connect: socat address the receiver connects to.
        listen: socat address the sender listens on.

    The connecting socat's output is compared with `cat datafile` on ss
    using assert_eq().
    """
    cs.require_cmds('socat')
    ss.require_cmds('socat', 'cat')

    with ss.bg(f'socat -u OPEN:{datafile} {listen}'):
        # Same fixed delay as for uploads: wait for the listener
        time.sleep(SERVER_READY_DELAY)
        dstdata = cs.output(f'socat -u {connect} STDOUT', verbose=False)

    srcdata = ss.output(f'cat {datafile}', verbose=False)
    assert_eq(srcdata, dstdata)


# pylint: disable=R0913
def _socat_addrs(proto, connectip, connectport, listenip, listenport, fromip,
                 connect_opts='', listen_opts=''):
    """Build the (connect, listen) socat address pair for one transfer.

    Args:
        proto: socat protocol prefix, 'TCP' or 'UDP'.
        connectip: address to connect to; an IPv6Address selects the
            '6' address types with ',ipv6only', anything else selects
            the '4' address types.
        connectport: port to connect to.
        listenip: address to bind the listener to, or None.
        listenport: port to listen on, or None to use connectport.
        fromip: address to bind the connecting socket to, or None.
        connect_opts: extra options appended to the connect address
            (including the leading comma).
        listen_opts: extra options appended to the listen address
            (including the leading comma).

    Returns:
        A (connect, listen) tuple of socat address strings.

    Raises:
        TypeError: from socat_fmt() if listenip or fromip is given but
            is not an ipaddress address object.
    """
    if listenport is None:
        listenport = connectport

    if isinstance(connectip, ipaddress.IPv6Address):
        family = '6'
        host = socat_fmt(connectip)
        family_opts = ',ipv6only'
    else:
        # Not validated on purpose: anything which isn't an IPv6Address
        # has always been formatted as is into an IPv4 socat address
        family = '4'
        host = f'{connectip}'
        family_opts = ''

    connect = (f'{proto}{family}:{host}:{connectport}'
               f'{family_opts}{connect_opts}')
    listen = (f'{proto}{family}-LISTEN:{listenport}'
              f'{family_opts}{listen_opts}')

    if listenip is not None:
        listen += f',bind={socat_fmt(listenip)}'
    if fromip is not None:
        connect += f',bind={socat_fmt(fromip)}'
    return (connect, listen)


def _tcp_socat(connectip, connectport, listenip, listenport, fromip):
    """Return the (connect, listen) socat addresses for a TCP transfer."""
    return _socat_addrs('TCP', connectip, connectport,
                        listenip, listenport, fromip)


def tcp_upload(datafile, cs, ss, connectip, connectport,
               listenip=None, listenport=None, fromip=None):
    """Upload datafile from cs to ss over TCP, verifying the contents.

    listenport defaults to connectport; listenip and fromip, if given,
    bind the listening and connecting sockets respectively.
    """
    connect, listen = _tcp_socat(connectip, connectport, listenip,
                                 listenport, fromip)
    socat_upload(datafile, cs, ss, connect, listen)


def tcp_download(datafile, cs, ss, connectip, connectport,
                 listenip=None, listenport=None, fromip=None):
    """Download datafile from ss to cs over TCP, verifying the contents.

    listenport defaults to connectport; listenip and fromip, if given,
    bind the listening and connecting sockets respectively.
    """
    connect, listen = _tcp_socat(connectip, connectport, listenip,
                                 listenport, fromip)
    socat_download(datafile, cs, ss, connect, listen)


def udp_transfer(datafile, cs, ss, connectip, connectport,
                 listenip=None, listenport=None, fromip=None):
    """Send datafile from cs to ss over UDP, verifying the contents.

    UDP has no connection close, so the sender uses socat's 'shut-null'
    and the receiver 'null-eof' to signal and detect end of data.

    listenport defaults to connectport; listenip and fromip, if given,
    bind the listening and connecting sockets respectively.
    """
    connect, listen = _socat_addrs('UDP', connectip, connectport,
                                   listenip, listenport, fromip,
                                   connect_opts=',shut-null',
                                   listen_opts=',null-eof')
    socat_upload(datafile, cs, ss, connect, listen)


def test_tcp_uploads(datafile, ip4, ip6, port, listenip4=None, listenip6=None,
                     listenport=None, fromip4=None, fromip6=None):
    """Decorator factory adding IPv4 and IPv6 TCP upload tests.

    The returned decorator takes a function 'sitefn' and passes it,
    with the two generated tests, through test_output().  Each test is
    given a 'sites' context manager yielding a (cs, ss) pair.
    NOTE(review): exact test_output() semantics live in
    avocado_classless - confirm there.
    """
    if listenport is None:
        listenport = port

    def dec(sitefn):
        def tcp4_upload(sites):
            with sites as (cs, ss):
                tcp_upload(datafile, cs, ss, ip4, port,
                           listenip=listenip4, listenport=listenport,
                           fromip=fromip4)

        def tcp6_upload(sites):
            with sites as (cs, ss):
                tcp_upload(datafile, cs, ss, ip6, port,
                           listenip=listenip6, listenport=listenport,
                           fromip=fromip6)

        return test_output(tcp4_upload, tcp6_upload)(sitefn)

    return dec


def test_udp_transfers(datafile, ip4, ip6, port, listenip4=None,
                       listenip6=None, listenport=None,
                       fromip4=None, fromip6=None):
    """Decorator factory adding IPv4 and IPv6 UDP transfer tests.

    The returned decorator takes a function 'sitefn' and passes it,
    with the two generated tests, through test_output().  Each test is
    given a 'sites' context manager yielding a (cs, ss) pair.
    NOTE(review): exact test_output() semantics live in
    avocado_classless - confirm there.
    """
    if listenport is None:
        listenport = port

    def dec(sitefn):
        def udp4_transfer(sites):
            with sites as (cs, ss):
                udp_transfer(datafile, cs, ss, ip4, port,
                             listenip=listenip4, listenport=listenport,
                             fromip=fromip4)

        def udp6_transfer(sites):
            with sites as (cs, ss):
                udp_transfer(datafile, cs, ss, ip6, port,
                             listenip=listenip6, listenport=listenport,
                             fromip=fromip6)

        return test_output(udp4_transfer, udp6_transfer)(sitefn)

    return dec


def test_transfers(ip4, ip6, port, **kwargs):
    """Decorator factory adding the standard set of transfer tests.

    Applies test_tcp_uploads() with 'small.bin' and then
    test_udp_transfers() with 'medium.bin' to the decorated function;
    kwargs are passed on to both unchanged.
    """
    def dec(sitefn):
        sitefn = test_tcp_uploads('small.bin', ip4, ip6, port,
                                  **kwargs)(sitefn)
        sitefn = test_udp_transfers('medium.bin', ip4, ip6, port,
                                    **kwargs)(sitefn)
        return sitefn

    return dec


@test_transfers(ip4=ipaddress.ip_address('127.0.0.1'),
                ip6=ipaddress.ip_address('::1'),
                port=10000)
@contextlib.contextmanager
def local_only():
    """Yield one unshared site, with 'lo' up, as both client and server."""
    with unshare_site('ns', '-Un') as ns:
        ns.ifup('lo')
        yield (ns, ns)