1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| | #! /usr/bin/python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# avocado/pasta_udp.py - Test TCP transfers with pasta
#
# Copyright Red Hat
# Author: David Gibson <david@gibson.dropbear.id.au>
import filecmp
import os.path
import common
from pasta import PastaConfiguredTest
class PastaUdp(PastaConfiguredTest):
DATA_FILE = './test/medium.bin'
def transfer(self, fromx, fromaddr, tox, toaddr):
tmpfile = os.path.join(self.workdir, 'transfer.tmp')
listener = tox.subprocess(
'socat -u {} OPEN:{},create,trunc'.format(toaddr, tmpfile))
listener.start()
fromx.system_output(
'socat -u OPEN:{} {}'.format(self.DATA_FILE, fromaddr))
status = listener.wait()
self.assertEquals(status, 0)
self.assertTrue(filecmp.cmp(self.DATA_FILE, tmpfile, shallow=False))
def test_inbound_v4(self):
self.transfer(self.outer, 'UDP4:127.0.0.1:{},shut-null'.format(self.INBOUND_FWD),
self.inner, 'UDP4-LISTEN:{},null-eof,bind=127.0.0.1'.format(self.INBOUND_FWD))
def test_outbound_splice_v4(self):
self.transfer(self.inner, 'UDP4:127.0.0.1:{},shut-null'.format(self.OUTBOUND_FWD),
self.outer, 'UDP4-LISTEN:{},null-eof,bind=127.0.0.1'.format(self.OUTBOUND_FWD))
def test_outbound_tap_v4(self):
self.transfer(self.inner, 'UDP4:{}:{},shut-null'.format(self.GW_IP4, self.OUTBOUND_FWD),
self.outer, 'UDP4-LISTEN:{},null-eof'.format(self.OUTBOUND_FWD))
def test_inbound_v6(self):
self.transfer(self.outer, 'UDP6:[::1]:{},shut-null'.format(self.INBOUND_FWD),
self.inner, 'UDP6-LISTEN:{},null-eof,bind=[::1]'.format(self.INBOUND_FWD))
def test_outbound_splice_v6(self):
self.transfer(self.inner, 'UDP6:[::1]:{},shut-null'.format(self.OUTBOUND_FWD),
self.outer, 'UDP6-LISTEN:{},null-eof,bind=[::1]'.format(self.OUTBOUND_FWD))
def test_outbound_tap_v6(self):
# We need the gateway route to be configured
common.slaac_wait(self.innerx, self.IFNAME)
self.transfer(self.inner, 'UDP6:[{}%{}]:{},shut-null'.format(self.gw_ll, self.IFNAME, self.OUTBOUND_FWD),
self.outer, 'UDP6-LISTEN:{},null-eof'.format(self.OUTBOUND_FWD))
|