diff --git a/python/vyos/frr.py b/python/vyos/frr.py deleted file mode 100644 index 67279a6f7..000000000 --- a/python/vyos/frr.py +++ /dev/null @@ -1,569 +0,0 @@ -# Copyright 2020-2024 VyOS maintainers and contributors <maintainers@vyos.io> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with this library. If not, see <http://www.gnu.org/licenses/>. - -r""" -A Library for interracting with the FRR daemon suite. -It supports simple configuration manipulation and loading using the official tools -supplied with FRR (vtysh and frr-reload) - -All configuration management and manipulation is done using strings and regex. - - -Example Usage -##### - -# Reading configuration from frr: -``` ->>> original_config = get_configuration() ->>> repr(original_config) -'!\nfrr version 7.3.1\nfrr defaults traditional\nhostname debian\n...... -``` - - -# Modify a configuration section: -``` ->>> new_bgp_section = 'router bgp 65000\n neighbor 192.0.2.1 remote-as 65000\n' ->>> modified_config = replace_section(original_config, new_bgp_section, replace_re=r'router bgp \d+') ->>> repr(modified_config) -'............router bgp 65000\n neighbor 192.0.2.1 remote-as 65000\n...........' -``` - -Remove a configuration section: -``` ->>> modified_config = remove_section(original_config, r'router ospf') -``` - -Test the new configuration: -``` ->>> try: ->>> mark_configuration(modified configuration) ->>> except ConfigurationNotValid as e: ->>> print('resulting configuration is not valid') ->>> sys.exit(1) -``` - -Apply the new configuration: -``` ->>> try: ->>> replace_configuration(modified_config) ->>> except CommitError as e: ->>> print('Exception while commiting the supplied configuration') ->>> print(e) ->>> exit(1) -``` -""" - -import tempfile -import re - -from vyos import ConfigError -from vyos.defaults import frr_debug_enable -from vyos.utils.process import cmd -from vyos.utils.process import popen -from vyos.utils.process import STDOUT - -import logging -from logging.handlers import SysLogHandler -import os -import sys - -LOG = logging.getLogger(__name__) -DEBUG = False - -ch = SysLogHandler(address='/dev/log') -ch2 = logging.StreamHandler(stream=sys.stdout) -LOG.addHandler(ch) -LOG.addHandler(ch2) - -babel_daemon = 'babeld' -bfd_daemon = 'bfdd' -bgp_daemon = 'bgpd' -eigrp_daemon = 'eigrpd' -isis_daemon = 'isisd' -ldpd_daemon = 'ldpd' -mgmt_daemon = 'mgmtd' -openfabric_daemon = 'fabricd' -ospf_daemon = 'ospfd' -ospf6_daemon = 'ospf6d' -pim_daemon = 'pimd' -pim6_daemon = 'pim6d' -rip_daemon = 'ripd' -ripng_daemon = 'ripngd' -static_daemon = 'staticd' -zebra_daemon = 'zebra' - -_frr_daemons = [zebra_daemon, static_daemon, bgp_daemon, ospf_daemon, ospf6_daemon, rip_daemon, ripng_daemon, mgmt_daemon, - isis_daemon, pim_daemon, pim6_daemon, ldpd_daemon, eigrp_daemon, babel_daemon, bfd_daemon, openfabric_daemon] - -path_vtysh = '/usr/bin/vtysh' -path_frr_reload = '/usr/lib/frr/frr-reload.py' -path_config = '/run/frr' - -default_add_before = r'(ip prefix-list .*|route-map .*|line vty|end)' - - -class FrrError(Exception): - pass - - -class ConfigurationNotValid(FrrError): - """ - The configuratioin supplied to vtysh is not valid - """ - pass - - -class CommitError(FrrError): - """ - Commiting the supplied configuration failed to commit by a unknown reason - see commit error and/or run mark_configuration on the specified configuration - to se error generated - - used by: reload_configuration() - """ - pass - - -class ConfigSectionNotFound(FrrError): - """ - Removal of configuration failed because it is not existing in the supplied configuration - """ - pass - -def init_debugging(): - global DEBUG - - DEBUG = os.path.exists(frr_debug_enable) - if DEBUG: - LOG.setLevel(logging.DEBUG) - -def get_configuration(daemon=None, marked=False): - """ Get current running FRR configuration - daemon: Collect only configuration for the specified FRR daemon, - supplying daemon=None retrieves the complete configuration - marked: Mark the configuration with "end" tags - - return: string containing the running configuration from frr - - """ - if daemon and daemon not in _frr_daemons: - raise ValueError(f'The specified daemon type is not supported {repr(daemon)}') - - cmd = f"{path_vtysh} -c 'show run'" - if daemon: - cmd += f' -d {daemon}' - - output, code = popen(cmd, stderr=STDOUT) - if code: - raise OSError(code, output) - - config = output.replace('\r', '') - # Remove first header lines from FRR config - config = config.split("\n", 3)[-1] - # Mark the configuration with end tags - if marked: - config = mark_configuration(config) - - return config - - -def mark_configuration(config): - """ Add end marks and Test the configuration for syntax faults - If the configuration is valid a marked version of the configuration is returned, - or else it failes with a ConfigurationNotValid Exception - - config: The configuration string to mark/test - return: The marked configuration from FRR - """ - output, code = popen(f"{path_vtysh} -m -f -", stderr=STDOUT, input=config) - - if code == 2: - raise ConfigurationNotValid(str(output)) - elif code: - raise OSError(code, output) - - config = output.replace('\r', '') - return config - - -def reload_configuration(config, daemon=None): - """ Execute frr-reload with the new configuration - This will try to reapply the supplied configuration inside FRR. - The configuration needs to be a complete configuration from the integrated config or - from a daemon. - - config: The configuration to apply - daemon: Apply the conigutaion to the specified FRR daemon, - supplying daemon=None applies to the integrated configuration - return: None - """ - if daemon and daemon not in _frr_daemons: - raise ValueError(f'The specified daemon type is not supported {repr(daemon)}') - - f = tempfile.NamedTemporaryFile('w') - f.write(config) - f.flush() - - LOG.debug(f'reload_configuration: Reloading config using temporary file: {f.name}') - cmd = f'{path_frr_reload} --reload' - if daemon: - cmd += f' --daemon {daemon}' - - if DEBUG: - cmd += f' --debug --stdout' - - cmd += f' {f.name}' - - LOG.debug(f'reload_configuration: Executing command against frr-reload: "{cmd}"') - output, code = popen(cmd, stderr=STDOUT) - f.close() - - for i, e in enumerate(output.split('\n')): - LOG.debug(f'frr-reload output: {i:3} {e}') - - if code == 1: - raise ConfigError(output) - elif code: - raise OSError(code, output) - - return output - - -def save_configuration(): - """ T3217: Save FRR configuration to /run/frr/config/frr.conf """ - return cmd(f'{path_vtysh} -n -w') - - -def execute(command): - """ Run commands inside vtysh - command: str containing commands to execute inside a vtysh session - """ - if not isinstance(command, str): - raise ValueError(f'command needs to be a string: {repr(command)}') - - cmd = f"{path_vtysh} -c '{command}'" - - output, code = popen(cmd, stderr=STDOUT) - if code: - raise OSError(code, output) - - config = output.replace('\r', '') - return config - - -def configure(lines, daemon=False): - """ run commands inside config mode vtysh - lines: list or str conaining commands to execute inside a configure session - only one command executed on each configure() - Executing commands inside a subcontext uses the list to describe the context - ex: ['router bgp 6500', 'neighbor 192.0.2.1 remote-as 65000'] - return: None - """ - if isinstance(lines, str): - lines = [lines] - elif not isinstance(lines, list): - raise ValueError('lines needs to be string or list of commands') - - if daemon and daemon not in _frr_daemons: - raise ValueError(f'The specified daemon type is not supported {repr(daemon)}') - - cmd = f'{path_vtysh}' - if daemon: - cmd += f' -d {daemon}' - - cmd += " -c 'configure terminal'" - for x in lines: - cmd += f" -c '{x}'" - - output, code = popen(cmd, stderr=STDOUT) - if code == 1: - raise ConfigurationNotValid(f'Configuration FRR failed: {repr(output)}') - elif code: - raise OSError(code, output) - - config = output.replace('\r', '') - return config - - -def _replace_section(config, replacement, replace_re, before_re): - r"""Replace a section of FRR config - config: full original configuration - replacement: replacement configuration section - replace_re: The regex to replace - example: ^router bgp \d+$.?*^!$ - this will replace everything between ^router bgp X$ and ^!$ - before_re: When replace_re is not existant, the config will be added before this tag - example: ^line vty$ - - return: modified configuration as a text file - """ - # DEPRECATED, this is replaced by a new implementation - # Check if block is configured, remove the existing instance else add a new one - if re.findall(replace_re, config, flags=re.MULTILINE | re.DOTALL): - # Section is in the configration, replace it - return re.sub(replace_re, replacement, config, count=1, - flags=re.MULTILINE | re.DOTALL) - if before_re: - if not re.findall(before_re, config, flags=re.MULTILINE | re.DOTALL): - raise ConfigSectionNotFound(f"Config section {before_re} not found in config") - - # If no section is in the configuration, add it before the line vty line - return re.sub(before_re, rf'{replacement}\n\g<1>', config, count=1, - flags=re.MULTILINE | re.DOTALL) - - raise ConfigSectionNotFound(f"Config section {replacement} not found in config") - - -def replace_section(config, replacement, from_re, to_re=r'!', before_re=r'line vty'): - r"""Replace a section of FRR config - config: full original configuration - replacement: replacement configuration section - from_re: Regex for the start of section matching - example: 'router bgp \d+' - to_re: Regex for stop of section matching - default: '!' - example: '!' or 'end' - before_re: When from_re/to_re does not return a match, the config will - be added before this tag - default: ^line vty$ - - startline and endline tags will be automatically added to the resulting from_re/to_re and before_re regex'es - """ - # DEPRECATED, this is replaced by a new implementation - return _replace_section(config, replacement, replace_re=rf'^{from_re}$.*?^{to_re}$', before_re=rf'^({before_re})$') - - -def remove_section(config, from_re, to_re='!'): - # DEPRECATED, this is replaced by a new implementation - return _replace_section(config, '', replace_re=rf'^{from_re}$.*?^{to_re}$', before_re=None) - - -def _find_first_block(config, start_pattern, stop_pattern, start_at=0): - '''Find start and stop line numbers for a config block - config: (list) A list conaining the configuration that is searched - start_pattern: (raw-str) The pattern searched for a a start of block tag - stop_pattern: (raw-str) The pattern searched for to signify the end of the block - start_at: (int) The index to start searching at in the <config> - - Returns: - None: No complete block could be found - set(int, int): A complete block found between the line numbers returned in the set - - The object <config> is searched from the start for the regex <start_pattern> until the first match is found. - On a successful match it continues the search for the regex <stop_pattern> until it is found. - After a successful run a set is returned containing the start and stop line numbers. - ''' - LOG.debug(f'_find_first_block: find start={repr(start_pattern)} stop={repr(stop_pattern)} start_at={start_at}') - _start = None - for i, element in enumerate(config[start_at:], start=start_at): - # LOG.debug(f'_find_first_block: running line {i:3} "{element}"') - if not _start: - if not re.match(start_pattern, element): - LOG.debug(f'_find_first_block: no match {i:3} "{element}"') - continue - _start = i - LOG.debug(f'_find_first_block: Found start {i:3} "{element}"') - continue - - if not re.match(stop_pattern, element): - LOG.debug(f'_find_first_block: no match {i:3} "{element}"') - continue - - LOG.debug(f'_find_first_block: Found stop {i:3} "{element}"') - return (_start, i) - - LOG.debug('_find_first_block: exit start={repr(start_pattern)} stop={repr(stop_pattern)} start_at={start_at}') - return None - - -def _find_first_element(config, pattern, start_at=0): - '''Find the first element that matches the current pattern in config - config: (list) A list containing the configuration that is searched - start_pattern: (raw-str) The pattern searched for - start_at: (int) The index to start searching at in the <config> - - return: Line index of the line containing the searched pattern - - TODO: for now it returns -1 on a no-match because 0 also returns as False - TODO: that means that we can not use False matching to tell if its - ''' - LOG.debug(f'_find_first_element: find start="{pattern}" start_at={start_at}') - for i, element in enumerate(config[start_at:], start=0): - if re.match(pattern + '$', element): - LOG.debug(f'_find_first_element: Found stop {i:3} "{element}"') - return i - LOG.debug(f'_find_first_element: no match {i:3} "{element}"') - LOG.debug(f'_find_first_element: Did not find any match, exiting') - return -1 - - -def _find_elements(config, pattern, start_at=0): - '''Find all instances of pattern and return a list containing all element indexes - config: (list) A list containing the configuration that is searched - start_pattern: (raw-str) The pattern searched for - start_at: (int) The index to start searching at in the <config> - - return: A list of line indexes containing the searched pattern - TODO: refactor this to return a generator instead - ''' - return [i for i, element in enumerate(config[start_at:], start=0) if re.match(pattern + '$', element)] - - -class FRRConfig: - '''Main FRR Configuration manipulation object - Using this object the user could load, manipulate and commit the configuration to FRR - ''' - def __init__(self, config=[]): - self.imported_config = '' - - if isinstance(config, list): - self.config = config.copy() - self.original_config = config.copy() - elif isinstance(config, str): - self.config = config.split('\n') - self.original_config = self.config.copy() - else: - raise ValueError( - 'The config element needs to be a string or list type object') - - if config: - LOG.debug(f'__init__: frr library initiated with initial config') - for i, e in enumerate(self.config): - LOG.debug(f'__init__: initial {i:3} {e}') - - def load_configuration(self, daemon=None): - '''Load the running configuration from FRR into the config object - daemon: str with name of the FRR Daemon to load configuration from or - None to load the consolidated config - - Using this overwrites the current loaded config objects and replaces the original loaded config - ''' - init_debugging() - - self.imported_config = get_configuration(daemon=daemon) - if daemon: - LOG.debug(f'load_configuration: Configuration loaded from FRR daemon {daemon}') - else: - LOG.debug(f'load_configuration: Configuration loaded from FRR integrated config') - - self.original_config = self.imported_config.split('\n') - self.config = self.original_config.copy() - - for i, e in enumerate(self.imported_config.split('\n')): - LOG.debug(f'load_configuration: loaded {i:3} {e}') - return - - def test_configuration(self): - '''Test the current configuration against FRR - This will exception if FRR failes to load the current configuration object - ''' - LOG.debug('test_configation: Testing configuration') - mark_configuration('\n'.join(self.config)) - - def commit_configuration(self, daemon=None): - ''' - Commit the current configuration to FRR daemon: str with name of the - FRR daemon to commit to or None to use the consolidated config. - - Configuration is automatically saved after apply - ''' - LOG.debug('commit_configuration: Commiting configuration') - for i, e in enumerate(self.config): - LOG.debug(f'commit_configuration: new_config {i:3} {e}') - - # https://github.com/FRRouting/frr/issues/10132 - # https://github.com/FRRouting/frr/issues/10133 - count = 0 - count_max = 5 - emsg = '' - while count < count_max: - count += 1 - try: - reload_configuration('\n'.join(self.config), daemon=daemon) - break - except ConfigError as e: - emsg = str(e) - except: - # we just need to re-try the commit of the configuration - # for the listed FRR issues above - pass - if count >= count_max: - if emsg: - raise ConfigError(emsg) - raise ConfigurationNotValid(f'Config commit retry counter ({count_max}) exceeded for {daemon} daemon!') - - # Save configuration to /run/frr/config/frr.conf - save_configuration() - - - def modify_section(self, start_pattern, replacement='!', stop_pattern=r'\S+', remove_stop_mark=False, count=0): - if isinstance(replacement, str): - replacement = replacement.split('\n') - elif not isinstance(replacement, list): - return ValueError("The replacement element needs to be a string or list type object") - LOG.debug(f'modify_section: starting search for {repr(start_pattern)} until {repr(stop_pattern)}') - - _count = 0 - _next_start = 0 - while True: - if count and count <= _count: - # Break out of the loop after specified amount of matches - LOG.debug(f'modify_section: reached limit ({_count}), exiting loop at line {_next_start}') - break - # While searching, always assume that the user wants to search for the exact pattern he entered - # To be more specific the user needs a override, eg. a "pattern.*" - _w = _find_first_block( - self.config, start_pattern+'$', stop_pattern, start_at=_next_start) - if not _w: - # Reached the end, no more elements to remove - LOG.debug(f'modify_section: No more config sections found, exiting') - break - start_element, end_element = _w - LOG.debug(f'modify_section: found match between {start_element} and {end_element}') - for i, e in enumerate(self.config[start_element:end_element+1 if remove_stop_mark else end_element], - start=start_element): - LOG.debug(f'modify_section: remove {i:3} {e}') - del self.config[start_element:end_element + - 1 if remove_stop_mark else end_element] - if replacement: - # Append the replacement config at the current position - for i, e in enumerate(replacement, start=start_element): - LOG.debug(f'modify_section: add {i:3} {e}') - self.config[start_element:start_element] = replacement - _count += 1 - _next_start = start_element + len(replacement) - - return _count - - def add_before(self, before_pattern, addition): - '''Add config block before this element in the configuration''' - if isinstance(addition, str): - addition = addition.split('\n') - elif not isinstance(addition, list): - return ValueError("The replacement element needs to be a string or list type object") - - start = _find_first_element(self.config, before_pattern) - if start < 0: - return False - for i, e in enumerate(addition, start=start): - LOG.debug(f'add_before: add {i:3} {e}') - self.config[start:start] = addition - return True - - def __str__(self): - return '\n'.join(self.config) - - def __repr__(self): - return f'frr({repr(str(self))})' diff --git a/python/vyos/frrender.py b/python/vyos/frrender.py index 7a0b661a3..ead893ff9 100644 --- a/python/vyos/frrender.py +++ b/python/vyos/frrender.py @@ -1,167 +1,168 @@ # Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. """ Library used to interface with FRRs mgmtd introduced in version 10.0 """ import os from vyos.defaults import frr_debug_enable from vyos.utils.file import write_file from vyos.utils.process import rc_cmd from vyos.template import render_to_string from vyos import ConfigError DEBUG_ON = os.path.exists(frr_debug_enable) def debug(message): if not DEBUG_ON: return print(message) frr_protocols = ['babel', 'bfd', 'bgp', 'eigrp', 'isis', 'mpls', 'nhrp', 'openfabric', 'ospf', 'ospfv3', 'pim', 'pim6', 'rip', 'ripng', 'rpki', 'segment_routing', 'static'] bgp_daemon = 'bgpd' isis_daemon = 'isisd' mgmt_daemon = 'mgmtd' +openfabric_daemon = 'fabricd' pim_daemon = 'pimd' zebra_daemon = 'zebra' class FRRender: def __init__(self): self._frr_conf = '/run/frr/config/frr.conf' def generate(self, config): if not isinstance(config, dict): tmp = type(config) raise ValueError(f'Config must be of type "dict" and not "{tmp}"!') def inline_helper(config_dict) -> str: output = '!\n' if 'babel' in config_dict and 'deleted' not in config_dict['babel']: output += render_to_string('frr/babeld.frr.j2', config_dict['babel']) output += '\n' if 'bfd' in config_dict and 'deleted' not in config_dict['bfd']: output += render_to_string('frr/bfdd.frr.j2', config_dict['bfd']) output += '\n' if 'bgp' in config_dict and 'deleted' not in config_dict['bgp']: output += render_to_string('frr/bgpd.frr.j2', config_dict['bgp']) output += '\n' if 'eigrp' in config_dict and 'deleted' not in config_dict['eigrp']: output += render_to_string('frr/eigrpd.frr.j2', config_dict['eigrp']) output += '\n' if 'isis' in config_dict and 'deleted' not in config_dict['isis']: output += render_to_string('frr/isisd.frr.j2', config_dict['isis']) output += '\n' if 'mpls' in config_dict and 'deleted' not in config_dict['mpls']: output += render_to_string('frr/ldpd.frr.j2', config_dict['mpls']) output += '\n' if 'openfabric' in config_dict and 'deleted' not in config_dict['openfabric']: output += render_to_string('frr/fabricd.frr.j2', config_dict['openfabric']) output += '\n' if 'ospf' in config_dict and 'deleted' not in config_dict['ospf']: output += render_to_string('frr/ospfd.frr.j2', config_dict['ospf']) output += '\n' if 'ospfv3' in config_dict and 'deleted' not in config_dict['ospfv3']: output += render_to_string('frr/ospf6d.frr.j2', config_dict['ospfv3']) output += '\n' if 'pim' in config_dict and 'deleted' not in config_dict['pim']: output += render_to_string('frr/pimd.frr.j2', config_dict['pim']) output += '\n' if 'pim6' in config_dict and 'deleted' not in config_dict['pim6']: output += render_to_string('frr/pim6d.frr.j2', config_dict['pim6']) output += '\n' if 'policy' in config_dict and len(config_dict['policy']) > 0: output += render_to_string('frr/policy.frr.j2', config_dict['policy']) output += '\n' if 'rip' in config_dict and 'deleted' not in config_dict['rip']: output += render_to_string('frr/ripd.frr.j2', config_dict['rip']) output += '\n' if 'ripng' in config_dict and 'deleted' not in config_dict['ripng']: output += render_to_string('frr/ripngd.frr.j2', config_dict['ripng']) output += '\n' if 'rpki' in config_dict and 'deleted' not in config_dict['rpki']: output += render_to_string('frr/rpki.frr.j2', config_dict['rpki']) output += '\n' if 'segment_routing' in config_dict and 'deleted' not in config_dict['segment_routing']: output += render_to_string('frr/zebra.segment_routing.frr.j2', config_dict['segment_routing']) output += '\n' if 'static' in config_dict and 'deleted' not in config_dict['static']: output += render_to_string('frr/staticd.frr.j2', config_dict['static']) output += '\n' if 'ip' in config_dict and 'deleted' not in config_dict['ip']: output += render_to_string('frr/zebra.route-map.frr.j2', config_dict['ip']) output += '\n' if 'ipv6' in config_dict and 'deleted' not in config_dict['ipv6']: output += render_to_string('frr/zebra.route-map.frr.j2', config_dict['ipv6']) output += '\n' return output debug('======< RENDERING CONFIG >======') # we can not reload an empty file, thus we always embed the marker output = '!\n' # Enable SNMP agentx support # SNMP AgentX support cannot be disabled once enabled if 'snmp' in config: output += 'agentx\n' # Add routing protocols in global VRF output += inline_helper(config) # Interface configuration for EVPN is not VRF related if 'interfaces' in config: output += render_to_string('frr/evpn.mh.frr.j2', {'interfaces' : config['interfaces']}) output += '\n' if 'vrf' in config and 'name' in config['vrf']: output += render_to_string('frr/zebra.vrf.route-map.frr.j2', config['vrf']) + '\n' for vrf, vrf_config in config['vrf']['name'].items(): if 'protocols' not in vrf_config: continue for protocol in vrf_config['protocols']: vrf_config['protocols'][protocol]['vrf'] = vrf output += inline_helper(vrf_config['protocols']) debug(output) debug('======< RENDERING CONFIG COMPLETE >======') write_file(self._frr_conf, output) if DEBUG_ON: write_file('/tmp/frr.conf.debug', output) def apply(self): count = 0 count_max = 5 emsg = '' while count < count_max: count += 1 debug(f'FRR: Reloading configuration - tries: {count} | Python class ID: {id(self)}') cmdline = '/usr/lib/frr/frr-reload.py --reload' if DEBUG_ON: cmdline += ' --debug' rc, emsg = rc_cmd(f'{cmdline} {self._frr_conf}') if rc != 0: debug('FRR configuration reload failed, retrying') continue debug(emsg) debug('======< DONE APPLYING CONFIG >======') break if count >= count_max: raise ConfigError(emsg) def save_configuration(): """ T3217: Save FRR configuration to /run/frr/config/frr.conf """ return cmd('/usr/bin/vtysh -n --writeconfig') diff --git a/smoketest/scripts/cli/test_interfaces_bonding.py b/smoketest/scripts/cli/test_interfaces_bonding.py index 418744712..735e4f3c5 100755 --- a/smoketest/scripts/cli/test_interfaces_bonding.py +++ b/smoketest/scripts/cli/test_interfaces_bonding.py @@ -1,314 +1,314 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from base_interfaces_test import BasicInterfaceTest from vyos.ifconfig import Section from vyos.ifconfig.interface import Interface from vyos.configsession import ConfigSessionError from vyos.utils.network import get_interface_config from vyos.utils.file import read_file -from vyos.frr import mgmt_daemon +from vyos.frrender import mgmt_daemon class BondingInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ['interfaces', 'bonding'] cls._mirror_interfaces = ['dum21354'] cls._members = [] # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! if 'TEST_ETH' in os.environ: cls._members = os.environ['TEST_ETH'].split() else: for tmp in Section.interfaces('ethernet', vlan=False): cls._members.append(tmp) cls._options = {'bond0' : []} for member in cls._members: cls._options['bond0'].append(f'member interface {member}') cls._interfaces = list(cls._options) # call base-classes classmethod super(BondingInterfaceTest, cls).setUpClass() def test_add_single_ip_address(self): super().test_add_single_ip_address() for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() self.assertListEqual(slaves, self._members) def test_vif_8021q_interfaces(self): super().test_vif_8021q_interfaces() for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() self.assertListEqual(slaves, self._members) def test_bonding_remove_member(self): # T2515: when removing a bond member the previously enslaved/member # interface must be in its former admin-up/down state. Here we ensure # that it is admin-up as it was admin-up before. # configure member interfaces for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_commit() # remove single bond member port for interface in self._interfaces: remove_member = self._members[0] self.cli_delete(self._base_path + [interface, 'member', 'interface', remove_member]) self.cli_commit() # removed member port must be admin-up for interface in self._interfaces: remove_member = self._members[0] state = Interface(remove_member).get_admin_state() self.assertEqual('up', state) def test_bonding_min_links(self): # configure member interfaces min_links = len(self._interfaces) for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'min-links', str(min_links)]) self.cli_commit() # verify config for interface in self._interfaces: tmp = get_interface_config(interface) self.assertEqual(min_links, tmp['linkinfo']['info_data']['min_links']) # check LACP default rate self.assertEqual('slow', tmp['linkinfo']['info_data']['ad_lacp_rate']) def test_bonding_lacp_rate(self): # configure member interfaces lacp_rate = 'fast' for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'lacp-rate', lacp_rate]) self.cli_commit() # verify config for interface in self._interfaces: tmp = get_interface_config(interface) # check LACP minimum links (default value) self.assertEqual(0, tmp['linkinfo']['info_data']['min_links']) self.assertEqual(lacp_rate, tmp['linkinfo']['info_data']['ad_lacp_rate']) def test_bonding_hash_policy(self): # Define available bonding hash policies hash_policies = ['layer2', 'layer2+3', 'encap2+3', 'encap3+4'] for hash_policy in hash_policies: for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'hash-policy', hash_policy]) self.cli_commit() # verify config for interface in self._interfaces: defined_policy = read_file(f'/sys/class/net/{interface}/bonding/xmit_hash_policy').split() self.assertEqual(defined_policy[0], hash_policy) def test_bonding_mii_monitoring_interval(self): for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_commit() # verify default for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() self.assertIn('100', tmp) mii_mon = '250' for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'mii-mon-interval', mii_mon]) self.cli_commit() # verify new CLI value for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bonding/miimon').split() self.assertIn(mii_mon, tmp) def test_bonding_multi_use_member(self): # Define available bonding hash policies for interface in ['bond10', 'bond20']: for member in self._members: self.cli_set(self._base_path + [interface, 'member', 'interface', member]) # check validate() - can not use the same member interfaces multiple times with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(self._base_path + ['bond20']) self.cli_commit() def test_bonding_source_interface(self): # Re-use member interface that is already a source-interface bond = 'bond99' pppoe = 'pppoe98756' member = next(iter(self._members)) self.cli_set(self._base_path + [bond, 'member', 'interface', member]) self.cli_set(['interfaces', 'pppoe', pppoe, 'source-interface', member]) # check validate() - can not add interface to bond, it is the source-interface of ... with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(['interfaces', 'pppoe', pppoe]) self.cli_commit() # verify config slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() self.assertIn(member, slaves) def test_bonding_source_bridge_interface(self): # Re-use member interface that is already a source-interface bond = 'bond1097' bridge = 'br6327' member = next(iter(self._members)) self.cli_set(self._base_path + [bond, 'member', 'interface', member]) self.cli_set(['interfaces', 'bridge', bridge, 'member', 'interface', member]) # check validate() - can not add interface to bond, it is a member of bridge ... with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(['interfaces', 'bridge', bridge]) self.cli_commit() # verify config slaves = read_file(f'/sys/class/net/{bond}/bonding/slaves').split() self.assertIn(member, slaves) def test_bonding_uniq_member_description(self): ethernet_path = ['interfaces', 'ethernet'] for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_commit() # Add any changes on bonding members # For example add description on separate ethX interfaces for interface in self._interfaces: for member in self._members: self.cli_set(ethernet_path + [member, 'description', member + '_interface']) self.cli_commit() # verify config for interface in self._interfaces: slaves = read_file(f'/sys/class/net/{interface}/bonding/slaves').split() for member in self._members: self.assertIn(member, slaves) def test_bonding_system_mac(self): # configure member interfaces and system-mac default_system_mac = '00:00:00:00:00:00' # default MAC is all zeroes system_mac = '00:50:ab:cd:ef:11' for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'system-mac', system_mac]) self.cli_commit() # verify config for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bonding/ad_actor_system') self.assertIn(tmp, system_mac) for interface in self._interfaces: self.cli_delete(self._base_path + [interface, 'system-mac']) self.cli_commit() # verify default value for interface in self._interfaces: tmp = read_file(f'/sys/class/net/{interface}/bonding/ad_actor_system') self.assertIn(tmp, default_system_mac) def test_bonding_evpn_multihoming(self): id = '5' for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'evpn', 'es-id', id]) self.cli_set(self._base_path + [interface, 'evpn', 'es-df-pref', id]) self.cli_set(self._base_path + [interface, 'evpn', 'es-sys-mac', f'00:12:34:56:78:0{id}']) self.cli_set(self._base_path + [interface, 'evpn', 'uplink']) id = int(id) + 1 self.cli_commit() id = '5' for interface in self._interfaces: frrconfig = self.getFRRconfig(f'interface {interface}', daemon=mgmt_daemon) self.assertIn(f' evpn mh es-id {id}', frrconfig) self.assertIn(f' evpn mh es-df-pref {id}', frrconfig) self.assertIn(f' evpn mh es-sys-mac 00:12:34:56:78:0{id}', frrconfig) self.assertIn(f' evpn mh uplink', frrconfig) id = int(id) + 1 for interface in self._interfaces: self.cli_delete(self._base_path + [interface, 'evpn', 'es-id']) self.cli_delete(self._base_path + [interface, 'evpn', 'es-df-pref']) self.cli_commit() id = '5' for interface in self._interfaces: frrconfig = self.getFRRconfig(f'interface {interface}', daemon=mgmt_daemon) self.assertIn(f' evpn mh es-sys-mac 00:12:34:56:78:0{id}', frrconfig) self.assertIn(f' evpn mh uplink', frrconfig) id = int(id) + 1 if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_ethernet.py b/smoketest/scripts/cli/test_interfaces_ethernet.py index 218fa0759..c02ca613b 100755 --- a/smoketest/scripts/cli/test_interfaces_ethernet.py +++ b/smoketest/scripts/cli/test_interfaces_ethernet.py @@ -1,227 +1,227 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from glob import glob from json import loads from netifaces import AF_INET from netifaces import AF_INET6 from netifaces import ifaddresses from base_interfaces_test import BasicInterfaceTest from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section -from vyos.frr import mgmt_daemon +from vyos.frrender import mgmt_daemon from vyos.utils.process import cmd from vyos.utils.process import popen from vyos.utils.file import read_file from vyos.utils.network import is_ipv6_link_local class EthernetInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ['interfaces', 'ethernet'] cls._mirror_interfaces = ['dum21354'] # We only test on physical interfaces and not VLAN (sub-)interfaces if 'TEST_ETH' in os.environ: tmp = os.environ['TEST_ETH'].split() cls._interfaces = tmp else: for tmp in Section.interfaces('ethernet', vlan=False): cls._interfaces.append(tmp) cls._macs = {} for interface in cls._interfaces: cls._macs[interface] = read_file(f'/sys/class/net/{interface}/address') # call base-classes classmethod super(EthernetInterfaceTest, cls).setUpClass() def tearDown(self): for interface in self._interfaces: # when using a dedicated interface to test via TEST_ETH environment # variable only this one will be cleared in the end - usable to test # ethernet interfaces via SSH self.cli_delete(self._base_path + [interface]) self.cli_set(self._base_path + [interface, 'duplex', 'auto']) self.cli_set(self._base_path + [interface, 'speed', 'auto']) self.cli_set(self._base_path + [interface, 'hw-id', self._macs[interface]]) self.cli_commit() # Verify that no address remains on the system as this is an eternal # interface. for interface in self._interfaces: self.assertNotIn(AF_INET, ifaddresses(interface)) # required for IPv6 link-local address self.assertIn(AF_INET6, ifaddresses(interface)) for addr in ifaddresses(interface)[AF_INET6]: # checking link local addresses makes no sense if is_ipv6_link_local(addr['addr']): continue self.assertFalse(is_intf_addr_assigned(interface, addr['addr'])) # Ensure no VLAN interfaces are left behind tmp = [x for x in Section.interfaces('ethernet') if x.startswith(f'{interface}.')] self.assertListEqual(tmp, []) def test_offloading_rps(self): # enable RPS on all available CPUs, RPS works with a CPU bitmask, # where each bit represents a CPU (core/thread). The formula below # expands to rps_cpus = 255 for a 8 core system rps_cpus = (1 << os.cpu_count()) -1 # XXX: we should probably reserve one core when the system is under # high preasure so we can still have a core left for housekeeping. # This is done by masking out the lowst bit so CPU0 is spared from # receive packet steering. rps_cpus &= ~1 for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'offload', 'rps']) self.cli_commit() for interface in self._interfaces: cpus = read_file(f'/sys/class/net/{interface}/queues/rx-0/rps_cpus') # remove the nasty ',' separation on larger strings cpus = cpus.replace(',','') cpus = int(cpus, 16) self.assertEqual(f'{cpus:x}', f'{rps_cpus:x}') def test_offloading_rfs(self): global_rfs_flow = 32768 rfs_flow = global_rfs_flow for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'offload', 'rfs']) self.cli_commit() for interface in self._interfaces: queues = len(glob(f'/sys/class/net/{interface}/queues/rx-*')) rfs_flow = int(global_rfs_flow/queues) for i in range(0, queues): tmp = read_file(f'/sys/class/net/{interface}/queues/rx-{i}/rps_flow_cnt') self.assertEqual(int(tmp), rfs_flow) tmp = read_file(f'/proc/sys/net/core/rps_sock_flow_entries') self.assertEqual(int(tmp), global_rfs_flow) # delete configuration of RFS and check all values returned to default "0" for interface in self._interfaces: self.cli_delete(self._base_path + [interface, 'offload', 'rfs']) self.cli_commit() for interface in self._interfaces: queues = len(glob(f'/sys/class/net/{interface}/queues/rx-*')) rfs_flow = int(global_rfs_flow/queues) for i in range(0, queues): tmp = read_file(f'/sys/class/net/{interface}/queues/rx-{i}/rps_flow_cnt') self.assertEqual(int(tmp), 0) def test_non_existing_interface(self): unknonw_interface = self._base_path + ['eth667'] self.cli_set(unknonw_interface) # check validate() - interface does not exist with self.assertRaises(ConfigSessionError): self.cli_commit() # we need to remove this wrong interface from the configuration # manually, else tearDown() will have problem in commit() self.cli_delete(unknonw_interface) def test_speed_duplex_verify(self): for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'speed', '1000']) # check validate() - if either speed or duplex is not auto, the # other one must be manually configured, too with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'speed', 'auto']) self.cli_commit() def test_ethtool_ring_buffer(self): for interface in self._interfaces: # We do not use vyos.ethtool here to not have any chance # for invalid testcases. Re-gain data by hand tmp = cmd(f'sudo ethtool --json --show-ring {interface}') tmp = loads(tmp) max_rx = str(tmp[0]['rx-max']) max_tx = str(tmp[0]['tx-max']) self.cli_set(self._base_path + [interface, 'ring-buffer', 'rx', max_rx]) self.cli_set(self._base_path + [interface, 'ring-buffer', 'tx', max_tx]) self.cli_commit() for interface in self._interfaces: tmp = cmd(f'sudo ethtool --json --show-ring {interface}') tmp = loads(tmp) max_rx = str(tmp[0]['rx-max']) max_tx = str(tmp[0]['tx-max']) rx = str(tmp[0]['rx']) tx = str(tmp[0]['tx']) # validate if the above change was carried out properly and the # ring-buffer size got increased self.assertEqual(max_rx, rx) self.assertEqual(max_tx, tx) def test_ethtool_flow_control(self): for interface in self._interfaces: # Disable flow-control self.cli_set(self._base_path + [interface, 'disable-flow-control']) # Check current flow-control state on ethernet interface out, err = popen(f'sudo ethtool --json --show-pause {interface}') # Flow-control not supported - test if it bails out with a proper # this is a dynamic path where err = 1 on VMware, but err = 0 on # a physical box. if bool(err): with self.assertRaises(ConfigSessionError): self.cli_commit() else: out = loads(out) # Flow control is on self.assertTrue(out[0]['autonegotiate']) # commit change on CLI to disable-flow-control and re-test self.cli_commit() out, err = popen(f'sudo ethtool --json --show-pause {interface}') out = loads(out) self.assertFalse(out[0]['autonegotiate']) def test_ethtool_evpn_uplink_tarcking(self): for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'evpn', 'uplink']) self.cli_commit() for interface in self._interfaces: frrconfig = self.getFRRconfig(f'interface {interface}', daemon=mgmt_daemon) self.assertIn(f' evpn mh uplink', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_bgp.py b/smoketest/scripts/cli/test_protocols_bgp.py index 6eeac37a7..1b6c30dfe 100755 --- a/smoketest/scripts/cli/test_protocols_bgp.py +++ b/smoketest/scripts/cli/test_protocols_bgp.py @@ -1,1415 +1,1415 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section from vyos.configsession import ConfigSessionError from vyos.template import is_ipv6 from vyos.utils.process import process_named_running from vyos.utils.process import cmd -from vyos.frr import bgp_daemon +from vyos.frrender import bgp_daemon PROCESS_NAME = 'bgpd' ASN = '64512' base_path = ['protocols', 'bgp'] route_map_in = 'foo-map-in' route_map_out = 'foo-map-out' prefix_list_in = 'pfx-foo-in' prefix_list_out = 'pfx-foo-out' prefix_list_in6 = 'pfx-foo-in6' prefix_list_out6 = 'pfx-foo-out6' bfd_profile = 'foo-bar-baz' import_afi = 'ipv4-unicast' import_vrf = 'red' import_rd = ASN + ':100' import_vrf_base = ['vrf', 'name'] neighbor_config = { '192.0.2.1' : { 'bfd' : '', 'cap_dynamic' : '', 'cap_ext_next' : '', 'cap_ext_sver' : '', 'remote_as' : '100', 'adv_interv' : '400', 'passive' : '', 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', 'system_as' : '300', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'no_send_comm_ext' : '', 'addpath_all' : '', 'p_attr_discard' : ['10', '20', '30', '40', '50'], }, '192.0.2.2' : { 'bfd_profile' : bfd_profile, 'remote_as' : '200', 'shutdown' : '', 'no_cap_nego' : '', 'port' : '667', 'cap_strict' : '', 'advertise_map' : route_map_in, 'non_exist_map' : route_map_out, 'pfx_list_in' : prefix_list_in, 'pfx_list_out' : prefix_list_out, 'no_send_comm_std' : '', 'local_role' : 'rs-client', 'p_attr_taw' : '200', }, '192.0.2.3' : { 'advertise_map' : route_map_in, 'description' : 'foo bar baz', 'remote_as' : '200', 'passive' : '', 'multi_hop' : '5', 'update_src' : 'lo', 'peer_group' : 'foo', 'graceful_rst' : '', }, '2001:db8::1' : { 'advertise_map' : route_map_in, 'exist_map' : route_map_out, 'cap_dynamic' : '', 'cap_ext_next' : '', 'cap_ext_sver' : '', 'remote_as' : '123', 'adv_interv' : '400', 'passive' : '', 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', 'system_as' : '300', 'solo' : '', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'no_send_comm_std' : '', 'addpath_per_as' : '', 'peer_group' : 'foo-bar', 'local_role' : 'customer', 'local_role_strict': '', }, '2001:db8::2' : { 'remote_as' : '456', 'shutdown' : '', 'no_cap_nego' : '', 'port' : '667', 'cap_strict' : '', 'pfx_list_in' : prefix_list_in6, 'pfx_list_out' : prefix_list_out6, 'no_send_comm_ext' : '', 'peer_group' : 'foo-bar_baz', 'graceful_rst_hlp' : '', 'disable_conn_chk' : '', }, } peer_group_config = { 'foo' : { 'advertise_map' : route_map_in, 'exist_map' : route_map_out, 'bfd' : '', 'remote_as' : '100', 'passive' : '', 'password' : 'VyOS-Secure123', 'shutdown' : '', 'cap_over' : '', 'ttl_security' : '5', 'disable_conn_chk' : '', 'p_attr_discard' : ['100', '150', '200'], }, 'bar' : { 'remote_as' : '111', 'graceful_rst_no' : '', 'port' : '667', 'p_attr_taw' : '126', }, 'foo-bar' : { 'advertise_map' : route_map_in, 'description' : 'foo peer bar group', 'remote_as' : '200', 'shutdown' : '', 'no_cap_nego' : '', 'system_as' : '300', 'pfx_list_in' : prefix_list_in, 'pfx_list_out' : prefix_list_out, 'no_send_comm_ext' : '', }, 'foo-bar_baz' : { 'advertise_map' : route_map_in, 'non_exist_map' : route_map_out, 'bfd_profile' : bfd_profile, 'cap_dynamic' : '', 'cap_ext_next' : '', 'remote_as' : '200', 'passive' : '', 'multi_hop' : '5', 'update_src' : 'lo', 'route_map_in' : route_map_in, 'route_map_out' : route_map_out, 'local_role' : 'peer', 'local_role_strict': '', }, } class TestProtocolsBGP(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestProtocolsBGP, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(PROCESS_NAME) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['policy', 'route-map']) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) cls.cli_delete(cls, ['vrf']) cls.cli_set(cls, ['policy', 'route-map', route_map_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'route-map', route_map_out, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_in, 'rule', '10', 'prefix', '192.0.2.0/25']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list', prefix_list_out, 'rule', '10', 'prefix', '192.0.2.128/25']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'action', 'permit']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_in6, 'rule', '10', 'prefix', '2001:db8:1000::/64']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'action', 'deny']) cls.cli_set(cls, ['policy', 'prefix-list6', prefix_list_out6, 'rule', '10', 'prefix', '2001:db8:2000::/64']) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['policy', 'route-map']) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) def setUp(self): self.cli_set(base_path + ['system-as', ASN]) def tearDown(self): # cleanup any possible VRF mess self.cli_delete(['vrf']) # always destrox the entire bgpd configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_commit() frrconfig = self.getFRRconfig('router bgp') self.assertNotIn(f'router bgp', frrconfig) # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def create_bgp_instances_for_import_test(self): table = '1000' self.cli_set(import_vrf_base + [import_vrf, 'table', table]) self.cli_set(import_vrf_base + [import_vrf, 'protocols', 'bgp', 'system-as', ASN]) def verify_frr_config(self, peer, peer_config, frrconfig): # recurring patterns to verify for both a simple neighbor and a peer-group if 'bfd' in peer_config: self.assertIn(f' neighbor {peer} bfd', frrconfig) if 'bfd_profile' in peer_config: self.assertIn(f' neighbor {peer} bfd profile {peer_config["bfd_profile"]}', frrconfig) self.assertIn(f' neighbor {peer} bfd check-control-plane-failure', frrconfig) if 'cap_dynamic' in peer_config: self.assertIn(f' neighbor {peer} capability dynamic', frrconfig) if 'cap_ext_next' in peer_config: self.assertIn(f' neighbor {peer} capability extended-nexthop', frrconfig) if 'cap_ext_sver' in peer_config: self.assertIn(f' neighbor {peer} capability software-version', frrconfig) if 'description' in peer_config: self.assertIn(f' neighbor {peer} description {peer_config["description"]}', frrconfig) if 'no_cap_nego' in peer_config: self.assertIn(f' neighbor {peer} dont-capability-negotiate', frrconfig) if 'multi_hop' in peer_config: self.assertIn(f' neighbor {peer} ebgp-multihop {peer_config["multi_hop"]}', frrconfig) if 'local_as' in peer_config: self.assertIn(f' neighbor {peer} local-as {peer_config["local_as"]} no-prepend replace-as', frrconfig) if 'local_role' in peer_config: tmp = f' neighbor {peer} local-role {peer_config["local_role"]}' if 'local_role_strict' in peer_config: tmp += ' strict' self.assertIn(tmp, frrconfig) if 'cap_over' in peer_config: self.assertIn(f' neighbor {peer} override-capability', frrconfig) if 'passive' in peer_config: self.assertIn(f' neighbor {peer} passive', frrconfig) if 'password' in peer_config: self.assertIn(f' neighbor {peer} password {peer_config["password"]}', frrconfig) if 'port' in peer_config: self.assertIn(f' neighbor {peer} port {peer_config["port"]}', frrconfig) if 'remote_as' in peer_config: self.assertIn(f' neighbor {peer} remote-as {peer_config["remote_as"]}', frrconfig) if 'solo' in peer_config: self.assertIn(f' neighbor {peer} solo', frrconfig) if 'shutdown' in peer_config: self.assertIn(f' neighbor {peer} shutdown', frrconfig) if 'ttl_security' in peer_config: self.assertIn(f' neighbor {peer} ttl-security hops {peer_config["ttl_security"]}', frrconfig) if 'update_src' in peer_config: self.assertIn(f' neighbor {peer} update-source {peer_config["update_src"]}', frrconfig) if 'route_map_in' in peer_config: self.assertIn(f' neighbor {peer} route-map {peer_config["route_map_in"]} in', frrconfig) if 'route_map_out' in peer_config: self.assertIn(f' neighbor {peer} route-map {peer_config["route_map_out"]} out', frrconfig) if 'pfx_list_in' in peer_config: self.assertIn(f' neighbor {peer} prefix-list {peer_config["pfx_list_in"]} in', frrconfig) if 'pfx_list_out' in peer_config: self.assertIn(f' neighbor {peer} prefix-list {peer_config["pfx_list_out"]} out', frrconfig) if 'no_send_comm_std' in peer_config: self.assertIn(f' no neighbor {peer} send-community', frrconfig) if 'no_send_comm_ext' in peer_config: self.assertIn(f' no neighbor {peer} send-community extended', frrconfig) if 'addpath_all' in peer_config: self.assertIn(f' neighbor {peer} addpath-tx-all-paths', frrconfig) if 'p_attr_discard' in peer_config: tmp = ' '.join(peer_config["p_attr_discard"]) self.assertIn(f' neighbor {peer} path-attribute discard {tmp}', frrconfig) if 'p_attr_taw' in peer_config: self.assertIn(f' neighbor {peer} path-attribute treat-as-withdraw {peer_config["p_attr_taw"]}', frrconfig) if 'addpath_per_as' in peer_config: self.assertIn(f' neighbor {peer} addpath-tx-bestpath-per-AS', frrconfig) if 'advertise_map' in peer_config: base = f' neighbor {peer} advertise-map {peer_config["advertise_map"]}' if 'exist_map' in peer_config: base = f'{base} exist-map {peer_config["exist_map"]}' if 'non_exist_map' in peer_config: base = f'{base} non-exist-map {peer_config["non_exist_map"]}' self.assertIn(base, frrconfig) if 'graceful_rst' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart', frrconfig) if 'graceful_rst_no' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart-disable', frrconfig) if 'graceful_rst_hlp' in peer_config: self.assertIn(f' neighbor {peer} graceful-restart-helper', frrconfig) if 'disable_conn_chk' in peer_config: self.assertIn(f' neighbor {peer} disable-connected-check', frrconfig) def test_bgp_01_simple(self): router_id = '127.0.0.1' local_pref = '500' stalepath_time = '60' max_path_v4 = '2' max_path_v4ibgp = '4' max_path_v6 = '8' max_path_v6ibgp = '16' cond_adv_timer = '30' min_hold_time = '2' tcp_keepalive_idle = '66' tcp_keepalive_interval = '77' tcp_keepalive_probes = '22' self.cli_set(base_path + ['parameters', 'allow-martian-nexthop']) self.cli_set(base_path + ['parameters', 'disable-ebgp-connected-route-check']) self.cli_set(base_path + ['parameters', 'no-hard-administrative-reset']) self.cli_set(base_path + ['parameters', 'log-neighbor-changes']) self.cli_set(base_path + ['parameters', 'labeled-unicast', 'explicit-null']) self.cli_set(base_path + ['parameters', 'router-id', router_id]) # System AS number MUST be defined - as this is set in setUp() we remove # this once for testing of the proper error self.cli_delete(base_path + ['system-as']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['system-as', ASN]) # Default local preference (higher = more preferred, default value is 100) self.cli_set(base_path + ['parameters', 'default', 'local-pref', local_pref]) self.cli_set(base_path + ['parameters', 'graceful-restart', 'stalepath-time', stalepath_time]) self.cli_set(base_path + ['parameters', 'graceful-shutdown']) self.cli_set(base_path + ['parameters', 'ebgp-requires-policy']) self.cli_set(base_path + ['parameters', 'bestpath', 'as-path', 'multipath-relax']) self.cli_set(base_path + ['parameters', 'bestpath', 'bandwidth', 'default-weight-for-missing']) self.cli_set(base_path + ['parameters', 'bestpath', 'compare-routerid']) self.cli_set(base_path + ['parameters', 'bestpath', 'peer-type', 'multipath-relax']) self.cli_set(base_path + ['parameters', 'conditional-advertisement', 'timer', cond_adv_timer]) self.cli_set(base_path + ['parameters', 'fast-convergence']) self.cli_set(base_path + ['parameters', 'minimum-holdtime', min_hold_time]) self.cli_set(base_path + ['parameters', 'no-suppress-duplicates']) self.cli_set(base_path + ['parameters', 'reject-as-sets']) self.cli_set(base_path + ['parameters', 'route-reflector-allow-outbound-policy']) self.cli_set(base_path + ['parameters', 'shutdown']) self.cli_set(base_path + ['parameters', 'suppress-fib-pending']) self.cli_set(base_path + ['parameters', 'tcp-keepalive', 'idle', tcp_keepalive_idle]) self.cli_set(base_path + ['parameters', 'tcp-keepalive', 'interval', tcp_keepalive_interval]) self.cli_set(base_path + ['parameters', 'tcp-keepalive', 'probes', tcp_keepalive_probes]) # AFI maximum path support self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ebgp', max_path_v4]) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'maximum-paths', 'ibgp', max_path_v4ibgp]) self.cli_set(base_path + ['address-family', 'ipv4-labeled-unicast', 'maximum-paths', 'ebgp', max_path_v4]) self.cli_set(base_path + ['address-family', 'ipv4-labeled-unicast', 'maximum-paths', 'ibgp', max_path_v4ibgp]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ebgp', max_path_v6]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'maximum-paths', 'ibgp', max_path_v6ibgp]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' bgp router-id {router_id}', frrconfig) self.assertIn(f' bgp allow-martian-nexthop', frrconfig) self.assertIn(f' bgp disable-ebgp-connected-route-check', frrconfig) self.assertIn(f' bgp log-neighbor-changes', frrconfig) self.assertIn(f' bgp default local-preference {local_pref}', frrconfig) self.assertIn(f' bgp conditional-advertisement timer {cond_adv_timer}', frrconfig) self.assertIn(f' bgp fast-convergence', frrconfig) self.assertIn(f' bgp graceful-restart stalepath-time {stalepath_time}', frrconfig) self.assertIn(f' bgp graceful-shutdown', frrconfig) self.assertIn(f' no bgp hard-administrative-reset', frrconfig) self.assertIn(f' bgp labeled-unicast explicit-null', frrconfig) self.assertIn(f' bgp bestpath as-path multipath-relax', frrconfig) self.assertIn(f' bgp bestpath bandwidth default-weight-for-missing', frrconfig) self.assertIn(f' bgp bestpath compare-routerid', frrconfig) self.assertIn(f' bgp bestpath peer-type multipath-relax', frrconfig) self.assertIn(f' bgp minimum-holdtime {min_hold_time}', frrconfig) self.assertIn(f' bgp reject-as-sets', frrconfig) self.assertIn(f' bgp route-reflector allow-outbound-policy', frrconfig) self.assertIn(f' bgp shutdown', frrconfig) self.assertIn(f' bgp suppress-fib-pending', frrconfig) self.assertIn(f' bgp tcp-keepalive {tcp_keepalive_idle} {tcp_keepalive_interval} {tcp_keepalive_probes}', frrconfig) self.assertNotIn(f'bgp ebgp-requires-policy', frrconfig) self.assertIn(f' no bgp suppress-duplicates', frrconfig) afiv4_config = self.getFRRconfig(' address-family ipv4 unicast') self.assertIn(f' maximum-paths {max_path_v4}', afiv4_config) self.assertIn(f' maximum-paths ibgp {max_path_v4ibgp}', afiv4_config) afiv4_config = self.getFRRconfig(' address-family ipv4 labeled-unicast') self.assertIn(f' maximum-paths {max_path_v4}', afiv4_config) self.assertIn(f' maximum-paths ibgp {max_path_v4ibgp}', afiv4_config) afiv6_config = self.getFRRconfig(' address-family ipv6 unicast') self.assertIn(f' maximum-paths {max_path_v6}', afiv6_config) self.assertIn(f' maximum-paths ibgp {max_path_v6ibgp}', afiv6_config) def test_bgp_02_neighbors(self): # Test out individual neighbor configuration items, not all of them are # also available to a peer-group! self.cli_set(base_path + ['parameters', 'deterministic-med']) for peer, peer_config in neighbor_config.items(): afi = 'ipv4-unicast' if is_ipv6(peer): afi = 'ipv6-unicast' if 'adv_interv' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'advertisement-interval', peer_config["adv_interv"]]) if 'bfd' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'bfd']) if 'bfd_profile' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'bfd', 'profile', peer_config["bfd_profile"]]) self.cli_set(base_path + ['neighbor', peer, 'bfd', 'check-control-plane-failure']) if 'cap_dynamic' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'capability', 'dynamic']) if 'cap_ext_next' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'capability', 'extended-nexthop']) if 'cap_ext_sver' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'capability', 'software-version']) if 'description' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'description', peer_config["description"]]) if 'no_cap_nego' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'disable-capability-negotiation']) if 'multi_hop' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'ebgp-multihop', peer_config["multi_hop"]]) if 'local_as' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'local-as', peer_config["local_as"], 'no-prepend', 'replace-as']) if 'local_role' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'local-role', peer_config["local_role"]]) if 'local_role_strict' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'local-role', peer_config["local_role"], 'strict']) if 'cap_over' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'override-capability']) if 'passive' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'passive']) if 'password' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'password', peer_config["password"]]) if 'port' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'port', peer_config["port"]]) if 'remote_as' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'remote-as', peer_config["remote_as"]]) if 'cap_strict' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'strict-capability-match']) if 'shutdown' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'shutdown']) if 'solo' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'solo']) if 'ttl_security' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'ttl-security', 'hops', peer_config["ttl_security"]]) if 'update_src' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'update-source', peer_config["update_src"]]) if 'p_attr_discard' in peer_config: for attribute in peer_config['p_attr_discard']: self.cli_set(base_path + ['neighbor', peer, 'path-attribute', 'discard', attribute]) if 'p_attr_taw' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'path-attribute', 'treat-as-withdraw', peer_config["p_attr_taw"]]) if 'route_map_in' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'import', peer_config["route_map_in"]]) if 'route_map_out' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'route-map', 'export', peer_config["route_map_out"]]) if 'pfx_list_in' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'import', peer_config["pfx_list_in"]]) if 'pfx_list_out' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'prefix-list', 'export', peer_config["pfx_list_out"]]) if 'no_send_comm_std' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'standard']) if 'no_send_comm_ext' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'disable-send-community', 'extended']) if 'addpath_all' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-all']) if 'addpath_per_as' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'addpath-tx-per-as']) if 'graceful_rst' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'enable']) if 'graceful_rst_no' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'graceful-restart', 'restart-helper']) if 'disable_conn_chk' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'disable-connected-check']) # Conditional advertisement if 'advertise_map' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'advertise-map', peer_config["advertise_map"]]) # Either exist-map or non-exist-map needs to be specified if 'exist_map' not in peer_config and 'non_exist_map' not in peer_config: with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'exist-map', route_map_in]) if 'exist_map' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'exist-map', peer_config["exist_map"]]) if 'non_exist_map' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'address-family', afi, 'conditionally-advertise', 'non-exist-map', peer_config["non_exist_map"]]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for peer, peer_config in neighbor_config.items(): if 'adv_interv' in peer_config: self.assertIn(f' neighbor {peer} advertisement-interval {peer_config["adv_interv"]}', frrconfig) if 'cap_strict' in peer_config: self.assertIn(f' neighbor {peer} strict-capability-match', frrconfig) self.verify_frr_config(peer, peer_config, frrconfig) def test_bgp_03_peer_groups(self): # Test out individual peer-group configuration items for peer_group, config in peer_group_config.items(): if 'bfd' in config: self.cli_set(base_path + ['peer-group', peer_group, 'bfd']) if 'bfd_profile' in config: self.cli_set(base_path + ['peer-group', peer_group, 'bfd', 'profile', config["bfd_profile"]]) self.cli_set(base_path + ['peer-group', peer_group, 'bfd', 'check-control-plane-failure']) if 'cap_dynamic' in config: self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'dynamic']) if 'cap_ext_next' in config: self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'extended-nexthop']) if 'cap_ext_sver' in config: self.cli_set(base_path + ['peer-group', peer_group, 'capability', 'software-version']) if 'description' in config: self.cli_set(base_path + ['peer-group', peer_group, 'description', config["description"]]) if 'no_cap_nego' in config: self.cli_set(base_path + ['peer-group', peer_group, 'disable-capability-negotiation']) if 'multi_hop' in config: self.cli_set(base_path + ['peer-group', peer_group, 'ebgp-multihop', config["multi_hop"]]) if 'local_as' in config: self.cli_set(base_path + ['peer-group', peer_group, 'local-as', config["local_as"], 'no-prepend', 'replace-as']) if 'local_role' in config: self.cli_set(base_path + ['peer-group', peer_group, 'local-role', config["local_role"]]) if 'local_role_strict' in config: self.cli_set(base_path + ['peer-group', peer_group, 'local-role', config["local_role"], 'strict']) if 'cap_over' in config: self.cli_set(base_path + ['peer-group', peer_group, 'override-capability']) if 'passive' in config: self.cli_set(base_path + ['peer-group', peer_group, 'passive']) if 'password' in config: self.cli_set(base_path + ['peer-group', peer_group, 'password', config["password"]]) if 'port' in config: self.cli_set(base_path + ['peer-group', peer_group, 'port', config["port"]]) if 'remote_as' in config: self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', config["remote_as"]]) if 'shutdown' in config: self.cli_set(base_path + ['peer-group', peer_group, 'shutdown']) if 'ttl_security' in config: self.cli_set(base_path + ['peer-group', peer_group, 'ttl-security', 'hops', config["ttl_security"]]) if 'update_src' in config: self.cli_set(base_path + ['peer-group', peer_group, 'update-source', config["update_src"]]) if 'route_map_in' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'import', config["route_map_in"]]) if 'route_map_out' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'route-map', 'export', config["route_map_out"]]) if 'pfx_list_in' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'import', config["pfx_list_in"]]) if 'pfx_list_out' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'prefix-list', 'export', config["pfx_list_out"]]) if 'no_send_comm_std' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'standard']) if 'no_send_comm_ext' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'disable-send-community', 'extended']) if 'addpath_all' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-all']) if 'addpath_per_as' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'addpath-tx-per-as']) if 'graceful_rst' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'enable']) if 'graceful_rst_no' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'disable']) if 'graceful_rst_hlp' in config: self.cli_set(base_path + ['peer-group', peer_group, 'graceful-restart', 'restart-helper']) if 'disable_conn_chk' in config: self.cli_set(base_path + ['peer-group', peer_group, 'disable-connected-check']) if 'p_attr_discard' in config: for attribute in config['p_attr_discard']: self.cli_set(base_path + ['peer-group', peer_group, 'path-attribute', 'discard', attribute]) if 'p_attr_taw' in config: self.cli_set(base_path + ['peer-group', peer_group, 'path-attribute', 'treat-as-withdraw', config["p_attr_taw"]]) # Conditional advertisement if 'advertise_map' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'advertise-map', config["advertise_map"]]) # Either exist-map or non-exist-map needs to be specified if 'exist_map' not in config and 'non_exist_map' not in config: with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'exist-map', route_map_in]) if 'exist_map' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'exist-map', config["exist_map"]]) if 'non_exist_map' in config: self.cli_set(base_path + ['peer-group', peer_group, 'address-family', 'ipv4-unicast', 'conditionally-advertise', 'non-exist-map', config["non_exist_map"]]) for peer, peer_config in neighbor_config.items(): if 'peer_group' in peer_config: self.cli_set(base_path + ['neighbor', peer, 'peer-group', peer_config['peer_group']]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for peer, peer_config in peer_group_config.items(): self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.verify_frr_config(peer, peer_config, frrconfig) for peer, peer_config in neighbor_config.items(): if 'peer_group' in peer_config: self.assertIn(f' neighbor {peer} peer-group {peer_config["peer_group"]}', frrconfig) def test_bgp_04_afi_ipv4(self): networks = { '10.0.0.0/8' : { 'as_set' : '', 'summary_only' : '', 'route_map' : route_map_in, }, '100.64.0.0/10' : { 'as_set' : '', }, '192.168.0.0/16' : { 'summary_only' : '', }, } # We want to redistribute ... redistributes = ['connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] for redistribute in redistributes: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'network', network]) if 'as_set' in network_config: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'as-set']) if 'summary_only' in network_config: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'summary-only']) if 'route_map' in network_config: self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'aggregate-address', network, 'route-map', network_config['route_map']]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv4 unicast', frrconfig) for redistribute in redistributes: self.assertIn(f' redistribute {redistribute}', frrconfig) for network, network_config in networks.items(): self.assertIn(f' network {network}', frrconfig) command = f'aggregate-address {network}' if 'as_set' in network_config: command = f'{command} as-set' if 'summary_only' in network_config: command = f'{command} summary-only' if 'route_map' in network_config: command = f'{command} route-map {network_config["route_map"]}' self.assertIn(command, frrconfig) def test_bgp_05_afi_ipv6(self): networks = { '2001:db8:100::/48' : { }, '2001:db8:200::/48' : { }, '2001:db8:300::/48' : { 'summary_only' : '', }, } # We want to redistribute ... redistributes = ['connected', 'kernel', 'ospfv3', 'ripng', 'static'] for redistribute in redistributes: self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'redistribute', redistribute]) for network, network_config in networks.items(): self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'network', network]) if 'summary_only' in network_config: self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'aggregate-address', network, 'summary-only']) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) # T2100: By default ebgp-requires-policy is disabled to keep VyOS # 1.3 and 1.2 backwards compatibility self.assertIn(f' no bgp ebgp-requires-policy', frrconfig) for redistribute in redistributes: # FRR calls this OSPF6 if redistribute == 'ospfv3': redistribute = 'ospf6' self.assertIn(f' redistribute {redistribute}', frrconfig) for network, network_config in networks.items(): self.assertIn(f' network {network}', frrconfig) if 'as_set' in network_config: self.assertIn(f' aggregate-address {network} summary-only', frrconfig) def test_bgp_06_listen_range(self): # Implemented via T1875 limit = '64' listen_ranges = ['192.0.2.0/25', '192.0.2.128/25'] peer_group = 'listenfoobar' self.cli_set(base_path + ['listen', 'limit', limit]) for prefix in listen_ranges: self.cli_set(base_path + ['listen', 'range', prefix]) # check validate() - peer-group must be defined for range/prefix with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['listen', 'range', prefix, 'peer-group', peer_group]) # check validate() - peer-group does yet not exist! with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', ASN]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.assertIn(f' neighbor {peer_group} remote-as {ASN}', frrconfig) self.assertIn(f' bgp listen limit {limit}', frrconfig) for prefix in listen_ranges: self.assertIn(f' bgp listen range {prefix} peer-group {peer_group}', frrconfig) def test_bgp_07_l2vpn_evpn(self): vnis = ['10010', '10020', '10030'] soo = '1.2.3.4:10000' evi_limit = '1000' route_targets = ['1.1.1.1:100', '1.1.1.1:200', '1.1.1.1:300'] self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-all-vni']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-default-gw']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'advertise-svi-ip']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'flooding', 'disable']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'default-originate', 'ipv4']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'default-originate', 'ipv6']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'disable-ead-evi-rx']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'disable-ead-evi-tx']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'mac-vrf', 'soo', soo]) for vni in vnis: self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-default-gw']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'vni', vni, 'advertise-svi-ip']) self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'ead-es-frag', 'evi-limit', evi_limit]) for route_target in route_targets: self.cli_set(base_path + ['address-family', 'l2vpn-evpn', 'ead-es-route-target', 'export', route_target]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family l2vpn evpn', frrconfig) self.assertIn(f' advertise-all-vni', frrconfig) self.assertIn(f' advertise-default-gw', frrconfig) self.assertIn(f' advertise-svi-ip', frrconfig) self.assertIn(f' default-originate ipv4', frrconfig) self.assertIn(f' default-originate ipv6', frrconfig) self.assertIn(f' disable-ead-evi-rx', frrconfig) self.assertIn(f' disable-ead-evi-tx', frrconfig) self.assertIn(f' flooding disable', frrconfig) self.assertIn(f' mac-vrf soo {soo}', frrconfig) for vni in vnis: vniconfig = self.getFRRconfig(f' vni {vni}') self.assertIn(f'vni {vni}', vniconfig) self.assertIn(f' advertise-default-gw', vniconfig) self.assertIn(f' advertise-svi-ip', vniconfig) self.assertIn(f' ead-es-frag evi-limit {evi_limit}', frrconfig) for route_target in route_targets: self.assertIn(f' ead-es-route-target export {route_target}', frrconfig) def test_bgp_09_distance_and_flowspec(self): distance_external = '25' distance_internal = '30' distance_local = '35' distance_v4_prefix = '169.254.0.0/32' distance_v6_prefix = '2001::/128' distance_prefix_value = '110' distance_families = ['ipv4-unicast', 'ipv6-unicast','ipv4-multicast', 'ipv6-multicast'] verify_families = ['ipv4 unicast', 'ipv6 unicast','ipv4 multicast', 'ipv6 multicast'] flowspec_families = ['address-family ipv4 flowspec', 'address-family ipv6 flowspec'] flowspec_int = 'lo' # Per family distance support for family in distance_families: self.cli_set(base_path + ['address-family', family, 'distance', 'external', distance_external]) self.cli_set(base_path + ['address-family', family, 'distance', 'internal', distance_internal]) self.cli_set(base_path + ['address-family', family, 'distance', 'local', distance_local]) if 'ipv4' in family: self.cli_set(base_path + ['address-family', family, 'distance', 'prefix', distance_v4_prefix, 'distance', distance_prefix_value]) if 'ipv6' in family: self.cli_set(base_path + ['address-family', family, 'distance', 'prefix', distance_v6_prefix, 'distance', distance_prefix_value]) # IPv4 flowspec interface check self.cli_set(base_path + ['address-family', 'ipv4-flowspec', 'local-install', 'interface', flowspec_int]) # IPv6 flowspec interface check self.cli_set(base_path + ['address-family', 'ipv6-flowspec', 'local-install', 'interface', flowspec_int]) # Commit changes self.cli_commit() # Verify FRR distances configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for family in verify_families: self.assertIn(f'address-family {family}', frrconfig) self.assertIn(f'distance bgp {distance_external} {distance_internal} {distance_local}', frrconfig) if 'ipv4' in family: self.assertIn(f'distance {distance_prefix_value} {distance_v4_prefix}', frrconfig) if 'ipv6' in family: self.assertIn(f'distance {distance_prefix_value} {distance_v6_prefix}', frrconfig) # Verify FRR flowspec configuration for family in flowspec_families: self.assertIn(f'{family}', frrconfig) self.assertIn(f'local-install {flowspec_int}', frrconfig) def test_bgp_10_vrf_simple(self): router_id = '127.0.0.3' vrfs = ['red', 'green', 'blue'] # It is safe to assume that when the basic VRF test works, all # other BGP related features work, as we entirely inherit the CLI # templates and Jinja2 FRR template. table = '1000' # testing only one AFI is sufficient as it's generic code for vrf in vrfs: vrf_base = ['vrf', 'name', vrf] self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'bgp', 'system-as', ASN]) self.cli_set(vrf_base + ['protocols', 'bgp', 'parameters', 'router-id', router_id]) table = str(int(table) + 1000) # import VRF routes do main RIB self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'import', 'vrf', vrf]) self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) for vrf in vrfs: self.assertIn(f' import vrf {vrf}', frrconfig) # Verify FRR bgpd configuration frr_vrf_config = self.getFRRconfig(f'router bgp {ASN} vrf {vrf}') self.assertIn(f'router bgp {ASN} vrf {vrf}', frr_vrf_config) self.assertIn(f' bgp router-id {router_id}', frr_vrf_config) def test_bgp_11_confederation(self): router_id = '127.10.10.2' confed_id = str(int(ASN) + 1) confed_asns = '10 20 30 40' self.cli_set(base_path + ['parameters', 'router-id', router_id]) self.cli_set(base_path + ['parameters', 'confederation', 'identifier', confed_id]) for asn in confed_asns.split(): self.cli_set(base_path + ['parameters', 'confederation', 'peers', asn]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' bgp router-id {router_id}', frrconfig) self.assertIn(f' bgp confederation identifier {confed_id}', frrconfig) self.assertIn(f' bgp confederation peers {confed_asns}', frrconfig) def test_bgp_12_v6_link_local(self): remote_asn = str(int(ASN) + 10) interface = 'eth0' self.cli_set(base_path + ['neighbor', interface, 'address-family', 'ipv6-unicast']) self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as', remote_asn]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {interface} interface v6only remote-as {remote_asn}', frrconfig) self.assertIn(f' address-family ipv6 unicast', frrconfig) self.assertIn(f' neighbor {interface} activate', frrconfig) self.assertIn(f' exit-address-family', frrconfig) def test_bgp_13_vpn(self): remote_asn = str(int(ASN) + 150) neighbor = '192.0.2.55' vrf_name = 'red' label = 'auto' rd = f'{neighbor}:{ASN}' rt_export = f'{neighbor}:1002 1.2.3.4:567' rt_import = f'{neighbor}:1003 500:100' # testing only one AFI is sufficient as it's generic code for afi in ['ipv4-unicast', 'ipv6-unicast']: self.cli_set(base_path + ['address-family', afi, 'export', 'vpn']) self.cli_set(base_path + ['address-family', afi, 'import', 'vpn']) self.cli_set(base_path + ['address-family', afi, 'label', 'vpn', 'export', label]) self.cli_set(base_path + ['address-family', afi, 'label', 'vpn', 'allocation-mode', 'per-nexthop']) self.cli_set(base_path + ['address-family', afi, 'rd', 'vpn', 'export', rd]) self.cli_set(base_path + ['address-family', afi, 'route-map', 'vpn', 'export', route_map_out]) self.cli_set(base_path + ['address-family', afi, 'route-map', 'vpn', 'import', route_map_in]) self.cli_set(base_path + ['address-family', afi, 'route-target', 'vpn', 'export', rt_export]) self.cli_set(base_path + ['address-family', afi, 'route-target', 'vpn', 'import', rt_import]) # commit changes self.cli_commit() # Verify FRR bgpd configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) for afi in ['ipv4', 'ipv6']: afi_config = self.getFRRconfig(f' address-family {afi} unicast', endsection='exit-address-family', daemon=bgp_daemon) self.assertIn(f'address-family {afi} unicast', afi_config) self.assertIn(f' export vpn', afi_config) self.assertIn(f' import vpn', afi_config) self.assertIn(f' label vpn export {label}', afi_config) self.assertIn(f' label vpn export allocation-mode per-nexthop', afi_config) self.assertIn(f' rd vpn export {rd}', afi_config) self.assertIn(f' route-map vpn export {route_map_out}', afi_config) self.assertIn(f' route-map vpn import {route_map_in}', afi_config) self.assertIn(f' rt vpn export {rt_export}', afi_config) self.assertIn(f' rt vpn import {rt_import}', afi_config) self.assertIn(f' exit-address-family', afi_config) def test_bgp_14_remote_as_peer_group_override(self): # Peer-group member cannot override remote-as of peer-group remote_asn = str(int(ASN) + 150) neighbor = '192.0.2.1' peer_group = 'bar' interface = 'eth0' self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', remote_asn]) self.cli_set(base_path + ['neighbor', neighbor, 'peer-group', peer_group]) self.cli_set(base_path + ['peer-group', peer_group, 'remote-as', remote_asn]) # Peer-group member cannot override remote-as of peer-group with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['neighbor', neighbor, 'remote-as']) # re-test with interface based peer-group self.cli_set(base_path + ['neighbor', interface, 'interface', 'peer-group', peer_group]) self.cli_set(base_path + ['neighbor', interface, 'interface', 'remote-as', 'external']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['neighbor', interface, 'interface', 'remote-as']) # re-test with interface based v6only peer-group self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'peer-group', peer_group]) self.cli_set(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as', 'external']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['neighbor', interface, 'interface', 'v6only', 'remote-as']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {neighbor} peer-group {peer_group}', frrconfig) self.assertIn(f' neighbor {peer_group} peer-group', frrconfig) self.assertIn(f' neighbor {peer_group} remote-as {remote_asn}', frrconfig) def test_bgp_15_local_as_ebgp(self): # https://vyos.dev/T4560 # local-as allowed only for ebgp peers neighbor = '192.0.2.99' remote_asn = '500' local_asn = '400' self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', ASN]) self.cli_set(base_path + ['neighbor', neighbor, 'local-as', local_asn]) # check validate() - local-as allowed only for ebgp peers with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['neighbor', neighbor, 'remote-as', remote_asn]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {neighbor} remote-as {remote_asn}', frrconfig) self.assertIn(f' neighbor {neighbor} local-as {local_asn}', frrconfig) def test_bgp_16_import_rd_rt_compatibility(self): # Verify if import vrf and rd vpn export # exist in the same address family self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) self.cli_set( base_path + ['address-family', import_afi, 'rd', 'vpn', 'export', import_rd]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_17_import_rd_rt_compatibility(self): # Verify if vrf that is in import vrf list contains rd vpn export self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'address-family ipv4 unicast', frrconfig) self.assertIn(f' import vrf {import_vrf}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.cli_set( import_vrf_base + [import_vrf] + base_path + ['address-family', import_afi, 'rd', 'vpn', 'export', import_rd]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_18_deleting_import_vrf(self): # Verify deleting vrf that is in import vrf list self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'address-family ipv4 unicast', frrconfig) self.assertIn(f' import vrf {import_vrf}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.cli_delete(import_vrf_base + [import_vrf]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_19_deleting_default_vrf(self): # Verify deleting existent vrf default if other vrfs were created self.create_bgp_instances_for_import_test() self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.cli_delete(base_path) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_20_import_rd_rt_compatibility(self): # Verify if vrf that has rd vpn export is in import vrf of other vrfs self.create_bgp_instances_for_import_test() self.cli_set( import_vrf_base + [import_vrf] + base_path + ['address-family', import_afi, 'rd', 'vpn', 'export', import_rd]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') frrconfig_vrf = self.getFRRconfig(f'router bgp {ASN} vrf {import_vrf}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f'router bgp {ASN} vrf {import_vrf}', frrconfig_vrf) self.assertIn(f'address-family ipv4 unicast', frrconfig_vrf) self.assertIn(f' rd vpn export {import_rd}', frrconfig_vrf) self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', import_vrf]) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_21_import_unspecified_vrf(self): # Verify if vrf that is in import is unspecified self.create_bgp_instances_for_import_test() self.cli_set( base_path + ['address-family', import_afi, 'import', 'vrf', 'test']) with self.assertRaises(ConfigSessionError): self.cli_commit() def test_bgp_22_interface_mpls_forwarding(self): interfaces = Section.interfaces('ethernet', vlan=False) for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'mpls', 'forwarding']) self.cli_commit() for interface in interfaces: frrconfig = self.getFRRconfig(f'interface {interface}') self.assertIn(f'interface {interface}', frrconfig) self.assertIn(f' mpls bgp forwarding', frrconfig) def test_bgp_23_vrf_interface_mpls_forwarding(self): self.create_bgp_instances_for_import_test() interfaces = Section.interfaces('ethernet', vlan=False) for interface in interfaces: self.cli_set(['interfaces', 'ethernet', interface, 'vrf', import_vrf]) self.cli_set(import_vrf_base + [import_vrf] + base_path + ['interface', interface, 'mpls', 'forwarding']) self.cli_commit() for interface in interfaces: frrconfig = self.getFRRconfig(f'interface {interface}') self.assertIn(f'interface {interface}', frrconfig) self.assertIn(f' mpls bgp forwarding', frrconfig) self.cli_delete(['interfaces', 'ethernet', interface, 'vrf']) def test_bgp_24_srv6_sid(self): locator_name = 'VyOS_foo' sid = 'auto' nexthop_ipv4 = '192.0.0.1' nexthop_ipv6 = '2001:db8:100:200::2' self.cli_set(base_path + ['srv6', 'locator', locator_name]) self.cli_set(base_path + ['sid', 'vpn', 'per-vrf', 'export', sid]) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'sid', 'vpn', 'export', sid]) # verify() - SID per VRF and SID per address-family are mutually exclusive! with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['address-family', 'ipv4-unicast', 'sid']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' segment-routing srv6', frrconfig) self.assertIn(f' locator {locator_name}', frrconfig) self.assertIn(f' sid vpn per-vrf export {sid}', frrconfig) # Now test AFI SID self.cli_delete(base_path + ['sid']) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'sid', 'vpn', 'export', sid]) self.cli_set(base_path + ['address-family', 'ipv4-unicast', 'nexthop', 'vpn', 'export', nexthop_ipv4]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'sid', 'vpn', 'export', sid]) self.cli_set(base_path + ['address-family', 'ipv6-unicast', 'nexthop', 'vpn', 'export', nexthop_ipv6]) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' segment-routing srv6', frrconfig) self.assertIn(f' locator {locator_name}', frrconfig) afiv4_config = self.getFRRconfig(' address-family ipv4 unicast') self.assertIn(f' sid vpn export {sid}', afiv4_config) self.assertIn(f' nexthop vpn export {nexthop_ipv4}', afiv4_config) afiv6_config = self.getFRRconfig(' address-family ipv6 unicast') self.assertIn(f' sid vpn export {sid}', afiv6_config) self.assertIn(f' nexthop vpn export {nexthop_ipv6}', afiv4_config) def test_bgp_25_ipv4_labeled_unicast_peer_group(self): pg_ipv4 = 'foo4' ipv4_max_prefix = '20' ipv4_prefix = '192.0.2.0/24' self.cli_set(base_path + ['listen', 'range', ipv4_prefix, 'peer-group', pg_ipv4]) self.cli_set(base_path + ['parameters', 'labeled-unicast', 'ipv4-explicit-null']) self.cli_set(base_path + ['peer-group', pg_ipv4, 'address-family', 'ipv4-labeled-unicast', 'maximum-prefix', ipv4_max_prefix]) self.cli_set(base_path + ['peer-group', pg_ipv4, 'remote-as', 'external']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {pg_ipv4} peer-group', frrconfig) self.assertIn(f' neighbor {pg_ipv4} remote-as external', frrconfig) self.assertIn(f' bgp listen range {ipv4_prefix} peer-group {pg_ipv4}', frrconfig) self.assertIn(f' bgp labeled-unicast ipv4-explicit-null', frrconfig) afiv4_config = self.getFRRconfig(' address-family ipv4 labeled-unicast') self.assertIn(f' neighbor {pg_ipv4} activate', afiv4_config) self.assertIn(f' neighbor {pg_ipv4} maximum-prefix {ipv4_max_prefix}', afiv4_config) def test_bgp_26_ipv6_labeled_unicast_peer_group(self): pg_ipv6 = 'foo6' ipv6_max_prefix = '200' ipv6_prefix = '2001:db8:1000::/64' self.cli_set(base_path + ['listen', 'range', ipv6_prefix, 'peer-group', pg_ipv6]) self.cli_set(base_path + ['parameters', 'labeled-unicast', 'ipv6-explicit-null']) self.cli_set(base_path + ['peer-group', pg_ipv6, 'address-family', 'ipv6-labeled-unicast', 'maximum-prefix', ipv6_max_prefix]) self.cli_set(base_path + ['peer-group', pg_ipv6, 'remote-as', 'external']) self.cli_commit() frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'router bgp {ASN}', frrconfig) self.assertIn(f' neighbor {pg_ipv6} peer-group', frrconfig) self.assertIn(f' neighbor {pg_ipv6} remote-as external', frrconfig) self.assertIn(f' bgp listen range {ipv6_prefix} peer-group {pg_ipv6}', frrconfig) self.assertIn(f' bgp labeled-unicast ipv6-explicit-null', frrconfig) afiv6_config = self.getFRRconfig(' address-family ipv6 labeled-unicast') self.assertIn(f' neighbor {pg_ipv6} activate', afiv6_config) self.assertIn(f' neighbor {pg_ipv6} maximum-prefix {ipv6_max_prefix}', afiv6_config) def test_bgp_27_route_reflector_client(self): self.cli_set(base_path + ['peer-group', 'peer1', 'address-family', 'l2vpn-evpn', 'route-reflector-client']) with self.assertRaises(ConfigSessionError) as e: self.cli_commit() self.cli_set(base_path + ['peer-group', 'peer1', 'remote-as', 'internal']) self.cli_commit() conf = self.getFRRconfig(' address-family l2vpn evpn') self.assertIn('neighbor peer1 route-reflector-client', conf) def test_bgp_28_peer_group_member_all_internal_or_external(self): def _common_config_check(conf, include_ras=True): if include_ras: self.assertIn(f'neighbor {int_neighbors[0]} remote-as {ASN}', conf) self.assertIn(f'neighbor {int_neighbors[1]} remote-as {ASN}', conf) self.assertIn(f'neighbor {ext_neighbors[0]} remote-as {int(ASN) + 1}',conf) self.assertIn(f'neighbor {int_neighbors[0]} peer-group {int_pg_name}', conf) self.assertIn(f'neighbor {int_neighbors[1]} peer-group {int_pg_name}', conf) self.assertIn(f'neighbor {ext_neighbors[0]} peer-group {ext_pg_name}', conf) int_neighbors = ['192.0.2.2', '192.0.2.3'] ext_neighbors = ['192.122.2.2', '192.122.2.3'] int_pg_name, ext_pg_name = 'SMOKETESTINT', 'SMOKETESTEXT' self.cli_set(base_path + ['neighbor', int_neighbors[0], 'peer-group', int_pg_name]) self.cli_set(base_path + ['neighbor', int_neighbors[0], 'remote-as', ASN]) self.cli_set(base_path + ['peer-group', int_pg_name, 'address-family', 'ipv4-unicast']) self.cli_set(base_path + ['neighbor', ext_neighbors[0], 'peer-group', ext_pg_name]) self.cli_set(base_path + ['neighbor', ext_neighbors[0], 'remote-as', f'{int(ASN) + 1}']) self.cli_set(base_path + ['peer-group', ext_pg_name, 'address-family', 'ipv4-unicast']) self.cli_commit() # test add external remote-as to internal group self.cli_set(base_path + ['neighbor', int_neighbors[1], 'peer-group', int_pg_name]) self.cli_set(base_path + ['neighbor', int_neighbors[1], 'remote-as', f'{int(ASN) + 1}']) with self.assertRaises(ConfigSessionError) as e: self.cli_commit() # self.assertIn('\nPeer-group members must be all internal or all external\n', str(e.exception)) # test add internal remote-as to internal group self.cli_set(base_path + ['neighbor', int_neighbors[1], 'remote-as', ASN]) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') _common_config_check(conf) # test add internal remote-as to external group self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'peer-group', ext_pg_name]) self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'remote-as', ASN]) with self.assertRaises(ConfigSessionError) as e: self.cli_commit() # self.assertIn('\nPeer-group members must be all internal or all external\n', str(e.exception)) # test add external remote-as to external group self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'remote-as', f'{int(ASN) + 2}']) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') _common_config_check(conf) self.assertIn(f'neighbor {ext_neighbors[1]} remote-as {int(ASN) + 2}', conf) self.assertIn(f'neighbor {ext_neighbors[1]} peer-group {ext_pg_name}', conf) # test named remote-as self.cli_set(base_path + ['neighbor', int_neighbors[0], 'remote-as', 'internal']) self.cli_set(base_path + ['neighbor', int_neighbors[1], 'remote-as', 'internal']) self.cli_set(base_path + ['neighbor', ext_neighbors[0], 'remote-as', 'external']) self.cli_set(base_path + ['neighbor', ext_neighbors[1], 'remote-as', 'external']) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') _common_config_check(conf, include_ras=False) self.assertIn(f'neighbor {int_neighbors[0]} remote-as internal', conf) self.assertIn(f'neighbor {int_neighbors[1]} remote-as internal', conf) self.assertIn(f'neighbor {ext_neighbors[0]} remote-as external', conf) self.assertIn(f'neighbor {ext_neighbors[1]} remote-as external', conf) self.assertIn(f'neighbor {ext_neighbors[1]} peer-group {ext_pg_name}', conf) def test_bgp_29_peer_group_remote_as_equal_local_as(self): self.cli_set(base_path + ['system-as', ASN]) self.cli_set(base_path + ['peer-group', 'OVERLAY', 'local-as', f'{int(ASN) + 1}']) self.cli_set(base_path + ['peer-group', 'OVERLAY', 'remote-as', f'{int(ASN) + 1}']) self.cli_set(base_path + ['peer-group', 'OVERLAY', 'address-family', 'l2vpn-evpn']) self.cli_set(base_path + ['peer-group', 'UNDERLAY', 'address-family', 'ipv4-unicast']) self.cli_set(base_path + ['neighbor', '10.177.70.62', 'peer-group', 'UNDERLAY']) self.cli_set(base_path + ['neighbor', '10.177.70.62', 'remote-as', 'external']) self.cli_set(base_path + ['neighbor', '10.177.75.1', 'peer-group', 'OVERLAY']) self.cli_set(base_path + ['neighbor', '10.177.75.2', 'peer-group', 'OVERLAY']) self.cli_commit() conf = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'neighbor OVERLAY remote-as {int(ASN) + 1}', conf) self.assertIn(f'neighbor OVERLAY local-as {int(ASN) + 1}', conf) def test_bgp_99_bmp(self): target_name = 'instance-bmp' target_address = '127.0.0.1' target_port = '5000' min_retry = '1024' max_retry = '2048' monitor_ipv4 = 'pre-policy' monitor_ipv6 = 'pre-policy' mirror_buffer = '32000000' bmp_path = base_path + ['bmp'] target_path = bmp_path + ['target', target_name] # by default the 'bmp' module not loaded for the bgpd expect Error self.cli_set(bmp_path) if not process_named_running('bgpd', 'bmp'): with self.assertRaises(ConfigSessionError): self.cli_commit() # add required 'bmp' module to bgpd and restart bgpd self.cli_delete(bmp_path) self.cli_set(['system', 'frr', 'bmp']) self.cli_commit() # restart bgpd to apply "-M bmp" and update PID cmd(f'sudo kill -9 {self.daemon_pid}') # let the bgpd process recover sleep(10) # update daemon PID - this was a planned daemon restart self.daemon_pid = process_named_running(PROCESS_NAME) # set bmp config but not set address self.cli_set(target_path + ['port', target_port]) # address is not set, expect Error with self.assertRaises(ConfigSessionError): self.cli_commit() # config other bmp options self.cli_set(target_path + ['address', target_address]) self.cli_set(bmp_path + ['mirror-buffer-limit', mirror_buffer]) self.cli_set(target_path + ['port', target_port]) self.cli_set(target_path + ['min-retry', min_retry]) self.cli_set(target_path + ['max-retry', max_retry]) self.cli_set(target_path + ['mirror']) self.cli_set(target_path + ['monitor', 'ipv4-unicast', monitor_ipv4]) self.cli_set(target_path + ['monitor', 'ipv6-unicast', monitor_ipv6]) self.cli_commit() # Verify bgpd bmp configuration frrconfig = self.getFRRconfig(f'router bgp {ASN}') self.assertIn(f'bmp mirror buffer-limit {mirror_buffer}', frrconfig) self.assertIn(f'bmp targets {target_name}', frrconfig) self.assertIn(f'bmp mirror', frrconfig) self.assertIn(f'bmp monitor ipv4 unicast {monitor_ipv4}', frrconfig) self.assertIn(f'bmp monitor ipv6 unicast {monitor_ipv6}', frrconfig) self.assertIn(f'bmp connect {target_address} port {target_port} min-retry {min_retry} max-retry {max_retry}', frrconfig) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_isis.py b/smoketest/scripts/cli/test_protocols_isis.py index 5b86dd53a..2fddbfeba 100755 --- a/smoketest/scripts/cli/test_protocols_isis.py +++ b/smoketest/scripts/cli/test_protocols_isis.py @@ -1,417 +1,417 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section from vyos.utils.process import process_named_running -from vyos.frr import isis_daemon +from vyos.frrender import isis_daemon PROCESS_NAME = 'isisd' base_path = ['protocols', 'isis'] domain = 'VyOS' net = '49.0001.1921.6800.1002.00' class TestProtocolsISIS(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._interfaces = Section.interfaces('ethernet') # call base-classes classmethod super(TestProtocolsISIS, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(PROCESS_NAME) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['vrf']) def tearDown(self): # cleanup any possible VRF mess self.cli_delete(['vrf']) # always destrox the entire isisd configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_commit() # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def isis_base_config(self): self.cli_set(base_path + ['net', net]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) def test_isis_01_redistribute(self): prefix_list = 'EXPORT-ISIS' route_map = 'EXPORT-ISIS' rule = '10' metric_style = 'transition' self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', rule, 'action', 'permit']) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', rule, 'prefix', '203.0.113.0/24']) self.cli_set(['policy', 'route-map', route_map, 'rule', rule, 'action', 'permit']) self.cli_set(['policy', 'route-map', route_map, 'rule', rule, 'match', 'ip', 'address', 'prefix-list', prefix_list]) self.cli_set(base_path) # verify() - net id and interface are mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.isis_base_config() self.cli_set(base_path + ['redistribute', 'ipv4', 'connected']) # verify() - Redistribute level-1 or level-2 should be specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['redistribute', 'ipv4', 'connected', 'level-2', 'route-map', route_map]) self.cli_set(base_path + ['metric-style', metric_style]) self.cli_set(base_path + ['log-adjacency-changes']) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' metric-style {metric_style}', tmp) self.assertIn(f' log-adjacency-changes', tmp) self.assertIn(f' redistribute ipv4 connected level-2 route-map {route_map}', tmp) for interface in self._interfaces: tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.cli_delete(['policy', 'route-map', route_map]) self.cli_delete(['policy', 'prefix-list', prefix_list]) def test_isis_02_vrfs(self): vrfs = ['red', 'green', 'blue'] # It is safe to assume that when the basic VRF test works, all other # IS-IS related features work, as we entirely inherit the CLI templates # and Jinja2 FRR template. table = '1000' vrf = 'red' vrf_base = ['vrf', 'name', vrf] vrf_iface = 'eth1' self.cli_set(vrf_base + ['table', table]) self.cli_set(vrf_base + ['protocols', 'isis', 'net', net]) self.cli_set(vrf_base + ['protocols', 'isis', 'interface', vrf_iface]) self.cli_set(vrf_base + ['protocols', 'isis', 'advertise-high-metrics']) self.cli_set(vrf_base + ['protocols', 'isis', 'advertise-passive-only']) self.cli_set(['interfaces', 'ethernet', vrf_iface, 'vrf', vrf]) # Also set a default VRF IS-IS config self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', 'eth0']) self.cli_commit() # Verify FRR isisd configuration tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f'router isis {domain}', tmp) self.assertIn(f' net {net}', tmp) tmp = self.getFRRconfig(f'router isis {domain} vrf {vrf}', daemon=isis_daemon) self.assertIn(f'router isis {domain} vrf {vrf}', tmp) self.assertIn(f' net {net}', tmp) self.assertIn(f' advertise-high-metrics', tmp) self.assertIn(f' advertise-passive-only', tmp) self.cli_delete(['vrf', 'name', vrf]) self.cli_delete(['interfaces', 'ethernet', vrf_iface, 'vrf']) def test_isis_04_default_information(self): metric = '50' route_map = 'default-foo-' self.isis_base_config() for afi in ['ipv4', 'ipv6']: for level in ['level-1', 'level-2']: self.cli_set(base_path + ['default-information', 'originate', afi, level, 'always']) self.cli_set(base_path + ['default-information', 'originate', afi, level, 'metric', metric]) self.cli_set(base_path + ['default-information', 'originate', afi, level, 'route-map', route_map + level + afi]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) for afi in ['ipv4', 'ipv6']: for level in ['level-1', 'level-2']: route_map_name = route_map + level + afi self.assertIn(f' default-information originate {afi} {level} always route-map {route_map_name} metric {metric}', tmp) def test_isis_05_password(self): password = 'foo' self.isis_base_config() for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'password', 'plaintext-password', f'{password}-{interface}']) self.cli_set(base_path + ['area-password', 'plaintext-password', password]) self.cli_set(base_path + ['area-password', 'md5', password]) self.cli_set(base_path + ['domain-password', 'plaintext-password', password]) self.cli_set(base_path + ['domain-password', 'md5', password]) # verify() - can not use both md5 and plaintext-password for area-password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['area-password', 'md5', password]) # verify() - can not use both md5 and plaintext-password for domain-password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['domain-password', 'md5', password]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' domain-password clear {password}', tmp) self.assertIn(f' area-password clear {password}', tmp) for interface in self._interfaces: tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f' isis password clear {password}-{interface}', tmp) def test_isis_06_spf_delay_bfd(self): network = 'point-to-point' holddown = '10' init_delay = '50' long_delay = '200' short_delay = '100' time_to_learn = '75' bfd_profile = 'isis-bfd' self.cli_set(base_path + ['net', net]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'network', network]) self.cli_set(base_path + ['interface', interface, 'bfd', 'profile', bfd_profile]) self.cli_set(base_path + ['spf-delay-ietf', 'holddown', holddown]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'init-delay', init_delay]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'long-delay', long_delay]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'short-delay', short_delay]) # verify() - All types of spf-delay must be configured with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['spf-delay-ietf', 'time-to-learn', time_to_learn]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' spf-delay-ietf init-delay {init_delay} short-delay {short_delay} long-delay {long_delay} holddown {holddown} time-to-learn {time_to_learn}', tmp) for interface in self._interfaces: tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.assertIn(f' isis network {network}', tmp) self.assertIn(f' isis bfd', tmp) self.assertIn(f' isis bfd profile {bfd_profile}', tmp) def test_isis_07_segment_routing_configuration(self): global_block_low = "300" global_block_high = "399" local_block_low = "400" local_block_high = "499" interface = 'lo' maximum_stack_size = '5' prefix_one = '192.168.0.1/32' prefix_two = '192.168.0.2/32' prefix_three = '192.168.0.3/32' prefix_four = '192.168.0.4/32' prefix_one_value = '1' prefix_two_value = '2' prefix_three_value = '60000' prefix_four_value = '65000' self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['segment-routing', 'maximum-label-depth', maximum_stack_size]) self.cli_set(base_path + ['segment-routing', 'global-block', 'low-label-value', global_block_low]) self.cli_set(base_path + ['segment-routing', 'global-block', 'high-label-value', global_block_high]) self.cli_set(base_path + ['segment-routing', 'local-block', 'low-label-value', local_block_low]) self.cli_set(base_path + ['segment-routing', 'local-block', 'high-label-value', local_block_high]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'value', prefix_one_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_one, 'index', 'explicit-null']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'value', prefix_two_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_two, 'index', 'no-php-flag']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_three, 'absolute', 'value', prefix_three_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_three, 'absolute', 'explicit-null']) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_four, 'absolute', 'value', prefix_four_value]) self.cli_set(base_path + ['segment-routing', 'prefix', prefix_four, 'absolute', 'no-php-flag']) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' segment-routing on', tmp) self.assertIn(f' segment-routing global-block {global_block_low} {global_block_high} local-block {local_block_low} {local_block_high}', tmp) self.assertIn(f' segment-routing node-msd {maximum_stack_size}', tmp) self.assertIn(f' segment-routing prefix {prefix_one} index {prefix_one_value} explicit-null', tmp) self.assertIn(f' segment-routing prefix {prefix_two} index {prefix_two_value} no-php-flag', tmp) self.assertIn(f' segment-routing prefix {prefix_three} absolute {prefix_three_value} explicit-null', tmp) self.assertIn(f' segment-routing prefix {prefix_four} absolute {prefix_four_value} no-php-flag', tmp) def test_isis_08_ldp_sync(self): holddown = "500" interface = 'lo' self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['ldp-sync', 'holddown', holddown]) # Commit main ISIS changes self.cli_commit() # Verify main ISIS changes tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' mpls ldp-sync', tmp) self.assertIn(f' mpls ldp-sync holddown {holddown}', tmp) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'holddown', holddown]) # Commit interface changes for holddown self.cli_commit() for interface in self._interfaces: # Verify interface changes for holddown tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f'interface {interface}', tmp) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.assertIn(f' isis mpls ldp-sync holddown {holddown}', tmp) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface, 'ldp-sync', 'disable']) # Commit interface changes for disable self.cli_commit() for interface in self._interfaces: # Verify interface changes for disable tmp = self.getFRRconfig(f'interface {interface}', daemon=isis_daemon) self.assertIn(f'interface {interface}', tmp) self.assertIn(f' ip router isis {domain}', tmp) self.assertIn(f' ipv6 router isis {domain}', tmp) self.assertIn(f' no isis mpls ldp-sync', tmp) def test_isis_09_lfa(self): prefix_list = 'lfa-prefix-list-test-1' prefix_list_address = '192.168.255.255/32' interface = 'lo' self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'action', 'permit']) self.cli_set(['policy', 'prefix-list', prefix_list, 'rule', '1', 'prefix', prefix_list_address]) # Commit main ISIS changes self.cli_commit() # Add remote portion of LFA with prefix list with validation for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'remote', 'prefix-list', prefix_list, level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute remote-lfa prefix-list {prefix_list} {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Add local portion of LFA load-sharing portion with validation for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'local', 'load-sharing', 'disable', level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute load-sharing disable {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Add local portion of LFA priority-limit portion with validation for priority in ['critical', 'high', 'medium']: for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'local', 'priority-limit', priority, level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute priority-limit {priority} {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Add local portion of LFA tiebreaker portion with validation index = '100' for tiebreaker in ['downstream','lowest-backup-metric','node-protecting']: for level in ['level-1', 'level-2']: self.cli_set(base_path + ['fast-reroute', 'lfa', 'local', 'tiebreaker', tiebreaker, 'index', index, level]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' fast-reroute lfa tiebreaker {tiebreaker} index {index} {level}', tmp) self.cli_delete(base_path + ['fast-reroute']) self.cli_commit() # Clean up and remove prefix list self.cli_delete(['policy', 'prefix-list', prefix_list]) self.cli_commit() def test_isis_10_topology(self): topologies = ['ipv4-multicast', 'ipv4-mgmt', 'ipv6-unicast', 'ipv6-multicast', 'ipv6-mgmt'] interface = 'lo' # Set a basic IS-IS config self.cli_set(base_path + ['net', net]) self.cli_set(base_path + ['interface', interface]) for topology in topologies: self.cli_set(base_path + ['topology', topology]) self.cli_commit() tmp = self.getFRRconfig(f'router isis {domain}', daemon=isis_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' topology {topology}', tmp) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_openfabric.py b/smoketest/scripts/cli/test_protocols_openfabric.py index 889cba135..3e99656ec 100644 --- a/smoketest/scripts/cli/test_protocols_openfabric.py +++ b/smoketest/scripts/cli/test_protocols_openfabric.py @@ -1,187 +1,187 @@ #!/usr/bin/env python3 # # Copyright (C) 2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import process_named_running -from vyos.frr import openfabric_daemon +from vyos.frrender import openfabric_daemon PROCESS_NAME = 'fabricd' base_path = ['protocols', 'openfabric'] domain = 'VyOS' net = '49.0001.1111.1111.1111.00' dummy_if = 'dum1234' address_families = ['ipv4', 'ipv6'] path = base_path + ['domain', domain] class TestProtocolsOpenFabric(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsOpenFabric, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(PROCESS_NAME) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def openfabric_base_config(self): self.cli_set(['interfaces', 'dummy', dummy_if]) self.cli_set(base_path + ['net', net]) for family in address_families: self.cli_set(path + ['interface', dummy_if, 'address-family', family]) def test_openfabric_01_router_params(self): fabric_tier = '5' lsp_gen_interval = '20' self.cli_set(base_path) # verify() - net id and domain name are mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.openfabric_base_config() self.cli_set(path + ['log-adjacency-changes']) self.cli_set(path + ['set-overload-bit']) self.cli_set(path + ['fabric-tier', fabric_tier]) self.cli_set(path + ['lsp-gen-interval', lsp_gen_interval]) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' log-adjacency-changes', tmp) self.assertIn(f' set-overload-bit', tmp) self.assertIn(f' fabric-tier {fabric_tier}', tmp) self.assertIn(f' lsp-gen-interval {lsp_gen_interval}', tmp) tmp = self.getFRRconfig(f'interface {dummy_if}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain}', tmp) self.assertIn(f' ipv6 router openfabric {domain}', tmp) def test_openfabric_02_loopback_interface(self): interface = 'lo' hello_interval = '100' metric = '24478' self.openfabric_base_config() self.cli_set(path + ['interface', interface, 'address-family', 'ipv4']) self.cli_set(path + ['interface', interface, 'hello-interval', hello_interval]) self.cli_set(path + ['interface', interface, 'metric', metric]) # Commit all changes self.cli_commit() # Verify FRR openfabric configuration tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f'router openfabric {domain}', tmp) self.assertIn(f' net {net}', tmp) # Verify interface configuration tmp = self.getFRRconfig(f'interface {interface}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain}', tmp) # for lo interface 'openfabric passive' is implied self.assertIn(f' openfabric passive', tmp) self.assertIn(f' openfabric metric {metric}', tmp) def test_openfabric_03_password(self): password = 'foo' self.openfabric_base_config() self.cli_set(path + ['interface', dummy_if, 'password', 'plaintext-password', f'{password}-{dummy_if}']) self.cli_set(path + ['interface', dummy_if, 'password', 'md5', f'{password}-{dummy_if}']) # verify() - can not use both md5 and plaintext-password for password for the interface with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['interface', dummy_if, 'password', 'md5']) self.cli_set(path + ['domain-password', 'plaintext-password', password]) self.cli_set(path + ['domain-password', 'md5', password]) # verify() - can not use both md5 and plaintext-password for domain-password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['domain-password', 'md5']) # Commit all changes self.cli_commit() # Verify all changes tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f' net {net}', tmp) self.assertIn(f' domain-password clear {password}', tmp) tmp = self.getFRRconfig(f'interface {dummy_if}', daemon=openfabric_daemon) self.assertIn(f' openfabric password clear {password}-{dummy_if}', tmp) def test_openfabric_multiple_domains(self): domain_2 = 'VyOS_2' interface = 'dum5678' new_path = base_path + ['domain', domain_2] self.openfabric_base_config() # set same interface for 2 OpenFabric domains self.cli_set(['interfaces', 'dummy', interface]) self.cli_set(new_path + ['interface', interface, 'address-family', 'ipv4']) self.cli_set(path + ['interface', interface, 'address-family', 'ipv4']) # verify() - same interface can be used only for one OpenFabric instance with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(path + ['interface', interface]) # Commit all changes self.cli_commit() # Verify FRR openfabric configuration tmp = self.getFRRconfig(f'router openfabric {domain}', daemon=openfabric_daemon) self.assertIn(f'router openfabric {domain}', tmp) self.assertIn(f' net {net}', tmp) tmp = self.getFRRconfig(f'router openfabric {domain_2}', daemon=openfabric_daemon) self.assertIn(f'router openfabric {domain_2}', tmp) self.assertIn(f' net {net}', tmp) # Verify interface configuration tmp = self.getFRRconfig(f'interface {dummy_if}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain}', tmp) self.assertIn(f' ipv6 router openfabric {domain}', tmp) tmp = self.getFRRconfig(f'interface {interface}', daemon=openfabric_daemon) self.assertIn(f' ip router openfabric {domain_2}', tmp) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_protocols_segment-routing.py b/smoketest/scripts/cli/test_protocols_segment-routing.py index eb563db93..8905d5e45 100755 --- a/smoketest/scripts/cli/test_protocols_segment-routing.py +++ b/smoketest/scripts/cli/test_protocols_segment-routing.py @@ -1,111 +1,111 @@ #!/usr/bin/env python3 # # Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Section -from vyos.frr import zebra_daemon +from vyos.frrender import zebra_daemon from vyos.utils.process import process_named_running from vyos.utils.system import sysctl_read base_path = ['protocols', 'segment-routing'] PROCESS_NAME = 'zebra' class TestProtocolsSegmentRouting(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): # call base-classes classmethod super(TestProtocolsSegmentRouting, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(PROCESS_NAME) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(PROCESS_NAME)) def test_srv6(self): interfaces = Section.interfaces('ethernet', vlan=False) locators = { 'foo' : { 'prefix' : '2001:a::/64' }, 'foo' : { 'prefix' : '2001:b::/64', 'usid' : {} }, } for locator, locator_config in locators.items(): self.cli_set(base_path + ['srv6', 'locator', locator, 'prefix', locator_config['prefix']]) if 'usid' in locator_config: self.cli_set(base_path + ['srv6', 'locator', locator, 'behavior-usid']) # verify() - SRv6 should be enabled on at least one interface! with self.assertRaises(ConfigSessionError): self.cli_commit() for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'srv6']) self.cli_commit() for interface in interfaces: self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_enabled'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_require_hmac'), '0') # default frrconfig = self.getFRRconfig(f'segment-routing', daemon=zebra_daemon) self.assertIn(f'segment-routing', frrconfig) self.assertIn(f' srv6', frrconfig) self.assertIn(f' locators', frrconfig) for locator, locator_config in locators.items(): self.assertIn(f' locator {locator}', frrconfig) self.assertIn(f' prefix {locator_config["prefix"]} block-len 40 node-len 24 func-bits 16', frrconfig) def test_srv6_sysctl(self): interfaces = Section.interfaces('ethernet', vlan=False) # HMAC accept for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'srv6']) self.cli_set(base_path + ['interface', interface, 'srv6', 'hmac', 'ignore']) self.cli_commit() for interface in interfaces: self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_enabled'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_require_hmac'), '-1') # ignore # HMAC drop for interface in interfaces: self.cli_set(base_path + ['interface', interface, 'srv6']) self.cli_set(base_path + ['interface', interface, 'srv6', 'hmac', 'drop']) self.cli_commit() for interface in interfaces: self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_enabled'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{interface}.seg6_require_hmac'), '1') # drop # Disable SRv6 on first interface first_if = interfaces[-1] self.cli_delete(base_path + ['interface', first_if]) self.cli_commit() self.assertEqual(sysctl_read(f'net.ipv6.conf.{first_if}.seg6_enabled'), '0') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_ipv6.py b/smoketest/scripts/cli/test_system_ipv6.py index be9751c4d..ebf620204 100755 --- a/smoketest/scripts/cli/test_system_ipv6.py +++ b/smoketest/scripts/cli/test_system_ipv6.py @@ -1,155 +1,155 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.system import sysctl_read from vyos.xml_ref import default_value from vyos.frrender import mgmt_daemon from vyos.frrender import zebra_daemon base_path = ['system', 'ipv6'] class TestSystemIPv6(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestSystemIPv6, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() def test_system_ipv6_forwarding(self): # Test if IPv6 forwarding can be disabled globally, default is '1' # which means forwearding enabled self.assertEqual(sysctl_read('net.ipv6.conf.all.forwarding'), '1') self.cli_set(base_path + ['disable-forwarding']) self.cli_commit() self.assertEqual(sysctl_read('net.ipv6.conf.all.forwarding'), '0') frrconfig = self.getFRRconfig('', end='', daemon=zebra_daemon) self.assertIn('no ipv6 forwarding', frrconfig) self.cli_delete(base_path + ['disable-forwarding']) self.cli_commit() self.assertEqual(sysctl_read('net.ipv6.conf.all.forwarding'), '1') frrconfig = self.getFRRconfig('', end='', daemon=zebra_daemon) self.assertNotIn('no ipv6 forwarding', frrconfig) def test_system_ipv6_strict_dad(self): # This defaults to 1 self.assertEqual(sysctl_read('net.ipv6.conf.all.accept_dad'), '1') # Do not assign any IPv6 address on interfaces, this requires a reboot # which can not be tested, but we can read the config file :) self.cli_set(base_path + ['strict-dad']) self.cli_commit() # Verify configuration file self.assertEqual(sysctl_read('net.ipv6.conf.all.accept_dad'), '2') def test_system_ipv6_multipath(self): # This defaults to 0 self.assertEqual(sysctl_read('net.ipv6.fib_multipath_hash_policy'), '0') # Do not assign any IPv6 address on interfaces, this requires a reboot # which can not be tested, but we can read the config file :) self.cli_set(base_path + ['multipath', 'layer4-hashing']) self.cli_commit() # Verify configuration file self.assertEqual(sysctl_read('net.ipv6.fib_multipath_hash_policy'), '1') def test_system_ipv6_neighbor_table_size(self): # Maximum number of entries to keep in the ARP cache, the # default is 8192 cli_default = int(default_value(base_path + ['neighbor', 'table-size'])) def _verify_gc_thres(table_size): self.assertEqual(sysctl_read('net.ipv6.neigh.default.gc_thresh3'), str(table_size)) self.assertEqual(sysctl_read('net.ipv6.neigh.default.gc_thresh2'), str(table_size // 2)) self.assertEqual(sysctl_read('net.ipv6.neigh.default.gc_thresh1'), str(table_size // 8)) _verify_gc_thres(cli_default) for size in [1024, 2048, 4096, 8192, 16384, 32768]: self.cli_set(base_path + ['neighbor', 'table-size', str(size)]) self.cli_commit() _verify_gc_thres(size) def test_system_ipv6_protocol_route_map(self): protocols = ['any', 'babel', 'bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static', 'table'] for protocol in protocols: route_map = 'route-map-' + protocol.replace('ospfv3', 'ospf6') self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) self.cli_set(base_path + ['protocol', protocol, 'route-map', route_map]) self.cli_commit() # Verify route-map properly applied to FRR frrconfig = self.getFRRconfig('ipv6 protocol', end='', daemon=mgmt_daemon) for protocol in protocols: # VyOS and FRR use a different name for OSPFv3 (IPv6) if protocol == 'ospfv3': protocol = 'ospf6' self.assertIn(f'ipv6 protocol {protocol} route-map route-map-{protocol}', frrconfig) # Delete route-maps self.cli_delete(['policy', 'route-map']) self.cli_delete(base_path + ['protocol']) self.cli_commit() # Verify route-map properly applied to FRR frrconfig = self.getFRRconfig('ipv6 protocol', end='', daemon=mgmt_daemon) self.assertNotIn(f'ipv6 protocol', frrconfig) def test_system_ipv6_protocol_non_existing_route_map(self): non_existing = 'non-existing6' self.cli_set(base_path + ['protocol', 'static', 'route-map', non_existing]) # VRF does yet not exist - an error must be thrown with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['policy', 'route-map', non_existing, 'rule', '10', 'action', 'deny']) # Commit again self.cli_commit() def test_system_ipv6_nht(self): self.cli_set(base_path + ['nht', 'no-resolve-via-default']) self.cli_commit() # Verify CLI config applied to FRR frrconfig = self.getFRRconfig('', end='', daemon=mgmt_daemon) self.assertIn(f'no ipv6 nht resolve-via-default', frrconfig) self.cli_delete(base_path + ['nht', 'no-resolve-via-default']) self.cli_commit() # Verify CLI config removed to FRR frrconfig = self.getFRRconfig('', end='', daemon=mgmt_daemon) self.assertNotIn(f'no ipv6 nht resolve-via-default', frrconfig) if __name__ == '__main__': - unittest.main(verbosity=2, failfast=True) + unittest.main(verbosity=2)