1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
| | #! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
tasst/snh.py - Simulated network hosts for testing
"""
from __future__ import annotations
import contextlib
import enum
import subprocess
import sys
from typing import Any, Iterator, Optional
import exeter
class Capture(enum.Enum):
STDOUT = 1
# We might need our own versions of these eventually, but for now we
# can just alias the ones in subprocess
CompletedCmd = subprocess.CompletedProcess[bytes]
TimeoutExpired = subprocess.TimeoutExpired
CmdError = subprocess.CalledProcessError
class RunningCmd:
"""
A background process running on a CmdSite
"""
site: CmdSite
cmd: tuple[str, ...]
check: bool
popen: subprocess.Popen[bytes]
def __init__(self, site: CmdSite, popen: subprocess.Popen[bytes],
*cmd: str, check: bool = True) -> None:
self.site = site
self.popen = popen
self.cmd = cmd
self.check = check
def run(self, **kwargs: Any) -> CompletedCmd:
stdout, stderr = self.popen.communicate(**kwargs)
cp = CompletedCmd(self.popen.args, self.popen.returncode,
stdout, stderr)
if self.check:
cp.check_returncode()
return cp
def terminate(self) -> None:
self.popen.terminate()
def kill(self) -> None:
self.popen.kill()
class CmdSite(exeter.Scenario):
"""
A (usually virtual or simulated) location where we can execute
commands and configure networks.
"""
name: str
def __init__(self, name: str) -> None:
self.name = name # For debugging
def output(self, *cmd: str, **kwargs: Any) -> bytes:
proc = self.fg(*cmd, capture=Capture.STDOUT, **kwargs)
return proc.stdout
def fg(self, *cmd: str, timeout: Optional[float] = None, **kwargs: Any) \
-> CompletedCmd:
# We don't use subprocess.run() because it kills without
# attempting to terminate on timeout
with self.bg(*cmd, **kwargs) as proc:
res = proc.run(timeout=timeout)
return res
def sh(self, script: str, **kwargs: Any) -> None:
for cmd in script.splitlines():
self.fg(cmd, shell=True, **kwargs)
@contextlib.contextmanager
def bg(self, *cmd: str, capture: Optional[Capture] = None,
check: bool = True, context_timeout: float = 1.0, **kwargs: Any) \
-> Iterator[RunningCmd]:
if capture == Capture.STDOUT:
kwargs['stdout'] = subprocess.PIPE
print(f"Site {self.name}: {cmd}", file=sys.stderr)
with self.popen(*cmd, **kwargs) as popen:
proc = RunningCmd(self, popen, *cmd, check=check)
try:
yield proc
finally:
try:
popen.wait(timeout=context_timeout)
except subprocess.TimeoutExpired as e:
popen.terminate()
try:
popen.wait(timeout=context_timeout)
except subprocess.TimeoutExpired:
popen.kill()
raise e
def popen(self, *cmd: str, **kwargs: Any) -> subprocess.Popen[bytes]:
raise NotImplementedError
@exeter.scenariotest
def test_true(self) -> None:
self.fg('true')
@exeter.scenariotest
def test_false(self) -> None:
exeter.assert_raises(CmdError, self.fg, 'false')
@exeter.scenariotest
def test_echo(self) -> None:
msg = 'Hello tasst'
out = self.output('echo', f'{msg}')
exeter.assert_eq(out, msg.encode('utf-8') + b'\n')
@exeter.scenariotest
def test_timeout(self) -> None:
exeter.assert_raises(TimeoutExpired, self.fg,
'sleep', 'infinity', timeout=0.1, check=False)
@exeter.scenariotest
def test_bg_true(self) -> None:
with self.bg('true') as proc:
proc.run()
@exeter.scenariotest
def test_bg_false(self) -> None:
with self.bg('false') as proc:
exeter.assert_raises(CmdError, proc.run)
@exeter.scenariotest
def test_bg_echo(self) -> None:
msg = 'Hello tasst'
with self.bg('echo', f'{msg}', capture=Capture.STDOUT) as proc:
res = proc.run()
exeter.assert_eq(res.stdout, msg.encode('utf-8') + b'\n')
@exeter.scenariotest
def test_bg_timeout(self) -> None:
with self.bg('sleep', 'infinity') as proc:
exeter.assert_raises(TimeoutExpired, proc.run, timeout=0.1)
proc.terminate()
@exeter.scenariotest
def test_bg_context_timeout(self) -> None:
def run_timeout() -> None:
with self.bg('sleep', 'infinity', context_timeout=0.1):
pass
exeter.assert_raises(TimeoutExpired, run_timeout)
class BuildHost(CmdSite):
"""
Represents the host on which the tests are running (as opposed
to some simulated host created by the tests)
"""
def __init__(self) -> None:
super().__init__('BUILD_HOST')
def popen(self, *cmd: str, privilege: bool = False, **kwargs: Any) \
-> subprocess.Popen[bytes]:
assert not privilege, \
"BUG: Shouldn't run commands with privilege on host"
return subprocess.Popen(cmd, **kwargs)
BUILD_HOST = BuildHost()
def build_host() -> Iterator[BuildHost]:
yield BUILD_HOST
def selftests() -> None:
CmdSite.test(build_host)
|