#! /usr/bin/python3

# SPDX-License-Identifier: GPL-2.0-or-later
#
# avocado/pasta_udp.py - Test UDP transfers with pasta
#
# Copyright Red Hat
# Author: David Gibson

import filecmp
import os.path

import common
from pasta import PastaConfiguredTest


class PastaUdp(PastaConfiguredTest):
    """UDP file-transfer tests built on socat.

    Each test sends DATA_FILE from one side (``self.inner`` or
    ``self.outer``) to a socat listener on the other side and checks that
    the received copy is identical to the original.
    """

    # File whose contents are sent and then compared against the copy
    # written by the listener.
    DATA_FILE = './test/medium.bin'

    def transfer(self, fromx, fromaddr, tox, toaddr):
        """Send DATA_FILE over UDP with socat and verify the received copy.

        fromx:    object used to run the sending socat (via system_output())
        fromaddr: socat address the sender writes to
        tox:      object used to run the listening socat (via subprocess())
        toaddr:   socat address the listener reads from

        Fails the test if the listener exits with a non-zero status or the
        received file differs from DATA_FILE.
        """
        tmpfile = os.path.join(self.workdir, 'transfer.tmp')

        # The listener writes whatever it receives into tmpfile; the
        # create,trunc options make it start from an empty file.
        listener = tox.subprocess(
            'socat -u {} OPEN:{},create,trunc'.format(toaddr, tmpfile))
        listener.start()
        # NOTE(review): nothing visible here waits for the listener to have
        # bound its socket before the sender runs - confirm subprocess() /
        # start() guarantee that, otherwise this can race.

        fromx.system_output(
            'socat -u OPEN:{} {}'.format(self.DATA_FILE, fromaddr))

        status = listener.wait()
        # assertEqual, not the deprecated alias assertEquals, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(status, 0)
        # shallow=False forces a byte-by-byte comparison of the contents.
        self.assertTrue(filecmp.cmp(self.DATA_FILE, tmpfile, shallow=False))

    # In the socat addresses below, the sender's shut-null option sends an
    # empty datagram when it is done and the listener's null-eof option
    # treats an empty datagram as end-of-file, so the listener terminates
    # once the whole file has been sent.

    def test_inbound_v4(self):
        """Transfer from outer to inner via IPv4 loopback on INBOUND_FWD."""
        self.transfer(
            self.outer,
            'UDP4:127.0.0.1:{},shut-null'.format(self.INBOUND_FWD),
            self.inner,
            'UDP4-LISTEN:{},null-eof,bind=127.0.0.1'.format(self.INBOUND_FWD))

    def test_outbound_splice_v4(self):
        """Transfer from inner to outer via IPv4 loopback on OUTBOUND_FWD."""
        self.transfer(
            self.inner,
            'UDP4:127.0.0.1:{},shut-null'.format(self.OUTBOUND_FWD),
            self.outer,
            'UDP4-LISTEN:{},null-eof,bind=127.0.0.1'.format(self.OUTBOUND_FWD))

    def test_outbound_tap_v4(self):
        """Transfer from inner to outer, addressed to GW_IP4:OUTBOUND_FWD."""
        self.transfer(
            self.inner,
            'UDP4:{}:{},shut-null'.format(self.GW_IP4, self.OUTBOUND_FWD),
            self.outer,
            'UDP4-LISTEN:{},null-eof'.format(self.OUTBOUND_FWD))

    def test_inbound_v6(self):
        """Transfer from outer to inner via IPv6 loopback on INBOUND_FWD."""
        self.transfer(
            self.outer,
            'UDP6:[::1]:{},shut-null'.format(self.INBOUND_FWD),
            self.inner,
            'UDP6-LISTEN:{},null-eof,bind=[::1]'.format(self.INBOUND_FWD))

    def test_outbound_splice_v6(self):
        """Transfer from inner to outer via IPv6 loopback on OUTBOUND_FWD."""
        self.transfer(
            self.inner,
            'UDP6:[::1]:{},shut-null'.format(self.OUTBOUND_FWD),
            self.outer,
            'UDP6-LISTEN:{},null-eof,bind=[::1]'.format(self.OUTBOUND_FWD))

    def test_outbound_tap_v6(self):
        """Transfer from inner to outer, addressed to gw_ll scoped to IFNAME."""
        # We need the gateway route to be configured
        # NOTE(review): every other use in this class is self.inner; confirm
        # that self.innerx is the attribute the base class really provides.
        common.slaac_wait(self.innerx, self.IFNAME)
        self.transfer(
            self.inner,
            'UDP6:[{}%{}]:{},shut-null'.format(self.gw_ll, self.IFNAME,
                                               self.OUTBOUND_FWD),
            self.outer,
            'UDP6-LISTEN:{},null-eof'.format(self.OUTBOUND_FWD))