1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
selftest/veth.py - Test various veth configurations
"""
import contextlib
from typing import Iterator, Literal
import ipaddress
import exeter
from . import cmdsite, ip, transfer, unshare
@contextlib.contextmanager
def veth(site: cmdsite.CmdSite, ifname: str,
         peername: str, peer: unshare.Unshare | None = None) -> Iterator[None]:
    """Provide a veth pair on `site` for the duration of the context.

    On entry, runs ``ip link add <ifname> type veth peer name <peername>``
    on `site` (with privilege=True).  If `peer` is given, the `peername`
    end is then moved into the network namespace identified by
    ``peer.relative_pid(site)``.

    On exit, runs ``ip link del <ifname>`` on `site`.  The deletion is
    attempted even when the body of the ``with`` statement (or the move
    into the peer namespace) raises, so a failing test does not leave
    the interface behind.

    Args:
        site: Command site on which the ``ip`` commands are run.
        ifname: Name of the local end of the pair.
        peername: Name of the other end of the pair.
        peer: Optional namespace to receive the `peername` end.

    Yields:
        None; the pair exists while the context is active.
    """
    site.fg('ip', 'link', 'add', ifname, 'type', 'veth',
            'peer', 'name', peername, privilege=True)
    try:
        if peer is not None:
            site.fg('ip', 'link', 'set', peername,
                    'netns', f'{peer.relative_pid(site)}', privilege=True)
        yield
    finally:
        # Always clean up: the link was created above, so remove it no
        # matter how the context is left.
        site.fg('ip', 'link', 'del', ifname, privilege=True)
def selftests() -> None:
    """Define and register the veth self-tests.

    Plain tests are registered with ``exeter.test``; the two data
    transfer scenarios are registered with
    ``transfer.TransferScenario.test``.  Everything is built on a shared
    fixture of two nested namespaces joined by a veth pair.
    """

    @contextlib.contextmanager
    def veth_setup() -> Iterator[tuple[cmdsite.CmdSite, cmdsite.CmdSite]]:
        """Yield (ns1, ns2) linked by the veth pair 'vetha' / 'vethb'.

        'ns1' is created with '-Un'; 'ns2' is created with '-n' using
        ns1 as its parent.  'vetha' is created in ns1 and 'vethb' is
        moved into ns2.
        """
        with (
            unshare.unshare('ns1', '-Un') as outer,
            unshare.unshare('ns2', '-n', parent=outer,
                            privilege=True) as inner,
            veth(outer, 'vetha', 'vethb', inner),
        ):
            yield (outer, inner)

    @exeter.test
    def test_ifs() -> None:
        """Each namespace lists exactly 'lo' plus its own veth end."""
        with veth_setup() as (ns1, ns2):
            for site, expected in ((ns1, {'lo', 'vetha'}),
                                   (ns2, {'lo', 'vethb'})):
                exeter.assert_eq(set(ip.ifs(site)), expected)

    @exeter.test
    def test_mtu() -> None:
        """Both ends of the pair report an MTU of 1500."""
        with veth_setup() as (ns1, ns2):
            for site, ifname in ((ns1, 'vetha'), (ns2, 'vethb')):
                exeter.assert_eq(ip.mtu(site, ifname), 1500)

    def test_slaac(dad: Literal['disable', 'optimistic', None]) -> None:
        """Check the link-local IPv6 address that appears on 'vetha'.

        Gives 'vetha' a fixed MAC address, brings both ends up (passing
        `dad` through to ip.ifup() for 'vetha'), then waits for the
        link-scope inet6 addresses on 'vetha' and expects exactly the
        address matching that MAC.
        """
        mac = '02:aa:bb:cc:dd:ee'
        expected = ipaddress.ip_interface('fe80::aa:bbff:fecc:ddee/64')
        with veth_setup() as (ns1, ns2):
            ns1.fg('ip', 'link', 'set', 'dev', 'vetha', 'address', mac,
                   privilege=True)
            ip.ifup(ns1, 'vetha', dad=dad)
            ip.ifup(ns2, 'vethb')
            found = ip.addr_wait(ns1, 'vetha',
                                 family='inet6', scope='link')
            exeter.assert_eq(found, [expected])

    @exeter.test
    def test_dad() -> None:
        """SLAAC check with dad=None."""
        test_slaac(None)

    @exeter.test
    def test_optimistic_dad() -> None:
        """SLAAC check with dad='optimistic'."""
        test_slaac('optimistic')

    @exeter.test
    def test_no_dad() -> None:
        """SLAAC check with dad='disable'."""
        test_slaac('disable')

    def veth_transfer(ip1: ip.AddrMask, ip2: ip.AddrMask) \
            -> Iterator[transfer.TransferScenario]:
        """Yield a transfer scenario from ns1 (client) to ns2 (server).

        Brings up 'lo' and the veth end in each namespace, assigning
        `ip1` to 'vetha' and `ip2` to 'vethb' with dad='disable'.  The
        scenario connects to `ip2.ip` on port 10000.
        """
        with veth_setup() as (ns1, ns2):
            for site, ifname, addr in ((ns1, 'vetha', ip1),
                                       (ns2, 'vethb', ip2)):
                ip.ifup(site, 'lo')
                ip.ifup(site, ifname, addr, dad='disable')
            scenario = transfer.TransferScenario(client=ns1, server=ns2,
                                                 connect_ip=ip2.ip,
                                                 connect_port=10000)
            yield scenario

    # One pair of addresses per namespace; presumably each pair is
    # (IPv4, IPv6) - confirm against ip.IpiAllocator.next_ipis().
    allocator = ip.IpiAllocator()
    client_ip4, client_ip6 = allocator.next_ipis()
    server_ip4, server_ip6 = allocator.next_ipis()

    @transfer.TransferScenario.test
    def veth_transfer4() -> Iterator[transfer.TransferScenario]:
        """Transfer scenario using the first address of each pair."""
        yield from veth_transfer(client_ip4, server_ip4)

    @transfer.TransferScenario.test
    def veth_transfer6() -> Iterator[transfer.TransferScenario]:
        """Transfer scenario using the second address of each pair."""
        yield from veth_transfer(client_ip6, server_ip6)
|