1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
tasst/ip.py - Configure and read IP on simulated sites
"""
from __future__ import annotations
import contextlib
import dataclasses
import ipaddress
import json
from typing import Any, Iterator, Literal, Sequence, cast
import exeter
from . import cmdsite, unshare
# Type aliases covering both IP versions:
#   Addr     - a bare address
#   AddrMask - an address together with its prefix length ("interface")
#   Net      - a network
Addr = ipaddress.IPv4Address | ipaddress.IPv6Address
AddrMask = ipaddress.IPv4Interface | ipaddress.IPv6Interface
Net = ipaddress.IPv4Network | ipaddress.IPv6Network

# Loopback addresses, for convenience
LOOPBACK4 = ipaddress.ip_address('127.0.0.1')
LOOPBACK6 = ipaddress.ip_address('::1')

# Documentation test networks defined by RFC 5737
TEST_NET_1 = ipaddress.ip_network('192.0.2.0/24')
TEST_NET_2 = ipaddress.ip_network('198.51.100.0/24')
TEST_NET_3 = ipaddress.ip_network('203.0.113.0/24')

# Documentation test network defined by RFC 3849
TEST_NET6 = ipaddress.ip_network('2001:db8::/32')
# Some /64 subnets of TEST_NET6 for our usage
TEST_NET6_TASST_A = ipaddress.ip_network('2001:db8:9a55:aaaa::/64')
TEST_NET6_TASST_B = ipaddress.ip_network('2001:db8:9a55:bbbb::/64')
TEST_NET6_TASST_C = ipaddress.ip_network('2001:db8:9a55:cccc::/64')
class IpiAllocator:
    """IP address allocator

    Hands out successive interface addresses (address plus prefix
    length) from a fixed list of networks.  Each call to next_ipis()
    returns one fresh address per network, in the order the networks
    were given to the constructor.
    """

    # Networks allocated from when the constructor is given none
    DEFAULT_NETS = (TEST_NET_1, TEST_NET6_TASST_A)

    def __init__(self, *nets: Net | str) -> None:
        """Prepare to allocate addresses from each of @nets

        nets: networks, or strings accepted by ipaddress.ip_network(),
              to allocate from.  DEFAULT_NETS is used if none are given.
        """
        if not nets:
            nets = self.DEFAULT_NETS
        self.nets = [ipaddress.ip_network(n) for n in nets]
        # hosts() returns a plain list, not an iterator, for
        # single-address networks (/32 and /128).  Wrapping in iter()
        # makes next() work whatever the prefix length.
        self.hostses = [iter(n.hosts()) for n in self.nets]

    def next_ipis(self) \
            -> list[ipaddress.IPv4Interface | ipaddress.IPv6Interface]:
        """Return the next unused address from each network

        The result has one entry per network, in constructor order,
        each carrying the prefix length of the network it came from.

        Raises StopIteration once any of the networks has no host
        addresses left.
        """
        addrs = [next(h) for h in self.hostses]
        return [ipaddress.ip_interface(f'{a}/{n.prefixlen}')
                for a, n in zip(addrs, self.nets)]
def ifs(site: cmdsite.CmdSite) -> Sequence[str]:
    """Return the names of the network interfaces on @site

    Runs `ip -j link show` on the site and collects the 'ifname' field
    of every entry in the JSON output, in the order reported.
    """
    links = json.loads(site.output('ip', '-j', 'link', 'show'))
    names = []
    for link in links:
        names.append(link['ifname'])
    return names
def ifup(site: cmdsite.CmdSite, ifname: str, *addrs: AddrMask,
         dad: Literal['disable', 'optimistic', None] = None) -> None:
    """Add addresses to an interface on @site, then bring it up

    site:   site on which the commands are run (with privilege=True)
    ifname: name of the interface to configure
    addrs:  addresses, with prefix length, to add to the interface
    dad:    IPv6 duplicate address detection setting for the interface:
              'disable'    - set the accept_dad sysctl to 0
              'optimistic' - set the optimistic_dad sysctl to 1
              None         - leave the sysctls untouched

    Raises ValueError for any other value of @dad; this is checked
    before any command is run on the site.
    """
    if dad == 'disable':
        site.fg('sysctl', f'net.ipv6.conf.{ifname}.accept_dad=0',
                privilege=True)
    elif dad == 'optimistic':
        site.fg('sysctl', f'net.ipv6.conf.{ifname}.optimistic_dad=1',
                privilege=True)
    elif dad is not None:
        # Name the offending value so the failure can be diagnosed
        raise ValueError(
            f"Unknown dad setting {dad!r}: "
            "expected 'disable', 'optimistic' or None"
        )

    # Addresses are added first, the link is only brought up afterwards
    for a in addrs:
        site.fg('ip', 'addr', 'add', f'{a.with_prefixlen}',
                'dev', ifname, privilege=True)

    site.fg('ip', 'link', 'set', ifname, 'up', privilege=True)
def addrs(site: cmdsite.CmdSite, ifname: str, **criteria: str) \
        -> Sequence[AddrMask]:
    """Return the non-tentative addresses of @ifname on @site

    Runs `ip -j addr show <ifname>` and parses each 'addr_info' entry
    into an interface address (address plus prefix length).

    criteria: field=value pairs; only entries which have every named
              field, with exactly the given value, are returned
              (e.g. scope='global')

    Entries carrying a 'tentative' field are always left out.
    """
    links = json.loads(site.output('ip', '-j', 'addr', 'show', f'{ifname}'))
    assert len(links) == 1  # We specified a specific interface

    found = []
    for entry in links[0]['addr_info']:
        matches = all(field in entry and entry[field] == wanted
                      for field, wanted in criteria.items())
        if not matches:
            continue
        # Skip addresses which aren't usable yet
        if 'tentative' in entry:
            continue
        found.append(
            ipaddress.ip_interface(f'{entry["local"]}/{entry["prefixlen"]}')
        )
    return found
def addr_wait(site: cmdsite.CmdSite, ifname: str, **criteria: str) \
        -> Sequence[AddrMask]:
    """Wait for @ifname on @site to have a matching address

    Calls addrs() with the given arguments over and over until it
    returns a non-empty result, then returns that result.  There is no
    delay between attempts and no timeout: this does not return until
    a matching, non-tentative address shows up.
    """
    while not (found := addrs(site, ifname, **criteria)):
        pass
    return found
def mtu(site: cmdsite.CmdSite, ifname: str) -> int:
    """Return the MTU of interface @ifname on @site

    Runs `ip -j link show <ifname>` and reads the 'mtu' field of the
    single entry it reports.  Raises ValueError (from the unpacking) if
    the output does not contain exactly one entry.
    """
    [link] = json.loads(site.output('ip', '-j', 'link', 'show', ifname))
    return cast(int, link['mtu'])
def _routes(site: cmdsite.CmdSite, ipv: str, **criteria: str) -> Any:
    """Return routes on @site for one IP version, optionally filtered

    ipv:      IP version as passed to `ip` ('4' or '6' in this file)
    criteria: field=value pairs; only routes which have every named
              field, with exactly the given value, are kept

    Returns the parsed JSON output of `ip -j -<ipv> route`, narrowed
    once for each criterion.
    """
    output = site.output('ip', '-j', f'-{ipv}', 'route')
    matching = json.loads(output)
    for field, wanted in criteria.items():
        matching = [entry for entry in matching
                    if field in entry and entry[field] == wanted]
    return matching
def routes4(site: cmdsite.CmdSite, **criteria: str) -> Any:
    """Return the IPv4 routes on @site matching @criteria

    criteria: field=value pairs a route must have to be returned
    """
    ipv4_routes = _routes(site, '4', **criteria)
    return ipv4_routes
def routes6(site: cmdsite.CmdSite, **criteria: str) -> Any:
    """Return the IPv6 routes on @site matching @criteria

    criteria: field=value pairs a route must have to be returned
    """
    ipv6_routes = _routes(site, '6', **criteria)
    return ipv6_routes
@dataclasses.dataclass
class BaseNetScenario(exeter.Scenario):
    """Test that a site has sane looking basic networking

    Every check looks at the loopback interface 'lo' of the site.
    """

    # Site whose network configuration is examined
    site: cmdsite.CmdSite

    @exeter.scenariotest
    def has_lo(self) -> None:
        """Check the site has an interface named 'lo'"""
        present = ifs(self.site)
        assert 'lo' in present

    @exeter.scenariotest
    def lo_addrs(self) -> None:
        """Check 'lo' has exactly the addresses 127.0.0.1/8 and ::1/128"""
        expected = {ipaddress.ip_interface('127.0.0.1/8'),
                    ipaddress.ip_interface('::1/128')}
        actual = set(addrs(self.site, 'lo'))
        assert actual == expected

    @exeter.scenariotest
    def lo_mtu(self) -> None:
        """Check 'lo' has an MTU of 65536"""
        lo_mtu = mtu(self.site, 'lo')
        exeter.assert_eq(lo_mtu, 65536)
@dataclasses.dataclass
class IsolatedNetScenario(BaseNetScenario):
    """Test that a site has no network interface other than loopback

    Runs the BaseNetScenario checks as well as the one added here.
    """

    @exeter.scenariotest
    def is_isolated(self) -> None:
        """Check 'lo' is the one and only interface on the site"""
        present = list(ifs(self.site))
        exeter.assert_eq(present, ['lo'])
def selftests() -> None:
    """Define the self tests for this module

    The tests are nested functions decorated with the exeter test
    decorators.  They cover the scenario classes (on the build host
    and in a fresh namespace), the address and route helpers (against
    a dummy interface), and IpiAllocator.
    """

    @BaseNetScenario.test
    def host() -> Iterator[BaseNetScenario]:
        """Basic networking checks against the build host"""
        yield BaseNetScenario(cmdsite.BUILD_HOST)

    @IsolatedNetScenario.test
    def netns() -> Iterator[IsolatedNetScenario]:
        """Isolation checks against a site from unshare with 'lo' up"""
        with unshare.unshare("netns", "-Ucn") as ns:
            ifup(ns, 'lo')
            yield IsolatedNetScenario(ns)

    # Interface the dummy tests configure, and what they expect to
    # read back from it
    ifname = 'dummy0'
    dummy_ips = {
        ipaddress.ip_interface(text)
        for text in ['192.0.2.1/24', '2001:db8:9a55::1/112', '10.1.2.3/8']
    }
    # One route per configured address is expected, per IP version
    dummy_routes4 = set()
    dummy_routes6 = set()
    for ipi in dummy_ips:
        if isinstance(ipi, ipaddress.IPv4Interface):
            dummy_routes4.add(ipi.network)
        if isinstance(ipi, ipaddress.IPv6Interface):
            dummy_routes6.add(ipi.network)
    # ... plus the IPv6 link-local prefix
    dummy_routes6.add(ipaddress.IPv6Network('fe80::/64'))

    @contextlib.contextmanager
    def dummy_setup() -> Iterator[cmdsite.CmdSite]:
        """Yield a site with 'lo' up and a dummy link carrying dummy_ips"""
        with unshare.unshare('dummy', '-Un') as site:
            site.fg('ip', 'link', 'add', 'type', 'dummy', privilege=True)
            ifup(site, 'lo')
            ifup(site, ifname, *dummy_ips, dad='disable')
            yield site

    @exeter.test
    def test_addr() -> None:
        """addrs() reads back the global addresses that were set"""
        with dummy_setup() as site:
            actual = set(addrs(site, ifname, scope='global'))
            exeter.assert_eq(actual, dummy_ips)

    @exeter.test
    def test_routes4() -> None:
        """routes4() reports the expected IPv4 routes for the link"""
        with dummy_setup() as site:
            actual = {ipaddress.ip_interface(route['dst']).network
                      for route in routes4(site, dev=ifname)}
            exeter.assert_eq(actual, dummy_routes4)

    @exeter.test
    def test_routes6() -> None:
        """routes6() reports the expected IPv6 routes for the link"""
        with dummy_setup() as site:
            actual = {ipaddress.ip_interface(route['dst']).network
                      for route in routes6(site, dev=ifname)}
            exeter.assert_eq(actual, dummy_routes6)

    def ipa_test(nets: tuple[Net | str, ...], count: int = 12) -> None:
        """Allocate @count rounds of addresses from @nets and check them"""
        ipa = IpiAllocator(*nets)
        # One set per network, collecting everything allocated from it
        addrsets: list[set[ipaddress.IPv4Address | ipaddress.IPv6Address]] \
            = [set() for _ in nets]
        for _ in range(count):
            ipis = ipa.next_ipis()
            # Check we got as many addresses as expected
            exeter.assert_eq(len(ipis), len(nets))
            for seen, ipi, net in zip(addrsets, ipis, nets):
                # Check the addresses belong to the right network
                exeter.assert_eq(ipi.network, ipaddress.ip_network(net))
                seen.add(ipi)

        # Check the addresses are unique
        for seen in addrsets:
            exeter.assert_eq(len(seen), count)

    @exeter.test
    def ipa_test_default() -> None:
        """IpiAllocator works with its default networks"""
        ipa_test(nets=IpiAllocator.DEFAULT_NETS)

    @exeter.test
    def ipa_test_custom() -> None:
        """IpiAllocator works with networks given as strings"""
        ipa_test(nets=('10.55.0.0/16', '192.168.55.0/24',
                       'fd00:9a57:a000::/48'))
|