1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
| | #! /usr/bin/env avocado-runner-avocado-classless
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
"""
Test A Simple Socket Transport
dhcp.py - Helpers for testing DHCP & DHCPv6
"""
import contextlib
import dataclasses
import ipaddress
import os
import tempfile
from typing import Iterator, Literal
import exeter
from . import cmdsite, ip, unshare, veth
DHCLIENT = '/sbin/dhclient'
def _dhclient(site: cmdsite.CmdSite, ifname: str, ipv: Literal['4', '6']) \
-> Iterator[None]:
with tempfile.TemporaryDirectory() as tmpdir:
pidfile = os.path.join(tmpdir, 'dhclient.pid')
leasefile = os.path.join(tmpdir, 'dhclient.leases')
# We need '-nc' because we may be running with
# capabilities but not UID 0. Without -nc dhclient drops
# capabilities before invoking dhclient-script, so it's
# unable to actually configure the interface
opts = [f'-{ipv}', '-v', '-nc', '-pf', f'{pidfile}',
'-lf', f'{leasefile}', f'{ifname}']
site.fg(f'{DHCLIENT}', *opts, privilege=True)
yield
site.fg(f'{DHCLIENT}', '-x', '-pf', f'{pidfile}', privilege=True)
@contextlib.contextmanager
def dhclient4(site: cmdsite.CmdSite, ifname: str) -> Iterator[None]:
yield from _dhclient(site, ifname, '4')
@contextlib.contextmanager
def dhclient6(site: cmdsite.CmdSite, ifname: str) -> Iterator[None]:
yield from _dhclient(site, ifname, '6')
@dataclasses.dataclass
class Dhcp4Scenario(exeter.Scenario):
client: cmdsite.CmdSite
ifname: str
addr: ip.Addr
gateway: ip.Addr
mtu: int
@exeter.scenariotest
def dhcp_addr(self) -> None:
with dhclient4(self.client, self.ifname):
(actual_addr,) = ip.addrs(self.client, self.ifname,
family='inet', scope='global')
exeter.assert_eq(actual_addr.ip, self.addr)
@exeter.scenariotest
def dhcp_route(self) -> None:
with dhclient4(self.client, self.ifname):
(defroute,) = ip.routes4(self.client, dst='default')
exeter.assert_eq(ipaddress.ip_address(defroute['gateway']),
self.gateway)
@exeter.scenariotest
def dhcp_mtu(self) -> None:
with dhclient4(self.client, self.ifname):
exeter.assert_eq(ip.mtu(self.client, self.ifname), self.mtu)
DHCPD = 'dhcpd'
IFNAME = 'clientif'
SUBNET4 = ip.TEST_NET_1
ipa4 = ip.IpiAllocator(SUBNET4)
(SERVER_IP4,) = ipa4.next_ipis()
(CLIENT_IP4,) = ipa4.next_ipis()
@contextlib.contextmanager
def setup_dhcpd_common(ifname: str, server_ifname: str) \
-> Iterator[tuple[cmdsite.CmdSite, cmdsite.CmdSite, str]]:
with unshare.unshare('client', '-Un') as client, \
unshare.unshare('server', '-n',
parent=client, privilege=True) as server, \
veth.veth(client, ifname, server_ifname, server), \
tempfile.TemporaryDirectory() as tmpdir:
yield (client, server, tmpdir)
def setup_dhcpd4() -> Iterator[Dhcp4Scenario]:
server_ifname = 'serverif'
with setup_dhcpd_common(IFNAME, server_ifname) as (client, server, tmpdir):
# Configure dhcpd
confpath = os.path.join(tmpdir, 'dhcpd.conf')
open(confpath, 'w', encoding='UTF-8').write(
f'''subnet {SUBNET4.network_address} netmask {SUBNET4.netmask} {{
option routers {SERVER_IP4.ip};
range {CLIENT_IP4.ip} {CLIENT_IP4.ip};
}}'''
)
pidfile = os.path.join(tmpdir, 'dhcpd.pid')
leasepath = os.path.join(tmpdir, 'dhcpd.leases')
open(leasepath, 'wb').write(b'')
ip.ifup(server, 'lo')
ip.ifup(server, server_ifname, SERVER_IP4)
opts = ['-f', '-d', '-4', '-cf', f'{confpath}',
'-lf', f'{leasepath}', '-pf', f'{pidfile}']
server.fg(f'{DHCPD}', '-t', *opts) # test config
with server.bg(f'{DHCPD}', *opts, privilege=True,
check=False) as dhcpd:
# Configure the client
ip.ifup(client, 'lo')
yield Dhcp4Scenario(client=client, ifname=IFNAME,
addr=CLIENT_IP4.ip,
gateway=SERVER_IP4.ip, mtu=1500)
dhcpd.terminate()
@dataclasses.dataclass
class Dhcp6Scenario(exeter.Scenario):
client: cmdsite.CmdSite
ifname: str
addr: ip.Addr
@exeter.scenariotest
def dhcp6_addr(self) -> None:
with dhclient6(self.client, self.ifname):
addrs = [a.ip for a in ip.addrs(self.client, self.ifname,
family='inet6',
scope='global')]
assert self.addr in addrs # Might also have a SLAAC address
SUBNET6 = ip.TEST_NET6_TASST_A
ipa6 = ip.IpiAllocator(SUBNET6)
(SERVER_IP6,) = ipa6.next_ipis()
(CLIENT_IP6,) = ipa6.next_ipis()
def setup_dhcpd6() -> Iterator[Dhcp6Scenario]:
server_ifname = 'serverif'
with setup_dhcpd_common(IFNAME, server_ifname) as (client, server, tmpdir):
# Sort out link local addressing
ip.ifup(server, 'lo')
ip.ifup(server, server_ifname, SERVER_IP6)
ip.ifup(client, 'lo')
ip.ifup(client, IFNAME)
ip.addr_wait(server, server_ifname, family='inet6', scope='link')
# Configure the DHCP server
confpath = os.path.join(tmpdir, 'dhcpd.conf')
open(confpath, 'w', encoding='UTF-8').write(
f'''subnet6 {SUBNET6} {{
range6 {CLIENT_IP6.ip} {CLIENT_IP6.ip};
}}''')
pidfile = os.path.join(tmpdir, 'dhcpd.pid')
leasepath = os.path.join(tmpdir, 'dhcpd.leases')
open(leasepath, 'wb').write(b'')
opts = ['-f', '-d', '-6', '-cf', f'{confpath}',
'-lf', f'{leasepath}', '-pf', f'{pidfile}']
server.fg(f'{DHCPD}', '-t', *opts) # test config
with server.bg(f'{DHCPD}', *opts, privilege=True,
check=False) as dhcpd:
yield Dhcp6Scenario(client=client, ifname=IFNAME,
addr=CLIENT_IP6.ip)
dhcpd.terminate()
def selftests() -> None:
Dhcp4Scenario.test(setup_dhcpd4)
Dhcp6Scenario.test(setup_dhcpd6)
|