Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F38742390
pki.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
24 KB
Referenced Files
None
Subscribers
None
pki.py
View Options
#!/usr/bin/env python3
#
# Copyright (C) 2021-2025 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
os
from
sys
import
argv
from
sys
import
exit
from
vyos.base
import
Message
from
vyos.config
import
Config
from
vyos.config
import
config_dict_merge
from
vyos.configdep
import
set_dependents
from
vyos.configdep
import
call_dependents
from
vyos.configdict
import
node_changed
from
vyos.configdiff
import
Diff
from
vyos.configdiff
import
get_config_diff
from
vyos.defaults
import
directories
from
vyos.defaults
import
internal_ports
from
vyos.defaults
import
systemd_services
from
vyos.pki
import
encode_certificate
from
vyos.pki
import
is_ca_certificate
from
vyos.pki
import
load_certificate
from
vyos.pki
import
load_public_key
from
vyos.pki
import
load_openssh_public_key
from
vyos.pki
import
load_openssh_private_key
from
vyos.pki
import
load_private_key
from
vyos.pki
import
load_crl
from
vyos.pki
import
load_dh_parameters
from
vyos.utils.boot
import
boot_configuration_complete
from
vyos.utils.configfs
import
add_cli_node
from
vyos.utils.dict
import
dict_search
from
vyos.utils.dict
import
dict_search_args
from
vyos.utils.dict
import
dict_search_recursive
from
vyos.utils.file
import
read_file
from
vyos.utils.network
import
check_port_availability
from
vyos.utils.process
import
call
from
vyos.utils.process
import
cmd
from
vyos.utils.process
import
is_systemd_service_active
from
vyos.utils.process
import
is_systemd_service_running
from
vyos
import
ConfigError
from
vyos
import
airbag
# Enable the vyos airbag helper before any configuration handling starts.
# NOTE(review): presumably this installs the crash/traceback handler - confirm
# against the vyos.airbag module.
airbag.enable()

# Directories looked up once from the vyos.defaults "directories" table:
# - vyos_certbot_dir is passed to certbot as --config-dir and holds the
#   renewal/ and live/ sub-folders inspected further down in this file
# - vyos_ca_certificates_dir receives the "<name>.crt" files written by generate()
vyos_certbot_dir = directories['certbot']
vyos_ca_certificates_dir = directories['ca_certificates']
# Keys to recursively search for under the specified path. Every entry is a
# dict holding the option names ('keys') that may reference a PKI object and
# the CLI subtree ('path') beneath which those options are looked up.
sync_search = [
    {'keys': keys, 'path': path}
    for keys, path in (
        (['certificate'], ['service', 'https']),
        (['key'], ['service', 'ssh']),
        (['certificate', 'ca_certificate'], ['interfaces', 'ethernet']),
        (['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key',
          'auth_key', 'crypt_key'], ['interfaces', 'openvpn']),
        (['ca_certificate'], ['interfaces', 'sstpc']),
        (['certificate', 'ca_certificate'], ['load_balancing', 'haproxy']),
        (['key'], ['protocols', 'rpki', 'cache']),
        (['certificate', 'ca_certificate', 'local_key', 'remote_key'],
         ['vpn', 'ipsec']),
        (['certificate', 'ca_certificate'], ['vpn', 'openconnect']),
        (['certificate', 'ca_certificate'], ['vpn', 'sstp']),
        (['certificate', 'ca_certificate'], ['service', 'stunnel']),
    )
]
# Maps an option name used by other config nodes (left) to the key used in
# pki['changed'] and in the pki dict itself (right). The insertion order is
# significant: get_config() walks the values in this order.
sync_translate = dict(
    certificate='certificate',
    ca_certificate='ca',
    dh_params='dh',
    local_key='key_pair',
    remote_key='key_pair',
    shared_secret_key='openvpn',
    auth_key='openvpn',
    crypt_key='openvpn',
    key='openssh',
)
def certbot_delete(certificate):
    """Remove the certbot-managed certificate named *certificate*.

    Nothing happens while boot_configuration_complete() is false, or when no
    renewal configuration file exists for the certificate under
    vyos_certbot_dir.
    """
    if boot_configuration_complete():
        renewal_conf = f'{vyos_certbot_dir}/renewal/{certificate}.conf'
        if os.path.exists(renewal_conf):
            cmd(f'certbot delete --non-interactive --config-dir {vyos_certbot_dir} --cert-name {certificate}')
def certbot_request(name: str, config: dict, dry_run: bool=True):
    """Assemble and run a ``certbot certonly`` command for certificate *name*.

    Args:
        name: value handed to certbot as ``--cert-name``.
        config: ACME settings of the certificate. ``domain_name``, ``url``,
            ``email`` and ``rsa_key_size`` are read unconditionally;
            ``listen_address`` and ``used_by`` are optional.
        dry_run: when true (the default) ``--dry-run`` is appended to the
            command line.

    The command is executed through cmd() with ``raising=ConfigError``.
    """
    # We do not call certbot when booting the system - there is no need to do so and
    # request new certificates during boot/image upgrade as the certbot configuration
    # is stored persistent under /config - thus we do not open the door to transient
    # errors
    if not boot_configuration_complete():
        return

    domains = '--domains ' + ' --domains '.join(config['domain_name'])
    tmp = f'certbot certonly --non-interactive --config-dir {vyos_certbot_dir} --cert-name {name} '\
          f'--standalone --agree-tos --no-eff-email --expand --server {config["url"]} '\
          f'--email {config["email"]} --key-type rsa --rsa-key-size {config["rsa_key_size"]} '\
          f'{domains}'

    listen_address = None
    if 'listen_address' in config:
        listen_address = config['listen_address']

    # 'used_by' is only added by get_config() when a service references the
    # certificate. dict_search() may return None for a missing key, and a
    # membership test against None raises TypeError - fall back to an empty list.
    used_by = dict_search('used_by', config) or []

    # When ACME is used behind a reverse proxy, we always bind to localhost
    # whatever the CLI listen-address is configured for.
    if ('haproxy' in used_by and
        is_systemd_service_running(systemd_services['haproxy']) and
        not check_port_availability(listen_address, 80)):
        tmp += f' --http-01-address 127.0.0.1 --http-01-port {internal_ports["certbot_haproxy"]}'
    elif listen_address:
        tmp += f' --http-01-address {listen_address}'

    # verify() does not need to actually request a cert but only test for plausibility
    if dry_run:
        tmp += ' --dry-run'

    cmd(tmp, raising=ConfigError, message=f'ACME certbot request failed for "{name}"!')
def get_config(config=None):
    """Collect the 'pki' CLI subtree plus the bookkeeping used by later stages.

    Besides the CLI values (merged with their defaults when the node exists)
    the returned dict carries:
      - 'certbot_renew': present when the script was started with the
        'certbot_renew' argument
      - 'changed': per PKI object type, the result of node_changed()
      - 'system': the complete configuration dict, used to look up references
      - 'used_by' inside a certificate's 'acme' node when haproxy references it

    Dependent config nodes are registered through set_dependents() for every
    changed PKI object that is still present and referenced elsewhere.
    """
    conf = config if config else Config()
    base = ['pki']
    dict_opts = {
        'key_mangling': ('-', '_'),
        'get_first_key': True,
        'no_tag_node_value_mangle': True,
    }

    pki = conf.get_config_dict(base, **dict_opts)

    if len(argv) > 1 and argv[1] == 'certbot_renew':
        pki['certbot_renew'] = {}

    # Walk through the sync_translate mapping and build a duplicate-free list
    # (first occurrence wins) which is later used to check if the node was
    # changed in the CLI config
    changed_keys = list(dict.fromkeys(sync_translate.values()))

    # Check for changes to said given keys in the CLI config
    for pki_node in changed_keys:
        node_diff = node_changed(conf, base + [pki_node], recursive=True,
                                 expand_nodes=Diff.DELETE | Diff.ADD)
        pki.setdefault('changed', {})[pki_node.replace('-', '_')] = node_diff

    # We only merge in the defaults if there is a configuration at all
    if conf.exists(base):
        # We have gathered the dict representation of the CLI, but there are default
        # options which we need to update into the dictionary retrieved.
        default_values = conf.get_config_defaults(**pki.kwargs, recursive=True)
        # remove ACME default configuration if unused by CLI
        for name, cert_config in pki.get('certificate', {}).items():
            if 'acme' not in cert_config:
                # Remove ACME default values
                del default_values['certificate'][name]['acme']

        # merge CLI and default dictionary
        pki = config_dict_merge(default_values, pki)

    # Certbot triggered an external renew of the certificates.
    # Mark all ACME based certificates as "changed" to trigger
    # update of dependent services
    if 'certificate' in pki and 'certbot_renew' in pki:
        renew = [name for name, cert_config in pki['certificate'].items()
                 if 'acme' in cert_config]
        # If triggered externally by certbot, certificate key is not present in changed
        pki.setdefault('changed', {})['certificate'] = renew

    # We need to get the entire system configuration to verify that we are not
    # deleting a certificate that is still referenced somewhere!
    pki['system'] = conf.get_config_dict([], **dict_opts)
    D = get_config_diff(conf)

    for search in sync_search:
        path = search['path']
        for key in search['keys']:
            changed_key = sync_translate[key]
            if 'changed' not in pki or changed_key not in pki['changed']:
                continue

            for item_name in pki['changed'][changed_key]:
                if changed_key == 'openvpn':
                    node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
                else:
                    node_present = dict_search_args(pki, changed_key, item_name)

                # Only objects which still exist can trigger dependents
                if not node_present:
                    continue

                search_dict = dict_search_args(pki['system'], *path)
                if not search_dict:
                    continue

                for found_name, found_path in dict_search_recursive(search_dict, key):
                    if isinstance(found_name, list) and item_name not in found_name:
                        continue
                    if isinstance(found_name, str) and found_name != item_name:
                        continue

                    path_str = ' '.join(path + found_path).replace('_', '-')
                    Message(f'Updating configuration: "{path_str} {item_name}"')

                    if path[0] == 'interfaces':
                        ifname = found_path[0]
                        if not D.node_changed_presence(path + [ifname]):
                            set_dependents(path[1], conf, ifname)
                    elif not D.node_changed_presence(path):
                        set_dependents(path[1], conf)

    # Check PKI certificates if they are auto-generated by ACME. If they are,
    # traverse the current configuration and determine the service where the
    # certificate is used by.
    # Required to check if we might need to run certbot behind a reverse proxy.
    for name, cert_config in pki.get('certificate', {}).items():
        if 'acme' not in cert_config:
            continue
        if not dict_search('system.load_balancing.haproxy', pki):
            continue
        # one 'haproxy' entry per matching certificate list found
        used_by = [
            'haproxy'
            for cert_list, _ in dict_search_recursive(
                pki['system']['load_balancing']['haproxy'], 'certificate')
            if name in cert_list
        ]
        if used_by:
            pki['certificate'][name]['acme'].update({'used_by': used_by})

    return pki
def is_valid_certificate(raw_data):
    """Return the result of load_certificate() for *raw_data* (wrap_tags=True).

    Callers treat a truthy result as "valid".
    NOTE(review): presumably a falsy value is returned when parsing fails -
    confirm against vyos.pki.
    """
    loaded = load_certificate(raw_data, wrap_tags=True)
    return loaded
def is_valid_ca_certificate(raw_data):
    """Check that *raw_data* loads as a certificate carrying CA attributes.

    Returns False when load_certificate() yields a falsy result, otherwise
    whatever is_ca_certificate() reports for the loaded certificate.
    """
    cert = load_certificate(raw_data, wrap_tags=True)
    return is_ca_certificate(cert) if cert else False
def is_valid_public_key(raw_data):
    """Return the result of load_public_key() for *raw_data* (wrap_tags=True).

    Callers treat a truthy result as "valid".
    NOTE(review): presumably a falsy value is returned when parsing fails -
    confirm against vyos.pki.
    """
    loaded = load_public_key(raw_data, wrap_tags=True)
    return loaded
def is_valid_private_key(raw_data, protected=False):
    """Validate a private key by attempting to load it.

    With encrypted (password protected) private keys we always return True as
    we cannot ask for the password to verify. Otherwise the result of
    load_private_key() (no passphrase, wrap_tags=True) is returned.
    """
    return True if protected else load_private_key(raw_data, passphrase=None, wrap_tags=True)
def is_valid_openssh_public_key(raw_data, type):
    """Return the result of load_openssh_public_key() for *raw_data* and *type*.

    Callers treat a truthy result as "valid".
    NOTE(review): presumably a falsy value is returned when parsing fails -
    confirm against vyos.pki.
    """
    loaded = load_openssh_public_key(raw_data, type)
    return loaded
def is_valid_openssh_private_key(raw_data, protected=False):
    """Validate an OpenSSH private key by attempting to load it.

    With encrypted (password protected) private keys we always return True as
    we cannot ask for the password to verify. Otherwise the result of
    load_openssh_private_key() (no passphrase, wrap_tags=True) is returned.
    """
    return True if protected else load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True)
def is_valid_crl(raw_data):
    """Return the result of load_crl() for *raw_data* (wrap_tags=True).

    Callers treat a truthy result as "valid".
    NOTE(review): presumably a falsy value is returned when parsing fails -
    confirm against vyos.pki.
    """
    loaded = load_crl(raw_data, wrap_tags=True)
    return loaded
def is_valid_dh_parameters(raw_data):
    """Return the result of load_dh_parameters() for *raw_data* (wrap_tags=True).

    Callers treat a truthy result as "valid".
    NOTE(review): presumably a falsy value is returned when parsing fails -
    confirm against vyos.pki.
    """
    loaded = load_dh_parameters(raw_data, wrap_tags=True)
    return loaded
def verify(pki):
    """Validate the PKI configuration dict produced by get_config().

    Checks, in order: CA entries (certificate, private key, CRLs), certificate
    entries (certificate, private key, ACME settings), DH parameters, key
    pairs, OpenSSH keys and the x509 default country. Finally every changed
    PKI object that is no longer present must not be referenced by any CLI
    path listed in sync_search.

    For ACME certificates listed in pki['changed']['certificate'] a certbot
    request is issued with its default dry_run=True, unless 'certbot_renew'
    is set.

    Returns None. Raises ConfigError on the first failed check.
    """
    if not pki:
        return None

    if 'ca' in pki:
        for name, ca_conf in pki['ca'].items():
            if 'certificate' in ca_conf:
                if not is_valid_ca_certificate(ca_conf['certificate']):
                    raise ConfigError(f'Invalid certificate on CA certificate "{name}"')

            if 'private' in ca_conf and 'key' in ca_conf['private']:
                private = ca_conf['private']
                protected = 'password_protected' in private

                if not is_valid_private_key(private['key'], protected):
                    raise ConfigError(f'Invalid private key on CA certificate "{name}"')

            if 'crl' in ca_conf:
                ca_crls = ca_conf['crl']
                # a single CRL is stored as a plain string rather than a list
                if isinstance(ca_crls, str):
                    ca_crls = [ca_crls]

                for crl in ca_crls:
                    if not is_valid_crl(crl):
                        raise ConfigError(f'Invalid CRL on CA certificate "{name}"')

    if 'certificate' in pki:
        for name, cert_conf in pki['certificate'].items():
            if 'certificate' in cert_conf:
                if not is_valid_certificate(cert_conf['certificate']):
                    raise ConfigError(f'Invalid certificate on certificate "{name}"')

            if 'private' in cert_conf and 'key' in cert_conf['private']:
                private = cert_conf['private']
                protected = 'password_protected' in private

                if not is_valid_private_key(private['key'], protected):
                    raise ConfigError(f'Invalid private key on certificate "{name}"')

            if 'acme' in cert_conf:
                acme = cert_conf['acme']
                if 'domain_name' not in acme:
                    raise ConfigError('At least one domain-name is required to request '\
                                      f'certificate for "{name}" via ACME!')

                if 'email' not in acme:
                    raise ConfigError('An email address is required to request '\
                                      f'certificate for "{name}" via ACME!')

                listen_address = None
                if 'listen_address' in acme:
                    listen_address = acme['listen_address']

                # The port check is skipped when get_config() recorded a
                # service using this certificate ('used_by')
                if 'used_by' not in acme:
                    if not check_port_availability(listen_address, 80):
                        raise ConfigError('Port 80 is already in use and not available '\
                                          f'to provide ACME challenge for "{name}"!')

                if 'certbot_renew' not in pki:
                    # Only run the ACME command if something on this entity changed,
                    # as this is time intensive
                    tmp = dict_search('changed.certificate', pki)
                    if tmp is not None and name in tmp:
                        certbot_request(name, acme)

    if 'dh' in pki:
        for name, dh_conf in pki['dh'].items():
            if 'parameters' in dh_conf:
                if not is_valid_dh_parameters(dh_conf['parameters']):
                    raise ConfigError(f'Invalid DH parameters on "{name}"')

    if 'key_pair' in pki:
        for name, key_conf in pki['key_pair'].items():
            if 'public' in key_conf and 'key' in key_conf['public']:
                if not is_valid_public_key(key_conf['public']['key']):
                    raise ConfigError(f'Invalid public key on key-pair "{name}"')

            if 'private' in key_conf and 'key' in key_conf['private']:
                private = key_conf['private']
                protected = 'password_protected' in private
                if not is_valid_private_key(private['key'], protected):
                    raise ConfigError(f'Invalid private key on key-pair "{name}"')

    if 'openssh' in pki:
        for name, key_conf in pki['openssh'].items():
            if 'public' in key_conf and 'key' in key_conf['public']:
                if 'type' not in key_conf['public']:
                    raise ConfigError(f'Must define OpenSSH public key type for "{name}"')
                if not is_valid_openssh_public_key(key_conf['public']['key'], key_conf['public']['type']):
                    raise ConfigError(f'Invalid OpenSSH public key "{name}"')

            if 'private' in key_conf and 'key' in key_conf['private']:
                private = key_conf['private']
                protected = 'password_protected' in private
                if not is_valid_openssh_private_key(private['key'], protected):
                    raise ConfigError(f'Invalid OpenSSH private key "{name}"')

    if 'x509' in pki:
        if 'default' in pki['x509']:
            default_values = pki['x509']['default']
            if 'country' in default_values:
                country = default_values['country']
                if len(country) != 2 or not country.isalpha():
                    raise ConfigError('Invalid default country value. '\
                                      'Value must be 2 alpha characters.')

    if 'changed' in pki:
        # if the list is getting longer, we can move to a dict() and also embed
        # the search key as value in the sync_search / sync_translate tables
        for search in sync_search:
            for key in search['keys']:
                changed_key = sync_translate[key]

                if changed_key not in pki['changed']:
                    continue

                for item_name in pki['changed'][changed_key]:
                    node_present = False
                    if changed_key == 'openvpn':
                        node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
                    else:
                        node_present = dict_search_args(pki, changed_key, item_name)

                    # If the node is still present, we can skip the check
                    # as we are not deleting it
                    if node_present:
                        continue

                    search_dict = dict_search_args(pki['system'], *search['path'])
                    if not search_dict:
                        continue

                    for found_name, found_path in dict_search_recursive(search_dict, key):
                        # Check if the name matches either by string compare, or being
                        # part of a list
                        if ((isinstance(found_name, str) and found_name == item_name) or
                            (isinstance(found_name, list) and item_name in found_name)):
                            # We do not support _ in CLI paths - this is only a convenience
                            # as we mangle all - to _, now it's time to reverse this!
                            path_str = ' '.join(search['path'] + found_path).replace('_', '-')
                            pki_object = changed_key.replace('_', '-')
                            tmp = f'Embedded PKI {pki_object} with name "{item_name}" is still ' \
                                  f'in use by CLI path "{path_str}"'
                            raise ConfigError(tmp)

    return None
def cleanup_system_ca():
    """Ensure vyos_ca_certificates_dir exists and contains no regular files.

    The directory is created when missing (including missing parents);
    otherwise every regular file directly inside it is removed.
    Sub-directories are left untouched.
    """
    # makedirs(exist_ok=True) avoids the window between an existence check and
    # the creation, and does not fail when the parent directory is missing
    os.makedirs(vyos_ca_certificates_dir, exist_ok=True)

    for filename in os.listdir(vyos_ca_certificates_dir):
        full_path = os.path.join(vyos_ca_certificates_dir, filename)
        if os.path.isfile(full_path):
            os.unlink(full_path)
def generate(pki):
    """Write CA files to disk and synchronise certbot-managed certificates.

    Steps visible in this function:
      1. With an empty *pki* only the system CA directory is cleaned up.
      2. When 'ca' is listed in pki['changed'], the CA directory is cleaned
         and every CA flagged 'system_install' is written as "<name>.crt".
      3. On a certbot-triggered run ('certbot_renew' in pki) nothing else is
         done.
      4. ACME certificates missing on disk are requested; ACME certificates
         listed in pki['changed']['certificate'] are deleted and re-requested.
      5. Certificates present under "<certbot dir>/live" but no longer
         configured are deleted; for the remaining ones the chain certificate
         is added to the CLI as 'AUTOCHAIN_<name>' unless an identical,
         manually added CA certificate already exists.

    Returns None.
    """
    if not pki:
        cleanup_system_ca()
        return None

    # Create or cleanup CA install directory
    if 'changed' in pki and 'ca' in pki['changed']:
        cleanup_system_ca()

        if 'ca' in pki:
            for ca, ca_conf in pki['ca'].items():
                # A CA flagged for system installation but without any
                # certificate data has nothing to install - skip it instead
                # of failing with a KeyError
                if 'system_install' not in ca_conf or 'certificate' not in ca_conf:
                    continue

                ca_obj = load_certificate(ca_conf['certificate'])
                ca_path = os.path.join(vyos_ca_certificates_dir, f'{ca}.crt')

                with open(ca_path, 'w') as ca_file:
                    ca_file.write(encode_certificate(ca_obj))

    # Certbot renewal only needs to re-trigger the services to load up the
    # new PEM file
    if 'certbot_renew' in pki:
        return None

    live_dir = f'{vyos_certbot_dir}/live'

    certbot_list = []
    certbot_list_on_disk = []
    if os.path.exists(live_dir):
        # every sub-folder of live/ represents one certbot certificate
        certbot_list_on_disk = [entry.name for entry in os.scandir(live_dir) if entry.is_dir()]

    if 'certificate' in pki:
        changed_certificates = dict_search('changed.certificate', pki)
        for name, cert_conf in pki['certificate'].items():
            if 'acme' not in cert_conf:
                continue

            certbot_list.append(name)
            # There is no ACME/certbot managed certificate present on the
            # system, generate it
            if name not in certbot_list_on_disk:
                certbot_request(name, cert_conf['acme'], dry_run=False)
                # Now that the certificate was properly generated we have
                # the PEM files on disk. We need to add the certificate to
                # certbot_list_on_disk to automatically import the CA chain
                certbot_list_on_disk.append(name)
            # We already had an ACME managed certificate on the system, but
            # something changed in the configuration
            elif changed_certificates is not None and name in changed_certificates:
                # Delete old ACME certificate first
                if name in certbot_list_on_disk:
                    certbot_delete(name)
                # Request new certificate via certbot
                certbot_request(name, cert_conf['acme'], dry_run=False)

    # Cleanup certbot configuration and certificates if no longer in use by CLI
    # Get foldernames under vyos_certbot_dir which each represent a certbot cert
    if os.path.exists(live_dir):
        autochain_prefix = 'AUTOCHAIN_'
        for cert in certbot_list_on_disk:
            # ACME certificate is no longer in use by CLI remove it
            if cert not in certbot_list:
                certbot_delete(cert)
                continue
            # ACME not enabled for individual certificate - bail out early
            if 'acme' not in pki['certificate'][cert]:
                continue

            # Read in ACME certificate chain information
            tmp = read_file(f'{vyos_certbot_dir}/live/{cert}/chain.pem')
            tmp = load_certificate(tmp, wrap_tags=False)
            # drop the first and last line of the encoded certificate and join
            # the remaining lines into a single string
            cert_chain_base64 = "".join(encode_certificate(tmp).strip().split("\n")[1:-1])

            # Check if CA chain certificate is already present on CLI to avoid adding
            # a duplicate. This only checks for manual added CA certificates and not
            # auto added ones with the AUTOCHAIN_ prefix
            ca_cert_present = False
            if 'ca' in pki:
                for ca_base64, cli_path in dict_search_recursive(pki['ca'], 'certificate'):
                    # Ignore automatic added CA certificates
                    if any(item.startswith(autochain_prefix) for item in cli_path):
                        continue
                    if cert_chain_base64 == ca_base64:
                        ca_cert_present = True

            if not ca_cert_present:
                tmp = dict_search_args(pki, 'ca', f'{autochain_prefix}{cert}', 'certificate')
                if not bool(tmp) or tmp != cert_chain_base64:
                    Message(f'Add/replace automatically imported CA certificate for "{cert}"...')
                    add_cli_node(['pki', 'ca', f'{autochain_prefix}{cert}', 'certificate'],
                                 value=cert_chain_base64)

    return None
def apply(pki):
    """Bring the certbot timer and dependent services in line with *pki*.

    - Empty configuration: stop the certbot timer and run
      'update-ca-certificates'.
    - Otherwise the timer is stopped when no certificate has an 'acme' node,
      or restarted when one does and the unit is not active.
    - When pki carries a 'changed' key, call_dependents() is invoked and the
      ca-certificates bundle is rebuilt if 'ca' is listed in it.

    Returns None.
    """
    systemd_certbot_name = 'certbot.timer'

    if not pki:
        call(f'systemctl stop {systemd_certbot_name}')
        call('update-ca-certificates')
        return None

    # True as soon as one configured certificate is managed via ACME
    has_certbot = any('acme' in cert_conf
                      for cert_conf in pki.get('certificate', {}).values())

    if not has_certbot:
        call(f'systemctl stop {systemd_certbot_name}')
    elif not is_systemd_service_active(systemd_certbot_name):
        call(f'systemctl restart {systemd_certbot_name}')

    if 'changed' in pki:
        call_dependents()

        # Rebuild ca-certificates bundle
        if 'ca' in pki['changed']:
            call('update-ca-certificates')

    return None
if __name__ == '__main__':
    # Run the conf-mode stages in order on the same config dict; a
    # ConfigError from any stage is printed and turned into exit code 1
    try:
        pki_config = get_config()
        for stage in (verify, generate, apply):
            stage(pki_config)
    except ConfigError as error:
        print(error)
        exit(1)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Dec 15, 9:09 PM (9 h, 14 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3067182
Default Alt Text
pki.py (24 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment