diff --git a/smoketest/scripts/cli/test_interfaces_pppoe.py b/smoketest/scripts/cli/test_interfaces_pppoe.py index e99d8b3d1..2683a3122 100755 --- a/smoketest/scripts/cli/test_interfaces_pppoe.py +++ b/smoketest/scripts/cli/test_interfaces_pppoe.py @@ -1,258 +1,259 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2023 VyOS maintainers and contributors +# Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from psutil import process_iter from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError +from vyos.xml_ref import default_value config_file = '/etc/ppp/peers/{}' base_path = ['interfaces', 'pppoe'] def get_config_value(interface, key): with open(config_file.format(interface), 'r') as f: for line in f: if line.startswith(key): return list(line.split()) return [] # add a classmethod to setup a temporaray PPPoE server for "proper" validation class PPPoEInterfaceTest(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(PPPoEInterfaceTest, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls._interfaces = ['pppoe10', 'pppoe20', 'pppoe30'] cls._source_interface = 'eth0' def tearDown(self): # Validate PPPoE client process for interface in self._interfaces: running = False for proc in process_iter(): if interface in proc.cmdline(): running = True break self.assertTrue(running) self.cli_delete(base_path) self.cli_commit() def test_pppoe_client(self): # Check if PPPoE dialer can be configured and runs for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' mtu = '1400' self.cli_set(base_path + [interface, 'authentication', 'username', user]) self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) self.cli_set(base_path + [interface, 'mtu', mtu]) self.cli_set(base_path + [interface, 'no-peer-dns']) # check validate() - a source-interface is required with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) # commit changes self.cli_commit() # verify configuration file(s) for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' tmp = get_config_value(interface, 'mtu')[1] self.assertEqual(tmp, mtu) # MRU must default to MTU if not specified on CLI tmp = get_config_value(interface, 'mru')[1] self.assertEqual(tmp, mtu) tmp = get_config_value(interface, 'user')[1].replace('"', '') self.assertEqual(tmp, user) tmp = get_config_value(interface, 'password')[1].replace('"', '') self.assertEqual(tmp, passwd) tmp = get_config_value(interface, 'ifname')[1] self.assertEqual(tmp, interface) def test_pppoe_client_disabled_interface(self): # Check if PPPoE Client can be disabled for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' self.cli_set(base_path + [interface, 'authentication', 'username', user]) self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) self.cli_set(base_path + [interface, 'disable']) self.cli_commit() # Validate PPPoE client process - must not run as interfaces are disabled for interface in self._interfaces: running = False for proc in process_iter(): if interface in proc.cmdline(): running = True break self.assertFalse(running) # enable PPPoE interfaces for interface in self._interfaces: self.cli_delete(base_path + [interface, 'disable']) self.cli_commit() def test_pppoe_authentication(self): # When username or password is set - so must be the other for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf']) self.cli_set(base_path + [interface, 'authentication', 'username', user]) # check validate() - if user is set, so must be the password with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) self.cli_commit() def test_pppoe_dhcpv6pd(self): # Check if PPPoE dialer can be configured with DHCPv6-PD address = '1' sla_id = '0' sla_len = '8' for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' self.cli_set(base_path + [interface, 'authentication', 'username', user]) self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) self.cli_set(base_path + [interface, 'no-default-route']) self.cli_set(base_path + [interface, 'no-peer-dns']) self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) self.cli_set(base_path + [interface, 'ipv6', 'address', 'autoconf']) # prefix delegation stuff dhcpv6_pd_base = base_path + [interface, 'dhcpv6-options', 'pd', '0'] self.cli_set(dhcpv6_pd_base + ['length', '56']) self.cli_set(dhcpv6_pd_base + ['interface', self._source_interface, 'address', address]) self.cli_set(dhcpv6_pd_base + ['interface', self._source_interface, 'sla-id', sla_id]) # commit changes self.cli_commit() for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' + mtu_default = default_value(base_path + [interface, 'mtu']) - # verify "normal" PPPoE value - 1492 is default MTU tmp = get_config_value(interface, 'mtu')[1] - self.assertEqual(tmp, '1492') + self.assertEqual(tmp, mtu_default) tmp = get_config_value(interface, 'user')[1].replace('"', '') self.assertEqual(tmp, user) tmp = get_config_value(interface, 'password')[1].replace('"', '') self.assertEqual(tmp, passwd) tmp = get_config_value(interface, '+ipv6 ipv6cp-use-ipaddr') self.assertListEqual(tmp, ['+ipv6', 'ipv6cp-use-ipaddr']) def test_pppoe_options(self): # Check if PPPoE dialer can be configured with DHCPv6-PD for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' ac_name = f'AC{interface}' service_name = f'SRV{interface}' host_uniq = 'cafebeefBABE123456' self.cli_set(base_path + [interface, 'authentication', 'username', user]) self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) self.cli_set(base_path + [interface, 'access-concentrator', ac_name]) self.cli_set(base_path + [interface, 'service-name', service_name]) self.cli_set(base_path + [interface, 'host-uniq', host_uniq]) # commit changes self.cli_commit() for interface in self._interfaces: ac_name = f'AC{interface}' service_name = f'SRV{interface}' host_uniq = 'cafebeefBABE123456' tmp = get_config_value(interface, 'pppoe-ac')[1] self.assertEqual(tmp, f'"{ac_name}"') tmp = get_config_value(interface, 'pppoe-service')[1] self.assertEqual(tmp, f'"{service_name}"') tmp = get_config_value(interface, 'pppoe-host-uniq')[1] self.assertEqual(tmp, f'"{host_uniq}"') def test_pppoe_mtu_mru(self): # Check if PPPoE dialer can be configured and runs for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' mtu = '1400' mru = '1300' self.cli_set(base_path + [interface, 'authentication', 'username', user]) self.cli_set(base_path + [interface, 'authentication', 'password', passwd]) self.cli_set(base_path + [interface, 'mtu', mtu]) self.cli_set(base_path + [interface, 'mru', '9000']) # check validate() - a source-interface is required with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + [interface, 'source-interface', self._source_interface]) # check validate() - MRU needs to be less or equal then MTU with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + [interface, 'mru', mru]) # commit changes self.cli_commit() # verify configuration file(s) for interface in self._interfaces: user = f'VyOS-user-{interface}' passwd = f'VyOS-passwd-{interface}' tmp = get_config_value(interface, 'mtu')[1] self.assertEqual(tmp, mtu) tmp = get_config_value(interface, 'mru')[1] self.assertEqual(tmp, mru) tmp = get_config_value(interface, 'user')[1].replace('"', '') self.assertEqual(tmp, user) tmp = get_config_value(interface, 'password')[1].replace('"', '') self.assertEqual(tmp, passwd) tmp = get_config_value(interface, 'ifname')[1] self.assertEqual(tmp, interface) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_interfaces_wireless.py b/smoketest/scripts/cli/test_interfaces_wireless.py index 95246a7b9..83b00ac0c 100755 --- a/smoketest/scripts/cli/test_interfaces_wireless.py +++ b/smoketest/scripts/cli/test_interfaces_wireless.py @@ -1,285 +1,287 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re import unittest from base_interfaces_test import BasicInterfaceTest from glob import glob from vyos.configsession import ConfigSessionError from vyos.utils.process import process_named_running from vyos.utils.kernel import check_kmod from vyos.utils.file import read_file +from vyos.xml_ref import default_value def get_config_value(interface, key): tmp = read_file(f'/run/hostapd/{interface}.conf') tmp = re.findall(f'{key}=+(.*)', tmp) return tmp[0] class WirelessInterfaceTest(BasicInterfaceTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ['interfaces', 'wireless'] cls._options = { 'wlan0': ['physical-device phy0', 'ssid VyOS-WIFI-0', 'type station', 'address 192.0.2.1/30'], 'wlan1': ['physical-device phy0', 'ssid VyOS-WIFI-1', 'country-code se', 'type access-point', 'address 192.0.2.5/30', 'channel 0'], 'wlan10': ['physical-device phy1', 'ssid VyOS-WIFI-2', 'type station', 'address 192.0.2.9/30'], 'wlan11': ['physical-device phy1', 'ssid VyOS-WIFI-3', 'country-code se', 'type access-point', 'address 192.0.2.13/30', 'channel 0'], } cls._interfaces = list(cls._options) # call base-classes classmethod super(WirelessInterfaceTest, cls).setUpClass() # T5245 - currently testcases are disabled cls._test_ipv6 = False cls._test_vlan = False def test_wireless_add_single_ip_address(self): # derived method to check if member interfaces are enslaved properly super().test_add_single_ip_address() for option, option_value in self._options.items(): if 'type access-point' in option_value: # Check for running process self.assertTrue(process_named_running('hostapd')) elif 'type station' in option_value: # Check for running process self.assertTrue(process_named_running('wpa_supplicant')) else: self.assertTrue(False) def test_wireless_hostapd_config(self): # Only set the hostapd (access-point) options interface = 'wlan0' ssid = 'ssid' self.cli_set(self._base_path + [interface, 'ssid', ssid]) self.cli_set(self._base_path + [interface, 'country-code', 'se']) self.cli_set(self._base_path + [interface, 'type', 'access-point']) # auto-powersave is special self.cli_set(self._base_path + [interface, 'capabilities', 'ht', 'auto-powersave']) ht_opt = { # VyOS CLI option hostapd - ht_capab setting '40mhz-incapable' : '[40-INTOLERANT]', 'delayed-block-ack' : '[DELAYED-BA]', 'greenfield' : '[GF]', 'ldpc' : '[LDPC]', 'lsig-protection' : '[LSIG-TXOP-PROT]', 'channel-set-width ht40+' : '[HT40+]', 'stbc tx' : '[TX-STBC]', 'stbc rx 123' : '[RX-STBC-123]', 'max-amsdu 7935' : '[MAX-AMSDU-7935]', 'smps static' : '[SMPS-STATIC]', } for key in ht_opt: self.cli_set(self._base_path + [interface, 'capabilities', 'ht'] + key.split()) vht_opt = { # VyOS CLI option hostapd - ht_capab setting 'channel-set-width 3' : '[VHT160-80PLUS80]', 'stbc tx' : '[TX-STBC-2BY1]', 'stbc rx 12' : '[RX-STBC-12]', 'ldpc' : '[RXLDPC]', 'tx-powersave' : '[VHT-TXOP-PS]', 'vht-cf' : '[HTC-VHT]', 'antenna-pattern-fixed' : '[RX-ANTENNA-PATTERN][TX-ANTENNA-PATTERN]', 'max-mpdu 11454' : '[MAX-MPDU-11454]', 'max-mpdu-exp 2' : '[MAX-A-MPDU-LEN-EXP-2]', 'link-adaptation both' : '[VHT-LINK-ADAPT3]', 'short-gi 80' : '[SHORT-GI-80]', 'short-gi 160' : '[SHORT-GI-160]', } for key in vht_opt: self.cli_set(self._base_path + [interface, 'capabilities', 'vht'] + key.split()) self.cli_commit() # # Validate Config # tmp = get_config_value(interface, 'interface') self.assertEqual(interface, tmp) # ssid tmp = get_config_value(interface, 'ssid') self.assertEqual(ssid, tmp) # channel tmp = get_config_value(interface, 'channel') - self.assertEqual('0', tmp) # default is channel 0 + cli_default = default_value(self._base_path + [interface, 'channel']) + self.assertEqual(cli_default, tmp) # auto-powersave is special tmp = get_config_value(interface, 'uapsd_advertisement_enabled') self.assertEqual('1', tmp) tmp = get_config_value(interface, 'ht_capab') for key, value in ht_opt.items(): self.assertIn(value, tmp) tmp = get_config_value(interface, 'vht_capab') for key, value in vht_opt.items(): self.assertIn(value, tmp) # Check for running process self.assertTrue(process_named_running('hostapd')) def test_wireless_hostapd_wpa_config(self): # Only set the hostapd (access-point) options interface = 'wlan0' phy = 'phy0' ssid = 'ssid' channel = '1' wpa_key = 'VyOSVyOSVyOS' mode = 'n' country = 'de' self.cli_set(self._base_path + [interface, 'physical-device', phy]) self.cli_set(self._base_path + [interface, 'type', 'access-point']) self.cli_set(self._base_path + [interface, 'mode', mode]) # SSID must be set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'ssid', ssid]) # Channel must be set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'channel', channel]) # Country-Code must be set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(self._base_path + [interface, 'country-code', country]) self.cli_set(self._base_path + [interface, 'security', 'wpa', 'mode', 'wpa2']) self.cli_set(self._base_path + [interface, 'security', 'wpa', 'passphrase', wpa_key]) self.cli_commit() # # Validate Config # tmp = get_config_value(interface, 'interface') self.assertEqual(interface, tmp) tmp = get_config_value(interface, 'hw_mode') # rewrite special mode if mode == 'n': mode = 'g' self.assertEqual(mode, tmp) # WPA key tmp = get_config_value(interface, 'wpa') self.assertEqual('2', tmp) tmp = get_config_value(interface, 'wpa_passphrase') self.assertEqual(wpa_key, tmp) # SSID tmp = get_config_value(interface, 'ssid') self.assertEqual(ssid, tmp) # channel tmp = get_config_value(interface, 'channel') self.assertEqual(channel, tmp) # Country code tmp = get_config_value(interface, 'country_code') self.assertEqual(country.upper(), tmp) # Check for running process self.assertTrue(process_named_running('hostapd')) def test_wireless_access_point_bridge(self): interface = 'wlan0' ssid = 'VyOS-Test' bridge = 'br42477' # We need a bridge where we can hook our access-point interface to bridge_path = ['interfaces', 'bridge', bridge] self.cli_set(bridge_path + ['member', 'interface', interface]) self.cli_set(self._base_path + [interface, 'ssid', ssid]) self.cli_set(self._base_path + [interface, 'country-code', 'se']) self.cli_set(self._base_path + [interface, 'type', 'access-point']) self.cli_commit() # Check for running process self.assertTrue(process_named_running('hostapd')) bridge_members = [] for tmp in glob(f'/sys/class/net/{bridge}/lower_*'): bridge_members.append(os.path.basename(tmp).replace('lower_', '')) self.assertIn(interface, bridge_members) self.cli_delete(bridge_path) def test_wireless_security_station_address(self): interface = 'wlan0' ssid = 'VyOS-ACL' hostapd_accept_station_conf = f'/run/hostapd/{interface}_station_accept.conf' hostapd_deny_station_conf = f'/run/hostapd/{interface}_station_deny.conf' accept_mac = ['00:00:00:00:ac:01', '00:00:00:00:ac:02', '00:00:00:00:ac:03', '00:00:00:00:ac:04'] deny_mac = ['00:00:00:00:de:01', '00:00:00:00:de:02', '00:00:00:00:de:03', '00:00:00:00:de:04'] self.cli_set(self._base_path + [interface, 'ssid', ssid]) self.cli_set(self._base_path + [interface, 'country-code', 'se']) self.cli_set(self._base_path + [interface, 'type', 'access-point']) self.cli_set(self._base_path + [interface, 'security', 'station-address', 'mode', 'accept']) for mac in accept_mac: self.cli_set(self._base_path + [interface, 'security', 'station-address', 'accept', 'mac', mac]) for mac in deny_mac: self.cli_set(self._base_path + [interface, 'security', 'station-address', 'deny', 'mac', mac]) self.cli_commit() # in accept mode all addresses are allowed unless specified in the deny list tmp = get_config_value(interface, 'macaddr_acl') self.assertEqual(tmp, '0') accept_list = read_file(hostapd_accept_station_conf) for mac in accept_mac: self.assertIn(mac, accept_list) deny_list = read_file(hostapd_deny_station_conf) for mac in deny_mac: self.assertIn(mac, deny_list) # Switch mode accept -> deny self.cli_set(self._base_path + [interface, 'security', 'station-address', 'mode', 'deny']) self.cli_commit() # In deny mode all addresses are denied unless specified in the allow list tmp = get_config_value(interface, 'macaddr_acl') self.assertEqual(tmp, '1') # Check for running process self.assertTrue(process_named_running('hostapd')) if __name__ == '__main__': check_kmod('mac80211_hwsim') unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_https.py b/smoketest/scripts/cli/test_service_https.py index 94eade2d7..f2a64627f 100755 --- a/smoketest/scripts/cli/test_service_https.py +++ b/smoketest/scripts/cli/test_service_https.py @@ -1,463 +1,461 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest import json from requests import request from urllib3.exceptions import InsecureRequestWarning from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from base_vyostest_shim import ignore_warning from vyos.utils.file import read_file from vyos.utils.file import write_file from vyos.utils.process import call from vyos.utils.process import process_named_running +from vyos.xml_ref import default_value from vyos.configsession import ConfigSessionError base_path = ['service', 'https'] pki_base = ['pki'] cert_data = """ MIICFDCCAbugAwIBAgIUfMbIsB/ozMXijYgUYG80T1ry+mcwCgYIKoZIzj0EAwIw WTELMAkGA1UEBhMCR0IxEzARBgNVBAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNv bWUtQ2l0eTENMAsGA1UECgwEVnlPUzESMBAGA1UEAwwJVnlPUyBUZXN0MB4XDTIx MDcyMDEyNDUxMloXDTI2MDcxOTEyNDUxMlowWTELMAkGA1UEBhMCR0IxEzARBgNV BAgMClNvbWUtU3RhdGUxEjAQBgNVBAcMCVNvbWUtQ2l0eTENMAsGA1UECgwEVnlP UzESMBAGA1UEAwwJVnlPUyBUZXN0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE 01HrLcNttqq4/PtoMua8rMWEkOdBu7vP94xzDO7A8C92ls1v86eePy4QllKCzIw3 QxBIoCuH2peGRfWgPRdFsKNhMF8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8E BAMCAYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMB0GA1UdDgQWBBSu +JnU5ZC4mkuEpqg2+Mk4K79oeDAKBggqhkjOPQQDAgNHADBEAiBEFdzQ/Bc3Lftz ngrY605UhA6UprHhAogKgROv7iR4QgIgEFUxTtW3xXJcnUPWhhUFhyZoqfn8dE93 +dm/LDnp7C0= """ key_data = """ MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgPLpD0Ohhoq0g4nhx 2KMIuze7ucKUt/lBEB2wc03IxXyhRANCAATTUestw222qrj8+2gy5rysxYSQ50G7 u8/3jHMM7sDwL3aWzW/zp54/LhCWUoLMjDdDEEigK4fal4ZF9aA9F0Ww """ dh_1024 = """ MIGHAoGBAM3nvMkHGi/xmRs8cYg4pcl5sAanxel9EM+1XobVhUViXw8JvlmSEVOj n2aXUifc4SEs3WDzVPRC8O8qQWjvErpTq/HOgt3aqBCabMgvflmt706XP0KiqnpW EyvNiI27J3wBUzEXLIS110MxPAX5Tcug974PecFcOxn1RWrbWcx/AgEC """ dh_2048 = """ MIIBCAKCAQEA1mld/V7WnxxRinkOlhx/BoZkRELtIUQFYxyARBqYk4C5G3YnZNNu zjaGyPnfIKHu8SIUH85OecM+5/co9nYlcUJuph2tbR6qNgPw7LOKIhf27u7WhvJk iVsJhwZiWmvvMV4jTParNEI2svoooMyhHXzeweYsg6YtgLVmwiwKj3XP3gRH2i3B Mq8CDS7X6xaKvjfeMPZBFqOM5nb6HhsbaAUyiZxrfipLvXxtnbzd/eJUQVfVdxM3 pn0i+QrO2tuNAzX7GoPc9pefrbb5xJmGS50G0uqsR59+7LhYmyZSBASA0lxTEW9t kv/0LPvaYTY57WL7hBeqqHy/WPZHPzDI3wIBAg== """ # to test load config via HTTP URL nginx_tmp_site = '/etc/nginx/sites-enabled/smoketest' nginx_conf_smoketest = """ server { listen 8000; server_name localhost; root /tmp; index index.html; location / { try_files $uri $uri/ =404; autoindex on; } } """ PROCESS_NAME = 'nginx' class TestHTTPSService(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestHTTPSService, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, pki_base) @classmethod def tearDownClass(cls): super(TestHTTPSService, cls).tearDownClass() call(f'sudo rm -f {nginx_tmp_site}') def tearDown(self): self.cli_delete(base_path) self.cli_delete(pki_base) self.cli_commit() # Check for stopped process self.assertFalse(process_named_running(PROCESS_NAME)) def test_certificate(self): cert_name = 'test_https' dh_name = 'dh-test' self.cli_set(base_path + ['certificates', 'certificate', cert_name]) # verify() - certificates do not exist (yet) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(pki_base + ['certificate', cert_name, 'certificate', cert_data.replace('\n','')]) self.cli_set(pki_base + ['certificate', cert_name, 'private', 'key', key_data.replace('\n','')]) self.cli_set(base_path + ['certificates', 'dh-params', dh_name]) # verify() - dh-params do not exist (yet) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(pki_base + ['dh', dh_name, 'parameters', dh_1024.replace('\n','')]) # verify() - dh-param minimum length is 2048 bit with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(pki_base + ['dh', dh_name, 'parameters', dh_2048.replace('\n','')]) self.cli_commit() self.assertTrue(process_named_running(PROCESS_NAME)) self.debug = False def test_api_missing_keys(self): self.cli_set(base_path + ['api']) self.assertRaises(ConfigSessionError, self.cli_commit) def test_api_incomplete_key(self): self.cli_set(base_path + ['api', 'keys', 'id', 'key-01']) self.assertRaises(ConfigSessionError, self.cli_commit) @ignore_warning(InsecureRequestWarning) def test_api_auth(self): - vhost_id = 'example' address = '127.0.0.1' - port = '443' # default value - name = 'localhost' + port = default_value(base_path + ['port']) key = 'MySuperSecretVyOS' self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_set(base_path + ['listen-address', address]) self.cli_commit() nginx_config = read_file('/etc/nginx/sites-enabled/default') self.assertIn(f'listen {address}:{port} ssl;', nginx_config) self.assertIn(f'ssl_protocols TLSv1.2 TLSv1.3;', nginx_config) # default url = f'https://{address}/retrieve' payload = {'data': '{"op": "showConfig", "path": []}', 'key': f'{key}'} headers = {} r = request('POST', url, verify=False, headers=headers, data=payload) # Must get HTTP code 200 on success self.assertEqual(r.status_code, 200) payload_invalid = {'data': '{"op": "showConfig", "path": []}', 'key': 'invalid'} r = request('POST', url, verify=False, headers=headers, data=payload_invalid) # Must get HTTP code 401 on invalid key (Unauthorized) self.assertEqual(r.status_code, 401) payload_no_key = {'data': '{"op": "showConfig", "path": []}'} r = request('POST', url, verify=False, headers=headers, data=payload_no_key) # Must get HTTP code 401 on missing key (Unauthorized) self.assertEqual(r.status_code, 401) # Check path config payload = {'data': '{"op": "showConfig", "path": ["system", "login"]}', 'key': f'{key}'} r = request('POST', url, verify=False, headers=headers, data=payload) response = r.json() vyos_user_exists = 'vyos' in response.get('data', {}).get('user', {}) self.assertTrue(vyos_user_exists, "The 'vyos' user does not exist in the response.") # GraphQL auth test: a missing key will return status code 400, as # 'key' is a non-nullable field in the schema; an incorrect key is # caught by the resolver, and returns success 'False', so one must # check the return value. self.cli_set(base_path + ['api', 'graphql']) self.cli_commit() graphql_url = f'https://{address}/graphql' query_valid_key = f""" {{ SystemStatus (data: {{key: "{key}"}}) {{ success errors data {{ result }} }} }} """ r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query_valid_key}) success = r.json()['data']['SystemStatus']['success'] self.assertTrue(success) query_invalid_key = """ { SystemStatus (data: {key: "invalid"}) { success errors data { result } } } """ r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query_invalid_key}) success = r.json()['data']['SystemStatus']['success'] self.assertFalse(success) query_no_key = """ { SystemStatus (data: {}) { success errors data { result } } } """ r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query_no_key}) success = r.json()['data']['SystemStatus']['success'] self.assertFalse(success) # GraphQL token authentication test: request token; pass in header # of query. self.cli_set(base_path + ['api', 'graphql', 'authentication', 'type', 'token']) self.cli_commit() mutation = """ mutation { AuthToken (data: {username: "vyos", password: "vyos"}) { success errors data { result } } } """ r = request('POST', graphql_url, verify=False, headers=headers, json={'query': mutation}) token = r.json()['data']['AuthToken']['data']['result']['token'] headers = {'Authorization': f'Bearer {token}'} query = """ { ShowVersion (data: {}) { success errors op_mode_error { name message vyos_code } data { result } } } """ r = request('POST', graphql_url, verify=False, headers=headers, json={'query': query}) success = r.json()['data']['ShowVersion']['success'] self.assertTrue(success) @ignore_warning(InsecureRequestWarning) def test_api_add_delete(self): address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/retrieve' payload = {'data': '{"op": "showConfig", "path": []}', 'key': f'{key}'} headers = {} self.cli_set(base_path) self.cli_commit() r = request('POST', url, verify=False, headers=headers, data=payload) # api not configured; expect 503 self.assertEqual(r.status_code, 503) self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() sleep(2) r = request('POST', url, verify=False, headers=headers, data=payload) # api configured; expect 200 self.assertEqual(r.status_code, 200) self.cli_delete(base_path + ['api']) self.cli_commit() r = request('POST', url, verify=False, headers=headers, data=payload) # api deleted; expect 503 self.assertEqual(r.status_code, 503) @ignore_warning(InsecureRequestWarning) def test_api_show(self): address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/show' headers = {} self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() payload = { 'data': '{"op": "show", "path": ["system", "image"]}', 'key': f'{key}', } r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) @ignore_warning(InsecureRequestWarning) def test_api_generate(self): address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/generate' headers = {} self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() payload = { 'data': '{"op": "generate", "path": ["macsec", "mka", "cak", "gcm-aes-256"]}', 'key': f'{key}', } r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) @ignore_warning(InsecureRequestWarning) def test_api_configure(self): address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/configure' headers = {} conf_interface = 'dum0' conf_address = '192.0.2.44/32' self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() payload_path = [ "interfaces", "dummy", f"{conf_interface}", "address", f"{conf_address}", ] payload = {'data': json.dumps({"op": "set", "path": payload_path}), 'key': key} r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) @ignore_warning(InsecureRequestWarning) def test_api_config_file(self): address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/config-file' headers = {} self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() payload = { 'data': '{"op": "save"}', 'key': f'{key}', } r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) @ignore_warning(InsecureRequestWarning) def test_api_reset(self): address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/reset' headers = {} self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() payload = { 'data': '{"op": "reset", "path": ["ip", "arp", "table"]}', 'key': f'{key}', } r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) @ignore_warning(InsecureRequestWarning) def test_api_config_file_load_http(self): # Test load config from HTTP URL address = '127.0.0.1' key = 'VyOS-key' url = f'https://{address}/config-file' url_config = f'https://{address}/configure' headers = {} - tmp_file = 'tmp-config.boot' self.cli_set(base_path + ['api', 'keys', 'id', 'key-01', 'key', key]) self.cli_commit() # load config via HTTP requires nginx config call(f'sudo touch {nginx_tmp_site}') call(f'sudo chmod 666 {nginx_tmp_site}') write_file(nginx_tmp_site, nginx_conf_smoketest) call('sudo systemctl reload nginx') # save config payload = { 'data': '{"op": "save", "file": "/tmp/tmp-config.boot"}', 'key': f'{key}', } r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) # change config payload = { 'data': '{"op": "set", "path": ["interfaces", "dummy", "dum1", "address", "192.0.2.31/32"]}', 'key': f'{key}', } r = request('POST', url_config, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) # load config from URL payload = { 'data': '{"op": "load", "file": "http://localhost:8000/tmp-config.boot"}', 'key': f'{key}', } r = request('POST', url, verify=False, headers=headers, data=payload) self.assertEqual(r.status_code, 200) # cleanup tmp nginx conf call(f'sudo rm -f {nginx_tmp_site}') call('sudo systemctl reload nginx') if __name__ == '__main__': unittest.main(verbosity=5) diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 031897c26..b09990c92 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -1,307 +1,309 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2022 VyOS maintainers and contributors +# Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import paramiko import re import unittest from pwd import getpwall from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from vyos.utils.process import is_systemd_service_running from vyos.utils.process import process_named_running from vyos.utils.file import read_file +from vyos.xml_ref import default_value PROCESS_NAME = 'sshd' SSHD_CONF = '/run/sshd/sshd_config' base_path = ['service', 'ssh'] key_rsa = '/etc/ssh/ssh_host_rsa_key' key_dsa = '/etc/ssh/ssh_host_dsa_key' key_ed25519 = '/etc/ssh/ssh_host_ed25519_key' def get_config_value(key): tmp = read_file(SSHD_CONF) tmp = re.findall(f'\n?{key}\s+(.*)', tmp) return tmp class TestServiceSSH(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestServiceSSH, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['vrf']) def tearDown(self): # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) # delete testing SSH config self.cli_delete(base_path) self.cli_delete(['vrf']) self.cli_commit() self.assertTrue(os.path.isfile(key_rsa)) self.assertTrue(os.path.isfile(key_dsa)) self.assertTrue(os.path.isfile(key_ed25519)) # Established SSH connections remains running after service is stopped. # We can not use process_named_running here - we rather need to check # that the systemd service is no longer running self.assertFalse(is_systemd_service_running(PROCESS_NAME)) def test_ssh_default(self): # Check if SSH service runs with default settings - used for checking # behavior of <defaultValue> in XML definition self.cli_set(base_path) # commit changes self.cli_commit() - # Check configured port - port = get_config_value('Port')[0] - self.assertEqual('22', port) # default value + # Check configured port agains CLI default value + port = get_config_value('Port') + cli_default = default_value(base_path + ['port']) + self.assertEqual(port, cli_default) def test_ssh_single_listen_address(self): # Check if SSH service can be configured and runs self.cli_set(base_path + ['port', '1234']) self.cli_set(base_path + ['disable-host-validation']) self.cli_set(base_path + ['disable-password-authentication']) self.cli_set(base_path + ['loglevel', 'verbose']) self.cli_set(base_path + ['client-keepalive-interval', '100']) self.cli_set(base_path + ['listen-address', '127.0.0.1']) # commit changes self.cli_commit() # Check configured port port = get_config_value('Port')[0] self.assertTrue("1234" in port) # Check DNS usage dns = get_config_value('UseDNS')[0] self.assertTrue("no" in dns) # Check PasswordAuthentication pwd = get_config_value('PasswordAuthentication')[0] self.assertTrue("no" in pwd) # Check loglevel loglevel = get_config_value('LogLevel')[0] self.assertTrue("VERBOSE" in loglevel) # Check listen address address = get_config_value('ListenAddress')[0] self.assertTrue("127.0.0.1" in address) # Check keepalive keepalive = get_config_value('ClientAliveInterval')[0] self.assertTrue("100" in keepalive) def test_ssh_multiple_listen_addresses(self): # Check if SSH service can be configured and runs with multiple # listen ports and listen-addresses ports = ['22', '2222', '2223', '2224'] for port in ports: self.cli_set(base_path + ['port', port]) addresses = ['127.0.0.1', '::1'] for address in addresses: self.cli_set(base_path + ['listen-address', address]) # commit changes self.cli_commit() # Check configured port tmp = get_config_value('Port') for port in ports: self.assertIn(port, tmp) # Check listen address tmp = get_config_value('ListenAddress') for address in addresses: self.assertIn(address, tmp) def test_ssh_vrf_single(self): vrf = 'mgmt' # Check if SSH service can be bound to given VRF self.cli_set(base_path + ['vrf', vrf]) # VRF does yet not exist - an error must be thrown with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['vrf', 'name', vrf, 'table', '1338']) # commit changes self.cli_commit() # Check for process in VRF tmp = cmd(f'ip vrf pids {vrf}') self.assertIn(PROCESS_NAME, tmp) def test_ssh_vrf_multi(self): # Check if SSH service can be bound to multiple VRFs vrfs = ['red', 'blue', 'green'] for vrf in vrfs: self.cli_set(base_path + ['vrf', vrf]) # VRF does yet not exist - an error must be thrown with self.assertRaises(ConfigSessionError): self.cli_commit() table = 12345 for vrf in vrfs: self.cli_set(['vrf', 'name', vrf, 'table', str(table)]) table += 1 # commit changes self.cli_commit() # Check for process in VRF for vrf in vrfs: tmp = cmd(f'ip vrf pids {vrf}') self.assertIn(PROCESS_NAME, tmp) def test_ssh_login(self): # Perform SSH login and command execution with a predefined user. The # result (output of uname -a) must match the output if the command is # run natively. # # We also try to login as an invalid user - this is not allowed to work. test_user = 'ssh_test' test_pass = 'v2i57DZs8idUwMN3VC92' test_command = 'uname -a' self.cli_set(base_path) self.cli_set(['system', 'login', 'user', test_user, 'authentication', 'plaintext-password', test_pass]) # commit changes self.cli_commit() # Login with proper credentials output, error = self.ssh_send_cmd(test_command, test_user, test_pass) # verify login self.assertFalse(error) self.assertEqual(output, cmd(test_command)) # Login with invalid credentials with self.assertRaises(paramiko.ssh_exception.AuthenticationException): output, error = self.ssh_send_cmd(test_command, 'invalid_user', 'invalid_password') self.cli_delete(['system', 'login', 'user', test_user]) self.cli_commit() # After deletion the test user is not allowed to remain in /etc/passwd usernames = [x[0] for x in getpwall()] self.assertNotIn(test_user, usernames) def test_ssh_dynamic_protection(self): # check sshguard service SSHGUARD_CONFIG = '/etc/sshguard/sshguard.conf' SSHGUARD_WHITELIST = '/etc/sshguard/whitelist' SSHGUARD_PROCESS = 'sshguard' block_time = '123' detect_time = '1804' port = '22' threshold = '10' allow_list = ['192.0.2.0/24', '2001:db8::/48'] self.cli_set(base_path + ['dynamic-protection', 'block-time', block_time]) self.cli_set(base_path + ['dynamic-protection', 'detect-time', detect_time]) self.cli_set(base_path + ['dynamic-protection', 'threshold', threshold]) for allow in allow_list: self.cli_set(base_path + ['dynamic-protection', 'allow-from', allow]) # commit changes self.cli_commit() # Check configured port tmp = get_config_value('Port') self.assertIn(port, tmp) # Check sshgurad service self.assertTrue(process_named_running(SSHGUARD_PROCESS)) sshguard_lines = [ f'THRESHOLD={threshold}', f'BLOCK_TIME={block_time}', f'DETECTION_TIME={detect_time}' ] tmp_sshguard_conf = read_file(SSHGUARD_CONFIG) for line in sshguard_lines: self.assertIn(line, tmp_sshguard_conf) tmp_whitelist_conf = read_file(SSHGUARD_WHITELIST) for allow in allow_list: self.assertIn(allow, tmp_whitelist_conf) # Delete service ssh dynamic-protection # but not service ssh itself self.cli_delete(base_path + ['dynamic-protection']) self.cli_commit() self.assertFalse(process_named_running(SSHGUARD_PROCESS)) # Network Device Collaborative Protection Profile def test_ssh_ndcpp(self): ciphers = ['aes128-cbc', 'aes128-ctr', 'aes256-cbc', 'aes256-ctr'] host_key_algs = ['sk-ssh-ed25519@openssh.com', 'ssh-rsa', 'ssh-ed25519'] kexes = ['diffie-hellman-group14-sha1', 'ecdh-sha2-nistp256', 'ecdh-sha2-nistp384', 'ecdh-sha2-nistp521'] macs = ['hmac-sha1', 'hmac-sha2-256', 'hmac-sha2-512'] rekey_time = '60' rekey_data = '1024' for cipher in ciphers: self.cli_set(base_path + ['ciphers', cipher]) for host_key in host_key_algs: self.cli_set(base_path + ['hostkey-algorithm', host_key]) for kex in kexes: self.cli_set(base_path + ['key-exchange', kex]) for mac in macs: self.cli_set(base_path + ['mac', mac]) # Optional rekey parameters self.cli_set(base_path + ['rekey', 'data', rekey_data]) self.cli_set(base_path + ['rekey', 'time', rekey_time]) # commit changes self.cli_commit() ssh_lines = ['Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr', 'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519', 'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512', 'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521', 'RekeyLimit 1024M 60M' ] tmp_sshd_conf = read_file(SSHD_CONF) for line in ssh_lines: self.assertIn(line, tmp_sshd_conf) if __name__ == '__main__': unittest.main(verbosity=2)