#! /usr/bin/env python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson

"""
Test A Simple Socket Transport

nstool.py - Run commands in namespaces via 'nstool'
"""

import contextlib
import os
import subprocess
import tempfile

import exeter

from .snh import RealHost, SimNetHost

# FIXME: Can this be made more portable?
UNIX_PATH_MAX = 108

NSTOOL_BIN = 'test/nstool'


class NsTool(SimNetHost):
    """A bundle of Linux namespaces managed by nstool"""

    # NOTE(review): the default RealHost() is created once, when the class
    # body is executed, and is shared by every NsTool built without an
    # explicit parent - confirm RealHost carries no per-use state.
    def __init__(self, name, sockpath, parent=RealHost()):
        """Describe the namespaces reachable through an nstool socket.

        name     -- name passed on to SimNetHost
        sockpath -- path of the nstool control socket
        parent   -- snh on which the nstool commands are run

        Raises ValueError if sockpath is longer than UNIX_PATH_MAX.
        """
        # The limit on a Unix socket address applies to the encoded
        # bytes of the path, not to its number of characters, so measure
        # the filesystem encoding of the path.
        if len(os.fsencode(sockpath)) > UNIX_PATH_MAX:
            raise ValueError(
                f'Unix domain socket path "{sockpath}" is too long'
            )

        super().__init__(name)
        self.sockpath = sockpath
        self.parent = parent
        self._pid = None

    def __enter__(self):
        """Query the holder's PID via 'nstool info -wp' and record it."""
        cmd = [f'{NSTOOL_BIN}', 'info', '-wp', f'{self.sockpath}']
        pid = self.parent.output(*cmd, timeout=1)
        self._pid = int(pid)
        return self

    def __exit__(self, *exc_details):
        """Nothing to release: this object does not own the holder."""
        pass

    def pid(self):
        """PID of the nstool hold process as seen by the parent snh.

        This is None until the object has been entered as a context
        manager.
        """
        return self._pid

    def relative_pid(self, relative_to):
        """PID of the nstool hold process as seen by another snh.

        relative_to must be able to see the nstool socket (important
        when using PID namespaces).
        """
        cmd = [f'{NSTOOL_BIN}', 'info', '-p', f'{self.sockpath}']
        relpid = relative_to.output(*cmd)
        return int(relpid)

    def hostify(self, *cmd, capable=False, **kwargs):
        """Wrap cmd in an 'nstool exec' invocation on this socket.

        With capable=True, '--keep-caps' is added to the nstool command.
        Returns a (command list, kwargs) pair; kwargs is passed through
        untouched.
        """
        hostcmd = [f'{NSTOOL_BIN}', 'exec']
        if capable:
            hostcmd.append('--keep-caps')
        hostcmd += [self.sockpath, '--']
        hostcmd += list(cmd)
        return hostcmd, kwargs

    def veth(self, ifname, peername, peer=None):
        """Create a veth pair named ifname / peername with 'ip link'.

        If peer is given it must be an NsTool; the peername end is then
        moved with 'ip link set ... netns', using the peer's holder PID
        as seen from this snh.

        Raises TypeError if peer is neither None nor an NsTool.
        """
        self.fg('ip', 'link', 'add', f'{ifname}', 'type', 'veth',
                'peer', 'name', f'{peername}', capable=True)
        if peer is not None:
            if not isinstance(peer, NsTool):
                raise TypeError(
                    f'peer must be an NsTool, not {type(peer).__name__}'
                )
            self.fg('ip', 'link', 'set', f'{peername}',
                    'netns', f'{peer.relative_pid(self)}', capable=True)


# NOTE(review): as for NsTool.__init__, the default RealHost() is built
# once at definition time and shared between calls.
@contextlib.contextmanager
def unshare_snh(name, *opts, parent=RealHost(), capable=False):
    """Yield an NsTool for namespaces created with 'unshare'.

    name    -- name of the NsTool, also used as the socket file name
    opts    -- options passed to 'unshare'
    parent  -- snh on which 'unshare' and the nstool commands are run
    capable -- passed through to parent.bg() for the holder command

    On exit the holder is told to stop, then run/killed, and the socket
    is removed; each step is attempted even if the previous one raised.
    """
    # Create path for temporary nstool Unix socket
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, name)
        cmd = ['unshare'] + list(opts)
        cmd += ['--', f'{NSTOOL_BIN}', 'hold', f'{sockpath}']
        with parent.bg(*cmd, capable=capable) as holder:
            try:
                with NsTool(name, sockpath, parent=parent) as snh:
                    yield snh
            finally:
                try:
                    parent.fg(f'{NSTOOL_BIN}', 'stop', f'{sockpath}')
                finally:
                    try:
                        # NOTE(review): if run() raises, kill() is
                        # skipped here - presumably the parent.bg()
                        # context then cleans up; confirm.
                        holder.run(timeout=0.1)
                        holder.kill()
                    finally:
                        # The socket may already be gone
                        with contextlib.suppress(FileNotFoundError):
                            os.remove(sockpath)


TEST_EXC = ValueError


def test_sockdir_cleanup(s):
    """Check socket directories are removed when the body raises.

    s is a context manager yielding an snh.  Every NsTool found by
    following the .parent links contributes its socket path; after the
    context exits with TEST_EXC, none of the directories holding those
    sockets may still exist.
    """
    def mess(sockpaths):
        with s as snh:
            ns = snh
            while isinstance(ns, NsTool):
                sockpaths.append(ns.sockpath)
                ns = ns.parent
            raise TEST_EXC

    sockpaths = []
    exeter.assert_raises(TEST_EXC, mess, sockpaths)
    assert sockpaths
    for path in sockpaths:
        assert not os.path.exists(os.path.dirname(path))


@contextlib.contextmanager
def userns_snh():
    """Yield an snh created by 'unshare -Ucn', with 'lo' brought up."""
    with unshare_snh('usernetns', '-Ucn') as ns:
        ns.ifup('lo')
        yield ns


@exeter.test
def test_userns():
    """'capsh --has-p=CAP_SETUID' fails on the real host, but succeeds
    when run with capable=True inside userns_snh()."""
    cmd = ['capsh', '--has-p=CAP_SETUID']
    with RealHost() as realhost:
        status = realhost.fg(*cmd, check=False)
        assert status.returncode != 0
    with userns_snh() as ns:
        ns.fg(*cmd, capable=True)


@contextlib.contextmanager
def nested_snh():
    """Yield an snh from 'unshare -n' run inside an 'unshare -Uc' snh."""
    with unshare_snh('userns', '-Uc') as userns:
        with unshare_snh('netns', '-n',
                         parent=userns, capable=True) as netns:
            netns.ifup('lo')
            yield netns


@contextlib.contextmanager
def pidns_snh():
    """Yield an snh created by 'unshare -Upfn', with 'lo' brought up."""
    with unshare_snh('pidns', '-Upfn') as ns:
        ns.ifup('lo')
        yield ns


@exeter.test
def test_relative_pid():
    """The holder's PID relative to its own snh must be 1."""
    with pidns_snh() as snh:
        # The holder is init (pid 1) within its own pidns
        exeter.assert_eq(snh.relative_pid(snh), 1)


# General tests for all the nstool examples
for setup in [userns_snh, nested_snh, pidns_snh]:
    # Common snh tests
    SimNetHost.selftest_isolated(setup)
    exeter.register_pipe(f'{setup.__qualname__}|test_sockdir_cleanup',
                         setup, test_sockdir_cleanup)


@contextlib.contextmanager
def connect_snh():
    """Yield an NsTool connected to an 'nstool hold' run without unshare.

    The holder is started directly with subprocess.Popen; on exit it is
    killed and its socket is removed.
    """
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, 'nons')
        holdcmd = [f'{NSTOOL_BIN}', 'hold', f'{sockpath}']
        with subprocess.Popen(holdcmd) as holder:
            try:
                with NsTool("fakens", sockpath) as snh:
                    yield snh
            finally:
                holder.kill()
                # If the holder never created the socket (e.g. entering
                # the NsTool failed), a missing file must not replace
                # the original error; unshare_snh() tolerates this too.
                with contextlib.suppress(FileNotFoundError):
                    os.remove(sockpath)


SimNetHost.selftest(connect_snh)