#! /usr/bin/python3 # SPDX-License-Identifier: GPL-2.0-or-later # # avocado/pasta_tcp.py - Test TCP transfers with pasta # # Copyright Red Hat # Author: David Gibson import filecmp import os.path import common from pasta import PastaConfiguredTest class PastaTcpSmall(PastaConfiguredTest): DATA_FILE = './test/small.bin' def transfer(self, fromx, fromaddr, tox, toaddr): tmpfile = os.path.join(self.workdir, 'transfer.tmp') listener = tox.subprocess( 'socat -u {} OPEN:{},create,trunc'.format(toaddr, tmpfile)) listener.start() fromx.system_output( 'socat -u OPEN:{} {}'.format(self.DATA_FILE, fromaddr)) status = listener.wait() self.assertEquals(status, 0) self.assertTrue(filecmp.cmp(self.DATA_FILE, tmpfile, shallow=False)) def test_inbound_v4(self): self.transfer(self.outer, 'TCP4:127.0.0.1:{}'.format(self.INBOUND_FWD), self.inner, 'TCP4-LISTEN:{},bind=127.0.0.1'.format(self.INBOUND_FWD)) def test_outbound_splice_v4(self): self.transfer(self.inner, 'TCP4:127.0.0.1:{}'.format(self.OUTBOUND_FWD), self.outer, 'TCP4-LISTEN:{},bind=127.0.0.1'.format(self.OUTBOUND_FWD)) def test_outbound_tap_v4(self): self.transfer(self.inner, 'TCP4:{}:{}'.format(self.GW_IP4, self.OUTBOUND_FWD), self.outer, 'TCP4-LISTEN:{}'.format(self.OUTBOUND_FWD)) def test_inbound_v6(self): self.transfer(self.outer, 'TCP6:[::1]:{}'.format(self.INBOUND_FWD), self.inner, 'TCP6-LISTEN:{},bind=[::1]'.format(self.INBOUND_FWD)) def test_outbound_splice_v6(self): self.transfer(self.inner, 'TCP6:[::1]:{}'.format(self.OUTBOUND_FWD), self.outer, 'TCP6-LISTEN:{},bind=[::1]'.format(self.OUTBOUND_FWD)) def test_outbound_tap_v6(self): # We need the gateway route to be configured common.slaac_wait(self.innerx, self.IFNAME) self.transfer(self.inner, 'TCP6:[{}%{}]:{}'.format(self.gw_ll, self.IFNAME, self.OUTBOUND_FWD), self.outer, 'TCP6-LISTEN:{}'.format(self.OUTBOUND_FWD)) # Sane again but with a larger data file class PastaTcpBig(PastaTcpSmall): DATA_FILE = './test/big.bin' timeout = 15.0