#! /usr/bin/env python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson

"""
Test A Simple Socket Transport

tasst/snh.py - Simulated network hosts for testing
"""

import contextlib
import ipaddress
import json
import subprocess
import sys

import exeter

# Value for the 'capture' argument of SimNetHost.bg()/fg(): capture the
# command's standard output through a pipe.
STDOUT = 1


class SnhProcess(contextlib.AbstractContextManager):
    """
    A background process running on a SimNetHost

    The process is not started when the object is constructed, only
    when the context is entered.  On leaving the context the process
    is given 'context_timeout' seconds to exit by itself, after which
    it is terminated and, failing that, killed.
    """

    def __init__(self, snh, *cmd, check=True, context_timeout=1.0, **kwargs):
        """
        snh:             the SimNetHost this process belongs to
        cmd:             command and arguments, as for subprocess.Popen()
        check:           if true, run() raises CalledProcessError on a
                         non-zero exit status
        context_timeout: seconds to wait for the process in __exit__()
                         before terminating it, and again before
                         killing it
        kwargs:          passed through to subprocess.Popen()
        """
        self.snh = snh
        self.cmd = cmd
        self.check = check
        self.context_timeout = float(context_timeout)
        self.kwargs = kwargs
        # Set by __enter__(); defined here so the attribute always exists
        self.popen = None

    def __enter__(self):
        """Start the process and return this object"""
        self.popen = subprocess.Popen(self.cmd, **self.kwargs)
        return self

    def run(self, **kwargs):
        """
        Wait for the process to complete and return a
        subprocess.CompletedProcess describing the result.

        kwargs are passed to Popen.communicate() (e.g. 'timeout').
        Raises subprocess.CalledProcessError if 'check' was set and
        the process exited with a non-zero status.
        """
        stdout, stderr = self.popen.communicate(**kwargs)
        cp = subprocess.CompletedProcess(self.popen.args,
                                         self.popen.returncode,
                                         stdout, stderr)
        if self.check:
            cp.check_returncode()
        return cp

    def terminate(self):
        """Ask the process to terminate (Popen.terminate())"""
        self.popen.terminate()

    def kill(self):
        """Forcibly kill the process (Popen.kill())"""
        self.popen.kill()

    def __exit__(self, *exc_details):
        """
        Wait for the process to finish, escalating if it doesn't.

        Raises subprocess.TimeoutExpired if the process did not exit
        by itself within 'context_timeout' seconds, even if the
        subsequent terminate or kill succeeded.
        """
        try:
            self.popen.wait(timeout=self.context_timeout)
        except subprocess.TimeoutExpired:
            self.terminate()
            try:
                self.popen.wait(timeout=self.context_timeout)
            except subprocess.TimeoutExpired:
                self.kill()
                # Reap the killed child so we don't leave a zombie
                self.popen.wait()
            # Report the original timeout to the caller
            raise
        finally:
            # Don't leak the pipes to the child if nobody ran
            # communicate() to completion.  Closing an already closed
            # stream is harmless.
            for stream in (self.popen.stdin, self.popen.stdout,
                           self.popen.stderr):
                if stream is not None:
                    stream.close()


class SimNetHost(contextlib.AbstractContextManager):
    """
    A (usually virtual or simulated) location where we can execute
    commands and configure networks.

    Subclasses must implement hostify() to describe how a command is
    actually run on the host.
    """

    def __init__(self, name):
        self.name = name  # For debugging

    def hostify(self, *cmd, **kwargs):
        """
        Convert a command to run on this host into the command and
        Popen keyword arguments to run it from the test process.

        Must return a (command, kwargs) pair.  Implemented by
        subclasses.
        """
        raise NotImplementedError

    def __enter__(self):
        return self

    def __exit__(self, *exc_details):
        pass

    def output(self, *cmd, **kwargs):
        """Run cmd in the foreground and return its captured stdout"""
        proc = self.fg(*cmd, capture=STDOUT, **kwargs)
        return proc.stdout

    def fg(self, *cmd, timeout=None, **kwargs):
        """
        Run cmd on this host, wait for it to complete and return the
        resulting subprocess.CompletedProcess.

        timeout, if given, is the time in seconds to wait for the
        command; subprocess.TimeoutExpired is raised if it expires.
        Other keyword arguments are passed on to bg().
        """
        # We don't use subprocess.run() because it kills without
        # attempting to terminate on timeout
        with self.bg(*cmd, **kwargs) as proc:
            res = proc.run(timeout=timeout)
        return res

    def bg(self, *cmd, capture=None, **kwargs):
        """
        Return an SnhProcess for running cmd on this host in the
        background.  The process starts when the returned object is
        used as a context manager.

        capture: STDOUT to capture the command's standard output
        Other keyword arguments are passed through hostify() and then
        to SnhProcess.
        """
        if capture == STDOUT:
            kwargs['stdout'] = subprocess.PIPE
        hostcmd, kwargs = self.hostify(*cmd, **kwargs)
        proc = SnhProcess(self, *hostcmd, **kwargs)
        print(f"SimNetHost {self.name}: Started {cmd} => {proc}",
              file=sys.stderr)
        return proc

    def ifs(self):
        """Return the names of the network interfaces on this host"""
        info = json.loads(self.output('ip', '-j', 'link', 'show'))
        return [i['ifname'] for i in info]

    def ifup(self, ifname, *addrs, dad=None):
        """
        Add addresses to interface ifname, and bring it up.

        addrs: ipaddress.IPv4Interface or IPv6Interface objects
        dad:   None to leave the DAD sysctls alone, 'disable' to set
               accept_dad=0, or 'optimistic' to set optimistic_dad=1
               for the interface

        Raises ValueError for an unknown dad value and TypeError for
        an address of the wrong type.
        """
        if dad == 'disable':
            self.fg('sysctl', f'net.ipv6.conf.{ifname}.accept_dad=0',
                    capable=True)
        elif dad == 'optimistic':
            self.fg('sysctl', f'net.ipv6.conf.{ifname}.optimistic_dad=1',
                    capable=True)
        elif dad is not None:
            raise ValueError(f"Unknown dad option {dad!r}")

        iftypes = (ipaddress.IPv4Interface, ipaddress.IPv6Interface)
        for a in addrs:
            if not isinstance(a, iftypes):
                raise TypeError(
                    f"Expected IPv4Interface or IPv6Interface, got {a!r}")
            self.fg('ip', 'addr', 'add', f'{a.with_prefixlen}',
                    'dev', f'{ifname}', capable=True)

        self.fg('ip', 'link', 'set', f'{ifname}', 'up', capable=True)

    def addrinfos(self, ifname, **criteria):
        """
        Return the 'addr_info' entries 'ip -j addr show' reports for
        ifname, as a list of dicts.

        criteria: only entries which have each given key with the
                  given value are returned
        """
        info = json.loads(self.output('ip', '-j', 'addr', 'show',
                                      f'{ifname}'))
        assert len(info) == 1  # We specified a specific interface

        ais = list(info[0]['addr_info'])
        for key, value in criteria.items():
            ais = [ai for ai in ais if key in ai and ai[key] == value]

        return ais

    def addrs(self, ifname, **criteria):
        """
        Return the addresses on ifname matching criteria (see
        addrinfos()) as ipaddress interface objects.
        """
        # Return just the parsed, non-tentative addresses
        return [ipaddress.ip_interface(f'{ai["local"]}/{ai["prefixlen"]}')
                for ai in self.addrinfos(ifname, **criteria)
                if 'tentative' not in ai]

    def mtu(self, ifname):
        """Return the MTU of interface ifname"""
        cmd = ['ip', '-j', 'link', 'show', f'{ifname}']
        (info,) = json.loads(self.output(*cmd))
        return info['mtu']

    def addr_wait(self, ifname, **criteria):
        """
        Poll until ifname has at least one address matching criteria
        (see addrs()), then return the matching addresses.

        There is no timeout: this doesn't return until such an
        address appears.
        """
        while True:
            addrs = self.addrs(ifname, **criteria)
            if addrs:
                return addrs

    # Internal tests
    def test_true(self):
        with self as snh:
            snh.fg('true')

    def test_false(self):
        with self as snh:
            exeter.assert_raises(subprocess.CalledProcessError,
                                 snh.fg, 'false')

    def test_echo(self):
        msg = 'Hello tasst'
        with self as snh:
            out = snh.output('echo', f'{msg}')
        exeter.assert_eq(out, msg.encode('utf-8') + b'\n')

    def test_timeout(self):
        with self as snh:
            exeter.assert_raises(subprocess.TimeoutExpired, snh.fg,
                                 'sleep', 'infinity', timeout=0.1,
                                 check=False)

    def test_bg_true(self):
        with self as snh:
            with snh.bg('true'):
                pass

    def test_bg_false(self):
        with self as snh:
            with snh.bg('false') as proc:
                exeter.assert_raises(subprocess.CalledProcessError,
                                     proc.run)

    def test_bg_echo(self):
        msg = 'Hello tasst'
        with self as snh:
            with snh.bg('echo', f'{msg}', capture=STDOUT) as proc:
                res = proc.run()
        exeter.assert_eq(res.stdout, msg.encode('utf-8') + b'\n')

    def test_bg_timeout(self):
        with self as snh:
            with snh.bg('sleep', 'infinity') as proc:
                exeter.assert_raises(subprocess.TimeoutExpired,
                                     proc.run, timeout=0.1)
                proc.terminate()

    def test_bg_context_timeout(self):
        with self as snh:
            def run_timeout():
                with snh.bg('sleep', 'infinity', context_timeout=0.1):
                    pass
            exeter.assert_raises(subprocess.TimeoutExpired, run_timeout)

    def test_has_lo(self):
        with self as snh:
            assert 'lo' in snh.ifs()

    def test_lo_addrs(self):
        expected = set(ipaddress.ip_interface(a)
                       for a in ['127.0.0.1/8', '::1/128'])
        with self as snh:
            assert set(snh.addrs('lo')) == expected

    def test_lo_mtu(self):
        with self as snh:
            exeter.assert_eq(snh.mtu('lo'), 65536)

    SELFTESTS = [test_true, test_false, test_echo, test_timeout,
                 test_bg_true, test_bg_false, test_bg_echo, test_bg_timeout,
                 test_bg_context_timeout,
                 test_has_lo, test_lo_addrs, test_lo_mtu]

    @classmethod
    def selftest(cls, setup):
        """Register standard snh tests for instance returned by setup"""
        for t in cls.SELFTESTS:
            testid = f'{setup.__qualname__}|{t.__qualname__}'
            exeter.register_pipe(testid, setup, t)

    # Additional tests only valid if the snh is isolated (no outside
    # network connections)
    def test_is_isolated(self):
        with self as snh:
            exeter.assert_eq(snh.ifs(), ['lo'])

    ISOLATED_SELFTESTS = [test_is_isolated]

    @classmethod
    def selftest_isolated(cls, setup):
        """Register self tests for an isolated snh example"""
        cls.selftest(setup)
        for t in cls.ISOLATED_SELFTESTS:
            testid = f'{setup.__qualname__}|{t.__qualname__}'
            exeter.register_pipe(testid, setup, t)


class RealHost(SimNetHost):
    """Represents the host on which the tests are running (as opposed
    to some simulated host created by the tests)

    """

    def __init__(self):
        super().__init__('REAL_HOST')

    def hostify(self, *cmd, capable=False, **kwargs):
        """
        Commands run on the real host unmodified; commands flagged as
        'capable' must never be run here.
        """
        assert not capable, \
            "BUG: Shouldn't run commands with capabilities on host"
        return cmd, kwargs


SimNetHost.selftest(RealHost)