Page MenuHomeVyOS Platform

pki.py
No OneTemporary

Size
24 KB
Referenced Files
None
Subscribers
None
#!/usr/bin/env python3
#
# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from sys import argv
from sys import exit
from vyos.base import Message
from vyos.config import Config
from vyos.config import config_dict_merge
from vyos.configdep import set_dependents
from vyos.configdep import call_dependents
from vyos.configdict import node_changed
from vyos.configdiff import Diff
from vyos.configdiff import get_config_diff
from vyos.defaults import directories
from vyos.defaults import internal_ports
from vyos.defaults import systemd_services
from vyos.pki import encode_certificate
from vyos.pki import is_ca_certificate
from vyos.pki import load_certificate
from vyos.pki import load_public_key
from vyos.pki import load_openssh_public_key
from vyos.pki import load_openssh_private_key
from vyos.pki import load_private_key
from vyos.pki import load_crl
from vyos.pki import load_dh_parameters
from vyos.utils.boot import boot_configuration_complete
from vyos.utils.configfs import add_cli_node
from vyos.utils.dict import dict_search
from vyos.utils.dict import dict_search_args
from vyos.utils.dict import dict_search_recursive
from vyos.utils.file import read_file
from vyos.utils.network import check_port_availability
from vyos.utils.process import call
from vyos.utils.process import cmd
from vyos.utils.process import is_systemd_service_active
from vyos.utils.process import is_systemd_service_running
from vyos import ConfigError
from vyos import airbag
# NOTE(review): presumably installs the VyOS crash/exception reporting hook for
# this script - confirm against the vyos.airbag module
airbag.enable()

# Filesystem locations looked up in vyos.defaults.directories:
# - vyos_certbot_dir is handed to certbot as its --config-dir
# - vyos_ca_certificates_dir receives the "<name>.crt" files of CA
#   certificates flagged for system installation
vyos_certbot_dir = directories['certbot']
vyos_ca_certificates_dir = directories['ca_certificates']
# keys to recursively search for under specified path
#
# Every entry pairs a CLI 'path' (already key-mangled, i.e. '-' replaced by '_')
# with the 'keys' whose values may name a PKI object. get_config() and verify()
# walk the system configuration below 'path' looking for these keys to find the
# consumers of a changed or deleted PKI object.
sync_search = [
    {
        'keys': ['certificate'],
        'path': ['service', 'https'],
    },
    {
        'keys': ['key'],
        'path': ['service', 'ssh'],
    },
    {
        'keys': ['certificate', 'ca_certificate'],
        'path': ['interfaces', 'ethernet'],
    },
    {
        'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'],
        'path': ['interfaces', 'openvpn'],
    },
    {
        'keys': ['ca_certificate'],
        'path': ['interfaces', 'sstpc'],
    },
    {
        'keys': ['certificate', 'ca_certificate'],
        'path': ['load_balancing', 'haproxy'],
    },
    {
        'keys': ['key'],
        'path': ['protocols', 'rpki', 'cache'],
    },
    {
        'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'],
        'path': ['vpn', 'ipsec'],
    },
    {
        'keys': ['certificate', 'ca_certificate'],
        'path': ['vpn', 'openconnect'],
    },
    {
        'keys': ['certificate', 'ca_certificate'],
        'path': ['vpn', 'sstp'],
    },
    {
        'keys': ['certificate', 'ca_certificate'],
        'path': ['service', 'stunnel'],
    }
]
# key from other config nodes -> key in pki['changed'] and pki
#
# The left-hand side is the key name used by a consuming service (see
# sync_search), the right-hand side is the PKI object type it refers to, which
# is the name used both as top-level key of the pki dict and inside
# pki['changed'].
sync_translate = {
    'certificate': 'certificate',
    'ca_certificate': 'ca',
    'dh_params': 'dh',
    'local_key': 'key_pair',
    'remote_key': 'key_pair',
    'shared_secret_key': 'openvpn',
    'auth_key': 'openvpn',
    'crypt_key': 'openvpn',
    'key': 'openssh',
}
def certbot_delete(certificate):
    """Remove a certbot managed certificate.

    Nothing happens while the boot configuration has not completed, or when
    certbot has no renewal configuration file for the given certificate name.
    """
    if not boot_configuration_complete():
        return

    renewal_conf = f'{vyos_certbot_dir}/renewal/{certificate}.conf'
    if not os.path.exists(renewal_conf):
        return

    cmd('certbot delete --non-interactive '
        f'--config-dir {vyos_certbot_dir} --cert-name {certificate}')
def certbot_request(name: str, config: dict, dry_run: bool=True):
    """Request (or dry-run) an ACME certificate via certbot in standalone mode.

    Args:
        name: certbot certificate name (--cert-name).
        config: the 'acme' section of a PKI certificate; 'domain_name', 'url',
            'email' and 'rsa_key_size' are read unconditionally,
            'listen_address' and 'used_by' are optional.
        dry_run: append --dry-run so certbot only tests the request.

    Raises:
        ConfigError: cmd() is instructed to raise it when certbot fails.
    """
    # We do not call certbot when booting the system - there is no need to do so and
    # request new certificates during boot/image upgrade as the certbot configuration
    # is stored persistent under /config - thus we do not open the door to transient
    # errors
    if not boot_configuration_complete():
        return

    domains = '--domains ' + ' --domains '.join(config['domain_name'])
    certbot_cmd = f'certbot certonly --non-interactive --config-dir {vyos_certbot_dir} --cert-name {name} '\
                  f'--standalone --agree-tos --no-eff-email --expand --server {config["url"]} '\
                  f'--email {config["email"]} --key-type rsa --rsa-key-size {config["rsa_key_size"]} '\
                  f'{domains}'

    listen_address = None
    if 'listen_address' in config:
        listen_address = config['listen_address']

    # 'used_by' is only set when a service references this certificate, so
    # dict_search() may return None here. Fall back to an empty list, otherwise
    # the membership test below raises a TypeError.
    used_by = dict_search('used_by', config) or []

    # When ACME is used behind a reverse proxy, we always bind to localhost
    # whatever the CLI listen-address is configured for.
    if ('haproxy' in used_by and
        is_systemd_service_running(systemd_services['haproxy']) and
        not check_port_availability(listen_address, 80)):
        certbot_cmd += f' --http-01-address 127.0.0.1 --http-01-port {internal_ports["certbot_haproxy"]}'
    elif listen_address:
        certbot_cmd += f' --http-01-address {listen_address}'

    # verify() does not need to actually request a cert but only test for plausibility
    if dry_run:
        certbot_cmd += ' --dry-run'

    cmd(certbot_cmd, raising=ConfigError, message=f'ACME certbot request failed for "{name}"!')
def get_config(config=None):
    """Assemble the PKI dictionary consumed by verify(), generate() and apply().

    On top of the key-mangled 'pki' CLI subtree the returned dict carries:
      * 'certbot_renew' - present (empty dict) when the script was started with
        'certbot_renew' as its first command line argument
      * 'changed'       - per PKI object type (the values of sync_translate) the
        result of node_changed() for that subtree
      * 'system'        - the complete, key-mangled system configuration
      * 'used_by'       - added to a certificate's 'acme' section when the
        HAProxy configuration references that certificate

    Side effect: set_dependents() is called for every service that references
    a changed PKI object which is still present in the configuration.

    Args:
        config: optional, already instantiated Config object; a new Config()
            is created when omitted.
    """
    if config:
        conf = config
    else:
        conf = Config()
    base = ['pki']

    pki = conf.get_config_dict(base, key_mangling=('-', '_'),
                                     get_first_key=True,
                                     no_tag_node_value_mangle=True)

    # Marker evaluated by verify()/generate() to tell a certbot triggered
    # renewal apart from a regular commit
    if len(argv) > 1 and argv[1] == 'certbot_renew':
        pki['certbot_renew'] = {}

    # Walk through the list of sync_translate mapping and build a list
    # which is later used to check if the node was changed in the CLI config
    changed_keys = []
    for value in sync_translate.values():
        if value not in changed_keys:
            changed_keys.append(value)
    # Check for changes to said given keys in the CLI config
    for key in changed_keys:
        tmp = node_changed(conf, base + [key], recursive=True, expand_nodes=Diff.DELETE | Diff.ADD)

        if 'changed' not in pki:
            pki.update({'changed':{}})

        pki['changed'].update({key.replace('-', '_') : tmp})

    # We only merge in the defaults if there is a configuration at all
    if conf.exists(base):
        # We have gathered the dict representation of the CLI, but there are default
        # options which we need to update into the dictionary retrieved.
        default_values = conf.get_config_defaults(**pki.kwargs, recursive=True)
        # remove ACME default configuration if unused by CLI
        if 'certificate' in pki:
            for name, cert_config in pki['certificate'].items():
                if 'acme' not in cert_config:
                    # Remove ACME default values
                    del default_values['certificate'][name]['acme']

        # merge CLI and default dictionary
        pki = config_dict_merge(default_values, pki)

    # Certbot triggered an external renew of the certificates.
    # Mark all ACME based certificates as "changed" to trigger
    # update of dependent services
    if 'certificate' in pki and 'certbot_renew' in pki:
        renew = []
        for name, cert_config in pki['certificate'].items():
            if 'acme' in cert_config:
                renew.append(name)
        # If triggered externally by certbot, certificate key is not present in changed
        if 'changed' not in pki: pki.update({'changed':{}})
        pki['changed'].update({'certificate' : renew})

    # We need to get the entire system configuration to verify that we are not
    # deleting a certificate that is still referenced somewhere!
    pki['system'] = conf.get_config_dict([], key_mangling=('-', '_'),
                                         get_first_key=True,
                                         no_tag_node_value_mangle=True)
    D = get_config_diff(conf)

    # For every changed PKI object that still exists, locate the services
    # referencing it and register them as dependents
    for search in sync_search:
        for key in search['keys']:
            changed_key = sync_translate[key]
            if 'changed' not in pki or changed_key not in pki['changed']:
                continue

            for item_name in pki['changed'][changed_key]:
                node_present = False
                # OpenVPN objects are looked up one level deeper, below
                # 'shared_secret'
                if changed_key == 'openvpn':
                    node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
                else:
                    node_present = dict_search_args(pki, changed_key, item_name)

                if node_present:
                    search_dict = dict_search_args(pki['system'], *search['path'])
                    if not search_dict:
                        continue
                    for found_name, found_path in dict_search_recursive(search_dict, key):
                        # The reference is either a list of names or a single
                        # name - skip everything not pointing at item_name
                        if isinstance(found_name, list) and item_name not in found_name:
                            continue

                        if isinstance(found_name, str) and found_name != item_name:
                            continue

                        path = search['path']
                        path_str = ' '.join(path + found_path).replace('_','-')
                        Message(f'Updating configuration: "{path_str} {item_name}"')

                        # Interfaces are handled per interface name (first
                        # element of found_path), other services as a whole.
                        # NOTE(review): the node_changed_presence() guard
                        # presumably skips nodes that are themselves being
                        # added or removed in this commit - confirm against
                        # vyos.configdiff
                        if path[0] == 'interfaces':
                            ifname = found_path[0]
                            if not D.node_changed_presence(path + [ifname]):
                                set_dependents(path[1], conf, ifname)
                        else:
                            if not D.node_changed_presence(path):
                                set_dependents(path[1], conf)

    # Check PKI certificates if they are auto-generated by ACME. If they are,
    # traverse the current configuration and determine the service where the
    # certificate is used by.
    # Required to check if we might need to run certbot behind a reverse proxy.
    if 'certificate' in pki:
        for name, cert_config in pki['certificate'].items():
            if 'acme' not in cert_config:
                continue
            if not dict_search('system.load_balancing.haproxy', pki):
                continue
            used_by = []
            for cert_list, _ in dict_search_recursive(
                pki['system']['load_balancing']['haproxy'], 'certificate'):
                if name in cert_list:
                    used_by.append('haproxy')
            # 'used_by' is only written when at least one consumer was found
            if used_by:
                pki['certificate'][name]['acme'].update({'used_by': used_by})

    return pki
def is_valid_certificate(raw_data):
    """Hand back whatever load_certificate() yields for raw_data.

    A correctly loaded certificate is truthy, anything else counts as invalid.
    """
    certificate = load_certificate(raw_data, wrap_tags=True)
    return certificate
def is_valid_ca_certificate(raw_data):
    """Tell whether raw_data is a loadable certificate carrying CA attributes.

    Returns False when the certificate cannot be loaded, otherwise the result
    of is_ca_certificate() for the loaded certificate.
    """
    certificate = load_certificate(raw_data, wrap_tags=True)
    return is_ca_certificate(certificate) if certificate else False
def is_valid_public_key(raw_data):
    """Hand back whatever load_public_key() yields for raw_data.

    A correctly loaded public key is truthy, anything else counts as invalid.
    """
    public_key = load_public_key(raw_data, wrap_tags=True)
    return public_key
def is_valid_private_key(raw_data, protected=False):
    """Check that raw_data can be loaded as an unencrypted private key.

    Password protected keys are always reported as valid (True), because no
    passphrase can be asked for at this point. Otherwise the result of
    load_private_key() is returned, which is truthy on success.
    """
    return True if protected else load_private_key(raw_data, passphrase=None, wrap_tags=True)
def is_valid_openssh_public_key(raw_data, type):
    """Hand back whatever load_openssh_public_key() yields for the key and type.

    A correctly loaded key is truthy, anything else counts as invalid.
    """
    # The parameter name 'type' is part of the signature and therefore kept
    public_key = load_openssh_public_key(raw_data, type)
    return public_key
def is_valid_openssh_private_key(raw_data, protected=False):
    """Check that raw_data can be loaded as an unencrypted OpenSSH private key.

    Password protected keys are always reported as valid (True), because no
    passphrase can be asked for at this point. Otherwise the result of
    load_openssh_private_key() is returned, which is truthy on success.
    """
    return True if protected else load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True)
def is_valid_crl(raw_data):
    """Hand back whatever load_crl() yields for raw_data.

    A correctly loaded CRL is truthy, anything else counts as invalid.
    """
    crl = load_crl(raw_data, wrap_tags=True)
    return crl
def is_valid_dh_parameters(raw_data):
    """Hand back whatever load_dh_parameters() yields for raw_data.

    Correctly loaded DH parameters are truthy, anything else counts as invalid.
    """
    dh_parameters = load_dh_parameters(raw_data, wrap_tags=True)
    return dh_parameters
def _verify_private_key(entry, validator, error):
    """Raise ConfigError(error) if entry holds a private key rejected by validator."""
    if 'private' not in entry or 'key' not in entry['private']:
        return
    private = entry['private']
    # Encrypted keys can not be checked - the validator accepts them as-is
    protected = 'password_protected' in private
    if not validator(private['key'], protected):
        raise ConfigError(error)


def _verify_acme(name, acme, pki):
    """Validate the ACME settings of certificate name and dry-run certbot if it changed."""
    if 'domain_name' not in acme:
        raise ConfigError(f'At least one domain-name is required to request '\
                          f'certificate for "{name}" via ACME!')

    if 'email' not in acme:
        raise ConfigError(f'An email address is required to request '\
                          f'certificate for "{name}" via ACME!')

    listen_address = None
    if 'listen_address' in acme:
        listen_address = acme['listen_address']

    # The port check is skipped when a service is recorded under 'used_by'
    if 'used_by' not in acme:
        if not check_port_availability(listen_address, 80):
            raise ConfigError('Port 80 is already in use and not available '\
                              f'to provide ACME challenge for "{name}"!')

    if 'certbot_renew' not in pki:
        # Only run the ACME command if something on this entity changed,
        # as this is time intensive
        changed = dict_search('changed.certificate', pki)
        if changed is not None and name in changed:
            certbot_request(name, acme)


def _verify_not_in_use(pki):
    """Raise ConfigError if a removed PKI object is still referenced by the system config."""
    # if the list is getting longer, we can move to a dict() and also embed the
    # search key as value
    for search in sync_search:
        for key in search['keys']:
            changed_key = sync_translate[key]
            if changed_key not in pki['changed']:
                continue

            for item_name in pki['changed'][changed_key]:
                if changed_key == 'openvpn':
                    node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
                else:
                    node_present = dict_search_args(pki, changed_key, item_name)

                # If the node is still present, we can skip the check
                # as we are not deleting it
                if node_present:
                    continue

                search_dict = dict_search_args(pki['system'], *search['path'])
                if not search_dict:
                    continue

                for found_name, found_path in dict_search_recursive(search_dict, key):
                    # Check if the name matches either by string compare, or being
                    # part of a list
                    if ((isinstance(found_name, str) and found_name == item_name) or
                        (isinstance(found_name, list) and item_name in found_name)):
                        # We do not support _ in CLI paths - this is only a convenience
                        # as we mangle all - to _, now it's time to reverse this!
                        path_str = ' '.join(search['path'] + found_path).replace('_','-')
                        pki_object = changed_key.replace('_','-')

                        raise ConfigError(f'Embedded PKI {pki_object} with name "{item_name}" is still '\
                                          f'in use by CLI path "{path_str}"')


def verify(pki):
    """Validate the PKI configuration dictionary built by get_config().

    Checks, in this order: CA certificates (certificate, private key, CRLs),
    certificates (certificate, private key, ACME settings including a certbot
    dry-run for changed entries), DH parameters, key-pairs, OpenSSH keys, the
    x509 default country and finally that no removed PKI object is still
    referenced elsewhere in the system configuration.

    Returns:
        None - also for an empty/absent configuration.

    Raises:
        ConfigError: on the first failed check.
    """
    if not pki:
        return None

    if 'ca' in pki:
        for name, ca_conf in pki['ca'].items():
            if 'certificate' in ca_conf:
                if not is_valid_ca_certificate(ca_conf['certificate']):
                    raise ConfigError(f'Invalid certificate on CA certificate "{name}"')

            _verify_private_key(ca_conf, is_valid_private_key,
                                f'Invalid private key on CA certificate "{name}"')

            if 'crl' in ca_conf:
                ca_crls = ca_conf['crl']
                # A single CRL is represented as plain string
                if isinstance(ca_crls, str):
                    ca_crls = [ca_crls]

                for crl in ca_crls:
                    if not is_valid_crl(crl):
                        raise ConfigError(f'Invalid CRL on CA certificate "{name}"')

    if 'certificate' in pki:
        for name, cert_conf in pki['certificate'].items():
            if 'certificate' in cert_conf:
                if not is_valid_certificate(cert_conf['certificate']):
                    raise ConfigError(f'Invalid certificate on certificate "{name}"')

            _verify_private_key(cert_conf, is_valid_private_key,
                                f'Invalid private key on certificate "{name}"')

            if 'acme' in cert_conf:
                _verify_acme(name, cert_conf['acme'], pki)

    if 'dh' in pki:
        for name, dh_conf in pki['dh'].items():
            if 'parameters' in dh_conf:
                if not is_valid_dh_parameters(dh_conf['parameters']):
                    raise ConfigError(f'Invalid DH parameters on "{name}"')

    if 'key_pair' in pki:
        for name, key_conf in pki['key_pair'].items():
            if 'public' in key_conf and 'key' in key_conf['public']:
                if not is_valid_public_key(key_conf['public']['key']):
                    raise ConfigError(f'Invalid public key on key-pair "{name}"')

            _verify_private_key(key_conf, is_valid_private_key,
                                f'Invalid private key on key-pair "{name}"')

    if 'openssh' in pki:
        for name, key_conf in pki['openssh'].items():
            if 'public' in key_conf and 'key' in key_conf['public']:
                if 'type' not in key_conf['public']:
                    raise ConfigError(f'Must define OpenSSH public key type for "{name}"')
                if not is_valid_openssh_public_key(key_conf['public']['key'], key_conf['public']['type']):
                    raise ConfigError(f'Invalid OpenSSH public key "{name}"')

            _verify_private_key(key_conf, is_valid_openssh_private_key,
                                f'Invalid OpenSSH private key "{name}"')

    if 'x509' in pki:
        if 'default' in pki['x509']:
            default_values = pki['x509']['default']
            if 'country' in default_values:
                country = default_values['country']
                if len(country) != 2 or not country.isalpha():
                    raise ConfigError('Invalid default country value. '\
                                      'Value must be 2 alpha characters.')

    if 'changed' in pki:
        _verify_not_in_use(pki)

    return None
def cleanup_system_ca():
    """Ensure the CA install directory exists and holds no regular files.

    A missing directory is created; in an existing one every regular file is
    removed while sub-directories are left alone.
    """
    ca_dir = vyos_ca_certificates_dir

    if not os.path.exists(ca_dir):
        os.mkdir(ca_dir)
        return

    for entry in os.listdir(ca_dir):
        candidate = os.path.join(ca_dir, entry)
        if os.path.isfile(candidate):
            os.unlink(candidate)
def generate(pki):
    """Write CA certificates to disk and synchronise certbot with the CLI.

    Steps performed:
      1. With an empty configuration only the CA install directory is cleaned.
      2. When the 'ca' subtree changed, the CA install directory is cleaned and
         every CA flagged 'system_install' is written as "<name>.crt".
      3. On a certbot triggered renewal ('certbot_renew') nothing else happens.
      4. ACME certificates missing on disk are requested; changed ones are
         deleted and requested again.
      5. certbot certificates no longer present on the CLI are deleted, and for
         the remaining ones the issuing chain is imported as
         'pki ca AUTOCHAIN_<name>' unless an identical CA is configured already.

    Always returns None.
    """
    if not pki:
        cleanup_system_ca()
        return None

    # Create or cleanup CA install directory
    if 'changed' in pki and 'ca' in pki['changed']:
        cleanup_system_ca()

        # NOTE(review): this loop is assumed to be nested below the cleanup
        # branch, since cleanup_system_ca() removes every installed file -
        # confirm against the upstream file
        if 'ca' in pki:
            for ca, ca_conf in pki['ca'].items():
                if 'system_install' in ca_conf:
                    ca_obj = load_certificate(ca_conf['certificate'])
                    ca_path = os.path.join(vyos_ca_certificates_dir, f'{ca}.crt')

                    with open(ca_path, 'w') as f:
                        f.write(encode_certificate(ca_obj))

    # Certbot renewal only needs to re-trigger the services to load up the
    # new PEM file
    if 'certbot_renew' in pki:
        return None

    # certbot_list: ACME certificates configured on the CLI
    # certbot_list_on_disk: directory names found below <certbot dir>/live
    certbot_list = []
    certbot_list_on_disk = []
    if os.path.exists(f'{vyos_certbot_dir}/live'):
        certbot_list_on_disk = [f.path.split('/')[-1] for f in os.scandir(f'{vyos_certbot_dir}/live') if f.is_dir()]

    if 'certificate' in pki:
        changed_certificates = dict_search('changed.certificate', pki)
        for name, cert_conf in pki['certificate'].items():
            if 'acme' in cert_conf:
                certbot_list.append(name)
                # There is no ACME/certbot managed certificate present on the
                # system, generate it
                if name not in certbot_list_on_disk:
                    certbot_request(name, cert_conf['acme'], dry_run=False)
                    # Now that the certificate was properly generated we have
                    # the PEM files on disk. We need to add the certificate to
                    # certbot_list_on_disk to automatically import the CA chain
                    certbot_list_on_disk.append(name)
                # We already had an ACME managed certificate on the system, but
                # something changed in the configuration
                elif changed_certificates != None and name in changed_certificates:
                    # Delete old ACME certificate first
                    if name in certbot_list_on_disk:
                        certbot_delete(name)
                    # Request new certificate via certbot
                    certbot_request(name, cert_conf['acme'], dry_run=False)

    # Cleanup certbot configuration and certificates if no longer in use by CLI
    # Get foldernames under vyos_certbot_dir which each represent a certbot cert
    if os.path.exists(f'{vyos_certbot_dir}/live'):
        for cert in certbot_list_on_disk:
            # ACME certificate is no longer in use by CLI remove it
            if cert not in certbot_list:
                certbot_delete(cert)
                continue
            # ACME not enabled for individual certificate - bail out early
            if 'acme' not in pki['certificate'][cert]:
                continue

            # Read in ACME certificate chain information
            tmp = read_file(f'{vyos_certbot_dir}/live/{cert}/chain.pem')
            tmp = load_certificate(tmp, wrap_tags=False)
            # Drop the first and last line of the encoded certificate and join
            # the remaining lines into one string - the form compared against
            # and stored in the CLI 'certificate' value below
            cert_chain_base64 = "".join(encode_certificate(tmp).strip().split("\n")[1:-1])

            # Check if CA chain certificate is already present on CLI to avoid adding
            # a duplicate. This only checks for manual added CA certificates and not
            # auto added ones with the AUTOCHAIN_ prefix
            autochain_prefix = 'AUTOCHAIN_'
            ca_cert_present = False
            if 'ca' in pki:
                for ca_base64, cli_path in dict_search_recursive(pki['ca'], 'certificate'):
                    # Ignore automatic added CA certificates
                    if any(item.startswith(autochain_prefix) for item in cli_path):
                        continue
                    if cert_chain_base64 == ca_base64:
                        ca_cert_present = True

            if not ca_cert_present:
                # Only touch the CLI when the auto-imported CA is missing or
                # differs from the chain currently on disk
                tmp = dict_search_args(pki, 'ca', f'{autochain_prefix}{cert}', 'certificate')
                if not bool(tmp) or tmp != cert_chain_base64:
                    Message(f'Add/replace automatically imported CA certificate for "{cert}"...')
                    add_cli_node(['pki', 'ca', f'{autochain_prefix}{cert}', 'certificate'], value=cert_chain_base64)

    return None
def apply(pki):
    """Bring the certbot timer and dependent services in line with the config.

    Without any PKI configuration the certbot timer is stopped and the CA
    bundle rebuilt. Otherwise the timer is stopped when no certificate uses
    ACME, or restarted when one does and the timer is not active; afterwards
    dependent services are called and, if CAs changed, the CA bundle rebuilt.
    Always returns None.
    """
    certbot_timer = 'certbot.timer'

    if not pki:
        call(f'systemctl stop {certbot_timer}')
        call('update-ca-certificates')
        return None

    # True as soon as one configured certificate carries an 'acme' section
    acme_in_use = 'certificate' in pki and any(
        'acme' in cert_conf for cert_conf in pki['certificate'].values())

    if not acme_in_use:
        call(f'systemctl stop {certbot_timer}')
    elif not is_systemd_service_active(certbot_timer):
        call(f'systemctl restart {certbot_timer}')

    if 'changed' not in pki:
        return None

    call_dependents()

    # Rebuild ca-certificates bundle
    if 'ca' in pki['changed']:
        call('update-ca-certificates')

    return None
if __name__ == '__main__':
    # Run the conf-mode stages in order on the same configuration dict; a
    # ConfigError from any stage is printed and turned into exit code 1
    try:
        pki_config = get_config()
        for stage in (verify, generate, apply):
            stage(pki_config)
    except ConfigError as error:
        print(error)
        exit(1)

File Metadata

Mime Type
text/x-script.python
Expires
Mon, Dec 15, 9:09 PM (9 h, 14 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3067182
Default Alt Text
pki.py (24 KB)

Event Timeline