diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py index ead893ff9..95d6c7243 100644 --- a/python/vyos/frrender.py +++ b/python/vyos/frrender.py @@ -1,168 +1,176 @@ # Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. """ Library used to interface with FRRs mgmtd introduced in version 10.0 """ import os from vyos.defaults import frr_debug_enable from vyos.utils.file import write_file from vyos.utils.process import rc_cmd from vyos.template import render_to_string from vyos import ConfigError DEBUG_ON = os.path.exists(frr_debug_enable) def debug(message): if not DEBUG_ON: return print(message) frr_protocols = ['babel', 'bfd', 'bgp', 'eigrp', 'isis', 'mpls', 'nhrp', 'openfabric', 'ospf', 'ospfv3', 'pim', 'pim6', 'rip', 'ripng', 'rpki', 'segment_routing', 'static'] +babel_daemon = 'babeld' +bfd_daemon = 'bfdd' bgp_daemon = 'bgpd' isis_daemon = 'isisd' +ldpd_daemon = 'ldpd' mgmt_daemon = 'mgmtd' openfabric_daemon = 'fabricd' +ospf_daemon = 'ospfd' +ospf6_daemon = 'ospf6d' pim_daemon = 'pimd' +pim6_daemon = 'pim6d' +rip_daemon = 'ripd' +ripng_daemon = 'ripngd' zebra_daemon = 'zebra' class FRRender: def __init__(self): self._frr_conf = '/run/frr/config/frr.conf' def generate(self, config): if not isinstance(config, dict): tmp = type(config) raise ValueError(f'Config must be of type "dict" and not "{tmp}"!') def inline_helper(config_dict) -> str: output = '!\n' if 'babel' in config_dict and 'deleted' not in config_dict['babel']: output += render_to_string('frr/babeld.frr.j2', config_dict['babel']) output += '\n' if 'bfd' in config_dict and 'deleted' not in config_dict['bfd']: output += render_to_string('frr/bfdd.frr.j2', config_dict['bfd']) output += '\n' if 'bgp' in config_dict and 'deleted' not in config_dict['bgp']: output += render_to_string('frr/bgpd.frr.j2', config_dict['bgp']) output += '\n' if 'eigrp' in config_dict and 'deleted' not in config_dict['eigrp']: output += render_to_string('frr/eigrpd.frr.j2', config_dict['eigrp']) output += '\n' if 'isis' in config_dict and 'deleted' not in config_dict['isis']: output += render_to_string('frr/isisd.frr.j2', config_dict['isis']) output += '\n' if 'mpls' in config_dict and 'deleted' not in config_dict['mpls']: output += render_to_string('frr/ldpd.frr.j2', config_dict['mpls']) output += '\n' if 'openfabric' in config_dict and 'deleted' not in config_dict['openfabric']: output += render_to_string('frr/fabricd.frr.j2', config_dict['openfabric']) output += '\n' if 'ospf' in config_dict and 'deleted' not in config_dict['ospf']: output += render_to_string('frr/ospfd.frr.j2', config_dict['ospf']) output += '\n' if 'ospfv3' in config_dict and 'deleted' not in config_dict['ospfv3']: output += render_to_string('frr/ospf6d.frr.j2', config_dict['ospfv3']) output += '\n' if 'pim' in config_dict and 'deleted' not in config_dict['pim']: output += render_to_string('frr/pimd.frr.j2', config_dict['pim']) output += '\n' if 'pim6' in config_dict and 'deleted' not in config_dict['pim6']: output += render_to_string('frr/pim6d.frr.j2', config_dict['pim6']) output += '\n' if 'policy' in config_dict and len(config_dict['policy']) > 0: output += render_to_string('frr/policy.frr.j2', config_dict['policy']) output += '\n' if 'rip' in config_dict and 'deleted' not in config_dict['rip']: output += render_to_string('frr/ripd.frr.j2', config_dict['rip']) output += '\n' if 'ripng' in config_dict and 'deleted' not in config_dict['ripng']: output += render_to_string('frr/ripngd.frr.j2', config_dict['ripng']) output += '\n' if 'rpki' in config_dict and 'deleted' not in config_dict['rpki']: output += render_to_string('frr/rpki.frr.j2', config_dict['rpki']) output += '\n' if 'segment_routing' in config_dict and 'deleted' not in config_dict['segment_routing']: output += render_to_string('frr/zebra.segment_routing.frr.j2', config_dict['segment_routing']) output += '\n' if 'static' in config_dict and 'deleted' not in config_dict['static']: output += render_to_string('frr/staticd.frr.j2', config_dict['static']) output += '\n' if 'ip' in config_dict and 'deleted' not in config_dict['ip']: output += render_to_string('frr/zebra.route-map.frr.j2', config_dict['ip']) output += '\n' if 'ipv6' in config_dict and 'deleted' not in config_dict['ipv6']: output += render_to_string('frr/zebra.route-map.frr.j2', config_dict['ipv6']) output += '\n' return output debug('======< RENDERING CONFIG >======') # we can not reload an empty file, thus we always embed the marker output = '!\n' # Enable SNMP agentx support # SNMP AgentX support cannot be disabled once enabled if 'snmp' in config: output += 'agentx\n' # Add routing protocols in global VRF output += inline_helper(config) # Interface configuration for EVPN is not VRF related if 'interfaces' in config: output += render_to_string('frr/evpn.mh.frr.j2', {'interfaces' : config['interfaces']}) output += '\n' if 'vrf' in config and 'name' in config['vrf']: output += render_to_string('frr/zebra.vrf.route-map.frr.j2', config['vrf']) + '\n' for vrf, vrf_config in config['vrf']['name'].items(): if 'protocols' not in vrf_config: continue for protocol in vrf_config['protocols']: vrf_config['protocols'][protocol]['vrf'] = vrf output += inline_helper(vrf_config['protocols']) debug(output) debug('======< RENDERING CONFIG COMPLETE >======') write_file(self._frr_conf, output) if DEBUG_ON: write_file('/tmp/frr.conf.debug', output) def apply(self): count = 0 count_max = 5 emsg = '' while count < count_max: count += 1 debug(f'FRR: Reloading configuration - tries: {count} | Python class ID: {id(self)}') cmdline = '/usr/lib/frr/frr-reload.py --reload' if DEBUG_ON: cmdline += ' --debug' rc, emsg = rc_cmd(f'{cmdline} {self._frr_conf}') if rc != 0: debug('FRR configuration reload failed, retrying') continue debug(emsg) debug('======< DONE APPLYING CONFIG >======') break if count >= count_max: raise ConfigError(emsg) def save_configuration(): """ T3217: Save FRR configuration to /run/frr/config/frr.conf """ return cmd('/usr/bin/vtysh -n --writeconfig') diff --git a/smoketest/scripts/cli/test_protocols_babel.py b/smoketest/scripts/cli/test_protocols_babel.py index 606c1efd3..7a79c7b9e 100755 --- a/smoketest/scripts/cli/test_protocols_babel.py +++ b/smoketest/scripts/cli/test_protocols_babel.py @@ -1,218 +1,218 @@ #!/usr/bin/env python3 # # Copyright (C) 2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section +from vyos.frrender import babel_daemon from vyos.utils.process import process_named_running from vyos.xml_ref import default_value -PROCESS_NAME = 'babeld' base_path = ['protocols', 'babel'] class TestProtocolsBABEL(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._interfaces = Section.interfaces('ethernet', vlan=False) # call base-classes classmethod super(TestProtocolsBABEL, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(babel_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) def tearDown(self): # always destroy the entire babel configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_delete(['policy', 'prefix-list']) self.cli_delete(['policy', 'prefix-list6']) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(babel_daemon)) def test_babel_interfaces(self): def_update_interval = default_value(base_path + ['interface', 'eth0', 'update-interval']) channel = '20' hello_interval = '1000' max_rtt_penalty = '100' rtt_decay = '23' rtt_max = '119' rtt_min = '11' rxcost = '40000' type = 'wired' for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['interface', interface, 'channel', channel]) self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) self.cli_set(base_path + ['interface', interface, 'hello-interval', hello_interval]) self.cli_set(base_path + ['interface', interface, 'max-rtt-penalty', max_rtt_penalty]) self.cli_set(base_path + ['interface', interface, 'rtt-decay', rtt_decay]) self.cli_set(base_path + ['interface', interface, 'rtt-max', rtt_max]) self.cli_set(base_path + ['interface', interface, 'rtt-min', rtt_min]) self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) self.cli_set(base_path + ['interface', interface, 'rxcost', rxcost]) self.cli_set(base_path + ['interface', interface, 'split-horizon', 'disable']) self.cli_set(base_path + ['interface', interface, 'type', type]) self.cli_commit() - frrconfig = self.getFRRconfig('router babel', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) for interface in self._interfaces: self.assertIn(f' network {interface}', frrconfig) - iface_config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + iface_config = self.getFRRconfig(f'interface {interface}', endsection='^exit', daemon=babel_daemon) self.assertIn(f' babel channel {channel}', iface_config) self.assertIn(f' babel enable-timestamps', iface_config) self.assertIn(f' babel update-interval {def_update_interval}', iface_config) self.assertIn(f' babel hello-interval {hello_interval}', iface_config) self.assertIn(f' babel rtt-decay {rtt_decay}', iface_config) self.assertIn(f' babel rtt-max {rtt_max}', iface_config) self.assertIn(f' babel rtt-min {rtt_min}', iface_config) self.assertIn(f' babel rxcost {rxcost}', iface_config) self.assertIn(f' babel max-rtt-penalty {max_rtt_penalty}', iface_config) self.assertIn(f' no babel split-horizon', iface_config) self.assertIn(f' babel {type}', iface_config) def test_babel_redistribute(self): ipv4_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] ipv6_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static'] for protocol in ipv4_protos: self.cli_set(base_path + ['redistribute', 'ipv4', protocol]) for protocol in ipv6_protos: self.cli_set(base_path + ['redistribute', 'ipv6', protocol]) self.cli_commit() - frrconfig = self.getFRRconfig('router babel', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) for protocol in ipv4_protos: self.assertIn(f' redistribute ipv4 {protocol}', frrconfig) for protocol in ipv6_protos: if protocol == 'ospfv3': protocol = 'ospf6' self.assertIn(f' redistribute ipv6 {protocol}', frrconfig) def test_babel_basic(self): diversity_factor = '64' resend_delay = '100' smoothing_half_life = '400' self.cli_set(base_path + ['parameters', 'diversity']) self.cli_set(base_path + ['parameters', 'diversity-factor', diversity_factor]) self.cli_set(base_path + ['parameters', 'resend-delay', resend_delay]) self.cli_set(base_path + ['parameters', 'smoothing-half-life', smoothing_half_life]) self.cli_commit() - frrconfig = self.getFRRconfig('router babel', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) self.assertIn(f' babel diversity', frrconfig) self.assertIn(f' babel diversity-factor {diversity_factor}', frrconfig) self.assertIn(f' babel resend-delay {resend_delay}', frrconfig) self.assertIn(f' babel smoothing-half-life {smoothing_half_life}', frrconfig) def test_babel_distribute_list(self): access_list_in4 = '40' access_list_out4 = '50' access_list_in4_iface = '44' access_list_out4_iface = '55' access_list_in6 = 'AL-foo-in6' access_list_out6 = 'AL-foo-out6' prefix_list_in4 = 'PL-foo-in4' prefix_list_out4 = 'PL-foo-out4' prefix_list_in6 = 'PL-foo-in6' prefix_list_out6 = 'PL-foo-out6' self.cli_set(['policy', 'access-list', access_list_in4]) self.cli_set(['policy', 'access-list', access_list_out4]) self.cli_set(['policy', 'access-list6', access_list_in6]) self.cli_set(['policy', 'access-list6', access_list_out6]) self.cli_set(['policy', 'access-list', f'{access_list_in4_iface}']) self.cli_set(['policy', 'access-list', f'{access_list_out4_iface}']) self.cli_set(['policy', 'prefix-list', prefix_list_in4]) self.cli_set(['policy', 'prefix-list', prefix_list_out4]) self.cli_set(['policy', 'prefix-list6', prefix_list_in6]) self.cli_set(['policy', 'prefix-list6', prefix_list_out6]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'access-list', 'in', access_list_in4]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'access-list', 'out', access_list_out4]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'access-list', 'in', access_list_in6]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'access-list', 'out', access_list_out6]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'prefix-list', 'in', prefix_list_in4]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'prefix-list', 'out', prefix_list_out4]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'prefix-list', 'in', prefix_list_in6]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'prefix-list', 'out', prefix_list_out6]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(['policy', 'access-list6', f'{access_list_in6}-{interface}']) self.cli_set(['policy', 'access-list6', f'{access_list_out6}-{interface}']) self.cli_set(['policy', 'prefix-list', f'{prefix_list_in4}-{interface}']) self.cli_set(['policy', 'prefix-list', f'{prefix_list_out4}-{interface}']) self.cli_set(['policy', 'prefix-list6', f'{prefix_list_in6}-{interface}']) self.cli_set(['policy', 'prefix-list6', f'{prefix_list_out6}-{interface}']) tmp_path = base_path + ['distribute-list', 'ipv4', 'interface', interface] self.cli_set(tmp_path + ['access-list', 'in', f'{access_list_in4_iface}']) self.cli_set(tmp_path + ['access-list', 'out', f'{access_list_out4_iface}']) self.cli_set(tmp_path + ['prefix-list', 'in', f'{prefix_list_in4}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'out', f'{prefix_list_out4}-{interface}']) tmp_path = base_path + ['distribute-list', 'ipv6', 'interface', interface] self.cli_set(tmp_path + ['access-list', 'in', f'{access_list_in6}-{interface}']) self.cli_set(tmp_path + ['access-list', 'out', f'{access_list_out6}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'in', f'{prefix_list_in6}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'out', f'{prefix_list_out6}-{interface}']) self.cli_commit() - frrconfig = self.getFRRconfig('router babel', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) self.assertIn(f' distribute-list {access_list_in4} in', frrconfig) self.assertIn(f' distribute-list {access_list_out4} out', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_in6} in', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_out6} out', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in4} in', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out4} out', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in6} in', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out6} out', frrconfig) for interface in self._interfaces: self.assertIn(f' distribute-list {access_list_in4_iface} in {interface}', frrconfig) self.assertIn(f' distribute-list {access_list_out4_iface} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_in6}-{interface} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_out6}-{interface} out {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in4}-{interface} in {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out4}-{interface} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in6}-{interface} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out6}-{interface} out {interface}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_bfd.py b/smoketest/scripts/cli/test_protocols_bfd.py index 9f178c821..32e39e8f7 100755 --- a/smoketest/scripts/cli/test_protocols_bfd.py +++ b/smoketest/scripts/cli/test_protocols_bfd.py @@ -1,238 +1,238 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError +from vyos.frrender import bfd_daemon from vyos.utils.process import process_named_running -PROCESS_NAME = 'bfdd' base_path = ['protocols', 'bfd'] frr_endsection = '^ exit' dum_if = 'dum1001' vrf_name = 'red' peers = { '192.0.2.10' : { 'intv_rx' : '500', 'intv_tx' : '600', 'multihop' : '', 'source_addr': '192.0.2.254', 'profile' : 'foo-bar-baz', 'minimum_ttl': '20', }, '192.0.2.20' : { 'echo_mode' : '', 'intv_echo' : '100', 'intv_mult' : '100', 'intv_rx' : '222', 'intv_tx' : '333', 'passive' : '', 'shutdown' : '', 'profile' : 'foo', 'source_intf': dum_if, }, '2001:db8::1000:1' : { 'source_addr': '2001:db8::1', 'vrf' : vrf_name, }, '2001:db8::2000:1' : { 'source_addr': '2001:db8::1', 'multihop' : '', 'profile' : 'baz_foo', }, } profiles = { 'foo' : { 'echo_mode' : '', 'intv_echo' : '100', 'intv_mult' : '101', 'intv_rx' : '222', 'intv_tx' : '333', 'shutdown' : '', 'minimum_ttl': '40', }, 'foo-bar-baz' : { 'intv_mult' : '4', 'intv_rx' : '400', 'intv_tx' : '400', }, 'baz_foo' : { 'intv_mult' : '102', 'intv_rx' : '444', 'passive' : '', }, } class TestProtocolsBFD(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsBFD, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(bfd_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(bfd_daemon)) def test_bfd_peer(self): self.cli_set(['vrf', 'name', vrf_name, 'table', '1000']) for peer, peer_config in peers.items(): if 'echo_mode' in peer_config: self.cli_set(base_path + ['peer', peer, 'echo-mode']) if 'intv_echo' in peer_config: self.cli_set(base_path + ['peer', peer, 'interval', 'echo-interval', peer_config["intv_echo"]]) if 'intv_mult' in peer_config: self.cli_set(base_path + ['peer', peer, 'interval', 'multiplier', peer_config["intv_mult"]]) if 'intv_rx' in peer_config: self.cli_set(base_path + ['peer', peer, 'interval', 'receive', peer_config["intv_rx"]]) if 'intv_tx' in peer_config: self.cli_set(base_path + ['peer', peer, 'interval', 'transmit', peer_config["intv_tx"]]) if 'minimum_ttl' in peer_config: self.cli_set(base_path + ['peer', peer, 'minimum-ttl', peer_config["minimum_ttl"]]) if 'multihop' in peer_config: self.cli_set(base_path + ['peer', peer, 'multihop']) if 'passive' in peer_config: self.cli_set(base_path + ['peer', peer, 'passive']) if 'shutdown' in peer_config: self.cli_set(base_path + ['peer', peer, 'shutdown']) if 'source_addr' in peer_config: self.cli_set(base_path + ['peer', peer, 'source', 'address', peer_config["source_addr"]]) if 'source_intf' in peer_config: self.cli_set(base_path + ['peer', peer, 'source', 'interface', peer_config["source_intf"]]) if 'vrf' in peer_config: self.cli_set(base_path + ['peer', peer, 'vrf', peer_config["vrf"]]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig('bfd', endsection='^exit', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('bfd', endsection='^exit', daemon=bfd_daemon) for peer, peer_config in peers.items(): tmp = f'peer {peer}' if 'multihop' in peer_config: tmp += f' multihop' if 'source_addr' in peer_config: tmp += f' local-address {peer_config["source_addr"]}' if 'source_intf' in peer_config: tmp += f' interface {peer_config["source_intf"]}' if 'vrf' in peer_config: tmp += f' vrf {peer_config["vrf"]}' self.assertIn(tmp, frrconfig) peerconfig = self.getFRRconfig(f' peer {peer}', end='', endsection=frr_endsection, - daemon=PROCESS_NAME) + daemon=bfd_daemon) if 'echo_mode' in peer_config: self.assertIn(f'echo-mode', peerconfig) if 'intv_echo' in peer_config: self.assertIn(f'echo receive-interval {peer_config["intv_echo"]}', peerconfig) self.assertIn(f'echo transmit-interval {peer_config["intv_echo"]}', peerconfig) if 'intv_mult' in peer_config: self.assertIn(f'detect-multiplier {peer_config["intv_mult"]}', peerconfig) if 'intv_rx' in peer_config: self.assertIn(f'receive-interval {peer_config["intv_rx"]}', peerconfig) if 'intv_tx' in peer_config: self.assertIn(f'transmit-interval {peer_config["intv_tx"]}', peerconfig) if 'minimum_ttl' in peer_config: self.assertIn(f'minimum-ttl {peer_config["minimum_ttl"]}', peerconfig) if 'passive' in peer_config: self.assertIn(f'passive-mode', peerconfig) if 'shutdown' in peer_config: self.assertIn(f'shutdown', peerconfig) else: self.assertNotIn(f'shutdown', peerconfig) self.cli_delete(['vrf', 'name', vrf_name]) def test_bfd_profile(self): for profile, profile_config in profiles.items(): if 'echo_mode' in profile_config: self.cli_set(base_path + ['profile', profile, 'echo-mode']) if 'intv_echo' in profile_config: self.cli_set(base_path + ['profile', profile, 'interval', 'echo-interval', profile_config["intv_echo"]]) if 'intv_mult' in profile_config: self.cli_set(base_path + ['profile', profile, 'interval', 'multiplier', profile_config["intv_mult"]]) if 'intv_rx' in profile_config: self.cli_set(base_path + ['profile', profile, 'interval', 'receive', profile_config["intv_rx"]]) if 'intv_tx' in profile_config: self.cli_set(base_path + ['profile', profile, 'interval', 'transmit', profile_config["intv_tx"]]) if 'minimum_ttl' in profile_config: self.cli_set(base_path + ['profile', profile, 'minimum-ttl', profile_config["minimum_ttl"]]) if 'passive' in profile_config: self.cli_set(base_path + ['profile', profile, 'passive']) if 'shutdown' in profile_config: self.cli_set(base_path + ['profile', profile, 'shutdown']) for peer, peer_config in peers.items(): if 'profile' in peer_config: self.cli_set(base_path + ['peer', peer, 'profile', peer_config["profile"] + 'wrong']) if 'source_addr' in peer_config: self.cli_set(base_path + ['peer', peer, 'source', 'address', peer_config["source_addr"]]) if 'source_intf' in peer_config: self.cli_set(base_path + ['peer', peer, 'source', 'interface', peer_config["source_intf"]]) # BFD profile does not exist! with self.assertRaises(ConfigSessionError): self.cli_commit() for peer, peer_config in peers.items(): if 'profile' in peer_config: self.cli_set(base_path + ['peer', peer, 'profile', peer_config["profile"]]) # commit changes self.cli_commit() # Verify FRR bgpd configuration for profile, profile_config in profiles.items(): config = self.getFRRconfig(f' profile {profile}', endsection=frr_endsection) if 'echo_mode' in profile_config: self.assertIn(f' echo-mode', config) if 'intv_echo' in profile_config: self.assertIn(f' echo receive-interval {profile_config["intv_echo"]}', config) self.assertIn(f' echo transmit-interval {profile_config["intv_echo"]}', config) if 'intv_mult' in profile_config: self.assertIn(f' detect-multiplier {profile_config["intv_mult"]}', config) if 'intv_rx' in profile_config: self.assertIn(f' receive-interval {profile_config["intv_rx"]}', config) if 'intv_tx' in profile_config: self.assertIn(f' transmit-interval {profile_config["intv_tx"]}', config) if 'minimum_ttl' in profile_config: self.assertIn(f' minimum-ttl {profile_config["minimum_ttl"]}', config) if 'passive' in profile_config: self.assertIn(f' passive-mode', config) if 'shutdown' in profile_config: self.assertIn(f' shutdown', config) else: self.assertNotIn(f'shutdown', config) for peer, peer_config in peers.items(): peerconfig = self.getFRRconfig(f' peer {peer}', end='', - endsection=frr_endsection, daemon=PROCESS_NAME) + endsection=frr_endsection, daemon=bfd_daemon) if 'profile' in peer_config: self.assertIn(f' profile {peer_config["profile"]}', peerconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 1b6c30dfe..cdf18a051 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -1,1415 +1,1417 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section from vyos.configsession import ConfigSessionError from vyos.template import is_ipv6 from vyos.utils.process import process_named_running from vyos.utils.process import cmd from vyos.frrender import bgp_daemon -PROCESS_NAME = 'bgpd' ASN = '64512' base_path = ['protocols', 'bgp'] route_map_in = 'foo-map-in' route_map_out = 'foo-map-out' prefix_list_in = 'pfx-foo-in' prefix_list_out = 'pfx-foo-out' prefix_list_in6 = 'pfx-foo-in6' prefix_list_out6 = 'pfx-foo-out6' bfd_profile = 'foo-bar-baz' import_afi = 'ipv4-unicast' import_vrf = 'red' import_rd = ASN + ':100' import_vrf_base = ['vrf', 'name'] neighbor_config = { '192.0.2.1' : { 'bfd' : '', 'cap_dynamic' : '', 'cap_ext_next' : '', 'cap_ext_sver' : '', 'remote_as' : '100', 'adv_interv' : '400', 'passive' : '', 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', 'system_as' : '300', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'no_send_comm_ext' : '', 'addpath_all' : '', 'p_attr_discard' : ['10', '20', '30', '40', '50'], }, '192.0.2.2' : { 'bfd_profile' : bfd_profile, 'remote_as' : '200', 'shutdown' : '', 'no_cap_nego' : '', 'port' : '667', 'cap_strict' : '', 'advertise_map' : route_map_in, 'non_exist_map' : route_map_out, 'pfx_list_in' : prefix_list_in, 'pfx_list_out' : prefix_list_out, 'no_send_comm_std' : '', 'local_role' : 'rs-client', 'p_attr_taw' : '200', }, '192.0.2.3' : { 'advertise_map' : route_map_in, 'description' : 'foo bar baz', 'remote_as' : '200', 'passive' : '', 'multi_hop' : '5', 'update_src' : 'lo', 'peer_group' : 'foo', 'graceful_rst' : '', }, '2001:db8::1' : { 'advertise_map' : route_map_in, 'exist_map' : route_map_out, 'cap_dynamic' : '', 'cap_ext_next' : '', 'cap_ext_sver' : '', 'remote_as' : '123', 'adv_interv' : '400', 'passive' : '', 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', 'system_as' : '300', 'solo' : '', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'no_send_comm_std' : '', 'addpath_per_as' : '', 'peer_group' : 'foo-bar', 'local_role' : 'customer', 'local_role_strict': '', }, '2001:db8::2' : { 'remote_as' : '456', 'shutdown' : '', 'no_cap_nego' : '', 'port' : '667', 'cap_strict' : '', 'pfx_list_in' : prefix_list_in6, 'pfx_list_out' : prefix_list_out6, 'no_send_comm_ext' : '', 'peer_group' : 'foo-bar_baz', 'graceful_rst_hlp' : '', 'disable_conn_chk' : '', }, } peer_group_config = { 'foo' : { 'advertise_map' : route_map_in, 'exist_map' : route_map_out, 'bfd' : '', 'remote_as' : '100', 'passive' : '', 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', 'disable_conn_chk' : '', 'p_attr_discard' : ['100', '150', '200'], }, 'bar' : { 'remote_as' : '111', 'graceful_rst_no' : '', 'port' : '667', 'p_attr_taw' : '126', }, 'foo-bar' : { 'advertise_map' : route_map_in, 'description' : 'foo peer bar group', 'remote_as' : '200', 'shutdown' : '', 'no_cap_nego' : '', 'system_as' : '300', 'pfx_list_in' : prefix_list_in, 'pfx_list_out' : prefix_list_out, 'no_send_comm_ext' : '', }, 'foo-bar_baz' : { 'advertise_map' : route_map_in, 'non_exist_map' : route_map_out, 'bfd_profile' : bfd_profile, 'cap_dynamic' : '', 'cap_ext_next' : '', 'remote_as' : '200', 'passive' : '', 'multi_hop' : '5', 'update_src' : 'lo', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'local_role' : 'peer', 'local_role_strict': '', }, } class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsBGP, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(bgp_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['policy', 'route-map']) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) cls.cli_delete(cls, ['vrf']) cls.cli_set(cls, ['policy', 'route-map', route_map_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map_out, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'prefix', '192.0.2.0/25']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'prefix', '192.0.2.128/25']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'prefix', '2001:db8:1000::/64']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'action', 'deny']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'prefix', '2001:db8:2000::/64']) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'route-map']) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) def setUp(self): self.cli_set(base_path + ['system-as', ASN]) def tearDown(self): # cleanup any possible VRF mess self.cli_delete(['vrf']) # always destrox the entire bgpd configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_commit() - frrconfig = self.getFRRconfig('router bgp') + frrconfig = self.getFRRconfig('router bgp', endsection='^exit', daemon=bgp_daemon) self.assertNotIn(f'router bgp', frrconfig) # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(bgp_daemon)) def create_bgp_instances_for_import_test(self): table = '1000' self.cli_set(import_vrf_base + [import_vrf, 'table', table]) self.cli_set(import_vrf_base + [import_vrf, 'protocols', 'bgp', 'system-as', ASN]) def verify_frr_config(self, peer, peer_config, frrconfig): # recurring patterns to verify for both a simple neighbor and a peer-group if 'bfd' in peer_config: self.assertIn(f' neighbor {peer} bfd', frrconfig) if 'bfd_profile' in peer_config: self.assertIn(f' neighbor {peer} bfd profile {peer_config["bfd_profile"]}', frrconfig) self.assertIn(f' neighbor {peer} bfd check-control-plane-failure', frrconfig) if 'cap_dynamic' in peer_config: self.assertIn(f' neighbor {peer} capability dynamic', frrconfig) if 'cap_ext_next' in peer_config: self.assertIn(f' neighbor {peer} capability extended-nexthop', frrconfig) if 'cap_ext_sver' in peer_config: self.assertIn(f' neighbor {peer} capability software-version', frrconfig) if 'description' in peer_config: self.assertIn(f' neighbor {peer} description {peer_config["description"]}', frrconfig) if 'no_cap_nego' in peer_config: self.assertIn(f' neighbor {peer} dont-capability-negotiate', frrconfig) if 'multi_hop' in peer_config: self.assertIn(f' neighbor {peer} ebgp-multihop {peer_config["multi_hop"]}', frrconfig) if 'local_as' in peer_config: self.assertIn(f' neighbor {peer} local-as {peer_config["local_as"]} no-prepend replace-as', frrconfig) if 'local_role' in peer_config: tmp = f' neighbor {peer} local-role {peer_config["local_role"]}' if 'local_role_strict' in peer_config: tmp += ' strict' self.assertIn(tmp, frrconfig) if 'cap_over' in peer_config: self.assertIn(f' neighbor {peer} override-capability', frrconfig) if 'passive' in peer_config: self.assertIn(f' neighbor {peer} passive', frrconfig) if 'password' in peer_config: self.assertIn(f' neighbor {peer} password {peer_config["password"]}', frrconfig) if 'port' in peer_config: self.assertIn(f' neighbor {peer} port {peer_config["port"]}', frrconfig) if 'remote_as' in peer_config: self.assertIn(f' neighbor {peer} remote-as {peer_config["remote_as"]}', frrconfig) if 'solo' in peer_config: self.assertIn(f' neighbor {peer} solo', frrconfig) if 'shutdown' in peer_config: self.assertIn(f' neighbor {peer} shutdown', frrconfig) if 'ttl_security' in peer_config: self.assertIn(f' neighbor {peer} ttl-security hops {peer_config["ttl_security"]}', frrconfig) if 'update_src' in peer_config: self.assertIn(f' neighbor {peer} update-source {peer_config["update_src"]}', frrconfig) if 'route_map_in' in peer_config: self.assertIn(f' neighbor {peer} route-map {peer_config["route_map_in"]} in', frrconfig) if 'route_map_out' in peer_config: self.assertIn(f' neighbor {peer} route-map {peer_config["route_map_out"]} out', frrconfig) if 'pfx_list_in' in peer_config: self.assertIn(f' neighbor {peer} prefix-list {peer_config["pfx_list_in"]} in', frrconfig) if 'pfx_list_out' in peer_config: self.assertIn(f' neighbor {peer} prefix-list {peer_config["pfx_list_out"]} out', frrconfig) if 'no_send_comm_std' in peer_config: self.assertIn(f' no neighbor {peer} send-community', frrconfig) if 'no_send_comm_ext' in peer_config: self.assertIn(f' no neighbor {peer} send-community extended', frrconfig) if 'addpath_all' in peer_config: self.assertIn(f' neighbor {peer} addpath-tx-all-paths', frrconfig) if 'p_attr_discard' in peer_config: tmp = ' '.join(peer_config["p_attr_discard"]) self.assertIn(f' neighbor {peer} path-attribute discard {tmp}', frrconfig) if 'p_attr_taw' in peer_config: self.assertIn(f' neighbor {peer} path-attribute treat-as-withdraw {peer_config["p_attr_taw"]}', frrconfig) if 'addpath_per_as' in peer_config: self.assertIn(f' neighbor {peer} addpath-tx-bestpath-per-AS', frrconfig) if 'advertise_map' in peer_config: base = f' neighbor {peer} advertise-map {peer_config["advertise_map"]}' if 'exist_map' in peer_config: base = f'{base} exist-map {peer_config["exist_map"]}' if 'non_exist_map' in peer_config: base = f'{base} non-exist-map {peer_config["non_exist_map"]}' self.assertIn(base, frrconfig) if 'graceful_rst' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart', frrconfig) if 'graceful_rst_no' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart-disable', frrconfig) if 'graceful_rst_hlp' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart-helper', frrconfig) if 'disable_conn_chk' in peer_config: self.assertIn(f' neighbor {peer} disable-connected-check', frrconfig) def test_bgp_01_simple(self): router_id = '127.0.0.1' local_pref = '500' stalepath_time = '60' max_path_v4 = '2' max_path_v4ibgp = '4' max_path_v6 = '8' max_path_v6ibgp = '16' cond_adv_timer = '30' min_hold_time = '2' tcp_keepalive_idle = '66' tcp_keepalive_interval = '77' tcp_keepalive_probes = '22' self.cli_set(base_path + ['parameters', 'allow-martian-nexthop']) self.cli_set(base_path + ['parameters', 'disable-ebgp-connected-route-check']) self.cli_set(base_path + ['parameters', 'no-hard-administrative-reset']) self.cli_set(base_path + ['parameters', 'log-neighbor-changes']) self.cli_set(base_path + ['parameters', 'labeled-unicast', 'explicit-null']) self.cli_set(base_path + ['parameters', 'router-id', router_id]) # System AS number MUST be defined - as this is set in setUp() we remove # this once for testing of the proper error self.cli_delete(base_path + ['system-as']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['system-as', ASN]) # Default local preference (higher = more preferred, default value is 100) self.cli_set(base_path + ['parameters', 'default', 'local-pref', local_pref]) self.cli_set(base_path + ['parameters', 'graceful-restart', 'stalepath-time', stalepath_time]) self.cli_set(base_path + ['parameters', 'graceful-shutdown']) self.cli_set(base_path + ['parameters', 'ebgp-requires-policy']) self.cli_set(base_path + ['parameters', 'bestpath', 'as-path', 'multipath-relax']) self.cli_set(base_path + ['parameters', 'bestpath', 'bandwidth', 'default-weight-for-missing']) self.cli_set(base_path + ['parameters', 'bestpath', 'compare-routerid']) self.cli_set(base_path + ['parameters', 'bestpath', 'peer-type', 'multipath-relax']) self.cli_set(base_path + ['parameters', 'conditional-advertisement', 'timer', cond_adv_timer]) self.cli_set(base_path + ['parameters', 'fast-convergence']) self.cli_set(base_path + ['parameters', 'minimum-holdtime', min_hold_time]) self.cli_set(base_path + ['parameters', 'no-suppress-duplicates']) self.cli_set(base_path + ['parameters', 'reject-as-sets']) self.cli_set(base_path + ['parameters', 'route-reflector-allow-outbound-policy']) self.cli_set(base_path + ['parameters', 'shutdown']) self.cli_set(base_path + ['parameters', 'suppress-fib-pending']) self.cli_set(base_path + ['parameters', 'tcp-keepalive', 'idle', tcp_keepalive_idle]) self.cli_set(base_path + ['parameters', 'tcp-keepalive', 'interval', tcp_keepalive_interval]) self.cli_set(base_path + ['parameters', 'tcp-keepalive', 'probes', tcp_keepalive_probes]) # AFI maximum path support self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ebgp', max_path_v4]) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ibgp', max_path_v4ibgp]) self.cli_set(base_path + ['address-family', 'ipv4-labeled-unicast', 'maximum-paths', 'ebgp', max_path_v4]) self.cli_set(base_path + ['address-family', 'ipv4-labeled-unicast', 'maximum-paths', 'ibgp', max_path_v4ibgp]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ebgp', max_path_v6]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ibgp', max_path_v6ibgp]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' bgp router-id {router_id}', frrconfig) self.assertIn(f' bgp allow-martian-nexthop', frrconfig) self.assertIn(f' bgp disable-ebgp-connected-route-check', frrconfig) self.assertIn(f' bgp log-neighbor-changes', frrconfig) self.assertIn(f' bgp default local-preference {local_pref}', frrconfig) self.assertIn(f' bgp conditional-advertisement timer {cond_adv_timer}', frrconfig) self.assertIn(f' bgp fast-convergence', frrconfig) self.assertIn(f' bgp graceful-restart stalepath-time {stalepath_time}', frrconfig) self.assertIn(f' bgp graceful-shutdown', frrconfig) self.assertIn(f' no bgp hard-administrative-reset', frrconfig) self.assertIn(f' bgp labeled-unicast explicit-null', frrconfig) self.assertIn(f' bgp bestpath as-path multipath-relax', frrconfig) self.assertIn(f' bgp bestpath bandwidth default-weight-for-missing', frrconfig) self.assertIn(f' bgp bestpath compare-routerid', frrconfig) self.assertIn(f' bgp bestpath peer-type multipath-relax', frrconfig) self.assertIn(f' bgp minimum-holdtime {min_hold_time}', frrconfig) self.assertIn(f' bgp reject-as-sets', frrconfig) self.assertIn(f' bgp route-reflector allow-outbound-policy', frrconfig) self.assertIn(f' bgp shutdown', frrconfig) self.assertIn(f' bgp suppress-fib-pending', frrconfig) self.assertIn(f' bgp tcp-keepalive {tcp_keepalive_idle} {tcp_keepalive_interval} {tcp_keepalive_probes}', frrconfig) self.assertNotIn(f'bgp ebgp-requires-policy', frrconfig) self.assertIn(f' no bgp suppress-duplicates', frrconfig) - afiv4_config = self.getFRRconfig(' address-family ipv4 unicast') + afiv4_config = self.getFRRconfig(' address-family ipv4 unicast', + endsection='^ exit-address-family', daemon=bgp_daemon) self.assertIn(f' maximum-paths {max_path_v4}', afiv4_config) self.assertIn(f' maximum-paths ibgp {max_path_v4ibgp}', afiv4_config) - afiv4_config = self.getFRRconfig(' address-family ipv4 labeled-unicast') + afiv4_config = self.getFRRconfig(' address-family ipv4 labeled-unicast', + endsection='^ exit-address-family', daemon=bgp_daemon) self.assertIn(f' maximum-paths {max_path_v4}', afiv4_config) self.assertIn(f' maximum-paths ibgp {max_path_v4ibgp}', afiv4_config) - afiv6_config = self.getFRRconfig(' address-family ipv6 unicast') + afiv6_config = self.getFRRconfig(' address-family ipv6 unicast', + endsection='^ exit-address-family', daemon=bgp_daemon) self.assertIn(f' maximum-paths {max_path_v6}', afiv6_config) self.assertIn(f' maximum-paths ibgp {max_path_v6ibgp}', afiv6_config) def test_bgp_02_neighbors(self): # Test out individual neighbor configuration items, not all of them are # also available to a peer-group! self.cli_set(base_path + ['parameters', 'deterministic-med']) for peer, peer_config in neighbor_config.items(): afi = 'ipv4-unicast' if is_ipv6(peer): afi = 'ipv6-unicast' if 'adv_interv' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'advertisement-interval', peer_config["adv_interv"]]) if 'bfd' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'bfd']) if 'bfd_profile' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'bfd', 'profile', peer_config["bfd_profile"]]) self.cli_set(base_path + ['neighbor', peer, 'bfd', 'check-control-plane-failure']) if 'cap_dynamic' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'capability', 'dynamic']) if 'cap_ext_next' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'capability', 'extended-nexthop']) if 'cap_ext_sver' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'capability', 'software-version']) if 'description' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'description', peer_config["description"]]) if 'no_cap_nego' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'disable-capability-negotiation']) if 'multi_hop' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'ebgp-multihop', peer_config["multi_hop"]]) if 'local_as' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'local-as', peer_config["local_as"], 'no-prepend', 'replace-as']) if 'local_role' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'local-role', peer_config["local_role"]]) if 'local_role_strict' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'local-role', peer_config["local_role"], 'strict']) if 'cap_over' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'override-capability']) if 'passive' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'passive']) if 'password' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'password', peer_config["password"]]) if 'port' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'port', peer_config["port"]]) if 'remote_as' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'remote-as', peer_config["remote_as"]]) if 'cap_strict' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'strict-capability-match']) if 'shutdown' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'shutdown']) if 'solo' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'solo']) if 'ttl_security' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'ttl-security', 'hops', peer_config["ttl_security"]]) if 'update_src' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'update-source', peer_config["update_src"]]) if 'p_attr_discard' in peer_config: for attribute in peer_config['p_attr_discard']: self.cli_set(base_path + ['neighbor', peer, 'path-attribute', 'discard', attribute]) if 'p_attr_taw' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'path-attribute', 'treat-as-withdraw', peer_config["p_attr_taw"]]) if 'route_map_in' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'import', peer_config["route_map_in"]]) if 'route_map_out' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'export', peer_config["route_map_out"]]) if 'pfx_list_in' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'import', peer_config["pfx_list_in"]]) if 'pfx_list_out' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'export', peer_config["pfx_list_out"]]) if 'no_send_comm_std' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'standard']) if 'no_send_comm_ext' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'extended']) if 'addpath_all' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-all']) if 'addpath_per_as' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-per-as']) if 'graceful_rst' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'enable']) if 'graceful_rst_no' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'restart-helper']) if 'disable_conn_chk' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'disable-connected-check']) # Conditional advertisement if 'advertise_map' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'advertise-map', peer_config["advertise_map"]]) # Either exist-map or non-exist-map needs to be specified if 'exist_map' not in peer_config and 'non_exist_map' not in peer_config: with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'exist-map', route_map_in]) if 'exist_map' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'exist-map', peer_config["exist_map"]]) if 'non_exist_map' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'non-exist-map', peer_config["non_exist_map"]]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) for peer, peer_config in neighbor_config.items(): if 'adv_interv' in peer_config: self.assertIn(f' neighbor {peer} advertisement-interval {peer_config["adv_interv"]}', frrconfig) if 'cap_strict' in peer_config: self.assertIn(f' neighbor {peer} strict-capability-match', frrconfig) self.verify_frr_config(peer, peer_config, frrconfig) def test_bgp_03_peer_groups(self): # Test out individual peer-group configuration items for peer_group, config in peer_group_config.items(): if 'bfd' in config: self.cli_set(base_path + ['peer-group', peer_group, 'bfd']) if 'bfd_profile' in config: self.cli_set(base_path + ['peer-group', peer_group, 'bfd', 'profile', config["bfd_profile"]]) self.cli_set(base_path + ['peer-group', peer_group, 'bfd', 'check-control-plane-failure']) if 'cap_dynamic' in config: self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) if 'cap_ext_next' in config: self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) if 'cap_ext_sver' in config: self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'software-version']) if 'description' in config: self.cli_set(base_path + ['peer-group', peer_group, 'description', config["description"]]) if 'no_cap_nego' in config: self.cli_set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) if 'multi_hop' in config: self.cli_set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) if 'local_as' in config: self.cli_set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"], 'no-prepend', 'replace-as']) if 'local_role' in config: self.cli_set(base_path + ['peer-group', peer_group, 'local-role', config["local_role"]]) if 'local_role_strict' in config: self.cli_set(base_path + ['peer-group', peer_group, 'local-role', config["local_role"], 'strict']) if 'cap_over' in config: self.cli_set(base_path + ['peer-group', peer_group, 'override-capability']) if 'passive' in config: self.cli_set(base_path + ['peer-group', peer_group, 'passive']) if 'password' in config: self.cli_set(base_path + ['peer-group', peer_group, 'password', config["password"]]) if 'port' in config: self.cli_set(base_path + ['peer-group', peer_group, 'port', config["port"]]) if 'remote_as' in config: self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) if 'shutdown' in config: self.cli_set(base_path + ['peer-group', peer_group, 'shutdown']) if 'ttl_security' in config: self.cli_set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) if 'update_src' in config: self.cli_set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) if 'route_map_in' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'import', config["route_map_in"]]) if 'route_map_out' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'export', config["route_map_out"]]) if 'pfx_list_in' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'import', config["pfx_list_in"]]) if 'pfx_list_out' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'export', config["pfx_list_out"]]) if 'no_send_comm_std' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'standard']) if 'no_send_comm_ext' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'extended']) if 'addpath_all' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-all']) if 'addpath_per_as' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-per-as']) if 'graceful_rst' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'enable']) if 'graceful_rst_no' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'restart-helper']) if 'disable_conn_chk' in config: self.cli_set(base_path + ['peer-group', peer_group, 'disable-connected-check']) if 'p_attr_discard' in config: for attribute in config['p_attr_discard']: self.cli_set(base_path + ['peer-group', peer_group, 'path-attribute', 'discard', attribute]) if 'p_attr_taw' in config: self.cli_set(base_path + ['peer-group', peer_group, 'path-attribute', 'treat-as-withdraw', config["p_attr_taw"]]) # Conditional advertisement if 'advertise_map' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'advertise-map', config["advertise_map"]]) # Either exist-map or non-exist-map needs to be specified if 'exist_map' not in config and 'non_exist_map' not in config: with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'exist-map', route_map_in]) if 'exist_map' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'exist-map', config["exist_map"]]) if 'non_exist_map' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'non-exist-map', config["non_exist_map"]]) for peer, peer_config in neighbor_config.items(): if 'peer_group' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'peer-group', peer_config['peer_group']]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) for peer, peer_config in peer_group_config.items(): self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.verify_frr_config(peer, peer_config, frrconfig) for peer, peer_config in neighbor_config.items(): if 'peer_group' in peer_config: self.assertIn(f' neighbor {peer} peer-group {peer_config["peer_group"]}', frrconfig) def test_bgp_04_afi_ipv4(self): networks = { '10.0.0.0/8' : { 'as_set' : '', 'summary_only' : '', 'route_map' : route_map_in, }, '100.64.0.0/10' : { 'as_set' : '', }, '192.168.0.0/16' : { 'summary_only' : '', }, } # We want to redistribute ... redistributes = ['connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] for redistribute in redistributes: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'network', network]) if 'as_set' in network_config: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'as-set']) if 'summary_only' in network_config: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'summary-only']) if 'route_map' in network_config: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'route-map', network_config['route_map']]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv4 unicast', frrconfig) for redistribute in redistributes: self.assertIn(f' redistribute {redistribute}', frrconfig) for network, network_config in networks.items(): self.assertIn(f' network {network}', frrconfig) command = f'aggregate-address {network}' if 'as_set' in network_config: command = f'{command} as-set' if 'summary_only' in network_config: command = f'{command} summary-only' if 'route_map' in network_config: command = f'{command} route-map {network_config["route_map"]}' self.assertIn(command, frrconfig) def test_bgp_05_afi_ipv6(self): networks = { '2001:db8:100::/48' : { }, '2001:db8:200::/48' : { }, '2001:db8:300::/48' : { 'summary_only' : '', }, } # We want to redistribute ... redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static'] for redistribute in redistributes: self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'network', network]) if 'summary_only' in network_config: self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'aggregate-address', network, 'summary-only']) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) # T2100: By default ebgp-requires-policy is disabled to keep VyOS # 1.3 and 1.2 backwards compatibility self.assertIn(f' no bgp ebgp-requires-policy', frrconfig) for redistribute in redistributes: # FRR calls this OSPF6 if redistribute == 'ospfv3': redistribute = 'ospf6' self.assertIn(f' redistribute {redistribute}', frrconfig) for network, network_config in networks.items(): self.assertIn(f' network {network}', frrconfig) if 'as_set' in network_config: self.assertIn(f' aggregate-address {network} summary-only', frrconfig) def test_bgp_06_listen_range(self): # Implemented via T1875 limit = '64' listen_ranges = ['192.0.2.0/25', '192.0.2.128/25'] peer_group = 'listenfoobar' self.cli_set(base_path + ['listen', 'limit', limit]) for prefix in listen_ranges: self.cli_set(base_path + ['listen', 'range', prefix]) # check validate() - peer-group must be defined for range/prefix with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['listen', 'range', prefix, 'peer-group', peer_group]) # check validate() - peer-group does yet not exist! with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', ASN]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.assertIn(f' neighbor {peer_group} remote-as {ASN}', frrconfig) self.assertIn(f' bgp listen limit {limit}', frrconfig) for prefix in listen_ranges: self.assertIn(f' bgp listen range {prefix} peer-group {peer_group}', frrconfig) def test_bgp_07_l2vpn_evpn(self): vnis = ['10010', '10020', '10030'] soo = '1.2.3.4:10000' evi_limit = '1000' route_targets = ['1.1.1.1:100', '1.1.1.1:200', '1.1.1.1:300'] self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-all-vni']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-default-gw']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-svi-ip']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'flooding', 'disable']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'default-originate', 'ipv4']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'default-originate', 'ipv6']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'disable-ead-evi-rx']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'disable-ead-evi-tx']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'mac-vrf', 'soo', soo]) for vni in vnis: self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-default-gw']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-svi-ip']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'ead-es-frag', 'evi-limit', evi_limit]) for route_target in route_targets: self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'ead-es-route-target', 'export', route_target]) # commit changes self.cli_commit() # Verify FRR bgpd configuration - frrconfig = self.getFRRconfig(f'router bgp {ASN}') + frrconfig = self.getFRRconfig(f'router bgp {ASN}', endsection='^exit', daemon=bgp_daemon) self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family l2vpn evpn', frrconfig) self.assertIn(f' advertise-all-vni', frrconfig) self.assertIn(f' advertise-default-gw', frrconfig) self.assertIn(f' advertise-svi-ip', frrconfig) self.assertIn(f' default-originate ipv4', frrconfig) self.assertIn(f' default-originate ipv6', frrconfig) self.assertIn(f' disable-ead-evi-rx', frrconfig) self.assertIn(f' disable-ead-evi-tx', frrconfig) self.assertIn(f' flooding disable', frrconfig) self.assertIn(f' mac-vrf soo {soo}', frrconfig) for vni in vnis: - vniconfig = self.getFRRconfig(f' vni {vni}') + vniconfig = self.getFRRconfig(f' vni {vni}', endsection='^exit-vni') self.assertIn(f'vni {vni}', vniconfig) self.assertIn(f' advertise-default-gw', vniconfig) self.assertIn(f' advertise-svi-ip', vniconfig) self.assertIn(f' ead-es-frag evi-limit {evi_limit}', frrconfig) for route_target in route_targets: self.assertIn(f' ead-es-route-target export {route_target}', frrconfig) def test_bgp_09_distance_and_flowspec(self): distance_external = '25' distance_internal = '30' distance_local = '35' distance_v4_prefix = '169.254.0.0/32' distance_v6_prefix = '2001::/128' distance_prefix_value = '110' distance_families = ['ipv4-unicast', 'ipv6-unicast','ipv4-multicast', 'ipv6-multicast'] verify_families = ['ipv4 unicast', 'ipv6 unicast','ipv4 multicast', 'ipv6 multicast'] flowspec_families = ['address-family ipv4 flowspec', 'address-family ipv6 flowspec'] flowspec_int = 'lo' # Per family distance support for family in distance_families: self.cli_set(base_path + ['address-family', family, 'distance', 'external', distance_external]) self.cli_set(base_path + ['address-family', family, 'distance', 'internal', distance_internal]) self.cli_set(base_path + ['address-family', family, 'distance', 'local', distance_local]) if 'ipv4' in family: self.cli_set(base_path + ['address-family', family, 'distance', 'prefix', distance_v4_prefix, 'distance', distance_prefix_value]) if 'ipv6' in family: self.cli_set(base_path + ['address-family', family, 'distance', 'prefix', distance_v6_prefix, 'distance', distance_prefix_value]) # IPv4 flowspec interface check self.cli_set(base_path + ['address-family', 'ipv4-flowspec', 'local-install', 'interface', flowspec_int]) # IPv6 flowspec interface check self.cli_set(base_path + ['address-family', 'ipv6-flowspec', 'local-install', 'interface', flowspec_int]) # Commit changes self.cli_commit() # Verify FRR distances configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for family in verify_families: self.assertIn(f'address-family {family}', frrconfig) self.assertIn(f'distance bgp {distance_external} {distance_internal} {distance_local}', frrconfig) if 'ipv4' in family: self.assertIn(f'distance {distance_prefix_value} {distance_v4_prefix}', frrconfig) if 'ipv6' in family: self.assertIn(f'distance {distance_prefix_value} {distance_v6_prefix}', frrconfig) # Verify FRR flowspec configuration for family in flowspec_families: self.assertIn(f'{family}', frrconfig) self.assertIn(f'local-install {flowspec_int}', frrconfig) def test_bgp_10_vrf_simple(self): router_id = '127.0.0.3' vrfs = ['red', 'green', 'blue'] # It is safe to assume that when the basic VRF test works, all # other BGP related features work, as we entirely inherit the CLI # templates and Jinja2 FRR template. table = '1000' # testing only one AFI is sufficient as it's generic code for vrf in vrfs: vrf_base = ['vrf', 'name', vrf] self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'bgp', 'system-as', ASN]) self.cli_set(vrf_base + ['protocols', 'bgp', 'parameters', 'router-id', router_id]) table = str(int(table) + 1000) # import VRF routes do main RIB self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'import', 'vrf', vrf]) self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) for vrf in vrfs: self.assertIn(f' import vrf {vrf}', frrconfig) # Verify FRR bgpd configuration frr_vrf_config = self.getFRRconfig(f'router bgp {ASN} vrf {vrf}') self.assertIn(f'router bgp {ASN} vrf {vrf}', frr_vrf_config) self.assertIn(f' bgp router-id {router_id}', frr_vrf_config) def test_bgp_11_confederation(self): router_id = '127.10.10.2' confed_id = str(int(ASN) + 1) confed_asns = '10 20 30 40' self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'confederation', 'identifier', confed_id]) for asn in confed_asns.split(): self.cli_set(base_path + ['parameters', 'confederation', 'peers', asn]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' bgp router-id {router_id}', frrconfig) self.assertIn(f' bgp confederation identifier {confed_id}', frrconfig) self.assertIn(f' bgp confederation peers {confed_asns}', frrconfig) def test_bgp_12_v6_link_local(self): remote_asn = str(int(ASN) + 10) interface = 'eth0' self.cli_set(base_path + ['neighbor', interface, 'address-family', 'ipv6-unicast']) self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as', remote_asn]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {interface} interface v6only remote-as {remote_asn}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) self.assertIn(f' neighbor {interface} activate', frrconfig) self.assertIn(f' exit-address-family', frrconfig) def test_bgp_13_vpn(self): remote_asn = str(int(ASN) + 150) neighbor = '192.0.2.55' vrf_name = 'red' label = 'auto' rd = f'{neighbor}:{ASN}' rt_export = f'{neighbor}:1002 1.2.3.4:567' rt_import = f'{neighbor}:1003 500:100' # testing only one AFI is sufficient as it's generic code for afi in ['ipv4-unicast', 'ipv6-unicast']: self.cli_set(base_path + ['address-family', afi, 'export', 'vpn']) self.cli_set(base_path + ['address-family', afi, 'import', 'vpn']) self.cli_set(base_path + ['address-family', afi, 'label', 'vpn', 'export', label]) self.cli_set(base_path + ['address-family', afi, 'label', 'vpn', 'allocation-mode', 'per-nexthop']) self.cli_set(base_path + ['address-family', afi, 'rd', 'vpn', 'export', rd]) self.cli_set(base_path + ['address-family', afi, 'route-map', 'vpn', 'export', route_map_out]) self.cli_set(base_path + ['address-family', afi, 'route-map', 'vpn', 'import', route_map_in]) self.cli_set(base_path + ['address-family', afi, 'route-target', 'vpn', 'export', rt_export]) self.cli_set(base_path + ['address-family', afi, 'route-target', 'vpn', 'import', rt_import]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for afi in ['ipv4', 'ipv6']: afi_config = self.getFRRconfig(f' address-family {afi} unicast', endsection='exit-address-family', daemon=bgp_daemon) self.assertIn(f'address-family {afi} unicast', afi_config) self.assertIn(f' export vpn', afi_config) self.assertIn(f' import vpn', afi_config) self.assertIn(f' label vpn export {label}', afi_config) self.assertIn(f' label vpn export allocation-mode per-nexthop', afi_config) self.assertIn(f' rd vpn export {rd}', afi_config) self.assertIn(f' route-map vpn export {route_map_out}', afi_config) self.assertIn(f' route-map vpn import {route_map_in}', afi_config) self.assertIn(f' rt vpn export {rt_export}', afi_config) self.assertIn(f' rt vpn import {rt_import}', afi_config) self.assertIn(f' exit-address-family', afi_config) def test_bgp_14_remote_as_peer_group_override(self): # Peer-group member cannot override remote-as of peer-group remote_asn = str(int(ASN) + 150) neighbor = '192.0.2.1' peer_group = 'bar' interface = 'eth0' self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', remote_asn]) self.cli_set(base_path + ['neighbor', neighbor, 'peer-group', peer_group]) self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', remote_asn]) # Peer-group member cannot override remote-as of peer-group with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['neighbor', neighbor, 'remote-as']) # re-test with interface based peer-group self.cli_set(base_path + ['neighbor', interface, 'interface', 'peer-group', peer_group]) self.cli_set(base_path + ['neighbor', interface, 'interface', 'remote-as', 'external']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['neighbor', interface, 'interface', 'remote-as']) # re-test with interface based v6only peer-group self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'peer-group', peer_group]) self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as', 'external']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {neighbor} peer-group {peer_group}', frrconfig) self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.assertIn(f' neighbor {peer_group} remote-as {remote_asn}', frrconfig) def test_bgp_15_local_as_ebgp(self): # https://vyos.dev/T4560 # local-as allowed only for ebgp peers neighbor = '192.0.2.99' remote_asn = '500' local_asn = '400' self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', ASN]) self.cli_set(base_path + ['neighbor', neighbor, 'local-as', local_asn]) # check validate() - local-as allowed only for ebgp peers with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', remote_asn]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {neighbor} remote-as {remote_asn}', frrconfig) self.assertIn(f' neighbor {neighbor} local-as {local_asn}', frrconfig) def test_bgp_16_import_rd_rt_compatibility(self): # Verify if import vrf and rd vpn export # exist in the same address family self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) self.cli_set( base_path + ['address-family', import_afi, 'rd', 'vpn', 'export', import_rd]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_17_import_rd_rt_compatibility(self): # Verify if vrf that is in import vrf list contains rd vpn export self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'address-family ipv4 unicast', frrconfig) self.assertIn(f' import vrf {import_vrf}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.cli_set( import_vrf_base + [import_vrf] + base_path + ['address-family', import_afi, 'rd', 'vpn', 'export', import_rd]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_18_deleting_import_vrf(self): # Verify deleting vrf that is in import vrf list self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'address-family ipv4 unicast', frrconfig) self.assertIn(f' import vrf {import_vrf}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.cli_delete(import_vrf_base + [import_vrf]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_19_deleting_default_vrf(self): # Verify deleting existent vrf default if other vrfs were created self.create_bgp_instances_for_import_test() self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.cli_delete(base_path) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_20_import_rd_rt_compatibility(self): # Verify if vrf that has rd vpn export is in import vrf of other vrfs self.create_bgp_instances_for_import_test() self.cli_set( import_vrf_base + [import_vrf] + base_path + ['address-family', import_afi, 'rd', 'vpn', 'export', import_rd]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.assertIn(f'address-family ipv4 unicast', frrconfig_vrf) self.assertIn(f' rd vpn export {import_rd}', frrconfig_vrf) self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_21_import_unspecified_vrf(self): # Verify if vrf that is in import is unspecified self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', 'test']) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_22_interface_mpls_forwarding(self): interfaces = Section.interfaces('ethernet', vlan=False) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'mpls', 'forwarding']) self.cli_commit() for interface in interfaces: frrconfig = self.getFRRconfig(f'interface {interface}') self.assertIn(f'interface {interface}', frrconfig) self.assertIn(f' mpls bgp forwarding', frrconfig) def test_bgp_23_vrf_interface_mpls_forwarding(self): self.create_bgp_instances_for_import_test() interfaces = Section.interfaces('ethernet', vlan=False) for interface in interfaces: self.cli_set(['interfaces', 'ethernet', interface, 'vrf', import_vrf]) self.cli_set(import_vrf_base + [import_vrf] + base_path + ['interface', interface, 'mpls', 'forwarding']) self.cli_commit() for interface in interfaces: frrconfig = self.getFRRconfig(f'interface {interface}') self.assertIn(f'interface {interface}', frrconfig) self.assertIn(f' mpls bgp forwarding', frrconfig) self.cli_delete(['interfaces', 'ethernet', interface, 'vrf']) def test_bgp_24_srv6_sid(self): locator_name = 'VyOS_foo' sid = 'auto' nexthop_ipv4 = '192.0.0.1' nexthop_ipv6 = '2001:db8:100:200::2' self.cli_set(base_path + ['srv6', 'locator', locator_name]) self.cli_set(base_path + ['sid', 'vpn', 'per-vrf', 'export', sid]) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'sid', 'vpn', 'export', sid]) # verify() - SID per VRF and SID per address-family are mutually exclusive! with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['address-family', 'ipv4-unicast', 'sid']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' segment-routing srv6', frrconfig) self.assertIn(f' locator {locator_name}', frrconfig) self.assertIn(f' sid vpn per-vrf export {sid}', frrconfig) # Now test AFI SID self.cli_delete(base_path + ['sid']) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'sid', 'vpn', 'export', sid]) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'nexthop', 'vpn', 'export', nexthop_ipv4]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'sid', 'vpn', 'export', sid]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'nexthop', 'vpn', 'export', nexthop_ipv6]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' segment-routing srv6', frrconfig) self.assertIn(f' locator {locator_name}', frrconfig) afiv4_config = self.getFRRconfig(' address-family ipv4 unicast') self.assertIn(f' sid vpn export {sid}', afiv4_config) self.assertIn(f' nexthop vpn export {nexthop_ipv4}', afiv4_config) afiv6_config = self.getFRRconfig(' address-family ipv6 unicast') self.assertIn(f' sid vpn export {sid}', afiv6_config) self.assertIn(f' nexthop vpn export {nexthop_ipv6}', afiv4_config) def test_bgp_25_ipv4_labeled_unicast_peer_group(self): pg_ipv4 = 'foo4' ipv4_max_prefix = '20' ipv4_prefix = '192.0.2.0/24' self.cli_set(base_path + ['listen', 'range', ipv4_prefix, 'peer-group', pg_ipv4]) self.cli_set(base_path + ['parameters', 'labeled-unicast', 'ipv4-explicit-null']) self.cli_set(base_path + ['peer-group', pg_ipv4, 'address-family', 'ipv4-labeled-unicast', 'maximum-prefix', ipv4_max_prefix]) self.cli_set(base_path + ['peer-group', pg_ipv4, 'remote-as', 'external']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {pg_ipv4} peer-group', frrconfig) self.assertIn(f' neighbor {pg_ipv4} remote-as external', frrconfig) self.assertIn(f' bgp listen range {ipv4_prefix} peer-group {pg_ipv4}', frrconfig) self.assertIn(f' bgp labeled-unicast ipv4-explicit-null', frrconfig) afiv4_config = self.getFRRconfig(' address-family ipv4 labeled-unicast') self.assertIn(f' neighbor {pg_ipv4} activate', afiv4_config) self.assertIn(f' neighbor {pg_ipv4} maximum-prefix {ipv4_max_prefix}', afiv4_config) def test_bgp_26_ipv6_labeled_unicast_peer_group(self): pg_ipv6 = 'foo6' ipv6_max_prefix = '200' ipv6_prefix = '2001:db8:1000::/64' self.cli_set(base_path + ['listen', 'range', ipv6_prefix, 'peer-group', pg_ipv6]) self.cli_set(base_path + ['parameters', 'labeled-unicast', 'ipv6-explicit-null']) self.cli_set(base_path + ['peer-group', pg_ipv6, 'address-family', 'ipv6-labeled-unicast', 'maximum-prefix', ipv6_max_prefix]) self.cli_set(base_path + ['peer-group', pg_ipv6, 'remote-as', 'external']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {pg_ipv6} peer-group', frrconfig) self.assertIn(f' neighbor {pg_ipv6} remote-as external', frrconfig) self.assertIn(f' bgp listen range {ipv6_prefix} peer-group {pg_ipv6}', frrconfig) self.assertIn(f' bgp labeled-unicast ipv6-explicit-null', frrconfig) afiv6_config = self.getFRRconfig(' address-family ipv6 labeled-unicast') self.assertIn(f' neighbor {pg_ipv6} activate', afiv6_config) self.assertIn(f' neighbor {pg_ipv6} maximum-prefix {ipv6_max_prefix}', afiv6_config) def test_bgp_27_route_reflector_client(self): self.cli_set(base_path + ['peer-group', 'peer1', 'address-family', 'l2vpn-evpn', 'route-reflector-client']) with self.assertRaises(ConfigSessionError) as e: self.cli_commit() self.cli_set(base_path + ['peer-group', 'peer1', 'remote-as', 'internal']) self.cli_commit() conf = self.getFRRconfig(' address-family l2vpn evpn') self.assertIn('neighbor peer1 route-reflector-client', conf) def test_bgp_28_peer_group_member_all_internal_or_external(self): def _common_config_check(conf, include_ras=True): if include_ras: self.assertIn(f'neighbor {int_neighbors[0]} remote-as {ASN}', conf) self.assertIn(f'neighbor {int_neighbors[1]} remote-as {ASN}', conf) self.assertIn(f'neighbor {ext_neighbors[0]} remote-as {int(ASN) + 1}',conf) self.assertIn(f'neighbor {int_neighbors[0]} peer-group {int_pg_name}', conf) self.assertIn(f'neighbor {int_neighbors[1]} peer-group {int_pg_name}', conf) self.assertIn(f'neighbor {ext_neighbors[0]} peer-group {ext_pg_name}', conf) int_neighbors = ['192.0.2.2', '192.0.2.3'] ext_neighbors = ['192.122.2.2', '192.122.2.3'] int_pg_name, ext_pg_name = 'SMOKETESTINT', 'SMOKETESTEXT' self.cli_set(base_path + ['neighbor', int_neighbors[0], 'peer-group', int_pg_name]) self.cli_set(base_path + ['neighbor', int_neighbors[0], 'remote-as', ASN]) self.cli_set(base_path + ['peer-group', int_pg_name, 'address-family', 'ipv4-unicast']) self.cli_set(base_path + ['neighbor', ext_neighbors[0], 'peer-group', ext_pg_name]) self.cli_set(base_path + ['neighbor', ext_neighbors[0], 'remote-as', f'{int(ASN) + 1}']) self.cli_set(base_path + ['peer-group', ext_pg_name, 'address-family', 'ipv4-unicast']) self.cli_commit() # test add external remote-as to internal group self.cli_set(base_path + ['neighbor', int_neighbors[1], 'peer-group', int_pg_name]) self.cli_set(base_path + ['neighbor', int_neighbors[1], 'remote-as', f'{int(ASN) + 1}']) with self.assertRaises(ConfigSessionError) as e: self.cli_commit() # self.assertIn('\nPeer-group members must be all internal or all external\n', str(e.exception)) # test add internal remote-as to internal group self.cli_set(base_path + ['neighbor', int_neighbors[1], 'remote-as', ASN]) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') _common_config_check(conf) # test add internal remote-as to external group self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'peer-group', ext_pg_name]) self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'remote-as', ASN]) with self.assertRaises(ConfigSessionError) as e: self.cli_commit() # self.assertIn('\nPeer-group members must be all internal or all external\n', str(e.exception)) # test add external remote-as to external group self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'remote-as', f'{int(ASN) + 2}']) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') _common_config_check(conf) self.assertIn(f'neighbor {ext_neighbors[1]} remote-as {int(ASN) + 2}', conf) self.assertIn(f'neighbor {ext_neighbors[1]} peer-group {ext_pg_name}', conf) # test named remote-as self.cli_set(base_path + ['neighbor', int_neighbors[0], 'remote-as', 'internal']) self.cli_set(base_path + ['neighbor', int_neighbors[1], 'remote-as', 'internal']) self.cli_set(base_path + ['neighbor', ext_neighbors[0], 'remote-as', 'external']) self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'remote-as', 'external']) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') _common_config_check(conf, include_ras=False) self.assertIn(f'neighbor {int_neighbors[0]} remote-as internal', conf) self.assertIn(f'neighbor {int_neighbors[1]} remote-as internal', conf) self.assertIn(f'neighbor {ext_neighbors[0]} remote-as external', conf) self.assertIn(f'neighbor {ext_neighbors[1]} remote-as external', conf) self.assertIn(f'neighbor {ext_neighbors[1]} peer-group {ext_pg_name}', conf) def test_bgp_29_peer_group_remote_as_equal_local_as(self): self.cli_set(base_path + ['system-as', ASN]) self.cli_set(base_path + ['peer-group', 'OVERLAY', 'local-as', f'{int(ASN) + 1}']) self.cli_set(base_path + ['peer-group', 'OVERLAY', 'remote-as', f'{int(ASN) + 1}']) self.cli_set(base_path + ['peer-group', 'OVERLAY', 'address-family', 'l2vpn-evpn']) self.cli_set(base_path + ['peer-group', 'UNDERLAY', 'address-family', 'ipv4-unicast']) self.cli_set(base_path + ['neighbor', '10.177.70.62', 'peer-group', 'UNDERLAY']) self.cli_set(base_path + ['neighbor', '10.177.70.62', 'remote-as', 'external']) self.cli_set(base_path + ['neighbor', '10.177.75.1', 'peer-group', 'OVERLAY']) self.cli_set(base_path + ['neighbor', '10.177.75.2', 'peer-group', 'OVERLAY']) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'neighbor OVERLAY remote-as {int(ASN) + 1}', conf) self.assertIn(f'neighbor OVERLAY local-as {int(ASN) + 1}', conf) def test_bgp_99_bmp(self): target_name = 'instance-bmp' target_address = '127.0.0.1' target_port = '5000' min_retry = '1024' max_retry = '2048' monitor_ipv4 = 'pre-policy' monitor_ipv6 = 'pre-policy' mirror_buffer = '32000000' bmp_path = base_path + ['bmp'] target_path = bmp_path + ['target', target_name] # by default the 'bmp' module not loaded for the bgpd expect Error self.cli_set(bmp_path) if not process_named_running('bgpd', 'bmp'): with self.assertRaises(ConfigSessionError): self.cli_commit() # add required 'bmp' module to bgpd and restart bgpd self.cli_delete(bmp_path) self.cli_set(['system', 'frr', 'bmp']) self.cli_commit() # restart bgpd to apply "-M bmp" and update PID cmd(f'sudo kill -9 {self.daemon_pid}') # let the bgpd process recover sleep(10) # update daemon PID - this was a planned daemon restart - self.daemon_pid = process_named_running(PROCESS_NAME) + self.daemon_pid = process_named_running(bgp_daemon) # set bmp config but not set address self.cli_set(target_path + ['port', target_port]) # address is not set, expect Error with self.assertRaises(ConfigSessionError): self.cli_commit() # config other bmp options self.cli_set(target_path + ['address', target_address]) self.cli_set(bmp_path + ['mirror-buffer-limit', mirror_buffer]) self.cli_set(target_path + ['port', target_port]) self.cli_set(target_path + ['min-retry', min_retry]) self.cli_set(target_path + ['max-retry', max_retry]) self.cli_set(target_path + ['mirror']) self.cli_set(target_path + ['monitor', 'ipv4-unicast', monitor_ipv4]) self.cli_set(target_path + ['monitor', 'ipv6-unicast', monitor_ipv6]) self.cli_commit() # Verify bgpd bmp configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'bmp mirror buffer-limit {mirror_buffer}', frrconfig) self.assertIn(f'bmp targets {target_name}', frrconfig) self.assertIn(f'bmp mirror', frrconfig) self.assertIn(f'bmp monitor ipv4 unicast {monitor_ipv4}', frrconfig) self.assertIn(f'bmp monitor ipv6 unicast {monitor_ipv6}', frrconfig) self.assertIn(f'bmp connect {target_address} port {target_port} min-retry {min_retry} max-retry {max_retry}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py index 2fddbfeba..bde32a1ca 100755 --- a/smoketest/scripts/cli/test_protocols_isis.py +++ b/smoketest/scripts/cli/test_protocols_isis.py @@ -1,417 +1,416 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.utils.process import process_named_running from vyos.frrender import isis_daemon -PROCESS_NAME = 'isisd' base_path = ['protocols', 'isis'] domain = 'VyOS' net = '49.0001.1921.6800.1002.00' class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._interfaces = Section.interfaces('ethernet') # call base-classes classmethod super(TestProtocolsISIS, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(isis_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['vrf']) def tearDown(self): # cleanup any possible VRF mess self.cli_delete(['vrf']) # always destrox the entire isisd configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(isis_daemon)) def isis_base_config(self): self.cli_set(base_path + ['net', net]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) def test_isis_01_redistribute(self): prefix_list = 'EXPORT-ISIS' route_map = 'EXPORT-ISIS' rule = '10' metric_style = 'transition' self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', rule, 'action', 'permit']) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', rule, 'prefix', '203.0.113.0/24']) self.cli_set(['policy', 'route-map', route_map, 'rule', rule, 'action', 'permit']) self.cli_set(['policy', 'route-map', route_map, 'rule', rule, 'match', 'ip', 'address', 'prefix-list', prefix_list]) self.cli_set(base_path) # verify() - net id and interface are mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.isis_base_config() self.cli_set(base_path + ['redistribute', 'ipv4', 'connected']) # verify() - Redistribute level-1 or level-2 should be specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['redistribute', 'ipv4', 'connected', 'level-2', 'route-map', route_map]) self.cli_set(base_path + ['metric-style', metric_style]) self.cli_set(base_path + ['log-adjacency-changes']) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' metric-style {metric_style}', tmp) self.assertIn(f' log-adjacency-changes', tmp) self.assertIn(f' redistribute ipv4 connected level-2 route-map {route_map}', tmp) for interface in self._interfaces: tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.cli_delete(['policy', 'route-map', route_map]) self.cli_delete(['policy', 'prefix-list', prefix_list]) def test_isis_02_vrfs(self): vrfs = ['red', 'green', 'blue'] # It is safe to assume that when the basic VRF test works, all other # IS-IS related features work, as we entirely inherit the CLI templates # and Jinja2 FRR template. table = '1000' vrf = 'red' vrf_base = ['vrf', 'name', vrf] vrf_iface = 'eth1' self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'isis', 'net', net]) self.cli_set(vrf_base + ['protocols', 'isis', 'interface', vrf_iface]) self.cli_set(vrf_base + ['protocols', 'isis', 'advertise-high-metrics']) self.cli_set(vrf_base + ['protocols', 'isis', 'advertise-passive-only']) self.cli_set(['interfaces', 'ethernet', vrf_iface, 'vrf', vrf]) # Also set a default VRF IS-IS config self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', 'eth0']) self.cli_commit() # Verify FRR isisd configuration tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f'router isis {domain}', tmp) self.assertIn(f' net {net}', tmp) tmp = self.getFRRconfig(f'router isis {domain} vrf {vrf}', daemon=isis_daemon) self.assertIn(f'router isis {domain} vrf {vrf}', tmp) self.assertIn(f' net {net}', tmp) self.assertIn(f' advertise-high-metrics', tmp) self.assertIn(f' advertise-passive-only', tmp) self.cli_delete(['vrf', 'name', vrf]) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) def test_isis_04_default_information(self): metric = '50' route_map = 'default-foo-' self.isis_base_config() for afi in ['ipv4', 'ipv6']: for level in ['level-1', 'level-2']: self.cli_set(base_path + ['default-information', 'originate', afi, level, 'always']) self.cli_set(base_path + ['default-information', 'originate', afi, level, 'metric', metric]) self.cli_set(base_path + ['default-information', 'originate', afi, level, 'route-map', route_map + level + afi]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) for afi in ['ipv4', 'ipv6']: for level in ['level-1', 'level-2']: route_map_name = route_map + level + afi self.assertIn(f' default-information originate {afi} {level} always route-map {route_map_name} metric {metric}', tmp) def test_isis_05_password(self): password = 'foo' self.isis_base_config() for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'password', 'plaintext-password', f'{password}-{interface}']) self.cli_set(base_path + ['area-password', 'plaintext-password', password]) self.cli_set(base_path + ['area-password', 'md5', password]) self.cli_set(base_path + ['domain-password', 'plaintext-password', password]) self.cli_set(base_path + ['domain-password', 'md5', password]) # verify() - can not use both md5 and plaintext-password for area-password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area-password', 'md5', password]) # verify() - can not use both md5 and plaintext-password for domain-password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['domain-password', 'md5', password]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' domain-password clear {password}', tmp) self.assertIn(f' area-password clear {password}', tmp) for interface in self._interfaces: tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f' isis password clear {password}-{interface}', tmp) def test_isis_06_spf_delay_bfd(self): network = 'point-to-point' holddown = '10' init_delay = '50' long_delay = '200' short_delay = '100' time_to_learn = '75' bfd_profile = 'isis-bfd' self.cli_set(base_path + ['net', net]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'network', network]) self.cli_set(base_path + ['interface', interface, 'bfd', 'profile', bfd_profile]) self.cli_set(base_path + ['spf-delay-ietf', 'holddown', holddown]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'init-delay', init_delay]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'long-delay', long_delay]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'short-delay', short_delay]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'time-to-learn', time_to_learn]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' spf-delay-ietf init-delay {init_delay} short-delay {short_delay} long-delay {long_delay} holddown {holddown} time-to-learn {time_to_learn}', tmp) for interface in self._interfaces: tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.assertIn(f' isis network {network}', tmp) self.assertIn(f' isis bfd', tmp) self.assertIn(f' isis bfd profile {bfd_profile}', tmp) def test_isis_07_segment_routing_configuration(self): global_block_low = "300" global_block_high = "399" local_block_low = "400" local_block_high = "499" interface = 'lo' maximum_stack_size = '5' prefix_one = '192.168.0.1/32' prefix_two = '192.168.0.2/32' prefix_three = '192.168.0.3/32' prefix_four = '192.168.0.4/32' prefix_one_value = '1' prefix_two_value = '2' prefix_three_value = '60000' prefix_four_value = '65000' self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low]) self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_three, 'absolute', 'value', prefix_three_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_three, 'absolute', 'explicit-null']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_four, 'absolute', 'value', prefix_four_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_four, 'absolute', 'no-php-flag']) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' segment-routing on', tmp) self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', tmp) self.assertIn(f' segment-routing node-msd {maximum_stack_size}', tmp) self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', tmp) self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', tmp) self.assertIn(f' segment-routing prefix {prefix_three} absolute {prefix_three_value} explicit-null', tmp) self.assertIn(f' segment-routing prefix {prefix_four} absolute {prefix_four_value} no-php-flag', tmp) def test_isis_08_ldp_sync(self): holddown = "500" interface = 'lo' self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['ldp-sync', 'holddown', holddown]) # Commit main ISIS changes self.cli_commit() # Verify main ISIS changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' mpls ldp-sync', tmp) self.assertIn(f' mpls ldp-sync holddown {holddown}', tmp) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'holddown', holddown]) # Commit interface changes for holddown self.cli_commit() for interface in self._interfaces: # Verify interface changes for holddown tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f'interface {interface}', tmp) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.assertIn(f' isis mpls ldp-sync holddown {holddown}', tmp) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'disable']) # Commit interface changes for disable self.cli_commit() for interface in self._interfaces: # Verify interface changes for disable tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f'interface {interface}', tmp) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.assertIn(f' no isis mpls ldp-sync', tmp) def test_isis_09_lfa(self): prefix_list = 'lfa-prefix-list-test-1' prefix_list_address = '192.168.255.255/32' interface = 'lo' self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'action', 'permit']) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'prefix', prefix_list_address]) # Commit main ISIS changes self.cli_commit() # Add remote portion of LFA with prefix list with validation for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'remote', 'prefix-list', prefix_list, level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute remote-lfa prefix-list {prefix_list} {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Add local portion of LFA load-sharing portion with validation for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'local', 'load-sharing', 'disable', level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute load-sharing disable {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Add local portion of LFA priority-limit portion with validation for priority in ['critical', 'high', 'medium']: for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'local', 'priority-limit', priority, level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute priority-limit {priority} {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Add local portion of LFA tiebreaker portion with validation index = '100' for tiebreaker in ['downstream','lowest-backup-metric','node-protecting']: for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'local', 'tiebreaker', tiebreaker, 'index', index, level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute lfa tiebreaker {tiebreaker} index {index} {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Clean up and remove prefix list self.cli_delete(['policy', 'prefix-list', prefix_list]) self.cli_commit() def test_isis_10_topology(self): topologies = ['ipv4-multicast', 'ipv4-mgmt', 'ipv6-unicast', 'ipv6-multicast', 'ipv6-mgmt'] interface = 'lo' # Set a basic IS-IS config self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) for topology in topologies: self.cli_set(base_path + ['topology', topology]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' topology {topology}', tmp) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_mpls.py b/smoketest/scripts/cli/test_protocols_mpls.py index 0c1599f9b..88528973d 100755 --- a/smoketest/scripts/cli/test_protocols_mpls.py +++ b/smoketest/scripts/cli/test_protocols_mpls.py @@ -1,120 +1,120 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section +from vyos.frrender import ldpd_daemon from vyos.utils.process import process_named_running -PROCESS_NAME = 'ldpd' base_path = ['protocols', 'mpls', 'ldp'] peers = { '192.0.2.10' : { 'intv_rx' : '500', 'intv_tx' : '600', 'multihop' : '', 'source_addr': '192.0.2.254', }, '192.0.2.20' : { 'echo_mode' : '', 'intv_echo' : '100', 'intv_mult' : '100', 'intv_rx' : '222', 'intv_tx' : '333', 'passive' : '', 'shutdown' : '', }, '2001:db8::a' : { 'source_addr': '2001:db8::1', }, '2001:db8::b' : { 'source_addr': '2001:db8::1', 'multihop' : '', }, } profiles = { 'foo' : { 'echo_mode' : '', 'intv_echo' : '100', 'intv_mult' : '101', 'intv_rx' : '222', 'intv_tx' : '333', 'shutdown' : '', }, 'bar' : { 'intv_mult' : '102', 'intv_rx' : '444', 'passive' : '', }, } class TestProtocolsMPLS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsMPLS, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(ldpd_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(ldpd_daemon)) def test_mpls_basic(self): router_id = '1.2.3.4' transport_ipv4_addr = '5.6.7.8' interfaces = Section.interfaces('ethernet') self.cli_set(base_path + ['router-id', router_id]) # At least one LDP interface must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() for interface in interfaces: self.cli_set(base_path + ['interface', interface]) # LDP transport address missing with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['discovery', 'transport-ipv4-address', transport_ipv4_addr]) # Commit changes self.cli_commit() # Validate configuration - frrconfig = self.getFRRconfig('mpls ldp', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('mpls ldp', daemon=ldpd_daemon) self.assertIn(f'mpls ldp', frrconfig) self.assertIn(f' router-id {router_id}', frrconfig) # Validate AFI IPv4 - afiv4_config = self.getFRRconfig(' address-family ipv4', daemon=PROCESS_NAME) + afiv4_config = self.getFRRconfig(' address-family ipv4', daemon=ldpd_daemon) self.assertIn(f' discovery transport-address {transport_ipv4_addr}', afiv4_config) for interface in interfaces: self.assertIn(f' interface {interface}', afiv4_config) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_openfabric.py b/smoketest/scripts/cli/test_protocols_openfabric.py index 3e99656ec..f1372d7ab 100644 --- a/smoketest/scripts/cli/test_protocols_openfabric.py +++ b/smoketest/scripts/cli/test_protocols_openfabric.py @@ -1,187 +1,186 @@ #!/usr/bin/env python3 # # Copyright (C) 2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import process_named_running from vyos.frrender import openfabric_daemon -PROCESS_NAME = 'fabricd' base_path = ['protocols', 'openfabric'] domain = 'VyOS' net = '49.0001.1111.1111.1111.00' dummy_if = 'dum1234' address_families = ['ipv4', 'ipv6'] path = base_path + ['domain', domain] class TestProtocolsOpenFabric(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsOpenFabric, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(openfabric_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(openfabric_daemon)) def openfabric_base_config(self): self.cli_set(['interfaces', 'dummy', dummy_if]) self.cli_set(base_path + ['net', net]) for family in address_families: self.cli_set(path + ['interface', dummy_if, 'address-family', family]) def test_openfabric_01_router_params(self): fabric_tier = '5' lsp_gen_interval = '20' self.cli_set(base_path) # verify() - net id and domain name are mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.openfabric_base_config() self.cli_set(path + ['log-adjacency-changes']) self.cli_set(path + ['set-overload-bit']) self.cli_set(path + ['fabric-tier', fabric_tier]) self.cli_set(path + ['lsp-gen-interval', lsp_gen_interval]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' log-adjacency-changes', tmp) self.assertIn(f' set-overload-bit', tmp) self.assertIn(f' fabric-tier {fabric_tier}', tmp) self.assertIn(f' lsp-gen-interval {lsp_gen_interval}', tmp) tmp = self.getFRRconfig(f'interface {dummy_if}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain}', tmp) self.assertIn(f' ipv6 router openfabric {domain}', tmp) def test_openfabric_02_loopback_interface(self): interface = 'lo' hello_interval = '100' metric = '24478' self.openfabric_base_config() self.cli_set(path + ['interface', interface, 'address-family', 'ipv4']) self.cli_set(path + ['interface', interface, 'hello-interval', hello_interval]) self.cli_set(path + ['interface', interface, 'metric', metric]) # Commit all changes self.cli_commit() # Verify FRR openfabric configuration tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f'router openfabric {domain}', tmp) self.assertIn(f' net {net}', tmp) # Verify interface configuration tmp = self.getFRRconfig(f'interface {interface}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain}', tmp) # for lo interface 'openfabric passive' is implied self.assertIn(f' openfabric passive', tmp) self.assertIn(f' openfabric metric {metric}', tmp) def test_openfabric_03_password(self): password = 'foo' self.openfabric_base_config() self.cli_set(path + ['interface', dummy_if, 'password', 'plaintext-password', f'{password}-{dummy_if}']) self.cli_set(path + ['interface', dummy_if, 'password', 'md5', f'{password}-{dummy_if}']) # verify() - can not use both md5 and plaintext-password for password for the interface with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['interface', dummy_if, 'password', 'md5']) self.cli_set(path + ['domain-password', 'plaintext-password', password]) self.cli_set(path + ['domain-password', 'md5', password]) # verify() - can not use both md5 and plaintext-password for domain-password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['domain-password', 'md5']) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' domain-password clear {password}', tmp) tmp = self.getFRRconfig(f'interface {dummy_if}', daemon=openfabric_daemon) self.assertIn(f' openfabric password clear {password}-{dummy_if}', tmp) def test_openfabric_multiple_domains(self): domain_2 = 'VyOS_2' interface = 'dum5678' new_path = base_path + ['domain', domain_2] self.openfabric_base_config() # set same interface for 2 OpenFabric domains self.cli_set(['interfaces', 'dummy', interface]) self.cli_set(new_path + ['interface', interface, 'address-family', 'ipv4']) self.cli_set(path + ['interface', interface, 'address-family', 'ipv4']) # verify() - same interface can be used only for one OpenFabric instance with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['interface', interface]) # Commit all changes self.cli_commit() # Verify FRR openfabric configuration tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f'router openfabric {domain}', tmp) self.assertIn(f' net {net}', tmp) tmp = self.getFRRconfig(f'router openfabric {domain_2}', daemon=openfabric_daemon) self.assertIn(f'router openfabric {domain_2}', tmp) self.assertIn(f' net {net}', tmp) # Verify interface configuration tmp = self.getFRRconfig(f'interface {dummy_if}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain}', tmp) self.assertIn(f' ipv6 router openfabric {domain}', tmp) tmp = self.getFRRconfig(f'interface {interface}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain_2}', tmp) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_ospf.py b/smoketest/scripts/cli/test_protocols_ospf.py index 97543048f..2bc9cf2a5 100755 --- a/smoketest/scripts/cli/test_protocols_ospf.py +++ b/smoketest/scripts/cli/test_protocols_ospf.py @@ -1,594 +1,594 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section +from vyos.frrender import ospf_daemon from vyos.utils.process import process_named_running from vyos.xml_ref import default_value -PROCESS_NAME = 'ospfd' base_path = ['protocols', 'ospf'] route_map = 'foo-bar-baz10' dummy_if = 'dum3562' class TestProtocolsOSPF(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsOSPF, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(ospf_daemon) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit']) cls.cli_set(cls, ['interfaces', 'dummy', dummy_if]) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'route-map', route_map]) cls.cli_delete(cls, ['interfaces', 'dummy', dummy_if]) super(TestProtocolsOSPF, cls).tearDownClass() def tearDown(self): self.cli_delete(base_path) self.cli_commit() - frrconfig = self.getFRRconfig('router ospf') + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertNotIn(f'router ospf', frrconfig) # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(ospf_daemon)) def test_ospf_01_defaults(self): # commit changes self.cli_set(base_path) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' auto-cost reference-bandwidth 100', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults def test_ospf_02_simple(self): router_id = '127.0.0.1' abr_type = 'ibm' bandwidth = '1000' metric = '123' self.cli_set(base_path + ['auto-cost', 'reference-bandwidth', bandwidth]) self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'abr-type', abr_type]) self.cli_set(base_path + ['parameters', 'opaque-lsa']) self.cli_set(base_path + ['parameters', 'rfc1583-compatibility']) self.cli_set(base_path + ['log-adjacency-changes', 'detail']) self.cli_set(base_path + ['default-metric', metric]) self.cli_set(base_path + ['passive-interface', 'default']) self.cli_set(base_path + ['area', '10', 'area-type', 'stub']) self.cli_set(base_path + ['area', '10', 'network', '10.0.0.0/16']) self.cli_set(base_path + ['area', '10', 'range', '10.0.1.0/24']) self.cli_set(base_path + ['area', '10', 'range', '10.0.2.0/24', 'not-advertise']) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' compatible rfc1583', frrconfig) self.assertIn(f' auto-cost reference-bandwidth {bandwidth}', frrconfig) self.assertIn(f' ospf router-id {router_id}', frrconfig) self.assertIn(f' ospf abr-type {abr_type}', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults self.assertIn(f' capability opaque', frrconfig) self.assertIn(f' default-metric {metric}', frrconfig) self.assertIn(f' passive-interface default', frrconfig) self.assertIn(f' area 10 stub', frrconfig) self.assertIn(f' network 10.0.0.0/16 area 10', frrconfig) self.assertIn(f' area 10 range 10.0.1.0/24', frrconfig) self.assertNotIn(f' area 10 range 10.0.1.0/24 not-advertise', frrconfig) self.assertIn(f' area 10 range 10.0.2.0/24 not-advertise', frrconfig) def test_ospf_03_access_list(self): acl = '100' seq = '10' protocols = ['bgp', 'connected', 'isis', 'kernel', 'rip', 'static'] self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'action', 'permit']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'source', 'any']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'destination', 'any']) for ptotocol in protocols: self.cli_set(base_path + ['access-list', acl, 'export', ptotocol]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults for ptotocol in protocols: self.assertIn(f' distribute-list {acl} out {ptotocol}', frrconfig) # defaults self.cli_delete(['policy', 'access-list', acl]) def test_ospf_04_default_originate(self): seq = '100' metric = '50' metric_type = '1' self.cli_set(base_path + ['default-information', 'originate', 'metric', metric]) self.cli_set(base_path + ['default-information', 'originate', 'metric-type', metric_type]) self.cli_set(base_path + ['default-information', 'originate', 'route-map', route_map]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults self.assertIn(f' default-information originate metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) # Now set 'always' self.cli_set(base_path + ['default-information', 'originate', 'always']) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f' default-information originate always metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospf_05_options(self): global_distance = '128' intra_area = '100' inter_area = '110' external = '120' on_startup = '30' on_shutdown = '60' refresh = '50' aggregation_timer = '100' summary_nets = { '10.0.1.0/24' : {}, '10.0.2.0/24' : {'tag' : '50'}, '10.0.3.0/24' : {'no_advertise' : {}}, } self.cli_set(base_path + ['distance', 'global', global_distance]) self.cli_set(base_path + ['distance', 'ospf', 'external', external]) self.cli_set(base_path + ['distance', 'ospf', 'intra-area', intra_area]) self.cli_set(base_path + ['max-metric', 'router-lsa', 'on-startup', on_startup]) self.cli_set(base_path + ['max-metric', 'router-lsa', 'on-shutdown', on_shutdown]) self.cli_set(base_path + ['mpls-te', 'enable']) self.cli_set(base_path + ['refresh', 'timers', refresh]) self.cli_set(base_path + ['aggregation', 'timer', aggregation_timer]) for summary, summary_options in summary_nets.items(): self.cli_set(base_path + ['summary-address', summary]) if 'tag' in summary_options: self.cli_set(base_path + ['summary-address', summary, 'tag', summary_options['tag']]) if 'no_advertise' in summary_options: self.cli_set(base_path + ['summary-address', summary, 'no-advertise']) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' mpls-te on', frrconfig) self.assertIn(f' mpls-te router-address 0.0.0.0', frrconfig) # default self.assertIn(f' distance {global_distance}', frrconfig) self.assertIn(f' distance ospf intra-area {intra_area} external {external}', frrconfig) self.assertIn(f' max-metric router-lsa on-startup {on_startup}', frrconfig) self.assertIn(f' max-metric router-lsa on-shutdown {on_shutdown}', frrconfig) self.assertIn(f' refresh timer {refresh}', frrconfig) self.assertIn(f' aggregation timer {aggregation_timer}', frrconfig) for summary, summary_options in summary_nets.items(): self.assertIn(f' summary-address {summary}', frrconfig) if 'tag' in summary_options: tag = summary_options['tag'] self.assertIn(f' summary-address {summary} tag {tag}', frrconfig) if 'no_advertise' in summary_options: self.assertIn(f' summary-address {summary} no-advertise', frrconfig) # enable inter-area self.cli_set(base_path + ['distance', 'ospf', 'inter-area', inter_area]) self.cli_commit() - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f' distance ospf intra-area {intra_area} inter-area {inter_area} external {external}', frrconfig) # https://github.com/FRRouting/frr/issues/17011 # We need to wait on_shutdown time, until the OSPF process is removed from the CLI # otherwise the test in tearDown() will fail self.cli_delete(base_path) self.cli_commit() sleep(int(on_shutdown) + 5) # additional grace period of 5 seconds def test_ospf_06_neighbor(self): priority = '10' poll_interval = '20' neighbors = ['1.1.1.1', '2.2.2.2', '3.3.3.3'] for neighbor in neighbors: self.cli_set(base_path + ['neighbor', neighbor, 'priority', priority]) self.cli_set(base_path + ['neighbor', neighbor, 'poll-interval', poll_interval]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) for neighbor in neighbors: self.assertIn(f' neighbor {neighbor} priority {priority} poll-interval {poll_interval}', frrconfig) # default def test_ospf_07_redistribute(self): metric = '15' metric_type = '1' redistribute = ['babel', 'bgp', 'connected', 'isis', 'kernel', 'rip', 'static'] for protocol in redistribute: self.cli_set(base_path + ['redistribute', protocol, 'metric', metric]) self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map]) self.cli_set(base_path + ['redistribute', protocol, 'metric-type', metric_type]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) for protocol in redistribute: self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospf_08_virtual_link(self): networks = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] area = '10' shortcut = 'enable' virtual_link = '192.0.2.1' hello = '6' retransmit = '5' transmit = '5' dead = '40' window_default = default_value(base_path + ['area', area, 'virtual-link', virtual_link, 'retransmit-window']) self.cli_set(base_path + ['area', area, 'shortcut', shortcut]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'hello-interval', hello]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'retransmit-interval', retransmit]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'transmit-delay', transmit]) self.cli_set(base_path + ['area', area, 'virtual-link', virtual_link, 'dead-interval', dead]) for network in networks: self.cli_set(base_path + ['area', area, 'network', network]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' area {area} shortcut {shortcut}', frrconfig) self.assertIn(f' area {area} virtual-link {virtual_link} hello-interval {hello} retransmit-interval {retransmit} retransmit-window {window_default} transmit-delay {transmit} dead-interval {dead}', frrconfig) for network in networks: self.assertIn(f' network {network} area {area}', frrconfig) def test_ospf_09_interface_configuration(self): interfaces = Section.interfaces('ethernet') password = 'vyos1234' bandwidth = '10000' cost = '150' network = 'point-to-point' priority = '200' bfd_profile = 'vyos-test' self.cli_set(base_path + ['passive-interface', 'default']) for interface in interfaces: base_interface = base_path + ['interface', interface] self.cli_set(base_interface + ['authentication', 'plaintext-password', password]) self.cli_set(base_interface + ['bandwidth', bandwidth]) self.cli_set(base_interface + ['bfd', 'profile', bfd_profile]) self.cli_set(base_interface + ['cost', cost]) self.cli_set(base_interface + ['mtu-ignore']) self.cli_set(base_interface + ['network', network]) self.cli_set(base_interface + ['priority', priority]) self.cli_set(base_interface + ['passive', 'disable']) # commit changes self.cli_commit() - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' passive-interface default', frrconfig) for interface in interfaces: # Can not use daemon for getFRRconfig() as bandwidth parameter belongs to zebra process config = self.getFRRconfig(f'interface {interface}') self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf authentication-key {password}', config) self.assertIn(f' ip ospf bfd', config) self.assertIn(f' ip ospf bfd profile {bfd_profile}', config) self.assertIn(f' ip ospf cost {cost}', config) self.assertIn(f' ip ospf mtu-ignore', config) self.assertIn(f' ip ospf network {network}', config) self.assertIn(f' ip ospf priority {priority}', config) self.assertIn(f' no ip ospf passive', config) self.assertIn(f' bandwidth {bandwidth}', config) # T5467: Remove interface from OSPF process and VRF self.cli_delete(base_path + ['interface']) self.cli_commit() for interface in interfaces: # T5467: It must also be removed from FRR config - frrconfig = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertNotIn(f'interface {interface}', frrconfig) # There should be no OSPF related command at all under the interface self.assertNotIn(f' ip ospf', frrconfig) def test_ospf_11_interface_area(self): area = '0' interfaces = Section.interfaces('ethernet') self.cli_set(base_path + ['area', area, 'network', '10.0.0.0/8']) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'area', area]) # we can not have bot area network and interface area set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area', area, 'network']) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) for interface in interfaces: - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf area {area}', config) def test_ospf_12_vrfs(self): # It is safe to assume that when the basic VRF test works, all # other OSPF related features work, as we entirely inherit the CLI # templates and Jinja2 FRR template. table = '1000' vrf = 'blue' vrf_base = ['vrf', 'name', vrf] vrf_iface = 'eth1' area = '1' self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'ospf', 'interface', vrf_iface, 'area', area]) self.cli_set(['interfaces', 'ethernet', vrf_iface, 'vrf', vrf]) # Also set a default VRF OSPF config self.cli_set(base_path) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' auto-cost reference-bandwidth 100', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults - frrconfig = self.getFRRconfig(f'router ospf vrf {vrf}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'router ospf vrf {vrf}', daemon=ospf_daemon) self.assertIn(f'router ospf vrf {vrf}', frrconfig) self.assertIn(f' auto-cost reference-bandwidth 100', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # defaults - frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=ospf_daemon) self.assertIn(f'interface {vrf_iface}', frrconfig) self.assertIn(f' ip ospf area {area}', frrconfig) # T5467: Remove interface from OSPF process and VRF self.cli_delete(vrf_base + ['protocols', 'ospf', 'interface']) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) self.cli_commit() # T5467: It must also be removed from FRR config - frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=ospf_daemon) self.assertNotIn(f'interface {vrf_iface}', frrconfig) # There should be no OSPF related command at all under the interface self.assertNotIn(f' ip ospf', frrconfig) # cleanup self.cli_delete(['vrf', 'name', vrf]) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) def test_ospf_13_export_list(self): # Verify explort-list works on ospf-area acl = '100' seq = '10' area = '0.0.0.10' network = '10.0.0.0/8' self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'action', 'permit']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'source', 'any']) self.cli_set(['policy', 'access-list', acl, 'rule', seq, 'destination', 'any']) self.cli_set(base_path + ['area', area, 'network', network]) self.cli_set(base_path + ['area', area, 'export-list', acl]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) # default self.assertIn(f' network {network} area {area}', frrconfig) self.assertIn(f' area {area} export-list {acl}', frrconfig) def test_ospf_14_segment_routing_configuration(self): global_block_low = "300" global_block_high = "399" local_block_low = "400" local_block_high = "499" maximum_stack_size = '5' prefix_one = '192.168.0.1/32' prefix_two = '192.168.0.2/32' prefix_one_value = '1' prefix_two_value = '2' self.cli_set(base_path + ['interface', dummy_if]) self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low]) self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag']) # Commit all changes self.cli_commit() # Verify all changes - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f' segment-routing on', frrconfig) self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', frrconfig) self.assertIn(f' segment-routing node-msd {maximum_stack_size}', frrconfig) self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', frrconfig) self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', frrconfig) def test_ospf_15_ldp_sync(self): holddown = "500" interfaces = Section.interfaces('ethernet') self.cli_set(base_path + ['interface', dummy_if]) self.cli_set(base_path + ['ldp-sync', 'holddown', holddown]) # Commit main OSPF changes self.cli_commit() # Verify main OSPF changes - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' timers throttle spf 200 1000 10000', frrconfig) self.assertIn(f' mpls ldp-sync holddown {holddown}', frrconfig) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'holddown', holddown]) # Commit interface changes for holddown self.cli_commit() for interface in interfaces: # Verify interface changes for holddown - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf dead-interval 40', config) self.assertIn(f' ip ospf mpls ldp-sync', config) self.assertIn(f' ip ospf mpls ldp-sync holddown {holddown}', config) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'disable']) # Commit interface changes for disable self.cli_commit() for interface in interfaces: # Verify interface changes for disable - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=ospf_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ip ospf dead-interval 40', config) self.assertNotIn(f' ip ospf mpls ldp-sync', config) def test_ospf_16_graceful_restart(self): period = '300' supported_grace_time = '400' router_ids = ['192.0.2.1', '192.0.2.2'] self.cli_set(base_path + ['capability', 'opaque']) self.cli_set(base_path + ['graceful-restart', 'grace-period', period]) self.cli_set(base_path + ['graceful-restart', 'helper', 'planned-only']) self.cli_set(base_path + ['graceful-restart', 'helper', 'no-strict-lsa-checking']) self.cli_set(base_path + ['graceful-restart', 'helper', 'supported-grace-time', supported_grace_time]) for router_id in router_ids: self.cli_set(base_path + ['graceful-restart', 'helper', 'enable', 'router-id', router_id]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) self.assertIn(f'router ospf', frrconfig) self.assertIn(f' capability opaque', frrconfig) self.assertIn(f' graceful-restart grace-period {period}', frrconfig) self.assertIn(f' graceful-restart helper planned-only', frrconfig) self.assertIn(f' no graceful-restart helper strict-lsa-checking', frrconfig) self.assertIn(f' graceful-restart helper supported-grace-time {supported_grace_time}', frrconfig) for router_id in router_ids: self.assertIn(f' graceful-restart helper enable {router_id}', frrconfig) def test_ospf_17_duplicate_area_network(self): area0 = '0' area1 = '1' network = '10.0.0.0/8' self.cli_set(base_path + ['area', area0, 'network', network]) # we can not have the same network defined on two areas self.cli_set(base_path + ['area', area1, 'network', network]) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area', area0]) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) # Required to prevent the race condition T6761 retry_count = 0 max_retries = 60 while not frrconfig and retry_count < max_retries: # Log every 10 seconds if retry_count % 10 == 0: print(f"Attempt {retry_count}: FRR config is still empty. Retrying...") retry_count += 1 sleep(1) - frrconfig = self.getFRRconfig('router ospf', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf', endsection='^exit', daemon=ospf_daemon) if not frrconfig: print("Failed to retrieve FRR config after 60 seconds") self.assertIn(f'router ospf', frrconfig) self.assertIn(f' network {network} area {area1}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_ospfv3.py b/smoketest/scripts/cli/test_protocols_ospfv3.py index f0892d576..d961a4fdc 100755 --- a/smoketest/scripts/cli/test_protocols_ospfv3.py +++ b/smoketest/scripts/cli/test_protocols_ospfv3.py @@ -1,342 +1,342 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section +from vyos.frrender import ospf6_daemon from vyos.utils.process import process_named_running -PROCESS_NAME = 'ospf6d' base_path = ['protocols', 'ospfv3'] route_map = 'foo-bar-baz-0815' router_id = '192.0.2.1' default_area = '0' class TestProtocolsOSPFv3(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsOSPFv3, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(ospf6_daemon) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '20', 'action', 'permit']) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'route-map', route_map]) super(TestProtocolsOSPFv3, cls).tearDownClass() def tearDown(self): self.cli_delete(base_path) self.cli_commit() - frrconfig = self.getFRRconfig('router ospf6') + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertNotIn(f'router ospf6', frrconfig) # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(ospf6_daemon)) def test_ospfv3_01_basic(self): seq = '10' prefix = '2001:db8::/32' acl_name = 'foo-acl-100' self.cli_set(['policy', 'access-list6', acl_name, 'rule', seq, 'action', 'permit']) self.cli_set(['policy', 'access-list6', acl_name, 'rule', seq, 'source', 'any']) self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['area', default_area, 'range', prefix, 'advertise']) self.cli_set(base_path + ['area', default_area, 'export-list', acl_name]) self.cli_set(base_path + ['area', default_area, 'import-list', acl_name]) interfaces = Section.interfaces('ethernet') for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'area', default_area]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' area {default_area} range {prefix}', frrconfig) self.assertIn(f' ospf6 router-id {router_id}', frrconfig) self.assertIn(f' area {default_area} import-list {acl_name}', frrconfig) self.assertIn(f' area {default_area} export-list {acl_name}', frrconfig) for interface in interfaces: - if_config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + if_config = self.getFRRconfig(f'interface {interface}', daemon=ospf6_daemon) self.assertIn(f'ipv6 ospf6 area {default_area}', if_config) self.cli_delete(['policy', 'access-list6', acl_name]) def test_ospfv3_02_distance(self): dist_global = '200' dist_external = '110' dist_inter_area = '120' dist_intra_area = '130' self.cli_set(base_path + ['distance', 'global', dist_global]) self.cli_set(base_path + ['distance', 'ospfv3', 'external', dist_external]) self.cli_set(base_path + ['distance', 'ospfv3', 'inter-area', dist_inter_area]) self.cli_set(base_path + ['distance', 'ospfv3', 'intra-area', dist_intra_area]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' distance {dist_global}', frrconfig) self.assertIn(f' distance ospf6 intra-area {dist_intra_area} inter-area {dist_inter_area} external {dist_external}', frrconfig) def test_ospfv3_03_redistribute(self): metric = '15' metric_type = '1' route_map = 'foo-bar' route_map_seq = '10' redistribute = ['babel', 'bgp', 'connected', 'isis', 'kernel', 'ripng', 'static'] self.cli_set(['policy', 'route-map', route_map, 'rule', route_map_seq, 'action', 'permit']) for protocol in redistribute: self.cli_set(base_path + ['redistribute', protocol, 'metric', metric]) self.cli_set(base_path + ['redistribute', protocol, 'route-map', route_map]) self.cli_set(base_path + ['redistribute', protocol, 'metric-type', metric_type]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) for protocol in redistribute: self.assertIn(f' redistribute {protocol} metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospfv3_04_interfaces(self): bfd_profile = 'vyos-ipv6' self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['area', default_area]) cost = '100' priority = '10' interfaces = Section.interfaces('ethernet') for interface in interfaces: if_base = base_path + ['interface', interface] self.cli_set(if_base + ['bfd', 'profile', bfd_profile]) self.cli_set(if_base + ['cost', cost]) self.cli_set(if_base + ['instance-id', '0']) self.cli_set(if_base + ['mtu-ignore']) self.cli_set(if_base + ['network', 'point-to-point']) self.cli_set(if_base + ['passive']) self.cli_set(if_base + ['priority', priority]) cost = str(int(cost) + 10) priority = str(int(priority) + 5) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) cost = '100' priority = '10' for interface in interfaces: - if_config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + if_config = self.getFRRconfig(f'interface {interface}', daemon=ospf6_daemon) self.assertIn(f'interface {interface}', if_config) self.assertIn(f' ipv6 ospf6 bfd', if_config) self.assertIn(f' ipv6 ospf6 bfd profile {bfd_profile}', if_config) self.assertIn(f' ipv6 ospf6 cost {cost}', if_config) self.assertIn(f' ipv6 ospf6 mtu-ignore', if_config) self.assertIn(f' ipv6 ospf6 network point-to-point', if_config) self.assertIn(f' ipv6 ospf6 passive', if_config) self.assertIn(f' ipv6 ospf6 priority {priority}', if_config) cost = str(int(cost) + 10) priority = str(int(priority) + 5) # Cleanup interfaces self.cli_delete(base_path + ['interface']) self.cli_commit() for interface in interfaces: - if_config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + if_config = self.getFRRconfig(f'interface {interface}', daemon=ospf6_daemon) # There should be no OSPF6 configuration at all after interface removal self.assertNotIn(f' ipv6 ospf6', if_config) def test_ospfv3_05_area_stub(self): area_stub = '23' area_stub_nosum = '26' self.cli_set(base_path + ['area', area_stub, 'area-type', 'stub']) self.cli_set(base_path + ['area', area_stub_nosum, 'area-type', 'stub', 'no-summary']) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' area {area_stub} stub', frrconfig) self.assertIn(f' area {area_stub_nosum} stub no-summary', frrconfig) def test_ospfv3_06_area_nssa(self): area_nssa = '1.1.1.1' area_nssa_nosum = '2.2.2.2' area_nssa_default = '3.3.3.3' self.cli_set(base_path + ['area', area_nssa, 'area-type', 'nssa']) self.cli_set(base_path + ['area', area_nssa, 'area-type', 'stub']) # can only set one area-type per OSPFv3 area with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area', area_nssa, 'area-type', 'stub']) self.cli_set(base_path + ['area', area_nssa_nosum, 'area-type', 'nssa', 'no-summary']) self.cli_set(base_path + ['area', area_nssa_nosum, 'area-type', 'nssa', 'default-information-originate']) self.cli_set(base_path + ['area', area_nssa_default, 'area-type', 'nssa', 'default-information-originate']) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' area {area_nssa} nssa', frrconfig) self.assertIn(f' area {area_nssa_nosum} nssa default-information-originate no-summary', frrconfig) self.assertIn(f' area {area_nssa_default} nssa default-information-originate', frrconfig) def test_ospfv3_07_default_originate(self): seq = '100' metric = '50' metric_type = '1' self.cli_set(base_path + ['default-information', 'originate', 'metric', metric]) self.cli_set(base_path + ['default-information', 'originate', 'metric-type', metric_type]) self.cli_set(base_path + ['default-information', 'originate', 'route-map', route_map]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' default-information originate metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) # Now set 'always' self.cli_set(base_path + ['default-information', 'originate', 'always']) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f' default-information originate always metric {metric} metric-type {metric_type} route-map {route_map}', frrconfig) def test_ospfv3_08_vrfs(self): # It is safe to assume that when the basic VRF test works, all # other OSPF related features work, as we entirely inherit the CLI # templates and Jinja2 FRR template. table = '1000' vrf = 'blue' vrf_base = ['vrf', 'name', vrf] vrf_iface = 'eth1' router_id = '1.2.3.4' router_id_vrf = '1.2.3.5' self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'ospfv3', 'interface', vrf_iface, 'bfd']) self.cli_set(vrf_base + ['protocols', 'ospfv3', 'parameters', 'router-id', router_id_vrf]) self.cli_set(['interfaces', 'ethernet', vrf_iface, 'vrf', vrf]) # Also set a default VRF OSPF config self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' ospf6 router-id {router_id}', frrconfig) - frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=ospf6_daemon) self.assertIn(f'interface {vrf_iface}', frrconfig) self.assertIn(f' ipv6 ospf6 bfd', frrconfig) - frrconfig = self.getFRRconfig(f'router ospf6 vrf {vrf}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'router ospf6 vrf {vrf}', daemon=ospf6_daemon) self.assertIn(f'router ospf6 vrf {vrf}', frrconfig) self.assertIn(f' ospf6 router-id {router_id_vrf}', frrconfig) # T5467: Remove interface from OSPF process and VRF self.cli_delete(vrf_base + ['protocols', 'ospfv3', 'interface']) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) self.cli_commit() # T5467: It must also be removed from FRR config - frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {vrf_iface}', daemon=ospf6_daemon) self.assertNotIn(f'interface {vrf_iface}', frrconfig) # There should be no OSPF related command at all under the interface self.assertNotIn(f' ipv6 ospf6', frrconfig) # cleanup self.cli_delete(['vrf', 'name', vrf]) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) def test_ospfv3_09_graceful_restart(self): period = '300' supported_grace_time = '400' router_ids = ['192.0.2.1', '192.0.2.2'] self.cli_set(base_path + ['graceful-restart', 'grace-period', period]) self.cli_set(base_path + ['graceful-restart', 'helper', 'planned-only']) self.cli_set(base_path + ['graceful-restart', 'helper', 'lsa-check-disable']) self.cli_set(base_path + ['graceful-restart', 'helper', 'supported-grace-time', supported_grace_time]) for router_id in router_ids: self.cli_set(base_path + ['graceful-restart', 'helper', 'enable', 'router-id', router_id]) # commit changes self.cli_commit() # Verify FRR ospfd configuration - frrconfig = self.getFRRconfig('router ospf6', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router ospf6', endsection='^exit', daemon=ospf6_daemon) self.assertIn(f'router ospf6', frrconfig) self.assertIn(f' graceful-restart grace-period {period}', frrconfig) self.assertIn(f' graceful-restart helper planned-only', frrconfig) self.assertIn(f' graceful-restart helper lsa-check-disable', frrconfig) self.assertIn(f' graceful-restart helper supported-grace-time {supported_grace_time}', frrconfig) for router_id in router_ids: self.assertIn(f' graceful-restart helper enable {router_id}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_pim.py b/smoketest/scripts/cli/test_protocols_pim.py index a2cc4f1ed..98b9d57aa 100755 --- a/smoketest/scripts/cli/test_protocols_pim.py +++ b/smoketest/scripts/cli/test_protocols_pim.py @@ -1,193 +1,192 @@ #!/usr/bin/env python3 # # Copyright (C) 2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.frrender import pim_daemon from vyos.ifconfig import Section from vyos.utils.process import process_named_running -PROCESS_NAME = pim_daemon base_path = ['protocols', 'pim'] class TestProtocolsPIM(VyOSUnitTestSHIM.TestCase): def tearDown(self): # pimd process must be running - self.assertTrue(process_named_running(PROCESS_NAME)) + self.assertTrue(process_named_running(pim_daemon)) self.cli_delete(base_path) self.cli_commit() # pimd process must be stopped by now - self.assertFalse(process_named_running(PROCESS_NAME)) + self.assertFalse(process_named_running(pim_daemon)) def test_01_pim_basic(self): rp = '127.0.0.1' group = '224.0.0.0/4' hello = '100' dr_priority = '64' self.cli_set(base_path + ['rp', 'address', rp, 'group', group]) interfaces = Section.interfaces('ethernet') for interface in interfaces: self.cli_set(base_path + ['interface', interface , 'bfd']) self.cli_set(base_path + ['interface', interface , 'dr-priority', dr_priority]) self.cli_set(base_path + ['interface', interface , 'hello', hello]) self.cli_set(base_path + ['interface', interface , 'no-bsm']) self.cli_set(base_path + ['interface', interface , 'no-unicast-bsm']) self.cli_set(base_path + ['interface', interface , 'passive']) # commit changes self.cli_commit() # Verify FRR pimd configuration - frrconfig = self.getFRRconfig('router pim', endsection='^exit', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router pim', endsection='^exit', daemon=pim_daemon) self.assertIn(f' rp {rp} {group}', frrconfig) for interface in interfaces: - frrconfig = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {interface}', daemon=pim_daemon) self.assertIn(f'interface {interface}', frrconfig) self.assertIn(f' ip pim', frrconfig) self.assertIn(f' ip pim bfd', frrconfig) self.assertIn(f' ip pim drpriority {dr_priority}', frrconfig) self.assertIn(f' ip pim hello {hello}', frrconfig) self.assertIn(f' no ip pim bsm', frrconfig) self.assertIn(f' no ip pim unicast-bsm', frrconfig) self.assertIn(f' ip pim passive', frrconfig) self.cli_commit() def test_02_pim_advanced(self): rp = '127.0.0.2' group = '224.0.0.0/4' join_prune_interval = '123' rp_keep_alive_timer = '190' keep_alive_timer = '180' packets = '10' prefix_list = 'pim-test' register_suppress_time = '300' self.cli_set(base_path + ['rp', 'address', rp, 'group', group]) self.cli_set(base_path + ['rp', 'keep-alive-timer', rp_keep_alive_timer]) self.cli_set(base_path + ['ecmp', 'rebalance']) self.cli_set(base_path + ['join-prune-interval', join_prune_interval]) self.cli_set(base_path + ['keep-alive-timer', keep_alive_timer]) self.cli_set(base_path + ['packets', packets]) self.cli_set(base_path + ['register-accept-list', 'prefix-list', prefix_list]) self.cli_set(base_path + ['register-suppress-time', register_suppress_time]) self.cli_set(base_path + ['no-v6-secondary']) self.cli_set(base_path + ['spt-switchover', 'infinity-and-beyond', 'prefix-list', prefix_list]) self.cli_set(base_path + ['ssm', 'prefix-list', prefix_list]) # check validate() - PIM require defined interfaces! with self.assertRaises(ConfigSessionError): self.cli_commit() interfaces = Section.interfaces('ethernet') for interface in interfaces: self.cli_set(base_path + ['interface', interface]) # commit changes self.cli_commit() # Verify FRR pimd configuration - frrconfig = self.getFRRconfig('router pim', endsection='^exit', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig('router pim', endsection='^exit', daemon=pim_daemon) self.assertIn(f' no send-v6-secondary', frrconfig) self.assertIn(f' rp {rp} {group}', frrconfig) self.assertIn(f' register-suppress-time {register_suppress_time}', frrconfig) self.assertIn(f' join-prune-interval {join_prune_interval}', frrconfig) self.assertIn(f' packets {packets}', frrconfig) self.assertIn(f' keep-alive-timer {keep_alive_timer}', frrconfig) self.assertIn(f' rp keep-alive-timer {rp_keep_alive_timer}', frrconfig) self.assertIn(f' ssm prefix-list {prefix_list}', frrconfig) self.assertIn(f' register-accept-list {prefix_list}', frrconfig) self.assertIn(f' spt-switchover infinity-and-beyond prefix-list {prefix_list}', frrconfig) self.assertIn(f' ecmp rebalance', frrconfig) def test_03_pim_igmp_proxy(self): igmp_proxy = ['protocols', 'igmp-proxy'] rp = '127.0.0.1' group = '224.0.0.0/4' self.cli_set(base_path) self.cli_set(igmp_proxy) # check validate() - can not set both IGMP proxy and PIM with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(igmp_proxy) self.cli_set(base_path + ['rp', 'address', rp, 'group', group]) interfaces = Section.interfaces('ethernet') for interface in interfaces: self.cli_set(base_path + ['interface', interface , 'bfd']) # commit changes self.cli_commit() def test_04_igmp(self): watermark_warning = '2000' query_interval = '1000' query_max_response_time = '200' version = '2' igmp_join = { '224.1.1.1' : { 'source' : ['1.1.1.1', '2.2.2.2', '3.3.3.3'] }, '224.1.2.2' : { 'source' : [] }, '224.1.3.3' : {}, } self.cli_set(base_path + ['igmp', 'watermark-warning', watermark_warning]) interfaces = Section.interfaces('ethernet') for interface in interfaces: self.cli_set(base_path + ['interface', interface , 'igmp', 'version', version]) self.cli_set(base_path + ['interface', interface , 'igmp', 'query-interval', query_interval]) self.cli_set(base_path + ['interface', interface , 'igmp', 'query-max-response-time', query_max_response_time]) for join, join_config in igmp_join.items(): self.cli_set(base_path + ['interface', interface , 'igmp', 'join', join]) if 'source' in join_config: for source in join_config['source']: self.cli_set(base_path + ['interface', interface , 'igmp', 'join', join, 'source-address', source]) self.cli_commit() - frrconfig = self.getFRRconfig(daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(daemon=pim_daemon) self.assertIn(f'ip igmp watermark-warn {watermark_warning}', frrconfig) for interface in interfaces: - frrconfig = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + frrconfig = self.getFRRconfig(f'interface {interface}', daemon=pim_daemon) self.assertIn(f'interface {interface}', frrconfig) self.assertIn(f' ip igmp', frrconfig) self.assertIn(f' ip igmp version {version}', frrconfig) self.assertIn(f' ip igmp query-interval {query_interval}', frrconfig) self.assertIn(f' ip igmp query-max-response-time {query_max_response_time}', frrconfig) for join, join_config in igmp_join.items(): if 'source' in join_config: for source in join_config['source']: self.assertIn(f' ip igmp join-group {join} {source}', frrconfig) else: self.assertIn(f' ip igmp join-group {join}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_pim6.py b/smoketest/scripts/cli/test_protocols_pim6.py index 4b9ae6161..f69eb4ae1 100755 --- a/smoketest/scripts/cli/test_protocols_pim6.py +++ b/smoketest/scripts/cli/test_protocols_pim6.py @@ -1,146 +1,146 @@ #!/usr/bin/env python3 # # Copyright (C) 2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section +from vyos.frrender import pim6_daemon from vyos.utils.process import process_named_running -PROCESS_NAME = 'pim6d' base_path = ['protocols', 'pim6'] class TestProtocolsPIMv6(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsPIMv6, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(pim6_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(pim6_daemon)) def test_pim6_01_mld_simple(self): # commit changes interfaces = Section.interfaces('ethernet') for interface in interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_commit() # Verify FRR pim6d configuration for interface in interfaces: - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=pim6_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ipv6 mld', config) self.assertNotIn(f' ipv6 mld version 1', config) # Change to MLD version 1 for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'mld', 'version', '1']) self.cli_commit() # Verify FRR pim6d configuration for interface in interfaces: - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=pim6_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ipv6 mld', config) self.assertIn(f' ipv6 mld version 1', config) def test_pim6_02_mld_join(self): interfaces = Section.interfaces('ethernet') # Use an invalid multicast group address for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'mld', 'join', 'fd00::1234']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['interface']) # Use a valid multicast group address for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'mld', 'join', 'ff18::1234']) self.cli_commit() # Verify FRR pim6d configuration for interface in interfaces: - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=pim6_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ipv6 mld join-group ff18::1234', config) # Join a source-specific multicast group for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'mld', 'join', 'ff38::5678', 'source', '2001:db8::5678']) self.cli_commit() # Verify FRR pim6d configuration for interface in interfaces: - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=pim6_daemon) self.assertIn(f'interface {interface}', config) self.assertIn(f' ipv6 mld join-group ff38::5678 2001:db8::5678', config) def test_pim6_03_basic(self): interfaces = Section.interfaces('ethernet') join_prune_interval = '123' keep_alive_timer = '77' packets = '5' register_suppress_time = '99' dr_priority = '100' hello = '50' self.cli_set(base_path + ['join-prune-interval', join_prune_interval]) self.cli_set(base_path + ['keep-alive-timer', keep_alive_timer]) self.cli_set(base_path + ['packets', packets]) self.cli_set(base_path + ['register-suppress-time', register_suppress_time]) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'dr-priority', dr_priority]) self.cli_set(base_path + ['interface', interface, 'hello', hello]) self.cli_set(base_path + ['interface', interface, 'no-bsm']) self.cli_set(base_path + ['interface', interface, 'no-unicast-bsm']) self.cli_set(base_path + ['interface', interface, 'passive']) self.cli_commit() # Verify FRR pim6d configuration - config = self.getFRRconfig('router pim6', endsection='^exit', daemon=PROCESS_NAME) + config = self.getFRRconfig('router pim6', endsection='^exit', daemon=pim6_daemon) self.assertIn(f' join-prune-interval {join_prune_interval}', config) self.assertIn(f' keep-alive-timer {keep_alive_timer}', config) self.assertIn(f' packets {packets}', config) self.assertIn(f' register-suppress-time {register_suppress_time}', config) for interface in interfaces: - config = self.getFRRconfig(f'interface {interface}', daemon=PROCESS_NAME) + config = self.getFRRconfig(f'interface {interface}', daemon=pim6_daemon) self.assertIn(f' ipv6 pim drpriority {dr_priority}', config) self.assertIn(f' ipv6 pim hello {hello}', config) self.assertIn(f' no ipv6 pim bsm', config) self.assertIn(f' no ipv6 pim unicast-bsm', config) self.assertIn(f' ipv6 pim passive', config) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_rip.py b/smoketest/scripts/cli/test_protocols_rip.py index 48a1f877b..33706a9c9 100755 --- a/smoketest/scripts/cli/test_protocols_rip.py +++ b/smoketest/scripts/cli/test_protocols_rip.py @@ -1,186 +1,186 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section +from vyos.frrender import rip_daemon from vyos.utils.process import process_named_running -PROCESS_NAME = 'ripd' acl_in = '198' acl_out = '199' prefix_list_in = 'foo-prefix' prefix_list_out = 'bar-prefix' route_map = 'FooBar123' base_path = ['protocols', 'rip'] class TestProtocolsRIP(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsRIP, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(rip_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_set(cls, ['policy', 'access-list', acl_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'access-list', acl_in, 'rule', '10', 'source', 'any']) cls.cli_set(cls, ['policy', 'access-list', acl_in, 'rule', '10', 'destination', 'any']) cls.cli_set(cls, ['policy', 'access-list', acl_out, 'rule', '20', 'action', 'deny']) cls.cli_set(cls, ['policy', 'access-list', acl_out, 'rule', '20', 'source', 'any']) cls.cli_set(cls, ['policy', 'access-list', acl_out, 'rule', '20', 'destination', 'any']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_in, 'rule', '100', 'prefix', '192.0.2.0/24']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'action', 'deny']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_out, 'rule', '200', 'prefix', '192.0.2.0/24']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'access-list', acl_in]) cls.cli_delete(cls, ['policy', 'access-list', acl_out]) cls.cli_delete(cls, ['policy', 'prefix-list', prefix_list_in]) cls.cli_delete(cls, ['policy', 'prefix-list', prefix_list_out]) cls.cli_delete(cls, ['policy', 'route-map', route_map]) super(TestProtocolsRIP, cls).tearDownClass() def tearDown(self): self.cli_delete(base_path) self.cli_commit() frrconfig = self.getFRRconfig('router rip') self.assertNotIn(f'router rip', frrconfig) # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(rip_daemon)) def test_rip_01_parameters(self): distance = '40' network_distance = '66' metric = '8' interfaces = Section.interfaces('ethernet') neighbors = ['1.2.3.4', '1.2.3.5', '1.2.3.6'] networks = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] redistribute = ['bgp', 'connected', 'isis', 'kernel', 'ospf', 'static'] timer_garbage = '888' timer_timeout = '1000' timer_update = '90' self.cli_set(base_path + ['default-distance', distance]) self.cli_set(base_path + ['default-information', 'originate']) self.cli_set(base_path + ['default-metric', metric]) self.cli_set(base_path + ['distribute-list', 'access-list', 'in', acl_in]) self.cli_set(base_path + ['distribute-list', 'access-list', 'out', acl_out]) self.cli_set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in]) self.cli_set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out]) self.cli_set(base_path + ['passive-interface', 'default']) self.cli_set(base_path + ['timers', 'garbage-collection', timer_garbage]) self.cli_set(base_path + ['timers', 'timeout', timer_timeout]) self.cli_set(base_path + ['timers', 'update', timer_update]) for interface in interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'in', prefix_list_in]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'out', prefix_list_out]) for neighbor in neighbors: self.cli_set(base_path + ['neighbor', neighbor]) for network in networks: self.cli_set(base_path + ['network', network]) self.cli_set(base_path + ['network-distance', network, 'distance', network_distance]) self.cli_set(base_path + ['route', network]) for proto in redistribute: self.cli_set(base_path + ['redistribute', proto, 'metric', metric]) self.cli_set(base_path + ['redistribute', proto, 'route-map', route_map]) # commit changes self.cli_commit() # Verify FRR ripd configuration frrconfig = self.getFRRconfig('router rip') self.assertIn(f'router rip', frrconfig) self.assertIn(f' distance {distance}', frrconfig) self.assertIn(f' default-information originate', frrconfig) self.assertIn(f' default-metric {metric}', frrconfig) self.assertIn(f' distribute-list {acl_in} in', frrconfig) self.assertIn(f' distribute-list {acl_out} out', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in} in', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out} out', frrconfig) self.assertIn(f' passive-interface default', frrconfig) self.assertIn(f' timers basic {timer_update} {timer_timeout} {timer_garbage}', frrconfig) for interface in interfaces: self.assertIn(f' network {interface}', frrconfig) self.assertIn(f' distribute-list {acl_in} in {interface}', frrconfig) self.assertIn(f' distribute-list {acl_out} out {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in} in {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out} out {interface}', frrconfig) for neighbor in neighbors: self.assertIn(f' neighbor {neighbor}', frrconfig) for network in networks: self.assertIn(f' network {network}', frrconfig) self.assertIn(f' distance {network_distance} {network}', frrconfig) self.assertIn(f' route {network}', frrconfig) for proto in redistribute: self.assertIn(f' redistribute {proto} metric {metric} route-map {route_map}', frrconfig) def test_rip_02_zebra_route_map(self): # Implemented because of T3328 self.cli_set(base_path + ['route-map', route_map]) # commit changes self.cli_commit() # Verify FRR configuration zebra_route_map = f'ip protocol rip route-map {route_map}' frrconfig = self.getFRRconfig(zebra_route_map) self.assertIn(zebra_route_map, frrconfig) # Remove the route-map again self.cli_delete(base_path + ['route-map']) # commit changes self.cli_commit() # Verify FRR configuration frrconfig = self.getFRRconfig(zebra_route_map) self.assertNotIn(zebra_route_map, frrconfig) def test_rip_03_version(self): rx_version = '1' tx_version = '2' interface = 'eth0' self.cli_set(base_path + ['version', tx_version]) self.cli_set(base_path + ['interface', interface, 'send', 'version', tx_version]) self.cli_set(base_path + ['interface', interface, 'receive', 'version', rx_version]) # commit changes self.cli_commit() # Verify FRR configuration frrconfig = self.getFRRconfig('router rip') self.assertIn(f'version {tx_version}', frrconfig) frrconfig = self.getFRRconfig(f'interface {interface}') self.assertIn(f' ip rip receive version {rx_version}', frrconfig) self.assertIn(f' ip rip send version {tx_version}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_ripng.py b/smoketest/scripts/cli/test_protocols_ripng.py index c266bdfd2..b10df9679 100755 --- a/smoketest/scripts/cli/test_protocols_ripng.py +++ b/smoketest/scripts/cli/test_protocols_ripng.py @@ -1,163 +1,163 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section +from vyos.frrender import ripng_daemon from vyos.utils.process import process_named_running -PROCESS_NAME = 'ripngd' acl_in = '198' acl_out = '199' prefix_list_in = 'foo-prefix' prefix_list_out = 'bar-prefix' route_map = 'FooBar123' base_path = ['protocols', 'ripng'] class TestProtocolsRIPng(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsRIPng, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(ripng_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_set(cls, ['policy', 'access-list6', acl_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'access-list6', acl_in, 'rule', '10', 'source', 'any']) cls.cli_set(cls, ['policy', 'access-list6', acl_out, 'rule', '20', 'action', 'deny']) cls.cli_set(cls, ['policy', 'access-list6', acl_out, 'rule', '20', 'source', 'any']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in, 'rule', '100', 'prefix', '2001:db8::/32']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'action', 'deny']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out, 'rule', '200', 'prefix', '2001:db8::/32']) cls.cli_set(cls, ['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) @classmethod def tearDownClass(cls): # call base-classes classmethod super(TestProtocolsRIPng, cls).tearDownClass() cls.cli_delete(cls, ['policy', 'access-list6', acl_in]) cls.cli_delete(cls, ['policy', 'access-list6', acl_out]) cls.cli_delete(cls, ['policy', 'prefix-list6', prefix_list_in]) cls.cli_delete(cls, ['policy', 'prefix-list6', prefix_list_out]) cls.cli_delete(cls, ['policy', 'route-map', route_map]) def tearDown(self): self.cli_delete(base_path) self.cli_commit() frrconfig = self.getFRRconfig('router ripng') self.assertNotIn(f'router ripng', frrconfig) # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(ripng_daemon)) def test_ripng_01_parameters(self): metric = '8' interfaces = Section.interfaces('ethernet') aggregates = ['2001:db8:1000::/48', '2001:db8:2000::/48', '2001:db8:3000::/48'] networks = ['2001:db8:1000::/64', '2001:db8:1001::/64', '2001:db8:2000::/64', '2001:db8:2001::/64'] redistribute = ['bgp', 'connected', 'kernel', 'ospfv3', 'static'] timer_garbage = '888' timer_timeout = '1000' timer_update = '90' self.cli_set(base_path + ['default-information', 'originate']) self.cli_set(base_path + ['default-metric', metric]) self.cli_set(base_path + ['distribute-list', 'access-list', 'in', acl_in]) self.cli_set(base_path + ['distribute-list', 'access-list', 'out', acl_out]) self.cli_set(base_path + ['distribute-list', 'prefix-list', 'in', prefix_list_in]) self.cli_set(base_path + ['distribute-list', 'prefix-list', 'out', prefix_list_out]) self.cli_set(base_path + ['passive-interface', 'default']) self.cli_set(base_path + ['timers', 'garbage-collection', timer_garbage]) self.cli_set(base_path + ['timers', 'timeout', timer_timeout]) self.cli_set(base_path + ['timers', 'update', timer_update]) for aggregate in aggregates: self.cli_set(base_path + ['aggregate-address', aggregate]) for interface in interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'in', acl_in]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'access-list', 'out', acl_out]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'in', prefix_list_in]) self.cli_set(base_path + ['distribute-list', 'interface', interface, 'prefix-list', 'out', prefix_list_out]) for network in networks: self.cli_set(base_path + ['network', network]) self.cli_set(base_path + ['route', network]) for proto in redistribute: self.cli_set(base_path + ['redistribute', proto, 'metric', metric]) self.cli_set(base_path + ['redistribute', proto, 'route-map', route_map]) # commit changes self.cli_commit() # Verify FRR ospfd configuration frrconfig = self.getFRRconfig('router ripng') self.assertIn(f'router ripng', frrconfig) self.assertIn(f' default-information originate', frrconfig) self.assertIn(f' default-metric {metric}', frrconfig) self.assertIn(f' ipv6 distribute-list {acl_in} in', frrconfig) self.assertIn(f' ipv6 distribute-list {acl_out} out', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in} in', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out} out', frrconfig) self.assertIn(f' passive-interface default', frrconfig) self.assertIn(f' timers basic {timer_update} {timer_timeout} {timer_garbage}', frrconfig) for aggregate in aggregates: self.assertIn(f' aggregate-address {aggregate}', frrconfig) for interface in interfaces: self.assertIn(f' network {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {acl_in} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {acl_out} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out} out {interface}', frrconfig) for network in networks: self.assertIn(f' network {network}', frrconfig) self.assertIn(f' route {network}', frrconfig) for proto in redistribute: if proto == 'ospfv3': proto = 'ospf6' self.assertIn(f' redistribute {proto} metric {metric} route-map {route_map}', frrconfig) def test_ripng_02_zebra_route_map(self): # Implemented because of T3328 self.cli_set(base_path + ['route-map', route_map]) # commit changes self.cli_commit() # Verify FRR configuration zebra_route_map = f'ipv6 protocol ripng route-map {route_map}' frrconfig = self.getFRRconfig(zebra_route_map) self.assertIn(zebra_route_map, frrconfig) # Remove the route-map again self.cli_delete(base_path + ['route-map']) # commit changes self.cli_commit() # Verify FRR configuration frrconfig = self.getFRRconfig(zebra_route_map) self.assertNotIn(zebra_route_map, frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_rpki.py b/smoketest/scripts/cli/test_protocols_rpki.py index c6a6adf58..23738717a 100755 --- a/smoketest/scripts/cli/test_protocols_rpki.py +++ b/smoketest/scripts/cli/test_protocols_rpki.py @@ -1,250 +1,249 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError +from vyos.frrender import bgp_daemon from vyos.utils.file import read_file from vyos.utils.process import process_named_running base_path = ['protocols', 'rpki'] -PROCESS_NAME = 'bgpd' - rpki_key_name = 'rpki-smoketest' rpki_key_type = 'ssh-rsa' rpki_ssh_key = """ b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABFwAAAAdzc2gtcn NhAAAAAwEAAQAAAQEAweDyflDFR4qyEwETbJkZ2ZZc+sJNiDTvYpwGsWIkju49lJSxHe1x Kf8FhwfyMu40Snt1yDlRmmmz4CsbLgbuZGMPvXG11e34+C0pSVUvpF6aqRTeLl1pDRK7Rn jgm3su+I8SRLQR4qbLG6VXWOFuVpwiqbExLaU0hFYTPNP+dArNpsWEEKsohk6pTXdhg3Vz Wp3vCMjl2JTshDa3lD7p2xISSAReEY0fnfEAmQzH4Z6DIwwGdFuMWoQIg+oFBM9ARrO2/F IjRsz6AecR/WeU72JEw4aJic1/cAJQA6PiQBHwkuo3Wll1tbpxeRZoB2NQG22ETyJLvhfT aooNLT9HpQAAA8joU5dM6FOXTAAAAAdzc2gtcnNhAAABAQDB4PJ+UMVHirITARNsmRnZll z6wk2INO9inAaxYiSO7j2UlLEd7XEp/wWHB/Iy7jRKe3XIOVGaabPgKxsuBu5kYw+9cbXV 7fj4LSlJVS+kXpqpFN4uXWkNErtGeOCbey74jxJEtBHipssbpVdY4W5WnCKpsTEtpTSEVh M80/50Cs2mxYQQqyiGTqlNd2GDdXNane8IyOXYlOyENreUPunbEhJIBF4RjR+d8QCZDMfh noMjDAZ0W4xahAiD6gUEz0BGs7b8UiNGzPoB5xH9Z5TvYkTDhomJzX9wAlADo+JAEfCS6j daWXW1unF5FmgHY1AbbYRPIku+F9Nqig0tP0elAAAAAwEAAQAAAQACkDlUjzfUhtJs6uY5 WNrdJB5NmHUS+HQzzxFNlhkapK6+wKqI1UNaRUtq6iF7J+gcFf7MK2nXS098BsXguWm8fQ zPuemoDvHsQhiaJhyvpSqRUrvPTB/f8t/0AhQiKiJIWgfpTaIw53inAGwjujNNxNm2eafH TThhCYxOkRT7rsT6bnSio6yeqPy5QHg7IKFztp5FXDUyiOS3aX3SvzQcDUkMXALdvzX50t 1XIk+X48Rgkq72dL4VpV2oMNDu3hM6FqBUplf9Mv3s51FNSma/cibCQoVufrIfoqYjkNTj IpYFUcq4zZ0/KvgXgzSsy9VN/4TtbalrOuu7X/SHJbvhAAAAgGPFsXgONYQvXxCnK1dIue ozgaZg1I/n522E2ZCOXBW4dYJVyNpppwRreDzuFzTDEe061MpNHfScjVBJCCulivFYWscL 6oaGsryDbFxO3QmB4I98UBqrds2yan9/JGc6EYe299yvaHy7Y64+NC0+fN8H2RAZ61T4w1 0JrCaJRyvzAAAAgQDvBfuV1U7o9k/fbU+U7W2UYnWblpOZAMfi1XQP6IJJeyWs90PdTdXh +l0eIQrCawIiRJytNfxMmbD4huwTf77fWiyCcPznmALQ7ex/yJ+W5Z0V4dPGF3h7o1uiS2 36JhQ7mfcliCkhp/1PIklBIMPcCp0zl+s9wMv2hX7w1Pah9QAAAIEAz6YgU9Xute+J+dBw oWxEQ+igR6KE55Um7O9AvSrqnCm9r7lSFsXC2ErYOxoDSJ3yIBEV0b4XAGn6tbbVIs3jS8 BnLHxclAHQecOx1PGn7PKbnPW0oJRq/X9QCIEelKYvlykpayn7uZooTXqcDaPZxfPpmPdy e8chVJvdygi7kPEAAAAMY3BvQExSMS53dWUzAQIDBAUGBw== """ rpki_ssh_pub = """ AAAAB3NzaC1yc2EAAAADAQABAAABAQDB4PJ+UMVHirITARNsmRnZllz6wk2INO9inAaxYi SO7j2UlLEd7XEp/wWHB/Iy7jRKe3XIOVGaabPgKxsuBu5kYw+9cbXV7fj4LSlJVS+kXpqp FN4uXWkNErtGeOCbey74jxJEtBHipssbpVdY4W5WnCKpsTEtpTSEVhM80/50Cs2mxYQQqy iGTqlNd2GDdXNane8IyOXYlOyENreUPunbEhJIBF4RjR+d8QCZDMfhnoMjDAZ0W4xahAiD 6gUEz0BGs7b8UiNGzPoB5xH9Z5TvYkTDhomJzX9wAlADo+JAEfCS6jdaWXW1unF5FmgHY1 AbbYRPIku+F9Nqig0tP0el """ rpki_ssh_key_replacement = """ b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABFwAAAAdzc2gtcn NhAAAAAwEAAQAAAQEAtLPMwiGR3o6puPDbus9Yqoah9/7rv7i6ykykPmcEZ6ERnA0N6bl7 LkQxnCuX270ukTTZOhROvQnvQYIZohCMz27Q16z7r+I755QXL0x8x4Gqhg/hQUY7UtX6ts db8+pO7G1PL4r9zT6/KJAF/wv86DezJ3I6TMaA7MCikXfQWJisBvhgAXF1+7V9CWaroGgV /hHzQJu1yd4cfsYoHyeDaZ+lwFw4egNItIy63fIGDxrnXaonJ1ODGQh7zWlpl/cwQR/KyJ P8vvOZ9olQ6syZV+DAcAo4Fe59wW2Zj4bl8bdGcdiDn0grkafxwTcg9ynr9kwQ8b66oXY4 hwB4vlPFPwAAA8jkGyX45Bsl+AAAAAdzc2gtcnNhAAABAQC0s8zCIZHejqm48Nu6z1iqhq H3/uu/uLrKTKQ+ZwRnoRGcDQ3puXsuRDGcK5fbvS6RNNk6FE69Ce9BghmiEIzPbtDXrPuv 4jvnlBcvTHzHgaqGD+FBRjtS1fq2x1vz6k7sbU8viv3NPr8okAX/C/zoN7MncjpMxoDswK KRd9BYmKwG+GABcXX7tX0JZqugaBX+EfNAm7XJ3hx+xigfJ4Npn6XAXDh6A0i0jLrd8gYP GuddqicnU4MZCHvNaWmX9zBBH8rIk/y+85n2iVDqzJlX4MBwCjgV7n3BbZmPhuXxt0Zx2I OfSCuRp/HBNyD3Kev2TBDxvrqhdjiHAHi+U8U/AAAAAwEAAQAAAQA99gkX5/rknXaE+9Hc VIzKrC+NodOkgetKwszuuNRB1HD9WVyT8A3U5307V5dSuaPmFoEF8UCugWGQzNONRq+B0T W7Po1u2dxAo/7vMQL4RfX60icjAroExWqakfFtycIWP8UPQFGWtxVFC12C/tFRrwe3Vuu2 t7otdEBKMRM3zU0Hj88/5FIk/MDhththDCKTMe4+iwNKo30dyqSCckpTd2k5de9JYz8Aom 87jtQcyDdynaELSo9CsA8KRPlozZ4VSWTVLH+Cv2TZWPL7hy79YvvIfuF/Sd6PGkNwG1Vj TAbq2Wx4uq+HmpNiz7W0LnbZtQJ7dzLA3FZlvQMC8fVBAAAAgQDWvImVZCyVWpoG+LnKY3 joegjKRYKdgKRPCqGoIHiYsqCRxqSRW3jsuQCCvk4YO3/ZmqORiGktK+5r8R1QEtwg5qbi N7GZD34m7USNuqG2G/4puEly8syMmR6VRRvEURFQrpv2wniXNSefvsDc+WDqTfXGUxr+FT 478wkzjwc/fAAAAIEA9uP0Ym3OC3cZ5FOvmu51lxo5lqPlUeE78axg2I4u/9Il8nOvSVuq B9X5wAUyGAGcUjT3EZmRAtL2sQxc5T0Vw3bnxCjzukEbFM+DRtYy1hXSOoGTTwKoMWBpho R3X5uRLUQL/22C4rd7tSJpjqnZXIH0B5z2fFh4vzu8/SrgCrUAAACBALtep4BcGJfjfhfF ODzQe7Rk7tsaX8pfNv6bQu0sR5C9pDURFRf0fRC0oqgeTuzq/vHPyNLsUUgTCpKWiLFmvU G9pelLT3XPPgzA+g0gycM0unuX8kkP3T5VQAM/7u0+h1CaJ8A6cCkzvDJxYdfio3WR60OP ulHg7HCcyomFLaSjAAAADGNwb0BMUjEud3VlMwECAwQFBg== """ rpki_ssh_pub_replacement = """ AAAAB3NzaC1yc2EAAAADAQABAAABAQC0s8zCIZHejqm48Nu6z1iqhqH3/uu/uLrKTKQ+Zw RnoRGcDQ3puXsuRDGcK5fbvS6RNNk6FE69Ce9BghmiEIzPbtDXrPuv4jvnlBcvTHzHgaqG D+FBRjtS1fq2x1vz6k7sbU8viv3NPr8okAX/C/zoN7MncjpMxoDswKKRd9BYmKwG+GABcX X7tX0JZqugaBX+EfNAm7XJ3hx+xigfJ4Npn6XAXDh6A0i0jLrd8gYPGuddqicnU4MZCHvN aWmX9zBBH8rIk/y+85n2iVDqzJlX4MBwCjgV7n3BbZmPhuXxt0Zx2IOfSCuRp/HBNyD3Ke v2TBDxvrqhdjiHAHi+U8U/ """ class TestProtocolsRPKI(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsRPKI, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(bgp_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() frrconfig = self.getFRRconfig('rpki') self.assertNotIn(f'rpki', frrconfig) # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(bgp_daemon)) def test_rpki(self): expire_interval = '3600' polling_period = '600' retry_interval = '300' cache = { '192.0.2.1' : { 'port' : '8080', 'preference' : '10' }, '2001:db8::1' : { 'port' : '1234', 'preference' : '30' }, 'rpki.vyos.net' : { 'port' : '5678', 'preference' : '40' }, } self.cli_set(base_path + ['expire-interval', expire_interval]) self.cli_set(base_path + ['polling-period', polling_period]) self.cli_set(base_path + ['retry-interval', retry_interval]) for peer, peer_config in cache.items(): self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) # commit changes self.cli_commit() # Verify FRR configuration frrconfig = self.getFRRconfig('rpki') self.assertIn(f'rpki expire_interval {expire_interval}', frrconfig) self.assertIn(f'rpki polling_period {polling_period}', frrconfig) self.assertIn(f'rpki retry_interval {retry_interval}', frrconfig) for peer, peer_config in cache.items(): port = peer_config['port'] preference = peer_config['preference'] self.assertIn(f'rpki cache tcp {peer} {port} preference {preference}', frrconfig) def test_rpki_ssh(self): polling = '7200' cache = { '192.0.2.3' : { 'port' : '1234', 'username' : 'foo', 'preference' : '10' }, '192.0.2.4' : { 'port' : '5678', 'username' : 'bar', 'preference' : '20' }, } self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key.replace('\n','')]) self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub.replace('\n','')]) self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'type', rpki_key_type]) for cache_name, cache_config in cache.items(): self.cli_set(base_path + ['cache', cache_name, 'port', cache_config['port']]) self.cli_set(base_path + ['cache', cache_name, 'preference', cache_config['preference']]) self.cli_set(base_path + ['cache', cache_name, 'ssh', 'username', cache_config['username']]) self.cli_set(base_path + ['cache', cache_name, 'ssh', 'key', rpki_key_name]) # commit changes self.cli_commit() # Verify FRR configuration frrconfig = self.getFRRconfig('rpki') for cache_name, cache_config in cache.items(): port = cache_config['port'] preference = cache_config['preference'] username = cache_config['username'] self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) # Verify content of SSH keys tmp = read_file(f'/run/frr/id_rpki_{cache_name}') self.assertIn(rpki_ssh_key.replace('\n',''), tmp) tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub') self.assertIn(rpki_ssh_pub.replace('\n',''), tmp) # Change OpenSSH key and verify it was properly written to filesystem self.cli_set(['pki', 'openssh', rpki_key_name, 'private', 'key', rpki_ssh_key_replacement.replace('\n','')]) self.cli_set(['pki', 'openssh', rpki_key_name, 'public', 'key', rpki_ssh_pub_replacement.replace('\n','')]) # commit changes self.cli_commit() for cache_name, cache_config in cache.items(): port = cache_config['port'] preference = cache_config['preference'] username = cache_config['username'] self.assertIn(f'rpki cache ssh {cache_name} {port} {username} /run/frr/id_rpki_{cache_name} /run/frr/id_rpki_{cache_name}.pub preference {preference}', frrconfig) # Verify content of SSH keys tmp = read_file(f'/run/frr/id_rpki_{cache_name}') self.assertIn(rpki_ssh_key_replacement.replace('\n',''), tmp) tmp = read_file(f'/run/frr/id_rpki_{cache_name}.pub') self.assertIn(rpki_ssh_pub_replacement.replace('\n',''), tmp) self.cli_delete(['pki', 'openssh']) def test_rpki_verify_preference(self): cache = { '192.0.2.1' : { 'port' : '8080', 'preference' : '1' }, '192.0.2.2' : { 'port' : '9090', 'preference' : '1' }, } for peer, peer_config in cache.items(): self.cli_set(base_path + ['cache', peer, 'port', peer_config['port']]) self.cli_set(base_path + ['cache', peer, 'preference', peer_config['preference']]) # check validate() - preferences must be unique with self.assertRaises(ConfigSessionError): self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_segment-routing.py b/smoketest/scripts/cli/test_protocols_segment-routing.py index 8905d5e45..624985476 100755 --- a/smoketest/scripts/cli/test_protocols_segment-routing.py +++ b/smoketest/scripts/cli/test_protocols_segment-routing.py @@ -1,111 +1,110 @@ #!/usr/bin/env python3 # # Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.frrender import zebra_daemon from vyos.utils.process import process_named_running from vyos.utils.system import sysctl_read base_path = ['protocols', 'segment-routing'] -PROCESS_NAME = 'zebra' class TestProtocolsSegmentRouting(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsSegmentRouting, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same - cls.daemon_pid = process_named_running(PROCESS_NAME) + cls.daemon_pid = process_named_running(zebra_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity - self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) + self.assertEqual(self.daemon_pid, process_named_running(zebra_daemon)) def test_srv6(self): interfaces = Section.interfaces('ethernet', vlan=False) locators = { 'foo' : { 'prefix' : '2001:a::/64' }, 'foo' : { 'prefix' : '2001:b::/64', 'usid' : {} }, } for locator, locator_config in locators.items(): self.cli_set(base_path + ['srv6', 'locator', locator, 'prefix', locator_config['prefix']]) if 'usid' in locator_config: self.cli_set(base_path + ['srv6', 'locator', locator, 'behavior-usid']) # verify() - SRv6 should be enabled on at least one interface! with self.assertRaises(ConfigSessionError): self.cli_commit() for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'srv6']) self.cli_commit() for interface in interfaces: self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_enabled'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_require_hmac'), '0') # default frrconfig = self.getFRRconfig(f'segment-routing', daemon=zebra_daemon) self.assertIn(f'segment-routing', frrconfig) self.assertIn(f' srv6', frrconfig) self.assertIn(f' locators', frrconfig) for locator, locator_config in locators.items(): self.assertIn(f' locator {locator}', frrconfig) self.assertIn(f' prefix {locator_config["prefix"]} block-len 40 node-len 24 func-bits 16', frrconfig) def test_srv6_sysctl(self): interfaces = Section.interfaces('ethernet', vlan=False) # HMAC accept for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'srv6']) self.cli_set(base_path + ['interface', interface, 'srv6', 'hmac', 'ignore']) self.cli_commit() for interface in interfaces: self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_enabled'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_require_hmac'), '-1') # ignore # HMAC drop for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'srv6']) self.cli_set(base_path + ['interface', interface, 'srv6', 'hmac', 'drop']) self.cli_commit() for interface in interfaces: self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_enabled'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_require_hmac'), '1') # drop # Disable SRv6 on first interface first_if = interfaces[-1] self.cli_delete(base_path + ['interface', first_if]) self.cli_commit() self.assertEqual(sysctl_read(f'net.ipv6.conf.{first_if}.seg6_enabled'), '0') if __name__ == '__main__': unittest.main(verbosity=2)