1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#! /usr/bin/python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
pasta.py - Helpers for starting pasta
"""
import contextlib
from tasst.exesite import Site
from tasst.typecheck import typecheck
# Command used to launch pasta; it is also the name handed to
# Site.require_cmds() before a Pasta instance is created.
# NOTE(review): this is a relative path - presumably resolved against the
# directory the tests are run from; confirm against the test runner.
PASTA_BIN = '../pasta'
class Pasta(contextlib.AbstractContextManager):
    """A managed pasta instance.

    Used as a context manager: entering it launches pasta as a background
    command on ``hostsite`` with the PID reported by ``ns`` as the final
    argument; leaving it forwards the exception details to the background
    command's own ``__exit__``.
    """

    def __init__(self, hostsite, pidfile, extra_args, *, ns):
        """Check and record the parameters; nothing is started here.

        Arguments:
        hostsite   -- Site on which the pasta command will be run; it must
                      provide PASTA_BIN (checked via require_cmds())
        pidfile    -- str given to pasta's -P option and to hostsite.bg()
        extra_args -- str of further options inserted into the command line
        ns         -- Site (keyword-only) whose relative_pid() result is
                      appended as the last command argument

        Each value is passed through typecheck() before being stored.
        """
        self.hostsite = typecheck(hostsite, Site)
        # Verify the binary is usable before validating anything else,
        # keeping the same failure order as a caller would expect.
        self.hostsite.require_cmds(PASTA_BIN)

        self.pidfile = typecheck(pidfile, str)
        self.extra_args = typecheck(extra_args, str)
        self.ns = typecheck(ns, Site)

        # Background command handle; only set once the context is entered.
        self.proc = None

    def __enter__(self):
        """Start pasta in the background on the host site; return self."""
        # NOTE(review): presumably the PID of 'ns' as seen from 'hostsite';
        # confirm against Site.relative_pid().
        target_pid = self.ns.relative_pid(self.hostsite)

        command = (
            f'{PASTA_BIN} -f -P {self.pidfile} {self.extra_args} {target_pid}'
        )

        # Store the handle before entering it, so that it stays reachable
        # through self.proc even if entering fails.
        self.proc = self.hostsite.bg(command, pidfile=self.pidfile)
        self.proc.__enter__()

        return self

    def __exit__(self, *exc_details):
        """Pass the exception details on to the background command.

        The result of the inner __exit__ is deliberately not returned, so
        this manager never suppresses an exception itself.
        """
        background = self.proc
        background.__exit__(*exc_details)
|