diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index affa53877..d95071d1a 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -1,177 +1,192 @@ # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest import paramiko import pprint from time import sleep +from time import time from typing import Type from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos import ConfigError from vyos.defaults import commit_lock from vyos.utils.process import cmd from vyos.utils.process import run save_config = '/tmp/vyos-smoketest-save' # This class acts as shim between individual Smoketests developed for VyOS and # the Python UnitTest framework. Before every test is loaded, we dump the current # system configuration and reload it after the test - despite the test results. # # Using this approach we can not render a live system useless while running any # kind of smoketest. In addition it adds debug capabilities like printing the # command used to execute the test. class VyOSUnitTestSHIM: class TestCase(unittest.TestCase): # if enabled in derived class, print out each and every set/del command # on the CLI. This is usefull to grap all the commands required to # trigger the certain failure condition. # Use "self.debug = True" in derived classes setUp() method debug = False - + commit_guard = time() @classmethod def setUpClass(cls): cls._session = ConfigSession(os.getpid()) cls._session.save_config(save_config) if os.path.exists('/tmp/vyos.smoketest.debug'): cls.debug = True pass @classmethod def tearDownClass(cls): # discard any pending changes which might caused a messed up config cls._session.discard() # ... and restore the initial state cls._session.migrate_and_load_config(save_config) try: cls._session.commit() except (ConfigError, ConfigSessionError): cls._session.discard() cls.fail(cls) def cli_set(self, config): if self.debug: print('set ' + ' '.join(config)) self._session.set(config) def cli_delete(self, config): if self.debug: print('del ' + ' '.join(config)) self._session.delete(config) def cli_discard(self): if self.debug: print('DISCARD') self._session.discard() def cli_commit(self): if self.debug: print('commit') self._session.commit() # during a commit there is a process opening commit_lock, and run() returns 0 while run(f'sudo lsof -nP {commit_lock}') == 0: sleep(0.250) + # reset getFRRconfig() guard timer + self.commit_guard = time() def op_mode(self, path : list) -> None: """ Execute OP-mode command and return stdout """ if self.debug: print('commit') path = ' '.join(path) out = cmd(f'/opt/vyatta/bin/vyatta-op-cmd-wrapper {path}') if self.debug: print(f'\n\ncommand "{path}" returned:\n') pprint.pprint(out) return out - def getFRRconfig(self, string=None, end='$', endsection='^!', daemon=''): + def getFRRconfig(self, string=None, end='$', endsection='^!', daemon='', guard_time=10, empty_retry=0): """ Retrieve current "running configuration" from FRR """ # Sometimes FRR needs some time after reloading the configuration to - # appear in vtysh. This is a workaround addiung a 2 seconds guard timer - sleep(2) + # appear in vtysh. This is a workaround addiung a 10 second guard timer + # between the last cli_commit() and the first read of FRR config via vtysh + while (time() - self.commit_guard) < guard_time: + sleep(0.250) # wait 250 milliseconds command = f'vtysh -c "show run {daemon} no-header"' if string: command += f' | sed -n "/^{string}{end}/,/{endsection}/p"' out = cmd(command) if self.debug: print(f'\n\ncommand "{command}" returned:\n') pprint.pprint(out) + if empty_retry: + retry_count = 0 + while not out and retry_count < empty_retry: + if self.debug and retry_count % 10 == 0: + print(f"Attempt {retry_count}: FRR config is still empty. Retrying...") + retry_count += 1 + sleep(1) + out = cmd(command) + if not out: + print(f'FRR configuration still empty after {empty_retry} retires!') return out @staticmethod def ssh_send_cmd(command, username, password, hostname='localhost'): """ SSH command execution helper """ # Try to login via SSH ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_client.connect(hostname=hostname, username=username, password=password) _, stdout, stderr = ssh_client.exec_command(command) output = stdout.read().decode().strip() error = stderr.read().decode().strip() ssh_client.close() return output, error # Verify nftables output def verify_nftables(self, nftables_search, table, inverse=False, args=''): nftables_output = cmd(f'sudo nft {args} list table {table}') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''): nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) # Verify ip rule output def verify_rules(self, rules_search, inverse=False, addr_family='inet'): rule_output = cmd(f'ip -family {addr_family} rule show') for search in rules_search: matched = False for line in rule_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) # standard construction; typing suggestion: https://stackoverflow.com/a/70292317 def ignore_warning(warning: Type[Warning]): import warnings from functools import wraps def inner(f): @wraps(f) def wrapped(*args, **kwargs): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=warning) return f(*args, **kwargs) return wrapped return inner diff --git a/smoketest/scripts/cli/test_protocols_babel.py b/smoketest/scripts/cli/test_protocols_babel.py index 7a79c7b9e..107e262cc 100755 --- a/smoketest/scripts/cli/test_protocols_babel.py +++ b/smoketest/scripts/cli/test_protocols_babel.py @@ -1,218 +1,218 @@ #!/usr/bin/env python3 # # Copyright (C) 2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section from vyos.frrender import babel_daemon from vyos.utils.process import process_named_running from vyos.xml_ref import default_value base_path = ['protocols', 'babel'] class TestProtocolsBABEL(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._interfaces = Section.interfaces('ethernet', vlan=False) # call base-classes classmethod super(TestProtocolsBABEL, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(babel_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) def tearDown(self): # always destroy the entire babel configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_delete(['policy', 'prefix-list']) self.cli_delete(['policy', 'prefix-list6']) self.cli_commit() # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(babel_daemon)) def test_babel_interfaces(self): def_update_interval = default_value(base_path + ['interface', 'eth0', 'update-interval']) channel = '20' hello_interval = '1000' max_rtt_penalty = '100' rtt_decay = '23' rtt_max = '119' rtt_min = '11' rxcost = '40000' type = 'wired' for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['interface', interface, 'channel', channel]) self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) self.cli_set(base_path + ['interface', interface, 'hello-interval', hello_interval]) self.cli_set(base_path + ['interface', interface, 'max-rtt-penalty', max_rtt_penalty]) self.cli_set(base_path + ['interface', interface, 'rtt-decay', rtt_decay]) self.cli_set(base_path + ['interface', interface, 'rtt-max', rtt_max]) self.cli_set(base_path + ['interface', interface, 'rtt-min', rtt_min]) self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) self.cli_set(base_path + ['interface', interface, 'rxcost', rxcost]) self.cli_set(base_path + ['interface', interface, 'split-horizon', 'disable']) self.cli_set(base_path + ['interface', interface, 'type', type]) self.cli_commit() frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) for interface in self._interfaces: self.assertIn(f' network {interface}', frrconfig) iface_config = self.getFRRconfig(f'interface {interface}', endsection='^exit', daemon=babel_daemon) self.assertIn(f' babel channel {channel}', iface_config) self.assertIn(f' babel enable-timestamps', iface_config) self.assertIn(f' babel update-interval {def_update_interval}', iface_config) self.assertIn(f' babel hello-interval {hello_interval}', iface_config) self.assertIn(f' babel rtt-decay {rtt_decay}', iface_config) self.assertIn(f' babel rtt-max {rtt_max}', iface_config) self.assertIn(f' babel rtt-min {rtt_min}', iface_config) self.assertIn(f' babel rxcost {rxcost}', iface_config) self.assertIn(f' babel max-rtt-penalty {max_rtt_penalty}', iface_config) self.assertIn(f' no babel split-horizon', iface_config) self.assertIn(f' babel {type}', iface_config) def test_babel_redistribute(self): ipv4_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] ipv6_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static'] for protocol in ipv4_protos: self.cli_set(base_path + ['redistribute', 'ipv4', protocol]) for protocol in ipv6_protos: self.cli_set(base_path + ['redistribute', 'ipv6', protocol]) self.cli_commit() - frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) + frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon, empty_retry=5) for protocol in ipv4_protos: self.assertIn(f' redistribute ipv4 {protocol}', frrconfig) for protocol in ipv6_protos: if protocol == 'ospfv3': protocol = 'ospf6' self.assertIn(f' redistribute ipv6 {protocol}', frrconfig) def test_babel_basic(self): diversity_factor = '64' resend_delay = '100' smoothing_half_life = '400' self.cli_set(base_path + ['parameters', 'diversity']) self.cli_set(base_path + ['parameters', 'diversity-factor', diversity_factor]) self.cli_set(base_path + ['parameters', 'resend-delay', resend_delay]) self.cli_set(base_path + ['parameters', 'smoothing-half-life', smoothing_half_life]) self.cli_commit() frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) self.assertIn(f' babel diversity', frrconfig) self.assertIn(f' babel diversity-factor {diversity_factor}', frrconfig) self.assertIn(f' babel resend-delay {resend_delay}', frrconfig) self.assertIn(f' babel smoothing-half-life {smoothing_half_life}', frrconfig) def test_babel_distribute_list(self): access_list_in4 = '40' access_list_out4 = '50' access_list_in4_iface = '44' access_list_out4_iface = '55' access_list_in6 = 'AL-foo-in6' access_list_out6 = 'AL-foo-out6' prefix_list_in4 = 'PL-foo-in4' prefix_list_out4 = 'PL-foo-out4' prefix_list_in6 = 'PL-foo-in6' prefix_list_out6 = 'PL-foo-out6' self.cli_set(['policy', 'access-list', access_list_in4]) self.cli_set(['policy', 'access-list', access_list_out4]) self.cli_set(['policy', 'access-list6', access_list_in6]) self.cli_set(['policy', 'access-list6', access_list_out6]) self.cli_set(['policy', 'access-list', f'{access_list_in4_iface}']) self.cli_set(['policy', 'access-list', f'{access_list_out4_iface}']) self.cli_set(['policy', 'prefix-list', prefix_list_in4]) self.cli_set(['policy', 'prefix-list', prefix_list_out4]) self.cli_set(['policy', 'prefix-list6', prefix_list_in6]) self.cli_set(['policy', 'prefix-list6', prefix_list_out6]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'access-list', 'in', access_list_in4]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'access-list', 'out', access_list_out4]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'access-list', 'in', access_list_in6]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'access-list', 'out', access_list_out6]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'prefix-list', 'in', prefix_list_in4]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'prefix-list', 'out', prefix_list_out4]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'prefix-list', 'in', prefix_list_in6]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'prefix-list', 'out', prefix_list_out6]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(['policy', 'access-list6', f'{access_list_in6}-{interface}']) self.cli_set(['policy', 'access-list6', f'{access_list_out6}-{interface}']) self.cli_set(['policy', 'prefix-list', f'{prefix_list_in4}-{interface}']) self.cli_set(['policy', 'prefix-list', f'{prefix_list_out4}-{interface}']) self.cli_set(['policy', 'prefix-list6', f'{prefix_list_in6}-{interface}']) self.cli_set(['policy', 'prefix-list6', f'{prefix_list_out6}-{interface}']) tmp_path = base_path + ['distribute-list', 'ipv4', 'interface', interface] self.cli_set(tmp_path + ['access-list', 'in', f'{access_list_in4_iface}']) self.cli_set(tmp_path + ['access-list', 'out', f'{access_list_out4_iface}']) self.cli_set(tmp_path + ['prefix-list', 'in', f'{prefix_list_in4}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'out', f'{prefix_list_out4}-{interface}']) tmp_path = base_path + ['distribute-list', 'ipv6', 'interface', interface] self.cli_set(tmp_path + ['access-list', 'in', f'{access_list_in6}-{interface}']) self.cli_set(tmp_path + ['access-list', 'out', f'{access_list_out6}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'in', f'{prefix_list_in6}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'out', f'{prefix_list_out6}-{interface}']) self.cli_commit() frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) self.assertIn(f' distribute-list {access_list_in4} in', frrconfig) self.assertIn(f' distribute-list {access_list_out4} out', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_in6} in', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_out6} out', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in4} in', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out4} out', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in6} in', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out6} out', frrconfig) for interface in self._interfaces: self.assertIn(f' distribute-list {access_list_in4_iface} in {interface}', frrconfig) self.assertIn(f' distribute-list {access_list_out4_iface} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_in6}-{interface} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_out6}-{interface} out {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in4}-{interface} in {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out4}-{interface} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in6}-{interface} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out6}-{interface} out {interface}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py index 2bc9cf2a5..599bf3930 100755 --- a/smoketest/scripts/cli/test_protocols_ospf.py +++ b/smoketest/scripts/cli/test_protocols_ospf.py @@ -1,594 +1,578 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.frrender import ospf_daemon from vyos.utils.process import process_named_running from vyos.xml_ref import default_value base_path = ['protocols', 'ospf'] route_map = 'foo-bar-baz10' dummy_if = 'dum3562' class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsOSPF, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(ospf_daemon) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit']) cls.cli_set(cls, ['interfaces', 'dummy', dummy_if]) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'route-map', route_map]) cls.cli_delete(cls, ['interfaces', 'dummy', dummy_if]) super(TestProtocolsOSPF, cls).tearDownClass() def tearDown(self): self.cli_delete(base_path) self.cli_commit() frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertNotIn(f'router ospf', frrconfig) # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(ospf_daemon)) def test_ospf_01_defaults(self): # commit changes self.cli_set(base_path) self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' auto-cost reference-bandwidth 100', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults def test_ospf_02_simple(self): router_id = '127.0.0.1' abr_type = 'ibm' bandwidth = '1000' metric = '123' self.cli_set(base_path + ['auto-cost', 'reference-bandwidth', bandwidth]) self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'abr-type', abr_type]) self.cli_set(base_path + ['parameters', 'opaque-lsa']) self.cli_set(base_path + ['parameters', 'rfc1583-compatibility']) self.cli_set(base_path + ['log-adjacency-changes', 'detail']) self.cli_set(base_path + ['default-metric', metric]) self.cli_set(base_path + ['passive-interface', 'default']) self.cli_set(base_path + ['area', '10', 'area-type', 'stub']) self.cli_set(base_path + ['area', '10', 'network', '10.0.0.0/16']) self.cli_set(base_path + ['area', '10', 'range', '10.0.1.0/24']) self.cli_set(base_path + ['area', '10', 'range', '10.0.2.0/24', 'not-advertise']) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' compatible rfc1583', frrconfig) self.assertIn(f' auto-cost reference-bandwidth {bandwidth}', frrconfig) self.assertIn(f' ospf router-id {router_id}', frrconfig) self.assertIn(f' ospf abr-type {abr_type}', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults self.assertIn(f' capability opaque', frrconfig) self.assertIn(f' default-metric {metric}', frrconfig) self.assertIn(f' passive-interface default', frrconfig) self.assertIn(f' area 10 stub', frrconfig) self.assertIn(f' network 10.0.0.0/16 area 10', frrconfig) self.assertIn(f' area 10 range 10.0.1.0/24', frrconfig) self.assertNotIn(f' area 10 range 10.0.1.0/24 not-advertise', frrconfig) self.assertIn(f' area 10 range 10.0.2.0/24 not-advertise', frrconfig) def test_ospf_03_access_list(self): acl = '100' seq = '10' protocols = ['bgp', 'connected', 'isis', 'kernel', 'rip', 'static'] self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'action', 'permit']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'source', 'any']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'destination', 'any']) for ptotocol in protocols: self.cli_set(base_path + ['access-list', acl, 'export', ptotocol]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults for ptotocol in protocols: self.assertIn(f' distribute-list {acl} out {ptotocol}', frrconfig) # defaults self.cli_delete(['policy', 'access-list', acl]) def test_ospf_04_default_originate(self): seq = '100' metric = '50' metric_type = '1' self.cli_set(base_path + ['default-information', 'originate', 'metric', metric]) self.cli_set(base_path + ['default-information', 'originate', 'metric-type', metric_type]) self.cli_set(base_path + ['default-information', 'originate', 'route-map', route_map]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults self.assertIn(f' default-information originate metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) # Now set 'always' self.cli_set(base_path + ['default-information', 'originate', 'always']) self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f' default-information originate always metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospf_05_options(self): global_distance = '128' intra_area = '100' inter_area = '110' external = '120' on_startup = '30' on_shutdown = '60' refresh = '50' aggregation_timer = '100' summary_nets = { '10.0.1.0/24' : {}, '10.0.2.0/24' : {'tag' : '50'}, '10.0.3.0/24' : {'no_advertise' : {}}, } self.cli_set(base_path + ['distance', 'global', global_distance]) self.cli_set(base_path + ['distance', 'ospf', 'external', external]) self.cli_set(base_path + ['distance', 'ospf', 'intra-area', intra_area]) self.cli_set(base_path + ['max-metric', 'router-lsa', 'on-startup', on_startup]) self.cli_set(base_path + ['max-metric', 'router-lsa', 'on-shutdown', on_shutdown]) self.cli_set(base_path + ['mpls-te', 'enable']) self.cli_set(base_path + ['refresh', 'timers', refresh]) self.cli_set(base_path + ['aggregation', 'timer', aggregation_timer]) for summary, summary_options in summary_nets.items(): self.cli_set(base_path + ['summary-address', summary]) if 'tag' in summary_options: self.cli_set(base_path + ['summary-address', summary, 'tag', summary_options['tag']]) if 'no_advertise' in summary_options: self.cli_set(base_path + ['summary-address', summary, 'no-advertise']) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' mpls-te on', frrconfig) self.assertIn(f' mpls-te router-address 0.0.0.0', frrconfig) # default self.assertIn(f' distance {global_distance}', frrconfig) self.assertIn(f' distance ospf intra-area {intra_area} external {external}', frrconfig) self.assertIn(f' max-metric router-lsa on-startup {on_startup}', frrconfig) self.assertIn(f' max-metric router-lsa on-shutdown {on_shutdown}', frrconfig) self.assertIn(f' refresh timer {refresh}', frrconfig) self.assertIn(f' aggregation timer {aggregation_timer}', frrconfig) for summary, summary_options in summary_nets.items(): self.assertIn(f' summary-address {summary}', frrconfig) if 'tag' in summary_options: tag = summary_options['tag'] self.assertIn(f' summary-address {summary} tag {tag}', frrconfig) if 'no_advertise' in summary_options: self.assertIn(f' summary-address {summary} no-advertise', frrconfig) # enable inter-area self.cli_set(base_path + ['distance', 'ospf', 'inter-area', inter_area]) self.cli_commit() frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f' distance ospf intra-area {intra_area} inter-area {inter_area} external {external}', frrconfig) # https://github.com/FRRouting/frr/issues/17011 # We need to wait on_shutdown time, until the OSPF process is removed from the CLI # otherwise the test in tearDown() will fail self.cli_delete(base_path) self.cli_commit() sleep(int(on_shutdown) + 5) # additional grace period of 5 seconds def test_ospf_06_neighbor(self): priority = '10' poll_interval = '20' neighbors = ['1.1.1.1', '2.2.2.2', '3.3.3.3'] for neighbor in neighbors: self.cli_set(base_path + ['neighbor', neighbor, 'priority', priority]) self.cli_set(base_path + ['neighbor', neighbor, 'poll-interval', poll_interval]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) for neighbor in neighbors: self.assertIn(f' neighbor {neighbor} priority {priority} poll-interval {poll_interval}', frrconfig) # default def test_ospf_07_redistribute(self): metric = '15' metric_type = '1' redistribute = ['babel', 'bgp', 'connected', 'isis', 'kernel', 'rip', 'static'] for protocol in redistribute: self.cli_set(base_path + ['redistribute', protocol, 'metric', metric]) self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map]) self.cli_set(base_path + ['redistribute', protocol, 'metric-type', metric_type]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) for protocol in redistribute: self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospf_08_virtual_link(self): networks = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] area = '10' shortcut = 'enable' virtual_link = '192.0.2.1' hello = '6' retransmit = '5' transmit = '5' dead = '40' window_default = default_value(base_path + ['area', area, 'virtual-link', virtual_link, 'retransmit-window']) self.cli_set(base_path + ['area', area, 'shortcut', shortcut]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'hello-interval', hello]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'retransmit-interval', retransmit]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'transmit-delay', transmit]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'dead-interval', dead]) for network in networks: self.cli_set(base_path + ['area', area, 'network', network]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' area {area} shortcut {shortcut}', frrconfig) self.assertIn(f' area {area} virtual-link {virtual_link} hello-interval {hello} retransmit-interval {retransmit} retransmit-window {window_default} transmit-delay {transmit} dead-interval {dead}', frrconfig) for network in networks: self.assertIn(f' network {network} area {area}', frrconfig) def test_ospf_09_interface_configuration(self): interfaces = Section.interfaces('ethernet') password = 'vyos1234' bandwidth = '10000' cost = '150' network = 'point-to-point' priority = '200' bfd_profile = 'vyos-test' self.cli_set(base_path + ['passive-interface', 'default']) for interface in interfaces: base_interface = base_path + ['interface', interface] self.cli_set(base_interface + ['authentication', 'plaintext-password', password]) self.cli_set(base_interface + ['bandwidth', bandwidth]) self.cli_set(base_interface + ['bfd', 'profile', bfd_profile]) self.cli_set(base_interface + ['cost', cost]) self.cli_set(base_interface + ['mtu-ignore']) self.cli_set(base_interface + ['network', network]) self.cli_set(base_interface + ['priority', priority]) self.cli_set(base_interface + ['passive', 'disable']) # commit changes self.cli_commit() frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' passive-interface default', frrconfig) for interface in interfaces: # Can not use daemon for getFRRconfig() as bandwidth parameter belongs to zebra process config = self.getFRRconfig(f'interface {interface}') self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf authentication-key {password}', config) self.assertIn(f' ip ospf bfd', config) self.assertIn(f' ip ospf bfd profile {bfd_profile}', config) self.assertIn(f' ip ospf cost {cost}', config) self.assertIn(f' ip ospf mtu-ignore', config) self.assertIn(f' ip ospf network {network}', config) self.assertIn(f' ip ospf priority {priority}', config) self.assertIn(f' no ip ospf passive', config) self.assertIn(f' bandwidth {bandwidth}', config) # T5467: Remove interface from OSPF process and VRF self.cli_delete(base_path + ['interface']) self.cli_commit() for interface in interfaces: # T5467: It must also be removed from FRR config frrconfig = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertNotIn(f'interface {interface}', frrconfig) # There should be no OSPF related command at all under the interface self.assertNotIn(f' ip ospf', frrconfig) def test_ospf_11_interface_area(self): area = '0' interfaces = Section.interfaces('ethernet') self.cli_set(base_path + ['area', area, 'network', '10.0.0.0/8']) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'area', area]) # we can not have bot area network and interface area set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area', area, 'network']) self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) for interface in interfaces: config = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf area {area}', config) def test_ospf_12_vrfs(self): # It is safe to assume that when the basic VRF test works, all # other OSPF related features work, as we entirely inherit the CLI # templates and Jinja2 FRR template. table = '1000' vrf = 'blue' vrf_base = ['vrf', 'name', vrf] vrf_iface = 'eth1' area = '1' self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'ospf', 'interface', vrf_iface, 'area', area]) self.cli_set(['interfaces', 'ethernet', vrf_iface, 'vrf', vrf]) # Also set a default VRF OSPF config self.cli_set(base_path) self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' auto-cost reference-bandwidth 100', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults frrconfig = self.getFRRconfig(f'router ospf vrf {vrf}', daemon=ospf_daemon) self.assertIn(f'router ospf vrf {vrf}', frrconfig) self.assertIn(f' auto-cost reference-bandwidth 100', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=ospf_daemon) self.assertIn(f'interface {vrf_iface}', frrconfig) self.assertIn(f' ip ospf area {area}', frrconfig) # T5467: Remove interface from OSPF process and VRF self.cli_delete(vrf_base + ['protocols', 'ospf', 'interface']) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) self.cli_commit() # T5467: It must also be removed from FRR config frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=ospf_daemon) self.assertNotIn(f'interface {vrf_iface}', frrconfig) # There should be no OSPF related command at all under the interface self.assertNotIn(f' ip ospf', frrconfig) # cleanup self.cli_delete(['vrf', 'name', vrf]) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) def test_ospf_13_export_list(self): # Verify explort-list works on ospf-area acl = '100' seq = '10' area = '0.0.0.10' network = '10.0.0.0/8' self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'action', 'permit']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'source', 'any']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'destination', 'any']) self.cli_set(base_path + ['area', area, 'network', network]) self.cli_set(base_path + ['area', area, 'export-list', acl]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # default self.assertIn(f' network {network} area {area}', frrconfig) self.assertIn(f' area {area} export-list {acl}', frrconfig) def test_ospf_14_segment_routing_configuration(self): global_block_low = "300" global_block_high = "399" local_block_low = "400" local_block_high = "499" maximum_stack_size = '5' prefix_one = '192.168.0.1/32' prefix_two = '192.168.0.2/32' prefix_one_value = '1' prefix_two_value = '2' self.cli_set(base_path + ['interface', dummy_if]) self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low]) self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag']) # Commit all changes self.cli_commit() # Verify all changes frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f' segment-routing on', frrconfig) self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', frrconfig) self.assertIn(f' segment-routing node-msd {maximum_stack_size}', frrconfig) self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', frrconfig) self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', frrconfig) def test_ospf_15_ldp_sync(self): holddown = "500" interfaces = Section.interfaces('ethernet') self.cli_set(base_path + ['interface', dummy_if]) self.cli_set(base_path + ['ldp-sync', 'holddown', holddown]) # Commit main OSPF changes self.cli_commit() # Verify main OSPF changes frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) self.assertIn(f' mpls ldp-sync holddown {holddown}', frrconfig) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'holddown', holddown]) # Commit interface changes for holddown self.cli_commit() for interface in interfaces: # Verify interface changes for holddown config = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf dead-interval 40', config) self.assertIn(f' ip ospf mpls ldp-sync', config) self.assertIn(f' ip ospf mpls ldp-sync holddown {holddown}', config) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'disable']) # Commit interface changes for disable self.cli_commit() for interface in interfaces: # Verify interface changes for disable config = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf dead-interval 40', config) self.assertNotIn(f' ip ospf mpls ldp-sync', config) def test_ospf_16_graceful_restart(self): period = '300' supported_grace_time = '400' router_ids = ['192.0.2.1', '192.0.2.2'] self.cli_set(base_path + ['capability', 'opaque']) self.cli_set(base_path + ['graceful-restart', 'grace-period', period]) self.cli_set(base_path + ['graceful-restart', 'helper', 'planned-only']) self.cli_set(base_path + ['graceful-restart', 'helper', 'no-strict-lsa-checking']) self.cli_set(base_path + ['graceful-restart', 'helper', 'supported-grace-time', supported_grace_time]) for router_id in router_ids: self.cli_set(base_path + ['graceful-restart', 'helper', 'enable', 'router-id', router_id]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' capability opaque', frrconfig) self.assertIn(f' graceful-restart grace-period {period}', frrconfig) self.assertIn(f' graceful-restart helper planned-only', frrconfig) self.assertIn(f' no graceful-restart helper strict-lsa-checking', frrconfig) self.assertIn(f' graceful-restart helper supported-grace-time {supported_grace_time}', frrconfig) for router_id in router_ids: self.assertIn(f' graceful-restart helper enable {router_id}', frrconfig) def test_ospf_17_duplicate_area_network(self): area0 = '0' area1 = '1' network = '10.0.0.0/8' self.cli_set(base_path + ['area', area0, 'network', network]) # we can not have the same network defined on two areas self.cli_set(base_path + ['area', area1, 'network', network]) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area', area0]) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) - # Required to prevent the race condition T6761 - retry_count = 0 - max_retries = 60 - - while not frrconfig and retry_count < max_retries: - # Log every 10 seconds - if retry_count % 10 == 0: - print(f"Attempt {retry_count}: FRR config is still empty. Retrying...") - - retry_count += 1 - sleep(1) - frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) - - if not frrconfig: - print("Failed to retrieve FRR config after 60 seconds") - + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon, empty_retry=60) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' network {network} area {area1}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2)