#! /usr/bin/python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# tasst - Test A Simple Socket Transport
# library of test helpers for passt & pasta
#
# tasst/transfer.py - Helpers for testing data transfers
#
# Copyright Red Hat
# Author: David Gibson

import contextlib
import ipaddress
import time

import avocado

from tasst import Tasst, TasstSubData
from tasst.site import Site
from tasst.nstool import UnshareSite
from tasst.typing import typecheck, typecheck_default

# HACK: how long to wait for the server to be ready and listening (s)
SERVER_READY_DELAY = 0.05  # 1/20th of a second


# socat needs IPv6 addresses in square brackets
def socat_fmt(ip):
    """Format an IP address object the way socat address strings need it.

    IPv6 addresses are wrapped in square brackets, IPv4 addresses are
    rendered as-is.

    Raises TypeError if `ip` is neither an ipaddress.IPv4Address nor an
    ipaddress.IPv6Address.
    """
    if isinstance(ip, ipaddress.IPv6Address):
        return '[{}]'.format(ip)
    elif isinstance(ip, ipaddress.IPv4Address):
        return '{}'.format(ip)
    else:
        raise TypeError(
            "expected an IPv4Address or IPv6Address, got {}"
            .format(type(ip).__name__))


class TransferTasstInfo:
    """Parameters describing one transfer test setup.

    datafile    -- path of the file whose contents are transferred
    cs          -- Site on which the connecting socat is run
    ss          -- Site on which the listening socat is run
    ip4, ip6    -- addresses the connecting side connects to
    port        -- port the connecting side connects to
    listen_ip4, listen_ip6
                -- optional addresses for the listening side to bind
    listenport  -- port the listening side listens on (`port` is passed
                   to typecheck_default() as the fallback value)
    from_ip4, from_ip6
                -- optional addresses for the connecting side to bind

    Constructing an instance also calls require_cmds() on both sites
    for 'socat' and 'cat'.
    """

    def __init__(self, datafile, cs, ss, ip4, ip6, port,
                 listen_ip4=None, listen_ip6=None, listenport=None,
                 from_ip4=None, from_ip6=None):
        self.datafile = typecheck(datafile, str)
        self.cs = typecheck(cs, Site)
        self.ss = typecheck(ss, Site)
        self.ip4 = typecheck(ip4, ipaddress.IPv4Address)
        self.ip6 = typecheck(ip6, ipaddress.IPv6Address)
        self.port = typecheck(port, int)
        self.listen_ip4 = typecheck_default(listen_ip4,
                                            ipaddress.IPv4Address, None)
        self.listen_ip6 = typecheck_default(listen_ip6,
                                            ipaddress.IPv6Address, None)
        self.listenport = typecheck_default(listenport, int, port)
        self.from_ip4 = typecheck_default(from_ip4,
                                          ipaddress.IPv4Address, None)
        self.from_ip6 = typecheck_default(from_ip6,
                                          ipaddress.IPv6Address, None)
        cs.require_cmds('socat', 'cat')
        ss.require_cmds('socat', 'cat')


class BaseTransferTasst(Tasst):
    # Shared socat-based transfer helpers.  Subclasses supply
    # setup_transfer(), a context manager yielding a TransferTasstInfo.

    def socat_upload(self, datafile, cs, ss, connect, listen):
        """Send `datafile` from `cs` to a listener on `ss` and verify it.

        `connect` and `listen` are socat address strings for the
        connecting and listening sides.  The listener's stdout is
        compared with the output of `cat datafile` run on `cs`.
        """
        with ss.bg('socat -u {} STDOUT'.format(listen),
                   verbose=False) as server:
            # Crude wait for the listener to be ready (see HACK above)
            time.sleep(SERVER_READY_DELAY)
            cs.fg('socat -u OPEN:{} {}'.format(datafile, connect))
            # NOTE(review): the result is collected while still inside
            # the bg() context - confirm against Site.bg() whether this
            # may equally happen after the context exits.
            res = server.run()
            self.assertEqual(res.exit_status, 0)
        srcdata = cs.output('cat {}'.format(datafile), verbose=False)
        self.assertEqual(srcdata, res.stdout)

    def socat_download(self, datafile, cs, ss, connect, listen):
        """Serve `datafile` from `ss` to a connection from `cs` and verify it.

        `connect` and `listen` are socat address strings for the
        connecting and listening sides.  The data received on `cs` is
        compared with the output of `cat datafile` run on `ss`.
        """
        with ss.bg('socat -u OPEN:{} {}'.format(datafile, listen)) as server:
            # Crude wait for the listener to be ready (see HACK above)
            time.sleep(SERVER_READY_DELAY)
            dstdata = cs.output('socat -u {} STDOUT'.format(connect),
                                verbose=False)
            # NOTE(review): collected inside the bg() context - confirm
            # against Site.bg().
            res = server.run()
            self.assertEqual(res.exit_status, 0)
        srcdata = ss.output('cat {}'.format(datafile), verbose=False)
        self.assertEqual(srcdata, dstdata)

    def _tcp_socat(self, datafile, connectip, connectport,
                   listenip, listenport, fromip):
        """Build the (connect, listen) socat address strings for TCP.

        The IP version is chosen from the type of `connectip`.  If
        `listenport` is None, `connectport` is used for the listener.
        `listenip` and `fromip`, when not None, are added as bind=
        options on the listening and connecting address respectively.
        `datafile` is not used here.
        """
        v6 = isinstance(connectip, ipaddress.IPv6Address)
        if listenport is None:
            listenport = connectport
        if v6:
            connect = 'TCP6:[{}]:{},ipv6only'.format(connectip, connectport)
            listen = 'TCP6-LISTEN:{},ipv6only'.format(listenport)
        else:
            connect = 'TCP4:{}:{}'.format(connectip, connectport)
            listen = 'TCP4-LISTEN:{}'.format(listenport)
        if listenip is not None:
            listen += ',bind=' + socat_fmt(listenip)
        if fromip is not None:
            connect += ',bind=' + socat_fmt(fromip)
        return (connect, listen)

    def tcp_upload(self, datafile, cs, ss, connectip, connectport,
                   listenip=None, listenport=None, fromip=None):
        """Upload `datafile` over TCP from `cs` to `ss` and verify it."""
        connect, listen = self._tcp_socat(datafile, connectip, connectport,
                                          listenip, listenport, fromip)
        self.socat_upload(datafile, cs, ss, connect, listen)

    def tcp_download(self, datafile, cs, ss, connectip, connectport,
                     listenip=None, listenport=None, fromip=None):
        """Download `datafile` over TCP from `ss` to `cs` and verify it."""
        connect, listen = self._tcp_socat(datafile, connectip, connectport,
                                          listenip, listenport, fromip)
        self.socat_download(datafile, cs, ss, connect, listen)

    def udp_transfer(self, datafile, cs, ss, connectip, connectport,
                     listenip=None, listenport=None, fromip=None):
        """Send `datafile` over UDP from `cs` to `ss` and verify it.

        The IP version is chosen from the type of `connectip`.  If
        `listenport` is None, `connectport` is used for the listener.
        `listenip`, when not None, must match that IP version and is
        bound by the listener; `fromip`, when not None, is bound by the
        sender.
        """
        v6 = isinstance(connectip, ipaddress.IPv6Address)
        if listenport is None:
            listenport = connectport
        # shut-null / null-eof: socat options used so the end of the
        # data is signalled to the listener over UDP
        if v6:
            connect = 'UDP6:[{}]:{},ipv6only,shut-null'.format(connectip,
                                                               connectport)
            listen = 'UDP6-LISTEN:{},ipv6only,null-eof'.format(listenport)
            if listenip is not None:
                assert isinstance(listenip, ipaddress.IPv6Address)
                listen += ',bind=[{}]'.format(listenip)
        else:
            connect = 'UDP4:{}:{},shut-null'.format(connectip, connectport)
            listen = 'UDP4-LISTEN:{},null-eof'.format(listenport)
            if listenip is not None:
                assert isinstance(listenip, ipaddress.IPv4Address)
                listen += ',bind={}'.format(listenip)
        # Honour the requested source address, as _tcp_socat() does
        if fromip is not None:
            connect += ',bind=' + socat_fmt(fromip)
        self.socat_upload(datafile, cs, ss, connect, listen)

    def setup_transfer(self):
        """Context manager yielding a TransferTasstInfo; subclasses must
        override this."""
        raise NotImplementedError(
            "{} must implement setup_transfer() method"
            .format(type(self).__name__))

    @contextlib.contextmanager
    def check_setup_transfer(self):
        """Enter setup_transfer() and check that it yields the right type.

        Raises TypeError if the yielded object is not a
        TransferTasstInfo.
        """
        with self.setup_transfer() as tti:
            if not isinstance(tti, TransferTasstInfo):
                raise TypeError(
                    "{}.setup_transfer() must yield a TransferTasstInfo "
                    "instance".format(type(self).__name__))
            yield tti


class TcpUploadTasst(BaseTransferTasst):
    """
    :avocado: disable
    """

    def test_tcp4_upload(self):
        with self.check_setup_transfer() as tti:
            self.tcp_upload(tti.datafile, tti.cs, tti.ss,
                            tti.ip4, tti.port,
                            listenip=tti.listen_ip4,
                            listenport=tti.listenport,
                            fromip=tti.from_ip4)

    def test_tcp6_upload(self):
        with self.check_setup_transfer() as tti:
            self.tcp_upload(tti.datafile, tti.cs, tti.ss,
                            tti.ip6, tti.port,
                            listenip=tti.listen_ip6,
                            listenport=tti.listenport,
                            fromip=tti.from_ip6)


class MetaTcpUploadTasst(TcpUploadTasst):
    """Ugly workaround for
    https://github.com/avocado-framework/avocado/issues/5680.
    Explicitly apply the "meta" tag to the tests in TransferTasst.

    :avocado: disable
    :avocado: tags=meta
    """

    def test_tcp4_upload(self):
        super().test_tcp4_upload()

    def test_tcp6_upload(self):
        super().test_tcp6_upload()


class UdpTransferTasst(BaseTransferTasst):
    """
    :avocado: disable
    """

    def test_udp4_transfer(self):
        with self.check_setup_transfer() as tti:
            self.udp_transfer(tti.datafile, tti.cs, tti.ss,
                              tti.ip4, tti.port,
                              listenip=tti.listen_ip4,
                              listenport=tti.listenport,
                              fromip=tti.from_ip4)

    def test_udp6_transfer(self):
        with self.check_setup_transfer() as tti:
            self.udp_transfer(tti.datafile, tti.cs, tti.ss,
                              tti.ip6, tti.port,
                              listenip=tti.listen_ip6,
                              listenport=tti.listenport,
                              fromip=tti.from_ip6)


class MetaUdpTransferTasst(UdpTransferTasst):
    """Ugly workaround for
    https://github.com/avocado-framework/avocado/issues/5680.
    Explicitly apply the "meta" tag to the tests in TransferTasst.

    :avocado: disable
    :avocado: tags=meta
    """

    def test_udp4_transfer(self):
        super().test_udp4_transfer()

    def test_udp6_transfer(self):
        super().test_udp6_transfer()


LOOPBACK4 = ipaddress.ip_address('127.0.0.1')
LOOPBACK6 = ipaddress.ip_address('::1')


class LocalTransferTasst(MetaTcpUploadTasst, MetaUdpTransferTasst):
    """Test the transfer helpers
    """

    # Port used for both connecting and listening over loopback
    PORT = 10000

    @contextlib.contextmanager
    def setup_transfer(self):
        # Client and server share one unshared namespace and talk over
        # its loopback interface
        with UnshareSite(type(self).__name__ + '.netns', '-Un') as ns:
            ns.ifup('lo')
            yield TransferTasstInfo('test/small.bin', ns, ns,
                                    LOOPBACK4, LOOPBACK6, self.PORT)