#! /usr/bin/python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# tasst - Test A Simple Socket Transport
# library of test helpers for passt & pasta
#
# tasst/nstool.py - Run commands in namespaces via 'nstool'
#
# Copyright Red Hat
# Author: David Gibson

import os
import sys
import tempfile

import avocado
from avocado.utils.process import CmdError

from tasst import Tasst
from tasst.site import BaseSiteTasst, Site, REAL_HOST

# FIXME: Can this be made more portable?
UNIX_PATH_MAX = 108


# A Site whose commands are run through an 'nstool' holder process
# that is reachable via a Unix domain socket at 'sockpath'
class NsToolSite(Site):
    # Path of the nstool binary, relative to the working directory
    NST_BIN = './test/nstool'

    # Attach to the nstool holder listening on 'sockpath'
    #
    # Raises ValueError if 'sockpath' is too long for a Unix socket
    # address.  Queries the holder's PID with 'nstool info -wp' on the
    # real host (timeout=1), so this fails if that command does.
    def __init__(self, name, sockpath):
        super().__init__(name)

        # The limit applies to the encoded path (bytes), not to the
        # number of characters in the Python string
        if len(os.fsencode(sockpath)) > UNIX_PATH_MAX:
            raise ValueError(
                'Unix domain socket path "{}" is too long'.format(sockpath))

        self.sockpath = sockpath
        self._pid = int(REAL_HOST.output('{} info -wp {}'.format(self.NST_BIN,
                                                                 sockpath),
                                         timeout=1))
        print('NsToolSite: sockpath={} PID={}'.format(sockpath, self.pid()),
              file=sys.stderr)

    # PID of the nstool hold process as seen by the test host
    def pid(self):
        return self._pid

    # PID of the nstool hold process as seen by another site
    # (important when using PID namespaces)
    def relative_pid(self, relative_to):
        cmd = '{} info -p {}'.format(self.NST_BIN, self.sockpath)
        return int(relative_to.output(cmd))

    # Build the 'nstool exec' command line that runs 'cmd' via this
    # site's socket; sudo=True adds nstool's --keep-caps option
    def _nst_cmd(self, cmd, sudo=False):
        nst_args = self.sockpath
        if sudo:
            nst_args = '--keep-caps ' + nst_args
        return '{} exec {} -- {}'.format(self.NST_BIN, nst_args, cmd)

    # output(), fg() and bg() wrap 'cmd' with 'nstool exec' and hand it
    # to the matching REAL_HOST method, passing extra keyword arguments
    # through unchanged
    def output(self, cmd, sudo=False, **kwargs):
        return REAL_HOST.output(self._nst_cmd(cmd, sudo), **kwargs)

    def fg(self, cmd, sudo=False, **kwargs):
        return REAL_HOST.fg(self._nst_cmd(cmd, sudo), **kwargs)

    def bg(self, cmd, sudo=False, **kwargs):
        return REAL_HOST.bg(self._nst_cmd(cmd, sudo), **kwargs)

    # Create a veth pair 'ifname' <-> 'peername' in this site.  If 'peer'
    # is given it must be an NsToolSite (TypeError otherwise), and the
    # 'peername' end is moved to the netns of that site's holder process
    def veth(self, ifname, peername, peer=None):
        self.fg('ip link add {} type veth peer name {}'.format(ifname,
                                                               peername),
                sudo=True)
        if peer is not None:
            if not isinstance(peer, NsToolSite):
                raise TypeError(
                    'peer must be an NsToolSite, not {}'.format(
                        type(peer).__name__))
            # 'ip link set ... netns PID' runs inside this site, so it
            # needs the peer's PID as seen from here
            self.fg('ip link set {} netns {}'.format(peername,
                                                     peer.relative_pid(self)),
                    sudo=True)


# Create path for temporary nstool Unix socket
#
# The obvious choice would be to use Avocado's workdir, but that often
# gives paths that are too long for Unix sockets
def temp_sockpath(name):
    tmpd = tempfile.mkdtemp(suffix=name)
    return os.path.join(tmpd, 's')


# Remove a socket at a path made by temp_sockpath(), then the temporary
# directory containing it.  A socket that is already gone is not an error.
def _remove_sockpath(sockpath):
    try:
        os.remove(sockpath)
    except FileNotFoundError:
        pass
    os.rmdir(os.path.dirname(sockpath))


# An NsToolSite which starts its own nstool holder under 'unshare'
#
# 'unshare_opts' is passed verbatim to unshare(1); the holder is started
# in the background on 'parent' (with 'sudo' passed to parent.bg())
class UnshareSite(NsToolSite):
    def __init__(self, name, unshare_opts, parent=REAL_HOST, sudo=False):
        parent.require_cmds('unshare', self.NST_BIN)
        self.parent = parent
        self.holder = None

        sockpath = temp_sockpath(name)
        try:
            holdcmd = 'unshare {} -- {} hold {}'.format(unshare_opts,
                                                        self.NST_BIN, sockpath)
            self.holder = parent.bg(holdcmd, sudo=sudo)
            super().__init__(name, sockpath)
        except Exception:
            # Nobody will call close() on a half-constructed site, so
            # don't leave the holder or the temporary directory behind
            try:
                if self.holder is not None:
                    self.holder.stop()
            finally:
                _remove_sockpath(sockpath)
            raise

    # Stop the holder and remove the temporary socket and directory
    def close(self):
        # Ask nstool to stop the holder; a failure here is tolerated
        # because the holder is stopped directly just below
        try:
            self.parent.fg('{} stop {}'.format(self.NST_BIN, self.sockpath))
        except CmdError:
            pass

        try:
            self.holder.stop()
        finally:
            _remove_sockpath(self.sockpath)

        super().close()


class IsolatedNetTasst(BaseSiteTasst):
    """
    Test a site with isolated network

    :avocado: disable
    :avocado: tags=meta
    """

    # Bring up loopback on 'site' and register it with both this class's
    # and BaseSiteTasst's tests
    def subsetup(self, site):
        site.ifup('lo')
        Tasst.subsetup(self, IsolatedNetTasst, site)
        BaseSiteTasst.subsetup(self, site)

    def test_isolated_net(self):
        site = self.get_subsetup(IsolatedNetTasst)
        # assertEquals was a deprecated alias, removed in Python 3.12
        self.assertEqual(site.ifs(), ['lo'])


class UserNetNsTasst(IsolatedNetTasst):
    """
    Test creating a userns+netns together

    :avocado: tags=meta
    """

    def setUp(self):
        super().setUp()
        self.ns = UnshareSite(type(self).__name__, '-Ucn')
        IsolatedNetTasst.subsetup(self, self.ns)

    def tearDown(self):
        try:
            self.ns.close()
        finally:
            super().tearDown()

    def test_userns(self):
        REAL_HOST.require_cmds('capsh')
        self.ns.require_cmds('capsh')
        capcmd = 'capsh --has-p=CAP_SETUID'
        # The capability check must fail on the host, but succeed
        # within the namespace
        self.assertRaises(CmdError, REAL_HOST.fg, capcmd)
        self.ns.fg(capcmd, sudo=True)


class NestedNsTasst(IsolatedNetTasst):
    """
    Test creating userns with a netns nested within

    :avocado: tags=meta
    """

    def setUp(self):
        super().setUp()
        self.userns = UnshareSite(type(self).__name__ + '.userns', '-Uc')
        self.netns = UnshareSite(type(self).__name__ + '.netns', '-n',
                                 parent=self.userns, sudo=True)
        IsolatedNetTasst.subsetup(self, self.netns)

    def tearDown(self):
        # Inner namespace first; keep going even if a step fails so the
        # outer namespace is still cleaned up
        try:
            self.netns.close()
        finally:
            try:
                self.userns.close()
            finally:
                super().tearDown()


class PidNsTasst(IsolatedNetTasst):
    """
    Test using unshare -p to create a pidns

    :avocado: tags=meta
    """

    def setUp(self):
        super().setUp()
        self.pidns = UnshareSite(type(self).__name__, '-Upfn')
        IsolatedNetTasst.subsetup(self, self.pidns)

    def tearDown(self):
        try:
            self.pidns.close()
        finally:
            super().tearDown()

    def test_relative_pid(self):
        # The holder is init (pid 1) within its own pidns
        self.assertEqual(self.pidns.relative_pid(self.pidns), 1)


class ConnectNsToolTasst(BaseSiteTasst):
    """
    Test connecting to a pre-existing nstool

    :avocado: tags=meta
    """

    def setUp(self):
        super().setUp()
        self.sockpath = temp_sockpath(type(self).__name__)
        # Start a plain holder (no unshare), then attach to it by socket
        # path only
        holdcmd = '{} hold {}'.format(NsToolSite.NST_BIN, self.sockpath)
        self.holder = REAL_HOST.bg(holdcmd)
        BaseSiteTasst.subsetup(self, NsToolSite("fake ns", self.sockpath))

    def tearDown(self):
        try:
            try:
                self.holder.stop()
            finally:
                _remove_sockpath(self.sockpath)
        finally:
            super().tearDown()