#! /usr/bin/python3 # SPDX-License-Identifier: GPL-2.0-or-later # # tasst - Test A Simple Socket Transport # library of test helpers for passt & pasta # # tasst/nstool.py - Run commands in namespaces via 'nstool' # # Copyright Red Hat # Author: David Gibson import contextlib import os import sys import tempfile import avocado from avocado.utils.process import CmdError from tasst import Tasst from tasst.site import IsolatedSiteTasst, SiteTasst, Site, REAL_HOST from tasst.typing import typecheck # FIXME: Can this be made more portable? UNIX_PATH_MAX = 108 class NsToolSite(Site): NST_BIN = './test/nstool' def __init__(self, name, sockpath): if len(sockpath) > UNIX_PATH_MAX: raise ValueError('Unix domain socket path \"{}\" is too long'.format(sockpath)) super().__init__(name) self.sockpath = sockpath def __enter__(self): self._pid = int(REAL_HOST.output('{} info -wp {}'.format(self.NST_BIN, self.sockpath), timeout=1)) return self def __exit__(self, *exc_details): pass # PID of the nstool hold process as seen by the test host def pid(self): return self._pid # PID of the nstool hold process as seen by another site # (important when using PID namespaces) def relative_pid(self, relative_to): cmd = '{} info -p {}'.format(self.NST_BIN, self.sockpath) return int(relative_to.output(cmd)) def _nst_cmd(self, cmd, sudo=False): nst_args = self.sockpath if sudo: nst_args = '--keep-caps ' + nst_args return '{} exec {} -- {}'.format(self.NST_BIN, nst_args, cmd) def output(self, cmd, sudo=False, **kwargs): return REAL_HOST.output(self._nst_cmd(cmd, sudo), **kwargs) def fg(self, cmd, sudo=False, **kwargs): return REAL_HOST.fg(self._nst_cmd(cmd, sudo), **kwargs) def bg(self, cmd, sudo=False, **kwargs): return REAL_HOST.bg(self._nst_cmd(cmd, sudo), **kwargs) # Create path for temporary nstool Unix socket # # The obvious choice would be to use Avocado's workdir, but that often # gives paths that are too long for Unix sockets def temp_sockpath(name): tmpd = tempfile.mkdtemp(suffix=name) return os.path.join(tmpd, 's') class UnshareSite(NsToolSite): def __init__(self, name, unshare_opts, parent=REAL_HOST, sudo=False): sockpath = temp_sockpath(name) parent.require_cmds('unshare', self.NST_BIN) super().__init__(name, sockpath) self.parent = typecheck(parent, Site) self.holdcmd = 'unshare {} -- {} hold {}'.format(unshare_opts, self.NST_BIN, sockpath) self.sudo = typecheck(sudo, bool) def __enter__(self): self.holder = self.parent.bg(self.holdcmd, sudo=self.sudo) self.holder.__enter__() return super().__enter__() def __exit__(self, *exc_details): super().__exit__(*exc_details) try: self.parent.fg('{} stop {}'.format(self.NST_BIN, self.sockpath)) finally: self.holder.__exit__(*exc_details) try: os.remove(self.sockpath) except FileNotFoundError: pass os.rmdir(os.path.dirname(self.sockpath)) class UserNetNsTasst(IsolatedSiteTasst): """ Test creating a userns+netns together :avocado: tags=meta """ def setup_site(self): return UnshareSite(type(self).__name__, '-Ucn') def test_userns(self): REAL_HOST.require_cmds('capsh') with self.setup_site() as ns: ns.require_cmds('capsh') capcmd = 'capsh --has-p=CAP_SETUID' self.assertRaises(CmdError, REAL_HOST.fg, capcmd) ns.fg(capcmd, sudo=True) class NestedNsTasst(IsolatedSiteTasst): """ Test creating userns with a netns nested within :avocado: tags=meta """ @contextlib.contextmanager def setup_site(self): name = type(self).__name__ with UnshareSite(name + '.userns', '-Uc') as userns: with UnshareSite(name + '.netns', '-n', parent=userns, sudo=True) as netns: yield netns class PidNsTasst(IsolatedSiteTasst): """ Test unsing unshare -p to create a pidns :avocado: tags=meta """ def setup_site(self): return UnshareSite(type(self).__name__, '-Upfn') def test_relative_pid(self): with self.setup_site() as site: # The holder is init (pid 1) within its own pidns self.assertEquals(site.relative_pid(site), 1) class ConnectNsToolTasst(SiteTasst): """ Test connecting to a pre-existing nstool :avocado: tags=meta """ @contextlib.contextmanager def setup_site(self): sockpath = temp_sockpath(type(self).__name__) holdcmd = '{} hold {}'.format(NsToolSite.NST_BIN, sockpath) with REAL_HOST.bg(holdcmd) as holder: with NsToolSite("fake ns", sockpath) as site: yield site try: os.remove(sockpath) finally: os.rmdir(os.path.dirname(sockpath))