diff --git a/python/vyos/configdict.py b/python/vyos/configdict.py index 9522d8fcc..fb6365060 100644 --- a/python/vyos/configdict.py +++ b/python/vyos/configdict.py @@ -1,1156 +1,1156 @@ # Copyright 2019-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. """ A library for retrieving value dicts from VyOS configs in a declarative fashion. """ import os import json from vyos.defaults import frr_debug_enable from vyos.utils.dict import dict_search from vyos.utils.process import cmd def retrieve_config(path_hash, base_path, config): """ Retrieves a VyOS config as a dict according to a declarative description The description dict, passed in the first argument, must follow this format: ``field_name : <path, type, [inner_options_dict]>``. Supported types are: ``str`` (for normal nodes), ``list`` (returns a list of strings, for multi nodes), ``bool`` (returns True if valueless node exists), ``dict`` (for tag nodes, returns a dict indexed by node names, according to description in the third item of the tuple). Args: path_hash (dict): Declarative description of the config to retrieve base_path (list): A base path to prepend to all option paths config (vyos.config.Config): A VyOS config object Returns: dict: config dict """ config_hash = {} for k in path_hash: if type(path_hash[k]) != tuple: raise ValueError("In field {0}: expected a tuple, got a value {1}".format(k, str(path_hash[k]))) if len(path_hash[k]) < 2: raise ValueError("In field {0}: field description must be a tuple of at least two items, path (list) and type".format(k)) path = path_hash[k][0] if type(path) != list: raise ValueError("In field {0}: path must be a list, not a {1}".format(k, type(path))) typ = path_hash[k][1] if type(typ) != type: raise ValueError("In field {0}: type must be a type, not a {1}".format(k, type(typ))) path = base_path + path path_str = " ".join(path) if typ == str: config_hash[k] = config.return_value(path_str) elif typ == list: config_hash[k] = config.return_values(path_str) elif typ == bool: config_hash[k] = config.exists(path_str) elif typ == dict: try: inner_hash = path_hash[k][2] except IndexError: raise ValueError("The type of the \'{0}\' field is dict, but inner options hash is missing from the tuple".format(k)) config_hash[k] = {} nodes = config.list_nodes(path_str) for node in nodes: config_hash[k][node] = retrieve_config(inner_hash, path + [node], config) return config_hash def dict_merge(source, destination): """ Merge two dictionaries. Only keys which are not present in destination will be copied from source, anything else will be kept untouched. Function will return a new dict which has the merged key/value pairs. """ from copy import deepcopy tmp = deepcopy(destination) for key, value in source.items(): if key not in tmp: tmp[key] = value elif isinstance(source[key], dict): tmp[key] = dict_merge(source[key], tmp[key]) return tmp def list_diff(first, second): """ Diff two dictionaries and return only unique items """ second = set(second) return [item for item in first if item not in second] def is_node_changed(conf, path): """ Check if any key under path has been changed and return True. If nothing changed, return false """ from vyos.configdiff import get_config_diff D = get_config_diff(conf, key_mangling=('-', '_')) return D.is_node_changed(path) def leaf_node_changed(conf, path): """ Check if a leaf node was altered. If it has been altered - values has been changed, or it was added/removed, we will return a list containing the old value(s). If nothing has been changed, None is returned. NOTE: path must use the real CLI node name (e.g. with a hyphen!) """ from vyos.configdiff import get_config_diff D = get_config_diff(conf, key_mangling=('-', '_')) (new, old) = D.get_value_diff(path) if new != old: if isinstance(old, dict): # valueLess nodes return {} if node is deleted return True if old is None and isinstance(new, dict): # valueLess nodes return {} if node was added return True if old is None: return [] if isinstance(old, str): return [old] if isinstance(old, list): if isinstance(new, str): new = [new] elif isinstance(new, type(None)): new = [] return list_diff(old, new) return None def node_changed(conf, path, key_mangling=None, recursive=False, expand_nodes=None) -> list: """ Check if node under path (or anything under path if recursive=True) was changed. By default we only check if a node or subnode (recursive) was deleted from path. If expand_nodes is set to Diff.ADD we can also check if something was added to the path. If nothing changed, an empty list is returned. """ from vyos.configdiff import get_config_diff from vyos.configdiff import Diff # to prevent circular dependencies we assign the default here if not expand_nodes: expand_nodes = Diff.DELETE D = get_config_diff(conf, key_mangling) # get_child_nodes_diff() will return dict_keys() tmp = D.get_child_nodes_diff(path, expand_nodes=expand_nodes, recursive=recursive) output = [] if expand_nodes & Diff.DELETE: output.extend(list(tmp['delete'].keys())) if expand_nodes & Diff.ADD: output.extend(list(tmp['add'].keys())) # remove duplicate keys from list, this happens when a node (e.g. description) is altered output = list(dict.fromkeys(output)) return output def get_removed_vlans(conf, path, dict): """ Common function to parse a dictionary retrieved via get_config_dict() and determine any added/removed VLAN interfaces - be it 802.1q or Q-in-Q. """ from vyos.configdiff import get_config_diff, Diff # Check vif, vif-s/vif-c VLAN interfaces for removal D = get_config_diff(conf, key_mangling=('-', '_')) D.set_level(conf.get_level()) # get_child_nodes() will return dict_keys(), mangle this into a list with PEP448 keys = D.get_child_nodes_diff(path + ['vif'], expand_nodes=Diff.DELETE)['delete'].keys() if keys: dict['vif_remove'] = [*keys] # get_child_nodes() will return dict_keys(), mangle this into a list with PEP448 keys = D.get_child_nodes_diff(path + ['vif-s'], expand_nodes=Diff.DELETE)['delete'].keys() if keys: dict['vif_s_remove'] = [*keys] for vif in dict.get('vif_s', {}).keys(): keys = D.get_child_nodes_diff(path + ['vif-s', vif, 'vif-c'], expand_nodes=Diff.DELETE)['delete'].keys() if keys: dict['vif_s'][vif]['vif_c_remove'] = [*keys] return dict def is_member(conf, interface, intftype=None): """ Checks if passed interface is member of other interface of specified type. intftype is optional, if not passed it will search all known types (currently bridge and bonding) Returns: dict empty -> Interface is not a member key -> Interface is a member of this interface """ ret_val = {} intftypes = ['bonding', 'bridge'] if intftype not in intftypes + [None]: raise ValueError(( f'unknown interface type "{intftype}" or it cannot ' f'have member interfaces')) intftype = intftypes if intftype == None else [intftype] for iftype in intftype: base = ['interfaces', iftype] for intf in conf.list_nodes(base): member = base + [intf, 'member', 'interface', interface] if conf.exists(member): tmp = conf.get_config_dict(member, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) ret_val.update({intf : tmp}) return ret_val def is_mirror_intf(conf, interface, direction=None): """ Check whether the passed interface is used for port mirroring. Direction is optional, if not passed it will search all known direction (currently ingress and egress) Returns: None -> Interface is not a monitor interface Array() -> This interface is a monitor interface of interfaces """ from vyos.ifconfig import Section directions = ['ingress', 'egress'] if direction not in directions + [None]: raise ValueError(f'Unknown interface mirror direction "{direction}"') direction = directions if direction == None else [direction] ret_val = None base = ['interfaces'] for dir in direction: for iftype in conf.list_nodes(base): iftype_base = base + [iftype] for intf in conf.list_nodes(iftype_base): mirror = iftype_base + [intf, 'mirror', dir, interface] if conf.exists(mirror): path = ['interfaces', Section.section(intf), intf] tmp = conf.get_config_dict(path, key_mangling=('-', '_'), get_first_key=True) ret_val = {intf : tmp} return ret_val def has_address_configured(conf, intf): """ Checks if interface has an address configured. Checks the following config nodes: 'address', 'ipv6 address eui64', 'ipv6 address autoconf' Returns True if interface has address configured, False if it doesn't. """ from vyos.ifconfig import Section ret = False old_level = conf.get_level() conf.set_level([]) intfpath = ['interfaces', Section.get_config_path(intf)] if (conf.exists([intfpath, 'address']) or conf.exists([intfpath, 'ipv6', 'address', 'autoconf']) or conf.exists([intfpath, 'ipv6', 'address', 'eui64'])): ret = True conf.set_level(old_level) return ret def has_vrf_configured(conf, intf): """ Checks if interface has a VRF configured. Returns True if interface has VRF configured, False if it doesn't. """ from vyos.ifconfig import Section ret = False old_level = conf.get_level() conf.set_level([]) if conf.exists(['interfaces', Section.get_config_path(intf), 'vrf']): ret = True conf.set_level(old_level) return ret def has_vlan_subinterface_configured(conf, intf): """ Checks if interface has an VLAN subinterface configured. Checks the following config nodes: 'vif', 'vif-s' Return True if interface has VLAN subinterface configured. """ from vyos.ifconfig import Section ret = False intfpath = ['interfaces', Section.section(intf), intf] if (conf.exists(intfpath + ['vif']) or conf.exists(intfpath + ['vif-s'])): ret = True return ret def is_source_interface(conf, interface, intftype=None): """ Checks if passed interface is configured as source-interface of other interfaces of specified type. intftype is optional, if not passed it will search all known types (currently pppoe, macsec, pseudo-ethernet, tunnel and vxlan) Returns: None -> Interface is not a member interface name -> Interface is a member of this interface False -> interface type cannot have members """ ret_val = None intftypes = ['macsec', 'pppoe', 'pseudo-ethernet', 'tunnel', 'vxlan'] if not intftype: intftype = intftypes if isinstance(intftype, str): intftype = [intftype] elif not isinstance(intftype, list): raise ValueError(f'Interface type "{type(intftype)}" must be either str or list!') if not all(x in intftypes for x in intftype): raise ValueError(f'unknown interface type "{intftype}" or it can not ' 'have a source-interface') for it in intftype: base = ['interfaces', it] for intf in conf.list_nodes(base): src_intf = base + [intf, 'source-interface'] if conf.exists(src_intf) and interface in conf.return_values(src_intf): ret_val = intf break return ret_val def get_dhcp_interfaces(conf, vrf=None): """ Common helper functions to retrieve all interfaces from current CLI sessions that have DHCP configured. """ dhcp_interfaces = {} dict = conf.get_config_dict(['interfaces'], get_first_key=True) if not dict: return dhcp_interfaces def check_dhcp(config): ifname = config['ifname'] tmp = {} if 'address' in config and 'dhcp' in config['address']: options = {} if dict_search('dhcp_options.default_route_distance', config) != None: options.update({'dhcp_options' : config['dhcp_options']}) if 'vrf' in config: if vrf == config['vrf']: tmp.update({ifname : options}) else: if vrf is None: tmp.update({ifname : options}) return tmp for section, interface in dict.items(): for ifname in interface: # always reset config level, as get_interface_dict() will alter it conf.set_level([]) # we already have a dict representation of the config from get_config_dict(), # but with the extended information from get_interface_dict() we also # get the DHCP client default-route-distance default option if not specified. _, ifconfig = get_interface_dict(conf, ['interfaces', section], ifname) tmp = check_dhcp(ifconfig) dhcp_interfaces.update(tmp) # check per VLAN interfaces for vif, vif_config in ifconfig.get('vif', {}).items(): tmp = check_dhcp(vif_config) dhcp_interfaces.update(tmp) # check QinQ VLAN interfaces for vif_s, vif_s_config in ifconfig.get('vif_s', {}).items(): tmp = check_dhcp(vif_s_config) dhcp_interfaces.update(tmp) for vif_c, vif_c_config in vif_s_config.get('vif_c', {}).items(): tmp = check_dhcp(vif_c_config) dhcp_interfaces.update(tmp) return dhcp_interfaces def get_pppoe_interfaces(conf, vrf=None): """ Common helper functions to retrieve all interfaces from current CLI sessions that have DHCP configured. """ pppoe_interfaces = {} conf.set_level([]) for ifname in conf.list_nodes(['interfaces', 'pppoe']): # always reset config level, as get_interface_dict() will alter it conf.set_level([]) # we already have a dict representation of the config from get_config_dict(), # but with the extended information from get_interface_dict() we also # get the DHCP client default-route-distance default option if not specified. _, ifconfig = get_interface_dict(conf, ['interfaces', 'pppoe'], ifname) options = {} if 'default_route_distance' in ifconfig: options.update({'default_route_distance' : ifconfig['default_route_distance']}) if 'no_default_route' in ifconfig: options.update({'no_default_route' : {}}) if 'vrf' in ifconfig: if vrf == ifconfig['vrf']: pppoe_interfaces.update({ifname : options}) else: if vrf is None: pppoe_interfaces.update({ifname : options}) return pppoe_interfaces def get_interface_dict(config, base, ifname='', recursive_defaults=True, with_pki=False): """ Common utility function to retrieve and mangle the interfaces configuration from the CLI input nodes. All interfaces have a common base where value retrival is identical. This function must be used whenever possible when working on the interfaces node! Return a dictionary with the necessary interface config keys. """ if not ifname: from vyos import ConfigError # determine tagNode instance if 'VYOS_TAGNODE_VALUE' not in os.environ: raise ConfigError('Interface (VYOS_TAGNODE_VALUE) not specified') ifname = os.environ['VYOS_TAGNODE_VALUE'] # Check if interface has been removed. We must use exists() as # get_config_dict() will always return {} - even when an empty interface # node like the following exists. # +macsec macsec1 { # +} if not config.exists(base + [ifname]): dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) dict.update({'deleted' : {}}) else: # Get config_dict with default values dict = config.get_config_dict(base + [ifname], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_defaults=True, with_recursive_defaults=recursive_defaults, with_pki=with_pki) # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict or 'dhcp' not in dict['address']: if 'dhcp_options' in dict: del dict['dhcp_options'] # Add interface instance name into dictionary dict.update({'ifname': ifname}) # Check if QoS policy applied on this interface - See ifconfig.interface.set_mirror_redirect() if config.exists(['qos', 'interface', ifname]): dict.update({'traffic_policy': {}}) address = leaf_node_changed(config, base + [ifname, 'address']) if address: dict.update({'address_old' : address}) # Check if we are a member of a bridge device bridge = is_member(config, ifname, 'bridge') if bridge: dict.update({'is_bridge_member' : bridge}) # Check if it is a monitor interface mirror = is_mirror_intf(config, ifname) if mirror: dict.update({'is_mirror_intf' : mirror}) # Check if we are a member of a bond device bond = is_member(config, ifname, 'bonding') if bond: dict.update({'is_bond_member' : bond}) # Check if any DHCP options changed which require a client restat dhcp = is_node_changed(config, base + [ifname, 'dhcp-options']) if dhcp: dict.update({'dhcp_options_changed' : {}}) # Changine interface VRF assignemnts require a DHCP restart, too dhcp = is_node_changed(config, base + [ifname, 'vrf']) if dhcp: dict.update({'dhcp_options_changed' : {}}) # Some interfaces come with a source_interface which must also not be part # of any other bond or bridge interface as it is exclusivly assigned as the # Kernels "lower" interface to this new "virtual/upper" interface. if 'source_interface' in dict: # Check if source interface is member of another bridge tmp = is_member(config, dict['source_interface'], 'bridge') if tmp: dict.update({'source_interface_is_bridge_member' : tmp}) # Check if source interface is member of another bridge tmp = is_member(config, dict['source_interface'], 'bonding') if tmp: dict.update({'source_interface_is_bond_member' : tmp}) mac = leaf_node_changed(config, base + [ifname, 'mac']) if mac: dict.update({'mac_old' : mac}) eui64 = leaf_node_changed(config, base + [ifname, 'ipv6', 'address', 'eui64']) if eui64: tmp = dict_search('ipv6.address', dict) if not tmp: dict.update({'ipv6': {'address': {'eui64_old': eui64}}}) else: dict['ipv6']['address'].update({'eui64_old': eui64}) for vif, vif_config in dict.get('vif', {}).items(): # Add subinterface name to dictionary dict['vif'][vif].update({'ifname' : f'{ifname}.{vif}'}) if config.exists(['qos', 'interface', f'{ifname}.{vif}']): dict['vif'][vif].update({'traffic_policy': {}}) if 'deleted' not in dict: address = leaf_node_changed(config, base + [ifname, 'vif', vif, 'address']) if address: dict['vif'][vif].update({'address_old' : address}) # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict['vif'][vif] or 'dhcp' not in dict['vif'][vif]['address']: if 'dhcp_options' in dict['vif'][vif]: del dict['vif'][vif]['dhcp_options'] # Check if we are a member of a bridge device bridge = is_member(config, f'{ifname}.{vif}', 'bridge') if bridge: dict['vif'][vif].update({'is_bridge_member' : bridge}) # Check if any DHCP options changed which require a client restat dhcp = is_node_changed(config, base + [ifname, 'vif', vif, 'dhcp-options']) if dhcp: dict['vif'][vif].update({'dhcp_options_changed' : {}}) for vif_s, vif_s_config in dict.get('vif_s', {}).items(): # Add subinterface name to dictionary dict['vif_s'][vif_s].update({'ifname' : f'{ifname}.{vif_s}'}) if config.exists(['qos', 'interface', f'{ifname}.{vif_s}']): dict['vif_s'][vif_s].update({'traffic_policy': {}}) if 'deleted' not in dict: address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'address']) if address: dict['vif_s'][vif_s].update({'address_old' : address}) # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict['vif_s'][vif_s] or 'dhcp' not in \ dict['vif_s'][vif_s]['address']: if 'dhcp_options' in dict['vif_s'][vif_s]: del dict['vif_s'][vif_s]['dhcp_options'] # Check if we are a member of a bridge device bridge = is_member(config, f'{ifname}.{vif_s}', 'bridge') if bridge: dict['vif_s'][vif_s].update({'is_bridge_member' : bridge}) # Check if any DHCP options changed which require a client restat dhcp = is_node_changed(config, base + [ifname, 'vif-s', vif_s, 'dhcp-options']) if dhcp: dict['vif_s'][vif_s].update({'dhcp_options_changed' : {}}) for vif_c, vif_c_config in vif_s_config.get('vif_c', {}).items(): # Add subinterface name to dictionary dict['vif_s'][vif_s]['vif_c'][vif_c].update({'ifname' : f'{ifname}.{vif_s}.{vif_c}'}) if config.exists(['qos', 'interface', f'{ifname}.{vif_s}.{vif_c}']): dict['vif_s'][vif_s]['vif_c'][vif_c].update({'traffic_policy': {}}) if 'deleted' not in dict: address = leaf_node_changed(config, base + [ifname, 'vif-s', vif_s, 'vif-c', vif_c, 'address']) if address: dict['vif_s'][vif_s]['vif_c'][vif_c].update( {'address_old' : address}) # If interface does not request an IPv4 DHCP address there is no need # to keep the dhcp-options key if 'address' not in dict['vif_s'][vif_s]['vif_c'][vif_c] or 'dhcp' \ not in dict['vif_s'][vif_s]['vif_c'][vif_c]['address']: if 'dhcp_options' in dict['vif_s'][vif_s]['vif_c'][vif_c]: del dict['vif_s'][vif_s]['vif_c'][vif_c]['dhcp_options'] # Check if we are a member of a bridge device bridge = is_member(config, f'{ifname}.{vif_s}.{vif_c}', 'bridge') if bridge: dict['vif_s'][vif_s]['vif_c'][vif_c].update( {'is_bridge_member' : bridge}) # Check if any DHCP options changed which require a client restat dhcp = is_node_changed(config, base + [ifname, 'vif-s', vif_s, 'vif-c', vif_c, 'dhcp-options']) if dhcp: dict['vif_s'][vif_s]['vif_c'][vif_c].update({'dhcp_options_changed' : {}}) # Check vif, vif-s/vif-c VLAN interfaces for removal dict = get_removed_vlans(config, base + [ifname], dict) return ifname, dict def get_vlan_ids(interface): """ Get the VLAN ID of the interface bound to the bridge """ vlan_ids = set() bridge_status = cmd('bridge -j vlan show', shell=True) vlan_filter_status = json.loads(bridge_status) if vlan_filter_status is not None: for interface_status in vlan_filter_status: ifname = interface_status['ifname'] if interface == ifname: vlans_status = interface_status['vlans'] for vlan_status in vlans_status: vlan_id = vlan_status['vlan'] vlan_ids.add(vlan_id) return vlan_ids def get_accel_dict(config, base, chap_secrets, with_pki=False): """ Common utility function to retrieve and mangle the Accel-PPP configuration from different CLI input nodes. All Accel-PPP services have a common base where value retrival is identical. This function must be used whenever possible when working with Accel-PPP services! Return a dictionary with the necessary interface config keys. """ from vyos.utils.cpu import get_core_count from vyos.template import is_ipv4 dict = config.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True, with_pki=with_pki) # set CPUs cores to process requests dict.update({'thread_count' : get_core_count()}) # we need to store the path to the secrets file dict.update({'chap_secrets_file' : chap_secrets}) # We can only have two IPv4 and three IPv6 nameservers - also they are # configured in a different way in the configuration, this is why we split # the configuration if 'name_server' in dict: ns_v4 = [] ns_v6 = [] for ns in dict['name_server']: if is_ipv4(ns): ns_v4.append(ns) else: ns_v6.append(ns) dict.update({'name_server_ipv4' : ns_v4, 'name_server_ipv6' : ns_v6}) del dict['name_server'] # Check option "disable-accounting" per server and replace default value from '1813' to '0' for server in (dict_search('authentication.radius.server', dict) or []): if 'disable_accounting' in dict['authentication']['radius']['server'][server]: dict['authentication']['radius']['server'][server]['acct_port'] = '0' return dict def get_frrender_dict(conf, argv=None) -> dict: from copy import deepcopy from vyos.config import config_dict_merge from vyos.frrender import frr_protocols # Create an empty dictionary which will be filled down the code path and # returned to the caller dict = {} if argv and len(argv) > 1: dict['vrf_context'] = argv[1] def dict_helper_ospf_defaults(ospf, path): # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. default_values = conf.get_config_defaults(path, key_mangling=('-', '_'), get_first_key=True, recursive=True) # We have to cleanup the default dict, as default values could enable features # which are not explicitly enabled on the CLI. Example: default-information # originate comes with a default metric-type of 2, which will enable the # entire default-information originate tree, even when not set via CLI so we # need to check this first and probably drop that key. if dict_search('default_information.originate', ospf) is None: del default_values['default_information'] if 'mpls_te' not in ospf: del default_values['mpls_te'] if 'graceful_restart' not in ospf: del default_values['graceful_restart'] for area_num in default_values.get('area', []): if dict_search(f'area.{area_num}.area_type.nssa', ospf) is None: del default_values['area'][area_num]['area_type']['nssa'] for protocol in ['babel', 'bgp', 'connected', 'isis', 'kernel', 'rip', 'static']: if dict_search(f'redistribute.{protocol}', ospf) is None: del default_values['redistribute'][protocol] if not bool(default_values['redistribute']): del default_values['redistribute'] for interface in ospf.get('interface', []): # We need to reload the defaults on every pass b/c of # hello-multiplier dependency on dead-interval # If hello-multiplier is set, we need to remove the default from # dead-interval. if 'hello_multiplier' in ospf['interface'][interface]: del default_values['interface'][interface]['dead_interval'] ospf = config_dict_merge(default_values, ospf) return ospf def dict_helper_ospfv3_defaults(ospfv3, path): # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. default_values = conf.get_config_defaults(path, key_mangling=('-', '_'), get_first_key=True, recursive=True) # We have to cleanup the default dict, as default values could enable features # which are not explicitly enabled on the CLI. Example: default-information # originate comes with a default metric-type of 2, which will enable the # entire default-information originate tree, even when not set via CLI so we # need to check this first and probably drop that key. if dict_search('default_information.originate', ospfv3) is None: del default_values['default_information'] if 'graceful_restart' not in ospfv3: del default_values['graceful_restart'] for protocol in ['babel', 'bgp', 'connected', 'isis', 'kernel', 'ripng', 'static']: if dict_search(f'redistribute.{protocol}', ospfv3) is None: del default_values['redistribute'][protocol] if not bool(default_values['redistribute']): del default_values['redistribute'] default_values.pop('interface', {}) # merge in remaining default values ospfv3 = config_dict_merge(default_values, ospfv3) return ospfv3 def dict_helper_pim_defaults(pim, path): # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. default_values = conf.get_config_defaults(path, key_mangling=('-', '_'), get_first_key=True, recursive=True) # We have to cleanup the default dict, as default values could enable features # which are not explicitly enabled on the CLI. for interface in pim.get('interface', []): if 'igmp' not in pim['interface'][interface]: del default_values['interface'][interface]['igmp'] pim = config_dict_merge(default_values, pim) return pim # Ethernet and bonding interfaces can participate in EVPN which is configured via FRR tmp = {} for if_type in ['ethernet', 'bonding']: interface_path = ['interfaces', if_type] if not conf.exists(interface_path): continue for interface in conf.list_nodes(interface_path): evpn_path = interface_path + [interface, 'evpn'] if not conf.exists(evpn_path): continue evpn = conf.get_config_dict(evpn_path, key_mangling=('-', '_')) tmp.update({interface : evpn}) # At least one participating EVPN interface found, add to result dict if tmp: dict['interfaces'] = tmp # Zebra prefix exchange for Kernel IP/IPv6 and routing protocols for ip_version in ['ip', 'ipv6']: ip_cli_path = ['system', ip_version] ip_dict = conf.get_config_dict(ip_cli_path, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) if ip_dict: ip_dict['afi'] = ip_version dict.update({ip_version : ip_dict}) # Enable SNMP agentx support # SNMP AgentX support cannot be disabled once enabled if conf.exists(['service', 'snmp']): dict['snmp'] = {} # We will always need the policy key dict['policy'] = conf.get_config_dict(['policy'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) # We need to check the CLI if the BABEL node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() babel_cli_path = ['protocols', 'babel'] if conf.exists(babel_cli_path): babel = conf.get_config_dict(babel_cli_path, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) dict.update({'babel' : babel}) # We need to check the CLI if the BFD node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() bfd_cli_path = ['protocols', 'bfd'] if conf.exists(bfd_cli_path): bfd = conf.get_config_dict(bfd_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True) dict.update({'bfd' : bfd}) # We need to check the CLI if the BGP node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() bgp_cli_path = ['protocols', 'bgp'] if conf.exists(bgp_cli_path): bgp = conf.get_config_dict(bgp_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True) bgp['dependent_vrfs'] = {} dict.update({'bgp' : bgp}) elif conf.exists_effective(bgp_cli_path): dict.update({'bgp' : {'deleted' : '', 'dependent_vrfs' : {}}}) # We need to check the CLI if the EIGRP node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() eigrp_cli_path = ['protocols', 'eigrp'] if conf.exists(eigrp_cli_path): isis = conf.get_config_dict(eigrp_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True) dict.update({'eigrp' : isis}) elif conf.exists_effective(eigrp_cli_path): dict.update({'eigrp' : {'deleted' : ''}}) # We need to check the CLI if the ISIS node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() isis_cli_path = ['protocols', 'isis'] if conf.exists(isis_cli_path): isis = conf.get_config_dict(isis_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True) dict.update({'isis' : isis}) elif conf.exists_effective(isis_cli_path): dict.update({'isis' : {'deleted' : ''}}) # We need to check the CLI if the MPLS node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() mpls_cli_path = ['protocols', 'mpls'] if conf.exists(mpls_cli_path): mpls = conf.get_config_dict(mpls_cli_path, key_mangling=('-', '_'), get_first_key=True) dict.update({'mpls' : mpls}) elif conf.exists_effective(mpls_cli_path): dict.update({'mpls' : {'deleted' : ''}}) # We need to check the CLI if the OPENFABRIC node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() openfabric_cli_path = ['protocols', 'openfabric'] if conf.exists(openfabric_cli_path): openfabric = conf.get_config_dict(openfabric_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) dict.update({'openfabric' : openfabric}) elif conf.exists_effective(openfabric_cli_path): dict.update({'openfabric' : {'deleted' : ''}}) # We need to check the CLI if the OSPF node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() ospf_cli_path = ['protocols', 'ospf'] if conf.exists(ospf_cli_path): ospf = conf.get_config_dict(ospf_cli_path, key_mangling=('-', '_'), get_first_key=True) ospf = dict_helper_ospf_defaults(ospf, ospf_cli_path) dict.update({'ospf' : ospf}) elif conf.exists_effective(ospf_cli_path): dict.update({'ospf' : {'deleted' : ''}}) # We need to check the CLI if the OSPFv3 node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() ospfv3_cli_path = ['protocols', 'ospfv3'] if conf.exists(ospfv3_cli_path): ospfv3 = conf.get_config_dict(ospfv3_cli_path, key_mangling=('-', '_'), get_first_key=True) ospfv3 = dict_helper_ospfv3_defaults(ospfv3, ospfv3_cli_path) dict.update({'ospfv3' : ospfv3}) elif conf.exists_effective(ospfv3_cli_path): dict.update({'ospfv3' : {'deleted' : ''}}) # We need to check the CLI if the PIM node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() pim_cli_path = ['protocols', 'pim'] if conf.exists(pim_cli_path): pim = conf.get_config_dict(pim_cli_path, key_mangling=('-', '_'), get_first_key=True) pim = dict_helper_pim_defaults(pim, pim_cli_path) dict.update({'pim' : pim}) elif conf.exists_effective(pim_cli_path): dict.update({'pim' : {'deleted' : ''}}) # We need to check the CLI if the PIM6 node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() pim6_cli_path = ['protocols', 'pim6'] if conf.exists(pim6_cli_path): pim6 = conf.get_config_dict(pim6_cli_path, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) dict.update({'pim6' : pim6}) elif conf.exists_effective(pim6_cli_path): dict.update({'pim6' : {'deleted' : ''}}) # We need to check the CLI if the RIP node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() rip_cli_path = ['protocols', 'rip'] if conf.exists(rip_cli_path): rip = conf.get_config_dict(rip_cli_path, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) dict.update({'rip' : rip}) elif conf.exists_effective(rip_cli_path): dict.update({'rip' : {'deleted' : ''}}) # We need to check the CLI if the RIPng node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() ripng_cli_path = ['protocols', 'ripng'] if conf.exists(ripng_cli_path): ripng = conf.get_config_dict(ripng_cli_path, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) dict.update({'ripng' : ripng}) elif conf.exists_effective(ripng_cli_path): dict.update({'ripng' : {'deleted' : ''}}) # We need to check the CLI if the RPKI node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() rpki_cli_path = ['protocols', 'rpki'] if conf.exists(rpki_cli_path): rpki = conf.get_config_dict(rpki_cli_path, key_mangling=('-', '_'), get_first_key=True, with_pki=True, with_recursive_defaults=True) dict.update({'rpki' : rpki}) elif conf.exists_effective(rpki_cli_path): dict.update({'rpki' : {'deleted' : ''}}) # We need to check the CLI if the Segment Routing node is present and thus load in # all the default values present on the CLI - that's why we have if conf.exists() sr_cli_path = ['protocols', 'segment-routing'] if conf.exists(sr_cli_path): sr = conf.get_config_dict(sr_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True) dict.update({'segment_routing' : sr}) elif conf.exists_effective(sr_cli_path): dict.update({'segment_routing' : {'deleted' : ''}}) # We need to check the CLI if the static node is present and thus load in # all the default values present on the CLI - that's why we have if conf.exists() static_cli_path = ['protocols', 'static'] if conf.exists(static_cli_path): static = conf.get_config_dict(static_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) # T3680 - get a list of all interfaces currently configured to use DHCP tmp = get_dhcp_interfaces(conf) if tmp: static.update({'dhcp' : tmp}) tmp = get_pppoe_interfaces(conf) if tmp: static.update({'pppoe' : tmp}) dict.update({'static' : static}) elif conf.exists_effective(static_cli_path): dict.update({'static' : {'deleted' : ''}}) # keep a re-usable list of dependent VRFs dependent_vrfs_default = {} if 'bgp' in dict: dependent_vrfs_default = deepcopy(dict['bgp']) # we do not need to nest the 'dependent_vrfs' key - simply remove it if 'dependent_vrfs' in dependent_vrfs_default: del dependent_vrfs_default['dependent_vrfs'] vrf_cli_path = ['vrf', 'name'] if conf.exists(vrf_cli_path): vrf = conf.get_config_dict(vrf_cli_path, key_mangling=('-', '_'), get_first_key=False, no_tag_node_value_mangle=True) # We do not have any VRF related default values on the CLI. The defaults will only # come into place under the protocols tree, thus we can safely merge them with the # appropriate routing protocols for vrf_name, vrf_config in vrf['name'].items(): bgp_vrf_path = ['vrf', 'name', vrf_name, 'protocols', 'bgp'] if 'bgp' in vrf_config.get('protocols', []): # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. default_values = conf.get_config_defaults(bgp_vrf_path, key_mangling=('-', '_'), get_first_key=True, recursive=True) # merge in remaining default values vrf_config['protocols']['bgp'] = config_dict_merge(default_values, vrf_config['protocols']['bgp']) # Add this BGP VRF instance as dependency into the default VRF if 'bgp' in dict: dict['bgp']['dependent_vrfs'].update({vrf_name : deepcopy(vrf_config)}) vrf_config['protocols']['bgp']['dependent_vrfs'] = conf.get_config_dict( vrf_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) # We can safely delete ourself from the dependent VRF list if vrf_name in vrf_config['protocols']['bgp']['dependent_vrfs']: del vrf_config['protocols']['bgp']['dependent_vrfs'][vrf_name] # Add dependency on possible existing default VRF to this VRF if 'bgp' in dict: vrf_config['protocols']['bgp']['dependent_vrfs'].update({'default': {'protocols': { 'bgp': dependent_vrfs_default}}}) elif conf.exists_effective(bgp_vrf_path): # Add this BGP VRF instance as dependency into the default VRF tmp = {'deleted' : '', 'dependent_vrfs': deepcopy(vrf['name'])} # We can safely delete ourself from the dependent VRF list if vrf_name in tmp['dependent_vrfs']: del tmp['dependent_vrfs'][vrf_name] # Add dependency on possible existing default VRF to this VRF if 'bgp' in dict: tmp['dependent_vrfs'].update({'default': {'protocols': { 'bgp': dependent_vrfs_default}}}) if 'bgp' in dict: dict['bgp']['dependent_vrfs'].update({vrf_name : {'protocols': tmp} }) if 'protocols' not in vrf['name'][vrf_name]: vrf['name'][vrf_name].update({'protocols': {'bgp' : tmp}}) else: vrf['name'][vrf_name]['protocols'].update({'bgp' : tmp}) # We need to check the CLI if the EIGRP node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() eigrp_vrf_path = ['vrf', 'name', vrf_name, 'protocols', 'eigrp'] if 'eigrp' in vrf_config.get('protocols', []): eigrp = conf.get_config_dict(eigrp_vrf_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) vrf['name'][vrf_name]['protocols'].update({'eigrp' : isis}) elif conf.exists_effective(eigrp_vrf_path): vrf['name'][vrf_name]['protocols'].update({'eigrp' : {'deleted' : ''}}) # We need to check the CLI if the ISIS node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() isis_vrf_path = ['vrf', 'name', vrf_name, 'protocols', 'isis'] if 'isis' in vrf_config.get('protocols', []): isis = conf.get_config_dict(isis_vrf_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, with_recursive_defaults=True) vrf['name'][vrf_name]['protocols'].update({'isis' : isis}) elif conf.exists_effective(isis_vrf_path): vrf['name'][vrf_name]['protocols'].update({'isis' : {'deleted' : ''}}) # We need to check the CLI if the OSPF node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() ospf_vrf_path = ['vrf', 'name', vrf_name, 'protocols', 'ospf'] if 'ospf' in vrf_config.get('protocols', []): ospf = conf.get_config_dict(ospf_vrf_path, key_mangling=('-', '_'), get_first_key=True) ospf = dict_helper_ospf_defaults(vrf_config['protocols']['ospf'], ospf_vrf_path) vrf['name'][vrf_name]['protocols'].update({'ospf' : ospf}) elif conf.exists_effective(ospf_vrf_path): vrf['name'][vrf_name]['protocols'].update({'ospf' : {'deleted' : ''}}) # We need to check the CLI if the OSPFv3 node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() ospfv3_vrf_path = ['vrf', 'name', vrf_name, 'protocols', 'ospfv3'] if 'ospfv3' in vrf_config.get('protocols', []): ospfv3 = conf.get_config_dict(ospfv3_vrf_path, key_mangling=('-', '_'), get_first_key=True) ospfv3 = dict_helper_ospfv3_defaults(vrf_config['protocols']['ospfv3'], ospfv3_vrf_path) vrf['name'][vrf_name]['protocols'].update({'ospfv3' : ospfv3}) elif conf.exists_effective(ospfv3_vrf_path): vrf['name'][vrf_name]['protocols'].update({'ospfv3' : {'deleted' : ''}}) # We need to check the CLI if the static node is present and thus load in all the default # values present on the CLI - that's why we have if conf.exists() static_vrf_path = ['vrf', 'name', vrf_name, 'protocols', 'static'] if 'static' in vrf_config.get('protocols', []): static = conf.get_config_dict(static_vrf_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) # T3680 - get a list of all interfaces currently configured to use DHCP tmp = get_dhcp_interfaces(conf, vrf_name) if tmp: static.update({'dhcp' : tmp}) tmp = get_pppoe_interfaces(conf, vrf_name) if tmp: static.update({'pppoe' : tmp}) vrf['name'][vrf_name]['protocols'].update({'static': static}) elif conf.exists_effective(static_vrf_path): vrf['name'][vrf_name]['protocols'].update({'static': {'deleted' : ''}}) vrf_vni_path = ['vrf', 'name', vrf_name, 'vni'] - if conf.exists_effective(vrf_vni_path): - vrf_config.update({'vni': conf.return_effective_value(vrf_vni_path)}) + if conf.exists(vrf_vni_path): + vrf_config.update({'vni': conf.return_value(vrf_vni_path)}) dict.update({'vrf' : vrf}) elif conf.exists_effective(vrf_cli_path): effective_vrf = conf.get_config_dict(vrf_cli_path, key_mangling=('-', '_'), get_first_key=False, no_tag_node_value_mangle=True, effective=True) vrf = {'name' : {}} for vrf_name, vrf_config in effective_vrf.get('name', {}).items(): vrf['name'].update({vrf_name : {}}) for protocol in frr_protocols: if protocol in vrf_config.get('protocols', []): # Create initial protocols key if not present if 'protocols' not in vrf['name'][vrf_name]: vrf['name'][vrf_name].update({'protocols' : {}}) # All routing protocols are deleted when we pass this point tmp = {'deleted' : ''} # Special treatment for BGP routing protocol if protocol == 'bgp': tmp['dependent_vrfs'] = {} if 'name' in vrf: tmp['dependent_vrfs'] = conf.get_config_dict( vrf_cli_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True, effective=True) # Add dependency on possible existing default VRF to this VRF if 'bgp' in dict: tmp['dependent_vrfs'].update({'default': {'protocols': { 'bgp': dependent_vrfs_default}}}) # We can safely delete ourself from the dependent VRF list if vrf_name in tmp['dependent_vrfs']: del tmp['dependent_vrfs'][vrf_name] # Update VRF related dict vrf['name'][vrf_name]['protocols'].update({protocol : tmp}) dict.update({'vrf' : vrf}) if os.path.exists(frr_debug_enable): print('======== < BEGIN > ==========') import pprint pprint.pprint(dict) print('========= < END > ===========') # Use singleton instance of the FRR render class if hasattr(conf, 'frrender_cls'): frrender = getattr(conf, 'frrender_cls') dict.update({'frrender_cls' : frrender}) frrender.generate(dict) return dict diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 3cab5248e..f4ed1a61f 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -1,600 +1,600 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM from json import loads from jmespath import search from vyos.configsession import ConfigSessionError -from vyos.frr import mgmt_daemon +from vyos.frrender import mgmt_daemon from vyos.ifconfig import Interface from vyos.ifconfig import Section from vyos.utils.file import read_file from vyos.utils.network import get_interface_config from vyos.utils.network import get_vrf_tableid from vyos.utils.network import is_intf_addr_assigned from vyos.utils.network import interface_exists from vyos.utils.process import cmd from vyos.utils.system import sysctl_read base_path = ['vrf'] vrfs = ['red', 'green', 'blue', 'foo-bar', 'baz_foo'] v4_protocols = ['any', 'babel', 'bgp', 'connected', 'eigrp', 'isis', 'kernel', 'ospf', 'rip', 'static', 'table'] v6_protocols = ['any', 'babel', 'bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static', 'table'] class VRFTest(VyOSUnitTestSHIM.TestCase): _interfaces = [] @classmethod def setUpClass(cls): # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! if 'TEST_ETH' in os.environ: tmp = os.environ['TEST_ETH'].split() cls._interfaces = tmp else: for tmp in Section.interfaces('ethernet', vlan=False): cls._interfaces.append(tmp) # call base-classes classmethod super(VRFTest, cls).setUpClass() def setUp(self): # VRF strict_most ist always enabled tmp = read_file('/proc/sys/net/vrf/strict_mode') self.assertEqual(tmp, '1') def tearDown(self): # delete all VRFs self.cli_delete(base_path) self.cli_commit() for vrf in vrfs: self.assertFalse(interface_exists(vrf)) def test_vrf_vni_and_table_id(self): base_table = '1000' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] description = f'VyOS-VRF-{vrf}' self.cli_set(base + ['description', description]) # check validate() - a table ID is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base + ['table', table]) self.cli_set(base + ['vni', table]) if vrf == 'green': self.cli_set(base + ['disable']) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table iproute2_config = read_file('/etc/iproute2/rt_tables.d/vyos-vrf.conf') for vrf in vrfs: description = f'VyOS-VRF-{vrf}' self.assertTrue(interface_exists(vrf)) vrf_if = Interface(vrf) # validate proper interface description self.assertEqual(vrf_if.get_alias(), description) # validate admin up/down state of VRF state = 'up' if vrf == 'green': state = 'down' self.assertEqual(vrf_if.get_admin_state(), state) # Test the iproute2 lookup file, syntax is as follows: # # # id vrf name comment # 1000 red # VyOS-VRF-red # 1001 green # VyOS-VRF-green # ... regex = f'{table}\s+{vrf}\s+#\s+{description}' self.assertTrue(re.findall(regex, iproute2_config)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) self.assertEqual(int(table), get_vrf_tableid(vrf)) # Increment table ID for the next run table = str(int(table) + 1) def test_vrf_loopbacks_ips(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration loopbacks = ['127.0.0.1', '::1'] for vrf in vrfs: # Ensure VRF was created self.assertTrue(interface_exists(vrf)) # Verify IP forwarding is 1 (enabled) self.assertEqual(sysctl_read(f'net.ipv4.conf.{vrf}.forwarding'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{vrf}.forwarding'), '1') # Test for proper loopback IP assignment for addr in loopbacks: self.assertTrue(is_intf_addr_assigned(vrf, addr)) def test_vrf_bind_all(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) table = str(int(table) + 1) self.cli_set(base_path + ['bind-to-all']) # commit changes self.cli_commit() # Verify VRF configuration self.assertEqual(sysctl_read('net.ipv4.tcp_l3mdev_accept'), '1') self.assertEqual(sysctl_read('net.ipv4.udp_l3mdev_accept'), '1') # If there is any VRF defined, strict_mode should be on self.assertEqual(sysctl_read('net.vrf.strict_mode'), '1') def test_vrf_table_id_is_unalterable(self): # Linux Kernel prohibits the change of a VRF table on the fly. # VRF must be deleted and recreated! table = '1000' vrf = vrfs[0] base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) # commit changes self.cli_commit() # Check if VRF has been created self.assertTrue(interface_exists(vrf)) table = str(int(table) + 1) self.cli_set(base + ['table', table]) # check validate() - table ID can not be altered! with self.assertRaises(ConfigSessionError): self.cli_commit() def test_vrf_assign_interface(self): vrf = vrfs[0] table = '5000' self.cli_set(['vrf', 'name', vrf, 'table', table]) for interface in self._interfaces: section = Section.section(interface) self.cli_set(['interfaces', section, interface, 'vrf', vrf]) # commit changes self.cli_commit() # Verify VRF assignmant for interface in self._interfaces: tmp = get_interface_config(interface) self.assertEqual(vrf, tmp['master']) # cleanup section = Section.section(interface) self.cli_delete(['interfaces', section, interface, 'vrf']) def test_vrf_static_route(self): base_table = '100' table = base_table for vrf in vrfs: next_hop = f'192.0.{table}.1' prefix = f'10.0.{table}.0/24' base = base_path + ['name', vrf] self.cli_set(base + ['vni', table]) # check validate() - a table ID is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base + ['table', table]) self.cli_set(base + ['protocols', 'static', 'route', prefix, 'next-hop', next_hop]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: next_hop = f'192.0.{table}.1' prefix = f'10.0.{table}.0/24' self.assertTrue(interface_exists(vrf)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) self.assertIn(f' ip route {prefix} {next_hop}', frrconfig) # Increment table ID for the next run table = str(int(table) + 1) def test_vrf_link_local_ip_addresses(self): # Testcase for issue T4331 table = '100' vrf = 'orange' interface = 'dum9998' addresses = ['192.0.2.1/26', '2001:db8:9998::1/64', 'fe80::1/64'] for address in addresses: self.cli_set(['interfaces', 'dummy', interface, 'address', address]) # Create dummy interfaces self.cli_commit() # ... and verify IP addresses got assigned for address in addresses: self.assertTrue(is_intf_addr_assigned(interface, address)) # Move interface to VRF self.cli_set(base_path + ['name', vrf, 'table', table]) self.cli_set(['interfaces', 'dummy', interface, 'vrf', vrf]) # Apply VRF config self.cli_commit() # Ensure VRF got created self.assertTrue(interface_exists(vrf)) # ... and IP addresses are still assigned for address in addresses: self.assertTrue(is_intf_addr_assigned(interface, address)) # Verify VRF table ID self.assertEqual(int(table), get_vrf_tableid(vrf)) # Verify interface is assigned to VRF tmp = get_interface_config(interface) self.assertEqual(vrf, tmp['master']) # Delete Interface self.cli_delete(['interfaces', 'dummy', interface]) self.cli_commit() def test_vrf_disable_forwarding(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) self.cli_set(base + ['ip', 'disable-forwarding']) self.cli_set(base + ['ipv6', 'disable-forwarding']) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration loopbacks = ['127.0.0.1', '::1'] for vrf in vrfs: # Ensure VRF was created self.assertTrue(interface_exists(vrf)) # Verify IP forwarding is 0 (disabled) self.assertEqual(sysctl_read(f'net.ipv4.conf.{vrf}.forwarding'), '0') self.assertEqual(sysctl_read(f'net.ipv6.conf.{vrf}.forwarding'), '0') def test_vrf_ip_protocol_route_map(self): table = '6000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) for protocol in v4_protocols: self.cli_set(['policy', 'route-map', f'route-map-{vrf}-{protocol}', 'rule', '10', 'action', 'permit']) self.cli_set(base + ['ip', 'protocol', protocol, 'route-map', f'route-map-{vrf}-{protocol}']) table = str(int(table) + 1) self.cli_commit() # Verify route-map properly applied to FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f'vrf {vrf}', frrconfig) for protocol in v4_protocols: self.assertIn(f' ip protocol {protocol} route-map route-map-{vrf}-{protocol}', frrconfig) # Delete route-maps for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(['policy', 'route-map', f'route-map-{vrf}-{protocol}']) self.cli_delete(base + ['ip', 'protocol']) self.cli_commit() # Verify route-map properly is removed from FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertNotIn(f' ip protocol', frrconfig) def test_vrf_ip_ipv6_protocol_non_existing_route_map(self): table = '6100' non_existing = 'non-existing' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) for protocol in v4_protocols: self.cli_set(base + ['ip', 'protocol', protocol, 'route-map', f'v4-{non_existing}']) for protocol in v6_protocols: self.cli_set(base + ['ipv6', 'protocol', protocol, 'route-map', f'v6-{non_existing}']) table = str(int(table) + 1) # Both v4 and v6 route-maps do not exist yet with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['policy', 'route-map', f'v4-{non_existing}', 'rule', '10', 'action', 'deny']) # v6 route-map does not exist yet with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['policy', 'route-map', f'v6-{non_existing}', 'rule', '10', 'action', 'deny']) # Commit again self.cli_commit() def test_vrf_ipv6_protocol_route_map(self): table = '6200' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) for protocol in v6_protocols: route_map = f'route-map-{vrf}-{protocol.replace("ospfv3", "ospf6")}' self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) self.cli_set(base + ['ipv6', 'protocol', protocol, 'route-map', route_map]) table = str(int(table) + 1) self.cli_commit() # Verify route-map properly applied to FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f'vrf {vrf}', frrconfig) for protocol in v6_protocols: # VyOS and FRR use a different name for OSPFv3 (IPv6) if protocol == 'ospfv3': protocol = 'ospf6' route_map = f'route-map-{vrf}-{protocol}' self.assertIn(f' ipv6 protocol {protocol} route-map {route_map}', frrconfig) # Delete route-maps for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(['policy', 'route-map', f'route-map-{vrf}-{protocol}']) self.cli_delete(base + ['ipv6', 'protocol']) self.cli_commit() # Verify route-map properly is removed from FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertNotIn(f' ipv6 protocol', frrconfig) def test_vrf_vni_duplicates(self): base_table = '6300' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) self.cli_set(base + ['vni', '100']) table = str(int(table) + 1) # L3VNIs can only be used once with self.assertRaises(ConfigSessionError): self.cli_commit() table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['vni', str(table)]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(interface_exists(vrf)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 1) def test_vrf_vni_add_change_remove(self): base_table = '6300' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) self.cli_set(base + ['vni', str(table)]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(interface_exists(vrf)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 1) # Now change all L3VNIs (increment 2) # We must also change the base_table number as we probably could get # duplicate VNI's during the test as VNIs are applied 1:1 to FRR base_table = '5000' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['vni', str(table)]) table = str(int(table) + 2) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(interface_exists(vrf)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 2) # add a new VRF with VNI - this must not delete any existing VRF/VNI purple = 'purple' table = str(int(table) + 10) self.cli_set(base_path + ['name', purple, 'table', table]) self.cli_set(base_path + ['name', purple, 'vni', table]) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(interface_exists(vrf)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 2) # Verify purple VRF/VNI self.assertTrue(interface_exists(purple)) table = str(int(table) + 10) frrconfig = self.getFRRconfig(f'vrf {purple}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) # Now delete all the VNIs for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(base + ['vni']) # commit changes self.cli_commit() # Verify no VNI is defined for vrf in vrfs: self.assertTrue(interface_exists(vrf)) frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertNotIn('vni', frrconfig) # Verify purple VNI remains self.assertTrue(interface_exists(purple)) frrconfig = self.getFRRconfig(f'vrf {purple}', daemon=mgmt_daemon) self.assertIn(f' vni {table}', frrconfig) def test_vrf_ip_ipv6_nht(self): table = '6910' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) self.cli_set(base + ['ip', 'nht', 'no-resolve-via-default']) self.cli_set(base + ['ipv6', 'nht', 'no-resolve-via-default']) table = str(int(table) + 1) self.cli_commit() # Verify route-map properly applied to FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertIn(f'vrf {vrf}', frrconfig) self.assertIn(f' no ip nht resolve-via-default', frrconfig) self.assertIn(f' no ipv6 nht resolve-via-default', frrconfig) # Delete route-maps for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(base + ['ip']) self.cli_delete(base + ['ipv6']) self.cli_commit() # Verify route-map properly is removed from FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon=mgmt_daemon) self.assertNotIn(f' no ip nht resolve-via-default', frrconfig) self.assertNotIn(f' no ipv6 nht resolve-via-default', frrconfig) def test_vrf_conntrack(self): table = '8710' nftables_rules = { 'vrf_zones_ct_in': ['ct original zone set iifname map @ct_iface_map'], 'vrf_zones_ct_out': ['ct original zone set oifname map @ct_iface_map'] } self.cli_set(base_path + ['name', 'randomVRF', 'table', '1000']) self.cli_commit() # Conntrack rules should not be present for chain, rule in nftables_rules.items(): self.verify_nftables_chain(rule, 'inet vrf_zones', chain, inverse=True) # conntrack is only enabled once NAT, NAT66 or firewalling is enabled self.cli_set(['nat']) for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) table = str(int(table) + 1) # We need the commit inside the loop to trigger the bug in T6603 self.cli_commit() # Conntrack rules should now be present for chain, rule in nftables_rules.items(): self.verify_nftables_chain(rule, 'inet vrf_zones', chain, inverse=False) # T6603: there should be only ONE entry for the iifname/oifname in the chains tmp = loads(cmd('sudo nft -j list table inet vrf_zones')) num_rules = len(search("nftables[].rule[].chain", tmp)) # ['vrf_zones_ct_in', 'vrf_zones_ct_out'] self.assertEqual(num_rules, 2) self.cli_delete(['nat']) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/vrf.py b/src/conf_mode/vrf.py index 6eea9af4d..1b19c55d2 100755 --- a/src/conf_mode/vrf.py +++ b/src/conf_mode/vrf.py @@ -1,368 +1,357 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from jmespath import search from json import loads from vyos.config import Config from vyos.configdict import get_frrender_dict -from vyos.configdict import dict_merge from vyos.configdict import node_changed from vyos.configverify import verify_route_map from vyos.firewall import conntrack_required from vyos.frrender import FRRender from vyos.ifconfig import Interface from vyos.template import render from vyos.utils.dict import dict_search from vyos.utils.network import get_vrf_tableid from vyos.utils.network import get_vrf_members from vyos.utils.network import interface_exists from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.process import popen from vyos.utils.system import sysctl_write from vyos import ConfigError from vyos import airbag airbag.enable() config_file = '/etc/iproute2/rt_tables.d/vyos-vrf.conf' k_mod = ['vrf'] nftables_table = 'inet vrf_zones' nftables_rules = { 'vrf_zones_ct_in': 'counter ct original zone set iifname map @ct_iface_map', 'vrf_zones_ct_out': 'counter ct original zone set oifname map @ct_iface_map' } def has_rule(af : str, priority : int, table : str=None): """ Check if a given ip rule exists $ ip --json -4 rule show [{'l3mdev': None, 'priority': 1000, 'src': 'all'}, {'action': 'unreachable', 'l3mdev': None, 'priority': 2000, 'src': 'all'}, {'priority': 32765, 'src': 'all', 'table': 'local'}, {'priority': 32766, 'src': 'all', 'table': 'main'}, {'priority': 32767, 'src': 'all', 'table': 'default'}] """ if af not in ['-4', '-6']: raise ValueError() command = f'ip --detail --json {af} rule show' for tmp in loads(cmd(command)): if 'priority' in tmp and 'table' in tmp: if tmp['priority'] == priority and tmp['table'] == table: return True elif 'priority' in tmp and table in tmp: # l3mdev table has a different layout if tmp['priority'] == priority: return True return False def is_nft_vrf_zone_rule_setup() -> bool: """ Check if an nftables connection tracking rule already exists """ tmp = loads(cmd('sudo nft -j list table inet vrf_zones')) num_rules = len(search("nftables[].rule[].chain", tmp)) return bool(num_rules) def vrf_interfaces(c, match): matched = [] old_level = c.get_level() c.set_level(['interfaces']) section = c.get_config_dict([], get_first_key=True) for type in section: interfaces = section[type] for name in interfaces: interface = interfaces[name] if 'vrf' in interface: v = interface.get('vrf', '') if v == match: matched.append(name) c.set_level(old_level) return matched def vrf_routing(c, match): matched = [] old_level = c.get_level() c.set_level(['protocols', 'vrf']) if match in c.list_nodes([]): matched.append(match) c.set_level(old_level) return matched def get_config(config=None): if config: conf = config else: conf = Config() base = ['vrf'] vrf = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True) # determine which VRF has been removed for name in node_changed(conf, base + ['name']): if 'vrf_remove' not in vrf: vrf.update({'vrf_remove' : {}}) vrf['vrf_remove'][name] = {} # get VRF bound interfaces interfaces = vrf_interfaces(conf, name) if interfaces: vrf['vrf_remove'][name]['interface'] = interfaces # get VRF bound routing instances routes = vrf_routing(conf, name) if routes: vrf['vrf_remove'][name]['route'] = routes if 'name' in vrf: vrf['conntrack'] = conntrack_required(conf) # We need to merge the FRR rendering dict into the VRF dict # this is required to get the route-map information to FRR vrf.update({'frr_dict' : get_frrender_dict(conf)}) - - # We also need the route-map information from the config - # - # XXX: one MUST always call this without the key_mangling() option! See - # vyos.configverify.verify_common_route_maps() for more information. - tmp = {'policy' : {'route-map' : conf.get_config_dict(['policy', 'route-map'], - get_first_key=True)}} - - # Merge policy dict into "regular" config dict - vrf = dict_merge(tmp, vrf) return vrf def verify(vrf): # ensure VRF is not assigned to any interface if 'vrf_remove' in vrf: for name, config in vrf['vrf_remove'].items(): if 'interface' in config: raise ConfigError(f'Can not remove VRF "{name}", it still has '\ f'member interfaces!') if 'route' in config: raise ConfigError(f'Can not remove VRF "{name}", it still has '\ f'static routes installed!') if 'name' in vrf: reserved_names = ["add", "all", "broadcast", "default", "delete", "dev", "get", "inet", "mtu", "link", "type", "vrf"] table_ids = [] vnis = [] for name, vrf_config in vrf['name'].items(): # Reserved VRF names if name in reserved_names: raise ConfigError(f'VRF name "{name}" is reserved and connot be used!') # table id is mandatory if 'table' not in vrf_config: raise ConfigError(f'VRF "{name}" table id is mandatory!') # routing table id can't be changed - OS restriction if interface_exists(name): tmp = get_vrf_tableid(name) if tmp and tmp != int(vrf_config['table']): raise ConfigError(f'VRF "{name}" table id modification not possible!') # VRF routing table ID must be unique on the system if 'table' in vrf_config and vrf_config['table'] in table_ids: raise ConfigError(f'VRF "{name}" table id is not unique!') table_ids.append(vrf_config['table']) # VRF VNIs must be unique on the system if 'vni' in vrf_config: vni = vrf_config['vni'] if vni in vnis: raise ConfigError(f'VRF "{name}" VNI "{vni}" is not unique!') vnis.append(vni) tmp = dict_search('ip.protocol', vrf_config) if tmp != None: for protocol, protocol_options in tmp.items(): if 'route_map' in protocol_options: - verify_route_map(protocol_options['route_map'], vrf) + verify_route_map(protocol_options['route_map'], vrf['frr_dict']) tmp = dict_search('ipv6.protocol', vrf_config) if tmp != None: for protocol, protocol_options in tmp.items(): if 'route_map' in protocol_options: - verify_route_map(protocol_options['route_map'], vrf) + verify_route_map(protocol_options['route_map'], vrf['frr_dict']) return None def generate(vrf): # Render iproute2 VR helper names render(config_file, 'iproute2/vrf.conf.j2', vrf) if 'frr_dict' in vrf and 'frrender_cls' not in vrf['frr_dict']: FRRender().generate(vrf['frr_dict']) return None def apply(vrf): # Documentation # # - https://github.com/torvalds/linux/blob/master/Documentation/networking/vrf.txt # - https://github.com/Mellanox/mlxsw/wiki/Virtual-Routing-and-Forwarding-(VRF) # - https://github.com/Mellanox/mlxsw/wiki/L3-Tunneling # - https://netdevconf.info/1.1/proceedings/slides/ahern-vrf-tutorial.pdf # - https://netdevconf.info/1.2/slides/oct6/02_ahern_what_is_l3mdev_slides.pdf # set the default VRF global behaviour bind_all = '0' if 'bind_to_all' in vrf: bind_all = '1' sysctl_write('net.ipv4.tcp_l3mdev_accept', bind_all) sysctl_write('net.ipv4.udp_l3mdev_accept', bind_all) for tmp in (dict_search('vrf_remove', vrf) or []): if interface_exists(tmp): # T5492: deleting a VRF instance may leafe processes running # (e.g. dhclient) as there is a depedency ordering issue in the CLI. # We need to ensure that we stop the dhclient processes first so # a proper DHCLP RELEASE message is sent for interface in get_vrf_members(tmp): vrf_iface = Interface(interface) vrf_iface.set_dhcp(False) vrf_iface.set_dhcpv6(False) # Remove nftables conntrack zone map item nft_del_element = f'delete element inet vrf_zones ct_iface_map {{ "{tmp}" }}' # Check if deleting is possible first to avoid raising errors _, err = popen(f'nft --check {nft_del_element}') if not err: # Remove map element cmd(f'nft {nft_del_element}') # Delete the VRF Kernel interface call(f'ip link delete dev {tmp}') if 'name' in vrf: # Linux routing uses rules to find tables - routing targets are then # looked up in those tables. If the lookup got a matching route, the # process ends. # # TL;DR; first table with a matching entry wins! # # You can see your routing table lookup rules using "ip rule", sadly the # local lookup is hit before any VRF lookup. Pinging an addresses from the # VRF will usually find a hit in the local table, and never reach the VRF # routing table - this is usually not what you want. Thus we will # re-arrange the tables and move the local lookup further down once VRFs # are enabled. # # Thanks to https://stbuehler.de/blog/article/2020/02/29/using_vrf__virtual_routing_and_forwarding__on_linux.html for afi in ['-4', '-6']: # move lookup local to pref 32765 (from 0) if not has_rule(afi, 32765, 'local'): call(f'ip {afi} rule add pref 32765 table local') if has_rule(afi, 0, 'local'): call(f'ip {afi} rule del pref 0') # make sure that in VRFs after failed lookup in the VRF specific table # nothing else is reached if not has_rule(afi, 1000, 'l3mdev'): # this should be added by the kernel when a VRF is created # add it here for completeness call(f'ip {afi} rule add pref 1000 l3mdev protocol kernel') # add another rule with an unreachable target which only triggers in VRF context # if a route could not be reached if not has_rule(afi, 2000, 'l3mdev'): call(f'ip {afi} rule add pref 2000 l3mdev unreachable') nft_vrf_zone_rule_setup = False for name, config in vrf['name'].items(): table = config['table'] if not interface_exists(name): # For each VRF apart from your default context create a VRF # interface with a separate routing table call(f'ip link add {name} type vrf table {table}') # set VRF description for e.g. SNMP monitoring vrf_if = Interface(name) # We also should add proper loopback IP addresses to the newly added # VRF for services bound to the loopback address (SNMP, NTP) vrf_if.add_addr('127.0.0.1/8') vrf_if.add_addr('::1/128') # add VRF description if available vrf_if.set_alias(config.get('description', '')) # Enable/Disable IPv4 forwarding tmp = dict_search('ip.disable_forwarding', config) value = '0' if (tmp != None) else '1' vrf_if.set_ipv4_forwarding(value) # Enable/Disable IPv6 forwarding tmp = dict_search('ipv6.disable_forwarding', config) value = '0' if (tmp != None) else '1' vrf_if.set_ipv6_forwarding(value) # Enable/Disable of an interface must always be done at the end of the # derived class to make use of the ref-counting set_admin_state() # function. We will only enable the interface if 'up' was called as # often as 'down'. This is required by some interface implementations # as certain parameters can only be changed when the interface is # in admin-down state. This ensures the link does not flap during # reconfiguration. state = 'down' if 'disable' in config else 'up' vrf_if.set_admin_state(state) # Add nftables conntrack zone map item nft_add_element = f'add element inet vrf_zones ct_iface_map {{ "{name}" : {table} }}' cmd(f'nft {nft_add_element}') # Only call into nftables as long as there is nothing setup to avoid wasting # CPU time and thus lenghten the commit process if not nft_vrf_zone_rule_setup: nft_vrf_zone_rule_setup = is_nft_vrf_zone_rule_setup() # Install nftables conntrack rules only once if vrf['conntrack'] and not nft_vrf_zone_rule_setup: for chain, rule in nftables_rules.items(): cmd(f'nft add rule inet vrf_zones {chain} {rule}') if 'name' not in vrf or not vrf['conntrack']: for chain, rule in nftables_rules.items(): cmd(f'nft flush chain inet vrf_zones {chain}') # Return default ip rule values if 'name' not in vrf: for afi in ['-4', '-6']: # move lookup local to pref 0 (from 32765) if not has_rule(afi, 0, 'local'): call(f'ip {afi} rule add pref 0 from all lookup local') if has_rule(afi, 32765, 'local'): call(f'ip {afi} rule del pref 32765 table local') if has_rule(afi, 1000, 'l3mdev'): call(f'ip {afi} rule del pref 1000 l3mdev protocol kernel') if has_rule(afi, 2000, 'l3mdev'): call(f'ip {afi} rule del pref 2000 l3mdev unreachable') if 'frr_dict' in vrf and 'frrender_cls' not in vrf['frr_dict']: FRRender().apply() return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)