diff --git a/python/vyos/configverify.py b/python/vyos/configverify.py index 85423142d..5d3723876 100644 --- a/python/vyos/configverify.py +++ b/python/vyos/configverify.py @@ -1,467 +1,470 @@ -# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2020-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. # The sole purpose of this module is to hold common functions used in # all kinds of implementations to verify the CLI configuration. # It is started by migrating the interfaces to the new get_config_dict() # approach which will lead to a lot of code that can be reused. # NOTE: imports should be as local as possible to the function which # makes use of it! from vyos import ConfigError from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_recursive +# pattern re-used in ipsec migration script +dynamic_interface_pattern = r'(ppp|pppoe|sstpc|l2tp|ipoe)[0-9]+' + def verify_mtu(config): """ Common helper function used by interface implementations to perform recurring validation if the specified MTU can be used by the underlaying hardware. """ from vyos.ifconfig import Interface if 'mtu' in config: mtu = int(config['mtu']) tmp = Interface(config['ifname']) # Not all interfaces support min/max MTU # https://vyos.dev/T5011 try: min_mtu = tmp.get_min_mtu() max_mtu = tmp.get_max_mtu() except: # Fallback to defaults min_mtu = 68 max_mtu = 9000 if mtu < min_mtu: raise ConfigError(f'Interface MTU too low, ' \ f'minimum supported MTU is {min_mtu}!') if mtu > max_mtu: raise ConfigError(f'Interface MTU too high, ' \ f'maximum supported MTU is {max_mtu}!') def verify_mtu_parent(config, parent): if 'mtu' not in config or 'mtu' not in parent: return mtu = int(config['mtu']) parent_mtu = int(parent['mtu']) if mtu > parent_mtu: raise ConfigError(f'Interface MTU ({mtu}) too high, ' \ f'parent interface MTU is {parent_mtu}!') def verify_mtu_ipv6(config): """ Common helper function used by interface implementations to perform recurring validation if the specified MTU can be used when IPv6 is configured on the interface. IPv6 requires a 1280 bytes MTU. """ from vyos.template import is_ipv6 if 'mtu' in config: # IPv6 minimum required link mtu min_mtu = 1280 if int(config['mtu']) < min_mtu: interface = config['ifname'] error_msg = f'IPv6 address will be configured on interface "{interface}",\n' \ f'the required minimum MTU is {min_mtu}!' if 'address' in config: for address in config['address']: if address in ['dhcpv6'] or is_ipv6(address): raise ConfigError(error_msg) tmp = dict_search('ipv6.address.no_default_link_local', config) if tmp == None: raise ConfigError('link-local ' + error_msg) tmp = dict_search('ipv6.address.autoconf', config) if tmp != None: raise ConfigError(error_msg) tmp = dict_search('ipv6.address.eui64', config) if tmp != None: raise ConfigError(error_msg) def verify_vrf(config): """ Common helper function used by interface implementations to perform recurring validation of VRF configuration. """ from netifaces import interfaces if 'vrf' in config and config['vrf'] != 'default': if config['vrf'] not in interfaces(): raise ConfigError('VRF "{vrf}" does not exist'.format(**config)) if 'is_bridge_member' in config: raise ConfigError( 'Interface "{ifname}" cannot be both a member of VRF "{vrf}" ' 'and bridge "{is_bridge_member}"!'.format(**config)) def verify_bond_bridge_member(config): """ Checks if interface has a VRF configured and is also part of a bond or bridge, which is not allowed! """ if 'vrf' in config: ifname = config['ifname'] if 'is_bond_member' in config: raise ConfigError(f'Can not add interface "{ifname}" to bond, it has a VRF assigned!') if 'is_bridge_member' in config: raise ConfigError(f'Can not add interface "{ifname}" to bridge, it has a VRF assigned!') def verify_tunnel(config): """ This helper is used to verify the common part of the tunnel """ from vyos.template import is_ipv4 from vyos.template import is_ipv6 if 'encapsulation' not in config: raise ConfigError('Must configure the tunnel encapsulation for '\ '{ifname}!'.format(**config)) if 'source_address' not in config and 'source_interface' not in config: raise ConfigError('source-address or source-interface required for tunnel!') if 'remote' not in config and config['encapsulation'] != 'gre': raise ConfigError('remote ip address is mandatory for tunnel') if config['encapsulation'] in ['ipip6', 'ip6ip6', 'ip6gre', 'ip6gretap', 'ip6erspan']: error_ipv6 = 'Encapsulation mode requires IPv6' if 'source_address' in config and not is_ipv6(config['source_address']): raise ConfigError(f'{error_ipv6} source-address') if 'remote' in config and not is_ipv6(config['remote']): raise ConfigError(f'{error_ipv6} remote') else: error_ipv4 = 'Encapsulation mode requires IPv4' if 'source_address' in config and not is_ipv4(config['source_address']): raise ConfigError(f'{error_ipv4} source-address') if 'remote' in config and not is_ipv4(config['remote']): raise ConfigError(f'{error_ipv4} remote address') if config['encapsulation'] in ['sit', 'gretap', 'ip6gretap']: if 'source_interface' in config: encapsulation = config['encapsulation'] raise ConfigError(f'Option source-interface can not be used with ' \ f'encapsulation "{encapsulation}"!') elif config['encapsulation'] == 'gre': if 'source_address' in config and is_ipv6(config['source_address']): raise ConfigError('Can not use local IPv6 address is for mGRE tunnels') def verify_eapol(config): """ Common helper function used by interface implementations to perform recurring validation of EAPoL configuration. """ if 'eapol' in config: if 'certificate' not in config['eapol']: raise ConfigError('Certificate must be specified when using EAPoL!') if 'pki' not in config or 'certificate' not in config['pki']: raise ConfigError('Invalid certificate specified for EAPoL') cert_name = config['eapol']['certificate'] if cert_name not in config['pki']['certificate']: raise ConfigError('Invalid certificate specified for EAPoL') cert = config['pki']['certificate'][cert_name] if 'certificate' not in cert or 'private' not in cert or 'key' not in cert['private']: raise ConfigError('Invalid certificate/private key specified for EAPoL') if 'password_protected' in cert['private']: raise ConfigError('Encrypted private key cannot be used for EAPoL') if 'ca_certificate' in config['eapol']: if 'ca' not in config['pki']: raise ConfigError('Invalid CA certificate specified for EAPoL') for ca_cert_name in config['eapol']['ca_certificate']: if ca_cert_name not in config['pki']['ca']: raise ConfigError('Invalid CA certificate specified for EAPoL') ca_cert = config['pki']['ca'][ca_cert_name] if 'certificate' not in ca_cert: raise ConfigError('Invalid CA certificate specified for EAPoL') def verify_mirror_redirect(config): """ Common helper function used by interface implementations to perform recurring validation of mirror and redirect interface configuration via tc(8) It makes no sense to mirror traffic back at yourself! """ import os if {'mirror', 'redirect'} <= set(config): raise ConfigError('Mirror and redirect can not be enabled at the same time!') if 'mirror' in config: for direction, mirror_interface in config['mirror'].items(): if not os.path.exists(f'/sys/class/net/{mirror_interface}'): raise ConfigError(f'Requested mirror interface "{mirror_interface}" '\ 'does not exist!') if mirror_interface == config['ifname']: raise ConfigError(f'Can not mirror "{direction}" traffic back '\ 'the originating interface!') if 'redirect' in config: redirect_ifname = config['redirect'] if not os.path.exists(f'/sys/class/net/{redirect_ifname}'): raise ConfigError(f'Requested redirect interface "{redirect_ifname}" '\ 'does not exist!') if ('mirror' in config or 'redirect' in config) and dict_search('traffic_policy.in', config) is not None: # XXX: support combination of limiting and redirect/mirror - this is an # artificial limitation raise ConfigError('Can not use ingress policy together with mirror or redirect!') def verify_authentication(config): """ Common helper function used by interface implementations to perform recurring validation of authentication for either PPPoE or WWAN interfaces. If authentication CLI option is defined, both username and password must be set! """ if 'authentication' not in config: return if not {'username', 'password'} <= set(config['authentication']): raise ConfigError('Authentication requires both username and ' \ 'password to be set!') def verify_address(config): """ Common helper function used by interface implementations to perform recurring validation of IP address assignment when interface is part of a bridge or bond. """ if {'is_bridge_member', 'address'} <= set(config): interface = config['ifname'] bridge_name = next(iter(config['is_bridge_member'])) raise ConfigError(f'Cannot assign address to interface "{interface}" ' f'as it is a member of bridge "{bridge_name}"!') def verify_bridge_delete(config): """ Common helper function used by interface implementations to perform recurring validation of IP address assignmenr when interface also is part of a bridge. """ if 'is_bridge_member' in config: interface = config['ifname'] bridge_name = next(iter(config['is_bridge_member'])) raise ConfigError(f'Interface "{interface}" cannot be deleted as it ' f'is a member of bridge "{bridge_name}"!') def verify_interface_exists(ifname): """ Common helper function used by interface implementations to perform recurring validation if an interface actually exists. """ import os if not os.path.exists(f'/sys/class/net/{ifname}'): raise ConfigError(f'Interface "{ifname}" does not exist!') def verify_source_interface(config): """ Common helper function used by interface implementations to perform recurring validation of the existence of a source-interface required by e.g. peth/MACvlan, MACsec ... """ import re from netifaces import interfaces ifname = config['ifname'] if 'source_interface' not in config: raise ConfigError(f'Physical source-interface required for "{ifname}"!') src_ifname = config['source_interface'] # We do not allow sourcing other interfaces (e.g. tunnel) from dynamic interfaces - tmp = re.compile(r'(ppp|pppoe|sstpc|l2tp|ipoe)[0-9]+') + tmp = re.compile(dynamic_interface_pattern) if tmp.match(src_ifname): raise ConfigError(f'Can not source "{ifname}" from dynamic interface "{src_ifname}"!') if src_ifname not in interfaces(): raise ConfigError(f'Specified source-interface {src_ifname} does not exist') if 'source_interface_is_bridge_member' in config: bridge_name = next(iter(config['source_interface_is_bridge_member'])) raise ConfigError(f'Invalid source-interface "{src_ifname}". Interface ' f'is already a member of bridge "{bridge_name}"!') if 'source_interface_is_bond_member' in config: bond_name = next(iter(config['source_interface_is_bond_member'])) raise ConfigError(f'Invalid source-interface "{src_ifname}". Interface ' f'is already a member of bond "{bond_name}"!') if 'is_source_interface' in config: tmp = config['is_source_interface'] raise ConfigError(f'Can not use source-interface "{src_ifname}", it already ' \ f'belongs to interface "{tmp}"!') def verify_dhcpv6(config): """ Common helper function used by interface implementations to perform recurring validation of DHCPv6 options which are mutually exclusive. """ if 'dhcpv6_options' in config: if {'parameters_only', 'temporary'} <= set(config['dhcpv6_options']): raise ConfigError('DHCPv6 temporary and parameters-only options ' 'are mutually exclusive!') # It is not allowed to have duplicate SLA-IDs as those identify an # assigned IPv6 subnet from a delegated prefix for pd in (dict_search('dhcpv6_options.pd', config) or []): sla_ids = [] interfaces = dict_search(f'dhcpv6_options.pd.{pd}.interface', config) if not interfaces: raise ConfigError('DHCPv6-PD requires an interface where to assign ' 'the delegated prefix!') for count, interface in enumerate(interfaces): if 'sla_id' in interfaces[interface]: sla_ids.append(interfaces[interface]['sla_id']) else: sla_ids.append(str(count)) # Check for duplicates duplicates = [x for n, x in enumerate(sla_ids) if x in sla_ids[:n]] if duplicates: raise ConfigError('Site-Level Aggregation Identifier (SLA-ID) ' 'must be unique per prefix-delegation!') def verify_vlan_config(config): """ Common helper function used by interface implementations to perform recurring validation of interface VLANs """ # VLAN and Q-in-Q IDs are not allowed to overlap if 'vif' in config and 'vif_s' in config: duplicate = list(set(config['vif']) & set(config['vif_s'])) if duplicate: raise ConfigError(f'Duplicate VLAN id "{duplicate[0]}" used for vif and vif-s interfaces!') parent_ifname = config['ifname'] # 802.1q VLANs for vlan_id in config.get('vif', {}): vlan = config['vif'][vlan_id] vlan['ifname'] = f'{parent_ifname}.{vlan_id}' verify_dhcpv6(vlan) verify_address(vlan) verify_vrf(vlan) verify_mirror_redirect(vlan) verify_mtu_parent(vlan, config) # 802.1ad (Q-in-Q) VLANs for s_vlan_id in config.get('vif_s', {}): s_vlan = config['vif_s'][s_vlan_id] s_vlan['ifname'] = f'{parent_ifname}.{s_vlan_id}' verify_dhcpv6(s_vlan) verify_address(s_vlan) verify_vrf(s_vlan) verify_mirror_redirect(s_vlan) verify_mtu_parent(s_vlan, config) for c_vlan_id in s_vlan.get('vif_c', {}): c_vlan = s_vlan['vif_c'][c_vlan_id] c_vlan['ifname'] = f'{parent_ifname}.{s_vlan_id}.{c_vlan_id}' verify_dhcpv6(c_vlan) verify_address(c_vlan) verify_vrf(c_vlan) verify_mirror_redirect(c_vlan) verify_mtu_parent(c_vlan, config) verify_mtu_parent(c_vlan, s_vlan) def verify_diffie_hellman_length(file, min_keysize): """ Verify Diffie-Hellamn keypair length given via file. It must be greater then or equal to min_keysize """ import os import re from vyos.utils.process import cmd try: keysize = str(min_keysize) except: return False if os.path.exists(file): out = cmd(f'openssl dhparam -inform PEM -in {file} -text') prog = re.compile('\d+\s+bit') if prog.search(out): bits = prog.search(out)[0].split()[0] if int(bits) >= int(min_keysize): return True return False def verify_common_route_maps(config): """ Common helper function used by routing protocol implementations to perform recurring validation if the specified route-map for either zebra to kernel installation exists (this is the top-level route_map key) or when a route is redistributed with a route-map that it exists! """ # XXX: This function is called in combination with a previous call to: # tmp = conf.get_config_dict(['policy']) - see protocols_ospf.py as example. # We should NOT call this with the key_mangling option as this would rename # route-map hypens '-' to underscores '_' and one could no longer distinguish # what should have been the "proper" route-map name, as foo-bar and foo_bar # are two entire different route-map instances! for route_map in ['route-map', 'route_map']: if route_map not in config: continue tmp = config[route_map] # Check if the specified route-map exists, if not error out if dict_search(f'policy.route-map.{tmp}', config) == None: raise ConfigError(f'Specified route-map "{tmp}" does not exist!') if 'redistribute' in config: for protocol, protocol_config in config['redistribute'].items(): if 'route_map' in protocol_config: verify_route_map(protocol_config['route_map'], config) def verify_route_map(route_map_name, config): """ Common helper function used by routing protocol implementations to perform recurring validation if a specified route-map exists! """ # Check if the specified route-map exists, if not error out if dict_search(f'policy.route-map.{route_map_name}', config) == None: raise ConfigError(f'Specified route-map "{route_map_name}" does not exist!') def verify_prefix_list(prefix_list, config, version=''): """ Common helper function used by routing protocol implementations to perform recurring validation if a specified prefix-list exists! """ # Check if the specified prefix-list exists, if not error out if dict_search(f'policy.prefix-list{version}.{prefix_list}', config) == None: raise ConfigError(f'Specified prefix-list{version} "{prefix_list}" does not exist!') def verify_access_list(access_list, config, version=''): """ Common helper function used by routing protocol implementations to perform recurring validation if a specified prefix-list exists! """ # Check if the specified ACL exists, if not error out if dict_search(f'policy.access-list{version}.{access_list}', config) == None: raise ConfigError(f'Specified access-list{version} "{access_list}" does not exist!') diff --git a/src/conf_mode/service_dns_dynamic.py b/src/conf_mode/service_dns_dynamic.py index 99fa8feee..845aaa1b5 100755 --- a/src/conf_mode/service_dns_dynamic.py +++ b/src/conf_mode/service_dns_dynamic.py @@ -1,187 +1,187 @@ #!/usr/bin/env python3 # # Copyright (C) 2018-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.configverify import verify_interface_exists +from vyos.configverify import dynamic_interface_pattern from vyos.template import render from vyos.utils.process import call +from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() config_file = r'/run/ddclient/ddclient.conf' systemd_override = r'/run/systemd/system/ddclient.service.d/override.conf' -# Dynamic interfaces that might not exist when the configuration is loaded -dynamic_interfaces = ('pppoe', 'sstpc') - # Protocols that require zone zone_necessary = ['cloudflare', 'digitalocean', 'godaddy', 'hetzner', 'gandi', 'nfsn', 'nsupdate'] zone_supported = zone_necessary + ['dnsexit2', 'zoneedit1'] # Protocols that do not require username username_unnecessary = ['1984', 'cloudflare', 'cloudns', 'digitalocean', 'dnsexit2', 'duckdns', 'freemyip', 'hetzner', 'keysystems', 'njalla', 'nsupdate', 'regfishde'] # Protocols that support TTL ttl_supported = ['cloudflare', 'dnsexit2', 'gandi', 'hetzner', 'godaddy', 'nfsn', 'nsupdate'] # Protocols that support both IPv4 and IPv6 dualstack_supported = ['cloudflare', 'digitalocean', 'dnsexit2', 'duckdns', 'dyndns2', 'easydns', 'freedns', 'hetzner', 'infomaniak', 'njalla'] # dyndns2 protocol in ddclient honors dual stack for selective servers # because of the way it is implemented in ddclient dyndns_dualstack_servers = ['members.dyndns.org', 'dynv6.com'] def get_config(config=None): if config: conf = config else: conf = Config() base = ['service', 'dns', 'dynamic'] if not conf.exists(base): return None dyndns = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True, with_recursive_defaults=True) dyndns['config_file'] = config_file return dyndns def verify(dyndns): # bail out early - looks like removal from running config if not dyndns or 'name' not in dyndns: return None # Dynamic DNS service provider - configuration validation for service, config in dyndns['name'].items(): - error_msg_req = f'is required for Dynamic DNS service "{service}"' error_msg_uns = f'is not supported for Dynamic DNS service "{service}"' for field in ['protocol', 'address', 'host_name']: if field not in config: raise ConfigError(f'"{field.replace("_", "-")}" {error_msg_req}') # If dyndns address is an interface, ensure # that the interface exists (or just warn if dynamic interface) # and that web-options are not set if config['address'] != 'web': + tmp = re.compile(dynamic_interface_pattern) # exclude check interface for dynamic interfaces - if config['address'].startswith(dynamic_interfaces): - Warning(f'Interface "{config["address"]}" does not exist yet and cannot ' - f'be used for Dynamic DNS service "{service}" until it is up!') + if tmp.match(config["address"]): + if not interface_exists(config["address"]): + Warning(f'Interface "{config["address"]}" does not exist yet and cannot ' + f'be used for Dynamic DNS service "{service}" until it is up!') else: verify_interface_exists(config['address']) if 'web_options' in config: raise ConfigError(f'"web-options" is applicable only when using HTTP(S) ' f'web request to obtain the IP address') # Warn if using checkip.dyndns.org, as it does not support HTTPS # See: https://github.com/ddclient/ddclient/issues/597 if 'web_options' in config: if 'url' not in config['web_options']: raise ConfigError(f'"url" in "web-options" {error_msg_req} ' f'with protocol "{config["protocol"]}"') elif re.search("^(https?://)?checkip\.dyndns\.org", config['web_options']['url']): Warning(f'"checkip.dyndns.org" does not support HTTPS requests for IP address ' f'lookup. Please use a different IP address lookup service.') # RFC2136 uses 'key' instead of 'password' if config['protocol'] != 'nsupdate' and 'password' not in config: raise ConfigError(f'"password" {error_msg_req}') # Other RFC2136 specific configuration validation if config['protocol'] == 'nsupdate': if 'password' in config: raise ConfigError(f'"password" {error_msg_uns} with protocol "{config["protocol"]}"') for field in ['server', 'key']: if field not in config: raise ConfigError(f'"{field}" {error_msg_req} with protocol "{config["protocol"]}"') if config['protocol'] in zone_necessary and 'zone' not in config: raise ConfigError(f'"zone" {error_msg_req} with protocol "{config["protocol"]}"') if config['protocol'] not in zone_supported and 'zone' in config: raise ConfigError(f'"zone" {error_msg_uns} with protocol "{config["protocol"]}"') if config['protocol'] not in username_unnecessary and 'username' not in config: raise ConfigError(f'"username" {error_msg_req} with protocol "{config["protocol"]}"') if config['protocol'] not in ttl_supported and 'ttl' in config: raise ConfigError(f'"ttl" {error_msg_uns} with protocol "{config["protocol"]}"') if config['ip_version'] == 'both': if config['protocol'] not in dualstack_supported: raise ConfigError(f'Both IPv4 and IPv6 at the same time {error_msg_uns} ' f'with protocol "{config["protocol"]}"') # dyndns2 protocol in ddclient honors dual stack only for dyn.com (dyndns.org) if config['protocol'] == 'dyndns2' and 'server' in config and config['server'] not in dyndns_dualstack_servers: raise ConfigError(f'Both IPv4 and IPv6 at the same time {error_msg_uns} ' f'for "{config["server"]}" with protocol "{config["protocol"]}"') if {'wait_time', 'expiry_time'} <= config.keys() and int(config['expiry_time']) < int(config['wait_time']): raise ConfigError(f'"expiry-time" must be greater than "wait-time" for ' f'Dynamic DNS service "{service}"') return None def generate(dyndns): # bail out early - looks like removal from running config if not dyndns or 'name' not in dyndns: return None render(config_file, 'dns-dynamic/ddclient.conf.j2', dyndns, permission=0o600) render(systemd_override, 'dns-dynamic/override.conf.j2', dyndns) return None def apply(dyndns): systemd_service = 'ddclient.service' # Reload systemd manager configuration call('systemctl daemon-reload') # bail out early - looks like removal from running config if not dyndns or 'name' not in dyndns: call(f'systemctl stop {systemd_service}') if os.path.exists(config_file): os.unlink(config_file) else: call(f'systemctl reload-or-restart {systemd_service}') return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py index adbac0405..d074ed159 100755 --- a/src/conf_mode/vpn_ipsec.py +++ b/src/conf_mode/vpn_ipsec.py @@ -1,595 +1,603 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import ipaddress import os import re import jmespath from sys import exit from time import sleep from time import time from vyos.base import Warning from vyos.config import Config from vyos.configdict import leaf_node_changed from vyos.configverify import verify_interface_exists +from vyos.configverify import dynamic_interface_pattern from vyos.defaults import directories from vyos.ifconfig import Interface from vyos.pki import encode_certificate from vyos.pki import encode_public_key from vyos.pki import find_chain from vyos.pki import load_certificate from vyos.pki import load_private_key from vyos.pki import wrap_certificate from vyos.pki import wrap_crl from vyos.pki import wrap_public_key from vyos.pki import wrap_private_key from vyos.template import ip_from_cidr from vyos.template import is_ipv4 from vyos.template import is_ipv6 from vyos.template import render from vyos.utils.network import is_ipv6_link_local from vyos.utils.network import interface_exists from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.process import call from vyos.utils.process import run from vyos import ConfigError from vyos import airbag airbag.enable() dhcp_wait_attempts = 2 dhcp_wait_sleep = 1 swanctl_dir = '/etc/swanctl' charon_conf = '/etc/strongswan.d/charon.conf' charon_dhcp_conf = '/etc/strongswan.d/charon/dhcp.conf' charon_radius_conf = '/etc/strongswan.d/charon/eap-radius.conf' interface_conf = '/etc/strongswan.d/interfaces_use.conf' swanctl_conf = f'{swanctl_dir}/swanctl.conf' default_install_routes = 'yes' vici_socket = '/var/run/charon.vici' CERT_PATH = f'{swanctl_dir}/x509/' PUBKEY_PATH = f'{swanctl_dir}/pubkey/' KEY_PATH = f'{swanctl_dir}/private/' CA_PATH = f'{swanctl_dir}/x509ca/' CRL_PATH = f'{swanctl_dir}/x509crl/' DHCP_HOOK_IFLIST = '/tmp/ipsec_dhcp_waiting' def get_config(config=None): if config: conf = config else: conf = Config() base = ['vpn', 'ipsec'] l2tp_base = ['vpn', 'l2tp', 'remote-access', 'ipsec-settings'] if not conf.exists(base): return None # retrieve common dictionary keys ipsec = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True, with_recursive_defaults=True, with_pki=True) ipsec['dhcp_no_address'] = {} ipsec['install_routes'] = 'no' if conf.exists(base + ["options", "disable-route-autoinstall"]) else default_install_routes ipsec['interface_change'] = leaf_node_changed(conf, base + ['interface']) ipsec['nhrp_exists'] = conf.exists(['protocols', 'nhrp', 'tunnel']) tmp = conf.get_config_dict(l2tp_base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True) if tmp: ipsec['l2tp'] = conf.merge_defaults(tmp, recursive=True) ipsec['l2tp_outside_address'] = conf.return_value(['vpn', 'l2tp', 'remote-access', 'outside-address']) ipsec['l2tp_ike_default'] = 'aes256-sha1-modp1024,3des-sha1-modp1024' ipsec['l2tp_esp_default'] = 'aes256-sha1,3des-sha1' return ipsec def get_dhcp_address(iface): addresses = Interface(iface).get_addr() if not addresses: return None for address in addresses: if not is_ipv6_link_local(address): return ip_from_cidr(address) return None def verify_pki_x509(pki, x509_conf): if not pki or 'ca' not in pki or 'certificate' not in pki: raise ConfigError(f'PKI is not configured') ca_cert_name = x509_conf['ca_certificate'] cert_name = x509_conf['certificate'] if not dict_search_args(pki, 'ca', ca_cert_name, 'certificate'): raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"') if not dict_search_args(pki, 'certificate', cert_name, 'certificate'): raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"') if not dict_search_args(pki, 'certificate', cert_name, 'private', 'key'): raise ConfigError(f'Missing private key on specified PKI certificate "{cert_name}"') return True def verify_pki_rsa(pki, rsa_conf): if not pki or 'key_pair' not in pki: raise ConfigError(f'PKI is not configured') local_key = rsa_conf['local_key'] remote_key = rsa_conf['remote_key'] if not dict_search_args(pki, 'key_pair', local_key, 'private', 'key'): raise ConfigError(f'Missing private key on specified local-key "{local_key}"') if not dict_search_args(pki, 'key_pair', remote_key, 'public', 'key'): raise ConfigError(f'Missing public key on specified remote-key "{remote_key}"') return True def verify(ipsec): if not ipsec: return None if 'authentication' in ipsec: if 'psk' in ipsec['authentication']: for psk, psk_config in ipsec['authentication']['psk'].items(): if 'id' not in psk_config or 'secret' not in psk_config: raise ConfigError(f'Authentication psk "{psk}" missing "id" or "secret"') if 'interface' in ipsec: - for ifname in ipsec['interface']: - verify_interface_exists(ifname) + tmp = re.compile(dynamic_interface_pattern) + for interface in ipsec['interface']: + # exclude check interface for dynamic interfaces + if tmp.match(interface): + if not interface_exists(interface): + Warning(f'Interface "{interface}" does not exist yet and cannot be used ' + f'for IPsec until it is up!') + else: + verify_interface_exists(interface) if 'l2tp' in ipsec: if 'esp_group' in ipsec['l2tp']: if 'esp_group' not in ipsec or ipsec['l2tp']['esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on L2TP remote-access config") if 'ike_group' in ipsec['l2tp']: if 'ike_group' not in ipsec or ipsec['l2tp']['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on L2TP remote-access config") if 'authentication' not in ipsec['l2tp']: raise ConfigError(f'Missing authentication settings on L2TP remote-access config') if 'mode' not in ipsec['l2tp']['authentication']: raise ConfigError(f'Missing authentication mode on L2TP remote-access config') if not ipsec['l2tp_outside_address']: raise ConfigError(f'Missing outside-address on L2TP remote-access config') if ipsec['l2tp']['authentication']['mode'] == 'pre-shared-secret': if 'pre_shared_secret' not in ipsec['l2tp']['authentication']: raise ConfigError(f'Missing pre shared secret on L2TP remote-access config') if ipsec['l2tp']['authentication']['mode'] == 'x509': if 'x509' not in ipsec['l2tp']['authentication']: raise ConfigError(f'Missing x509 settings on L2TP remote-access config') x509 = ipsec['l2tp']['authentication']['x509'] if 'ca_certificate' not in x509 or 'certificate' not in x509: raise ConfigError(f'Missing x509 certificates on L2TP remote-access config') verify_pki_x509(ipsec['pki'], x509) if 'profile' in ipsec: for profile, profile_conf in ipsec['profile'].items(): if 'esp_group' in profile_conf: if 'esp_group' not in ipsec or profile_conf['esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on {profile} profile") else: raise ConfigError(f"Missing esp-group on {profile} profile") if 'ike_group' in profile_conf: if 'ike_group' not in ipsec or profile_conf['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on {profile} profile") else: raise ConfigError(f"Missing ike-group on {profile} profile") if 'authentication' not in profile_conf: raise ConfigError(f"Missing authentication on {profile} profile") if 'remote_access' in ipsec: if 'connection' in ipsec['remote_access']: for name, ra_conf in ipsec['remote_access']['connection'].items(): if 'esp_group' in ra_conf: if 'esp_group' not in ipsec or ra_conf['esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on {name} remote-access config") else: raise ConfigError(f"Missing esp-group on {name} remote-access config") if 'ike_group' in ra_conf: if 'ike_group' not in ipsec or ra_conf['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on {name} remote-access config") ike = ra_conf['ike_group'] if dict_search(f'ike_group.{ike}.key_exchange', ipsec) != 'ikev2': raise ConfigError('IPsec remote-access connections requires IKEv2!') else: raise ConfigError(f"Missing ike-group on {name} remote-access config") if 'authentication' not in ra_conf: raise ConfigError(f"Missing authentication on {name} remote-access config") if ra_conf['authentication']['server_mode'] == 'x509': if 'x509' not in ra_conf['authentication']: raise ConfigError(f"Missing x509 settings on {name} remote-access config") x509 = ra_conf['authentication']['x509'] if 'ca_certificate' not in x509 or 'certificate' not in x509: raise ConfigError(f"Missing x509 certificates on {name} remote-access config") verify_pki_x509(ipsec['pki'], x509) elif ra_conf['authentication']['server_mode'] == 'pre-shared-secret': if 'pre_shared_secret' not in ra_conf['authentication']: raise ConfigError(f"Missing pre-shared-key on {name} remote-access config") if 'client_mode' not in ra_conf['authentication']: raise ConfigError('Client authentication method is required!') if dict_search('authentication.client_mode', ra_conf) == 'eap-radius': if dict_search('remote_access.radius.server', ipsec) == None: raise ConfigError('RADIUS authentication requires at least one server') if 'pool' in ra_conf: if {'dhcp', 'radius'} <= set(ra_conf['pool']): raise ConfigError(f'Can not use both DHCP and RADIUS for address allocation '\ f'at the same time for "{name}"!') if 'dhcp' in ra_conf['pool'] and len(ra_conf['pool']) > 1: raise ConfigError(f'Can not use DHCP and a predefined address pool for "{name}"!') if 'radius' in ra_conf['pool'] and len(ra_conf['pool']) > 1: raise ConfigError(f'Can not use RADIUS and a predefined address pool for "{name}"!') for pool in ra_conf['pool']: if pool == 'dhcp': if dict_search('remote_access.dhcp.server', ipsec) == None: raise ConfigError('IPsec DHCP server is not configured!') elif pool == 'radius': if dict_search('remote_access.radius.server', ipsec) == None: raise ConfigError('IPsec RADIUS server is not configured!') if dict_search('authentication.client_mode', ra_conf) != 'eap-radius': raise ConfigError('RADIUS IP pool requires eap-radius client authentication!') elif 'pool' not in ipsec['remote_access'] or pool not in ipsec['remote_access']['pool']: raise ConfigError(f'Requested pool "{pool}" does not exist!') if 'pool' in ipsec['remote_access']: for pool, pool_config in ipsec['remote_access']['pool'].items(): if 'prefix' not in pool_config: raise ConfigError(f'Missing madatory prefix option for pool "{pool}"!') if 'name_server' in pool_config: if len(pool_config['name_server']) > 2: raise ConfigError(f'Only two name-servers are supported for remote-access pool "{pool}"!') for ns in pool_config['name_server']: v4_addr_and_ns = is_ipv4(ns) and not is_ipv4(pool_config['prefix']) v6_addr_and_ns = is_ipv6(ns) and not is_ipv6(pool_config['prefix']) if v4_addr_and_ns or v6_addr_and_ns: raise ConfigError('Must use both IPv4 or IPv6 addresses for pool prefix and name-server adresses!') if 'exclude' in pool_config: for exclude in pool_config['exclude']: v4_addr_and_exclude = is_ipv4(exclude) and not is_ipv4(pool_config['prefix']) v6_addr_and_exclude = is_ipv6(exclude) and not is_ipv6(pool_config['prefix']) if v4_addr_and_exclude or v6_addr_and_exclude: raise ConfigError('Must use both IPv4 or IPv6 addresses for pool prefix and exclude prefixes!') if 'radius' in ipsec['remote_access'] and 'server' in ipsec['remote_access']['radius']: for server, server_config in ipsec['remote_access']['radius']['server'].items(): if 'key' not in server_config: raise ConfigError(f'Missing RADIUS secret key for server "{server}"') if 'site_to_site' in ipsec and 'peer' in ipsec['site_to_site']: for peer, peer_conf in ipsec['site_to_site']['peer'].items(): has_default_esp = False # Peer name it is swanctl connection name and shouldn't contain dots or colons, T4118 if bool(re.search(':|\.', peer)): raise ConfigError(f'Incorrect peer name "{peer}" ' f'Peer name can contain alpha-numeric letters, hyphen and underscore') if 'remote_address' not in peer_conf: print(f'You should set correct remote-address "peer {peer} remote-address x.x.x.x"\n') if 'default_esp_group' in peer_conf: has_default_esp = True if 'esp_group' not in ipsec or peer_conf['default_esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on site-to-site peer {peer}") if 'ike_group' in peer_conf: if 'ike_group' not in ipsec or peer_conf['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on site-to-site peer {peer}") else: raise ConfigError(f"Missing ike-group on site-to-site peer {peer}") if 'authentication' not in peer_conf or 'mode' not in peer_conf['authentication']: raise ConfigError(f"Missing authentication on site-to-site peer {peer}") if {'id', 'use_x509_id'} <= set(peer_conf['authentication']): raise ConfigError(f"Manually set peer id and use-x509-id are mutually exclusive!") if peer_conf['authentication']['mode'] == 'x509': if 'x509' not in peer_conf['authentication']: raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}") x509 = peer_conf['authentication']['x509'] if 'ca_certificate' not in x509 or 'certificate' not in x509: raise ConfigError(f"Missing x509 certificates on site-to-site peer {peer}") verify_pki_x509(ipsec['pki'], x509) elif peer_conf['authentication']['mode'] == 'rsa': if 'rsa' not in peer_conf['authentication']: raise ConfigError(f"Missing RSA settings on site-to-site peer {peer}") rsa = peer_conf['authentication']['rsa'] if 'local_key' not in rsa: raise ConfigError(f"Missing RSA local-key on site-to-site peer {peer}") if 'remote_key' not in rsa: raise ConfigError(f"Missing RSA remote-key on site-to-site peer {peer}") verify_pki_rsa(ipsec['pki'], rsa) if 'local_address' not in peer_conf and 'dhcp_interface' not in peer_conf: raise ConfigError(f"Missing local-address or dhcp-interface on site-to-site peer {peer}") if 'dhcp_interface' in peer_conf: dhcp_interface = peer_conf['dhcp_interface'] verify_interface_exists(dhcp_interface) dhcp_base = directories['isc_dhclient_dir'] if not os.path.exists(f'{dhcp_base}/dhclient_{dhcp_interface}.conf'): raise ConfigError(f"Invalid dhcp-interface on site-to-site peer {peer}") address = get_dhcp_address(dhcp_interface) count = 0 while not address and count < dhcp_wait_attempts: address = get_dhcp_address(dhcp_interface) count += 1 sleep(dhcp_wait_sleep) if not address: ipsec['dhcp_no_address'][peer] = dhcp_interface print(f"Failed to get address from dhcp-interface on site-to-site peer {peer} -- skipped") continue if 'vti' in peer_conf: if 'local_address' in peer_conf and 'dhcp_interface' in peer_conf: raise ConfigError(f"A single local-address or dhcp-interface is required when using VTI on site-to-site peer {peer}") if dict_search('options.disable_route_autoinstall', ipsec) == None: Warning('It\'s recommended to use ipsec vti with the next command\n[set vpn ipsec option disable-route-autoinstall]') if 'bind' in peer_conf['vti']: vti_interface = peer_conf['vti']['bind'] if not interface_exists(vti_interface): raise ConfigError(f'VTI interface {vti_interface} for site-to-site peer {peer} does not exist!') if 'vti' not in peer_conf and 'tunnel' not in peer_conf: raise ConfigError(f"No VTI or tunnel specified on site-to-site peer {peer}") if 'tunnel' in peer_conf: for tunnel, tunnel_conf in peer_conf['tunnel'].items(): if 'esp_group' not in tunnel_conf and not has_default_esp: raise ConfigError(f"Missing esp-group on tunnel {tunnel} for site-to-site peer {peer}") esp_group_name = tunnel_conf['esp_group'] if 'esp_group' in tunnel_conf else peer_conf['default_esp_group'] if esp_group_name not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on tunnel {tunnel} for site-to-site peer {peer}") esp_group = ipsec['esp_group'][esp_group_name] if 'mode' in esp_group and esp_group['mode'] == 'transport': if 'protocol' in tunnel_conf and ((peer in ['any', '0.0.0.0']) or ('local_address' not in peer_conf or peer_conf['local_address'] in ['any', '0.0.0.0'])): raise ConfigError(f"Fixed local-address or peer required when a protocol is defined with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}") if ('local' in tunnel_conf and 'prefix' in tunnel_conf['local']) or ('remote' in tunnel_conf and 'prefix' in tunnel_conf['remote']): raise ConfigError(f"Local/remote prefix cannot be used with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}") def cleanup_pki_files(): for path in [CERT_PATH, CA_PATH, CRL_PATH, KEY_PATH, PUBKEY_PATH]: if not os.path.exists(path): continue for file in os.listdir(path): file_path = os.path.join(path, file) if os.path.isfile(file_path): os.unlink(file_path) def generate_pki_files_x509(pki, x509_conf): ca_cert_name = x509_conf['ca_certificate'] ca_cert_data = dict_search_args(pki, 'ca', ca_cert_name, 'certificate') ca_cert_crls = dict_search_args(pki, 'ca', ca_cert_name, 'crl') or [] ca_index = 1 crl_index = 1 ca_cert = load_certificate(ca_cert_data) pki_ca_certs = [load_certificate(ca['certificate']) for ca in pki['ca'].values()] ca_cert_chain = find_chain(ca_cert, pki_ca_certs) cert_name = x509_conf['certificate'] cert_data = dict_search_args(pki, 'certificate', cert_name, 'certificate') key_data = dict_search_args(pki, 'certificate', cert_name, 'private', 'key') protected = 'passphrase' in x509_conf for ca_cert_obj in ca_cert_chain: with open(os.path.join(CA_PATH, f'{ca_cert_name}_{ca_index}.pem'), 'w') as f: f.write(encode_certificate(ca_cert_obj)) ca_index += 1 for crl in ca_cert_crls: with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f: f.write(wrap_crl(crl)) crl_index += 1 with open(os.path.join(CERT_PATH, f'{cert_name}.pem'), 'w') as f: f.write(wrap_certificate(cert_data)) with open(os.path.join(KEY_PATH, f'x509_{cert_name}.pem'), 'w') as f: f.write(wrap_private_key(key_data, protected)) def generate_pki_files_rsa(pki, rsa_conf): local_key_name = rsa_conf['local_key'] local_key_data = dict_search_args(pki, 'key_pair', local_key_name, 'private', 'key') protected = 'passphrase' in rsa_conf remote_key_name = rsa_conf['remote_key'] remote_key_data = dict_search_args(pki, 'key_pair', remote_key_name, 'public', 'key') local_key = load_private_key(local_key_data, rsa_conf['passphrase'] if protected else None) with open(os.path.join(KEY_PATH, f'rsa_{local_key_name}.pem'), 'w') as f: f.write(wrap_private_key(local_key_data, protected)) with open(os.path.join(PUBKEY_PATH, f'{local_key_name}.pem'), 'w') as f: f.write(encode_public_key(local_key.public_key())) with open(os.path.join(PUBKEY_PATH, f'{remote_key_name}.pem'), 'w') as f: f.write(wrap_public_key(remote_key_data)) def generate(ipsec): cleanup_pki_files() if not ipsec: for config_file in [charon_dhcp_conf, charon_radius_conf, interface_conf, swanctl_conf]: if os.path.isfile(config_file): os.unlink(config_file) render(charon_conf, 'ipsec/charon.j2', {'install_routes': default_install_routes}) return if ipsec['dhcp_no_address']: with open(DHCP_HOOK_IFLIST, 'w') as f: f.write(" ".join(ipsec['dhcp_no_address'].values())) elif os.path.exists(DHCP_HOOK_IFLIST): os.unlink(DHCP_HOOK_IFLIST) for path in [swanctl_dir, CERT_PATH, CA_PATH, CRL_PATH, PUBKEY_PATH]: if not os.path.exists(path): os.mkdir(path, mode=0o755) if not os.path.exists(KEY_PATH): os.mkdir(KEY_PATH, mode=0o700) if 'l2tp' in ipsec: if 'authentication' in ipsec['l2tp'] and 'x509' in ipsec['l2tp']['authentication']: generate_pki_files_x509(ipsec['pki'], ipsec['l2tp']['authentication']['x509']) if 'remote_access' in ipsec and 'connection' in ipsec['remote_access']: for rw, rw_conf in ipsec['remote_access']['connection'].items(): if 'authentication' in rw_conf and 'x509' in rw_conf['authentication']: generate_pki_files_x509(ipsec['pki'], rw_conf['authentication']['x509']) if 'site_to_site' in ipsec and 'peer' in ipsec['site_to_site']: for peer, peer_conf in ipsec['site_to_site']['peer'].items(): if peer in ipsec['dhcp_no_address']: continue if peer_conf['authentication']['mode'] == 'x509': generate_pki_files_x509(ipsec['pki'], peer_conf['authentication']['x509']) elif peer_conf['authentication']['mode'] == 'rsa': generate_pki_files_rsa(ipsec['pki'], peer_conf['authentication']['rsa']) local_ip = '' if 'local_address' in peer_conf: local_ip = peer_conf['local_address'] elif 'dhcp_interface' in peer_conf: local_ip = get_dhcp_address(peer_conf['dhcp_interface']) ipsec['site_to_site']['peer'][peer]['local_address'] = local_ip if 'tunnel' in peer_conf: for tunnel, tunnel_conf in peer_conf['tunnel'].items(): local_prefixes = dict_search_args(tunnel_conf, 'local', 'prefix') remote_prefixes = dict_search_args(tunnel_conf, 'remote', 'prefix') if not local_prefixes or not remote_prefixes: continue passthrough = None for local_prefix in local_prefixes: for remote_prefix in remote_prefixes: local_net = ipaddress.ip_network(local_prefix) remote_net = ipaddress.ip_network(remote_prefix) if local_net.overlaps(remote_net): if passthrough is None: passthrough = [] passthrough.append(local_prefix) ipsec['site_to_site']['peer'][peer]['tunnel'][tunnel]['passthrough'] = passthrough # auth psk <tag> dhcp-interface <xxx> if jmespath.search('authentication.psk.*.dhcp_interface', ipsec): for psk, psk_config in ipsec['authentication']['psk'].items(): if 'dhcp_interface' in psk_config: for iface in psk_config['dhcp_interface']: id = get_dhcp_address(iface) if id: ipsec['authentication']['psk'][psk]['id'].append(id) render(charon_conf, 'ipsec/charon.j2', ipsec) render(charon_dhcp_conf, 'ipsec/charon/dhcp.conf.j2', ipsec) render(charon_radius_conf, 'ipsec/charon/eap-radius.conf.j2', ipsec) render(interface_conf, 'ipsec/interfaces_use.conf.j2', ipsec) render(swanctl_conf, 'ipsec/swanctl.conf.j2', ipsec) def resync_nhrp(ipsec): if ipsec and not ipsec['nhrp_exists']: return tmp = run('/usr/libexec/vyos/conf_mode/protocols_nhrp.py') if tmp > 0: print('ERROR: failed to reapply NHRP settings!') def apply(ipsec): systemd_service = 'strongswan.service' if not ipsec: call(f'systemctl stop {systemd_service}') else: call(f'systemctl reload-or-restart {systemd_service}') resync_nhrp(ipsec) if __name__ == '__main__': try: ipsec = get_config() verify(ipsec) generate(ipsec) apply(ipsec) except ConfigError as e: print(e) exit(1)