public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
From: David Gibson <david@gibson.dropbear.id.au>
To: Stefano Brivio <sbrivio@redhat.com>, passt-dev@passt.top
Cc: Cleber Rosa <crosa@redhat.com>,
	David Gibson <david@gibson.dropbear.id.au>
Subject: [PATCH v3 08/15] tasst/unshare: Add helpers to run commands in Linux unshared namespaces
Date: Mon, 26 Aug 2024 12:09:35 +1000	[thread overview]
Message-ID: <20240826020942.545155-9-david@gibson.dropbear.id.au> (raw)
In-Reply-To: <20240826020942.545155-1-david@gibson.dropbear.id.au>

Use our existing nstool C helper, add python wrappers to easily run
commands in various namespaces.

Signed-off-by: David Gibson <david@gibson.dropbear.id.au>
---
 test/tasst/__main__.py |   1 +
 test/tasst/unshare.py  | 166 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 167 insertions(+)
 create mode 100644 test/tasst/unshare.py

diff --git a/test/tasst/__main__.py b/test/tasst/__main__.py
index 4ea4c593..9cba8985 100644
--- a/test/tasst/__main__.py
+++ b/test/tasst/__main__.py
@@ -17,6 +17,7 @@ import exeter
 
 MODULES = [
     'cmdsite',
+    'unshare',
 ]
 
 
diff --git a/test/tasst/unshare.py b/test/tasst/unshare.py
new file mode 100644
index 00000000..15b760b5
--- /dev/null
+++ b/test/tasst/unshare.py
@@ -0,0 +1,166 @@
+#! /usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Copyright Red Hat
+# Author: David Gibson <david@gibson.dropbear.id.au>
+
+"""
+Test A Simple Socket Transport
+
+unshare.py - Create and run commands in Linux namespaces
+"""
+
+import contextlib
+import os
+import subprocess
+import tempfile
+from typing import Any, Callable, Iterator
+
+import exeter
+
+from . import cmdsite
+
+
+# FIXME: Can this be made more portable?
+UNIX_PATH_MAX = 108
+
+NSTOOL_BIN = './nstool'
+
+
+class Unshare(cmdsite.CmdSite):
+    """A bundle of Linux namespaces managed by nstool"""
+
+    sockpath: str
+    parent: cmdsite.CmdSite
+    _pid: int
+
+    def __init__(self, name: str, sockpath: str,
+                 parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
+                 parent_priv: bool = False) -> None:
+        if len(sockpath) > UNIX_PATH_MAX:
+            raise ValueError(
+                f'Unix domain socket path "{sockpath}" is too long'
+            )
+
+        super().__init__(name)
+        self.sockpath = sockpath
+        self.parent = parent
+        self.parent_priv = parent_priv
+        self.parent.fg(NSTOOL_BIN, 'info', '-wp', self.sockpath, timeout=1)
+
+    # PID of the nstool hold process as seen by another site which can
+    # see the nstool socket (important when using PID namespaces)
+    def relative_pid(self, relative_to: cmdsite.CmdSite) -> int | None:
+        cmd = [NSTOOL_BIN, 'info', '-p', self.sockpath]
+        relpid = int(relative_to.output(*cmd))
+        if not relpid:
+            return None
+        return relpid
+
+    def popen(self, *cmd: str, privilege: bool = False,
+              **kwargs: Any) -> subprocess.Popen[bytes]:
+        hostcmd = [NSTOOL_BIN, 'exec']
+        if privilege:
+            hostcmd.append('--keep-caps')
+        hostcmd += [self.sockpath, '--']
+        hostcmd += list(cmd)
+        return self.parent.popen(*hostcmd, privilege=self.parent_priv,
+                                 **kwargs)
+
+
+@contextlib.contextmanager
+def unshare(name: str, *opts: str,
+            parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
+            privilege: bool = False) -> Iterator[Unshare]:
+    # Create path for temporary nstool Unix socket
+    with tempfile.TemporaryDirectory() as tmpd:
+        sockpath = os.path.join(tmpd, name)
+        cmd = ['unshare'] + list(opts)
+        cmd += ['--', NSTOOL_BIN, 'hold', sockpath]
+        with parent.bg(*cmd, privilege=privilege) as holder:
+            try:
+                yield Unshare(name, sockpath, parent=parent,
+                              parent_priv=privilege)
+            finally:
+                try:
+                    parent.fg(NSTOOL_BIN, 'stop', sockpath)
+                finally:
+                    try:
+                        holder.run(timeout=0.1)
+                        holder.kill()
+                    finally:
+                        try:
+                            os.remove(sockpath)
+                        except FileNotFoundError:
+                            pass
+
+
+def _userns_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('usernetns', '-Ucn') as site:
+        yield site
+
+
+def _nested_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('userns', '-Uc') as userns:
+        with unshare('netns', '-n', parent=userns, privilege=True) as netns:
+            yield netns
+
+
+def _pidns_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('pidns', '-Upfn') as site:
+        yield site
+
+
+def connect_site() -> Iterator[Unshare]:
+    with tempfile.TemporaryDirectory() as tmpd:
+        sockpath = os.path.join(tmpd, 'nons')
+        holdcmd = [NSTOOL_BIN, 'hold', sockpath]
+        with subprocess.Popen(holdcmd) as holder:
+            try:
+                yield Unshare("fakens", sockpath)
+            finally:
+                holder.kill()
+                os.remove(sockpath)
+
+
+def selftests() -> None:
+    @exeter.test
+    def test_userns() -> None:
+        cmd = ['capsh', '--has-p=CAP_SETUID']
+        status = cmdsite.BUILD_HOST.fg(*cmd, check=False)
+        assert status.returncode != 0
+        with unshare('userns', '-Ucn') as ns:
+            ns.fg(*cmd, privilege=True)
+
+    @exeter.test
+    def test_relative_pid() -> None:
+        with unshare('pidns', '-Upfn') as site:
+            # The holder is init (pid 1) within its own pidns
+            exeter.assert_eq(site.relative_pid(site), 1)
+
+    def sockdir_cleanup(setup: Callable[[], Iterator[cmdsite.CmdSite]]) \
+            -> None:
+        cm = contextlib.contextmanager(setup)
+
+        def mess(sockpaths: list[str]) -> None:
+            with cm() as site:
+                while isinstance(site, Unshare):
+                    sockpaths.append(site.sockpath)
+                    site = site.parent
+
+        sockpaths: list[str] = []
+        mess(sockpaths)
+        assert sockpaths
+        for path in sockpaths:
+            assert not os.path.exists(os.path.dirname(path))
+
+    # General tests for all the nstool examples
+    for setup in [_userns_setup, _nested_setup, _pidns_setup]:
+        # Common cmdsite.CmdSite & NetSite tests
+        cmdsite.CmdSite.test(setup)
+
+        exeter.register(f'{setup.__qualname__}|sockdir_cleanup',
+                        sockdir_cleanup, setup)
+
+    cmdsite.CmdSite.test(connect_site)
-- 
@@ -0,0 +1,166 @@
+#! /usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Copyright Red Hat
+# Author: David Gibson <david@gibson.dropbear.id.au>
+
+"""
+Test A Simple Socket Transport
+
+unshare.py - Create and run commands in Linux namespaces
+"""
+
+import contextlib
+import os
+import subprocess
+import tempfile
+from typing import Any, Callable, Iterator
+
+import exeter
+
+from . import cmdsite
+
+
+# FIXME: Can this be made more portable?
+UNIX_PATH_MAX = 108
+
+NSTOOL_BIN = './nstool'
+
+
+class Unshare(cmdsite.CmdSite):
+    """A bundle of Linux namespaces managed by nstool"""
+
+    sockpath: str
+    parent: cmdsite.CmdSite
+    _pid: int
+
+    def __init__(self, name: str, sockpath: str,
+                 parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
+                 parent_priv: bool = False) -> None:
+        if len(sockpath) > UNIX_PATH_MAX:
+            raise ValueError(
+                f'Unix domain socket path "{sockpath}" is too long'
+            )
+
+        super().__init__(name)
+        self.sockpath = sockpath
+        self.parent = parent
+        self.parent_priv = parent_priv
+        self.parent.fg(NSTOOL_BIN, 'info', '-wp', self.sockpath, timeout=1)
+
+    # PID of the nstool hold process as seen by another site which can
+    # see the nstool socket (important when using PID namespaces)
+    def relative_pid(self, relative_to: cmdsite.CmdSite) -> int | None:
+        cmd = [NSTOOL_BIN, 'info', '-p', self.sockpath]
+        relpid = int(relative_to.output(*cmd))
+        if not relpid:
+            return None
+        return relpid
+
+    def popen(self, *cmd: str, privilege: bool = False,
+              **kwargs: Any) -> subprocess.Popen[bytes]:
+        hostcmd = [NSTOOL_BIN, 'exec']
+        if privilege:
+            hostcmd.append('--keep-caps')
+        hostcmd += [self.sockpath, '--']
+        hostcmd += list(cmd)
+        return self.parent.popen(*hostcmd, privilege=self.parent_priv,
+                                 **kwargs)
+
+
+@contextlib.contextmanager
+def unshare(name: str, *opts: str,
+            parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
+            privilege: bool = False) -> Iterator[Unshare]:
+    # Create path for temporary nstool Unix socket
+    with tempfile.TemporaryDirectory() as tmpd:
+        sockpath = os.path.join(tmpd, name)
+        cmd = ['unshare'] + list(opts)
+        cmd += ['--', NSTOOL_BIN, 'hold', sockpath]
+        with parent.bg(*cmd, privilege=privilege) as holder:
+            try:
+                yield Unshare(name, sockpath, parent=parent,
+                              parent_priv=privilege)
+            finally:
+                try:
+                    parent.fg(NSTOOL_BIN, 'stop', sockpath)
+                finally:
+                    try:
+                        holder.run(timeout=0.1)
+                        holder.kill()
+                    finally:
+                        try:
+                            os.remove(sockpath)
+                        except FileNotFoundError:
+                            pass
+
+
+def _userns_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('usernetns', '-Ucn') as site:
+        yield site
+
+
+def _nested_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('userns', '-Uc') as userns:
+        with unshare('netns', '-n', parent=userns, privilege=True) as netns:
+            yield netns
+
+
+def _pidns_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('pidns', '-Upfn') as site:
+        yield site
+
+
+def connect_site() -> Iterator[Unshare]:
+    with tempfile.TemporaryDirectory() as tmpd:
+        sockpath = os.path.join(tmpd, 'nons')
+        holdcmd = [NSTOOL_BIN, 'hold', sockpath]
+        with subprocess.Popen(holdcmd) as holder:
+            try:
+                yield Unshare("fakens", sockpath)
+            finally:
+                holder.kill()
+                os.remove(sockpath)
+
+
+def selftests() -> None:
+    @exeter.test
+    def test_userns() -> None:
+        cmd = ['capsh', '--has-p=CAP_SETUID']
+        status = cmdsite.BUILD_HOST.fg(*cmd, check=False)
+        assert status.returncode != 0
+        with unshare('userns', '-Ucn') as ns:
+            ns.fg(*cmd, privilege=True)
+
+    @exeter.test
+    def test_relative_pid() -> None:
+        with unshare('pidns', '-Upfn') as site:
+            # The holder is init (pid 1) within its own pidns
+            exeter.assert_eq(site.relative_pid(site), 1)
+
+    def sockdir_cleanup(setup: Callable[[], Iterator[cmdsite.CmdSite]]) \
+            -> None:
+        cm = contextlib.contextmanager(setup)
+
+        def mess(sockpaths: list[str]) -> None:
+            with cm() as site:
+                while isinstance(site, Unshare):
+                    sockpaths.append(site.sockpath)
+                    site = site.parent
+
+        sockpaths: list[str] = []
+        mess(sockpaths)
+        assert sockpaths
+        for path in sockpaths:
+            assert not os.path.exists(os.path.dirname(path))
+
+    # General tests for all the nstool examples
+    for setup in [_userns_setup, _nested_setup, _pidns_setup]:
+        # Common cmdsite.CmdSite & NetSite tests
+        cmdsite.CmdSite.test(setup)
+
+        exeter.register(f'{setup.__qualname__}|sockdir_cleanup',
+                        sockdir_cleanup, setup)
+
+    cmdsite.CmdSite.test(connect_site)
-- 
2.46.0


  parent reply	other threads:[~2024-08-26  2:10 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-08-26  2:09 [PATCH v3 00/15] RFC: Proof-of-concept based exeter+Avocado tests David Gibson
2024-08-26  2:09 ` [PATCH v3 01/15] test: run static checkers with Avocado and JSON definitions David Gibson
2024-08-26  2:09 ` [PATCH v3 02/15] test: Adjust how we invoke tests with run_avocado David Gibson
2024-08-26  2:09 ` [PATCH v3 03/15] test: Extend make targets to run Avocado tests David Gibson
2024-08-26  2:09 ` [PATCH v3 04/15] test: Exeter based static tests David Gibson
2024-08-26  2:09 ` [PATCH v3 05/15] tasst: Support library and linters for tests in Python David Gibson
2024-08-26  2:09 ` [PATCH v3 06/15] tasst/cmdsite: Base helpers for running shell commands in various places David Gibson
2024-08-26  2:09 ` [PATCH v3 07/15] test: Add exeter+Avocado based build tests David Gibson
2024-08-26  2:09 ` David Gibson [this message]
2024-08-26  2:09 ` [PATCH v3 09/15] tasst/ip: Helpers for configuring IPv4 and IPv6 David Gibson
2024-08-26  2:09 ` [PATCH v3 10/15] tasst/veth: Helpers for constructing veth devices between namespaces David Gibson
2024-08-26  2:09 ` [PATCH v3 11/15] tasst: Helpers to test transferring data between sites David Gibson
2024-08-26  2:09 ` [PATCH v3 12/15] tasst: Helpers for testing NDP behaviour David Gibson
2024-08-26  2:09 ` [PATCH v3 13/15] tasst: Helpers for testing DHCP & DHCPv6 behaviour David Gibson
2024-08-26  2:09 ` [PATCH v3 14/15] tasst: Helpers to construct a simple network environment for tests David Gibson
2024-08-26  2:09 ` [PATCH v3 15/15] avocado: Convert basic pasta tests David Gibson

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240826020942.545155-9-david@gibson.dropbear.id.au \
    --to=david@gibson.dropbear.id.au \
    --cc=crosa@redhat.com \
    --cc=passt-dev@passt.top \
    --cc=sbrivio@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).