diff --git a/src/conf_mode/interfaces_wireguard.py b/src/conf_mode/interfaces_wireguard.py index 0e0b77877..d7a638b51 100755 --- a/src/conf_mode/interfaces_wireguard.py +++ b/src/conf_mode/interfaces_wireguard.py @@ -1,132 +1,135 @@ #!/usr/bin/env python3 # # Copyright (C) 2018-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import is_node_changed from vyos.configverify import verify_vrf from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mtu_ipv6 from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_bond_bridge_member from vyos.ifconfig import WireGuardIf from vyos.utils.kernel import check_kmod from vyos.utils.network import check_port_availability from vyos.utils.network import is_wireguard_key_pair from vyos import ConfigError from vyos import airbag airbag.enable() def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'wireguard'] ifname, wireguard = get_interface_dict(conf, base) # Check if a port was changed tmp = is_node_changed(conf, base + [ifname, 'port']) if tmp: wireguard['port_changed'] = {} # T4702: If anything on a peer changes we remove the peer first and re-add it if is_node_changed(conf, base + [ifname, 'peer']): wireguard.update({'rebuild_required': {}}) return wireguard def verify(wireguard): if 'deleted' in wireguard: verify_bridge_delete(wireguard) return None verify_mtu_ipv6(wireguard) verify_address(wireguard) verify_vrf(wireguard) verify_bond_bridge_member(wireguard) verify_mirror_redirect(wireguard) if 'private_key' not in wireguard: raise ConfigError('Wireguard private-key not defined') if 'peer' not in wireguard: raise ConfigError('At least one Wireguard peer is required!') if 'port' in wireguard and 'port_changed' in wireguard: listen_port = int(wireguard['port']) if check_port_availability('0.0.0.0', listen_port, 'udp') is not True: raise ConfigError(f'UDP port {listen_port} is busy or unavailable and ' 'cannot be used for the interface!') # run checks on individual configured WireGuard peer public_keys = [] for tmp in wireguard['peer']: peer = wireguard['peer'][tmp] if 'allowed_ips' not in peer: raise ConfigError(f'Wireguard allowed-ips required for peer "{tmp}"!') if 'public_key' not in peer: raise ConfigError(f'Wireguard public-key required for peer "{tmp}"!') if ('address' in peer and 'port' not in peer) or ('port' in peer and 'address' not in peer): raise ConfigError('Both Wireguard port and address must be defined ' f'for peer "{tmp}" if either one of them is set!') if peer['public_key'] in public_keys: raise ConfigError(f'Duplicate public-key defined on peer "{tmp}"') if 'disable' not in peer: if is_wireguard_key_pair(wireguard['private_key'], peer['public_key']): raise ConfigError(f'Peer "{tmp}" has the same public key as the interface "{wireguard["ifname"]}"') public_keys.append(peer['public_key']) +def generate(wireguard): + return None + def apply(wireguard): if 'rebuild_required' in wireguard or 'deleted' in wireguard: wg = WireGuardIf(**wireguard) # WireGuard only supports peer removal based on the configured public-key, # by deleting the entire interface this is the shortcut instead of parsing # out all peers and removing them one by one. # # Peer reconfiguration will always come with a short downtime while the # WireGuard interface is recreated (see below) wg.remove() # Create the new interface if required if 'deleted' not in wireguard: wg = WireGuardIf(**wireguard) wg.update(wireguard) return None if __name__ == '__main__': try: check_kmod('wireguard') c = get_config() verify(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/system_acceleration.py b/src/conf_mode/system_acceleration.py index e4b248675..d2cf44ff0 100755 --- a/src/conf_mode/system_acceleration.py +++ b/src/conf_mode/system_acceleration.py @@ -1,106 +1,109 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re from sys import exit from vyos.config import Config from vyos.utils.process import popen from vyos.utils.process import run from vyos import ConfigError from vyos import airbag airbag.enable() qat_init_script = '/etc/init.d/qat_service' def get_config(config=None): if config: conf = config else: conf = Config() data = {} if conf.exists(['system', 'acceleration', 'qat']): data.update({'qat_enable' : ''}) if conf.exists(['vpn', 'ipsec']): data.update({'ipsec' : ''}) if conf.exists(['interfaces', 'openvpn']): data.update({'openvpn' : ''}) return data def vpn_control(action, force_ipsec=False): # XXX: Should these commands report failure? if action == 'restore' and force_ipsec: return run('ipsec start') return run(f'ipsec {action}') def verify(qat): if 'qat_enable' not in qat: return # Check if QAT service installed if not os.path.exists(qat_init_script): raise ConfigError('QAT init script not found') # Check if QAT device exist output, err = popen('lspci -nn', decode='utf-8') if not err: # PCI id | Chipset # 19e2 -> C3xx # 37c8 -> C62x # 0435 -> DH895 # 6f54 -> D15xx # 18ee -> QAT_200XX data = re.findall( '(8086:19e2)|(8086:37c8)|(8086:0435)|(8086:6f54)|(8086:18ee)', output) # If QAT devices found if not data: raise ConfigError('No QAT acceleration device found') +def generate(qat): + return + def apply(qat): # Shutdown VPN service which can use QAT if 'ipsec' in qat: vpn_control('stop') # Enable/Disable QAT service if 'qat_enable' in qat: run(f'{qat_init_script} start') else: run(f'{qat_init_script} stop') # Recover VPN service if 'ipsec' in qat: vpn_control('start') if __name__ == '__main__': try: c = get_config() verify(c) apply(c) except ConfigError as e: print(e) vpn_control('restore', force_ipsec=('ipsec' in c)) exit(1) diff --git a/src/tests/test_configd_inspect.py b/src/tests/test_configd_inspect.py index 98552c8f3..ccd631893 100644 --- a/src/tests/test_configd_inspect.py +++ b/src/tests/test_configd_inspect.py @@ -1,105 +1,104 @@ # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re import json import warnings import importlib.util from inspect import signature from inspect import getsource from functools import wraps from unittest import TestCase INC_FILE = 'data/configd-include.json' CONF_DIR = 'src/conf_mode' f_list = ['get_config', 'verify', 'generate', 'apply'] def import_script(s): path = os.path.join(CONF_DIR, s) name = os.path.splitext(s)[0].replace('-', '_') spec = importlib.util.spec_from_file_location(name, path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return module # importing conf_mode scripts imports jinja2 with deprecation warning def ignore_deprecation_warning(f): @wraps(f) def decorated_function(*args, **kwargs): with warnings.catch_warnings(): warnings.simplefilter("ignore") f(*args, **kwargs) return decorated_function class TestConfigdInspect(TestCase): def setUp(self): with open(INC_FILE) as f: self.inc_list = json.load(f) @ignore_deprecation_warning def test_signatures(self): for s in self.inc_list: m = import_script(s) for i in f_list: f = getattr(m, i, None) - if not f: - continue + self.assertIsNotNone(f, f"'{s}': missing function '{i}'") sig = signature(f) par = sig.parameters l = len(par) self.assertEqual(l, 1, f"'{s}': '{i}' incorrect signature") if i == 'get_config': for p in par.values(): self.assertTrue(p.default is None, f"'{s}': '{i}' incorrect signature") @ignore_deprecation_warning def test_function_instance(self): for s in self.inc_list: m = import_script(s) for i in f_list: f = getattr(m, i, None) if not f: continue str_f = getsource(f) # Regex not XXXConfig() T3108 n = len(re.findall(r'[^a-zA-Z]Config\(\)', str_f)) if i == 'get_config': self.assertEqual(n, 1, f"'{s}': '{i}' no instance of Config") if i != 'get_config': self.assertEqual(n, 0, f"'{s}': '{i}' instance of Config") @ignore_deprecation_warning def test_file_instance(self): for s in self.inc_list: m = import_script(s) str_m = getsource(m) # Regex not XXXConfig T3108 n = len(re.findall(r'[^a-zA-Z]Config\(\)', str_m)) self.assertEqual(n, 1, f"'{s}' more than one instance of Config") @ignore_deprecation_warning def test_config_modification(self): for s in self.inc_list: m = import_script(s) str_m = getsource(m) n = str_m.count('my_set') self.assertEqual(n, 0, f"'{s}' modifies config")