diff --git a/smoketest/scripts/cli/test_vpn_ipsec.py b/smoketest/scripts/cli/test_vpn_ipsec.py index 17e12bcaf..f5369ee7a 100755 --- a/smoketest/scripts/cli/test_vpn_ipsec.py +++ b/smoketest/scripts/cli/test_vpn_ipsec.py @@ -1,898 +1,899 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import call from vyos.utils.process import process_named_running from vyos.utils.file import read_file ethernet_path = ['interfaces', 'ethernet'] tunnel_path = ['interfaces', 'tunnel'] vti_path = ['interfaces', 'vti'] nhrp_path = ['protocols', 'nhrp'] base_path = ['vpn', 'ipsec'] charon_file = '/etc/strongswan.d/charon.conf' dhcp_waiting_file = '/tmp/ipsec_dhcp_waiting' swanctl_file = '/etc/swanctl/swanctl.conf' peer_ip = '203.0.113.45' connection_name = 'main-branch' local_id = 'left' remote_id = 'right' interface = 'eth1' vif = '100' esp_group = 'MyESPGroup' ike_group = 'MyIKEGroup' secret = 'MYSECRETKEY' PROCESS_NAME = 'charon-systemd' regex_uuid4 = '[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}' ca_name = 'MyVyOS-CA' ca_pem = """ MIICMDCCAdegAwIBAgIUBCzIjYvD7SPbx5oU18IYg7NVxQ0wCgYIKoZIzj0EAwIw ZzELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv bWUtQ2l0eTENMAsGA1UECgwEVnlPUzEgMB4GA1UEAwwXSVBTZWMgU21va2V0ZXN0 IFJvb3QgQ0EwHhcNMjMwOTI0MTIwMzQxWhcNMzMwOTIxMTIwMzQxWjBnMQswCQYD VQQGEwJHQjETMBEGA1UECAwKU29tZS1TdGF0ZTESMBAGA1UEBwwJU29tZS1DaXR5 MQ0wCwYDVQQKDARWeU9TMSAwHgYDVQQDDBdJUFNlYyBTbW9rZXRlc3QgUm9vdCBD QTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABEh8/yU572B3zmFxrGgHk+H7grYt EHUJodY3gXNWMHz0gySrbGhsGtECDfP/G+T4Suk7cuVzB1wnLocSafD8TcqjYTBf MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgGGMB0GA1UdJQQWMBQGCCsG AQUFBwMCBggrBgEFBQcDATAdBgNVHQ4EFgQUTYoQJNlk7X87/gRegHnCnPef39Aw CgYIKoZIzj0EAwIDRwAwRAIgX1spXjrUc10r3g/Zm4O31LU5O08J2vVqFo94zHE5 0VgCIG4JK9Zg5O/yn4mYksZux7efiHRUzL2y2TXQ9IqrqM8W """ int_ca_name = 'MyVyOS-IntCA' int_ca_pem = """ MIICYDCCAgWgAwIBAgIUcFx2BVYErHI+SneyPYHijxXt1cgwCgYIKoZIzj0EAwIw ZzELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv bWUtQ2l0eTENMAsGA1UECgwEVnlPUzEgMB4GA1UEAwwXSVBTZWMgU21va2V0ZXN0 IFJvb3QgQ0EwHhcNMjMwOTI0MTIwNTE5WhcNMzMwOTIwMTIwNTE5WjBvMQswCQYD VQQGEwJHQjETMBEGA1UECAwKU29tZS1TdGF0ZTESMBAGA1UEBwwJU29tZS1DaXR5 MQ0wCwYDVQQKDARWeU9TMSgwJgYDVQQDDB9JUFNlYyBTbW9rZXRlc3QgSW50ZXJt ZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEIHw2G5dq3c715AcA tzR++dYu1fLRFmHzRGTZOT7hLrh2Fg4hnKFPLOeUA5Qi50xCvjJ9JnonTyy2RfRH axYizKOBhjCBgzASBgNVHRMBAf8ECDAGAQH/AgEAMA4GA1UdDwEB/wQEAwIBhjAd BgNVHSUEFjAUBggrBgEFBQcDAgYIKwYBBQUHAwEwHQYDVR0OBBYEFC9KrFYtA+hO l7vdMbWxTMAyLB7BMB8GA1UdIwQYMBaAFE2KECTZZO1/O/4EXoB5wpz3n9/QMAoG CCqGSM49BAMCA0kAMEYCIQCnqWbElgOL9dGO3iLxasFNq/hM7vM/DzaiHi4BowxW 0gIhAMohefNj+QgLfPhvyODHIPE9LMyfp7lJEaCC2K8PCSFD """ peer_name = 'peer1' peer_cert = """ MIICSTCCAfCgAwIBAgIUPxYleUgCo/glVVePze3QmAFgi6MwCgYIKoZIzj0EAwIw bzELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv bWUtQ2l0eTENMAsGA1UECgwEVnlPUzEoMCYGA1UEAwwfSVBTZWMgU21va2V0ZXN0 IEludGVybWVkaWF0ZSBDQTAeFw0yMzA5MjQxMjA2NDJaFw0yODA5MjIxMjA2NDJa MGQxCzAJBgNVBAYTAkdCMRMwEQYDVQQIDApTb21lLVN0YXRlMRIwEAYDVQQHDAlT b21lLUNpdHkxDTALBgNVBAoMBFZ5T1MxHTAbBgNVBAMMFElQU2VjIFNtb2tldGVz dCBQZWVyMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEZJtuTDu84uy++GMwRNLl 10JAXZxXQSDl+CdTWwjbQZURcdY+ia7BoaoYX/0VKPel3Se64rIUQQLQoY/9MJb9 UKN1MHMwDAYDVR0TAQH/BAIwADAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI KwYBBQUHAwEwHQYDVR0OBBYEFNJCdnkm3cAmf04UwOKL7IqMJ6OXMB8GA1UdIwQY MBaAFC9KrFYtA+hOl7vdMbWxTMAyLB7BMAoGCCqGSM49BAMCA0cAMEQCIGVnDRUy UJ0U/deDvrBo1+AakZndkNAMN/XNo5a5GzhEAiBCY7E/3b0BIO8FiIbVB3iDcaxg g7ET2RgWxvhEoN3ZRw== """ peer_key = """ MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgVDEZDK7q/T+tiJUV WLKS3ZYDfZ4lZv0C1gJpYq0gWP2hRANCAARkm25MO7zi7L74YzBE0uXXQkBdnFdB IOX4J1NbCNtBlRFx1j6JrsGhqhhf/RUo96XdJ7rishRBAtChj/0wlv1Q """ swanctl_dir = '/etc/swanctl' CERT_PATH = f'{swanctl_dir}/x509/' CA_PATH = f'{swanctl_dir}/x509ca/' class TestVPNIPsec(VyOSUnitTestSHIM.TestCase): skip_process_check = False @classmethod def setUpClass(cls): super(TestVPNIPsec, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) + cls.cli_delete(cls, ['pki']) cls.cli_set(cls, base_path + ['interface', f'{interface}.{vif}']) @classmethod def tearDownClass(cls): super(TestVPNIPsec, cls).tearDownClass() cls.cli_delete(cls, base_path + ['interface', f'{interface}.{vif}']) def setUp(self): # Set IKE/ESP Groups self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'encryption', 'aes128']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'hash', 'sha1']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'dh-group', '2']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'encryption', 'aes128']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'hash', 'sha1']) def tearDown(self): # Check for running process if not self.skip_process_check: self.assertTrue(process_named_running(PROCESS_NAME)) else: self.skip_process_check = False # Reset self.cli_delete(base_path) self.cli_delete(tunnel_path) self.cli_commit() # Check for no longer running process self.assertFalse(process_named_running(PROCESS_NAME)) def setupPKI(self): self.cli_set(['pki', 'ca', ca_name, 'certificate', ca_pem.replace('\n','')]) self.cli_set(['pki', 'ca', int_ca_name, 'certificate', int_ca_pem.replace('\n','')]) self.cli_set(['pki', 'certificate', peer_name, 'certificate', peer_cert.replace('\n','')]) self.cli_set(['pki', 'certificate', peer_name, 'private', 'key', peer_key.replace('\n','')]) def tearDownPKI(self): self.cli_delete(['pki']) def test_01_dhcp_fail_handling(self): # Skip process check - connection is not created for this test self.skip_process_check = True # Interface for dhcp-interface self.cli_set(ethernet_path + [interface, 'vif', vif, 'address', 'dhcp']) # Use VLAN to avoid getting IP from qemu dhcp server # vpn ipsec auth psk <tag> id <x.x.x.x> self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', remote_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', peer_ip]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'secret', secret]) # Site to site peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['dhcp-interface', f'{interface}.{vif}']) self.cli_set(peer_base_path + ['tunnel', '1', 'protocol', 'gre']) self.cli_commit() self.assertTrue(os.path.exists(dhcp_waiting_file)) dhcp_waiting = read_file(dhcp_waiting_file) self.assertIn(f'{interface}.{vif}', dhcp_waiting) # Ensure dhcp-failed interface was added for dhclient hook self.cli_delete(ethernet_path + [interface, 'vif', vif, 'address']) def test_02_site_to_site(self): self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) local_address = '192.0.2.10' priority = '20' life_bytes = '100000' life_packets = '2000000' # vpn ipsec auth psk <tag> id <x.x.x.x> self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', remote_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_address]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', peer_ip]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'secret', secret]) # Site to site peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(base_path + ['esp-group', esp_group, 'life-bytes', life_bytes]) self.cli_set(base_path + ['esp-group', esp_group, 'life-packets', life_packets]) self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['local-address', local_address]) self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['tunnel', '1', 'protocol', 'tcp']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.10.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.11.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'port', '443']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'prefix', '172.17.10.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'prefix', '172.17.11.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'port', '443']) self.cli_set(peer_base_path + ['tunnel', '2', 'local', 'prefix', '10.1.0.0/16']) self.cli_set(peer_base_path + ['tunnel', '2', 'remote', 'prefix', '10.2.0.0/16']) self.cli_set(peer_base_path + ['tunnel', '2', 'priority', priority]) self.cli_commit() # Verify strongSwan configuration swanctl_conf = read_file(swanctl_file) swanctl_conf_lines = [ f'version = 2', f'auth = psk', f'life_bytes = {life_bytes}', f'life_packets = {life_packets}', f'rekey_time = 28800s', # default value f'proposals = aes128-sha1-modp1024', f'esp_proposals = aes128-sha1-modp1024', f'life_time = 3600s', # default value f'local_addrs = {local_address} # dhcp:no', f'remote_addrs = {peer_ip}', f'mode = tunnel', f'{connection_name}-tunnel-1', f'local_ts = 172.16.10.0/24[tcp/443],172.16.11.0/24[tcp/443]', f'remote_ts = 172.17.10.0/24[tcp/443],172.17.11.0/24[tcp/443]', f'mode = tunnel', f'{connection_name}-tunnel-2', f'local_ts = 10.1.0.0/16', f'remote_ts = 10.2.0.0/16', f'priority = {priority}', f'mode = tunnel', ] for line in swanctl_conf_lines: self.assertIn(line, swanctl_conf) swanctl_secrets_lines = [ f'id-{regex_uuid4} = "{local_id}"', f'id-{regex_uuid4} = "{remote_id}"', f'id-{regex_uuid4} = "{local_address}"', f'id-{regex_uuid4} = "{peer_ip}"', f'secret = "{secret}"' ] for line in swanctl_secrets_lines: self.assertRegex(swanctl_conf, fr'{line}') def test_03_site_to_site_vti(self): local_address = '192.0.2.10' vti = 'vti10' # IKE self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) self.cli_set(base_path + ['ike-group', ike_group, 'disable-mobike']) # ESP self.cli_set(base_path + ['esp-group', esp_group, 'compression']) # VTI interface self.cli_set(vti_path + [vti, 'address', '10.1.1.1/24']) # vpn ipsec auth psk <tag> id <x.x.x.x> self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', remote_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', peer_ip]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'secret', secret]) # Site to site peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['connection-type', 'none']) self.cli_set(peer_base_path + ['force-udp-encapsulation']) self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['local-address', local_address]) self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.10.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'local', 'prefix', '172.16.11.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'prefix', '172.17.10.0/24']) self.cli_set(peer_base_path + ['tunnel', '1', 'remote', 'prefix', '172.17.11.0/24']) self.cli_set(peer_base_path + ['vti', 'bind', vti]) self.cli_set(peer_base_path + ['vti', 'esp-group', esp_group]) self.cli_commit() swanctl_conf = read_file(swanctl_file) if_id = vti.lstrip('vti') # The key defaults to 0 and will match any policies which similarly do # not have a lookup key configuration - thus we shift the key by one # to also support a vti0 interface if_id = str(int(if_id) +1) swanctl_conf_lines = [ f'version = 2', f'auth = psk', f'proposals = aes128-sha1-modp1024', f'esp_proposals = aes128-sha1-modp1024', f'local_addrs = {local_address} # dhcp:no', f'mobike = no', f'remote_addrs = {peer_ip}', f'mode = tunnel', f'local_ts = 172.16.10.0/24,172.16.11.0/24', f'remote_ts = 172.17.10.0/24,172.17.11.0/24', f'ipcomp = yes', f'start_action = none', f'if_id_in = {if_id}', # will be 11 for vti10 - shifted by one f'if_id_out = {if_id}', f'updown = "/etc/ipsec.d/vti-up-down {vti}"' ] for line in swanctl_conf_lines: self.assertIn(line, swanctl_conf) swanctl_secrets_lines = [ f'id-{regex_uuid4} = "{local_id}"', f'id-{regex_uuid4} = "{remote_id}"', f'secret = "{secret}"' ] for line in swanctl_secrets_lines: self.assertRegex(swanctl_conf, fr'{line}') def test_04_dmvpn(self): tunnel_if = 'tun100' nhrp_secret = 'secret' ike_lifetime = '3600' esp_lifetime = '1800' # Tunnel self.cli_set(tunnel_path + [tunnel_if, 'address', '172.16.253.134/29']) self.cli_set(tunnel_path + [tunnel_if, 'encapsulation', 'gre']) self.cli_set(tunnel_path + [tunnel_if, 'source-address', '192.0.2.1']) self.cli_set(tunnel_path + [tunnel_if, 'enable-multicast']) self.cli_set(tunnel_path + [tunnel_if, 'parameters', 'ip', 'key', '1']) # NHRP self.cli_set(nhrp_path + ['tunnel', tunnel_if, 'cisco-authentication', nhrp_secret]) self.cli_set(nhrp_path + ['tunnel', tunnel_if, 'holding-time', '300']) self.cli_set(nhrp_path + ['tunnel', tunnel_if, 'multicast', 'dynamic']) self.cli_set(nhrp_path + ['tunnel', tunnel_if, 'redirect']) self.cli_set(nhrp_path + ['tunnel', tunnel_if, 'shortcut']) # IKE/ESP Groups self.cli_set(base_path + ['esp-group', esp_group, 'lifetime', esp_lifetime]) self.cli_set(base_path + ['esp-group', esp_group, 'mode', 'transport']) self.cli_set(base_path + ['esp-group', esp_group, 'pfs', 'dh-group2']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'hash', 'sha1']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'encryption', '3des']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'hash', 'md5']) self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev1']) self.cli_set(base_path + ['ike-group', ike_group, 'lifetime', ike_lifetime]) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'dh-group', '2']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'hash', 'sha1']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'prf', 'prfsha1']) # Profile self.cli_set(base_path + ['profile', 'NHRPVPN', 'authentication', 'mode', 'pre-shared-secret']) self.cli_set(base_path + ['profile', 'NHRPVPN', 'authentication', 'pre-shared-secret', nhrp_secret]) self.cli_set(base_path + ['profile', 'NHRPVPN', 'bind', 'tunnel', tunnel_if]) self.cli_set(base_path + ['profile', 'NHRPVPN', 'esp-group', esp_group]) self.cli_set(base_path + ['profile', 'NHRPVPN', 'ike-group', ike_group]) self.cli_commit() swanctl_conf = read_file(swanctl_file) swanctl_lines = [ f'proposals = aes128-sha1-modp1024,aes256-sha1-prfsha1-modp1024', f'version = 1', f'rekey_time = {ike_lifetime}s', f'rekey_time = {esp_lifetime}s', f'esp_proposals = aes128-sha1-modp1024,aes256-sha1-modp1024,3des-md5-modp1024', f'local_ts = dynamic[gre]', f'remote_ts = dynamic[gre]', f'mode = transport', f'secret = {nhrp_secret}' ] for line in swanctl_lines: self.assertIn(line, swanctl_conf) # There is only one NHRP test so no need to delete this globally in tearDown() self.cli_delete(nhrp_path) def test_05_x509_site2site(self): # Enable PKI self.setupPKI() vti = 'vti20' self.cli_set(vti_path + [vti, 'address', '192.168.0.1/31']) peer_ip = '172.18.254.202' connection_name = 'office' local_address = '172.18.254.201' peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(peer_base_path + ['authentication', 'local-id', peer_name]) self.cli_set(peer_base_path + ['authentication', 'mode', 'x509']) self.cli_set(peer_base_path + ['authentication', 'remote-id', 'peer2']) self.cli_set(peer_base_path + ['authentication', 'x509', 'ca-certificate', int_ca_name]) self.cli_set(peer_base_path + ['authentication', 'x509', 'certificate', peer_name]) self.cli_set(peer_base_path + ['connection-type', 'initiate']) self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['ikev2-reauth', 'inherit']) self.cli_set(peer_base_path + ['local-address', local_address]) self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['vti', 'bind', vti]) self.cli_set(peer_base_path + ['vti', 'esp-group', esp_group]) self.cli_commit() swanctl_conf = read_file(swanctl_file) tmp = peer_ip.replace('.', '-') if_id = vti.lstrip('vti') # The key defaults to 0 and will match any policies which similarly do # not have a lookup key configuration - thus we shift the key by one # to also support a vti0 interface if_id = str(int(if_id) +1) swanctl_lines = [ f'{connection_name}', f'version = 0', # key-exchange not set - defaulting to 0 for ikev1 and ikev2 f'send_cert = always', f'mobike = yes', f'keyingtries = 0', f'id = "{peer_name}"', f'auth = pubkey', f'certs = {peer_name}.pem', f'proposals = aes128-sha1-modp1024', f'esp_proposals = aes128-sha1-modp1024', f'local_addrs = {local_address} # dhcp:no', f'remote_addrs = {peer_ip}', f'local_ts = 0.0.0.0/0,::/0', f'remote_ts = 0.0.0.0/0,::/0', f'updown = "/etc/ipsec.d/vti-up-down {vti}"', f'if_id_in = {if_id}', # will be 11 for vti10 f'if_id_out = {if_id}', f'ipcomp = no', f'mode = tunnel', f'start_action = start', ] for line in swanctl_lines: self.assertIn(line, swanctl_conf) swanctl_secrets_lines = [ f'{connection_name}', f'file = {peer_name}.pem', ] for line in swanctl_secrets_lines: self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{int_ca_name}_1.pem'))) self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{int_ca_name}_2.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) # There is only one VTI test so no need to delete this globally in tearDown() self.cli_delete(vti_path) # Disable PKI self.tearDownPKI() def test_06_flex_vpn_vips(self): local_address = '192.0.2.5' local_id = 'vyos-r1' remote_id = 'vyos-r2' peer_base_path = base_path + ['site-to-site', 'peer', connection_name] self.cli_set(tunnel_path + ['tun1', 'encapsulation', 'gre']) self.cli_set(tunnel_path + ['tun1', 'source-address', local_address]) self.cli_set(base_path + ['interface', interface]) self.cli_set(base_path + ['options', 'flexvpn']) self.cli_set(base_path + ['options', 'interface', 'tun1']) self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) # vpn ipsec auth psk <tag> id <x.x.x.x> self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', remote_id]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', local_address]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'id', peer_ip]) self.cli_set(base_path + ['authentication', 'psk', connection_name, 'secret', secret]) self.cli_set(peer_base_path + ['authentication', 'local-id', local_id]) self.cli_set(peer_base_path + ['authentication', 'mode', 'pre-shared-secret']) self.cli_set(peer_base_path + ['authentication', 'remote-id', remote_id]) self.cli_set(peer_base_path + ['connection-type', 'initiate']) self.cli_set(peer_base_path + ['ike-group', ike_group]) self.cli_set(peer_base_path + ['default-esp-group', esp_group]) self.cli_set(peer_base_path + ['local-address', local_address]) self.cli_set(peer_base_path + ['remote-address', peer_ip]) self.cli_set(peer_base_path + ['tunnel', '1', 'protocol', 'gre']) self.cli_set(peer_base_path + ['virtual-address', '203.0.113.55']) self.cli_set(peer_base_path + ['virtual-address', '203.0.113.56']) self.cli_commit() # Verify strongSwan configuration swanctl_conf = read_file(swanctl_file) swanctl_conf_lines = [ f'version = 2', f'vips = 203.0.113.55, 203.0.113.56', f'life_time = 3600s', # default value f'local_addrs = {local_address} # dhcp:no', f'remote_addrs = {peer_ip}', f'{connection_name}-tunnel-1', f'mode = tunnel', ] for line in swanctl_conf_lines: self.assertIn(line, swanctl_conf) swanctl_secrets_lines = [ f'id-{regex_uuid4} = "{local_id}"', f'id-{regex_uuid4} = "{remote_id}"', f'id-{regex_uuid4} = "{peer_ip}"', f'id-{regex_uuid4} = "{local_address}"', f'secret = "{secret}"', ] for line in swanctl_secrets_lines: self.assertRegex(swanctl_conf, fr'{line}') # Verify charon configuration charon_conf = read_file(charon_file) charon_conf_lines = [ f'# Cisco FlexVPN', f'cisco_flexvpn = yes', f'install_virtual_ip = yes', f'install_virtual_ip_on = tun1', ] for line in charon_conf_lines: self.assertIn(line, charon_conf) def test_07_ikev2_road_warrior(self): # This is a known to be good configuration for Microsoft Windows 10 and Apple iOS 17 self.setupPKI() ike_group = 'IKE-RW' esp_group = 'ESP-RW' conn_name = 'vyos-rw' local_address = '192.0.2.1' ip_pool_name = 'ra-rw-ipv4' username = 'vyos' password = 'secret' ike_lifetime = '7200' eap_lifetime = '3600' local_id = 'ipsec.vyos.net' name_servers = ['172.16.254.100', '172.16.254.101'] prefix = '172.16.250.0/28' # IKE self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) self.cli_set(base_path + ['ike-group', ike_group, 'lifetime', ike_lifetime]) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'hash', 'sha512']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'hash', 'sha256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'dh-group', '2']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'hash', 'sha256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'encryption', 'aes128gcm128']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'hash', 'sha256']) # ESP self.cli_set(base_path + ['esp-group', esp_group, 'lifetime', eap_lifetime]) self.cli_set(base_path + ['esp-group', esp_group, 'pfs', 'disable']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'hash', 'sha512']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'hash', 'sha384']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'hash', 'sha256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'hash', 'sha1']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'encryption', 'aes128gcm128']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'hash', 'sha256']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'local-id', local_id]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'local-users', 'username', username, 'password', password]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'server-mode', 'x509']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'certificate', peer_name]) # verify() - CA cert required for x509 auth with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'ca-certificate', ca_name]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'esp-group', esp_group]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'ike-group', ike_group]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'local-address', local_address]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'pool', ip_pool_name]) for ns in name_servers: self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'name-server', ns]) self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'prefix', prefix]) self.cli_commit() # verify applied configuration swanctl_conf = read_file(swanctl_file) swanctl_lines = [ f'{conn_name}', f'remote_addrs = %any', f'local_addrs = {local_address}', f'proposals = aes256-sha512-modp2048,aes256-sha256-modp2048,aes256-sha256-modp1024,aes128gcm128-sha256-modp2048', f'version = 2', f'send_certreq = no', f'rekey_time = {ike_lifetime}s', f'keyingtries = 0', f'pools = {ip_pool_name}', f'id = "{local_id}"', f'auth = pubkey', f'certs = peer1.pem', f'auth = eap-mschapv2', f'eap_id = %any', f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', f'rekey_time = {eap_lifetime}s', f'rand_time = 540s', f'dpd_action = clear', f'inactivity = 28800', f'local_ts = 0.0.0.0/0,::/0', ] for line in swanctl_lines: self.assertIn(line, swanctl_conf) swanctl_secrets_lines = [ f'eap-{conn_name}-{username}', f'secret = "{password}"', f'id-{conn_name}-{username} = "{username}"', ] for line in swanctl_secrets_lines: self.assertIn(line, swanctl_conf) swanctl_pool_lines = [ f'{ip_pool_name}', f'addrs = {prefix}', f'dns = {",".join(name_servers)}', ] for line in swanctl_pool_lines: self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}_1.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) self.tearDownPKI() def test_08_ikev2_road_warrior_client_auth_eap_tls(self): # This is a known to be good configuration for Microsoft Windows 10 and Apple iOS 17 self.setupPKI() ike_group = 'IKE-RW' esp_group = 'ESP-RW' conn_name = 'vyos-rw' local_address = '192.0.2.1' ip_pool_name = 'ra-rw-ipv4' username = 'vyos' password = 'secret' ike_lifetime = '7200' eap_lifetime = '3600' local_id = 'ipsec.vyos.net' name_servers = ['172.16.254.100', '172.16.254.101'] prefix = '172.16.250.0/28' # IKE self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) self.cli_set(base_path + ['ike-group', ike_group, 'lifetime', ike_lifetime]) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'hash', 'sha512']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'hash', 'sha256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'dh-group', '2']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'hash', 'sha256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'encryption', 'aes128gcm128']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'hash', 'sha256']) # ESP self.cli_set(base_path + ['esp-group', esp_group, 'lifetime', eap_lifetime]) self.cli_set(base_path + ['esp-group', esp_group, 'pfs', 'disable']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'hash', 'sha512']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'hash', 'sha384']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'hash', 'sha256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'hash', 'sha1']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'encryption', 'aes128gcm128']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'hash', 'sha256']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'local-id', local_id]) # Use EAP-TLS auth instead of default EAP-MSCHAPv2 self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'client-mode', 'eap-tls']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'server-mode', 'x509']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'certificate', peer_name]) # verify() - CA cert required for x509 auth with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'ca-certificate', ca_name]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'esp-group', esp_group]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'ike-group', ike_group]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'local-address', local_address]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'pool', ip_pool_name]) for ns in name_servers: self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'name-server', ns]) self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'prefix', prefix]) self.cli_commit() # verify applied configuration swanctl_conf = read_file(swanctl_file) swanctl_lines = [ f'{conn_name}', f'remote_addrs = %any', f'local_addrs = {local_address}', f'proposals = aes256-sha512-modp2048,aes256-sha256-modp2048,aes256-sha256-modp1024,aes128gcm128-sha256-modp2048', f'version = 2', f'send_certreq = no', f'rekey_time = {ike_lifetime}s', f'keyingtries = 0', f'pools = {ip_pool_name}', f'id = "{local_id}"', f'auth = pubkey', f'certs = peer1.pem', f'auth = eap-tls', f'eap_id = %any', f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', f'rekey_time = {eap_lifetime}s', f'rand_time = 540s', f'dpd_action = clear', f'inactivity = 28800', f'local_ts = 0.0.0.0/0,::/0', ] for line in swanctl_lines: self.assertIn(line, swanctl_conf) swanctl_pool_lines = [ f'{ip_pool_name}', f'addrs = {prefix}', f'dns = {",".join(name_servers)}', ] for line in swanctl_pool_lines: self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}_1.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) self.tearDownPKI() def test_09_ikev2_road_warrior_client_auth_x509(self): # This is a known to be good configuration for Microsoft Windows 10 and Apple iOS 17 self.setupPKI() ike_group = 'IKE-RW' esp_group = 'ESP-RW' conn_name = 'vyos-rw' local_address = '192.0.2.1' ip_pool_name = 'ra-rw-ipv4' ike_lifetime = '7200' eap_lifetime = '3600' local_id = 'ipsec.vyos.net' name_servers = ['172.16.254.100', '172.16.254.101'] prefix = '172.16.250.0/28' # IKE self.cli_set(base_path + ['ike-group', ike_group, 'key-exchange', 'ikev2']) self.cli_set(base_path + ['ike-group', ike_group, 'lifetime', ike_lifetime]) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '1', 'hash', 'sha512']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '2', 'hash', 'sha256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'dh-group', '2']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'encryption', 'aes256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '3', 'hash', 'sha256']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'dh-group', '14']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'encryption', 'aes128gcm128']) self.cli_set(base_path + ['ike-group', ike_group, 'proposal', '10', 'hash', 'sha256']) # ESP self.cli_set(base_path + ['esp-group', esp_group, 'lifetime', eap_lifetime]) self.cli_set(base_path + ['esp-group', esp_group, 'pfs', 'disable']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '1', 'hash', 'sha512']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '2', 'hash', 'sha384']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '3', 'hash', 'sha256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'encryption', 'aes256']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '4', 'hash', 'sha1']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'encryption', 'aes128gcm128']) self.cli_set(base_path + ['esp-group', esp_group, 'proposal', '10', 'hash', 'sha256']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'local-id', local_id]) # Use client-mode x509 instead of default EAP-MSCHAPv2 self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'client-mode', 'x509']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'server-mode', 'x509']) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'certificate', peer_name]) # verify() - CA cert required for x509 auth with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'authentication', 'x509', 'ca-certificate', ca_name]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'esp-group', esp_group]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'ike-group', ike_group]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'local-address', local_address]) self.cli_set(base_path + ['remote-access', 'connection', conn_name, 'pool', ip_pool_name]) for ns in name_servers: self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'name-server', ns]) self.cli_set(base_path + ['remote-access', 'pool', ip_pool_name, 'prefix', prefix]) self.cli_commit() # verify applied configuration swanctl_conf = read_file(swanctl_file) swanctl_lines = [ f'{conn_name}', f'remote_addrs = %any', f'local_addrs = {local_address}', f'proposals = aes256-sha512-modp2048,aes256-sha256-modp2048,aes256-sha256-modp1024,aes128gcm128-sha256-modp2048', f'version = 2', f'send_certreq = no', f'rekey_time = {ike_lifetime}s', f'keyingtries = 0', f'pools = {ip_pool_name}', f'id = "{local_id}"', f'auth = pubkey', f'certs = peer1.pem', f'esp_proposals = aes256-sha512,aes256-sha384,aes256-sha256,aes256-sha1,aes128gcm128-sha256', f'rekey_time = {eap_lifetime}s', f'rand_time = 540s', f'dpd_action = clear', f'inactivity = 28800', f'local_ts = 0.0.0.0/0,::/0', ] for line in swanctl_lines: self.assertIn(line, swanctl_conf) swanctl_unexpected_lines = [ f'auth = eap-', f'eap_id' ] for unexpected_line in swanctl_unexpected_lines: self.assertNotIn(unexpected_line, swanctl_conf) swanctl_pool_lines = [ f'{ip_pool_name}', f'addrs = {prefix}', f'dns = {",".join(name_servers)}', ] for line in swanctl_pool_lines: self.assertIn(line, swanctl_conf) # Check Root CA, Intermediate CA and Peer cert/key pair is present self.assertTrue(os.path.exists(os.path.join(CA_PATH, f'{ca_name}_1.pem'))) self.assertTrue(os.path.exists(os.path.join(CERT_PATH, f'{peer_name}.pem'))) self.tearDownPKI() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index 310519abd..239e44c3b 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -1,426 +1,433 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import argv from sys import exit from vyos.config import Config from vyos.config import config_dict_merge from vyos.configdep import set_dependents from vyos.configdep import call_dependents from vyos.configdict import node_changed from vyos.configdiff import Diff from vyos.defaults import directories from vyos.pki import is_ca_certificate from vyos.pki import load_certificate from vyos.pki import load_public_key from vyos.pki import load_private_key from vyos.pki import load_crl from vyos.pki import load_dh_parameters from vyos.utils.boot import boot_configuration_complete from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.process import is_systemd_service_active from vyos import ConfigError from vyos import airbag airbag.enable() vyos_certbot_dir = directories['certbot'] # keys to recursively search for under specified path sync_search = [ { 'keys': ['certificate'], 'path': ['service', 'https'], }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['interfaces', 'ethernet'], }, { 'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'], 'path': ['interfaces', 'openvpn'], }, { 'keys': ['ca_certificate'], 'path': ['interfaces', 'sstpc'], }, { 'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'], 'path': ['vpn', 'ipsec'], }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['vpn', 'openconnect'], }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['vpn', 'sstp'], } ] # key from other config nodes -> key in pki['changed'] and pki sync_translate = { 'certificate': 'certificate', 'ca_certificate': 'ca', 'dh_params': 'dh', 'local_key': 'key_pair', 'remote_key': 'key_pair', 'shared_secret_key': 'openvpn', 'auth_key': 'openvpn', 'crypt_key': 'openvpn' } def certbot_delete(certificate): if not boot_configuration_complete(): return if os.path.exists(f'{vyos_certbot_dir}/renewal/{certificate}.conf'): cmd(f'certbot delete --non-interactive --config-dir {vyos_certbot_dir} --cert-name {certificate}') def certbot_request(name: str, config: dict, dry_run: bool=True): # We do not call certbot when booting the system - there is no need to do so and # request new certificates during boot/image upgrade as the certbot configuration # is stored persistent under /config - thus we do not open the door to transient # errors if not boot_configuration_complete(): return domains = '--domains ' + ' --domains '.join(config['domain_name']) - tmp = f'certbot certonly --config-dir {vyos_certbot_dir} --cert-name {name} '\ - f'--non-interactive --standalone --agree-tos --no-eff-email --expand '\ - f'--server {config["url"]} --email {config["email"]} '\ - f'--key-type rsa --rsa-key-size {config["rsa_key_size"]} {domains}' + tmp = f'certbot certonly --non-interactive --config-dir {vyos_certbot_dir} --cert-name {name} '\ + f'--standalone --agree-tos --no-eff-email --expand --server {config["url"]} '\ + f'--email {config["email"]} --key-type rsa --rsa-key-size {config["rsa_key_size"]} '\ + f'{domains}' if 'listen_address' in config: tmp += f' --http-01-address {config["listen_address"]}' # verify() does not need to actually request a cert but only test for plausability if dry_run: tmp += ' --dry-run' cmd(tmp, raising=ConfigError, message=f'ACME certbot request failed for "{name}"!') def get_config(config=None): if config: conf = config else: conf = Config() base = ['pki'] pki = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) if len(argv) > 1 and argv[1] == 'certbot_renew': pki['certbot_renew'] = {} tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_'), recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'ca' : tmp}) - tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'), - recursive=True, expand_nodes=Diff.ADD|Diff.DELETE) + tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'), recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'certificate' : tmp}) tmp = node_changed(conf, base + ['dh'], key_mangling=('-', '_'), recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'dh' : tmp}) tmp = node_changed(conf, base + ['key-pair'], key_mangling=('-', '_'), recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'key_pair' : tmp}) tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], key_mangling=('-', '_'), recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'openvpn' : tmp}) # We only merge on the defaults of there is a configuration at all if conf.exists(base): # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. default_values = conf.get_config_defaults(**pki.kwargs, recursive=True) # remove ACME default configuration if unused by CLI if 'certificate' in pki: for name, cert_config in pki['certificate'].items(): if 'acme' not in cert_config: # Remove ACME default values del default_values['certificate'][name]['acme'] # merge CLI and default dictionary pki = config_dict_merge(default_values, pki) # Certbot triggered an external renew of the certificates. # Mark all ACME based certificates as "changed" to trigger # update of dependent services if 'certificate' in pki and 'certbot_renew' in pki: renew = [] for name, cert_config in pki['certificate'].items(): if 'acme' in cert_config: renew.append(name) # If triggered externally by certbot, certificate key is not present in changed if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'certificate' : renew}) # We need to get the entire system configuration to verify that we are not # deleting a certificate that is still referenced somewhere! pki['system'] = conf.get_config_dict([], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) for search in sync_search: for key in search['keys']: changed_key = sync_translate[key] if 'changed' not in pki or changed_key not in pki['changed']: continue for item_name in pki['changed'][changed_key]: node_present = False if changed_key == 'openvpn': node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) else: node_present = dict_search_args(pki, changed_key, item_name) if node_present: search_dict = dict_search_args(pki['system'], *search['path']) if not search_dict: continue for found_name, found_path in dict_search_recursive(search_dict, key): if found_name == item_name: path = search['path'] path_str = ' '.join(path + found_path) - print(f'pki: Updating config: {path_str} {found_name}') + print(f'PKI: Updating config: {path_str} {found_name}') if path[0] == 'interfaces': ifname = found_path[0] set_dependents(path[1], conf, ifname) else: set_dependents(path[1], conf) return pki def is_valid_certificate(raw_data): # If it loads correctly we're good, or return False return load_certificate(raw_data, wrap_tags=True) def is_valid_ca_certificate(raw_data): # Check if this is a valid certificate with CA attributes cert = load_certificate(raw_data, wrap_tags=True) if not cert: return False return is_ca_certificate(cert) def is_valid_public_key(raw_data): # If it loads correctly we're good, or return False return load_public_key(raw_data, wrap_tags=True) def is_valid_private_key(raw_data, protected=False): # If it loads correctly we're good, or return False # With encrypted private keys, we always return true as we cannot ask for password to verify if protected: return True return load_private_key(raw_data, passphrase=None, wrap_tags=True) def is_valid_crl(raw_data): # If it loads correctly we're good, or return False return load_crl(raw_data, wrap_tags=True) def is_valid_dh_parameters(raw_data): # If it loads correctly we're good, or return False return load_dh_parameters(raw_data, wrap_tags=True) def verify(pki): if not pki: return None if 'ca' in pki: for name, ca_conf in pki['ca'].items(): if 'certificate' in ca_conf: if not is_valid_ca_certificate(ca_conf['certificate']): raise ConfigError(f'Invalid certificate on CA certificate "{name}"') if 'private' in ca_conf and 'key' in ca_conf['private']: private = ca_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on CA certificate "{name}"') if 'crl' in ca_conf: ca_crls = ca_conf['crl'] if isinstance(ca_crls, str): ca_crls = [ca_crls] for crl in ca_crls: if not is_valid_crl(crl): raise ConfigError(f'Invalid CRL on CA certificate "{name}"') if 'certificate' in pki: for name, cert_conf in pki['certificate'].items(): if 'certificate' in cert_conf: if not is_valid_certificate(cert_conf['certificate']): raise ConfigError(f'Invalid certificate on certificate "{name}"') if 'private' in cert_conf and 'key' in cert_conf['private']: private = cert_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on certificate "{name}"') if 'acme' in cert_conf: if 'domain_name' not in cert_conf['acme']: raise ConfigError(f'At least one domain-name is required to request '\ f'certificate for "{name}" via ACME!') if 'email' not in cert_conf['acme']: raise ConfigError(f'An email address is required to request '\ f'certificate for "{name}" via ACME!') if 'certbot_renew' not in pki: # Only run the ACME command if something on this entity changed, # as this is time intensive tmp = dict_search('changed.certificate', pki) if tmp != None and name in tmp: certbot_request(name, cert_conf['acme']) if 'dh' in pki: for name, dh_conf in pki['dh'].items(): if 'parameters' in dh_conf: if not is_valid_dh_parameters(dh_conf['parameters']): raise ConfigError(f'Invalid DH parameters on "{name}"') if 'key_pair' in pki: for name, key_conf in pki['key_pair'].items(): if 'public' in key_conf and 'key' in key_conf['public']: if not is_valid_public_key(key_conf['public']['key']): raise ConfigError(f'Invalid public key on key-pair "{name}"') if 'private' in key_conf and 'key' in key_conf['private']: private = key_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on key-pair "{name}"') if 'x509' in pki: if 'default' in pki['x509']: default_values = pki['x509']['default'] if 'country' in default_values: country = default_values['country'] if len(country) != 2 or not country.isalpha(): raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.') if 'changed' in pki: # if the list is getting longer, we can move to a dict() and also embed the # search key as value from line 173 or 176 for search in sync_search: for key in search['keys']: changed_key = sync_translate[key] if changed_key not in pki['changed']: continue for item_name in pki['changed'][changed_key]: node_present = False if changed_key == 'openvpn': node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) else: node_present = dict_search_args(pki, changed_key, item_name) if not node_present: search_dict = dict_search_args(pki['system'], *search['path']) if not search_dict: continue for found_name, found_path in dict_search_recursive(search_dict, key): if found_name == item_name: path_str = " ".join(search['path'] + found_path) raise ConfigError(f'PKI object "{item_name}" still in use by "{path_str}"') return None def generate(pki): if not pki: return None # Certbot renewal only needs to re-trigger the services to load up the # new PEM file if 'certbot_renew' in pki: return None - # list of certificates issued via certbot certbot_list = [] + certbot_list_on_disk = [] + if os.path.exists(f'{vyos_certbot_dir}/live'): + certbot_list_on_disk = [f.path.split('/')[-1] for f in os.scandir(f'{vyos_certbot_dir}/live') if f.is_dir()] + if 'certificate' in pki: + changed_certificates = dict_search('changed.certificate', pki) for name, cert_conf in pki['certificate'].items(): if 'acme' in cert_conf: certbot_list.append(name) - # when something for the certificate changed, we should delete it - if name in dict_search('changed.certificate', pki): - certbot_delete(name) + # generate certificate if not found on disk + if name not in certbot_list_on_disk: + certbot_request(name, cert_conf['acme'], dry_run=False) + elif changed_certificates != None and name in changed_certificates: + # when something for the certificate changed, we should delete it + if name in certbot_list_on_disk: + certbot_delete(name) certbot_request(name, cert_conf['acme'], dry_run=False) # Cleanup certbot configuration and certificates if no longer in use by CLI # Get foldernames under vyos_certbot_dir which each represent a certbot cert if os.path.exists(f'{vyos_certbot_dir}/live'): - for cert in [f.path.split('/')[-1] for f in os.scandir(f'{vyos_certbot_dir}/live') if f.is_dir()]: + for cert in certbot_list_on_disk: if cert not in certbot_list: # certificate is no longer active on the CLI - remove it certbot_delete(cert) return None def apply(pki): systemd_certbot_name = 'certbot.timer' if not pki: call(f'systemctl stop {systemd_certbot_name}') return None has_certbot = False if 'certificate' in pki: for name, cert_conf in pki['certificate'].items(): if 'acme' in cert_conf: has_certbot = True break if not has_certbot: call(f'systemctl stop {systemd_certbot_name}') elif has_certbot and not is_systemd_service_active(systemd_certbot_name): call(f'systemctl restart {systemd_certbot_name}') if 'changed' in pki: call_dependents() return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/vpn_ipsec.py b/src/conf_mode/vpn_ipsec.py index 7fd32c230..5bdcf2fa1 100755 --- a/src/conf_mode/vpn_ipsec.py +++ b/src/conf_mode/vpn_ipsec.py @@ -1,594 +1,595 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import ipaddress import os import re import jmespath from sys import exit from time import sleep from time import time from vyos.base import Warning from vyos.config import Config from vyos.configdict import leaf_node_changed from vyos.configverify import verify_interface_exists from vyos.defaults import directories from vyos.ifconfig import Interface from vyos.pki import encode_certificate from vyos.pki import encode_public_key from vyos.pki import find_chain from vyos.pki import load_certificate from vyos.pki import load_private_key from vyos.pki import wrap_certificate from vyos.pki import wrap_crl from vyos.pki import wrap_public_key from vyos.pki import wrap_private_key from vyos.template import ip_from_cidr from vyos.template import is_ipv4 from vyos.template import is_ipv6 from vyos.template import render from vyos.utils.network import is_ipv6_link_local +from vyos.utils.network import interface_exists from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.process import call from vyos.utils.process import run from vyos import ConfigError from vyos import airbag airbag.enable() dhcp_wait_attempts = 2 dhcp_wait_sleep = 1 swanctl_dir = '/etc/swanctl' charon_conf = '/etc/strongswan.d/charon.conf' charon_dhcp_conf = '/etc/strongswan.d/charon/dhcp.conf' charon_radius_conf = '/etc/strongswan.d/charon/eap-radius.conf' interface_conf = '/etc/strongswan.d/interfaces_use.conf' swanctl_conf = f'{swanctl_dir}/swanctl.conf' default_install_routes = 'yes' vici_socket = '/var/run/charon.vici' -CERT_PATH = f'{swanctl_dir}/x509/' +CERT_PATH = f'{swanctl_dir}/x509/' PUBKEY_PATH = f'{swanctl_dir}/pubkey/' -KEY_PATH = f'{swanctl_dir}/private/' -CA_PATH = f'{swanctl_dir}/x509ca/' -CRL_PATH = f'{swanctl_dir}/x509crl/' +KEY_PATH = f'{swanctl_dir}/private/' +CA_PATH = f'{swanctl_dir}/x509ca/' +CRL_PATH = f'{swanctl_dir}/x509crl/' DHCP_HOOK_IFLIST = '/tmp/ipsec_dhcp_waiting' def get_config(config=None): if config: conf = config else: conf = Config() base = ['vpn', 'ipsec'] l2tp_base = ['vpn', 'l2tp', 'remote-access', 'ipsec-settings'] if not conf.exists(base): return None # retrieve common dictionary keys ipsec = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True, with_recursive_defaults=True, with_pki=True) ipsec['dhcp_no_address'] = {} ipsec['install_routes'] = 'no' if conf.exists(base + ["options", "disable-route-autoinstall"]) else default_install_routes ipsec['interface_change'] = leaf_node_changed(conf, base + ['interface']) ipsec['nhrp_exists'] = conf.exists(['protocols', 'nhrp', 'tunnel']) tmp = conf.get_config_dict(l2tp_base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True) if tmp: ipsec['l2tp'] = conf.merge_defaults(tmp, recursive=True) ipsec['l2tp_outside_address'] = conf.return_value(['vpn', 'l2tp', 'remote-access', 'outside-address']) ipsec['l2tp_ike_default'] = 'aes256-sha1-modp1024,3des-sha1-modp1024' ipsec['l2tp_esp_default'] = 'aes256-sha1,3des-sha1' return ipsec def get_dhcp_address(iface): addresses = Interface(iface).get_addr() if not addresses: return None for address in addresses: if not is_ipv6_link_local(address): return ip_from_cidr(address) return None def verify_pki_x509(pki, x509_conf): if not pki or 'ca' not in pki or 'certificate' not in pki: raise ConfigError(f'PKI is not configured') ca_cert_name = x509_conf['ca_certificate'] cert_name = x509_conf['certificate'] if not dict_search_args(pki, 'ca', ca_cert_name, 'certificate'): raise ConfigError(f'Missing CA certificate on specified PKI CA certificate "{ca_cert_name}"') if not dict_search_args(pki, 'certificate', cert_name, 'certificate'): raise ConfigError(f'Missing certificate on specified PKI certificate "{cert_name}"') if not dict_search_args(pki, 'certificate', cert_name, 'private', 'key'): raise ConfigError(f'Missing private key on specified PKI certificate "{cert_name}"') return True def verify_pki_rsa(pki, rsa_conf): if not pki or 'key_pair' not in pki: raise ConfigError(f'PKI is not configured') local_key = rsa_conf['local_key'] remote_key = rsa_conf['remote_key'] if not dict_search_args(pki, 'key_pair', local_key, 'private', 'key'): raise ConfigError(f'Missing private key on specified local-key "{local_key}"') if not dict_search_args(pki, 'key_pair', remote_key, 'public', 'key'): raise ConfigError(f'Missing public key on specified remote-key "{remote_key}"') return True def verify(ipsec): if not ipsec: return None if 'authentication' in ipsec: if 'psk' in ipsec['authentication']: for psk, psk_config in ipsec['authentication']['psk'].items(): if 'id' not in psk_config or 'secret' not in psk_config: raise ConfigError(f'Authentication psk "{psk}" missing "id" or "secret"') if 'interfaces' in ipsec : for ifname in ipsec['interface']: verify_interface_exists(ifname) if 'l2tp' in ipsec: if 'esp_group' in ipsec['l2tp']: if 'esp_group' not in ipsec or ipsec['l2tp']['esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on L2TP remote-access config") if 'ike_group' in ipsec['l2tp']: if 'ike_group' not in ipsec or ipsec['l2tp']['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on L2TP remote-access config") if 'authentication' not in ipsec['l2tp']: raise ConfigError(f'Missing authentication settings on L2TP remote-access config') if 'mode' not in ipsec['l2tp']['authentication']: raise ConfigError(f'Missing authentication mode on L2TP remote-access config') if not ipsec['l2tp_outside_address']: raise ConfigError(f'Missing outside-address on L2TP remote-access config') if ipsec['l2tp']['authentication']['mode'] == 'pre-shared-secret': if 'pre_shared_secret' not in ipsec['l2tp']['authentication']: raise ConfigError(f'Missing pre shared secret on L2TP remote-access config') if ipsec['l2tp']['authentication']['mode'] == 'x509': if 'x509' not in ipsec['l2tp']['authentication']: raise ConfigError(f'Missing x509 settings on L2TP remote-access config') x509 = ipsec['l2tp']['authentication']['x509'] if 'ca_certificate' not in x509 or 'certificate' not in x509: raise ConfigError(f'Missing x509 certificates on L2TP remote-access config') verify_pki_x509(ipsec['pki'], x509) if 'profile' in ipsec: for profile, profile_conf in ipsec['profile'].items(): if 'esp_group' in profile_conf: if 'esp_group' not in ipsec or profile_conf['esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on {profile} profile") else: raise ConfigError(f"Missing esp-group on {profile} profile") if 'ike_group' in profile_conf: if 'ike_group' not in ipsec or profile_conf['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on {profile} profile") else: raise ConfigError(f"Missing ike-group on {profile} profile") if 'authentication' not in profile_conf: raise ConfigError(f"Missing authentication on {profile} profile") if 'remote_access' in ipsec: if 'connection' in ipsec['remote_access']: for name, ra_conf in ipsec['remote_access']['connection'].items(): if 'esp_group' in ra_conf: if 'esp_group' not in ipsec or ra_conf['esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on {name} remote-access config") else: raise ConfigError(f"Missing esp-group on {name} remote-access config") if 'ike_group' in ra_conf: if 'ike_group' not in ipsec or ra_conf['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on {name} remote-access config") ike = ra_conf['ike_group'] if dict_search(f'ike_group.{ike}.key_exchange', ipsec) != 'ikev2': raise ConfigError('IPsec remote-access connections requires IKEv2!') else: raise ConfigError(f"Missing ike-group on {name} remote-access config") if 'authentication' not in ra_conf: raise ConfigError(f"Missing authentication on {name} remote-access config") if ra_conf['authentication']['server_mode'] == 'x509': if 'x509' not in ra_conf['authentication']: raise ConfigError(f"Missing x509 settings on {name} remote-access config") x509 = ra_conf['authentication']['x509'] if 'ca_certificate' not in x509 or 'certificate' not in x509: raise ConfigError(f"Missing x509 certificates on {name} remote-access config") verify_pki_x509(ipsec['pki'], x509) elif ra_conf['authentication']['server_mode'] == 'pre-shared-secret': if 'pre_shared_secret' not in ra_conf['authentication']: raise ConfigError(f"Missing pre-shared-key on {name} remote-access config") if 'client_mode' not in ra_conf['authentication']: raise ConfigError('Client authentication method is required!') if dict_search('authentication.client_mode', ra_conf) == 'eap-radius': if dict_search('remote_access.radius.server', ipsec) == None: raise ConfigError('RADIUS authentication requires at least one server') if 'pool' in ra_conf: if {'dhcp', 'radius'} <= set(ra_conf['pool']): raise ConfigError(f'Can not use both DHCP and RADIUS for address allocation '\ f'at the same time for "{name}"!') if 'dhcp' in ra_conf['pool'] and len(ra_conf['pool']) > 1: raise ConfigError(f'Can not use DHCP and a predefined address pool for "{name}"!') if 'radius' in ra_conf['pool'] and len(ra_conf['pool']) > 1: raise ConfigError(f'Can not use RADIUS and a predefined address pool for "{name}"!') for pool in ra_conf['pool']: if pool == 'dhcp': if dict_search('remote_access.dhcp.server', ipsec) == None: raise ConfigError('IPsec DHCP server is not configured!') elif pool == 'radius': if dict_search('remote_access.radius.server', ipsec) == None: raise ConfigError('IPsec RADIUS server is not configured!') if dict_search('authentication.client_mode', ra_conf) != 'eap-radius': raise ConfigError('RADIUS IP pool requires eap-radius client authentication!') elif 'pool' not in ipsec['remote_access'] or pool not in ipsec['remote_access']['pool']: raise ConfigError(f'Requested pool "{pool}" does not exist!') if 'pool' in ipsec['remote_access']: for pool, pool_config in ipsec['remote_access']['pool'].items(): if 'prefix' not in pool_config: raise ConfigError(f'Missing madatory prefix option for pool "{pool}"!') if 'name_server' in pool_config: if len(pool_config['name_server']) > 2: raise ConfigError(f'Only two name-servers are supported for remote-access pool "{pool}"!') for ns in pool_config['name_server']: v4_addr_and_ns = is_ipv4(ns) and not is_ipv4(pool_config['prefix']) v6_addr_and_ns = is_ipv6(ns) and not is_ipv6(pool_config['prefix']) if v4_addr_and_ns or v6_addr_and_ns: raise ConfigError('Must use both IPv4 or IPv6 addresses for pool prefix and name-server adresses!') if 'exclude' in pool_config: for exclude in pool_config['exclude']: v4_addr_and_exclude = is_ipv4(exclude) and not is_ipv4(pool_config['prefix']) v6_addr_and_exclude = is_ipv6(exclude) and not is_ipv6(pool_config['prefix']) if v4_addr_and_exclude or v6_addr_and_exclude: raise ConfigError('Must use both IPv4 or IPv6 addresses for pool prefix and exclude prefixes!') if 'radius' in ipsec['remote_access'] and 'server' in ipsec['remote_access']['radius']: for server, server_config in ipsec['remote_access']['radius']['server'].items(): if 'key' not in server_config: raise ConfigError(f'Missing RADIUS secret key for server "{server}"') if 'site_to_site' in ipsec and 'peer' in ipsec['site_to_site']: for peer, peer_conf in ipsec['site_to_site']['peer'].items(): has_default_esp = False # Peer name it is swanctl connection name and shouldn't contain dots or colons, T4118 if bool(re.search(':|\.', peer)): raise ConfigError(f'Incorrect peer name "{peer}" ' f'Peer name can contain alpha-numeric letters, hyphen and underscore') if 'remote_address' not in peer_conf: print(f'You should set correct remote-address "peer {peer} remote-address x.x.x.x"\n') if 'default_esp_group' in peer_conf: has_default_esp = True if 'esp_group' not in ipsec or peer_conf['default_esp_group'] not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on site-to-site peer {peer}") if 'ike_group' in peer_conf: if 'ike_group' not in ipsec or peer_conf['ike_group'] not in ipsec['ike_group']: raise ConfigError(f"Invalid ike-group on site-to-site peer {peer}") else: raise ConfigError(f"Missing ike-group on site-to-site peer {peer}") if 'authentication' not in peer_conf or 'mode' not in peer_conf['authentication']: raise ConfigError(f"Missing authentication on site-to-site peer {peer}") if {'id', 'use_x509_id'} <= set(peer_conf['authentication']): raise ConfigError(f"Manually set peer id and use-x509-id are mutually exclusive!") if peer_conf['authentication']['mode'] == 'x509': if 'x509' not in peer_conf['authentication']: raise ConfigError(f"Missing x509 settings on site-to-site peer {peer}") x509 = peer_conf['authentication']['x509'] if 'ca_certificate' not in x509 or 'certificate' not in x509: raise ConfigError(f"Missing x509 certificates on site-to-site peer {peer}") verify_pki_x509(ipsec['pki'], x509) elif peer_conf['authentication']['mode'] == 'rsa': if 'rsa' not in peer_conf['authentication']: raise ConfigError(f"Missing RSA settings on site-to-site peer {peer}") rsa = peer_conf['authentication']['rsa'] if 'local_key' not in rsa: raise ConfigError(f"Missing RSA local-key on site-to-site peer {peer}") if 'remote_key' not in rsa: raise ConfigError(f"Missing RSA remote-key on site-to-site peer {peer}") verify_pki_rsa(ipsec['pki'], rsa) if 'local_address' not in peer_conf and 'dhcp_interface' not in peer_conf: raise ConfigError(f"Missing local-address or dhcp-interface on site-to-site peer {peer}") if 'dhcp_interface' in peer_conf: dhcp_interface = peer_conf['dhcp_interface'] verify_interface_exists(dhcp_interface) dhcp_base = directories['isc_dhclient_dir'] if not os.path.exists(f'{dhcp_base}/dhclient_{dhcp_interface}.conf'): raise ConfigError(f"Invalid dhcp-interface on site-to-site peer {peer}") address = get_dhcp_address(dhcp_interface) count = 0 while not address and count < dhcp_wait_attempts: address = get_dhcp_address(dhcp_interface) count += 1 sleep(dhcp_wait_sleep) if not address: ipsec['dhcp_no_address'][peer] = dhcp_interface print(f"Failed to get address from dhcp-interface on site-to-site peer {peer} -- skipped") continue if 'vti' in peer_conf: if 'local_address' in peer_conf and 'dhcp_interface' in peer_conf: raise ConfigError(f"A single local-address or dhcp-interface is required when using VTI on site-to-site peer {peer}") if dict_search('options.disable_route_autoinstall', ipsec) == None: Warning('It\'s recommended to use ipsec vti with the next command\n[set vpn ipsec option disable-route-autoinstall]') if 'bind' in peer_conf['vti']: vti_interface = peer_conf['vti']['bind'] - if not os.path.exists(f'/sys/class/net/{vti_interface}'): + if not interface_exists(vti_interface): raise ConfigError(f'VTI interface {vti_interface} for site-to-site peer {peer} does not exist!') if 'vti' not in peer_conf and 'tunnel' not in peer_conf: raise ConfigError(f"No VTI or tunnel specified on site-to-site peer {peer}") if 'tunnel' in peer_conf: for tunnel, tunnel_conf in peer_conf['tunnel'].items(): if 'esp_group' not in tunnel_conf and not has_default_esp: raise ConfigError(f"Missing esp-group on tunnel {tunnel} for site-to-site peer {peer}") esp_group_name = tunnel_conf['esp_group'] if 'esp_group' in tunnel_conf else peer_conf['default_esp_group'] if esp_group_name not in ipsec['esp_group']: raise ConfigError(f"Invalid esp-group on tunnel {tunnel} for site-to-site peer {peer}") esp_group = ipsec['esp_group'][esp_group_name] if 'mode' in esp_group and esp_group['mode'] == 'transport': if 'protocol' in tunnel_conf and ((peer in ['any', '0.0.0.0']) or ('local_address' not in peer_conf or peer_conf['local_address'] in ['any', '0.0.0.0'])): raise ConfigError(f"Fixed local-address or peer required when a protocol is defined with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}") if ('local' in tunnel_conf and 'prefix' in tunnel_conf['local']) or ('remote' in tunnel_conf and 'prefix' in tunnel_conf['remote']): raise ConfigError(f"Local/remote prefix cannot be used with ESP transport mode on tunnel {tunnel} for site-to-site peer {peer}") def cleanup_pki_files(): for path in [CERT_PATH, CA_PATH, CRL_PATH, KEY_PATH, PUBKEY_PATH]: if not os.path.exists(path): continue for file in os.listdir(path): file_path = os.path.join(path, file) if os.path.isfile(file_path): os.unlink(file_path) def generate_pki_files_x509(pki, x509_conf): ca_cert_name = x509_conf['ca_certificate'] ca_cert_data = dict_search_args(pki, 'ca', ca_cert_name, 'certificate') ca_cert_crls = dict_search_args(pki, 'ca', ca_cert_name, 'crl') or [] ca_index = 1 crl_index = 1 ca_cert = load_certificate(ca_cert_data) pki_ca_certs = [load_certificate(ca['certificate']) for ca in pki['ca'].values()] ca_cert_chain = find_chain(ca_cert, pki_ca_certs) cert_name = x509_conf['certificate'] cert_data = dict_search_args(pki, 'certificate', cert_name, 'certificate') key_data = dict_search_args(pki, 'certificate', cert_name, 'private', 'key') protected = 'passphrase' in x509_conf for ca_cert_obj in ca_cert_chain: with open(os.path.join(CA_PATH, f'{ca_cert_name}_{ca_index}.pem'), 'w') as f: f.write(encode_certificate(ca_cert_obj)) ca_index += 1 for crl in ca_cert_crls: with open(os.path.join(CRL_PATH, f'{ca_cert_name}_{crl_index}.pem'), 'w') as f: f.write(wrap_crl(crl)) crl_index += 1 with open(os.path.join(CERT_PATH, f'{cert_name}.pem'), 'w') as f: f.write(wrap_certificate(cert_data)) with open(os.path.join(KEY_PATH, f'x509_{cert_name}.pem'), 'w') as f: f.write(wrap_private_key(key_data, protected)) def generate_pki_files_rsa(pki, rsa_conf): local_key_name = rsa_conf['local_key'] local_key_data = dict_search_args(pki, 'key_pair', local_key_name, 'private', 'key') protected = 'passphrase' in rsa_conf remote_key_name = rsa_conf['remote_key'] remote_key_data = dict_search_args(pki, 'key_pair', remote_key_name, 'public', 'key') local_key = load_private_key(local_key_data, rsa_conf['passphrase'] if protected else None) with open(os.path.join(KEY_PATH, f'rsa_{local_key_name}.pem'), 'w') as f: f.write(wrap_private_key(local_key_data, protected)) with open(os.path.join(PUBKEY_PATH, f'{local_key_name}.pem'), 'w') as f: f.write(encode_public_key(local_key.public_key())) with open(os.path.join(PUBKEY_PATH, f'{remote_key_name}.pem'), 'w') as f: f.write(wrap_public_key(remote_key_data)) def generate(ipsec): cleanup_pki_files() if not ipsec: for config_file in [charon_dhcp_conf, charon_radius_conf, interface_conf, swanctl_conf]: if os.path.isfile(config_file): os.unlink(config_file) render(charon_conf, 'ipsec/charon.j2', {'install_routes': default_install_routes}) return if ipsec['dhcp_no_address']: with open(DHCP_HOOK_IFLIST, 'w') as f: f.write(" ".join(ipsec['dhcp_no_address'].values())) elif os.path.exists(DHCP_HOOK_IFLIST): os.unlink(DHCP_HOOK_IFLIST) for path in [swanctl_dir, CERT_PATH, CA_PATH, CRL_PATH, PUBKEY_PATH]: if not os.path.exists(path): os.mkdir(path, mode=0o755) if not os.path.exists(KEY_PATH): os.mkdir(KEY_PATH, mode=0o700) if 'l2tp' in ipsec: if 'authentication' in ipsec['l2tp'] and 'x509' in ipsec['l2tp']['authentication']: generate_pki_files_x509(ipsec['pki'], ipsec['l2tp']['authentication']['x509']) if 'remote_access' in ipsec and 'connection' in ipsec['remote_access']: for rw, rw_conf in ipsec['remote_access']['connection'].items(): if 'authentication' in rw_conf and 'x509' in rw_conf['authentication']: generate_pki_files_x509(ipsec['pki'], rw_conf['authentication']['x509']) if 'site_to_site' in ipsec and 'peer' in ipsec['site_to_site']: for peer, peer_conf in ipsec['site_to_site']['peer'].items(): if peer in ipsec['dhcp_no_address']: continue if peer_conf['authentication']['mode'] == 'x509': generate_pki_files_x509(ipsec['pki'], peer_conf['authentication']['x509']) elif peer_conf['authentication']['mode'] == 'rsa': generate_pki_files_rsa(ipsec['pki'], peer_conf['authentication']['rsa']) local_ip = '' if 'local_address' in peer_conf: local_ip = peer_conf['local_address'] elif 'dhcp_interface' in peer_conf: local_ip = get_dhcp_address(peer_conf['dhcp_interface']) ipsec['site_to_site']['peer'][peer]['local_address'] = local_ip if 'tunnel' in peer_conf: for tunnel, tunnel_conf in peer_conf['tunnel'].items(): local_prefixes = dict_search_args(tunnel_conf, 'local', 'prefix') remote_prefixes = dict_search_args(tunnel_conf, 'remote', 'prefix') if not local_prefixes or not remote_prefixes: continue passthrough = None for local_prefix in local_prefixes: for remote_prefix in remote_prefixes: local_net = ipaddress.ip_network(local_prefix) remote_net = ipaddress.ip_network(remote_prefix) if local_net.overlaps(remote_net): if passthrough is None: passthrough = [] passthrough.append(local_prefix) ipsec['site_to_site']['peer'][peer]['tunnel'][tunnel]['passthrough'] = passthrough # auth psk <tag> dhcp-interface <xxx> if jmespath.search('authentication.psk.*.dhcp_interface', ipsec): for psk, psk_config in ipsec['authentication']['psk'].items(): if 'dhcp_interface' in psk_config: for iface in psk_config['dhcp_interface']: id = get_dhcp_address(iface) if id: ipsec['authentication']['psk'][psk]['id'].append(id) render(charon_conf, 'ipsec/charon.j2', ipsec) render(charon_dhcp_conf, 'ipsec/charon/dhcp.conf.j2', ipsec) render(charon_radius_conf, 'ipsec/charon/eap-radius.conf.j2', ipsec) render(interface_conf, 'ipsec/interfaces_use.conf.j2', ipsec) render(swanctl_conf, 'ipsec/swanctl.conf.j2', ipsec) def resync_nhrp(ipsec): if ipsec and not ipsec['nhrp_exists']: return tmp = run('/usr/libexec/vyos/conf_mode/protocols_nhrp.py') if tmp > 0: print('ERROR: failed to reapply NHRP settings!') def apply(ipsec): systemd_service = 'strongswan.service' if not ipsec: call(f'systemctl stop {systemd_service}') else: call(f'systemctl reload-or-restart {systemd_service}') resync_nhrp(ipsec) if __name__ == '__main__': try: ipsec = get_config() verify(ipsec) generate(ipsec) apply(ipsec) except ConfigError as e: print(e) exit(1)