diff --git a/interface-definitions/interfaces-wireguard.xml.in b/interface-definitions/interfaces-wireguard.xml.in index 378251fed..773bde09c 100644 --- a/interface-definitions/interfaces-wireguard.xml.in +++ b/interface-definitions/interfaces-wireguard.xml.in @@ -1,135 +1,135 @@ <?xml version="1.0"?> <interfaceDefinition> <node name="interfaces"> <children> <tagNode name="wireguard" owner="${vyos_conf_scripts_dir}/interfaces-wireguard.py"> <properties> <help>WireGuard Interface</help> <priority>459</priority> <constraint> <regex>^wg[0-9]+$</regex> </constraint> <constraintErrorMessage>WireGuard interface must be named wgN</constraintErrorMessage> <valueHelp> <format>wgN</format> <description>WireGuard interface name</description> </valueHelp> </properties> <children> #include <include/interface/address-ipv4-ipv6.xml.i> #include <include/interface/interface-description.xml.i> #include <include/interface/interface-disable.xml.i> #include <include/interface/interface-vrf.xml.i> #include <include/port-number.xml.i> #include <include/interface/interface-mtu-68-16000.xml.i> <leafNode name="mtu"> <defaultValue>1420</defaultValue> </leafNode> #include <include/interface/interface-ipv4-options.xml.i> #include <include/interface/interface-ipv6-options.xml.i> <leafNode name="fwmark"> <properties> <help>A 32-bit fwmark value set on all outgoing packets</help> <valueHelp> <format>number</format> <description>value which marks the packet for QoS/shaper</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-4294967295"/> </constraint> </properties> <defaultValue>0</defaultValue> </leafNode> <leafNode name="private-key"> <properties> - <help>Private key to use on that interface</help> - <completionHelp> - <script>${vyos_op_scripts_dir}/wireguard.py --listkdir</script> - </completionHelp> + <help>Base64 encoded private key</help> + <constraint> + <regex>[0-9a-zA-Z\+/]{43}=$</regex> + </constraint> + <constraintErrorMessage>Key is not valid 44-character (32-bytes) base64</constraintErrorMessage> </properties> - <defaultValue>default</defaultValue> </leafNode> <tagNode name="peer"> <properties> <help>peer alias</help> <constraint> <regex>[^ ]{1,100}$</regex> </constraint> <constraintErrorMessage>peer alias too long (limit 100 characters)</constraintErrorMessage> </properties> <children> #include <include/generic-disable-node.xml.i> - <leafNode name="pubkey"> + <leafNode name="public-key"> <properties> <help>base64 encoded public key</help> <constraint> <regex>[0-9a-zA-Z\+/]{43}=$</regex> </constraint> <constraintErrorMessage>Key is not valid 44-character (32-bytes) base64</constraintErrorMessage> </properties> </leafNode> <leafNode name="preshared-key"> <properties> <help>base64 encoded preshared key</help> <constraint> <regex>[0-9a-zA-Z\+/]{43}=$</regex> </constraint> <constraintErrorMessage>Key is not valid 44-character (32-bytes) base64</constraintErrorMessage> </properties> </leafNode> <leafNode name="allowed-ips"> <properties> <help>IP addresses allowed to traverse the peer</help> <constraint> <validator name="ip-prefix"/> </constraint> <multi/> </properties> </leafNode> <leafNode name="address"> <properties> <help>IP address of tunnel endpoint</help> <valueHelp> <format>ipv4</format> <description>IPv4 address of remote tunnel endpoint</description> </valueHelp> <valueHelp> <format>ipv6</format> <description>IPv6 address of remote tunnel endpoint</description> </valueHelp> <constraint> <validator name="ip-address"/> </constraint> </properties> </leafNode> <leafNode name="port"> <properties> <help>Port number used for tunnel endpoint</help> <valueHelp> <format>u32:1-65535</format> <description>Numeric IP port</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-65535"/> </constraint> </properties> </leafNode> <leafNode name="persistent-keepalive"> <properties> <help>Interval to send keepalive messages</help> <valueHelp> <format>1-65535</format> <description>Interval in seconds</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-65535"/> </constraint> </properties> </leafNode> </children> </tagNode> </children> </tagNode> </children> </node> </interfaceDefinition> diff --git a/python/vyos/ifconfig/wireguard.py b/python/vyos/ifconfig/wireguard.py index e5b9c4408..c4cf2fbbf 100644 --- a/python/vyos/ifconfig/wireguard.py +++ b/python/vyos/ifconfig/wireguard.py @@ -1,249 +1,253 @@ # Copyright 2019-2021 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os import time from datetime import timedelta from netaddr import EUI from netaddr import mac_unix_expanded from random import getrandbits from hurry.filesize import size from hurry.filesize import alternative from vyos.config import Config from vyos.ifconfig import Interface from vyos.ifconfig import Operational from vyos.template import is_ipv6 class WireGuardOperational(Operational): def _dump(self): """Dump wireguard data in a python friendly way.""" last_device = None output = {} # Dump wireguard connection data _f = self._cmd('wg show all dump') for line in _f.split('\n'): if not line: # Skip empty lines and last line continue items = line.split('\t') if last_device != items[0]: # We are currently entering a new node device, private_key, public_key, listen_port, fw_mark = items last_device = device output[device] = { 'private_key': None if private_key == '(none)' else private_key, 'public_key': None if public_key == '(none)' else public_key, 'listen_port': int(listen_port), 'fw_mark': None if fw_mark == 'off' else int(fw_mark), 'peers': {}, } else: # We are entering a peer device, public_key, preshared_key, endpoint, allowed_ips, latest_handshake, transfer_rx, transfer_tx, persistent_keepalive = items if allowed_ips == '(none)': allowed_ips = [] else: allowed_ips = allowed_ips.split('\t') output[device]['peers'][public_key] = { 'preshared_key': None if preshared_key == '(none)' else preshared_key, 'endpoint': None if endpoint == '(none)' else endpoint, 'allowed_ips': allowed_ips, 'latest_handshake': None if latest_handshake == '0' else int(latest_handshake), 'transfer_rx': int(transfer_rx), 'transfer_tx': int(transfer_tx), 'persistent_keepalive': None if persistent_keepalive == 'off' else int(persistent_keepalive), } return output def show_interface(self): wgdump = self._dump().get(self.config['ifname'], None) c = Config() c.set_level(["interfaces", "wireguard", self.config['ifname']]) description = c.return_effective_value(["description"]) ips = c.return_effective_values(["address"]) answer = "interface: {}\n".format(self.config['ifname']) if (description): answer += " description: {}\n".format(description) if (ips): answer += " address: {}\n".format(", ".join(ips)) answer += " public key: {}\n".format(wgdump['public_key']) answer += " private key: (hidden)\n" answer += " listening port: {}\n".format(wgdump['listen_port']) answer += "\n" for peer in c.list_effective_nodes(["peer"]): if wgdump['peers']: - pubkey = c.return_effective_value(["peer", peer, "pubkey"]) + pubkey = c.return_effective_value(["peer", peer, "public_key"]) if pubkey in wgdump['peers']: wgpeer = wgdump['peers'][pubkey] answer += " peer: {}\n".format(peer) answer += " public key: {}\n".format(pubkey) """ figure out if the tunnel is recently active or not """ status = "inactive" if (wgpeer['latest_handshake'] is None): """ no handshake ever """ status = "inactive" else: if int(wgpeer['latest_handshake']) > 0: delta = timedelta(seconds=int( time.time() - wgpeer['latest_handshake'])) answer += " latest handshake: {}\n".format(delta) if (time.time() - int(wgpeer['latest_handshake']) < (60*5)): """ Five minutes and the tunnel is still active """ status = "active" else: """ it's been longer than 5 minutes """ status = "inactive" elif int(wgpeer['latest_handshake']) == 0: """ no handshake ever """ status = "inactive" answer += " status: {}\n".format(status) if wgpeer['endpoint'] is not None: answer += " endpoint: {}\n".format(wgpeer['endpoint']) if wgpeer['allowed_ips'] is not None: answer += " allowed ips: {}\n".format( ",".join(wgpeer['allowed_ips']).replace(",", ", ")) if wgpeer['transfer_rx'] > 0 or wgpeer['transfer_tx'] > 0: rx_size = size( wgpeer['transfer_rx'], system=alternative) tx_size = size( wgpeer['transfer_tx'], system=alternative) answer += " transfer: {} received, {} sent\n".format( rx_size, tx_size) if wgpeer['persistent_keepalive'] is not None: answer += " persistent keepalive: every {} seconds\n".format( wgpeer['persistent_keepalive']) answer += '\n' return answer + super().formated_stats() @Interface.register class WireGuardIf(Interface): OperationalClass = WireGuardOperational iftype = 'wireguard' definition = { **Interface.definition, **{ 'section': 'wireguard', 'prefixes': ['wg', ], 'bridgeable': False, } } def get_mac(self): """ Get current interface MAC (Media Access Contrl) address used. NOTE: Tunnel interfaces have no "MAC" address by default. The content of the 'address' file in /sys/class/net/device contains the local-ip thus we generate a random MAC address instead Example: >>> from vyos.ifconfig import Interface >>> Interface('eth0').get_mac() '00:50:ab:cd:ef:00' """ # we choose 40 random bytes for the MAC address, this gives # us e.g. EUI('00-EA-EE-D6-A3-C8') or EUI('00-41-B9-0D-F2-2A') tmp = EUI(getrandbits(48)).value # set locally administered bit in MAC address tmp |= 0xf20000000000 # convert integer to "real" MAC address representation mac = EUI(hex(tmp).split('x')[-1]) # change dialect to use : as delimiter instead of - mac.dialect = mac_unix_expanded return str(mac) def update(self, config): """ General helper function which works on a dictionary retrived by get_config_dict(). It's main intention is to consolidate the scattered interface setup code and provide a single point of entry when workin on any interface. """ # remove no longer associated peers first if 'peer_remove' in config: for tmp in config['peer_remove']: peer = config['peer_remove'][tmp] peer['ifname'] = config['ifname'] - cmd = 'wg set {ifname} peer {pubkey} remove' + cmd = 'wg set {ifname} peer {public_key} remove' self._cmd(cmd.format(**peer)) + config['private_key_file'] = '/tmp/tmp.wireguard.key' + with open(config['private_key_file'], 'w') as f: + f.write(config['private_key']) + # Wireguard base command is identical for every peer - base_cmd = 'wg set {ifname} private-key {private_key}' + base_cmd = 'wg set {ifname} private-key {private_key_file}' if 'port' in config: base_cmd += ' listen-port {port}' if 'fwmark' in config: base_cmd += ' fwmark {fwmark}' base_cmd = base_cmd.format(**config) for tmp in config['peer']: peer = config['peer'][tmp] # start of with a fresh 'wg' command - cmd = base_cmd + ' peer {pubkey}' + cmd = base_cmd + ' peer {public_key}' # If no PSK is given remove it by using /dev/null - passing keys via # the shell (usually bash) is considered insecure, thus we use a file no_psk_file = '/dev/null' psk_file = no_psk_file if 'preshared_key' in peer: psk_file = '/tmp/tmp.wireguard.psk' with open(psk_file, 'w') as f: f.write(peer['preshared_key']) cmd += f' preshared-key {psk_file}' # Persistent keepalive is optional if 'persistent_keepalive'in peer: cmd += ' persistent-keepalive {persistent_keepalive}' # Multiple allowed-ip ranges can be defined - ensure we are always # dealing with a list if isinstance(peer['allowed_ips'], str): peer['allowed_ips'] = [peer['allowed_ips']] cmd += ' allowed-ips ' + ','.join(peer['allowed_ips']) # Endpoint configuration is optional if {'address', 'port'} <= set(peer): if is_ipv6(peer['address']): cmd += ' endpoint [{address}]:{port}' else: cmd += ' endpoint {address}:{port}' self._cmd(cmd.format(**peer)) # PSK key file is not required to be stored persistently as its backed by CLI if psk_file != no_psk_file and os.path.exists(psk_file): os.remove(psk_file) # call base class super().update(config) diff --git a/smoketest/scripts/cli/test_interfaces_wireguard.py b/smoketest/scripts/cli/test_interfaces_wireguard.py index d31ec0332..3707eaac3 100755 --- a/smoketest/scripts/cli/test_interfaces_wireguard.py +++ b/smoketest/scripts/cli/test_interfaces_wireguard.py @@ -1,96 +1,96 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError - -# Generate WireGuard default keypair -if not os.path.isdir('/config/auth/wireguard/default'): - os.system('sudo /usr/libexec/vyos/op_mode/wireguard.py --genkey') - base_path = ['interfaces', 'wireguard'] class WireGuardInterfaceTest(VyOSUnitTestSHIM.TestCase): def setUp(self): self._test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', '2001:db8:1::ffff/64', '2001:db8:101::1/112'] self._interfaces = ['wg0', 'wg1'] def tearDown(self): self.cli_delete(base_path) self.cli_commit() def test_wireguard_peer(self): # Create WireGuard interfaces with associated peers for intf in self._interfaces: peer = 'foo-' + intf + privkey = '6ISOkASm6VhHOOSz/5iIxw+Q9adq9zA17iMM4X40dlc=' psk = 'u2xdA70hkz0S1CG0dZlOh0aq2orwFXRIVrKo4DCvHgM=' pubkey = 'n6ZZL7ph/QJUJSUUTyu19c77my1dRCDHkMzFQUO9Z3A=' for addr in self._test_addr: self.cli_set(base_path + [intf, 'address', addr]) + self.cli_set(base_path + [intf, 'private-key', privkey]) + self.cli_set(base_path + [intf, 'peer', peer, 'address', '127.0.0.1']) self.cli_set(base_path + [intf, 'peer', peer, 'port', '1337']) # Allow different prefixes to traverse the tunnel allowed_ips = ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16'] for ip in allowed_ips: self.cli_set(base_path + [intf, 'peer', peer, 'allowed-ips', ip]) self.cli_set(base_path + [intf, 'peer', peer, 'preshared-key', psk]) - self.cli_set(base_path + [intf, 'peer', peer, 'pubkey', pubkey]) + self.cli_set(base_path + [intf, 'peer', peer, 'public-key', pubkey]) self.cli_commit() self.assertTrue(os.path.isdir(f'/sys/class/net/{intf}')) def test_wireguard_add_remove_peer(self): # T2939: Create WireGuard interfaces with associated peers. # Remove one of the configured peers. interface = 'wg0' port = '12345' + privkey = '6ISOkASm6VhHOOSz/5iIxw+Q9adq9zA17iMM4X40dlc=' pubkey_1 = 'n1CUsmR0M2LUUsyicBd6blZICwUqqWWHbu4ifZ2/9gk=' pubkey_2 = 'ebFx/1G0ti8tvuZd94sEIosAZZIznX+dBAKG/8DFm0I=' self.cli_set(base_path + [interface, 'address', '172.16.0.1/24']) + self.cli_set(base_path + [interface, 'private-key', privkey]) - self.cli_set(base_path + [interface, 'peer', 'PEER01', 'pubkey', pubkey_1]) + self.cli_set(base_path + [interface, 'peer', 'PEER01', 'public-key', pubkey_1]) self.cli_set(base_path + [interface, 'peer', 'PEER01', 'port', port]) self.cli_set(base_path + [interface, 'peer', 'PEER01', 'allowed-ips', '10.205.212.10/32']) self.cli_set(base_path + [interface, 'peer', 'PEER01', 'address', '192.0.2.1']) - self.cli_set(base_path + [interface, 'peer', 'PEER02', 'pubkey', pubkey_2]) + self.cli_set(base_path + [interface, 'peer', 'PEER02', 'public-key', pubkey_2]) self.cli_set(base_path + [interface, 'peer', 'PEER02', 'port', port]) self.cli_set(base_path + [interface, 'peer', 'PEER02', 'allowed-ips', '10.205.212.11/32']) self.cli_set(base_path + [interface, 'peer', 'PEER02', 'address', '192.0.2.2']) # Commit peers self.cli_commit() self.assertTrue(os.path.isdir(f'/sys/class/net/{interface}')) # Delete second peer self.cli_delete(base_path + [interface, 'peer', 'PEER01']) self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/interfaces-wireguard.py b/src/conf_mode/interfaces-wireguard.py index 024ab8f59..4c566a5ad 100755 --- a/src/conf_mode/interfaces-wireguard.py +++ b/src/conf_mode/interfaces-wireguard.py @@ -1,111 +1,107 @@ #!/usr/bin/env python3 # # Copyright (C) 2018-2020 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from copy import deepcopy from vyos.config import Config from vyos.configdict import dict_merge from vyos.configdict import get_interface_dict from vyos.configdict import node_changed from vyos.configdict import leaf_node_changed from vyos.configverify import verify_vrf from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mtu_ipv6 from vyos.ifconfig import WireGuardIf from vyos.util import check_kmod from vyos import ConfigError from vyos import airbag airbag.enable() def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'wireguard'] wireguard = get_interface_dict(conf, base) - # Mangle private key - it has a default so its always valid - wireguard['private_key'] = '/config/auth/wireguard/{private_key}/private.key'.format(**wireguard) - # Determine which Wireguard peer has been removed. # Peers can only be removed with their public key! dict = {} tmp = node_changed(conf, ['peer'], key_mangling=('-', '_')) for peer in (tmp or []): - pubkey = leaf_node_changed(conf, ['peer', peer, 'pubkey']) - if pubkey: - dict = dict_merge({'peer_remove' : {peer : {'pubkey' : pubkey[0]}}}, dict) + public_key = leaf_node_changed(conf, ['peer', peer, 'public_key']) + if public_key: + dict = dict_merge({'peer_remove' : {peer : {'public_key' : public_key[0]}}}, dict) wireguard.update(dict) return wireguard def verify(wireguard): if 'deleted' in wireguard: verify_bridge_delete(wireguard) return None verify_mtu_ipv6(wireguard) verify_address(wireguard) verify_vrf(wireguard) - if not os.path.exists(wireguard['private_key']): - raise ConfigError('Wireguard private-key not found! Execute: ' \ - '"run generate wireguard [default-keypair|named-keypairs]"') + if 'private_key' not in wireguard: + raise ConfigError('Wireguard private-key not defined') if 'peer' not in wireguard: raise ConfigError('At least one Wireguard peer is required!') # run checks on individual configured WireGuard peer for tmp in wireguard['peer']: peer = wireguard['peer'][tmp] if 'allowed_ips' not in peer: raise ConfigError(f'Wireguard allowed-ips required for peer "{tmp}"!') - if 'pubkey' not in peer: + if 'public_key' not in peer: raise ConfigError(f'Wireguard public-key required for peer "{tmp}"!') if ('address' in peer and 'port' not in peer) or ('port' in peer and 'address' not in peer): raise ConfigError('Both Wireguard port and address must be defined ' f'for peer "{tmp}" if either one of them is set!') def apply(wireguard): tmp = WireGuardIf(wireguard['ifname']) if 'deleted' in wireguard: tmp.remove() return None tmp.update(wireguard) return None if __name__ == '__main__': try: check_kmod('wireguard') c = get_config() verify(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/migration-scripts/interfaces/22-to-23 b/src/migration-scripts/interfaces/22-to-23 new file mode 100755 index 000000000..c52a26908 --- /dev/null +++ b/src/migration-scripts/interfaces/22-to-23 @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2021 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# A VTI interface also requires an IPSec configuration - VyOS 1.2 supported +# having a VTI interface in the CLI but no IPSec configuration - drop VTI +# configuration if this is the case for VyOS 1.4 + +import os +import sys +from vyos.configtree import ConfigTree + +if __name__ == '__main__': + if (len(sys.argv) < 1): + print("Must specify file name!") + sys.exit(1) + + file_name = sys.argv[1] + + with open(file_name, 'r') as f: + config_file = f.read() + + config = ConfigTree(config_file) + base = ['interfaces', 'wireguard'] + if not config.exists(base): + # Nothing to do + sys.exit(0) + + for interface in config.list_nodes(base): + private_key_path = base + [interface, 'private-key'] + + key_file = 'default' + if config.exists(private_key_path): + key_file = config.return_value(private_key_path) + + full_key_path = f'/config/auth/wireguard/{key_file}/private.key' + + if not os.path.exists(full_key_path): + print(f'Could not find wireguard private key for migration on interface "{interface}"') + continue + + with open(full_key_path, 'r') as f: + key_data = f.read().strip() + config.set(private_key_path, value=key_data) + + for peer in config.list_nodes(base + [interface, 'peer']): + config.rename(base + [interface, 'peer', peer, 'pubkey'], 'public-key') + + try: + with open(file_name, 'w') as f: + f.write(config.to_string()) + except OSError as e: + print("Failed to save the modified config: {}".format(e)) + sys.exit(1) diff --git a/src/op_mode/pki.py b/src/op_mode/pki.py index 7dbeb4097..b4a68b31c 100755 --- a/src/op_mode/pki.py +++ b/src/op_mode/pki.py @@ -1,770 +1,770 @@ #!/usr/bin/env python3 # # Copyright (C) 2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import argparse import ipaddress import os import re import sys import tabulate from cryptography import x509 from cryptography.x509.oid import ExtendedKeyUsageOID from vyos.config import Config from vyos.configdict import dict_merge from vyos.pki import encode_certificate, encode_public_key, encode_private_key, encode_dh_parameters from vyos.pki import create_certificate, create_certificate_request, create_certificate_revocation_list from vyos.pki import create_private_key from vyos.pki import create_dh_parameters from vyos.pki import load_certificate, load_certificate_request, load_private_key, load_crl from vyos.pki import verify_certificate from vyos.xml import defaults from vyos.util import ask_input, ask_yes_no from vyos.util import cmd CERT_REQ_END = '-----END CERTIFICATE REQUEST-----' # Helper Functions def get_default_values(): # Fetch default x509 values conf = Config() base = ['pki', 'x509', 'default'] x509_defaults = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) default_values = defaults(base) return dict_merge(default_values, x509_defaults) def get_config_ca_certificate(name=None): # Fetch ca certificates from config conf = Config() base = ['pki', 'ca'] if not conf.exists(base): return False if name: base = base + [name] if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): return False return conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) def get_config_certificate(name=None): # Get certificates from config conf = Config() base = ['pki', 'certificate'] if not conf.exists(base): return False if name: base = base + [name] if not conf.exists(base + ['private', 'key']) or not conf.exists(base + ['certificate']): return False return conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) def get_certificate_ca(cert, ca_certs): # Find CA certificate for given certificate for ca_name, ca_dict in ca_certs.items(): if 'certificate' not in ca_dict: continue ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: continue if verify_certificate(cert, ca_cert): return ca_name return None def get_config_revoked_certificates(): # Fetch revoked certificates from config conf = Config() ca_base = ['pki', 'ca'] cert_base = ['pki', 'certificate'] certs = [] if conf.exists(ca_base): ca_certificates = conf.get_config_dict(ca_base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) certs.extend(ca_certificates.values()) if conf.exists(cert_base): certificates = conf.get_config_dict(cert_base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) certs.extend(certificates.values()) return [cert_dict for cert_dict in certs if 'revoke' in cert_dict] def get_revoked_by_serial_numbers(serial_numbers=[]): # Return serial numbers of revoked certificates certs_out = [] certs = get_config_certificate() ca_certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if cert.serial_number in serial_numbers: certs_out.append(cert_name) if ca_certs: for cert_name, cert_dict in ca_certs.items(): if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if cert.serial_number in serial_numbers: certs_out.append(cert_name) return certs_out def install_certificate(name, cert='', private_key=None, key_type=None, key_passphrase=None, is_ca=False): # Show conf commands for installing certificate prefix = 'ca' if is_ca else 'certificate' print("Configure mode commands to install:") if cert: cert_pem = "".join(encode_certificate(cert).strip().split("\n")[1:-1]) print("set pki %s %s certificate '%s'" % (prefix, name, cert_pem)) if private_key: key_pem = "".join(encode_private_key(private_key, passphrase=key_passphrase).strip().split("\n")[1:-1]) print("set pki %s %s private key '%s'" % (prefix, name, key_pem)) if key_passphrase: print("set pki %s %s private password-protected" % (prefix, name)) def install_crl(ca_name, crl): # Show conf commands for installing crl print("Configure mode commands to install CRL:") crl_pem = "".join(encode_certificate(crl).strip().split("\n")[1:-1]) print("set pki ca %s crl '%s'" % (ca_name, crl_pem)) def install_dh_parameters(name, params): # Show conf commands for installing dh params print("Configure mode commands to install DH parameters:") dh_pem = "".join(encode_dh_parameters(params).strip().split("\n")[1:-1]) print("set pki dh %s parameters '%s'" % (name, dh_pem)) def install_ssh_key(name, public_key, private_key, passphrase=None): # Show conf commands for installing ssh key key_openssh = encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH') username = os.getlogin() type_key_split = key_openssh.split(" ") print("Configure mode commands to install SSH key:") print("set system login user %s authentication public-keys %s key '%s'" % (username, name, type_key_split[1])) print("set system login user %s authentication public-keys %s type '%s'" % (username, name, type_key_split[0])) print("") print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) def install_keypair(name, key_type, private_key=None, public_key=None, passphrase=None): # Show conf commands for installing key-pair print("Configure mode commands to install key pair:") if public_key: install_public_key = ask_yes_no('Do you want to install the public key?', default=True) public_key_pem = encode_public_key(public_key) if install_public_key: install_public_pem = "".join(public_key_pem.strip().split("\n")[1:-1]) print("set pki key-pair %s public key '%s'" % (name, install_public_pem)) else: print("Public key:") print(public_key_pem) if private_key: install_private_key = ask_yes_no('Do you want to install the private key?', default=True) private_key_pem = encode_private_key(private_key, passphrase=passphrase) if install_private_key: install_private_pem = "".join(private_key_pem.strip().split("\n")[1:-1]) print("set pki key-pair %s private key '%s'" % (name, install_private_pem)) if passphrase: print("set pki key-pair %s private password-protected" % (name,)) else: print("Private key:") print(private_key_pem) def install_wireguard_key(name, private_key, public_key): # Show conf commands for installing wireguard key pairs is_interface = re.match(r'^wg[\d]+$', name) print("Configure mode commands to install key:") if is_interface: print("set interfaces wireguard %s private-key '%s'" % (name, private_key)) print("") print("Public key for use on peer configuration: " + public_key) else: - print("set interfaces wireguard [INTERFACE] peer %s pubkey '%s'" % (name, public_key)) + print("set interfaces wireguard [INTERFACE] peer %s public-key '%s'" % (name, public_key)) print("") print("Private key for use on peer configuration: " + private_key) def install_wireguard_psk(name, psk): # Show conf commands for installing wireguard psk print("set interfaces wireguard [INTERFACE] peer %s preshared-key '%s'" % (name, psk)) def ask_passphrase(): passphrase = None print("Note: If you plan to use the generated key on this router, do not encrypt the private key.") if ask_yes_no('Do you want to encrypt the private key with a passphrase?'): passphrase = ask_input('Enter passphrase:') return passphrase # Generation functions def generate_private_key(): key_type = ask_input('Enter private key type: [rsa, dsa, ec]', default='rsa', valid_responses=['rsa', 'dsa', 'ec']) size_valid = [] size_default = 0 if key_type in ['rsa', 'dsa']: size_default = 2048 size_valid = [512, 1024, 2048, 4096] elif key_type == 'ec': size_default = 256 size_valid = [224, 256, 384, 521] size = ask_input('Enter private key bits:', default=size_default, numeric_only=True, valid_responses=size_valid) return create_private_key(key_type, size), key_type def parse_san_string(san_string): if not san_string: return None output = [] san_split = san_string.strip().split(",") for pair_str in san_split: tag, value = pair_str.strip().split(":", 1) if tag == 'ipv4': output.append(ipaddress.IPv4Address(value)) elif tag == 'ipv6': output.append(ipaddress.IPv6Address(value)) elif tag == 'dns': output.append(value) return output def generate_certificate_request(private_key=None, key_type=None, return_request=False, name=None, install=False, ask_san=True): if not private_key: private_key, key_type = generate_private_key() default_values = get_default_values() subject = {} subject['country'] = ask_input('Enter country code:', default=default_values['country']) subject['state'] = ask_input('Enter state:', default=default_values['state']) subject['locality'] = ask_input('Enter locality:', default=default_values['locality']) subject['organization'] = ask_input('Enter organization name:', default=default_values['organization']) subject['common_name'] = ask_input('Enter common name:', default='vyos.io') subject_alt_names = None if ask_san and ask_yes_no('Do you want to configure Subject Alternative Names?'): print("Enter alternative names in a comma separate list, example: ipv4:1.1.1.1,ipv6:fe80::1,dns:vyos.net") san_string = ask_input('Enter Subject Alternative Names:') subject_alt_names = parse_san_string(san_string) cert_req = create_certificate_request(subject, private_key, subject_alt_names) if return_request: return cert_req passphrase = ask_passphrase() if not install: print(encode_certificate(cert_req)) print(encode_private_key(private_key, passphrase=passphrase)) return None print("Certificate request:") print(encode_certificate(cert_req) + "\n") install_certificate(name, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) def generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False, is_sub_ca=False): valid_days = ask_input('Enter how many days certificate will be valid:', default='365' if not is_ca else '1825', numeric_only=True) cert_type = None if not is_ca: cert_type = ask_input('Enter certificate type: (client, server)', default='server', valid_responses=['client', 'server']) return create_certificate(cert_req, ca_cert, ca_private_key, valid_days, cert_type, is_ca, is_sub_ca) def generate_ca_certificate(name, install=False): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) cert = generate_certificate(cert_req, cert_req, private_key, is_ca=True) passphrase = ask_passphrase() if not install: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) def generate_ca_certificate_sign(name, ca_name, install=False): ca_dict = get_config_ca_certificate(ca_name) if not ca_dict: print(f"CA certificate or private key for '{ca_name}' not found") return None ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: print("Failed to load signing CA certificate, aborting") return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter signing CA private key passphrase:') ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) if not ca_private_key: print("Failed to load signing CA private key, aborting") return None private_key = None key_type = None cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True, ask_san=False) else: print("Paste certificate request and press enter:") lines = [] curr_line = '' while True: curr_line = input().strip() if not curr_line or curr_line == CERT_REQ_END: break lines.append(curr_line) if not lines: print("Aborted") return None wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing cert_req = load_certificate_request("\n".join(lines), wrap) if not cert_req: print("Invalid certificate request") return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=True, is_sub_ca=True) passphrase = ask_passphrase() if not install: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=True) def generate_certificate_sign(name, ca_name, install=False): ca_dict = get_config_ca_certificate(ca_name) if not ca_dict: print(f"CA certificate or private key for '{ca_name}' not found") return None ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: print("Failed to load CA certificate, aborting") return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) if not ca_private_key: print("Failed to load CA private key, aborting") return None private_key = None key_type = None cert_req = None if not ask_yes_no('Do you already have a certificate request?'): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True) else: print("Paste certificate request and press enter:") lines = [] curr_line = '' while True: curr_line = input().strip() if not curr_line or curr_line == CERT_REQ_END: break lines.append(curr_line) if not lines: print("Aborted") return None wrap = lines[0].find('-----') < 0 # Only base64 pasted, add the CSR tags for parsing cert_req = load_certificate_request("\n".join(lines), wrap) if not cert_req: print("Invalid certificate request") return None cert = generate_certificate(cert_req, ca_cert, ca_private_key, is_ca=False) passphrase = ask_passphrase() if not install: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None install_certificate(name, cert, private_key, key_type, key_passphrase=passphrase, is_ca=False) def generate_certificate_selfsign(name, install=False): private_key, key_type = generate_private_key() cert_req = generate_certificate_request(private_key, key_type, return_request=True) cert = generate_certificate(cert_req, cert_req, private_key, is_ca=False) passphrase = ask_passphrase() if not install: print(encode_certificate(cert)) print(encode_private_key(private_key, passphrase=passphrase)) return None install_certificate(name, cert, private_key=private_key, key_type=key_type, key_passphrase=passphrase, is_ca=False) def generate_certificate_revocation_list(ca_name, install=False): ca_dict = get_config_ca_certificate(ca_name) if not ca_dict: print(f"CA certificate or private key for '{ca_name}' not found") return None ca_cert = load_certificate(ca_dict['certificate']) if not ca_cert: print("Failed to load CA certificate, aborting") return None ca_private = ca_dict['private'] ca_private_passphrase = None if 'password_protected' in ca_private: ca_private_passphrase = ask_input('Enter CA private key passphrase:') ca_private_key = load_private_key(ca_private['key'], passphrase=ca_private_passphrase) if not ca_private_key: print("Failed to load CA private key, aborting") return None revoked_certs = get_config_revoked_certificates() to_revoke = [] for cert_dict in revoked_certs: if 'certificate' not in cert_dict: continue cert_data = cert_dict['certificate'] try: cert = load_certificate(cert_data) if cert.issuer == ca_cert.subject: to_revoke.append(cert.serial_number) except ValueError: continue if not to_revoke: print("No revoked certificates to add to the CRL") return None crl = create_certificate_revocation_list(ca_cert, ca_private_key, to_revoke) if not crl: print("Failed to create CRL") return None if not install: print(encode_certificate(crl)) return None install_crl(ca_name, crl) def generate_ssh_keypair(name, install=False): private_key, key_type = generate_private_key() public_key = private_key.public_key() passphrase = ask_passphrase() if not install: print(encode_public_key(public_key, encoding='OpenSSH', key_format='OpenSSH')) print("") print(encode_private_key(private_key, encoding='PEM', key_format='OpenSSH', passphrase=passphrase)) return None install_ssh_key(name, public_key, private_key, passphrase) def generate_dh_parameters(name, install=False): bits = ask_input('Enter DH parameters key size:', default=2048, numeric_only=True) print("Generating parameters...") dh_params = create_dh_parameters(bits) if not dh_params: print("Failed to create DH parameters") return None if not install: print("DH Parameters:") print(encode_dh_parameters(dh_params)) install_dh_parameters(name, dh_params) def generate_keypair(name, install=False): private_key, key_type = generate_private_key() public_key = private_key.public_key() passphrase = ask_passphrase() if not install: print(encode_public_key(public_key)) print("") print(encode_private_key(private_key, passphrase=passphrase)) return None install_keypair(name, key_type, private_key, public_key, passphrase) def generate_openvpn_key(name, install=False): result = cmd('openvpn --genkey secret /dev/stdout | grep -o "^[^#]*"') if not result: print("Failed to generate OpenVPN key") return None if not install: print(result) return None key_lines = result.split("\n") key_data = "".join(key_lines[1:-1]) # Remove wrapper tags and line endings key_version = '1' version_search = re.search(r'BEGIN OpenVPN Static key V(\d+)', result) # Future-proofing (hopefully) if version_search: key_version = version_search[1] print("Configure mode commands to install OpenVPN key:") print("set pki openvpn shared-secret %s key '%s'" % (name, key_data)) print("set pki openvpn shared-secret %s version '%s'" % (name, key_version)) def generate_wireguard_key(name, install=False): private_key = cmd('wg genkey') public_key = cmd('wg pubkey', input=private_key) if not install: print("Private key: " + private_key) print("Public key: " + public_key) return None install_wireguard_key(name, private_key, public_key) def generate_wireguard_psk(name, install=False): psk = cmd('wg genpsk') if not install: print("Pre-shared key:") print(psk) return None install_wireguard_psk(name, psk) # Show functions def show_certificate_authority(name=None): headers = ['Name', 'Subject', 'Issuer CN', 'Issued', 'Expiry', 'Private Key', 'Parent'] data = [] certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if name and name != cert_name: continue if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) parent_ca_name = get_certificate_ca(cert, certs) cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] if not parent_ca_name or parent_ca_name == cert_name: parent_ca_name = 'N/A' if not cert: continue have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' data.append([cert_name, cert.subject.rfc4514_string(), cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, have_private, parent_ca_name]) print("Certificate Authorities:") print(tabulate.tabulate(data, headers)) def show_certificate(name=None): headers = ['Name', 'Type', 'Subject CN', 'Issuer CN', 'Issued', 'Expiry', 'Revoked', 'Private Key', 'CA Present'] data = [] certs = get_config_certificate() if certs: ca_certs = get_config_ca_certificate() for cert_name, cert_dict in certs.items(): if name and name != cert_name: continue if 'certificate' not in cert_dict: continue cert = load_certificate(cert_dict['certificate']) if not cert: continue ca_name = get_certificate_ca(cert, ca_certs) cert_subject_cn = cert.subject.rfc4514_string().split(",")[0] cert_issuer_cn = cert.issuer.rfc4514_string().split(",")[0] cert_type = 'Unknown' ext = cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) if ext and ExtendedKeyUsageOID.SERVER_AUTH in ext.value: cert_type = 'Server' elif ext and ExtendedKeyUsageOID.CLIENT_AUTH in ext.value: cert_type = 'Client' revoked = 'Yes' if 'revoke' in cert_dict else 'No' have_private = 'Yes' if 'private' in cert_dict and 'key' in cert_dict['private'] else 'No' have_ca = f'Yes ({ca_name})' if ca_name else 'No' data.append([ cert_name, cert_type, cert_subject_cn, cert_issuer_cn, cert.not_valid_before, cert.not_valid_after, revoked, have_private, have_ca]) print("Certificates:") print(tabulate.tabulate(data, headers)) def show_crl(name=None): headers = ['CA Name', 'Updated', 'Revokes'] data = [] certs = get_config_ca_certificate() if certs: for cert_name, cert_dict in certs.items(): if name and name != cert_name: continue if 'crl' not in cert_dict: continue crls = cert_dict['crl'] if isinstance(crls, str): crls = [crls] for crl_data in cert_dict['crl']: crl = load_crl(crl_data) if not crl: continue certs = get_revoked_by_serial_numbers([revoked.serial_number for revoked in crl]) data.append([cert_name, crl.last_update, ", ".join(certs)]) print("Certificate Revocation Lists:") print(tabulate.tabulate(data, headers)) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--action', help='PKI action', required=True) # X509 parser.add_argument('--ca', help='Certificate Authority', required=False) parser.add_argument('--certificate', help='Certificate', required=False) parser.add_argument('--crl', help='Certificate Revocation List', required=False) parser.add_argument('--sign', help='Sign certificate with specified CA', required=False) parser.add_argument('--self-sign', help='Self-sign the certificate', action='store_true') # SSH parser.add_argument('--ssh', help='SSH Key', required=False) # DH parser.add_argument('--dh', help='DH Parameters', required=False) # Key pair parser.add_argument('--keypair', help='Key pair', required=False) # OpenVPN parser.add_argument('--openvpn', help='OpenVPN TLS key', required=False) # Wireguard parser.add_argument('--wireguard', help='Wireguard', action='store_true') parser.add_argument('--key', help='Wireguard key pair', required=False) parser.add_argument('--psk', help='Wireguard pre shared key', required=False) # Global parser.add_argument('--install', help='Install generated keys into running-config', action='store_true') args = parser.parse_args() try: if args.action == 'generate': if args.ca: if args.sign: generate_ca_certificate_sign(args.ca, args.sign, args.install) else: generate_ca_certificate(args.ca, args.install) elif args.certificate: if args.sign: generate_certificate_sign(args.certificate, args.sign, args.install) elif args.self_sign: generate_certificate_selfsign(args.certificate, args.install) else: generate_certificate_request(name=args.certificate, install=args.install) elif args.crl: generate_certificate_revocation_list(args.crl, args.install) elif args.ssh: generate_ssh_keypair(args.ssh, args.install) elif args.dh: generate_dh_parameters(args.dh, args.install) elif args.keypair: generate_keypair(args.keypair, args.install) elif args.openvpn: generate_openvpn_key(args.openvpn, args.install) elif args.wireguard: if args.key: generate_wireguard_key(args.key, args.install) elif args.psk: generate_wireguard_psk(args.psk, args.install) elif args.action == 'show': if args.ca: show_certificate_authority(None if args.ca == 'all' else args.ca) elif args.certificate: show_certificate(None if args.certificate == 'all' else args.certificate) elif args.crl: show_crl(None if args.crl == 'all' else args.crl) else: show_certificate_authority() show_certificate() show_crl() except KeyboardInterrupt: print("Aborted") sys.exit(0) diff --git a/src/op_mode/wireguard_client.py b/src/op_mode/wireguard_client.py index 7a620a01e..7661254da 100755 --- a/src/op_mode/wireguard_client.py +++ b/src/op_mode/wireguard_client.py @@ -1,118 +1,118 @@ #!/usr/bin/env python3 # # Copyright (C) 2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import argparse import os from jinja2 import Template from ipaddress import ip_interface from vyos.ifconfig import Section from vyos.template import is_ipv4 from vyos.template import is_ipv6 from vyos.util import cmd from vyos.util import popen if os.geteuid() != 0: exit("You need to have root privileges to run this script.\nPlease try again, this time using 'sudo'. Exiting.") server_config = """WireGuard client configuration for interface: {{ interface }} To enable this configuration on a VyOS router you can use the following commands: === VyOS (server) configurtation === {% for addr in address if address is defined %} set interfaces wireguard {{ interface }} peer {{ name }} allowed-ips '{{ addr }}' {% endfor %} -set interfaces wireguard {{ interface }} peer {{ name }} pubkey '{{ pubkey }}' +set interfaces wireguard {{ interface }} peer {{ name }} public-key '{{ pubkey }}' """ client_config = """ === RoadWarrior (client) configuration === [Interface] PrivateKey = {{ privkey }} {% if address is defined and address|length > 0 %} Address = {{ address | join(', ')}} {% endif %} DNS = 1.1.1.1 [Peer] PublicKey = {{ system_pubkey }} Endpoint = {{ server }}:{{ port }} AllowedIPs = 0.0.0.0/0, ::/0 """ if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument("-n", "--name", type=str, help='WireGuard peer name', required=True) parser.add_argument("-i", "--interface", type=str, help='WireGuard interface the client is connecting to', required=True) parser.add_argument("-s", "--server", type=str, help='WireGuard server IPv4/IPv6 address or FQDN', required=True) parser.add_argument("-a", "--address", type=str, help='WireGuard client IPv4/IPv6 address', action='append') args = parser.parse_args() interface = args.interface if interface not in Section.interfaces('wireguard'): exit(f'WireGuard interface "{interface}" does not exist!') wg_pubkey = cmd(f'wg show {interface} | grep "public key"').split(':')[-1].lstrip() wg_port = cmd(f'wg show {interface} | grep "listening port"').split(':')[-1].lstrip() # Generate WireGuard private key privkey,_ = popen('wg genkey') # Generate public key portion from given private key pubkey,_ = popen('wg pubkey', input=privkey) config = { 'name' : args.name, 'interface' : interface, 'system_pubkey' : wg_pubkey, 'privkey': privkey, 'pubkey' : pubkey, 'server' : args.server, 'port' : wg_port, 'address' : [], } if args.address: v4_addr = 0 v6_addr = 0 for tmp in args.address: try: ip = str(ip_interface(tmp).ip) if is_ipv4(tmp): config['address'].append(f'{ip}/32') v4_addr += 1 elif is_ipv6(tmp): config['address'].append(f'{ip}/128') v6_addr += 1 except: print(tmp) exit('Client IP address invalid!') if (v4_addr > 1) or (v6_addr > 1): exit('Client can only have one IPv4 and one IPv6 address.') # Clear out terminal first print('\x1b[2J\x1b[H') server = Template(server_config, trim_blocks=True).render(config) print(server) client = Template(client_config, trim_blocks=True).render(config) print(client) qrcode,err = popen('qrencode -t ansiutf8', input=client) print(qrcode)