public inbox for passt-dev@passt.top
 help / color / mirror / code / Atom feed
blob 7d9b1c1174ed4804db2764c45f6439bc38414b40 7674 bytes (raw)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
 
#! /usr/bin/env python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>

"""
Test A Simple Socket Transport

tasst/ip.py - Configure and read IP on simulated sites
"""

from __future__ import annotations

import contextlib
import dataclasses
import ipaddress
import json
from typing import Any, Iterator, Literal, Sequence, cast

import exeter

from . import cmdsite, unshare

Addr = ipaddress.IPv4Address | ipaddress.IPv6Address
AddrMask = ipaddress.IPv4Interface | ipaddress.IPv6Interface
Net = ipaddress.IPv4Network | ipaddress.IPv6Network


# Loopback addresses, for convenience
LOOPBACK4 = ipaddress.ip_address('127.0.0.1')
LOOPBACK6 = ipaddress.ip_address('::1')

# Documentation test networks defined by RFC 5737
TEST_NET_1 = ipaddress.ip_network('192.0.2.0/24')
TEST_NET_2 = ipaddress.ip_network('198.51.100.0/24')
TEST_NET_3 = ipaddress.ip_network('203.0.113.0/24')

# Documentation test network defined by RFC 3849
TEST_NET6 = ipaddress.ip_network('2001:db8::/32')
# Some subnets of that for our usage
TEST_NET6_TASST_A = ipaddress.ip_network('2001:db8:9a55:aaaa::/64')
TEST_NET6_TASST_B = ipaddress.ip_network('2001:db8:9a55:bbbb::/64')
TEST_NET6_TASST_C = ipaddress.ip_network('2001:db8:9a55:cccc::/64')


class IpiAllocator:
    """IP address allocator

    Hands out successive host addresses, each with its network's prefix
    length, from a fixed collection of networks.  Every call to
    next_ipis() returns one not yet allocated address per network.
    """

    # Networks allocated from when the constructor is given none
    DEFAULT_NETS = (TEST_NET_1, TEST_NET6_TASST_A)

    def __init__(self, *nets: Net | str) -> None:
        """Create an allocator for @nets, or DEFAULT_NETS if none given

        Each network may be an ipaddress network object or anything
        else accepted by ipaddress.ip_network(), such as a string.
        """
        if not nets:
            nets = self.DEFAULT_NETS

        self.nets = [ipaddress.ip_network(n) for n in nets]
        # hosts() is not guaranteed to return an iterator (CPython
        # returns a plain list for single-address networks), so wrap it
        # in iter() to make next() usable on every entry
        self.hostses = [iter(n.hosts()) for n in self.nets]

    def next_ipis(self) -> list[AddrMask]:
        """Return the next unallocated interface address of each network

        The result has one entry per network, in the order the networks
        were given to the constructor.

        Raises RuntimeError if one of the networks has no addresses left.
        """
        ipis = []
        for net, hosts in zip(self.nets, self.hostses):
            try:
                addr = next(hosts)
            except StopIteration:
                # Don't let StopIteration escape: inside a generator
                # (e.g. a contextmanager) it would turn into an obscure
                # "generator raised StopIteration" error
                raise RuntimeError(
                    f'No unallocated addresses left in {net}') from None
            ipis.append(ipaddress.ip_interface(f'{addr}/{net.prefixlen}'))
        return ipis


def ifs(site: cmdsite.CmdSite) -> Sequence[str]:
    """Return the names of the network interfaces present on @site

    The names are the 'ifname' fields of the entries reported by
    'ip -j link show', in the order reported.
    """
    links_json = site.output('ip', '-j', 'link', 'show')
    names = []
    for link in json.loads(links_json):
        names.append(link['ifname'])
    return names


def ifup(site: cmdsite.CmdSite, ifname: str, *addrs: AddrMask,
         dad: Literal['disable', 'optimistic', None] = None) -> None:
    """Add @addrs to interface @ifname on @site, then bring it up

    @dad optionally adjusts the interface's IPv6 sysctls first:
      'disable'     sets net.ipv6.conf.<ifname>.accept_dad=0
      'optimistic'  sets net.ipv6.conf.<ifname>.optimistic_dad=1
      None          leaves the sysctls untouched

    Every command is run on the site with privilege=True.

    Raises ValueError, before running any command, if @dad has any
    other value.
    """
    if dad == 'disable':
        site.fg('sysctl', f'net.ipv6.conf.{ifname}.accept_dad=0',
                privilege=True)
    elif dad == 'optimistic':
        site.fg('sysctl', f'net.ipv6.conf.{ifname}.optimistic_dad=1',
                privilege=True)
    elif dad is not None:
        raise ValueError(f"Unknown dad mode {dad!r}: "
                         "expected 'disable', 'optimistic' or None")

    # Addresses are configured while the link is still down
    for a in addrs:
        site.fg('ip', 'addr', 'add', a.with_prefixlen,
                'dev', ifname, privilege=True)

    site.fg('ip', 'link', 'set', ifname, 'up', privilege=True)


def addrs(site: cmdsite.CmdSite, ifname: str, **criteria: str) \
        -> Sequence[AddrMask]:
    """Return the non-tentative addresses of @ifname on @site

    Addresses are read from 'ip -j addr show <ifname>'.  Each keyword
    in @criteria names a field of an address entry; only entries which
    have every named field, with exactly the given value, are kept.
    Entries carrying a 'tentative' field are always left out.
    """
    ip_json = site.output('ip', '-j', 'addr', 'show', f'{ifname}')
    entries = json.loads(ip_json)
    assert len(entries) == 1  # Only one interface was asked about

    def wanted(ai: Any) -> bool:
        # An entry must satisfy every criterion and not be tentative
        for field, value in criteria.items():
            if field not in ai or ai[field] != value:
                return False
        return 'tentative' not in ai

    result = []
    for ai in entries[0]['addr_info']:
        if wanted(ai):
            parsed = ipaddress.ip_interface(
                f'{ai["local"]}/{ai["prefixlen"]}')
            result.append(parsed)
    return result


def addr_wait(site: cmdsite.CmdSite, ifname: str, **criteria: str) \
        -> Sequence[AddrMask]:
    """Wait until @ifname on @site has an address matching @criteria

    Calls addrs() over and over until it returns a non-empty result,
    which is then returned.  There is no pause between attempts and no
    timeout: if no matching address ever appears, this never returns.
    """
    while not (found := addrs(site, ifname, **criteria)):
        pass
    return found


def mtu(site: cmdsite.CmdSite, ifname: str) -> int:
    """Return the MTU of interface @ifname on @site

    The value is the 'mtu' field reported by 'ip -j link show <ifname>'.
    """
    raw = site.output('ip', '-j', 'link', 'show', ifname)
    # The unpacking insists on exactly one entry in the output
    [link] = json.loads(raw)
    return cast(int, link['mtu'])


def _routes(site: cmdsite.CmdSite, ipv: str, **criteria: str) -> Any:
    """Return the routes on @site for IP version @ipv

    Routes are read from 'ip -j -<ipv> route'.  Each keyword in
    @criteria names a field of a route entry; only routes which have
    every named field, with exactly the given value, are returned.
    Without criteria the parsed output is returned as is.
    """
    table = json.loads(site.output('ip', '-j', f'-{ipv}', 'route'))
    if not criteria:
        return table

    def matches(route: Any) -> bool:
        # True only if the route satisfies every criterion
        for field, wanted in criteria.items():
            if field not in route or route[field] != wanted:
                return False
        return True

    return [route for route in table if matches(route)]


def routes4(site: cmdsite.CmdSite, **criteria: str) -> Any:
    """Return the IPv4 routes on @site matching @criteria

    Thin wrapper passing IP version '4' to _routes().
    """
    ipv4_routes = _routes(site, '4', **criteria)
    return ipv4_routes


def routes6(site: cmdsite.CmdSite, **criteria: str) -> Any:
    """Return the IPv6 routes on @site matching @criteria

    Thin wrapper passing IP version '6' to _routes().
    """
    ipv6_routes = _routes(site, '6', **criteria)
    return ipv6_routes


@dataclasses.dataclass
class BaseNetScenario(exeter.Scenario):
    """Test that a site has sane looking basic networking"""
    # The site whose network configuration gets examined
    site: cmdsite.CmdSite

    @exeter.scenariotest
    def has_lo(self) -> None:
        # A loopback interface must exist
        present = ifs(self.site)
        assert 'lo' in present

    @exeter.scenariotest
    def lo_addrs(self) -> None:
        # Loopback must carry exactly the standard IPv4 and IPv6
        # loopback addresses
        want = {
            ipaddress.ip_interface('127.0.0.1/8'),
            ipaddress.ip_interface('::1/128'),
        }
        got = set(addrs(self.site, 'lo'))
        assert got == want

    @exeter.scenariotest
    def lo_mtu(self) -> None:
        # Loopback is expected to have an MTU of 65536
        lo_mtu_value = mtu(self.site, 'lo')
        exeter.assert_eq(lo_mtu_value, 65536)


@dataclasses.dataclass
class IsolatedNetScenario(BaseNetScenario):
    # Adds to the base checks, for sites expected to have no network
    # interface other than loopback
    @exeter.scenariotest
    def is_isolated(self) -> None:
        # 'lo' must be the one and only interface on the site
        present = list(ifs(self.site))
        exeter.assert_eq(present, ['lo'])


def selftests() -> None:
    """Define the self tests for this module

    Covers the network scenarios on the build host and in a fresh
    namespace, the address and route readers against a dummy interface,
    and IpiAllocator.  Each test is handed to exeter through its
    decorator.
    """
    @BaseNetScenario.test
    def host() -> Iterator[BaseNetScenario]:
        # Basic network checks against the build host itself
        scenario = BaseNetScenario(cmdsite.BUILD_HOST)
        yield scenario

    @IsolatedNetScenario.test
    def netns() -> Iterator[IsolatedNetScenario]:
        # Isolation checks in a newly unshared namespace, where only
        # loopback has been brought up
        with unshare.unshare("netns", "-Ucn") as ns:
            ifup(ns, 'lo')
            scenario = IsolatedNetScenario(ns)
            yield scenario

    # Interface and addresses used by the address and route tests
    ifname = 'dummy0'
    dummy_ips = {
        ipaddress.ip_interface('192.0.2.1/24'),
        ipaddress.ip_interface('2001:db8:9a55::1/112'),
        ipaddress.ip_interface('10.1.2.3/8'),
    }
    # Networks we expect to see routes for, split by IP version
    dummy_routes4 = {ipi.network for ipi in dummy_ips
                     if isinstance(ipi, ipaddress.IPv4Interface)}
    # For IPv6 the link-local prefix is expected on the interface too
    dummy_routes6 = {ipi.network for ipi in dummy_ips
                     if isinstance(ipi, ipaddress.IPv6Interface)}
    dummy_routes6 |= {ipaddress.IPv6Network('fe80::/64')}

    @contextlib.contextmanager
    def dummy_setup() -> Iterator[cmdsite.CmdSite]:
        # Yield a new namespace holding a dummy interface configured
        # with dummy_ips.  The link is created without an explicit
        # name; the tests rely on it ending up called @ifname.
        with unshare.unshare('dummy', '-Un') as site:
            site.fg('ip', 'link', 'add', 'type', 'dummy', privilege=True)
            ifup(site, 'lo')
            ifup(site, ifname, *dummy_ips, dad='disable')
            yield site

    @exeter.test
    def test_addr() -> None:
        # addrs() must report exactly the addresses we configured
        with dummy_setup() as site:
            found = set(addrs(site, ifname, scope='global'))
            exeter.assert_eq(found, dummy_ips)

    @exeter.test
    def test_routes4() -> None:
        # IPv4 routes via the dummy interface match its IPv4 networks
        with dummy_setup() as site:
            found = {ipaddress.ip_interface(r['dst']).network
                     for r in routes4(site, dev=ifname)}
            exeter.assert_eq(found, dummy_routes4)

    @exeter.test
    def test_routes6() -> None:
        # IPv6 routes via the dummy interface match its IPv6 networks
        with dummy_setup() as site:
            found = {ipaddress.ip_interface(r['dst']).network
                     for r in routes6(site, dev=ifname)}
            exeter.assert_eq(found, dummy_routes6)

    def ipa_test(nets: tuple[Net | str, ...], count: int = 12) -> None:
        # Draw @count rounds of addresses from an allocator over @nets
        # and check their number, networks and uniqueness
        allocator = IpiAllocator(*nets)
        expected_nets = [ipaddress.ip_network(n) for n in nets]

        # One set of allocated addresses per network
        seen: list[set[AddrMask]] = [set() for _ in nets]
        for _ in range(count):
            ipis = allocator.next_ipis()
            # Each round gives one address per network
            exeter.assert_eq(len(ipis), len(nets))
            for bucket, ipi, net in zip(seen, ipis, expected_nets):
                # Each address lies in the corresponding network
                exeter.assert_eq(ipi.network, net)
                bucket.add(ipi)

        # No address was handed out twice
        for bucket in seen:
            exeter.assert_eq(len(bucket), count)

    @exeter.test
    def ipa_test_default() -> None:
        ipa_test(nets=IpiAllocator.DEFAULT_NETS)

    @exeter.test
    def ipa_test_custom() -> None:
        custom_nets = ('10.55.0.0/16', '192.168.55.0/24',
                       'fd00:9a57:a000::/48')
        ipa_test(nets=custom_nets)

debug log:

solving 7d9b1c11 ...
found 7d9b1c11 in https://archives.passt.top/passt-dev/20240826020942.545155-10-david@gibson.dropbear.id.au/

applying [1/1] https://archives.passt.top/passt-dev/20240826020942.545155-10-david@gibson.dropbear.id.au/
diff --git a/test/tasst/ip.py b/test/tasst/ip.py
new file mode 100644
index 00000000..7d9b1c11

Checking patch test/tasst/ip.py...
Applied patch test/tasst/ip.py cleanly.

index at:
100644 7d9b1c1174ed4804db2764c45f6439bc38414b40	test/tasst/ip.py

Code repositories for project(s) associated with this public inbox

	https://passt.top/passt

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for IMAP folder(s).