diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index 8fc5cf8b1..d8e6bde5c 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -1,246 +1,246 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from base_interfaces_test import BasicInterfaceTest from vyos.ifconfig import Section from vyos.ifconfig.interface import Interface from vyos.configsession import ConfigSessionError from vyos.utils.network import get_interface_config from vyos.utils.file import read_file class BondingInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ['interfaces', 'bonding'] cls._mirror_interfaces = ['dum21354'] cls._members = [] # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! if 'TEST_ETH' in os.environ: cls._members = os.environ['TEST_ETH'].split() else: for tmp in Section.interfaces('ethernet'): if not '.' in tmp: cls._members.append(tmp) cls._options = {'bond0' : []} for member in cls._members: cls._options['bond0'].append(f'member interface {member}') cls._interfaces = list(cls._options) # call base-classes classmethod super(BondingInterfaceTest, cls).setUpClass() def test_add_single_ip_address(self): super().test_add_single_ip_address() for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() self.assertListEqual(slaves, self._members) def test_vif_8021q_interfaces(self): super().test_vif_8021q_interfaces() for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() self.assertListEqual(slaves, self._members) def test_bonding_remove_member(self): # T2515: when removing a bond member the previously enslaved/member # interface must be in its former admin-up/down state. Here we ensure # that it is admin-up as it was admin-up before. # configure member interfaces for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_commit() # remove single bond member port for interface in self._interfaces: remove_member = self._members[0] self.cli_delete(self._base_path + [interface, 'member', 'interface', remove_member]) self.cli_commit() # removed member port must be admin-up for interface in self._interfaces: remove_member = self._members[0] state = Interface(remove_member).get_admin_state() self.assertEqual('up', state) def test_bonding_min_links(self): # configure member interfaces min_links = len(self._interfaces) for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'min-links', str(min_links)]) self.cli_commit() # verify config for interface in self._interfaces: tmp = get_interface_config(interface) self.assertEqual(min_links, tmp['linkinfo']['info_data']['min_links']) # check LACP default rate self.assertEqual('slow', tmp['linkinfo']['info_data']['ad_lacp_rate']) def test_bonding_lacp_rate(self): # configure member interfaces lacp_rate = 'fast' for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'lacp-rate', lacp_rate]) self.cli_commit() # verify config for interface in self._interfaces: tmp = get_interface_config(interface) # check LACP minimum links (default value) self.assertEqual(0, tmp['linkinfo']['info_data']['min_links']) self.assertEqual(lacp_rate, tmp['linkinfo']['info_data']['ad_lacp_rate']) def test_bonding_hash_policy(self): # Define available bonding hash policies hash_policies = ['layer2', 'layer2+3', 'encap2+3', 'encap3+4'] for hash_policy in hash_policies: for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'hash-policy', hash_policy]) self.cli_commit() # verify config for interface in self._interfaces: defined_policy = read_file(f'/sys/class/net/{interface}/bonding/xmit_hash_policy').split() self.assertEqual(defined_policy[0], hash_policy) def test_bonding_mii_monitoring_interval(self): for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_commit() # verify default for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() self.assertIn('100', tmp) mii_mon = '250' for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'mii-mon-interval', mii_mon]) self.cli_commit() # verify new CLI value for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() self.assertIn(mii_mon, tmp) def test_bonding_multi_use_member(self): # Define available bonding hash policies for interface in ['bond10', 'bond20']: for member in self._members: self.cli_set(self._base_path + [interface, 'member', 'interface', member]) # check validate() - can not use the same member interfaces multiple times with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(self._base_path + ['bond20']) self.cli_commit() def test_bonding_source_interface(self): # Re-use member interface that is already a source-interface bond = 'bond99' pppoe = 'pppoe98756' member = next(iter(self._members)) self.cli_set(self._base_path + [bond, 'member', 'interface', member]) self.cli_set(['interfaces', 'pppoe', pppoe, 'source-interface', member]) # check validate() - can not add interface to bond, it is the source-interface of ... with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(['interfaces', 'pppoe', pppoe]) self.cli_commit() # verify config slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() self.assertIn(member, slaves) def test_bonding_source_bridge_interface(self): # Re-use member interface that is already a source-interface bond = 'bond1097' bridge = 'br6327' member = next(iter(self._members)) self.cli_set(self._base_path + [bond, 'member', 'interface', member]) self.cli_set(['interfaces', 'bridge', bridge, 'member', 'interface', member]) # check validate() - can not add interface to bond, it is a member of bridge ... with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(['interfaces', 'bridge', bridge]) self.cli_commit() # verify config slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() self.assertIn(member, slaves) def test_bonding_uniq_member_description(self): ethernet_path = ['interfaces', 'ethernet'] for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_commit() # Add any changes on bonding members # For example add description on separate ethX interfaces for interface in self._interfaces: for member in self._members: self.cli_set(ethernet_path + [member, 'description', member + '_interface']) self.cli_commit() # verify config for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() for member in self._members: self.assertIn(member, slaves) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py index 20ffa04d4..b32a6f524 100755 --- a/smoketest/scripts/cli/test_interfaces_macsec.py +++ b/smoketest/scripts/cli/test_interfaces_macsec.py @@ -1,213 +1,212 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re import unittest from base_interfaces_test import BasicInterfaceTest from netifaces import interfaces from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.utils.process import cmd from vyos.utils.file import read_file from vyos.utils.network import get_interface_config from vyos.utils.process import process_named_running PROCESS_NAME = 'wpa_supplicant' def get_config_value(interface, key): tmp = read_file(f'/run/wpa_supplicant/{interface}.conf') tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp) return tmp[0] def get_cipher(interface): tmp = get_interface_config(interface) return tmp['linkinfo']['info_data']['cipher_suite'].lower() class MACsecInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ['interfaces', 'macsec'] cls._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] } # if we have a physical eth1 interface, add a second macsec instance if 'eth1' in Section.interfaces('ethernet'): macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] } cls._options.update(macsec) cls._interfaces = list(cls._options) # call base-classes classmethod super(MACsecInterfaceTest, cls).setUpClass() def tearDown(self): super().tearDown() self.assertFalse(process_named_running(PROCESS_NAME)) def test_macsec_encryption(self): # MACsec can be operating in authentication and encryption mode - both # using different mandatory settings, lets test encryption as the basic # authentication test has been performed using the base class tests mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4' mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836' replay_window = '64' for interface, option_value in self._options.items(): for option in option_value: if option.split()[0] == 'source-interface': src_interface = option.split()[1] self.cli_set(self._base_path + [interface] + option.split()) # Encrypt link self.cli_set(self._base_path + [interface, 'security', 'encrypt']) # check validate() - Physical source interface MTU must be higher then our MTU self.cli_set(self._base_path + [interface, 'mtu', '1500']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(self._base_path + [interface, 'mtu']) # check validate() - MACsec security keys mandartory when encryption is enabled with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak]) # check validate() - MACsec security keys mandartory when encryption is enabled with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn]) self.cli_set(self._base_path + [interface, 'security', 'replay-window', replay_window]) # final commit of settings self.cli_commit() tmp = get_config_value(src_interface, 'macsec_integ_only') self.assertIn("0", tmp) tmp = get_config_value(src_interface, 'mka_cak') self.assertIn(mak_cak, tmp) tmp = get_config_value(src_interface, 'mka_ckn') self.assertIn(mak_ckn, tmp) # check that the default priority of 255 is programmed tmp = get_config_value(src_interface, 'mka_priority') self.assertIn("255", tmp) tmp = get_config_value(src_interface, 'macsec_replay_window') self.assertIn(replay_window, tmp) tmp = read_file(f'/sys/class/net/{interface}/mtu') self.assertEqual(tmp, '1460') # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_macsec_gcm_aes_128(self): src_interface = 'eth0' interface = 'macsec1' cipher = 'gcm-aes-128' self.cli_set(self._base_path + [interface]) # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'source-interface', src_interface]) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher]) # final commit and verify self.cli_commit() self.assertIn(interface, interfaces()) self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) # check that we use the new macsec_csindex option (T4537) tmp = get_config_value(src_interface, 'macsec_csindex') self.assertIn("0", tmp) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_macsec_gcm_aes_256(self): src_interface = 'eth0' interface = 'macsec4' cipher = 'gcm-aes-256' self.cli_set(self._base_path + [interface]) # check validate() - source interface is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'source-interface', src_interface]) # check validate() - cipher is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher]) # final commit and verify self.cli_commit() self.assertIn(interface, interfaces()) self.assertEqual(cipher, get_cipher(interface)) # check that we use the new macsec_csindex option (T4537) tmp = get_config_value(src_interface, 'macsec_csindex') self.assertIn("1", tmp) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_macsec_source_interface(self): # Ensure source-interface can bot be part of any other bond or bridge base_bridge = ['interfaces', 'bridge', 'br200'] base_bond = ['interfaces', 'bonding', 'bond200'] for interface, option_value in self._options.items(): for option in option_value: self.cli_set(self._base_path + [interface] + option.split()) if option.split()[0] == 'source-interface': src_interface = option.split()[1] self.cli_set(base_bridge + ['member', 'interface', src_interface]) # check validate() - Source interface must not already be a member of a bridge with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_bridge) self.cli_set(base_bond + ['member', 'interface', src_interface]) # check validate() - Source interface must not already be a member of a bridge with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_bond) # final commit and verify self.cli_commit() self.assertIn(interface, interfaces()) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) - + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_qos.py b/smoketest/scripts/cli/test_qos.py index 4cda2089f..3743be788 100755 --- a/smoketest/scripts/cli/test_qos.py +++ b/smoketest/scripts/cli/test_qos.py @@ -1,547 +1,547 @@ #!/usr/bin/env python3 # # Copyright (C) 2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from json import loads from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.utils.process import cmd base_path = ['qos'] def get_tc_qdisc_json(interface) -> dict: tmp = cmd(f'tc -detail -json qdisc show dev {interface}') tmp = loads(tmp) return next(iter(tmp)) def get_tc_filter_json(interface, direction) -> list: if direction not in ['ingress', 'egress']: raise ValueError() tmp = cmd(f'tc -detail -json filter show dev {interface} {direction}') tmp = loads(tmp) return tmp class TestQoS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestQoS, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) # We only test on physical interfaces and not VLAN (sub-)interfaces cls._interfaces = [] if 'TEST_ETH' in os.environ: tmp = os.environ['TEST_ETH'].split() cls._interfaces = tmp else: for tmp in Section.interfaces('ethernet', vlan=False): cls._interfaces.append(tmp) def tearDown(self): # delete testing SSH config self.cli_delete(base_path) self.cli_commit() def test_01_cake(self): bandwidth = 1000000 rtt = 200 for interface in self._interfaces: policy_name = f'qos-policy-{interface}' self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'cake', policy_name, 'bandwidth', str(bandwidth)]) self.cli_set(base_path + ['policy', 'cake', policy_name, 'rtt', str(rtt)]) self.cli_set(base_path + ['policy', 'cake', policy_name, 'flow-isolation', 'dual-src-host']) bandwidth += 1000000 rtt += 20 # commit changes self.cli_commit() bandwidth = 1000000 rtt = 200 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('cake', tmp['kind']) # TC store rates as a 32-bit unsigned integer in bps (Bytes per second) self.assertEqual(int(bandwidth *125), tmp['options']['bandwidth']) # RTT internally is in us self.assertEqual(int(rtt *1000), tmp['options']['rtt']) self.assertEqual('dual-srchost', tmp['options']['flowmode']) self.assertFalse(tmp['options']['ingress']) self.assertFalse(tmp['options']['nat']) self.assertTrue(tmp['options']['raw']) bandwidth += 1000000 rtt += 20 def test_02_drop_tail(self): queue_limit = 50 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'drop-tail', policy_name, 'queue-limit', str(queue_limit)]) queue_limit += 10 # commit changes self.cli_commit() queue_limit = 50 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('pfifo', tmp['kind']) self.assertEqual(queue_limit, tmp['options']['limit']) queue_limit += 10 def test_03_fair_queue(self): hash_interval = 10 queue_limit = 5 policy_type = 'fair-queue' first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'hash-interval', str(hash_interval)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)]) hash_interval += 1 queue_limit += 1 # commit changes self.cli_commit() hash_interval = 10 queue_limit = 5 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('sfq', tmp['kind']) self.assertEqual(hash_interval, tmp['options']['perturb']) self.assertEqual(queue_limit, tmp['options']['limit']) hash_interval += 1 queue_limit += 1 def test_04_fq_codel(self): policy_type = 'fq-codel' codel_quantum = 1500 flows = 512 interval = 100 queue_limit = 2048 target = 5 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'codel-quantum', str(codel_quantum)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'flows', str(flows)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'interval', str(interval)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'target', str(target)]) codel_quantum += 10 flows += 2 interval += 10 queue_limit += 512 target += 1 # commit changes self.cli_commit() codel_quantum = 1500 flows = 512 interval = 100 queue_limit = 2048 target = 5 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('fq_codel', tmp['kind']) self.assertEqual(codel_quantum, tmp['options']['quantum']) self.assertEqual(flows, tmp['options']['flows']) self.assertEqual(queue_limit, tmp['options']['limit']) # due to internal rounding we need to substract 1 from interval and target after converting to milliseconds # configuration of: # tc qdisc add dev eth0 root fq_codel quantum 1500 flows 512 interval 100ms limit 2048 target 5ms noecn # results in: tc -j qdisc show dev eth0 # [{"kind":"fq_codel","handle":"8046:","root":true,"refcnt":3,"options":{"limit":2048,"flows":512, # "quantum":1500,"target":4999,"interval":99999,"memory_limit":33554432,"drop_batch":64}}] self.assertAlmostEqual(tmp['options']['interval'], interval *1000, delta=1) self.assertAlmostEqual(tmp['options']['target'], target *1000 -1, delta=1) codel_quantum += 10 flows += 2 interval += 10 queue_limit += 512 target += 1 def test_05_limiter(self): qos_config = { '1' : { 'bandwidth' : '1000000', 'match4' : { 'ssh' : { 'dport' : '22', }, }, }, '2' : { 'bandwidth' : '1000000', 'match6' : { 'ssh' : { 'dport' : '22', }, }, }, } first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'egress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # set default bandwidth parameter for all remaining connections self.cli_set(base_path + ['policy', 'limiter', policy_name, 'default', 'bandwidth', '500000']) for qos_class, qos_class_config in qos_config.items(): qos_class_base = base_path + ['policy', 'limiter', policy_name, 'class', qos_class] if 'match4' in qos_class_config: for match, match_config in qos_class_config['match4'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ip', 'destination', 'port', match_config['dport']]) if 'match6' in qos_class_config: for match, match_config in qos_class_config['match6'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ipv6', 'destination', 'port', match_config['dport']]) if 'bandwidth' in qos_class_config: self.cli_set(qos_class_base + ['bandwidth', qos_class_config['bandwidth']]) # commit changes self.cli_commit() for interface in self._interfaces: for filter in get_tc_filter_json(interface, 'ingress'): # bail out early if filter has no attached action if 'options' not in filter or 'actions' not in filter['options']: continue for qos_class, qos_class_config in qos_config.items(): # Every flowid starts with ffff and we encopde the class number after the colon if 'flowid' not in filter['options'] or filter['options']['flowid'] != f'ffff:{qos_class}': continue ip_hdr_offset = 20 if 'match6' in qos_class_config: ip_hdr_offset = 40 self.assertEqual(ip_hdr_offset, filter['options']['match']['off']) if 'dport' in match_config: dport = int(match_config['dport']) self.assertEqual(f'{dport:x}', filter['options']['match']['value']) def test_06_network_emulator(self): policy_type = 'network-emulator' bandwidth = 1000000 corruption = 1 delay = 2 duplicate = 3 loss = 4 queue_limit = 5 reordering = 6 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'bandwidth', str(bandwidth)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'corruption', str(corruption)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'delay', str(delay)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'duplicate', str(duplicate)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'loss', str(loss)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'queue-limit', str(queue_limit)]) self.cli_set(base_path + ['policy', policy_type, policy_name, 'reordering', str(reordering)]) bandwidth += 1000000 corruption += 1 delay += 1 duplicate +=1 loss += 1 queue_limit += 1 reordering += 1 # commit changes self.cli_commit() bandwidth = 1000000 corruption = 1 delay = 2 duplicate = 3 loss = 4 queue_limit = 5 reordering = 6 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('netem', tmp['kind']) self.assertEqual(int(bandwidth *125), tmp['options']['rate']['rate']) # values are in % self.assertEqual(corruption/100, tmp['options']['corrupt']['corrupt']) self.assertEqual(duplicate/100, tmp['options']['duplicate']['duplicate']) self.assertEqual(loss/100, tmp['options']['loss-random']['loss']) self.assertEqual(reordering/100, tmp['options']['reorder']['reorder']) self.assertEqual(delay/1000, tmp['options']['delay']['delay']) self.assertEqual(queue_limit, tmp['options']['limit']) bandwidth += 1000000 corruption += 1 delay += 1 duplicate += 1 loss += 1 queue_limit += 1 reordering += 1 def test_07_priority_queue(self): priorities = ['1', '2', '3', '4', '5'] first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'priority-queue', policy_name, 'default', 'queue-limit', '10']) for priority in priorities: prio_base = base_path + ['policy', 'priority-queue', policy_name, 'class', priority] self.cli_set(prio_base + ['match', f'prio-{priority}', 'ip', 'destination', 'port', str(1000 + int(priority))]) # commit changes self.cli_commit() def test_08_random_detect(self): self.skipTest('tc returns invalid JSON here - needs iproute2 fix') bandwidth = 5000 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'random-detect', policy_name, 'bandwidth', str(bandwidth)]) bandwidth += 1000 # commit changes self.cli_commit() bandwidth = 5000 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) import pprint pprint.pprint(tmp) def test_09_rate_control(self): bandwidth = 5000 burst = 20 latency = 5 first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'bandwidth', str(bandwidth)]) self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'burst', str(burst)]) self.cli_set(base_path + ['policy', 'rate-control', policy_name, 'latency', str(latency)]) bandwidth += 1000 burst += 5 latency += 1 # commit changes self.cli_commit() bandwidth = 5000 burst = 20 latency = 5 for interface in self._interfaces: tmp = get_tc_qdisc_json(interface) self.assertEqual('tbf', tmp['kind']) self.assertEqual(0, tmp['options']['mpu']) # TC store rates as a 32-bit unsigned integer in bps (Bytes per second) self.assertEqual(int(bandwidth * 125), tmp['options']['rate']) bandwidth += 1000 burst += 5 latency += 1 def test_10_round_robin(self): qos_config = { '1' : { 'match4' : { 'ssh' : { 'dport' : '22', }, }, }, '2' : { 'match6' : { 'ssh' : { 'dport' : '22', }, }, }, } first = True for interface in self._interfaces: policy_name = f'qos-policy-{interface}' if first: self.cli_set(base_path + ['interface', interface, 'ingress', policy_name]) # verify() - selected QoS policy on interface only supports egress with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface', interface, 'ingress', policy_name]) first = False self.cli_set(base_path + ['interface', interface, 'egress', policy_name]) for qos_class, qos_class_config in qos_config.items(): qos_class_base = base_path + ['policy', 'round-robin', policy_name, 'class', qos_class] if 'match4' in qos_class_config: for match, match_config in qos_class_config['match4'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ip', 'destination', 'port', match_config['dport']]) if 'match6' in qos_class_config: for match, match_config in qos_class_config['match6'].items(): if 'dport' in match_config: self.cli_set(qos_class_base + ['match', match, 'ipv6', 'destination', 'port', match_config['dport']]) # commit changes self.cli_commit() for interface in self._interfaces: import pprint tmp = get_tc_qdisc_json(interface) self.assertEqual('drr', tmp['kind']) for filter in get_tc_filter_json(interface, 'ingress'): # bail out early if filter has no attached action if 'options' not in filter or 'actions' not in filter['options']: continue for qos_class, qos_class_config in qos_config.items(): # Every flowid starts with ffff and we encopde the class number after the colon if 'flowid' not in filter['options'] or filter['options']['flowid'] != f'ffff:{qos_class}': continue ip_hdr_offset = 20 if 'match6' in qos_class_config: ip_hdr_offset = 40 self.assertEqual(ip_hdr_offset, filter['options']['match']['off']) if 'dport' in match_config: dport = int(match_config['dport']) self.assertEqual(f'{dport:x}', filter['options']['match']['value']) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_dns_forwarding.py b/smoketest/scripts/cli/test_service_dns_forwarding.py index 1bd9f8713..bc50a4ffe 100755 --- a/smoketest/scripts/cli/test_service_dns_forwarding.py +++ b/smoketest/scripts/cli/test_service_dns_forwarding.py @@ -1,259 +1,259 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.template import bracketize_ipv6 from vyos.utils.file import read_file from vyos.utils.process import process_named_running CONFIG_FILE = '/run/powerdns/recursor.conf' FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf' HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua' PROCESS_NAME= 'pdns_recursor' base_path = ['service', 'dns', 'forwarding'] allow_from = ['192.0.2.0/24', '2001:db8::/32'] listen_adress = ['127.0.0.1', '::1'] def get_config_value(key, file=CONFIG_FILE): tmp = read_file(file) tmp = re.findall(r'\n{}=+(.*)'.format(key), tmp) return tmp[0] class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestServicePowerDNS, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) # Delete DNS forwarding configuration self.cli_delete(base_path) self.cli_commit() # Check for running process self.assertFalse(process_named_running(PROCESS_NAME)) def test_basic_forwarding(self): # Check basic DNS forwarding settings cache_size = '20' negative_ttl = '120' self.cli_set(base_path + ['cache-size', cache_size]) self.cli_set(base_path + ['negative-ttl', negative_ttl]) # check validate() - allow from must be defined with self.assertRaises(ConfigSessionError): self.cli_commit() for network in allow_from: self.cli_set(base_path + ['allow-from', network]) # check validate() - listen-address must be defined with self.assertRaises(ConfigSessionError): self.cli_commit() for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) # configure DNSSEC self.cli_set(base_path + ['dnssec', 'validate']) # Do not use local /etc/hosts file in name resolution self.cli_set(base_path + ['ignore-hosts-file']) # commit changes self.cli_commit() # Check configured cache-size tmp = get_config_value('max-cache-entries') self.assertEqual(tmp, cache_size) # Networks allowed to query this server tmp = get_config_value('allow-from') self.assertEqual(tmp, ','.join(allow_from)) # Addresses to listen for DNS queries tmp = get_config_value('local-address') self.assertEqual(tmp, ','.join(listen_adress)) # Maximum amount of time negative entries are cached tmp = get_config_value('max-negative-ttl') self.assertEqual(tmp, negative_ttl) # Do not use local /etc/hosts file in name resolution tmp = get_config_value('export-etc-hosts') self.assertEqual(tmp, 'no') # RFC1918 addresses are looked up by default tmp = get_config_value('serve-rfc1918') self.assertEqual(tmp, 'yes') # verify default port configuration tmp = get_config_value('local-port') self.assertEqual(tmp, '53') def test_dnssec(self): # DNSSEC option testing for network in allow_from: self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) options = ['off', 'process-no-validate', 'process', 'log-fail', 'validate'] for option in options: self.cli_set(base_path + ['dnssec', option]) # commit changes self.cli_commit() tmp = get_config_value('dnssec') self.assertEqual(tmp, option) def test_external_nameserver(self): # Externe Domain Name Servers (DNS) addresses for network in allow_from: self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) nameservers = {'192.0.2.1': {}, '192.0.2.2': {'port': '53'}, '2001:db8::1': {'port': '853'}} for h,p in nameservers.items(): if 'port' in p: self.cli_set(base_path + ['name-server', h, 'port', p['port']]) else: self.cli_set(base_path + ['name-server', h]) # commit changes self.cli_commit() tmp = get_config_value(r'\+.', file=FORWARD_FILE) canonical_entries = [(lambda h, p: f"{bracketize_ipv6(h)}:{p['port'] if 'port' in p else 53}")(h, p) for (h, p) in nameservers.items()] self.assertEqual(tmp, ', '.join(canonical_entries)) # Do not use local /etc/hosts file in name resolution # default: yes tmp = get_config_value('export-etc-hosts') self.assertEqual(tmp, 'yes') def test_domain_forwarding(self): for network in allow_from: self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) domains = ['vyos.io', 'vyos.net', 'vyos.com'] nameservers = {'192.0.2.1': {}, '192.0.2.2': {'port': '53'}, '2001:db8::1': {'port': '853'}} for domain in domains: for h,p in nameservers.items(): if 'port' in p: self.cli_set(base_path + ['domain', domain, 'name-server', h, 'port', p['port']]) else: self.cli_set(base_path + ['domain', domain, 'name-server', h]) # Test 'recursion-desired' flag for only one domain if domain == domains[0]: self.cli_set(base_path + ['domain', domain, 'recursion-desired']) # Test 'negative trust anchor' flag for the second domain only if domain == domains[1]: self.cli_set(base_path + ['domain', domain, 'addnta']) # commit changes self.cli_commit() # Test configured name-servers hosts_conf = read_file(HOSTSD_FILE) for domain in domains: # Test 'recursion-desired' flag for the first domain only if domain == domains[0]: key =f'\+{domain}' else: key =f'{domain}' tmp = get_config_value(key, file=FORWARD_FILE) canonical_entries = [(lambda h, p: f"{bracketize_ipv6(h)}:{p['port'] if 'port' in p else 53}")(h, p) for (h, p) in nameservers.items()] self.assertEqual(tmp, ', '.join(canonical_entries)) # Test 'negative trust anchor' flag for the second domain only if domain == domains[1]: self.assertIn(f'addNTA("{domain}", "static")', hosts_conf) def test_no_rfc1918_forwarding(self): for network in allow_from: self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) self.cli_set(base_path + ['no-serve-rfc1918']) # commit changes self.cli_commit() # verify configuration tmp = get_config_value('serve-rfc1918') self.assertEqual(tmp, 'no') def test_dns64(self): dns_prefix = '64:ff9b::/96' for network in allow_from: self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) # Check dns64-prefix - must be prefix /96 self.cli_set(base_path + ['dns64-prefix', '2001:db8:aabb::/64']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['dns64-prefix', dns_prefix]) # commit changes self.cli_commit() # verify dns64-prefix configuration tmp = get_config_value('dns64-prefix') self.assertEqual(tmp, dns_prefix) def test_listening_port(self): # We can listen on a different port compared to '53' but only one at a time for port in ['1053', '5353']: self.cli_set(base_path + ['port', port]) for network in allow_from: self.cli_set(base_path + ['allow-from', network]) for address in listen_adress: self.cli_set(base_path + ['listen-address', address]) # commit changes self.cli_commit() # verify local-port configuration tmp = get_config_value('local-port') self.assertEqual(tmp, port) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_frr.py b/smoketest/scripts/cli/test_system_frr.py index 715ec401d..3eb0cd0ab 100755 --- a/smoketest/scripts/cli/test_system_frr.py +++ b/smoketest/scripts/cli/test_system_frr.py @@ -1,146 +1,146 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2020 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.utils.file import read_file config_file = '/etc/frr/daemons' base_path = ['system', 'frr'] def daemons_config_parse(daemons_config): # create regex for parsing daemons options regex_daemon_config = re.compile( r'^(?P<daemon_name>\w+)_options="(?P<daemon_options>.*)"$', re.M) # create empty dict for config daemons_config_dict = {} # fill dictionary with actual config for daemon in regex_daemon_config.finditer(daemons_config): daemon_name = daemon.group('daemon_name') daemon_options = daemon.group('daemon_options') daemons_config_dict[daemon_name] = daemon_options # return daemons config return (daemons_config_dict) class TestSystemFRR(VyOSUnitTestSHIM.TestCase): def tearDown(self): self.cli_delete(base_path) self.cli_commit() def test_frr_snmp_multipledaemons(self): # test SNMP integration for multiple daemons test_daemon_names = ['ospfd', 'bgpd'] for test_daemon_name in test_daemon_names: self.cli_set(base_path + ['snmp', test_daemon_name]) self.cli_commit() # read the config file and check content daemons_config = read_file(config_file) daemons_config_dict = daemons_config_parse(daemons_config) # prepare regex for matching SNMP integration regex_snmp = re.compile(r'^.* -M snmp.*$') for (daemon_name, daemon_options) in daemons_config_dict.items(): snmp_enabled = regex_snmp.match(daemon_options) if daemon_name in test_daemon_names: self.assertTrue(snmp_enabled) else: self.assertFalse(snmp_enabled) def test_frr_snmp_addandremove(self): # test enabling and disabling of SNMP integration test_daemon_names = ['ospfd', 'bgpd'] for test_daemon_name in test_daemon_names: self.cli_set(base_path + ['snmp', test_daemon_name]) self.cli_commit() self.cli_delete(base_path) self.cli_commit() # read the config file and check content daemons_config = read_file(config_file) daemons_config_dict = daemons_config_parse(daemons_config) # prepare regex for matching SNMP integration regex_snmp = re.compile(r'^.* -M snmp.*$') for test_daemon_name in test_daemon_names: snmp_enabled = regex_snmp.match( daemons_config_dict[test_daemon_name]) self.assertFalse(snmp_enabled) def test_frr_snmp_empty(self): # test empty config section self.cli_set(base_path + ['snmp']) self.cli_commit() # read the config file and check content daemons_config = read_file(config_file) daemons_config_dict = daemons_config_parse(daemons_config) # prepare regex for matching SNMP integration regex_snmp = re.compile(r'^.* -M snmp.*$') for daemon_options in daemons_config_dict.values(): snmp_enabled = regex_snmp.match(daemon_options) self.assertFalse(snmp_enabled) def test_frr_bmp(self): # test BMP self.cli_set(base_path + ['bmp']) self.cli_commit() # read the config file and check content daemons_config = read_file(config_file) daemons_config_dict = daemons_config_parse(daemons_config) # prepare regex regex_bmp = re.compile(r'^.* -M bmp.*$') bmp_enabled = regex_bmp.match(daemons_config_dict['bgpd']) self.assertTrue(bmp_enabled) def test_frr_irdp(self): # test IRDP self.cli_set(base_path + ['irdp']) self.cli_commit() # read the config file and check content daemons_config = read_file(config_file) daemons_config_dict = daemons_config_parse(daemons_config) # prepare regex regex_irdp = re.compile(r'^.* -M irdp.*$') irdp_enabled = regex_irdp.match(daemons_config_dict['zebra']) self.assertTrue(irdp_enabled) def test_frr_bmpandsnmp(self): # test empty config section self.cli_set(base_path + ['bmp']) self.cli_set(base_path + ['snmp', 'bgpd']) self.cli_commit() # read the config file and check content daemons_config = read_file(config_file) daemons_config_dict = daemons_config_parse(daemons_config) # prepare regex regex_snmp = re.compile(r'^.* -M bmp.*$') regex_snmp = re.compile(r'^.* -M snmp.*$') bmp_enabled = regex_snmp.match(daemons_config_dict['bgpd']) snmp_enabled = regex_snmp.match(daemons_config_dict['bgpd']) self.assertTrue(bmp_enabled) self.assertTrue(snmp_enabled) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2)