1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#! /usr/bin/env avocado-runner-avocado-classless
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
nstool.py - Run commands in namespaces via 'nstool'
"""
import contextlib
import os
import tempfile
from avocado.utils.process import CmdError
from avocado_classless.test import assert_eq, assert_raises, test_output
from tasst.exesite import Site, REAL_HOST, test_isolated_site, test_site
from tasst.typecheck import typecheck
# Longest Unix domain socket path NsToolSite will accept.  108 matches
# the size of sockaddr_un.sun_path on Linux (see unix(7)); other
# platforms may differ, hence the FIXME.
# FIXME: Can this be made more portable? # pylint: disable=W0511
UNIX_PATH_MAX = 108
# The nstool helper binary, given relative to the current working
# directory of whichever site runs it
NSTOOL_BIN = './nstool'
class NsToolSite(Site):
    """A bundle of Linux namespaces managed by nstool

    Commands for this site are executed by wrapping them in
    'nstool exec <sockpath> -- ...' (see hostify()), i.e. by talking to
    the nstool holder listening on the Unix socket at 'sockpath'.

    The object is a context manager: entering it queries the holder's
    PID through the parent site; leaving it does nothing, since the
    holder's lifetime belongs to whoever started it.
    """

    def __init__(self, name, sockpath, parent=REAL_HOST):
        """Describe (but don't yet contact) an nstool-managed site

        name     -- site name, passed on to Site
        sockpath -- path (str) of the nstool control socket
        parent   -- Site from which the socket is reachable

        Raises ValueError if 'sockpath' is too long to be a Unix socket
        address, plus whatever typecheck() raises for arguments of the
        wrong type.
        """
        # Validate the type first, so that the length check below only
        # ever sees a str
        sockpath = typecheck(sockpath, str)
        # The limit applies to the encoded path in bytes, not to the
        # number of characters, so measure what the kernel will see
        if len(os.fsencode(sockpath)) > UNIX_PATH_MAX:
            raise ValueError(
                f'Unix domain socket path "{sockpath}" is too long'
            )

        super().__init__(name)
        self.sockpath = sockpath
        self.parent = typecheck(parent, Site)
        self._pid = None

    def __enter__(self):
        # 'nstool info -wp' is run on the parent site with a 1 second
        # timeout; its output is parsed as the holder's PID
        pid = self.parent.output(f'{NSTOOL_BIN} info -wp {self.sockpath}',
                                 verbose=False, timeout=1)
        self._pid = int(pid)
        return self

    def __exit__(self, *exc_details):
        # Nothing to release: stopping the holder is the caller's job
        pass

    # PID of the nstool hold process as seen by the test host
    # (None until the site has been entered)
    def pid(self):
        return self._pid

    # PID of the nstool hold process as seen by another site
    # (important when using PID namespaces)
    def relative_pid(self, relative_to):
        relpid = relative_to.output(f'{NSTOOL_BIN} info -p {self.sockpath}')
        return int(relpid)

    def hostify(self, cmd, *, sudo=False, **kwargs):
        """Translate 'cmd' into a command that runs it within this site

        Returns a (command, kwargs) pair: the command wraps 'cmd' in
        'nstool exec', adding '--keep-caps' when 'sudo' is set; the
        remaining keyword arguments are handed back untouched.
        """
        nst_args = self.sockpath
        if sudo:
            nst_args = '--keep-caps ' + nst_args
        return f'{NSTOOL_BIN} exec {nst_args} -- {cmd}', kwargs

    def veth(self, ifname, peername, peer=None):
        """Create a veth pair 'ifname' <-> 'peername' in this site

        If 'peer' (an NsToolSite) is given, the 'peername' end is then
        moved into the network namespace of that site's holder process.

        Raises TypeError if 'peer' is neither None nor an NsToolSite.
        """
        self.fg(f'ip link add {ifname} type veth peer name {peername}',
                sudo=True)
        if peer is not None:
            if not isinstance(peer, NsToolSite):
                raise TypeError(
                    f'peer must be an NsToolSite, not {type(peer).__name__}'
                )
            # 'ip' runs inside this site, so it needs the peer holder's
            # PID as seen from here
            self.fg(f'ip link set {peername} netns {peer.relative_pid(self)}',
                    sudo=True)
@contextlib.contextmanager
def unshare_site(nsname, unshare_opts, parent=REAL_HOST, sudo=False):
    """Context manager yielding an NsToolSite in freshly unshared namespaces

    nsname       -- name for the new site (also the socket's file name)
    unshare_opts -- option string passed to unshare(1)
    parent       -- Site on which unshare and nstool are run
    sudo         -- passed through to parent.bg() for the hold command

    An 'nstool hold' process is started under unshare in the background
    on 'parent', and the yielded site talks to it.  On exit, whether
    normal or by exception, the holder is asked to stop and the socket
    is removed.
    """
    unshare_opts = typecheck(unshare_opts, str)
    parent = typecheck(parent, Site)
    sudo = typecheck(sudo, bool)

    parent.require_cmds('unshare', NSTOOL_BIN)

    # The nstool Unix socket goes in its own temporary directory:
    # Avocado's workdir often gives paths that are too long for Unix
    # sockets
    with tempfile.TemporaryDirectory() as sockdir:
        sockpath = os.path.join(sockdir, nsname)
        hold_cmd = f'unshare {unshare_opts} -- {NSTOOL_BIN} hold {sockpath}'
        with parent.bg(hold_cmd, sudo=sudo) as holder:
            try:
                with NsToolSite(nsname, sockpath, parent=parent) as site:
                    yield site
            finally:
                # Teardown steps are chained through 'finally' so each
                # one is attempted even if the previous one raised
                try:
                    parent.fg(f'{NSTOOL_BIN} stop {sockpath}')
                finally:
                    try:
                        # NOTE(review): presumably lets the holder
                        # finish within 0.1s -- confirm against bg()
                        holder.run(timeout=0.1)
                    finally:
                        # The socket may already be gone; that's fine
                        with contextlib.suppress(FileNotFoundError):
                            os.remove(sockpath)
# Exception type deliberately raised inside a site by
# test_sockdir_cleanup() to exercise cleanup on the error path
TEST_EXC = ValueError
def test_sockdir_cleanup(s):
    """Check socket directories are gone after a site exits on an error

    Enters the site 's', records the socket path of it and of every
    NsToolSite ancestor, then raises TEST_EXC from within the context.
    Afterwards none of the directories which held those sockets may
    still exist.
    """
    collected = []

    def enter_and_fail(paths):
        with s as site:
            # Walk up the chain of parents for as long as they are
            # nstool-managed sites
            current = site
            while isinstance(current, NsToolSite):
                paths.append(current.sockpath)
                current = current.parent
            raise TEST_EXC

    assert_raises(TEST_EXC, enter_and_fail, collected)

    # At least one socket path must have been seen, or the check below
    # would pass vacuously
    assert collected
    for sockpath in collected:
        sockdir = os.path.dirname(sockpath)
        assert not os.path.exists(sockdir)
def test_userns(nstool_site):
    """Check CAP_SETUID is available in the site, but not on the host

    The capsh probe must fail (CmdError) on REAL_HOST and succeed when
    run with sudo=True inside the site.
    """
    probe = 'capsh --has-p=CAP_SETUID'

    REAL_HOST.require_cmds('capsh')
    with nstool_site as site:
        site.require_cmds('capsh')
        # The host side is expected to lack the capability...
        assert_raises(CmdError, REAL_HOST.fg, probe)
        # ...whereas within the site the same probe must succeed
        site.fg(probe, sudo=True)
@test_output(test_userns, test_sockdir_cleanup)
@test_isolated_site
@contextlib.contextmanager
def userns_site():
    """Site made by a single 'unshare -Ucn', with loopback brought up"""
    with unshare_site('usernetns', '-Ucn') as site:
        site.ifup('lo')
        yield site
@test_output(test_sockdir_cleanup)
@test_isolated_site
@contextlib.contextmanager
def nested_site():
    """Site made in two steps: 'unshare -Uc', then 'unshare -n' within it

    The inner site is created with the outer one as its parent (using
    sudo=True), has loopback brought up, and is the one yielded.
    """
    with unshare_site('userns', '-Uc') as outer, \
         unshare_site('netns', '-n', parent=outer, sudo=True) as inner:
        inner.ifup('lo')
        yield inner
def test_relative_pid(s):
    """Check the holder's PID, as seen from inside its own site, is 1"""
    with s as site:
        # Within its own pidns the holder is init, hence pid 1
        own_view = site.relative_pid(site)
        assert_eq(own_view, 1)
@test_output(test_relative_pid, test_sockdir_cleanup)
@test_isolated_site
@contextlib.contextmanager
def pidns_site():
    """Site made by 'unshare -Upfn', with loopback brought up"""
    with unshare_site('pidns', '-Upfn') as site:
        site.ifup('lo')
        yield site
@test_site
@contextlib.contextmanager
def connect_site():
    """Site backed by an 'nstool hold' run without unshare

    The holder is started directly on REAL_HOST, so the yielded
    NsToolSite ("fake ns") exercises the nstool connection without
    any new namespaces being created by this function.  The control
    socket lives in a temporary directory and is removed on exit.
    """
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, 'nons')
        holdcmd = f'{NSTOOL_BIN} hold {sockpath}'
        try:
            with REAL_HOST.bg(holdcmd, ignore_status=True,
                              context_timeout=0.1):
                with NsToolSite("fake ns", sockpath) as site:
                    yield site
        finally:
            # If the holder never created the socket (or it is already
            # gone), a FileNotFoundError here would mask the original
            # failure; tolerate it, as unshare_site() does
            with contextlib.suppress(FileNotFoundError):
                os.remove(sockpath)
|