diff --git a/interface-definitions/include/pki/openssh-key.xml.i b/interface-definitions/include/pki/openssh-key.xml.i new file mode 100644 index 000000000..8f005d077 --- /dev/null +++ b/interface-definitions/include/pki/openssh-key.xml.i @@ -0,0 +1,14 @@ +<!-- include start from pki/openssh-key.xml.i --> +<leafNode name="key"> + <properties> + <help>OpenSSH key in PKI configuration</help> + <completionHelp> + <path>pki openssh</path> + </completionHelp> + <valueHelp> + <format>txt</format> + <description>Name of OpenSSH key in PKI configuration</description> + </valueHelp> + </properties> +</leafNode> +<!-- include end --> diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in index 617bdd584..7a0b073b4 100644 --- a/interface-definitions/pki.xml.in +++ b/interface-definitions/pki.xml.in @@ -1,248 +1,287 @@ <?xml version="1.0"?> <interfaceDefinition> <node name="pki" owner="${vyos_conf_scripts_dir}/pki.py"> <properties> <help>VyOS PKI configuration</help> <priority>300</priority> </properties> <children> <tagNode name="ca"> <properties> <help>Certificate Authority</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> </properties> <children> #include <include/pki/cli-certificate-base64.xml.i> #include <include/generic-description.xml.i> <node name="private"> <properties> <help>CA private key in PEM format</help> </properties> <children> #include <include/pki/cli-private-key-base64.xml.i> #include <include/pki/password-protected.xml.i> </children> </node> <leafNode name="crl"> <properties> <help>Certificate revocation list in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>CRL is not base64-encoded</constraintErrorMessage> <multi/> </properties> </leafNode> #include <include/pki/cli-revoke.xml.i> </children> </tagNode> <tagNode name="certificate"> <properties> <help>Certificate</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> </properties> <children> #include <include/pki/cli-certificate-base64.xml.i> <node name="acme"> <properties> <help>Automatic Certificate Management Environment (ACME) request</help> </properties> <children> #include <include/url-http-https.xml.i> <leafNode name="url"> <defaultValue>https://acme-v02.api.letsencrypt.org/directory</defaultValue> </leafNode> <leafNode name="domain-name"> <properties> <help>Domain Name</help> <constraint> <validator name="fqdn"/> </constraint> <constraintErrorMessage>Invalid domain name (RFC 1123 section 2).\nMay only contain letters, numbers and .-_</constraintErrorMessage> <multi/> </properties> </leafNode> <leafNode name="email"> <properties> <help>Email address to associate with certificate</help> <constraint> #include <include/constraint/email.xml.i> </constraint> </properties> </leafNode> #include <include/listen-address-ipv4-single.xml.i> <leafNode name="rsa-key-size"> <properties> <help>Size of the RSA key</help> <completionHelp> <list>2048 3072 4096</list> </completionHelp> <valueHelp> <format>2048</format> <description>RSA key length 2048 bit</description> </valueHelp> <valueHelp> <format>3072</format> <description>RSA key length 3072 bit</description> </valueHelp> <valueHelp> <format>4096</format> <description>RSA key length 4096 bit</description> </valueHelp> <constraint> <regex>(2048|3072|4096)</regex> </constraint> </properties> <defaultValue>2048</defaultValue> </leafNode> </children> </node> #include <include/generic-description.xml.i> <node name="private"> <properties> <help>Certificate private key</help> </properties> <children> #include <include/pki/cli-private-key-base64.xml.i> #include <include/pki/password-protected.xml.i> </children> </node> #include <include/pki/cli-revoke.xml.i> </children> </tagNode> <tagNode name="dh"> <properties> <help>Diffie-Hellman parameters</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> </properties> <children> <leafNode name="parameters"> <properties> <help>DH parameters in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>DH parameters are not base64-encoded</constraintErrorMessage> </properties> </leafNode> </children> </tagNode> <tagNode name="key-pair"> <properties> <help>Public and private keys</help> </properties> <children> <node name="public"> <properties> <help>Public key</help> </properties> <children> #include <include/pki/cli-public-key-base64.xml.i> </children> </node> <node name="private"> <properties> <help>Private key</help> </properties> <children> #include <include/pki/cli-private-key-base64.xml.i> #include <include/pki/password-protected.xml.i> </children> </node> </children> </tagNode> + <tagNode name="openssh"> + <properties> + <help>OpenSSH public and private keys</help> + </properties> + <children> + <node name="public"> + <properties> + <help>Public key</help> + </properties> + <children> + #include <include/pki/cli-public-key-base64.xml.i> + <leafNode name="type"> + <properties> + <help>SSH public key type</help> + <completionHelp> + <list>ssh-rsa</list> + </completionHelp> + <valueHelp> + <format>ssh-rsa</format> + <description>Key pair based on RSA algorithm</description> + </valueHelp> + <constraint> + <regex>(ssh-rsa)</regex> + </constraint> + </properties> + </leafNode> + </children> + </node> + <node name="private"> + <properties> + <help>Private key</help> + </properties> + <children> + #include <include/pki/cli-private-key-base64.xml.i> + #include <include/pki/password-protected.xml.i> + </children> + </node> + </children> + </tagNode> <tagNode name="openssh"> <properties> <help>OpenSSH public and private keys</help> </properties> <children> <node name="public"> <properties> <help>Public key</help> </properties> <children> #include <include/pki/cli-public-key-base64.xml.i> </children> </node> <node name="private"> <properties> <help>Private key</help> </properties> <children> #include <include/pki/cli-private-key-base64.xml.i> #include <include/pki/password-protected.xml.i> </children> </node> </children> </tagNode> <node name="openvpn"> <properties> <help>OpenVPN keys</help> </properties> <children> <tagNode name="shared-secret"> <properties> <help>OpenVPN shared secret key</help> </properties> <children> <leafNode name="key"> <properties> <help>OpenVPN shared secret key data</help> </properties> </leafNode> <leafNode name="version"> <properties> <help>OpenVPN shared secret key version</help> </properties> </leafNode> </children> </tagNode> </children> </node> <node name="x509"> <properties> <help>X509 Settings</help> </properties> <children> <node name="default"> <properties> <help>X509 Default Values</help> </properties> <children> <leafNode name="country"> <properties> <help>Default country</help> </properties> <defaultValue>GB</defaultValue> </leafNode> <leafNode name="state"> <properties> <help>Default state</help> </properties> <defaultValue>Some-State</defaultValue> </leafNode> <leafNode name="locality"> <properties> <help>Default locality</help> </properties> <defaultValue>Some-City</defaultValue> </leafNode> <leafNode name="organization"> <properties> <help>Default organization</help> </properties> <defaultValue>VyOS</defaultValue> </leafNode> </children> </node> </children> </node> </children> </node> </interfaceDefinition> diff --git a/python/vyos/pki.py b/python/vyos/pki.py index 792e24b76..02dece471 100644 --- a/python/vyos/pki.py +++ b/python/vyos/pki.py @@ -1,432 +1,455 @@ #!/usr/bin/env python3 # -# Copyright (C) 2023 VyOS maintainers and contributors +# Copyright (C) 2023-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import datetime import ipaddress from cryptography import x509 from cryptography.exceptions import InvalidSignature from cryptography.x509.extensions import ExtensionNotFound -from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID, ExtensionOID +from cryptography.x509.oid import NameOID +from cryptography.x509.oid import ExtendedKeyUsageOID +from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import dh from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import rsa CERT_BEGIN='-----BEGIN CERTIFICATE-----\n' CERT_END='\n-----END CERTIFICATE-----' KEY_BEGIN='-----BEGIN PRIVATE KEY-----\n' KEY_END='\n-----END PRIVATE KEY-----' KEY_ENC_BEGIN='-----BEGIN ENCRYPTED PRIVATE KEY-----\n' KEY_ENC_END='\n-----END ENCRYPTED PRIVATE KEY-----' KEY_PUB_BEGIN='-----BEGIN PUBLIC KEY-----\n' KEY_PUB_END='\n-----END PUBLIC KEY-----' CRL_BEGIN='-----BEGIN X509 CRL-----\n' CRL_END='\n-----END X509 CRL-----' CSR_BEGIN='-----BEGIN CERTIFICATE REQUEST-----\n' CSR_END='\n-----END CERTIFICATE REQUEST-----' DH_BEGIN='-----BEGIN DH PARAMETERS-----\n' DH_END='\n-----END DH PARAMETERS-----' OVPN_BEGIN = '-----BEGIN OpenVPN Static key V{0}-----\n' OVPN_END = '\n-----END OpenVPN Static key V{0}-----' +OPENSSH_KEY_BEGIN='-----BEGIN OPENSSH PRIVATE KEY-----\n' +OPENSSH_KEY_END='\n-----END OPENSSH PRIVATE KEY-----' # Print functions encoding_map = { 'PEM': serialization.Encoding.PEM, 'OpenSSH': serialization.Encoding.OpenSSH } public_format_map = { 'SubjectPublicKeyInfo': serialization.PublicFormat.SubjectPublicKeyInfo, 'OpenSSH': serialization.PublicFormat.OpenSSH } private_format_map = { 'PKCS8': serialization.PrivateFormat.PKCS8, 'OpenSSH': serialization.PrivateFormat.OpenSSH } hash_map = { 'sha256': hashes.SHA256, 'sha384': hashes.SHA384, 'sha512': hashes.SHA512, } def get_certificate_fingerprint(cert, hash): hash_algorithm = hash_map[hash]() fp = cert.fingerprint(hash_algorithm) return fp.hex(':').upper() def encode_certificate(cert): return cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8') def encode_public_key(cert, encoding='PEM', key_format='SubjectPublicKeyInfo'): if encoding not in encoding_map: encoding = 'PEM' if key_format not in public_format_map: key_format = 'SubjectPublicKeyInfo' return cert.public_bytes( encoding=encoding_map[encoding], format=public_format_map[key_format]).decode('utf-8') def encode_private_key(private_key, encoding='PEM', key_format='PKCS8', passphrase=None): if encoding not in encoding_map: encoding = 'PEM' if key_format not in private_format_map: key_format = 'PKCS8' encryption = serialization.NoEncryption() if not passphrase else serialization.BestAvailableEncryption(bytes(passphrase, 'utf-8')) return private_key.private_bytes( encoding=encoding_map[encoding], format=private_format_map[key_format], encryption_algorithm=encryption).decode('utf-8') def encode_dh_parameters(dh_parameters): return dh_parameters.parameter_bytes( encoding=serialization.Encoding.PEM, format=serialization.ParameterFormat.PKCS3).decode('utf-8') # EC Helper def get_elliptic_curve(size): curve_func = None name = f'SECP{size}R1' if hasattr(ec, name): curve_func = getattr(ec, name) else: curve_func = ec.SECP256R1() # Default to SECP256R1 return curve_func() # Creation functions def create_private_key(key_type, key_size=None): private_key = None if key_type == 'rsa': private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size) elif key_type == 'dsa': private_key = dsa.generate_private_key(key_size=key_size) elif key_type == 'ec': curve = get_elliptic_curve(key_size) private_key = ec.generate_private_key(curve) return private_key def create_certificate_request(subject, private_key, subject_alt_names=[]): subject_obj = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, subject['country']), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, subject['state']), x509.NameAttribute(NameOID.LOCALITY_NAME, subject['locality']), x509.NameAttribute(NameOID.ORGANIZATION_NAME, subject['organization']), x509.NameAttribute(NameOID.COMMON_NAME, subject['common_name'])]) builder = x509.CertificateSigningRequestBuilder() \ .subject_name(subject_obj) if subject_alt_names: alt_names = [] for obj in subject_alt_names: if isinstance(obj, ipaddress.IPv4Address) or isinstance(obj, ipaddress.IPv6Address): alt_names.append(x509.IPAddress(obj)) elif isinstance(obj, str): alt_names.append(x509.DNSName(obj)) if alt_names: builder = builder.add_extension(x509.SubjectAlternativeName(alt_names), critical=False) return builder.sign(private_key, hashes.SHA256()) def add_key_identifier(ca_cert): try: ski_ext = ca_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) return x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ski_ext.value) except: return x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key()) def create_certificate(cert_req, ca_cert, ca_private_key, valid_days=365, cert_type='server', is_ca=False, is_sub_ca=False): ext_key_usage = [] if is_ca: ext_key_usage = [ExtendedKeyUsageOID.CLIENT_AUTH, ExtendedKeyUsageOID.SERVER_AUTH] elif cert_type == 'client': ext_key_usage = [ExtendedKeyUsageOID.CLIENT_AUTH] elif cert_type == 'server': ext_key_usage = [ExtendedKeyUsageOID.SERVER_AUTH] builder = x509.CertificateBuilder() \ .subject_name(cert_req.subject) \ .issuer_name(ca_cert.subject) \ .public_key(cert_req.public_key()) \ .serial_number(x509.random_serial_number()) \ .not_valid_before(datetime.datetime.utcnow()) \ .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=int(valid_days))) builder = builder.add_extension(x509.BasicConstraints(ca=is_ca, path_length=0 if is_sub_ca else None), critical=True) builder = builder.add_extension(x509.KeyUsage( digital_signature=True, content_commitment=False, key_encipherment=False, data_encipherment=False, key_agreement=False, key_cert_sign=is_ca, crl_sign=is_ca, encipher_only=False, decipher_only=False), critical=True) builder = builder.add_extension(x509.ExtendedKeyUsage(ext_key_usage), critical=False) builder = builder.add_extension(x509.SubjectKeyIdentifier.from_public_key(cert_req.public_key()), critical=False) if not is_ca or is_sub_ca: builder = builder.add_extension(add_key_identifier(ca_cert), critical=False) for ext in cert_req.extensions: builder = builder.add_extension(ext.value, critical=False) return builder.sign(ca_private_key, hashes.SHA256()) def create_certificate_revocation_list(ca_cert, ca_private_key, serial_numbers=[]): if not serial_numbers: return False builder = x509.CertificateRevocationListBuilder() \ .issuer_name(ca_cert.subject) \ .last_update(datetime.datetime.today()) \ .next_update(datetime.datetime.today() + datetime.timedelta(1, 0, 0)) for serial_number in serial_numbers: revoked_cert = x509.RevokedCertificateBuilder() \ .serial_number(serial_number) \ .revocation_date(datetime.datetime.today()) \ .build() builder = builder.add_revoked_certificate(revoked_cert) return builder.sign(private_key=ca_private_key, algorithm=hashes.SHA256()) def create_dh_parameters(bits=2048): if not bits or bits < 512: print("Invalid DH parameter key size") return False return dh.generate_parameters(generator=2, key_size=int(bits)) # Wrap functions def wrap_public_key(raw_data): return KEY_PUB_BEGIN + raw_data + KEY_PUB_END def wrap_private_key(raw_data, passphrase=None): return (KEY_ENC_BEGIN if passphrase else KEY_BEGIN) + raw_data + (KEY_ENC_END if passphrase else KEY_END) +def wrap_openssh_public_key(raw_data, type): + return f'{type} {raw_data}' + +def wrap_openssh_private_key(raw_data): + return OPENSSH_KEY_BEGIN + raw_data + OPENSSH_KEY_END + def wrap_certificate_request(raw_data): return CSR_BEGIN + raw_data + CSR_END def wrap_certificate(raw_data): return CERT_BEGIN + raw_data + CERT_END def wrap_crl(raw_data): return CRL_BEGIN + raw_data + CRL_END def wrap_dh_parameters(raw_data): return DH_BEGIN + raw_data + DH_END def wrap_openvpn_key(raw_data, version='1'): return OVPN_BEGIN.format(version) + raw_data + OVPN_END.format(version) # Load functions - def load_public_key(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_public_key(raw_data) try: return serialization.load_pem_public_key(bytes(raw_data, 'utf-8')) except ValueError: return False def load_private_key(raw_data, passphrase=None, wrap_tags=True): if wrap_tags: raw_data = wrap_private_key(raw_data, passphrase) if passphrase is not None: passphrase = bytes(passphrase, 'utf-8') try: return serialization.load_pem_private_key(bytes(raw_data, 'utf-8'), password=passphrase) except ValueError: return False +def load_openssh_public_key(raw_data, type): + try: + return serialization.load_ssh_public_key(bytes(f'{type} {raw_data}', 'utf-8')) + except ValueError: + return False + +def load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True): + if wrap_tags: + raw_data = wrap_openssh_private_key(raw_data) + + try: + return serialization.load_ssh_private_key(bytes(raw_data, 'utf-8'), password=passphrase) + except ValueError: + return False + def load_certificate_request(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_certificate_request(raw_data) try: return x509.load_pem_x509_csr(bytes(raw_data, 'utf-8')) except ValueError: return False def load_certificate(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_certificate(raw_data) try: return x509.load_pem_x509_certificate(bytes(raw_data, 'utf-8')) except ValueError: return False def load_crl(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_crl(raw_data) try: return x509.load_pem_x509_crl(bytes(raw_data, 'utf-8')) except ValueError: return False def load_dh_parameters(raw_data, wrap_tags=True): if wrap_tags: raw_data = wrap_dh_parameters(raw_data) try: return serialization.load_pem_parameters(bytes(raw_data, 'utf-8')) except ValueError: return False # Verify def is_ca_certificate(cert): if not cert: return False try: ext = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS) return ext.value.ca except ExtensionNotFound: return False def verify_certificate(cert, ca_cert): # Verify certificate was signed by specified CA if ca_cert.subject != cert.issuer: return False ca_public_key = ca_cert.public_key() try: if isinstance(ca_public_key, rsa.RSAPublicKeyWithSerialization): ca_public_key.verify( cert.signature, cert.tbs_certificate_bytes, padding=padding.PKCS1v15(), algorithm=cert.signature_hash_algorithm) elif isinstance(ca_public_key, dsa.DSAPublicKeyWithSerialization): ca_public_key.verify( cert.signature, cert.tbs_certificate_bytes, algorithm=cert.signature_hash_algorithm) elif isinstance(ca_public_key, ec.EllipticCurvePublicKeyWithSerialization): ca_public_key.verify( cert.signature, cert.tbs_certificate_bytes, signature_algorithm=ec.ECDSA(cert.signature_hash_algorithm)) else: return False # We cannot verify it return True except InvalidSignature: return False def verify_crl(crl, ca_cert): # Verify CRL was signed by specified CA if ca_cert.subject != crl.issuer: return False ca_public_key = ca_cert.public_key() try: if isinstance(ca_public_key, rsa.RSAPublicKeyWithSerialization): ca_public_key.verify( crl.signature, crl.tbs_certlist_bytes, padding=padding.PKCS1v15(), algorithm=crl.signature_hash_algorithm) elif isinstance(ca_public_key, dsa.DSAPublicKeyWithSerialization): ca_public_key.verify( crl.signature, crl.tbs_certlist_bytes, algorithm=crl.signature_hash_algorithm) elif isinstance(ca_public_key, ec.EllipticCurvePublicKeyWithSerialization): ca_public_key.verify( crl.signature, crl.tbs_certlist_bytes, signature_algorithm=ec.ECDSA(crl.signature_hash_algorithm)) else: return False # We cannot verify it return True except InvalidSignature: return False def verify_ca_chain(sorted_names, pki_node): if len(sorted_names) == 1: # Single cert, no chain return True for name in sorted_names: cert = load_certificate(pki_node[name]['certificate']) verified = False for ca_name in sorted_names: if name == ca_name: continue ca_cert = load_certificate(pki_node[ca_name]['certificate']) if verify_certificate(cert, ca_cert): verified = True break if not verified and name != sorted_names[-1]: # Only permit top-most certificate to fail verify (e.g. signed by public CA not explicitly in chain) return False return True # Certificate chain def find_parent(cert, ca_certs): for ca_cert in ca_certs: if verify_certificate(cert, ca_cert): return ca_cert return None def find_chain(cert, ca_certs): remaining = ca_certs.copy() chain = [cert] while remaining: parent = find_parent(chain[-1], remaining) if parent is None: # No parent in the list of remaining certificates or there's a circular dependency break elif parent == chain[-1]: # Self-signed: must be root CA (end of chain) break else: remaining.remove(parent) chain.append(parent) return chain def sort_ca_chain(ca_names, pki_node): def ca_cmp(ca_name1, ca_name2, pki_node): cert1 = load_certificate(pki_node[ca_name1]['certificate']) cert2 = load_certificate(pki_node[ca_name2]['certificate']) if verify_certificate(cert1, cert2): # cert1 is child of cert2 return -1 return 1 from functools import cmp_to_key return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node))) - diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index 4be40e99e..2d076e42d 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -1,432 +1,464 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import argv from sys import exit from vyos.config import Config from vyos.config import config_dict_merge from vyos.configdep import set_dependents from vyos.configdep import call_dependents from vyos.configdict import node_changed from vyos.configdiff import Diff from vyos.defaults import directories from vyos.pki import is_ca_certificate from vyos.pki import load_certificate from vyos.pki import load_public_key +from vyos.pki import load_openssh_public_key +from vyos.pki import load_openssh_private_key from vyos.pki import load_private_key from vyos.pki import load_crl from vyos.pki import load_dh_parameters from vyos.utils.boot import boot_configuration_complete from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.process import is_systemd_service_active from vyos import ConfigError from vyos import airbag airbag.enable() vyos_certbot_dir = directories['certbot'] # keys to recursively search for under specified path sync_search = [ { 'keys': ['certificate'], 'path': ['service', 'https'], }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['interfaces', 'ethernet'], }, { 'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'], 'path': ['interfaces', 'openvpn'], }, { 'keys': ['ca_certificate'], 'path': ['interfaces', 'sstpc'], }, { 'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'], 'path': ['vpn', 'ipsec'], }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['vpn', 'openconnect'], }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['vpn', 'sstp'], } ] # key from other config nodes -> key in pki['changed'] and pki sync_translate = { 'certificate': 'certificate', 'ca_certificate': 'ca', 'dh_params': 'dh', 'local_key': 'key_pair', 'remote_key': 'key_pair', 'shared_secret_key': 'openvpn', 'auth_key': 'openvpn', 'crypt_key': 'openvpn' } def certbot_delete(certificate): if not boot_configuration_complete(): return if os.path.exists(f'{vyos_certbot_dir}/renewal/{certificate}.conf'): cmd(f'certbot delete --non-interactive --config-dir {vyos_certbot_dir} --cert-name {certificate}') def certbot_request(name: str, config: dict, dry_run: bool=True): # We do not call certbot when booting the system - there is no need to do so and # request new certificates during boot/image upgrade as the certbot configuration # is stored persistent under /config - thus we do not open the door to transient # errors if not boot_configuration_complete(): return domains = '--domains ' + ' --domains '.join(config['domain_name']) tmp = f'certbot certonly --non-interactive --config-dir {vyos_certbot_dir} --cert-name {name} '\ f'--standalone --agree-tos --no-eff-email --expand --server {config["url"]} '\ f'--email {config["email"]} --key-type rsa --rsa-key-size {config["rsa_key_size"]} '\ f'{domains}' if 'listen_address' in config: tmp += f' --http-01-address {config["listen_address"]}' # verify() does not need to actually request a cert but only test for plausability if dry_run: tmp += ' --dry-run' cmd(tmp, raising=ConfigError, message=f'ACME certbot request failed for "{name}"!') def get_config(config=None): if config: conf = config else: conf = Config() base = ['pki'] pki = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) if len(argv) > 1 and argv[1] == 'certbot_renew': pki['certbot_renew'] = {} tmp = node_changed(conf, base + ['ca'], recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'ca' : tmp}) tmp = node_changed(conf, base + ['certificate'], recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'certificate' : tmp}) tmp = node_changed(conf, base + ['dh'], recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'dh' : tmp}) tmp = node_changed(conf, base + ['key-pair'], recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'key_pair' : tmp}) + tmp = node_changed(conf, base + ['openssh'], recursive=True) + if tmp: + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'openssh' : tmp}) + tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], recursive=True) if tmp: if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'openvpn' : tmp}) # We only merge on the defaults of there is a configuration at all if conf.exists(base): # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. default_values = conf.get_config_defaults(**pki.kwargs, recursive=True) # remove ACME default configuration if unused by CLI if 'certificate' in pki: for name, cert_config in pki['certificate'].items(): if 'acme' not in cert_config: # Remove ACME default values del default_values['certificate'][name]['acme'] # merge CLI and default dictionary pki = config_dict_merge(default_values, pki) # Certbot triggered an external renew of the certificates. # Mark all ACME based certificates as "changed" to trigger # update of dependent services if 'certificate' in pki and 'certbot_renew' in pki: renew = [] for name, cert_config in pki['certificate'].items(): if 'acme' in cert_config: renew.append(name) # If triggered externally by certbot, certificate key is not present in changed if 'changed' not in pki: pki.update({'changed':{}}) pki['changed'].update({'certificate' : renew}) # We need to get the entire system configuration to verify that we are not # deleting a certificate that is still referenced somewhere! pki['system'] = conf.get_config_dict([], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) for search in sync_search: for key in search['keys']: changed_key = sync_translate[key] if 'changed' not in pki or changed_key not in pki['changed']: continue for item_name in pki['changed'][changed_key]: node_present = False if changed_key == 'openvpn': node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) else: node_present = dict_search_args(pki, changed_key, item_name) if node_present: search_dict = dict_search_args(pki['system'], *search['path']) if not search_dict: continue for found_name, found_path in dict_search_recursive(search_dict, key): if found_name == item_name: path = search['path'] path_str = ' '.join(path + found_path) print(f'PKI: Updating config: {path_str} {found_name}') if path[0] == 'interfaces': ifname = found_path[0] set_dependents(path[1], conf, ifname) else: set_dependents(path[1], conf) return pki def is_valid_certificate(raw_data): # If it loads correctly we're good, or return False return load_certificate(raw_data, wrap_tags=True) def is_valid_ca_certificate(raw_data): # Check if this is a valid certificate with CA attributes cert = load_certificate(raw_data, wrap_tags=True) if not cert: return False return is_ca_certificate(cert) def is_valid_public_key(raw_data): # If it loads correctly we're good, or return False return load_public_key(raw_data, wrap_tags=True) def is_valid_private_key(raw_data, protected=False): # If it loads correctly we're good, or return False # With encrypted private keys, we always return true as we cannot ask for password to verify if protected: return True return load_private_key(raw_data, passphrase=None, wrap_tags=True) +def is_valid_openssh_public_key(raw_data, type): + # If it loads correctly we're good, or return False + return load_openssh_public_key(raw_data, type) + +def is_valid_openssh_private_key(raw_data, protected=False): + # If it loads correctly we're good, or return False + # With encrypted private keys, we always return true as we cannot ask for password to verify + if protected: + return True + return load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True) + def is_valid_crl(raw_data): # If it loads correctly we're good, or return False return load_crl(raw_data, wrap_tags=True) def is_valid_dh_parameters(raw_data): # If it loads correctly we're good, or return False return load_dh_parameters(raw_data, wrap_tags=True) def verify(pki): if not pki: return None if 'ca' in pki: for name, ca_conf in pki['ca'].items(): if 'certificate' in ca_conf: if not is_valid_ca_certificate(ca_conf['certificate']): raise ConfigError(f'Invalid certificate on CA certificate "{name}"') if 'private' in ca_conf and 'key' in ca_conf['private']: private = ca_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on CA certificate "{name}"') if 'crl' in ca_conf: ca_crls = ca_conf['crl'] if isinstance(ca_crls, str): ca_crls = [ca_crls] for crl in ca_crls: if not is_valid_crl(crl): raise ConfigError(f'Invalid CRL on CA certificate "{name}"') if 'certificate' in pki: for name, cert_conf in pki['certificate'].items(): if 'certificate' in cert_conf: if not is_valid_certificate(cert_conf['certificate']): raise ConfigError(f'Invalid certificate on certificate "{name}"') if 'private' in cert_conf and 'key' in cert_conf['private']: private = cert_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on certificate "{name}"') if 'acme' in cert_conf: if 'domain_name' not in cert_conf['acme']: raise ConfigError(f'At least one domain-name is required to request '\ f'certificate for "{name}" via ACME!') if 'email' not in cert_conf['acme']: raise ConfigError(f'An email address is required to request '\ f'certificate for "{name}" via ACME!') if 'certbot_renew' not in pki: # Only run the ACME command if something on this entity changed, # as this is time intensive tmp = dict_search('changed.certificate', pki) if tmp != None and name in tmp: certbot_request(name, cert_conf['acme']) if 'dh' in pki: for name, dh_conf in pki['dh'].items(): if 'parameters' in dh_conf: if not is_valid_dh_parameters(dh_conf['parameters']): raise ConfigError(f'Invalid DH parameters on "{name}"') if 'key_pair' in pki: for name, key_conf in pki['key_pair'].items(): if 'public' in key_conf and 'key' in key_conf['public']: if not is_valid_public_key(key_conf['public']['key']): raise ConfigError(f'Invalid public key on key-pair "{name}"') if 'private' in key_conf and 'key' in key_conf['private']: private = key_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on key-pair "{name}"') + if 'openssh' in pki: + for name, key_conf in pki['openssh'].items(): + if 'public' in key_conf and 'key' in key_conf['public']: + if 'type' not in key_conf['public']: + raise ConfigError(f'Must define OpenSSH public key type for "{name}"') + if not is_valid_openssh_public_key(key_conf['public']['key'], key_conf['public']['type']): + raise ConfigError(f'Invalid OpenSSH public key "{name}"') + + if 'private' in key_conf and 'key' in key_conf['private']: + private = key_conf['private'] + protected = 'password_protected' in private + if not is_valid_openssh_private_key(private['key'], protected): + raise ConfigError(f'Invalid OpenSSH private key "{name}"') + if 'x509' in pki: if 'default' in pki['x509']: default_values = pki['x509']['default'] if 'country' in default_values: country = default_values['country'] if len(country) != 2 or not country.isalpha(): raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.') if 'changed' in pki: # if the list is getting longer, we can move to a dict() and also embed the # search key as value from line 173 or 176 for search in sync_search: for key in search['keys']: changed_key = sync_translate[key] if changed_key not in pki['changed']: continue for item_name in pki['changed'][changed_key]: node_present = False if changed_key == 'openvpn': node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) else: node_present = dict_search_args(pki, changed_key, item_name) if not node_present: search_dict = dict_search_args(pki['system'], *search['path']) if not search_dict: continue for found_name, found_path in dict_search_recursive(search_dict, key): if found_name == item_name: path_str = " ".join(search['path'] + found_path) raise ConfigError(f'PKI object "{item_name}" still in use by "{path_str}"') return None def generate(pki): if not pki: return None # Certbot renewal only needs to re-trigger the services to load up the # new PEM file if 'certbot_renew' in pki: return None certbot_list = [] certbot_list_on_disk = [] if os.path.exists(f'{vyos_certbot_dir}/live'): certbot_list_on_disk = [f.path.split('/')[-1] for f in os.scandir(f'{vyos_certbot_dir}/live') if f.is_dir()] if 'certificate' in pki: changed_certificates = dict_search('changed.certificate', pki) for name, cert_conf in pki['certificate'].items(): if 'acme' in cert_conf: certbot_list.append(name) # generate certificate if not found on disk if name not in certbot_list_on_disk: certbot_request(name, cert_conf['acme'], dry_run=False) elif changed_certificates != None and name in changed_certificates: # when something for the certificate changed, we should delete it if name in certbot_list_on_disk: certbot_delete(name) certbot_request(name, cert_conf['acme'], dry_run=False) # Cleanup certbot configuration and certificates if no longer in use by CLI # Get foldernames under vyos_certbot_dir which each represent a certbot cert if os.path.exists(f'{vyos_certbot_dir}/live'): for cert in certbot_list_on_disk: if cert not in certbot_list: # certificate is no longer active on the CLI - remove it certbot_delete(cert) return None def apply(pki): systemd_certbot_name = 'certbot.timer' if not pki: call(f'systemctl stop {systemd_certbot_name}') return None has_certbot = False if 'certificate' in pki: for name, cert_conf in pki['certificate'].items(): if 'acme' in cert_conf: has_certbot = True break if not has_certbot: call(f'systemctl stop {systemd_certbot_name}') elif has_certbot and not is_systemd_service_active(systemd_certbot_name): call(f'systemctl restart {systemd_certbot_name}') if 'changed' in pki: call_dependents() return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)