1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
scenario/simple.py - Smallest sensible network to use passt/pasta
"""
import contextlib
from typing import Iterator
from .. import ip, transfer, unshare, veth
class __SimpleNet:  # pylint: disable=R0903
    """Smallest two-namespace network used by the simple scenarios

    Two sites (network namespaces) are joined by a veth link:

        [simhost] <-veth-> [gw]

    and gw is configured as the default router for simhost.

    Addresses on simhost:
        self.IP4 (IPv4), self.IP6 (IPv6), self.ip6_ll (IPv6 link local)

    Addresses on gw:
        self.GW_IP4 (IPv4), self.GW_IP6 (IPv6),
        self.gw_ip6_ll (IPv6 link local)
        self.REMOTE_IP4 (IPv4), self.REMOTE_IP6 (IPv6)

    The "remote" addresses sit on a different subnet from all the
    others, so simhost can only reach them through its default route.
    That way the tests exercise the default route rather than just
    the local network routes.
    """

    # Interface names for the two ends of the veth pair
    IFNAME = 'veth'
    GW_IFNAME = f'gw{IFNAME}'

    # Local addresses: the first allocation is simhost's, the second is gw's
    ipa_local = ip.IpiAllocator()
    IP4, IP6 = ipa_local.next_ipis()
    GW_IP4, GW_IP6 = ipa_local.next_ipis()

    # "Remote" addresses come from a separate allocator with its own ranges
    ipa_remote = ip.IpiAllocator(ip.TEST_NET_2, ip.TEST_NET6_TASST_B)
    REMOTE_IP4, REMOTE_IP6 = ipa_remote.next_ipis()

    simhost: unshare.Unshare
    gw: unshare.Unshare

    def __init__(self, simhost: unshare.Unshare, gw: unshare.Unshare) -> None:
        """Configure interfaces, addresses and default routes

        simhost and gw are the two sites; they are stored on the
        instance as self.simhost and self.gw.  The veth interfaces
        named IFNAME (in simhost) and GW_IFNAME (in gw) are brought up
        here, not created.
        """
        self.simhost = simhost
        self.gw = gw

        # Bring up loopback and the veth endpoint on each site, gw first.
        # gw carries both its own and the "remote" addresses.
        ip.ifup(gw, 'lo')
        ip.ifup(gw, self.GW_IFNAME,
                self.GW_IP4, self.GW_IP6,
                self.REMOTE_IP4, self.REMOTE_IP6)
        ip.ifup(simhost, 'lo')
        ip.ifup(simhost, self.IFNAME, self.IP4, self.IP6)

        # With both ends of the link up, collect the link-scope IPv6
        # address of each endpoint (first entry of what addr_wait returns)
        gw_link_local = ip.addr_wait(gw, self.GW_IFNAME,
                                     family='inet6', scope='link')
        self.gw_ip6_ll = gw_link_local[0]
        host_link_local = ip.addr_wait(simhost, self.IFNAME,
                                       family='inet6', scope='link')
        self.ip6_ll = host_link_local[0]

        # Default routes on simhost: IPv4 via gw's global address, IPv6
        # via gw's link-local address (which needs an explicit device)
        route4 = ('ip', '-4', 'route', 'add', 'default',
                  'via', f'{self.GW_IP4.ip}')
        route6 = ('ip', '-6', 'route', 'add', 'default',
                  'via', f'{self.gw_ip6_ll.ip}', 'dev', self.IFNAME)
        simhost.fg(*route4, privilege=True)
        simhost.fg(*route6, privilege=True)
@contextlib.contextmanager
def simple_net() -> Iterator[__SimpleNet]:
    """Context manager yielding a fully configured __SimpleNet

    Enters, in order: the 'simhost' site, the 'gw' site (created with
    simhost as its parent), and a veth pair linking the two.  The
    __SimpleNet built on top of them is yielded to the caller, and
    the three contexts are exited in reverse order afterwards.
    """
    with unshare.unshare('simhost', '-Ucnpf', '--mount-proc') as simhost:
        with unshare.unshare('gw', '-n',
                             parent=simhost, privilege=True) as gw:
            with veth.veth(simhost, __SimpleNet.IFNAME,
                           __SimpleNet.GW_IFNAME, gw):
                yield __SimpleNet(simhost, gw)
def simple_transfer4() -> Iterator[transfer.TransferScenario]:
    """Yield an IPv4 transfer scenario on top of simple_net()

    simhost is the client and gw the server; the client connects to
    gw's "remote" IPv4 address on port 10000.
    """
    # NOTE(review): unlike simple_net(), this generator function is not
    # decorated with @contextlib.contextmanager - presumably
    # TransferScenario.test() accepts a plain generator function; confirm.
    with simple_net() as net:
        scenario = transfer.TransferScenario(
            client=net.simhost,
            server=net.gw,
            connect_ip=net.REMOTE_IP4.ip,
            connect_port=10000,
        )
        yield scenario
def simple_transfer6() -> Iterator[transfer.TransferScenario]:
    """Yield an IPv6 transfer scenario on top of simple_net()

    simhost is the client and gw the server; the client connects to
    gw's "remote" IPv6 address on port 10000.
    """
    # NOTE(review): unlike simple_net(), this generator function is not
    # decorated with @contextlib.contextmanager - presumably
    # TransferScenario.test() accepts a plain generator function; confirm.
    with simple_net() as net:
        scenario = transfer.TransferScenario(
            client=net.simhost,
            server=net.gw,
            connect_ip=net.REMOTE_IP6.ip,
            connect_port=10000,
        )
        yield scenario
def selftests() -> None:
    """Run the transfer self tests over the IPv4 then the IPv6 scenario"""
    for scenario in (simple_transfer4, simple_transfer6):
        transfer.TransferScenario.test(scenario)
|