1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#! /usr/bin/python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# tasst - Test A Simple Socket Transport
# library of test helpers for passt & pasta
#
# tasst/site.py - Manage simulated network sites for testing
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
import json
import avocado
from avocado.utils.process import CmdError
from tasst import Tasst
class Site:
    """
    A (usually virtual or simulated) location where we can execute
    commands and configure networks.

    This base class only stores the site name; output() and bg() raise
    NotImplementedError here and are expected to be provided by
    subclasses.
    """

    def __init__(self, name):
        self.name = name  # For debugging

    # Shut down the site and release any resources it's using.  We
    # can't use __del__() for this (RAII like), because Python doesn't
    # guarantee that will get called, and that's pretty easy to hit in
    # practice.  The modern Pythonic way of doing this is 'with' and
    # ContextManagers, but that doesn't work with Avocado's jUnit
    # derived format for setUp() and tearDown().  Oh well, do it
    # manually.
    def close(self):
        pass

    def output(self, cmd, **kwargs):
        """Run cmd on the site and return what it printed (abstract here)."""
        raise NotImplementedError

    def fg(self, cmd, **kwargs):
        """
        Run cmd on the site in the foreground.

        The default implementation runs the command through output()
        and discards the result, so it returns None rather than an exit
        status.
        """
        self.output(cmd, **kwargs)

    def bg(self, cmd, **kwargs):
        """Start cmd on the site in the background (abstract here)."""
        raise NotImplementedError

    def _has_cmd(self, cmd):
        """Return True if 'type <cmd>' succeeds on this site."""
        # Detect failure via CmdError instead of the return value of
        # fg(): the default fg() returns None, so comparing its result
        # against 0 would report every command as missing.
        try:
            self.fg('type {}'.format(cmd))
        except CmdError:
            return False
        return True

    def require_cmds(self, *cmds):
        """
        Cancel the current test unless every command in cmds is
        available on this site.

        Raises avocado.TestCancel naming the missing commands.
        """
        missing = [c for c in cmds if not self._has_cmd(c)]
        if missing:
            raise avocado.TestCancel("Missing commands {} on {}"
                                     .format(', '.join(missing), self.name))

    def ifs(self):
        """Return the interface names reported by 'ip -j link show'."""
        self.require_cmds('ip')
        info = json.loads(self.output('ip -j link show'))
        return [i['ifname'] for i in info]
class BaseSiteTasst(Tasst):
    """
    Basic tests for executing commands on sites

    :avocado: disable
    :avocado: tags=meta
    """

    timeout = 1.0

    # Derived classes must call this from setUp()
    def subsetup(self, site):
        assert isinstance(site, Site)
        # Cancel early if the site lacks the commands the tests below run
        site.require_cmds('true', 'false', 'echo')
        Tasst.subsetup(self, BaseSiteTasst, site)

    # A successful foreground command must not raise
    def test_true(self):
        site = self.get_subsetup(BaseSiteTasst)
        site.fg('true')

    # A failing foreground command must raise CmdError
    def test_false(self):
        site = self.get_subsetup(BaseSiteTasst)
        self.assertRaises(CmdError, site.fg, 'false')

    # output() must hand back what the command printed, as bytes
    def test_echo(self):
        site = self.get_subsetup(BaseSiteTasst)
        s = 'Hello tasst'
        out = site.output('echo {}'.format(s))
        # assertEqual, not the deprecated assertEquals alias (removed
        # from unittest in Python 3.12)
        self.assertEqual(out, s.encode('utf-8'))

    # A background 'true' must finish with status 0
    def test_bg_true(self):
        site = self.get_subsetup(BaseSiteTasst)
        proc = site.bg('true')
        status = proc.wait()
        self.assertEqual(status, 0)

    # A background 'false' must finish with a non-zero status
    def test_bg_false(self):
        site = self.get_subsetup(BaseSiteTasst)
        proc = site.bg('false')
        status = proc.wait()
        self.assertNotEqual(status, 0)

    # Every site is expected to list a loopback interface
    def test_has_lo(self):
        site = self.get_subsetup(BaseSiteTasst)
        self.assertIn('lo', site.ifs())
# Represents the host on which the tests are running, as opposed to
# some simulated host created by the tests
class RealHost(Site):
    """
    Site that runs commands through avocado.utils.process.

    Every method accepts a sudo flag only in order to reject it: asking
    for privilege here is treated as a bug and trips an assertion.
    """

    def __init__(self):
        super().__init__('REAL_HOST')

    def output(self, cmd, sudo=False, **kwargs):
        """Run cmd and return the result of process.system_output()."""
        assert not sudo, "BUG: Shouldn't run commands with privilege on host"
        captured = avocado.utils.process.system_output(cmd, **kwargs)
        return captured

    def fg(self, cmd, sudo=False, **kwargs):
        """Run cmd and return the result of process.system()."""
        assert not sudo, "BUG: Shouldn't run commands with privilege on host"
        status = avocado.utils.process.system(cmd, **kwargs)
        return status

    def bg(self, cmd, sudo=False, **kwargs):
        """Start cmd as a process.SubProcess and return it once started."""
        assert not sudo, "BUG: Shouldn't run commands with privilege on host"
        child = avocado.utils.process.SubProcess(cmd, **kwargs)
        child.start()
        return child
# Module-level RealHost instance, created when this module is imported
REAL_HOST = RealHost()
class RealHostTasst(BaseSiteTasst):
    """Run the BaseSiteTasst tests with REAL_HOST as the site."""

    def setUp(self):
        super().setUp()
        host = REAL_HOST
        # Register the site explicitly through BaseSiteTasst, as its
        # subsetup() contract requires
        BaseSiteTasst.subsetup(self, host)
|