#! /usr/bin/env avocado-runner-avocado-classless

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson

"""
Test A Simple Socket Transport

nstool.py - Run commands in namespaces via 'nstool'
"""

import contextlib
import os
import tempfile

from avocado.utils.process import CmdError
from avocado_classless.test import assert_eq, assert_raises, test_output

from tasst.exesite import Site, REAL_HOST, test_isolated_site, test_site
from tasst.typecheck import typecheck

# Maximum length accepted for an nstool control socket path (on Linux,
# sizeof(sockaddr_un.sun_path) is 108 bytes).
# FIXME: Can this be made more portable?  # pylint: disable=W0511
UNIX_PATH_MAX = 108

# Path of the nstool binary, relative to the working directory of the site
# that runs it
NSTOOL_BIN = './nstool'


class NsToolSite(Site):
    """A bundle of Linux namespaces managed by nstool"""

    def __init__(self, name, sockpath, parent=REAL_HOST):
        """Describe the namespaces held by the nstool listening on @sockpath

        name:     name of the site, passed on to Site
        sockpath: path of the nstool control socket (a str)
        parent:   Site on which the nstool commands are run

        Raises ValueError if @sockpath is too long for a Unix socket
        address.
        """
        # Validate the types first, so that a bad argument is reported as
        # such rather than tripping over len() below
        sockpath = typecheck(sockpath, str)
        parent = typecheck(parent, Site)

        # The limit applies to the encoded (byte) form of the path, which
        # is longer than the character count for non-ASCII paths
        if len(os.fsencode(sockpath)) > UNIX_PATH_MAX:
            raise ValueError(
                f'Unix domain socket path "{sockpath}" is too long'
            )

        super().__init__(name)
        self.sockpath = sockpath
        self.parent = parent
        self._pid = None

    def __enter__(self):
        """Query the holder's PID via 'nstool info -wp' run on the parent

        The query is given a 1s timeout; its output is recorded as an
        integer and returned by pid().
        """
        pid = self.parent.output(f'{NSTOOL_BIN} info -wp {self.sockpath}',
                                 verbose=False, timeout=1)
        self._pid = int(pid)
        return self

    def __exit__(self, *exc_details):
        """Do nothing: this object does not stop the holder itself"""

    def pid(self):
        """PID of the nstool hold process as seen by the test host

        This is None until the site has been entered.
        """
        return self._pid

    def relative_pid(self, relative_to):
        """PID of the nstool hold process as seen by another site

        (important when using PID namespaces)

        relative_to: site on which 'nstool info -p' is run
        """
        relpid = relative_to.output(f'{NSTOOL_BIN} info -p {self.sockpath}')
        return int(relpid)

    def hostify(self, cmd, *, sudo=False, **kwargs):
        """Wrap @cmd in an 'nstool exec' invocation on this site's socket

        With @sudo, '--keep-caps' is passed to nstool.  Returns a tuple of
        the wrapped command string and the remaining, unmodified, @kwargs.
        """
        nst_args = self.sockpath
        if sudo:
            nst_args = '--keep-caps ' + nst_args
        return f'{NSTOOL_BIN} exec {nst_args} -- {cmd}', kwargs


@contextlib.contextmanager
def unshare_site(nsname, unshare_opts, parent=REAL_HOST, sudo=False):
    """Context manager yielding an NsToolSite in freshly unshared namespaces

    nsname:       name of the new site, also used as the socket file name
    unshare_opts: options passed to unshare(1), as a single string
    parent:       Site on which 'unshare' and nstool are run
    sudo:         passed through to parent.bg() for the holder command

    On exit the holder is asked to stop, then waited for, then the socket
    is removed; each step runs even if an earlier one raised.
    """
    unshare_opts = typecheck(unshare_opts, str)
    parent = typecheck(parent, Site)
    sudo = typecheck(sudo, bool)
    parent.require_cmds('unshare', NSTOOL_BIN)

    # Create path for temporary nstool Unix socket
    #
    # Using Avocado's workdir often gives paths that are too long for
    # Unix sockets
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, nsname)
        holdcmd = f'unshare {unshare_opts} -- {NSTOOL_BIN} hold {sockpath}'
        with parent.bg(holdcmd, sudo=sudo) as holder:
            try:
                with NsToolSite(nsname, sockpath, parent=parent) as site:
                    yield site
            finally:
                # Nested so that every cleanup step is attempted, in
                # order, regardless of failures in the previous ones
                try:
                    parent.fg(f'{NSTOOL_BIN} stop {sockpath}')
                finally:
                    try:
                        holder.run(timeout=0.1)
                    finally:
                        # The socket may already be gone
                        with contextlib.suppress(FileNotFoundError):
                            os.remove(sockpath)


# Exception deliberately raised inside a site by test_sockdir_cleanup()
TEST_EXC = ValueError


def test_sockdir_cleanup(s):
    """Check socket directories are removed when a site exits on an error

    Enters @s, records the socket path of the site and of each NsToolSite
    ancestor, raises TEST_EXC from inside the context, then asserts that
    none of the directories which contained those sockets still exists.
    """
    def mess(sockpaths):
        with s as site:
            ns = site
            while isinstance(ns, NsToolSite):
                sockpaths.append(ns.sockpath)
                ns = ns.parent
            raise TEST_EXC

    sockpaths = []
    assert_raises(TEST_EXC, mess, sockpaths)
    assert sockpaths
    for path in sockpaths:
        assert not os.path.exists(os.path.dirname(path))


def test_userns(nstool_site):
    """Check CAP_SETUID is permitted inside the site but not on the host"""
    REAL_HOST.require_cmds('capsh')
    with nstool_site as ns:
        ns.require_cmds('capsh')
        capcmd = 'capsh --has-p=CAP_SETUID'
        assert_raises(CmdError, REAL_HOST.fg, capcmd)
        ns.fg(capcmd, sudo=True)


@test_output(test_userns, test_sockdir_cleanup)
@test_isolated_site
@contextlib.contextmanager
def userns_site():
    """Site created with 'unshare -Ucn', with 'lo' brought up"""
    with unshare_site('usernetns', '-Ucn') as ns:
        ns.ifup('lo')
        yield ns


@test_output(test_sockdir_cleanup)
@test_isolated_site
@contextlib.contextmanager
def nested_site():
    """Site created with 'unshare -n' nested inside an 'unshare -Uc' site"""
    with unshare_site('userns', '-Uc') as userns:
        with unshare_site('netns', '-n', parent=userns, sudo=True) as netns:
            netns.ifup('lo')
            yield netns


def test_relative_pid(s):
    """Check the holder sees itself as PID 1"""
    with s as site:
        # The holder is init (pid 1) within its own pidns
        assert_eq(site.relative_pid(site), 1)


@test_output(test_relative_pid, test_sockdir_cleanup)
@test_isolated_site
@contextlib.contextmanager
def pidns_site():
    """Site created with 'unshare -Upfn', with 'lo' brought up"""
    with unshare_site('pidns', '-Upfn') as ns:
        ns.ifup('lo')
        yield ns


@test_site
@contextlib.contextmanager
def connect_site():
    """Site connected to an nstool holder running without 'unshare'"""
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, 'nons')
        holdcmd = f'{NSTOOL_BIN} hold {sockpath}'
        try:
            with REAL_HOST.bg(holdcmd, ignore_status=True,
                              context_timeout=0.1):
                with NsToolSite("fake ns", sockpath) as site:
                    yield site
        finally:
            # If the holder never created the socket, a FileNotFoundError
            # here would hide the error that actually caused the failure
            with contextlib.suppress(FileNotFoundError):
                os.remove(sockpath)