diff --git a/debian/control b/debian/control index af42202fc..8343144c4 100644 --- a/debian/control +++ b/debian/control @@ -1,330 +1,333 @@ Source: vyos-1x Section: contrib/net Priority: extra Maintainer: VyOS Package Maintainers <maintainers@vyos.net> Build-Depends: debhelper (>= 9), dh-python, fakeroot, gcc, iproute2, libvyosconfig0 (>= 0.0.7), libzmq3-dev, python3 (>= 3.10), # For generating command definitions python3-lxml, python3-xmltodict, # For running tests python3-coverage, python3-netifaces, python3-nose, python3-jinja2, python3-psutil, python3-setuptools, python3-sphinx, quilt, whois Standards-Version: 3.9.6 Package: vyos-1x Architecture: amd64 arm64 Pre-Depends: libnss-tacplus [amd64], libpam-tacplus [amd64], libpam-radius-auth [amd64] Depends: ## Fundamentals ${python3:Depends} (>= 3.10), libvyosconfig0, vyatta-bash, vyatta-cfg, vyos-http-api-tools, vyos-utils, ## End of Fundamentals ## Python libraries used in multiple modules and scripts python3, python3-certbot-nginx, python3-cryptography, python3-hurry.filesize, python3-inotify, python3-jinja2, python3-jmespath, python3-netaddr, python3-netifaces, python3-paramiko, python3-passlib, python3-psutil, python3-pyhumps, python3-pystache, python3-pyudev, python3-six, python3-tabulate, python3-voluptuous, python3-xmltodict, python3-zmq, ## End of Python libraries ## Basic System services and utilities sudo, systemd, bsdmainutils, openssl, curl, dbus, file, iproute2 (>= 6.0.0), linux-cpupower, # ipaddrcheck is widely used in IP value validators ipaddrcheck, ethtool, fdisk, lm-sensors, procps, netplug, sed, ssl-cert, tuned, beep, wide-dhcpv6-client, # Generic colorizer grc, ## End of System services and utilities ## For the installer # Image signature verification tool minisign, # Live filesystem tools squashfs-tools, fuse-overlayfs, ## End installer auditd, iputils-arping, isc-dhcp-client, # For "vpn pptp", "vpn l2tp", "vpn sstp", "service ipoe-server" accel-ppp, # End "vpn pptp", "vpn l2tp", "vpn sstp", "service ipoe-server" avahi-daemon, conntrack, conntrackd, ## Conf mode features # For "interfaces wireless" hostapd, hsflowd, iw, wireless-regdb, wpasupplicant (>= 0.6.7), # End "interfaces wireless" # For "interfaces wwan" modemmanager, usb-modeswitch, libqmi-utils, # End "interfaces wwan" # For "interfaces openvpn" openvpn, openvpn-auth-ldap, openvpn-auth-radius, openvpn-otp, libpam-google-authenticator, # End "interfaces openvpn" # For "interfaces wireguard" wireguard-tools, qrencode, # End "interfaces wireguard" # For "interfaces pppoe" pppoe, # End "interfaces pppoe" # For "interfaces sstpc" sstp-client, # End "interfaces sstpc" # For "protocols *" frr (>= 7.5), frr-pythontools, frr-rpki-rtrlib, frr-snmp, # End "protocols *" # For "protocols nhrp" (part of DMVPN) opennhrp, # End "protocols nhrp" # For "protocols igmp-proxy" igmpproxy, # End "protocols igmp-proxy" +# For "pki" + certbot, +# End "pki" # For "service console-server" conserver-client, conserver-server, console-data, dropbear, # End "service console-server" # For "service aws glb" aws-gwlbtun, # For "service dns dynamic" ddclient (>= 3.11.1), # End "service dns dynamic" # # For "service ids" fastnetmon [amd64], # End "service ids" # # For "service ndp-proxy" ndppd, # End "service ndp-proxy" # For "service router-advert" radvd, # End "service route-advert" # For "high-availability reverse-proxy" haproxy, # End "high-availability reverse-proxy" # For "service dhcp-relay" isc-dhcp-relay, # For "service dhcp-server" kea, # End "service dhcp-server" # For "service lldp" lldpd, # End "service lldp" # For "service https" nginx-light, # End "service https" # For "service ssh" openssh-server, sshguard, # End "service ssh" # For "service salt-minion" salt-minion, # End "service salt-minion" # For "service snmp" snmp, snmpd, # End "service snmp" # For "service upnp" miniupnpd-nftables, # End "service upnp" # For "service webproxy" squid, squidclient, squidguard, # End "service webproxy" # For "service monitoring telegraf" telegraf (>= 1.20), # End "service monitoring telegraf" # For "service monitoring zabbix-agent" zabbix-agent2, # End "service monitoring zabbix-agent" # For "service tftp-server" tftpd-hpa, # End "service tftp-server" # For "service dns forwarding" pdns-recursor, # End "service dns forwarding" # For "service sla owamp" owamp-client, owamp-server, # End "service sla owamp" # For "service sla twamp" twamp-client, twamp-server, # End "service sla twamp" # For "service broadcast-relay" udp-broadcast-relay, # End "service broadcast-relay" # For "high-availability vrrp" keepalived (>=2.0.5), # End "high-availability-vrrp" # For "system task-scheduler" cron, # End "system task-scheduler" # For "system lcd" lcdproc, lcdproc-extra-drivers, # End "system lcd" # For "system config-management commit-archive" git, # End "system config-management commit-archive" # For firewall libndp-tools, libnetfilter-conntrack3, libnfnetlink0, nfct, nftables (>= 0.9.3), # For "vpn ipsec" strongswan (>= 5.9), strongswan-swanctl (>= 5.9), charon-systemd, libcharon-extra-plugins (>=5.9), libcharon-extauth-plugins (>=5.9), libstrongswan-extra-plugins (>=5.9), libstrongswan-standard-plugins (>=5.9), python3-vici (>= 5.7.2), # End "vpn ipsec" # For "nat64" jool, # End "nat64" # For "system ntp" chrony, # End "system ntp" # For "vpn openconnect" ocserv, # End "vpn openconnect" # For "system flow-accounting" pmacct (>= 1.6.0), # End "system flow-accounting" # For container podman, netavark, aardvark-dns, # iptables is only used for containers now, not the the firewall CLI iptables, # End container ## End Configuration mode ## Operational mode # Used for hypervisor model in "run show version" hvinfo, # For "run traceroute" traceroute, # For "run monitor traffic" tcpdump, # End "run monitor traffic" # For "show hardware dmi" dmidecode, # For "run show hardware storage smart" smartmontools, # For "run show hardware scsi" lsscsi, # For "run show hardware pci" pciutils, # For "show hardware usb" usbutils, # For "run show hardware storage nvme" nvme-cli, # For "run monitor bandwidth-test" iperf, iperf3, # End "run monitor bandwidth-test" # For "run wake-on-lan" etherwake, # For "run force ipv6-nd" ndisc6, # For "run monitor bandwidth" bmon, # End Operational mode ## Optional utilities easy-rsa, tcptraceroute, mtr-tiny, telnet, stunnel4, uidmap ## End optional utilities Description: VyOS configuration scripts and data VyOS configuration scripts, interface definitions, and everything Package: vyos-1x-vmware Architecture: amd64 Depends: vyos-1x, open-vm-tools Description: VyOS configuration scripts and data for VMware Adds configuration files required for VyOS running on VMware hosts. Package: vyos-1x-smoketest Architecture: all Depends: skopeo, snmp, vyos-1x Description: VyOS build sanity checking toolkit diff --git a/interface-definitions/include/constraint/email.xml.i b/interface-definitions/include/constraint/email.xml.i new file mode 100644 index 000000000..b19a88d64 --- /dev/null +++ b/interface-definitions/include/constraint/email.xml.i @@ -0,0 +1,3 @@ +<!-- include start from constraint/email.xml.i --> +<regex>[^\s@]+@([^\s@.,]+\.)+[^\s@.,]{2,}</regex> +<!-- include end --> diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in index 097c541ac..0ed199539 100644 --- a/interface-definitions/pki.xml.in +++ b/interface-definitions/pki.xml.in @@ -1,243 +1,297 @@ <?xml version="1.0"?> <interfaceDefinition> <node name="pki" owner="${vyos_conf_scripts_dir}/pki.py"> <properties> <help>VyOS PKI configuration</help> <priority>300</priority> </properties> <children> <tagNode name="ca"> <properties> <help>Certificate Authority</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> </properties> <children> <leafNode name="certificate"> <properties> <help>CA certificate in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>CA certificate is not base64-encoded</constraintErrorMessage> </properties> </leafNode> #include <include/generic-description.xml.i> <node name="private"> <properties> <help>CA private key in PEM format</help> </properties> <children> <leafNode name="key"> <properties> <help>CA private key in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>CA private key is not base64-encoded</constraintErrorMessage> </properties> </leafNode> <leafNode name="password-protected"> <properties> <help>CA private key is password protected</help> <valueless/> </properties> </leafNode> </children> </node> <leafNode name="crl"> <properties> <help>Certificate revocation list in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>CRL is not base64-encoded</constraintErrorMessage> <multi/> </properties> </leafNode> <leafNode name="revoke"> <properties> <help>If parent CA is present, this CA certificate will be included in generated CRLs</help> <valueless/> </properties> </leafNode> </children> </tagNode> <tagNode name="certificate"> <properties> <help>Certificate</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> </properties> <children> <leafNode name="certificate"> <properties> <help>Certificate in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>Certificate is not base64-encoded</constraintErrorMessage> </properties> </leafNode> + <node name="acme"> + <properties> + <help>Automatic Certificate Management Environment (ACME) request</help> + </properties> + <children> + #include <include/url-http-https.xml.i> + <leafNode name="url"> + <defaultValue>https://acme-v02.api.letsencrypt.org/directory</defaultValue> + </leafNode> + <leafNode name="domain-name"> + <properties> + <help>Domain Name</help> + <constraint> + <validator name="fqdn"/> + </constraint> + <constraintErrorMessage>Invalid domain name (RFC 1123 section 2).\nMay only contain letters, numbers and .-_</constraintErrorMessage> + <multi/> + </properties> + </leafNode> + <leafNode name="email"> + <properties> + <help>Email address to associate with certificate</help> + <constraint> + #include <include/constraint/email.xml.i> + </constraint> + </properties> + </leafNode> + #include <include/listen-address-ipv4-single.xml.i> + <leafNode name="rsa-key-size"> + <properties> + <help>Size of the RSA key</help> + <completionHelp> + <list>2048 3072 4096</list> + </completionHelp> + <valueHelp> + <format>2048</format> + <description>RSA key length 2048 bit</description> + </valueHelp> + <valueHelp> + <format>3072</format> + <description>RSA key length 3072 bit</description> + </valueHelp> + <valueHelp> + <format>4096</format> + <description>RSA key length 4096 bit</description> + </valueHelp> + <constraint> + <regex>(2048|3072|4096)</regex> + </constraint> + </properties> + <defaultValue>2048</defaultValue> + </leafNode> + </children> + </node> #include <include/generic-description.xml.i> <node name="private"> <properties> <help>Certificate private key</help> </properties> <children> <leafNode name="key"> <properties> <help>Certificate private key in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>Certificate private key is not base64-encoded</constraintErrorMessage> </properties> </leafNode> <leafNode name="password-protected"> <properties> <help>Certificate private key is password protected</help> <valueless/> </properties> </leafNode> </children> </node> <leafNode name="revoke"> <properties> <help>If CA is present, this certificate will be included in generated CRLs</help> <valueless/> </properties> </leafNode> </children> </tagNode> <tagNode name="dh"> <properties> <help>Diffie-Hellman parameters</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> </properties> <children> <leafNode name="parameters"> <properties> <help>DH parameters in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>DH parameters are not base64-encoded</constraintErrorMessage> </properties> </leafNode> </children> </tagNode> <tagNode name="key-pair"> <properties> <help>Public and private keys</help> </properties> <children> <node name="public"> <properties> <help>Public key</help> </properties> <children> <leafNode name="key"> <properties> <help>Public key in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>Public key is not base64-encoded</constraintErrorMessage> </properties> </leafNode> </children> </node> <node name="private"> <properties> <help>Private key</help> </properties> <children> <leafNode name="key"> <properties> <help>Private key in PEM format</help> <constraint> <validator name="base64"/> </constraint> <constraintErrorMessage>Private key is not base64-encoded</constraintErrorMessage> </properties> </leafNode> <leafNode name="password-protected"> <properties> <help>Private key is password protected</help> <valueless/> </properties> </leafNode> </children> </node> </children> </tagNode> <node name="openvpn"> <properties> <help>OpenVPN keys</help> </properties> <children> <tagNode name="shared-secret"> <properties> <help>OpenVPN shared secret key</help> </properties> <children> <leafNode name="key"> <properties> <help>OpenVPN shared secret key data</help> </properties> </leafNode> <leafNode name="version"> <properties> <help>OpenVPN shared secret key version</help> </properties> </leafNode> </children> </tagNode> </children> </node> <node name="x509"> <properties> <help>X509 Settings</help> </properties> <children> <node name="default"> <properties> <help>X509 Default Values</help> </properties> <children> <leafNode name="country"> <properties> <help>Default country</help> </properties> <defaultValue>GB</defaultValue> </leafNode> <leafNode name="state"> <properties> <help>Default state</help> </properties> <defaultValue>Some-State</defaultValue> </leafNode> <leafNode name="locality"> <properties> <help>Default locality</help> </properties> <defaultValue>Some-City</defaultValue> </leafNode> <leafNode name="organization"> <properties> <help>Default organization</help> </properties> <defaultValue>VyOS</defaultValue> </leafNode> </children> </node> </children> </node> </children> </node> </interfaceDefinition> diff --git a/python/vyos/config.py b/python/vyos/config.py index ca7b035e5..bee85315d 100644 --- a/python/vyos/config.py +++ b/python/vyos/config.py @@ -1,583 +1,620 @@ # Copyright 2017, 2019-2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. """ A library for reading VyOS running config data. This library is used internally by all config scripts of VyOS, but its API should be considered stable and safe to use in user scripts. Note that this module will not work outside VyOS. Node taxonomy ############# There are multiple types of config tree nodes in VyOS, each requires its own set of operations. *Leaf nodes* (such as "address" in interfaces) can have values, but cannot have children. Leaf nodes can have one value, multiple values, or no values at all. For example, "system host-name" is a single-value leaf node, "system name-server" is a multi-value leaf node (commonly abbreviated "multi node"), and "system ip disable-forwarding" is a valueless leaf node. Non-leaf nodes cannot have values, but they can have child nodes. They are divided into two classes depending on whether the names of their children are fixed or not. For example, under "system", the names of all valid child nodes are predefined ("login", "name-server" etc.). To the contrary, children of the "system task-scheduler task" node can have arbitrary names. Such nodes are called *tag nodes*. This terminology is confusing but we keep using it for lack of a better word. No one remembers if the "tag" in "task Foo" is "task" or "Foo", but the distinction is irrelevant in practice. Configuration modes ################### VyOS has two distinct modes: operational mode and configuration mode. When a user logins, the CLI is in the operational mode. In this mode, only the running (effective) config is accessible for reading. When a user enters the "configure" command, a configuration session is setup. Every config session has its *proposed* (or *session*) config built on top of the current running config. When changes are commited, if commit succeeds, the proposed config is merged into the running config. In configuration mode, "base" functions like `exists`, `return_value` return values from the session config, while functions prefixed "effective" return values from the running config. In operational mode, all functions return values from the running config. """ import re import json from copy import deepcopy from typing import Union import vyos.configtree from vyos.xml_ref import multi_to_list from vyos.xml_ref import from_source from vyos.xml_ref import ext_dict_merge from vyos.xml_ref import relative_defaults from vyos.utils.dict import get_sub_dict from vyos.utils.dict import mangle_dict_keys from vyos.configsource import ConfigSource from vyos.configsource import ConfigSourceSession class ConfigDict(dict): _from_defaults = {} _dict_kwargs = {} def from_defaults(self, path: list[str]) -> bool: return from_source(self._from_defaults, path) @property def kwargs(self) -> dict: return self._dict_kwargs def config_dict_merge(src: dict, dest: Union[dict, ConfigDict]) -> ConfigDict: if not isinstance(dest, ConfigDict): dest = ConfigDict(dest) return ext_dict_merge(src, dest) +def config_dict_mangle_acme(name, cli_dict): + """ + Load CLI PKI dictionary and if an ACME certificate is used, load it's content + and place it into the CLI dictionary as it would be a "regular" CLI PKI based + certificate with private key + """ + from vyos.base import ConfigError + from vyos.defaults import directories + from vyos.utils.file import read_file + from vyos.pki import encode_certificate + from vyos.pki import encode_private_key + from vyos.pki import load_certificate + from vyos.pki import load_private_key + + try: + vyos_certbot_dir = directories['certbot'] + + if 'acme' in cli_dict: + tmp = read_file(f'{vyos_certbot_dir}/live/{name}/cert.pem') + tmp = load_certificate(tmp, wrap_tags=False) + cert_base64 = "".join(encode_certificate(tmp).strip().split("\n")[1:-1]) + + tmp = read_file(f'{vyos_certbot_dir}/live/{name}/privkey.pem') + tmp = load_private_key(tmp, wrap_tags=False) + key_base64 = "".join(encode_private_key(tmp).strip().split("\n")[1:-1]) + # install ACME based PEM keys into "regular" CLI config keys + cli_dict.update({'certificate' : cert_base64, 'private' : {'key' : key_base64}}) + except: + raise ConfigError(f'Unable to load ACME certificates for "{name}"!') + + return cli_dict + class Config(object): """ The class of config access objects. Internally, in the current implementation, this object is *almost* stateless, the only state it keeps is relative *config path* for convenient access to config subtrees. """ def __init__(self, session_env=None, config_source=None): if config_source is None: self._config_source = ConfigSourceSession(session_env) else: if not isinstance(config_source, ConfigSource): raise TypeError("config_source not of type ConfigSource") self._config_source = config_source self._level = [] self._dict_cache = {} (self._running_config, self._session_config) = self._config_source.get_configtree_tuple() def get_config_tree(self, effective=False): if effective: return self._running_config return self._session_config def _make_path(self, path): # Backwards-compatibility stuff: original implementation used string paths # libvyosconfig paths are lists, but since node names cannot contain whitespace, # splitting at whitespace is reasonably safe. # It may cause problems with exists() when it's used for checking values, # since values may contain whitespace. if isinstance(path, str): path = re.split(r'\s+', path) elif isinstance(path, list): pass else: raise TypeError("Path must be a whitespace-separated string or a list") return (self._level + path) def set_level(self, path): """ Set the *edit level*, that is, a relative config tree path. Once set, all operations will be relative to this path, for example, after ``set_level("system")``, calling ``exists("name-server")`` is equivalent to calling ``exists("system name-server"`` without ``set_level``. Args: path (str|list): relative config path """ # Make sure there's always a space between default path (level) # and path supplied as method argument # XXX: for small strings in-place concatenation is not a problem if isinstance(path, str): if path: self._level = re.split(r'\s+', path) else: self._level = [] elif isinstance(path, list): self._level = path.copy() else: raise TypeError("Level path must be either a whitespace-separated string or a list") def get_level(self): """ Gets the current edit level. Returns: str: current edit level """ return(self._level.copy()) def exists(self, path): """ Checks if a node or value with given path exists in the proposed config. Args: path (str): Configuration tree path Returns: True if node or value exists in the proposed config, False otherwise Note: This function should not be used outside of configuration sessions. In operational mode scripts, use ``exists_effective``. """ if self._session_config is None: return False # Assume the path is a node path first if self._session_config.exists(self._make_path(path)): return True else: # If that check fails, it may mean the path has a value at the end. # libvyosconfig exists() works only for _nodes_, not _values_ # libvyattacfg also worked for values, so we emulate that case here if isinstance(path, str): path = re.split(r'\s+', path) path_without_value = path[:-1] try: # return_values() is safe to use with single-value nodes, # it simply returns a single-item list in that case. values = self._session_config.return_values(self._make_path(path_without_value)) # If we got this far, the node does exist and has values, # so we need to check if it has the value in question among its values. return (path[-1] in values) except vyos.configtree.ConfigTreeError: # Even the parent node doesn't exist at all return False def session_changed(self): """ Returns: True if the config session has uncommited changes, False otherwise. """ return self._config_source.session_changed() def in_session(self): """ Returns: True if called from a configuration session, False otherwise. """ return self._config_source.in_session() def show_config(self, path=[], default=None, effective=False): """ Args: path (str list): Configuration tree path, or empty default (str): Default value to return Returns: str: working configuration """ return self._config_source.show_config(path, default, effective) def get_cached_root_dict(self, effective=False): cached = self._dict_cache.get(effective, {}) if cached: return cached if effective: config = self._running_config else: config = self._session_config if config: config_dict = json.loads(config.to_json()) else: config_dict = {} self._dict_cache[effective] = config_dict return config_dict def verify_mangling(self, key_mangling): if not (isinstance(key_mangling, tuple) and \ (len(key_mangling) == 2) and \ isinstance(key_mangling[0], str) and \ isinstance(key_mangling[1], str)): raise ValueError("key_mangling must be a tuple of two strings") def get_config_dict(self, path=[], effective=False, key_mangling=None, get_first_key=False, no_multi_convert=False, no_tag_node_value_mangle=False, with_defaults=False, with_recursive_defaults=False, with_pki=False): """ Args: path (str list): Configuration tree path, can be empty effective=False: effective or session config key_mangling=None: mangle dict keys according to regex and replacement get_first_key=False: if k = path[:-1], return sub-dict d[k] instead of {k: d[k]} no_multi_convert=False: if convert, return single value of multi node as list Returns: a dict representation of the config under path """ kwargs = locals().copy() del kwargs['self'] del kwargs['no_multi_convert'] del kwargs['with_defaults'] del kwargs['with_recursive_defaults'] del kwargs['with_pki'] lpath = self._make_path(path) root_dict = self.get_cached_root_dict(effective) conf_dict = get_sub_dict(root_dict, lpath, get_first_key=get_first_key) rpath = lpath if get_first_key else lpath[:-1] if not no_multi_convert: conf_dict = multi_to_list(rpath, conf_dict) if key_mangling is not None: self.verify_mangling(key_mangling) conf_dict = mangle_dict_keys(conf_dict, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) if with_defaults or with_recursive_defaults: defaults = self.get_config_defaults(**kwargs, recursive=with_recursive_defaults) conf_dict = config_dict_merge(defaults, conf_dict) else: conf_dict = ConfigDict(conf_dict) if with_pki and conf_dict: pki_dict = self.get_config_dict(['pki'], key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True) if pki_dict: + if 'certificate' in pki_dict: + for certificate in pki_dict['certificate']: + pki_dict['certificate'][certificate] = config_dict_mangle_acme( + certificate, pki_dict['certificate'][certificate]) + conf_dict['pki'] = pki_dict # save optional args for a call to get_config_defaults setattr(conf_dict, '_dict_kwargs', kwargs) return conf_dict def get_config_defaults(self, path=[], effective=False, key_mangling=None, no_tag_node_value_mangle=False, get_first_key=False, recursive=False) -> dict: lpath = self._make_path(path) root_dict = self.get_cached_root_dict(effective) conf_dict = get_sub_dict(root_dict, lpath, get_first_key) defaults = relative_defaults(lpath, conf_dict, get_first_key=get_first_key, recursive=recursive) rpath = lpath if get_first_key else lpath[:-1] if key_mangling is not None: self.verify_mangling(key_mangling) defaults = mangle_dict_keys(defaults, key_mangling[0], key_mangling[1], abs_path=rpath, no_tag_node_value_mangle=no_tag_node_value_mangle) return defaults def merge_defaults(self, config_dict: ConfigDict, recursive=False): if not isinstance(config_dict, ConfigDict): raise TypeError('argument is not of type ConfigDict') if not config_dict.kwargs: raise ValueError('argument missing metadata') args = config_dict.kwargs d = self.get_config_defaults(**args, recursive=recursive) config_dict = config_dict_merge(d, config_dict) return config_dict def is_multi(self, path): """ Args: path (str): Configuration tree path Returns: True if a node can have multiple values, False otherwise. Note: It also returns False if node doesn't exist. """ self._config_source.set_level(self.get_level) return self._config_source.is_multi(path) def is_tag(self, path): """ Args: path (str): Configuration tree path Returns: True if a node is a tag node, False otherwise. Note: It also returns False if node doesn't exist. """ self._config_source.set_level(self.get_level) return self._config_source.is_tag(path) def is_leaf(self, path): """ Args: path (str): Configuration tree path Returns: True if a node is a leaf node, False otherwise. Note: It also returns False if node doesn't exist. """ self._config_source.set_level(self.get_level) return self._config_source.is_leaf(path) def return_value(self, path, default=None): """ Retrieve a value of single-value leaf node in the running or proposed config Args: path (str): Configuration tree path default (str): Default value to return if node does not exist Returns: str: Node value, if it has any None: if node is valueless *or* if it doesn't exist Note: Due to the issue with treatment of valueless nodes by this function, valueless nodes should be checked with ``exists`` instead. This function cannot be used outside a configuration session. In operational mode scripts, use ``return_effective_value``. """ if self._session_config: try: value = self._session_config.return_value(self._make_path(path)) except vyos.configtree.ConfigTreeError: value = None else: value = None if not value: return(default) else: return(value) def return_values(self, path, default=[]): """ Retrieve all values of a multi-value leaf node in the running or proposed config Args: path (str): Configuration tree path Returns: str list: Node values, if it has any []: if node does not exist Note: This function cannot be used outside a configuration session. In operational mode scripts, use ``return_effective_values``. """ if self._session_config: try: values = self._session_config.return_values(self._make_path(path)) except vyos.configtree.ConfigTreeError: values = [] else: values = [] if not values: return(default.copy()) else: return(values) def list_nodes(self, path, default=[]): """ Retrieve names of all children of a tag node in the running or proposed config Args: path (str): Configuration tree path Returns: string list: child node names """ if self._session_config: try: nodes = self._session_config.list_nodes(self._make_path(path)) except vyos.configtree.ConfigTreeError: nodes = [] else: nodes = [] if not nodes: return(default.copy()) else: return(nodes) def exists_effective(self, path): """ Checks if a node or value exists in the running (effective) config. Args: path (str): Configuration tree path Returns: True if node exists in the running config, False otherwise Note: This function is safe to use in operational mode. In configuration mode, it ignores uncommited changes. """ if self._running_config is None: return False # Assume the path is a node path first if self._running_config.exists(self._make_path(path)): return True else: # If that check fails, it may mean the path has a value at the end. # libvyosconfig exists() works only for _nodes_, not _values_ # libvyattacfg also worked for values, so we emulate that case here if isinstance(path, str): path = re.split(r'\s+', path) path_without_value = path[:-1] try: # return_values() is safe to use with single-value nodes, # it simply returns a single-item list in that case. values = self._running_config.return_values(self._make_path(path_without_value)) # If we got this far, the node does exist and has values, # so we need to check if it has the value in question among its values. return (path[-1] in values) except vyos.configtree.ConfigTreeError: # Even the parent node doesn't exist at all return False def return_effective_value(self, path, default=None): """ Retrieve a values of a single-value leaf node in a running (effective) config Args: path (str): Configuration tree path default (str): Default value to return if node does not exist Returns: str: Node value """ if self._running_config: try: value = self._running_config.return_value(self._make_path(path)) except vyos.configtree.ConfigTreeError: value = None else: value = None if not value: return(default) else: return(value) def return_effective_values(self, path, default=[]): """ Retrieve all values of a multi-value node in a running (effective) config Args: path (str): Configuration tree path Returns: str list: A list of values """ if self._running_config: try: values = self._running_config.return_values(self._make_path(path)) except vyos.configtree.ConfigTreeError: values = [] else: values = [] if not values: return(default.copy()) else: return(values) def list_effective_nodes(self, path, default=[]): """ Retrieve names of all children of a tag node in the running config Args: path (str): Configuration tree path Returns: str list: child node names """ if self._running_config: try: nodes = self._running_config.list_nodes(self._make_path(path)) except vyos.configtree.ConfigTreeError: nodes = [] else: nodes = [] if not nodes: return(default.copy()) else: return(nodes) diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py index f7e14aa16..310519abd 100755 --- a/src/conf_mode/pki.py +++ b/src/conf_mode/pki.py @@ -1,305 +1,426 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021 VyOS maintainers and contributors +# Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os + +from sys import argv from sys import exit from vyos.config import Config -from vyos.configdep import set_dependents, call_dependents +from vyos.config import config_dict_merge +from vyos.configdep import set_dependents +from vyos.configdep import call_dependents from vyos.configdict import node_changed +from vyos.configdiff import Diff +from vyos.defaults import directories from vyos.pki import is_ca_certificate from vyos.pki import load_certificate from vyos.pki import load_public_key from vyos.pki import load_private_key from vyos.pki import load_crl from vyos.pki import load_dh_parameters +from vyos.utils.boot import boot_configuration_complete +from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive +from vyos.utils.process import call +from vyos.utils.process import cmd +from vyos.utils.process import is_systemd_service_active from vyos import ConfigError from vyos import airbag airbag.enable() -# keys to recursively search for under specified path, script to call if update required +vyos_certbot_dir = directories['certbot'] + +# keys to recursively search for under specified path sync_search = [ { 'keys': ['certificate'], 'path': ['service', 'https'], - 'script': '/usr/libexec/vyos/conf_mode/service_https.py' }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['interfaces', 'ethernet'], - 'script': '/usr/libexec/vyos/conf_mode/interfaces_ethernet.py' }, { 'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'], 'path': ['interfaces', 'openvpn'], - 'script': '/usr/libexec/vyos/conf_mode/interfaces_openvpn.py' }, { 'keys': ['ca_certificate'], 'path': ['interfaces', 'sstpc'], - 'script': '/usr/libexec/vyos/conf_mode/interfaces_sstpc.py' }, { 'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'], 'path': ['vpn', 'ipsec'], - 'script': '/usr/libexec/vyos/conf_mode/vpn_ipsec.py' }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['vpn', 'openconnect'], - 'script': '/usr/libexec/vyos/conf_mode/vpn_openconnect.py' }, { 'keys': ['certificate', 'ca_certificate'], 'path': ['vpn', 'sstp'], - 'script': '/usr/libexec/vyos/conf_mode/vpn_sstp.py' } ] # key from other config nodes -> key in pki['changed'] and pki sync_translate = { 'certificate': 'certificate', 'ca_certificate': 'ca', 'dh_params': 'dh', 'local_key': 'key_pair', 'remote_key': 'key_pair', 'shared_secret_key': 'openvpn', 'auth_key': 'openvpn', 'crypt_key': 'openvpn' } +def certbot_delete(certificate): + if not boot_configuration_complete(): + return + if os.path.exists(f'{vyos_certbot_dir}/renewal/{certificate}.conf'): + cmd(f'certbot delete --non-interactive --config-dir {vyos_certbot_dir} --cert-name {certificate}') + +def certbot_request(name: str, config: dict, dry_run: bool=True): + # We do not call certbot when booting the system - there is no need to do so and + # request new certificates during boot/image upgrade as the certbot configuration + # is stored persistent under /config - thus we do not open the door to transient + # errors + if not boot_configuration_complete(): + return + + domains = '--domains ' + ' --domains '.join(config['domain_name']) + tmp = f'certbot certonly --config-dir {vyos_certbot_dir} --cert-name {name} '\ + f'--non-interactive --standalone --agree-tos --no-eff-email --expand '\ + f'--server {config["url"]} --email {config["email"]} '\ + f'--key-type rsa --rsa-key-size {config["rsa_key_size"]} {domains}' + if 'listen_address' in config: + tmp += f' --http-01-address {config["listen_address"]}' + # verify() does not need to actually request a cert but only test for plausability + if dry_run: + tmp += ' --dry-run' + + cmd(tmp, raising=ConfigError, message=f'ACME certbot request failed for "{name}"!') + def get_config(config=None): if config: conf = config else: conf = Config() base = ['pki'] pki = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) - pki['changed'] = {} + if len(argv) > 1 and argv[1] == 'certbot_renew': + pki['certbot_renew'] = {} + tmp = node_changed(conf, base + ['ca'], key_mangling=('-', '_'), recursive=True) - if tmp: pki['changed'].update({'ca' : tmp}) + if tmp: + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'ca' : tmp}) - tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'), recursive=True) - if tmp: pki['changed'].update({'certificate' : tmp}) + tmp = node_changed(conf, base + ['certificate'], key_mangling=('-', '_'), + recursive=True, expand_nodes=Diff.ADD|Diff.DELETE) + if tmp: + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'certificate' : tmp}) tmp = node_changed(conf, base + ['dh'], key_mangling=('-', '_'), recursive=True) - if tmp: pki['changed'].update({'dh' : tmp}) + if tmp: + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'dh' : tmp}) tmp = node_changed(conf, base + ['key-pair'], key_mangling=('-', '_'), recursive=True) - if tmp: pki['changed'].update({'key_pair' : tmp}) + if tmp: + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'key_pair' : tmp}) - tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], key_mangling=('-', '_'), recursive=True) - if tmp: pki['changed'].update({'openvpn' : tmp}) + tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], key_mangling=('-', '_'), + recursive=True) + if tmp: + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'openvpn' : tmp}) # We only merge on the defaults of there is a configuration at all if conf.exists(base): - pki = conf.merge_defaults(pki, recursive=True) + # We have gathered the dict representation of the CLI, but there are default + # options which we need to update into the dictionary retrived. + default_values = conf.get_config_defaults(**pki.kwargs, recursive=True) + # remove ACME default configuration if unused by CLI + if 'certificate' in pki: + for name, cert_config in pki['certificate'].items(): + if 'acme' not in cert_config: + # Remove ACME default values + del default_values['certificate'][name]['acme'] + + # merge CLI and default dictionary + pki = config_dict_merge(default_values, pki) + + # Certbot triggered an external renew of the certificates. + # Mark all ACME based certificates as "changed" to trigger + # update of dependent services + if 'certificate' in pki and 'certbot_renew' in pki: + renew = [] + for name, cert_config in pki['certificate'].items(): + if 'acme' in cert_config: + renew.append(name) + # If triggered externally by certbot, certificate key is not present in changed + if 'changed' not in pki: pki.update({'changed':{}}) + pki['changed'].update({'certificate' : renew}) # We need to get the entire system configuration to verify that we are not # deleting a certificate that is still referenced somewhere! pki['system'] = conf.get_config_dict([], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) - if 'changed' in pki: - for search in sync_search: - for key in search['keys']: - changed_key = sync_translate[key] - - if changed_key not in pki['changed']: - continue - - for item_name in pki['changed'][changed_key]: - node_present = False - if changed_key == 'openvpn': - node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) - else: - node_present = dict_search_args(pki, changed_key, item_name) - - if node_present: - search_dict = dict_search_args(pki['system'], *search['path']) - - if not search_dict: - continue - - for found_name, found_path in dict_search_recursive(search_dict, key): - if found_name == item_name: - path = search['path'] - path_str = ' '.join(path + found_path) - print(f'pki: Updating config: {path_str} {found_name}') - - if path[0] == 'interfaces': - ifname = found_path[0] - set_dependents(path[1], conf, ifname) - else: - set_dependents(path[1], conf) + for search in sync_search: + for key in search['keys']: + changed_key = sync_translate[key] + if 'changed' not in pki or changed_key not in pki['changed']: + continue + + for item_name in pki['changed'][changed_key]: + node_present = False + if changed_key == 'openvpn': + node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) + else: + node_present = dict_search_args(pki, changed_key, item_name) + + if node_present: + search_dict = dict_search_args(pki['system'], *search['path']) + if not search_dict: + continue + for found_name, found_path in dict_search_recursive(search_dict, key): + if found_name == item_name: + path = search['path'] + path_str = ' '.join(path + found_path) + print(f'pki: Updating config: {path_str} {found_name}') + + if path[0] == 'interfaces': + ifname = found_path[0] + set_dependents(path[1], conf, ifname) + else: + set_dependents(path[1], conf) return pki def is_valid_certificate(raw_data): # If it loads correctly we're good, or return False return load_certificate(raw_data, wrap_tags=True) def is_valid_ca_certificate(raw_data): # Check if this is a valid certificate with CA attributes cert = load_certificate(raw_data, wrap_tags=True) if not cert: return False return is_ca_certificate(cert) def is_valid_public_key(raw_data): # If it loads correctly we're good, or return False return load_public_key(raw_data, wrap_tags=True) def is_valid_private_key(raw_data, protected=False): # If it loads correctly we're good, or return False # With encrypted private keys, we always return true as we cannot ask for password to verify if protected: return True return load_private_key(raw_data, passphrase=None, wrap_tags=True) def is_valid_crl(raw_data): # If it loads correctly we're good, or return False return load_crl(raw_data, wrap_tags=True) def is_valid_dh_parameters(raw_data): # If it loads correctly we're good, or return False return load_dh_parameters(raw_data, wrap_tags=True) def verify(pki): if not pki: return None if 'ca' in pki: for name, ca_conf in pki['ca'].items(): if 'certificate' in ca_conf: if not is_valid_ca_certificate(ca_conf['certificate']): raise ConfigError(f'Invalid certificate on CA certificate "{name}"') if 'private' in ca_conf and 'key' in ca_conf['private']: private = ca_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on CA certificate "{name}"') if 'crl' in ca_conf: ca_crls = ca_conf['crl'] if isinstance(ca_crls, str): ca_crls = [ca_crls] for crl in ca_crls: if not is_valid_crl(crl): raise ConfigError(f'Invalid CRL on CA certificate "{name}"') if 'certificate' in pki: for name, cert_conf in pki['certificate'].items(): if 'certificate' in cert_conf: if not is_valid_certificate(cert_conf['certificate']): raise ConfigError(f'Invalid certificate on certificate "{name}"') if 'private' in cert_conf and 'key' in cert_conf['private']: private = cert_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on certificate "{name}"') + if 'acme' in cert_conf: + if 'domain_name' not in cert_conf['acme']: + raise ConfigError(f'At least one domain-name is required to request '\ + f'certificate for "{name}" via ACME!') + + if 'email' not in cert_conf['acme']: + raise ConfigError(f'An email address is required to request '\ + f'certificate for "{name}" via ACME!') + + if 'certbot_renew' not in pki: + # Only run the ACME command if something on this entity changed, + # as this is time intensive + tmp = dict_search('changed.certificate', pki) + if tmp != None and name in tmp: + certbot_request(name, cert_conf['acme']) + if 'dh' in pki: for name, dh_conf in pki['dh'].items(): if 'parameters' in dh_conf: if not is_valid_dh_parameters(dh_conf['parameters']): raise ConfigError(f'Invalid DH parameters on "{name}"') if 'key_pair' in pki: for name, key_conf in pki['key_pair'].items(): if 'public' in key_conf and 'key' in key_conf['public']: if not is_valid_public_key(key_conf['public']['key']): raise ConfigError(f'Invalid public key on key-pair "{name}"') if 'private' in key_conf and 'key' in key_conf['private']: private = key_conf['private'] protected = 'password_protected' in private if not is_valid_private_key(private['key'], protected): raise ConfigError(f'Invalid private key on key-pair "{name}"') if 'x509' in pki: if 'default' in pki['x509']: default_values = pki['x509']['default'] if 'country' in default_values: country = default_values['country'] if len(country) != 2 or not country.isalpha(): raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.') if 'changed' in pki: # if the list is getting longer, we can move to a dict() and also embed the # search key as value from line 173 or 176 for search in sync_search: for key in search['keys']: changed_key = sync_translate[key] if changed_key not in pki['changed']: continue for item_name in pki['changed'][changed_key]: node_present = False if changed_key == 'openvpn': node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name) else: node_present = dict_search_args(pki, changed_key, item_name) if not node_present: search_dict = dict_search_args(pki['system'], *search['path']) if not search_dict: continue for found_name, found_path in dict_search_recursive(search_dict, key): if found_name == item_name: path_str = " ".join(search['path'] + found_path) raise ConfigError(f'PKI object "{item_name}" still in use by "{path_str}"') return None def generate(pki): if not pki: return None + # Certbot renewal only needs to re-trigger the services to load up the + # new PEM file + if 'certbot_renew' in pki: + return None + + # list of certificates issued via certbot + certbot_list = [] + if 'certificate' in pki: + for name, cert_conf in pki['certificate'].items(): + if 'acme' in cert_conf: + certbot_list.append(name) + # when something for the certificate changed, we should delete it + if name in dict_search('changed.certificate', pki): + certbot_delete(name) + certbot_request(name, cert_conf['acme'], dry_run=False) + + # Cleanup certbot configuration and certificates if no longer in use by CLI + # Get foldernames under vyos_certbot_dir which each represent a certbot cert + if os.path.exists(f'{vyos_certbot_dir}/live'): + for cert in [f.path.split('/')[-1] for f in os.scandir(f'{vyos_certbot_dir}/live') if f.is_dir()]: + if cert not in certbot_list: + # certificate is no longer active on the CLI - remove it + certbot_delete(cert) + return None def apply(pki): + systemd_certbot_name = 'certbot.timer' if not pki: + call(f'systemctl stop {systemd_certbot_name}') return None + has_certbot = False + if 'certificate' in pki: + for name, cert_conf in pki['certificate'].items(): + if 'acme' in cert_conf: + has_certbot = True + break + + if not has_certbot: + call(f'systemctl stop {systemd_certbot_name}') + elif has_certbot and not is_systemd_service_active(systemd_certbot_name): + call(f'systemctl restart {systemd_certbot_name}') + if 'changed' in pki: call_dependents() return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/etc/systemd/system/certbot.service.d/10-override.conf b/src/etc/systemd/system/certbot.service.d/10-override.conf new file mode 100644 index 000000000..542f77eb2 --- /dev/null +++ b/src/etc/systemd/system/certbot.service.d/10-override.conf @@ -0,0 +1,7 @@ +[Unit] +After= +After=vyos-router.service + +[Service] +ExecStart= +ExecStart=/usr/bin/certbot renew --config-dir /config/auth/letsencrypt --no-random-sleep-on-renew --post-hook "/usr/libexec/vyos/vyos-certbot-renew-pki.sh" diff --git a/src/helpers/vyos-certbot-renew-pki.sh b/src/helpers/vyos-certbot-renew-pki.sh new file mode 100755 index 000000000..d0b663f7b --- /dev/null +++ b/src/helpers/vyos-certbot-renew-pki.sh @@ -0,0 +1,3 @@ +#!/bin/sh +source /opt/vyatta/etc/functions/script-template +/usr/libexec/vyos/conf_mode/pki.py certbot_renew diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 6c854afb5..ad2c1ada0 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -1,1080 +1,1088 @@ #!/usr/bin/env python3 # -# Copyright (C) 2021-2023 VyOS maintainers and contributors +# Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import argparse import ipaddress import os import re import sys import tabulate from cryptography import x509 from cryptography.x509.oid import ExtendedKeyUsageOID from vyos.config import Config +from vyos.config import config_dict_mangle_acme from vyos.pki import encode_certificate, encode_public_key, encode_private_key, encode_dh_parameters from vyos.pki import get_certificate_fingerprint from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list from vyos.pki import create_private_key from vyos.pki import create_dh_parameters from vyos.pki import load_certificate, load_certificate_request, load_private_key from vyos.pki import load_crl, load_dh_parameters, load_public_key from vyos.pki import verify_certificate from vyos.utils.io import ask_input from vyos.utils.io import ask_yes_no from vyos.utils.misc import install_into_config from vyos.utils.process import cmd CERT_REQ_END = '-----END CERTIFICATE REQUEST-----' auth_dir = '/config/auth' # Helper Functions conf = Config() def get_default_values(): # Fetch default x509 values base = ['pki', 'x509', 'default'] x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True, with_recursive_defaults=True) return x509_defaults def get_config_ca_certificate(name=None): # Fetch ca certificates from config base = ['pki', 'ca'] if not conf.exists(base): return False if name: base = base + [name] if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): return False return conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) def get_config_certificate(name=None): # Get certificates from config base = ['pki', 'certificate'] if not conf.exists(base): return False if name: base = base + [name] if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): return False - return conf.get_config_dict(base, key_mangling=('-', '_'), + pki = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) + if pki: + for certificate in pki: + pki[certificate] = config_dict_mangle_acme(certificate, pki[certificate]) + + return pki def get_certificate_ca(cert, ca_certs): # Find CA certificate for given certificate if not ca_certs: return None for ca_name, ca_dict in ca_certs.items(): if 'certificate' not in ca_dict: continue ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: continue if verify_certificate(cert, ca_cert): return ca_name return None def get_config_revoked_certificates(): # Fetch revoked certificates from config ca_base = ['pki', 'ca'] cert_base = ['pki', 'certificate'] certs = [] if conf.exists(ca_base): ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) certs.extend(ca_certificates.values()) if conf.exists(cert_base): certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) certs.extend(certificates.values()) return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] def get_revoked_by_serial_numbers(serial_numbers=[]): # Return serial numbers of revoked certificates certs_out = [] certs = get_config_certificate() ca_certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if cert.serial_number in serial_numbers: certs_out.append(cert_name) if ca_certs: for cert_name, cert_dict in ca_certs.items(): if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if cert.serial_number in serial_numbers: certs_out.append(cert_name) return certs_out def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): # Show/install conf commands for certificate prefix = 'ca' if is_ca else 'certificate' base = f"pki {prefix} {name}" config_paths = [] if cert: cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1]) config_paths.append(f"{base} certificate '{cert_pem}'") if private_key: key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1]) config_paths.append(f"{base} private key '{key_pem}'") if key_passphrase: config_paths.append(f"{base} private password-protected") install_into_config(conf, config_paths) def install_crl(ca_name, crl): # Show/install conf commands for crl crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) install_into_config(conf, [f"pki ca {ca_name} crl '{crl_pem}'"]) def install_dh_parameters(name, params): # Show/install conf commands for dh params dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1]) install_into_config(conf, [f"pki dh {name} parameters '{dh_pem}'"]) def install_ssh_key(name, public_key, private_key, passphrase=None): # Show/install conf commands for ssh key key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH') username = os.getlogin() type_key_split = key_openssh.split(" ") base = f"system login user {username} authentication public-keys {name}" install_into_config(conf, [ f"{base} key '{type_key_split[1]}'", f"{base} type '{type_key_split[0]}'" ]) print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None, prompt=True): # Show/install conf commands for key-pair config_paths = [] if public_key: install_public_key = not prompt or ask_yes_no('Do you want to install the public key?', default=True) public_key_pem = encode_public_key(public_key) if install_public_key: install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1]) config_paths.append(f"pki key-pair {name} public key '{install_public_pem}'") else: print("Public key:") print(public_key_pem) if private_key: install_private_key = not prompt or ask_yes_no('Do you want to install the private key?', default=True) private_key_pem = encode_private_key(private_key, passphrase=passphrase) if install_private_key: install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1]) config_paths.append(f"pki key-pair {name} private key '{install_private_pem}'") if passphrase: config_paths.append(f"pki key-pair {name} private password-protected") else: print("Private key:") print(private_key_pem) install_into_config(conf, config_paths) def install_openvpn_key(name, key_data, key_version='1'): config_paths = [ f"pki openvpn shared-secret {name} key '{key_data}'", f"pki openvpn shared-secret {name} version '{key_version}'" ] install_into_config(conf, config_paths) def install_wireguard_key(interface, private_key, public_key): # Show conf commands for installing wireguard key pairs from vyos.ifconfig import Section if Section.section(interface) != 'wireguard': print(f'"{interface}" is not a WireGuard interface name!') exit(1) # Check if we are running in a config session - if yes, we can directly write to the CLI install_into_config(conf, [f"interfaces wireguard {interface} private-key '{private_key}'"]) print(f"Corresponding public-key to use on peer system is: '{public_key}'") def install_wireguard_psk(interface, peer, psk): from vyos.ifconfig import Section if Section.section(interface) != 'wireguard': print(f'"{interface}" is not a WireGuard interface name!') exit(1) # Check if we are running in a config session - if yes, we can directly write to the CLI install_into_config(conf, [f"interfaces wireguard {interface} peer {peer} preshared-key '{psk}'"]) def ask_passphrase(): passphrase = None print("Note: If you plan to use the generated key on this router, do not encrypt the private key.") if ask_yes_no('Do you want to encrypt the private key with a passphrase?'): passphrase = ask_input('Enter passphrase:') return passphrase def write_file(filename, contents): full_path = os.path.join(auth_dir, filename) directory = os.path.dirname(full_path) if not os.path.exists(directory): print('Failed to write file: directory does not exist') return False if os.path.exists(full_path) and not ask_yes_no('Do you want to overwrite the existing file?'): return False with open(full_path, 'w') as f: f.write(contents) print(f'File written to {full_path}') # Generation functions def generate_private_key(): key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec']) size_valid = [] size_default = 0 if key_type in ['rsa', 'dsa']: size_default = 2048 size_valid = [512, 1024, 2048, 4096] elif key_type == 'ec': size_default = 256 size_valid = [224, 256, 384, 521] size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid) return create_private_key(key_type, size), key_type def parse_san_string(san_string): if not san_string: return None output = [] san_split = san_string.strip().split(",") for pair_str in san_split: tag, value = pair_str.strip().split(":", 1) if tag == 'ipv4': output.append(ipaddress.IPv4Address(value)) elif tag == 'ipv6': output.append(ipaddress.IPv6Address(value)) elif tag == 'dns': output.append(value) return output def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, file=False, ask_san=True): if not private_key: private_key, key_type = generate_private_key() default_values = get_default_values() subject = {} subject['country'] = ask_input('Enter country code:', default=default_values['country']) subject['state'] = ask_input('Enter state:', default=default_values['state']) subject['locality'] = ask_input('Enter locality:', default=default_values['locality']) subject['organization'] = ask_input('Enter organization name:', default=default_values['organization']) subject['common_name'] = ask_input('Enter common name:', default='vyos.io') subject_alt_names = None if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'): print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net") san_string = ask_input('Enter Subject Alternative Names:') subject_alt_names = parse_san_string(san_string) cert_req = create_certificate_request(subject, private_key, subject_alt_names) if return_request: return cert_req passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert_req)) print(encode_private_key(private_key, passphrase=passphrase)) return None if install: print("Certificate request:") print(encode_certificate(cert_req) + "\n") install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) if file: write_file(f'{name}.csr', encode_certificate(cert_req)) write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False): valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True) cert_type = None if not is_ca: cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server']) return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca) def generate_ca_certificate(name, install=False, file=False): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True) passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None if install: install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) if file: write_file(f'{name}.pem', encode_certificate(cert)) write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_ca_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) if not ca_dict: print(f"CA certificate or private key for '{ca_name}' not found") return None ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: print("Failed to load signing CA certificate, aborting") return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter signing CA private key passphrase:') ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) if not ca_private_key: print("Failed to load signing CA private key, aborting") return None private_key = None key_type = None cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) else: print("Paste certificate request and press enter:") lines = [] curr_line = '' while True: curr_line = input().strip() if not curr_line or curr_line == CERT_REQ_END: break lines.append(curr_line) if not lines: print("Aborted") return None wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing cert_req = load_certificate_request("\n".join(lines), wrap) if not cert_req: print("Invalid certificate request") return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True) passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None if install: install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) if file: write_file(f'{name}.pem', encode_certificate(cert)) write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_certificate_sign(name, ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) if not ca_dict: print(f"CA certificate or private key for '{ca_name}' not found") return None ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: print("Failed to load CA certificate, aborting") return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) if not ca_private_key: print("Failed to load CA private key, aborting") return None private_key = None key_type = None cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True) else: print("Paste certificate request and press enter:") lines = [] curr_line = '' while True: curr_line = input().strip() if not curr_line or curr_line == CERT_REQ_END: break lines.append(curr_line) if not lines: print("Aborted") return None wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing cert_req = load_certificate_request("\n".join(lines), wrap) if not cert_req: print("Invalid certificate request") return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None if install: install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False) if file: write_file(f'{name}.pem', encode_certificate(cert)) write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_certificate_selfsign(name, install=False, file=False): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True) cert = generate_certificate(cert_req, cert_req, private_key, is_ca=False) passphrase = ask_passphrase() if not install and not file: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None if install: install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) if file: write_file(f'{name}.pem', encode_certificate(cert)) write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_certificate_revocation_list(ca_name, install=False, file=False): ca_dict = get_config_ca_certificate(ca_name) if not ca_dict: print(f"CA certificate or private key for '{ca_name}' not found") return None ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: print("Failed to load CA certificate, aborting") return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) if not ca_private_key: print("Failed to load CA private key, aborting") return None revoked_certs = get_config_revoked_certificates() to_revoke = [] for cert_dict in revoked_certs: if 'certificate' not in cert_dict: continue cert_data = cert_dict['certificate'] try: cert = load_certificate(cert_data) if cert.issuer == ca_cert.subject: to_revoke.append(cert.serial_number) except ValueError: continue if not to_revoke: print("No revoked certificates to add to the CRL") return None crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke) if not crl: print("Failed to create CRL") return None if not install and not file: print(encode_certificate(crl)) return None if install: install_crl(ca_name, crl) if file: write_file(f'{name}.crl', encode_certificate(crl)) def generate_ssh_keypair(name, install=False, file=False): private_key, key_type = generate_private_key() public_key = private_key.public_key() passphrase = ask_passphrase() if not install and not file: print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) print("") print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) return None if install: install_ssh_key(name, public_key, private_key, passphrase) if file: write_file(f'{name}.pem', encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) write_file(f'{name}.key', encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) def generate_dh_parameters(name, install=False, file=False): bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True) print("Generating parameters...") dh_params = create_dh_parameters(bits) if not dh_params: print("Failed to create DH parameters") return None if not install and not file: print("DH Parameters:") print(encode_dh_parameters(dh_params)) if install: install_dh_parameters(name, dh_params) if file: write_file(f'{name}.pem', encode_dh_parameters(dh_params)) def generate_keypair(name, install=False, file=False): private_key, key_type = generate_private_key() public_key = private_key.public_key() passphrase = ask_passphrase() if not install and not file: print(encode_public_key(public_key)) print("") print(encode_private_key(private_key, passphrase=passphrase)) return None if install: install_keypair(name, key_type, private_key, public_key, passphrase) if file: write_file(f'{name}.pem', encode_public_key(public_key)) write_file(f'{name}.key', encode_private_key(private_key, passphrase=passphrase)) def generate_openvpn_key(name, install=False, file=False): result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"') if not result: print("Failed to generate OpenVPN key") return None if not install and not file: print(result) return None if install: key_lines = result.split("\n") key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings key_version = '1' version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) if version_search: key_version = version_search[1] install_openvpn_key(name, key_data, key_version) if file: write_file(f'{name}.key', result) def generate_wireguard_key(interface=None, install=False): private_key = cmd('wg genkey') public_key = cmd('wg pubkey', input=private_key) if interface and install: install_wireguard_key(interface, private_key, public_key) else: print(f'Private key: {private_key}') print(f'Public key: {public_key}', end='\n\n') def generate_wireguard_psk(interface=None, peer=None, install=False): psk = cmd('wg genpsk') if interface and peer and install: install_wireguard_psk(interface, peer, psk) else: print(f'Pre-shared key: {psk}') # Import functions def import_ca_certificate(name, path=None, key_path=None): if path: if not os.path.exists(path): print(f'File not found: {path}') return cert = None with open(path) as f: cert_data = f.read() cert = load_certificate(cert_data, wrap_tags=False) if not cert: print(f'Invalid certificate: {path}') return install_certificate(name, cert, is_ca=True) if key_path: if not os.path.exists(key_path): print(f'File not found: {key_path}') return key = None passphrase = ask_input('Enter private key passphrase: ') or None with open(key_path) as f: key_data = f.read() key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) if not key: print(f'Invalid private key or passphrase: {path}') return install_certificate(name, private_key=key, is_ca=True) def import_certificate(name, path=None, key_path=None): if path: if not os.path.exists(path): print(f'File not found: {path}') return cert = None with open(path) as f: cert_data = f.read() cert = load_certificate(cert_data, wrap_tags=False) if not cert: print(f'Invalid certificate: {path}') return install_certificate(name, cert, is_ca=False) if key_path: if not os.path.exists(key_path): print(f'File not found: {key_path}') return key = None passphrase = ask_input('Enter private key passphrase: ') or None with open(key_path) as f: key_data = f.read() key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) if not key: print(f'Invalid private key or passphrase: {path}') return install_certificate(name, private_key=key, is_ca=False) def import_crl(name, path): if not os.path.exists(path): print(f'File not found: {path}') return crl = None with open(path) as f: crl_data = f.read() crl = load_crl(crl_data, wrap_tags=False) if not crl: print(f'Invalid certificate: {path}') return install_crl(name, crl) def import_dh_parameters(name, path): if not os.path.exists(path): print(f'File not found: {path}') return dh = None with open(path) as f: dh_data = f.read() dh = load_dh_parameters(dh_data, wrap_tags=False) if not dh: print(f'Invalid DH parameters: {path}') return install_dh_parameters(name, dh) def import_keypair(name, path=None, key_path=None): if path: if not os.path.exists(path): print(f'File not found: {path}') return key = None with open(path) as f: key_data = f.read() key = load_public_key(key_data, wrap_tags=False) if not key: print(f'Invalid public key: {path}') return install_keypair(name, None, public_key=key, prompt=False) if key_path: if not os.path.exists(key_path): print(f'File not found: {key_path}') return key = None passphrase = ask_input('Enter private key passphrase: ') or None with open(key_path) as f: key_data = f.read() key = load_private_key(key_data, passphrase=passphrase, wrap_tags=False) if not key: print(f'Invalid private key or passphrase: {path}') return install_keypair(name, None, private_key=key, prompt=False) def import_openvpn_secret(name, path): if not os.path.exists(path): print(f'File not found: {path}') return key_data = None key_version = '1' with open(path) as f: key_lines = f.read().split("\n") key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', key_lines[0]) # Future-proofing (hopefully) if version_search: key_version = version_search[1] install_openvpn_key(name, key_data, key_version) # Show functions def show_certificate_authority(name=None, pem=False): headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] data = [] certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if name and name != cert_name: continue if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if name and pem: print(encode_certificate(cert)) return parent_ca_name = get_certificate_ca(cert, certs) cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] if not parent_ca_name or parent_ca_name == cert_name: parent_ca_name = 'N/A' if not cert: continue have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name]) print("Certificate Authorities:") print(tabulate.tabulate(data, headers)) def show_certificate(name=None, pem=False): headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] data = [] certs = get_config_certificate() if certs: ca_certs = get_config_ca_certificate() for cert_name, cert_dict in certs.items(): if name and name != cert_name: continue if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if not cert: continue if name and pem: print(encode_certificate(cert)) return ca_name = get_certificate_ca(cert, ca_certs) cert_subject_cn = cert.subject.rfc4514_string().split(",")[0] cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] cert_type = 'Unknown' try: ext = cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) if ext and ExtendedKeyUsageOID.SERVER_AUTH in ext.value: cert_type = 'Server' elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value: cert_type = 'Client' except: pass revoked = 'Yes' if 'revoke' in cert_dict else 'No' have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' have_ca = f'Yes ({ca_name})' if ca_name else 'No' data.append([ cert_name, cert_type, cert_subject_cn, cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, revoked, have_private, have_ca]) print("Certificates:") print(tabulate.tabulate(data, headers)) def show_certificate_fingerprint(name, hash): cert = get_config_certificate(name=name) cert = load_certificate(cert['certificate']) print(get_certificate_fingerprint(cert, hash)) def show_crl(name=None, pem=False): headers = ['CA Name', 'Updated', 'Revokes'] data = [] certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if name and name != cert_name: continue if 'crl' not in cert_dict: continue crls = cert_dict['crl'] if isinstance(crls, str): crls = [crls] for crl_data in cert_dict['crl']: crl = load_crl(crl_data) if not crl: continue if name and pem: print(encode_certificate(crl)) continue certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl]) data.append([cert_name, crl.last_update, ", ".join(certs)]) if name and pem: return print("Certificate Revocation Lists:") print(tabulate.tabulate(data, headers)) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--action', help='PKI action', required=True) # X509 parser.add_argument('--ca', help='Certificate Authority', required=False) parser.add_argument('--certificate', help='Certificate', required=False) parser.add_argument('--crl', help='Certificate Revocation List', required=False) parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') parser.add_argument('--pem', help='Output using PEM encoding', action='store_true') parser.add_argument('--fingerprint', help='Show fingerprint and exit', action='store') # SSH parser.add_argument('--ssh', help='SSH Key', required=False) # DH parser.add_argument('--dh', help='DH Parameters', required=False) # Key pair parser.add_argument('--keypair', help='Key pair', required=False) # OpenVPN parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) # WireGuard parser.add_argument('--wireguard', help='Wireguard', action='store_true') group = parser.add_mutually_exclusive_group() group.add_argument('--key', help='Wireguard key pair', action='store_true', required=False) group.add_argument('--psk', help='Wireguard pre shared key', action='store_true', required=False) parser.add_argument('--interface', help='Install generated keys into running-config for named interface', action='store') parser.add_argument('--peer', help='Install generated keys into running-config for peer', action='store') # Global parser.add_argument('--file', help='Write generated keys into specified filename', action='store_true') parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') parser.add_argument('--filename', help='Write certificate into specified filename', action='store') parser.add_argument('--key-filename', help='Write key into specified filename', action='store') args = parser.parse_args() try: if args.action == 'generate': if args.ca: if args.sign: generate_ca_certificate_sign(args.ca, args.sign, install=args.install, file=args.file) else: generate_ca_certificate(args.ca, install=args.install, file=args.file) elif args.certificate: if args.sign: generate_certificate_sign(args.certificate, args.sign, install=args.install, file=args.file) elif args.self_sign: generate_certificate_selfsign(args.certificate, install=args.install, file=args.file) else: generate_certificate_request(name=args.certificate, install=args.install, file=args.file) elif args.crl: generate_certificate_revocation_list(args.crl, install=args.install, file=args.file) elif args.ssh: generate_ssh_keypair(args.ssh, install=args.install, file=args.file) elif args.dh: generate_dh_parameters(args.dh, install=args.install, file=args.file) elif args.keypair: generate_keypair(args.keypair, install=args.install, file=args.file) elif args.openvpn: generate_openvpn_key(args.openvpn, install=args.install, file=args.file) elif args.wireguard: # WireGuard supports writing key directly into the CLI, but this # requires the vyos_libexec_dir environment variable to be set os.environ["vyos_libexec_dir"] = "/usr/libexec/vyos" if args.key: generate_wireguard_key(args.interface, install=args.install) if args.psk: generate_wireguard_psk(args.interface, peer=args.peer, install=args.install) elif args.action == 'import': if args.ca: import_ca_certificate(args.ca, path=args.filename, key_path=args.key_filename) elif args.certificate: import_certificate(args.certificate, path=args.filename, key_path=args.key_filename) elif args.crl: import_crl(args.crl, args.filename) elif args.dh: import_dh_parameters(args.dh, args.filename) elif args.keypair: import_keypair(args.keypair, path=args.filename, key_path=args.key_filename) elif args.openvpn: import_openvpn_secret(args.openvpn, args.filename) elif args.action == 'show': if args.ca: ca_name = None if args.ca == 'all' else args.ca if ca_name: if not conf.exists(['pki', 'ca', ca_name]): print(f'CA "{ca_name}" does not exist!') exit(1) show_certificate_authority(ca_name, args.pem) elif args.certificate: cert_name = None if args.certificate == 'all' else args.certificate if cert_name: if not conf.exists(['pki', 'certificate', cert_name]): print(f'Certificate "{cert_name}" does not exist!') exit(1) if args.fingerprint is None: show_certificate(None if args.certificate == 'all' else args.certificate, args.pem) else: show_certificate_fingerprint(args.certificate, args.fingerprint) elif args.crl: show_crl(None if args.crl == 'all' else args.crl, args.pem) else: show_certificate_authority() + print('\n') show_certificate() + print('\n') show_crl() except KeyboardInterrupt: print("Aborted") sys.exit(0)