#! /usr/bin/python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# tasst - Test A Simple Socket Transport
#  library of test helpers for passt & pasta
#
# tasst/transfer.py - Helpers for testing data transfers
#
# Copyright Red Hat
# Author: David Gibson

import contextlib
import ipaddress
import time

import avocado

from tasst import Tasst, TasstSubData
from tasst.address import LOOPBACK4, LOOPBACK6
from tasst.site import Site
from tasst.nstool import UnshareSite
from tasst.typing import typecheck, typecheck_default


# HACK: how long to wait for the server to be ready and listening (s)
SERVER_READY_DELAY = 0.05  # 1/20th of a second


# socat needs IPv6 addresses in square brackets
def socat_fmt(ip):
    """Format an IP address the way socat expects it on its command line.

    IPv6 addresses are wrapped in square brackets, IPv4 addresses are
    rendered as-is.

    Raises TypeError if @ip is neither an ipaddress.IPv4Address nor an
    ipaddress.IPv6Address.
    """
    if isinstance(ip, ipaddress.IPv6Address):
        return '[{}]'.format(ip)
    if isinstance(ip, ipaddress.IPv4Address):
        return '{}'.format(ip)
    raise TypeError(
        "expected an IPv4Address or IPv6Address, got {}"
        .format(type(ip).__name__))


class TransferTasstInfo:
    """Parameters describing one transfer test setup.

    Attributes:
      datafile    path of the file whose contents are transferred
      cs, ss      Site on which the client / server socat is run
      ip4, ip6    addresses the client connects to
      port        port the client connects to
      listen_ip4, listen_ip6
                  optional addresses for the server to bind to
      listenport  port the server listens on (@port is passed as the
                  default to typecheck_default())
      from_ip4, from_ip6
                  optional source addresses for the client to bind to
    """

    def __init__(self, datafile, cs, ss, ip4, ip6, port,
                 listen_ip4=None, listen_ip6=None, listenport=None,
                 from_ip4=None, from_ip6=None):
        # typecheck() / typecheck_default() come from tasst.typing;
        # presumably they validate the type and substitute the given
        # default for None -- confirm against that module.
        self.datafile = typecheck(datafile, str)
        self.cs = typecheck(cs, Site)
        self.ss = typecheck(ss, Site)
        self.ip4 = typecheck(ip4, ipaddress.IPv4Address)
        self.ip6 = typecheck(ip6, ipaddress.IPv6Address)
        self.port = typecheck(port, int)
        self.listen_ip4 = typecheck_default(listen_ip4,
                                            ipaddress.IPv4Address, None)
        self.listen_ip6 = typecheck_default(listen_ip6,
                                            ipaddress.IPv6Address, None)
        self.listenport = typecheck_default(listenport, int, port)
        self.from_ip4 = typecheck_default(from_ip4,
                                          ipaddress.IPv4Address, None)
        self.from_ip6 = typecheck_default(from_ip6,
                                          ipaddress.IPv6Address, None)
        # Both ends need socat for the transfer and cat to read back
        # the reference data
        cs.require_cmds('socat', 'cat')
        ss.require_cmds('socat', 'cat')


class BaseTransferTasst(Tasst):
    """Common socat based transfer helpers, shared by the test classes."""

    def socat_upload(self, datafile, cs, ss, connect, listen):
        """Send @datafile from the client to the server and verify it.

        A socat listening on the @listen address is started in the
        background on @ss, then a socat on @cs sends @datafile to the
        @connect address.  The server's exit status must be 0 and its
        stdout must match the contents of @datafile as read on @cs.
        """
        with ss.bg('socat -u {} STDOUT'.format(listen),
                   verbose=False) as server:
            time.sleep(SERVER_READY_DELAY)
            cs.fg('socat -u OPEN:{} {}'.format(datafile, connect))
            res = server.run()
        # assertEqual, not assertEquals: the latter is a deprecated
        # unittest alias that no longer exists in Python 3.12
        self.assertEqual(res.exit_status, 0)
        srcdata = cs.output('cat {}'.format(datafile), verbose=False)
        self.assertEqual(srcdata, res.stdout)

    def socat_download(self, datafile, cs, ss, connect, listen):
        """Fetch @datafile from the server to the client and verify it.

        A socat serving @datafile on the @listen address is started in
        the background on @ss, then a socat on @cs reads from the
        @connect address.  The server's exit status must be 0 and the
        data received must match the contents of @datafile as read on
        @ss.
        """
        with ss.bg('socat -u OPEN:{} {}'.format(datafile, listen)) as server:
            time.sleep(SERVER_READY_DELAY)
            dstdata = cs.output('socat -u {} STDOUT'.format(connect),
                                verbose=False)
            res = server.run()
        self.assertEqual(res.exit_status, 0)
        srcdata = ss.output('cat {}'.format(datafile), verbose=False)
        self.assertEqual(srcdata, dstdata)

    def _tcp_socat(self, datafile, connectip, connectport,
                   listenip, listenport, fromip):
        """Build the socat addresses for a TCP transfer.

        The address family is chosen from the type of @connectip.
        @listenport defaults to @connectport.  @listenip and @fromip, if
        given, are added as bind= options on the listening and
        connecting side respectively.  @datafile is not used here.

        Returns a (connect, listen) tuple of socat address strings.
        """
        v6 = isinstance(connectip, ipaddress.IPv6Address)
        if listenport is None:
            listenport = connectport
        if v6:
            connect = 'TCP6:[{}]:{},ipv6only'.format(connectip, connectport)
            listen = 'TCP6-LISTEN:{},ipv6only'.format(listenport)
        else:
            connect = 'TCP4:{}:{}'.format(connectip, connectport)
            listen = 'TCP4-LISTEN:{}'.format(listenport)
        if listenip is not None:
            listen += ',bind=' + socat_fmt(listenip)
        if fromip is not None:
            connect += ',bind=' + socat_fmt(fromip)
        return (connect, listen)

    def tcp_upload(self, datafile, cs, ss, connectip, connectport,
                   listenip=None, listenport=None, fromip=None):
        """Upload @datafile from @cs to @ss over TCP and verify it."""
        connect, listen = self._tcp_socat(datafile, connectip, connectport,
                                          listenip, listenport, fromip)
        self.socat_upload(datafile, cs, ss, connect, listen)

    def tcp_download(self, datafile, cs, ss, connectip, connectport,
                     listenip=None, listenport=None, fromip=None):
        """Download @datafile from @ss to @cs over TCP and verify it."""
        connect, listen = self._tcp_socat(datafile, connectip, connectport,
                                          listenip, listenport, fromip)
        self.socat_download(datafile, cs, ss, connect, listen)

    def udp_transfer(self, datafile, cs, ss, connectip, connectport,
                     listenip=None, listenport=None, fromip=None):
        """Send @datafile from @cs to @ss over UDP and verify it.

        The address family is chosen from the type of @connectip.
        @listenport defaults to @connectport.  @listenip and @fromip, if
        given, must be of the same family as @connectip and are added as
        bind= options on the listening and connecting side respectively.
        """
        v6 = isinstance(connectip, ipaddress.IPv6Address)
        if listenport is None:
            listenport = connectport
        if v6:
            family = ipaddress.IPv6Address
            connect = 'UDP6:[{}]:{},ipv6only,shut-null'.format(connectip,
                                                               connectport)
            listen = 'UDP6-LISTEN:{},ipv6only,null-eof'.format(listenport)
        else:
            family = ipaddress.IPv4Address
            connect = 'UDP4:{}:{},shut-null'.format(connectip, connectport)
            listen = 'UDP4-LISTEN:{},null-eof'.format(listenport)
        if listenip is not None:
            assert isinstance(listenip, family)
            listen += ',bind=' + socat_fmt(listenip)
        # Honour @fromip as the TCP helpers do, rather than silently
        # ignoring it
        if fromip is not None:
            assert isinstance(fromip, family)
            connect += ',bind=' + socat_fmt(fromip)
        self.socat_upload(datafile, cs, ss, connect, listen)

    def setup_transfer(self):
        """Set up the environment for a transfer test.

        Subclasses must override this with a context manager yielding a
        TransferTasstInfo (see check_setup_transfer()).
        """
        raise NotImplementedError(
            "{} must implement setup_transfer() method"
            .format(type(self).__name__))

    @contextlib.contextmanager
    def check_setup_transfer(self):
        """Wrap setup_transfer(), checking the type of what it yields.

        Raises TypeError if setup_transfer() yields anything other than
        a TransferTasstInfo.
        """
        with self.setup_transfer() as tti:
            if not isinstance(tti, TransferTasstInfo):
                raise TypeError(
                    "{}.setup_transfer() must yield a TransferTasstInfo instance"
                    .format(type(self).__name__))
            yield tti


class TcpUploadTasst(BaseTransferTasst):
    """TCP upload tests, for use as a base class

    :avocado: disable
    """

    def test_tcp4_upload(self):
        with self.check_setup_transfer() as tti:
            self.tcp_upload(tti.datafile, tti.cs, tti.ss,
                            tti.ip4, tti.port,
                            listenip=tti.listen_ip4,
                            listenport=tti.listenport,
                            fromip=tti.from_ip4)

    def test_tcp6_upload(self):
        with self.check_setup_transfer() as tti:
            self.tcp_upload(tti.datafile, tti.cs, tti.ss,
                            tti.ip6, tti.port,
                            listenip=tti.listen_ip6,
                            listenport=tti.listenport,
                            fromip=tti.from_ip6)


class MetaTcpUploadTasst(TcpUploadTasst):
    """Ugly workaround for
    https://github.com/avocado-framework/avocado/issues/5680.
    Explicitly apply the "meta" tag to the tests in TransferTasst.

    :avocado: disable
    :avocado: tags=meta
    """

    def test_tcp4_upload(self):
        super().test_tcp4_upload()

    def test_tcp6_upload(self):
        super().test_tcp6_upload()


class UdpTransferTasst(BaseTransferTasst):
    """UDP transfer tests, for use as a base class

    :avocado: disable
    """

    def test_udp4_transfer(self):
        with self.check_setup_transfer() as tti:
            self.udp_transfer(tti.datafile, tti.cs, tti.ss,
                              tti.ip4, tti.port,
                              listenip=tti.listen_ip4,
                              listenport=tti.listenport,
                              fromip=tti.from_ip4)

    def test_udp6_transfer(self):
        with self.check_setup_transfer() as tti:
            self.udp_transfer(tti.datafile, tti.cs, tti.ss,
                              tti.ip6, tti.port,
                              listenip=tti.listen_ip6,
                              listenport=tti.listenport,
                              fromip=tti.from_ip6)


class MetaUdpTransferTasst(UdpTransferTasst):
    """Ugly workaround for
    https://github.com/avocado-framework/avocado/issues/5680.
    Explicitly apply the "meta" tag to the tests in TransferTasst.

    :avocado: disable
    :avocado: tags=meta
    """

    def test_udp4_transfer(self):
        super().test_udp4_transfer()

    def test_udp6_transfer(self):
        super().test_udp6_transfer()


# NOTE(review): these rebind the LOOPBACK4 / LOOPBACK6 names already
# imported from tasst.address above; presumably redundant -- confirm the
# imported values are equivalent, then drop these two assignments.
LOOPBACK4 = ipaddress.ip_address('127.0.0.1')
LOOPBACK6 = ipaddress.ip_address('::1')


class LocalTransferTasst(MetaTcpUploadTasst, MetaUdpTransferTasst):
    """Test the transfer helpers
    """

    PORT = 10000

    @contextlib.contextmanager
    def setup_transfer(self):
        """Yield a TransferTasstInfo for a loopback-only transfer.

        Client and server both run in one UnshareSite, created with the
        '-Un' options, after bringing up its 'lo' interface.
        """
        with UnshareSite(type(self).__name__ + '.netns', '-Un') as ns:
            ns.ifup('lo')
            yield TransferTasstInfo('test/small.bin', ns, ns,
                                    LOOPBACK4, LOOPBACK6, self.PORT)