Page MenuHomeVyOS Platform

22-to-23
No OneTemporary

Size
14 KB
Referenced Files
None
Subscribers
None

22-to-23

#!/usr/bin/env python3
#
# Copyright (C) 2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Migrate OpenVPN to PKI configuration
# Migrate Wireguard to store keys in CLI
# Migrate EAPoL to PKI configuration
import os
import sys
from vyos.configtree import ConfigTree
from vyos.pki import load_certificate
from vyos.pki import load_crl
from vyos.pki import load_dh_parameters
from vyos.pki import load_private_key
from vyos.pki import encode_certificate
from vyos.pki import encode_dh_parameters
from vyos.pki import encode_private_key
from vyos.util import run
def wrapped_pem_to_config_value(pem):
    """Collapse PEM-style text into the single-line value stored in the config.

    Blank lines, ``-----`` armour lines (BEGIN/END markers) and ``#`` comment
    lines are dropped; the remaining lines are concatenated without any
    separator.

    Args:
        pem: Text content of a PEM-style file.

    Returns:
        The payload lines joined into one string (empty if there are none).
    """
    payload = []
    # splitlines() + strip() also copes with CRLF line endings; the former
    # split("\n") left a stray "\r" on every payload line of such files,
    # which then ended up inside the config value.
    for raw_line in pem.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(('-----', '#')):
            continue
        payload.append(line)
    return ''.join(payload)
def read_file_for_pki(config_auth_path):
    """Read a key/certificate file referenced by the old configuration.

    Args:
        config_auth_path: Path value taken from the config. It is joined onto
            AUTH_DIR with os.path.join, so an absolute path is used as-is.

    Returns:
        The file content as a string, or None when the resulting path is not
        a regular file.

    Raises:
        OSError: If the file exists but still cannot be opened for reading.
    """
    import shlex  # local import keeps this fix self-contained

    full_path = os.path.join(AUTH_DIR, config_auth_path)
    if not os.path.isfile(full_path):
        return None

    if not os.access(full_path, os.R_OK):
        # The command is handed over as one string, so the path is quoted:
        # an unquoted path containing spaces or shell metacharacters would be
        # split or misinterpreted. NOTE(review): presumably run() goes
        # through a shell or shlex-style splitting - confirm against vyos.util.
        run(f'sudo chmod 644 {shlex.quote(full_path)}')

    with open(full_path, 'r') as f:
        return f.read()
# sys.argv[0] is the script itself, so a file name was only supplied when
# there are at least two entries. The former "< 1" test could never be true
# and a missing argument surfaced as an IndexError instead of this message.
if len(sys.argv) < 2:
    print("Must specify file name!")
    sys.exit(1)

file_name = sys.argv[1]

with open(file_name, 'r') as f:
    config_file = f.read()

config = ConfigTree(config_file)

# Directory that key/certificate paths from the old config are joined onto
AUTH_DIR = '/config/auth'
# Root path of the PKI subtree that migrated material is written to
pki_base = ['pki']
# OpenVPN: move file based key material of every interface into the PKI
# subtree and replace the "*-file" nodes with references to the PKI entries.
# All files are read through read_file_for_pki() so that path handling and
# the permission fix-up live in one place (the certificate/CRL/key/DH
# branches used to carry their own copies of that logic).
base = ['interfaces', 'openvpn']

if config.exists(base):
    shared_secret_base = pki_base + ['openvpn', 'shared-secret']

    for interface in config.list_nodes(base):
        x509_base = base + [interface, 'tls']
        pki_name = f'openvpn_{interface}'

        # Static shared secret (configured outside of the "tls" node)
        if config.exists(base + [interface, 'shared-secret-key-file']):
            if not config.exists(shared_secret_base):
                config.set(shared_secret_base)
                config.set_tag(shared_secret_base)

            key_file = config.return_value(base + [interface, 'shared-secret-key-file'])
            key = read_file_for_pki(key_file)
            key_pki_name = f'{pki_name}_shared'

            if key:
                config.set(shared_secret_base + [key_pki_name, 'key'], value=wrapped_pem_to_config_value(key))
                config.set(shared_secret_base + [key_pki_name, 'version'], value='1')
                config.set(base + [interface, 'shared-secret-key'], value=key_pki_name)
            else:
                print(f'Failed to migrate shared-secret-key on openvpn interface {interface}')

            # The file reference is removed whether or not migration worked
            config.delete(base + [interface, 'shared-secret-key-file'])

        # Everything below lives under "tls"
        if not config.exists(x509_base):
            continue

        # "tls auth-file" and "tls crypt-file" are handled identically, they
        # only differ in the node names and the PKI entry suffix.
        for kind in ('auth', 'crypt'):
            file_node = x509_base + [f'{kind}-file']
            if not config.exists(file_node):
                continue

            if not config.exists(shared_secret_base):
                config.set(shared_secret_base)
                config.set_tag(shared_secret_base)

            key_file = config.return_value(file_node)
            key = read_file_for_pki(key_file)
            key_pki_name = f'{pki_name}_{kind}'

            if key:
                config.set(shared_secret_base + [key_pki_name, 'key'], value=wrapped_pem_to_config_value(key))
                config.set(shared_secret_base + [key_pki_name, 'version'], value='1')
                config.set(x509_base + [f'{kind}-key'], value=key_pki_name)
            else:
                print(f'Failed to migrate {kind}-key on openvpn interface {interface}')

            config.delete(file_node)

        # CA certificate
        if config.exists(x509_base + ['ca-cert-file']):
            if not config.exists(pki_base + ['ca']):
                config.set(pki_base + ['ca'])
                config.set_tag(pki_base + ['ca'])

            cert_file = config.return_value(x509_base + ['ca-cert-file'])
            cert_data = read_file_for_pki(cert_file)
            # None means "no such file"; any content that was read is still
            # handed to the loader, as before.
            cert = None
            if cert_data is not None:
                cert = load_certificate(cert_data, wrap_tags=False)

            if cert:
                cert_pem = encode_certificate(cert)
                config.set(pki_base + ['ca', pki_name, 'certificate'], value=wrapped_pem_to_config_value(cert_pem))
                config.set(x509_base + ['ca-certificate'], value=pki_name)
            else:
                print(f'Failed to migrate CA certificate on openvpn interface {interface}')

            config.delete(x509_base + ['ca-cert-file'])

        # Certificate revocation list, stored on the CA entry
        if config.exists(x509_base + ['crl-file']):
            if not config.exists(pki_base + ['ca']):
                config.set(pki_base + ['ca'])
                config.set_tag(pki_base + ['ca'])

            crl_file = config.return_value(x509_base + ['crl-file'])
            crl_data = read_file_for_pki(crl_file)
            crl = None
            if crl_data is not None:
                crl = load_crl(crl_data, wrap_tags=False)

            if crl:
                # NOTE(review): the CRL is serialised with encode_certificate,
                # unchanged from the previous code - confirm this is intended.
                crl_pem = encode_certificate(crl)
                config.set(pki_base + ['ca', pki_name, 'crl'], value=wrapped_pem_to_config_value(crl_pem))
            else:
                print(f'Failed to migrate CRL on openvpn interface {interface}')

            config.delete(x509_base + ['crl-file'])

        # Certificate
        if config.exists(x509_base + ['cert-file']):
            if not config.exists(pki_base + ['certificate']):
                config.set(pki_base + ['certificate'])
                config.set_tag(pki_base + ['certificate'])

            cert_file = config.return_value(x509_base + ['cert-file'])
            cert_data = read_file_for_pki(cert_file)
            cert = None
            if cert_data is not None:
                cert = load_certificate(cert_data, wrap_tags=False)

            if cert:
                cert_pem = encode_certificate(cert)
                config.set(pki_base + ['certificate', pki_name, 'certificate'], value=wrapped_pem_to_config_value(cert_pem))
                config.set(x509_base + ['certificate'], value=pki_name)
            else:
                print(f'Failed to migrate certificate on openvpn interface {interface}')

            config.delete(x509_base + ['cert-file'])

        # Private key, stored on the certificate entry
        if config.exists(x509_base + ['key-file']):
            # A key-file can be configured without a cert-file having been
            # migrated above; make sure "pki certificate" exists as a tag
            # node before an entry is created beneath it, exactly like the
            # other branches do for their parent nodes.
            if not config.exists(pki_base + ['certificate']):
                config.set(pki_base + ['certificate'])
                config.set_tag(pki_base + ['certificate'])

            key_file = config.return_value(x509_base + ['key-file'])
            key_data = read_file_for_pki(key_file)
            key = None
            if key_data is not None:
                key = load_private_key(key_data, passphrase=None, wrap_tags=False)

            if key:
                key_pem = encode_private_key(key, passphrase=None)
                config.set(pki_base + ['certificate', pki_name, 'private', 'key'], value=wrapped_pem_to_config_value(key_pem))
            else:
                print(f'Failed to migrate private key on openvpn interface {interface}')

            config.delete(x509_base + ['key-file'])

        # Diffie-Hellman parameters
        if config.exists(x509_base + ['dh-file']):
            if not config.exists(pki_base + ['dh']):
                config.set(pki_base + ['dh'])
                config.set_tag(pki_base + ['dh'])

            dh_file = config.return_value(x509_base + ['dh-file'])
            dh_data = read_file_for_pki(dh_file)
            dh = None
            if dh_data is not None:
                dh = load_dh_parameters(dh_data, wrap_tags=False)

            if dh:
                dh_pem = encode_dh_parameters(dh)
                config.set(pki_base + ['dh', pki_name, 'parameters'], value=wrapped_pem_to_config_value(dh_pem))
                config.set(x509_base + ['dh-params'], value=pki_name)
            else:
                print(f'Failed to migrate DH parameters on openvpn interface {interface}')

            config.delete(x509_base + ['dh-file'])
# Wireguard: replace the named key reference in "private-key" with the key
# material itself and rename the peer "pubkey" node to "public-key".
base = ['interfaces', 'wireguard']

if config.exists(base):
    for interface in config.list_nodes(base):
        private_key_path = base + [interface, 'private-key']

        # Without an explicit "private-key" node the key named "default" is used
        key_file = 'default'
        if config.exists(private_key_path):
            key_file = config.return_value(private_key_path)

        full_key_path = f'/config/auth/wireguard/{key_file}/private.key'

        if not os.path.exists(full_key_path):
            print(f'Could not find wireguard private key for migration on interface "{interface}"')
            # NOTE(review): this also skips the peer rename below, as the
            # previous code did - confirm that is intended.
            continue

        with open(full_key_path, 'r') as f:
            key_data = f.read().strip()

        config.set(private_key_path, value=key_data)

        # An interface may have no peers, and a peer may lack "pubkey".
        # Guard both lookups so a missing node cannot abort the migration.
        # NOTE(review): presumably list_nodes()/rename() raise on missing
        # paths - confirm against the ConfigTree API.
        peer_base = base + [interface, 'peer']
        if not config.exists(peer_base):
            continue

        for peer in config.list_nodes(peer_base):
            if config.exists(peer_base + [peer, 'pubkey']):
                config.rename(peer_base + [peer, 'pubkey'], 'public-key')
# Ethernet EAPoL: move the CA certificate, certificate and private key files
# into the PKI subtree and reference them by name from the "eapol" node.
# Files are read through read_file_for_pki() so that path handling and the
# permission fix-up are not duplicated in every branch.
base = ['interfaces', 'ethernet']

if config.exists(base):
    for interface in config.list_nodes(base):
        x509_base = base + [interface, 'eapol']
        if not config.exists(x509_base):
            continue

        pki_name = f'eapol_{interface}'

        # CA certificate
        if config.exists(x509_base + ['ca-cert-file']):
            if not config.exists(pki_base + ['ca']):
                config.set(pki_base + ['ca'])
                config.set_tag(pki_base + ['ca'])

            cert_file = config.return_value(x509_base + ['ca-cert-file'])
            cert_data = read_file_for_pki(cert_file)
            # None means "no such file"; any content that was read is still
            # handed to the loader, as before.
            cert = None
            if cert_data is not None:
                cert = load_certificate(cert_data, wrap_tags=False)

            if cert:
                cert_pem = encode_certificate(cert)
                config.set(pki_base + ['ca', pki_name, 'certificate'], value=wrapped_pem_to_config_value(cert_pem))
                config.set(x509_base + ['ca-certificate'], value=pki_name)
            else:
                print(f'Failed to migrate CA certificate on eapol config for interface {interface}')

            # The file reference is removed whether or not migration worked
            config.delete(x509_base + ['ca-cert-file'])

        # Certificate
        if config.exists(x509_base + ['cert-file']):
            if not config.exists(pki_base + ['certificate']):
                config.set(pki_base + ['certificate'])
                config.set_tag(pki_base + ['certificate'])

            cert_file = config.return_value(x509_base + ['cert-file'])
            cert_data = read_file_for_pki(cert_file)
            cert = None
            if cert_data is not None:
                cert = load_certificate(cert_data, wrap_tags=False)

            if cert:
                cert_pem = encode_certificate(cert)
                config.set(pki_base + ['certificate', pki_name, 'certificate'], value=wrapped_pem_to_config_value(cert_pem))
                config.set(x509_base + ['certificate'], value=pki_name)
            else:
                print(f'Failed to migrate certificate on eapol config for interface {interface}')

            config.delete(x509_base + ['cert-file'])

        # Private key, stored on the certificate entry
        if config.exists(x509_base + ['key-file']):
            # A key-file can be configured without a cert-file having been
            # migrated above; make sure "pki certificate" exists as a tag
            # node before an entry is created beneath it.
            if not config.exists(pki_base + ['certificate']):
                config.set(pki_base + ['certificate'])
                config.set_tag(pki_base + ['certificate'])

            key_file = config.return_value(x509_base + ['key-file'])
            key_data = read_file_for_pki(key_file)
            key = None
            if key_data is not None:
                key = load_private_key(key_data, passphrase=None, wrap_tags=False)

            if key:
                key_pem = encode_private_key(key, passphrase=None)
                config.set(pki_base + ['certificate', pki_name, 'private', 'key'], value=wrapped_pem_to_config_value(key_pem))
            else:
                print(f'Failed to migrate private key on eapol config for interface {interface}')

            config.delete(x509_base + ['key-file'])
# Render the migrated config before opening the file: open(..., 'w')
# truncates it immediately, so a failure inside to_string() would otherwise
# leave an empty config file behind.
new_config = config.to_string()

try:
    with open(file_name, 'w') as f:
        f.write(new_config)
except OSError as e:
    print(f"Failed to save the modified config: {e}")
    sys.exit(1)

File Metadata

Mime Type
text/x-script.python
Expires
Sun, Jan 11, 10:04 AM (8 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3203886
Default Alt Text
22-to-23 (14 KB)

Event Timeline