diff --git a/smoketest/scripts/cli/test_protocols_babel.py b/smoketest/scripts/cli/test_protocols_babel.py index 107e262cc..12f6ecf38 100755 --- a/smoketest/scripts/cli/test_protocols_babel.py +++ b/smoketest/scripts/cli/test_protocols_babel.py @@ -1,218 +1,219 @@ #!/usr/bin/env python3 # # Copyright (C) 2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.ifconfig import Section from vyos.frrender import babel_daemon from vyos.utils.process import process_named_running from vyos.xml_ref import default_value base_path = ['protocols', 'babel'] class TestProtocolsBABEL(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._interfaces = Section.interfaces('ethernet', vlan=False) # call base-classes classmethod super(TestProtocolsBABEL, cls).setUpClass() # Retrieve FRR daemon PID - it is not allowed to crash, thus PID must remain the same cls.daemon_pid = process_named_running(babel_daemon) # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) cls.cli_delete(cls, ['policy', 'prefix-list']) cls.cli_delete(cls, ['policy', 'prefix-list6']) def tearDown(self): # always destroy the entire babel configuration to make the processes # life as hard as possible self.cli_delete(base_path) self.cli_delete(['policy', 'prefix-list']) self.cli_delete(['policy', 'prefix-list6']) self.cli_commit() # check process health and continuity self.assertEqual(self.daemon_pid, process_named_running(babel_daemon)) - def test_babel_interfaces(self): - def_update_interval = default_value(base_path + ['interface', 'eth0', 'update-interval']) - channel = '20' - hello_interval = '1000' - max_rtt_penalty = '100' - rtt_decay = '23' - rtt_max = '119' - rtt_min = '11' - rxcost = '40000' - type = 'wired' + def test_01_basic(self): + diversity_factor = '64' + resend_delay = '100' + smoothing_half_life = '400' - for interface in self._interfaces: - self.cli_set(base_path + ['interface', interface]) - self.cli_set(base_path + ['interface', interface, 'channel', channel]) - self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) - self.cli_set(base_path + ['interface', interface, 'hello-interval', hello_interval]) - self.cli_set(base_path + ['interface', interface, 'max-rtt-penalty', max_rtt_penalty]) - self.cli_set(base_path + ['interface', interface, 'rtt-decay', rtt_decay]) - self.cli_set(base_path + ['interface', interface, 'rtt-max', rtt_max]) - self.cli_set(base_path + ['interface', interface, 'rtt-min', rtt_min]) - self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) - self.cli_set(base_path + ['interface', interface, 'rxcost', rxcost]) - self.cli_set(base_path + ['interface', interface, 'split-horizon', 'disable']) - self.cli_set(base_path + ['interface', interface, 'type', type]) + self.cli_set(base_path + ['parameters', 'diversity']) + self.cli_set(base_path + ['parameters', 'diversity-factor', diversity_factor]) + self.cli_set(base_path + ['parameters', 'resend-delay', resend_delay]) + self.cli_set(base_path + ['parameters', 'smoothing-half-life', smoothing_half_life]) self.cli_commit() frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) - for interface in self._interfaces: - self.assertIn(f' network {interface}', frrconfig) - - iface_config = self.getFRRconfig(f'interface {interface}', endsection='^exit', daemon=babel_daemon) - self.assertIn(f' babel channel {channel}', iface_config) - self.assertIn(f' babel enable-timestamps', iface_config) - self.assertIn(f' babel update-interval {def_update_interval}', iface_config) - self.assertIn(f' babel hello-interval {hello_interval}', iface_config) - self.assertIn(f' babel rtt-decay {rtt_decay}', iface_config) - self.assertIn(f' babel rtt-max {rtt_max}', iface_config) - self.assertIn(f' babel rtt-min {rtt_min}', iface_config) - self.assertIn(f' babel rxcost {rxcost}', iface_config) - self.assertIn(f' babel max-rtt-penalty {max_rtt_penalty}', iface_config) - self.assertIn(f' no babel split-horizon', iface_config) - self.assertIn(f' babel {type}', iface_config) + self.assertIn(f' babel diversity', frrconfig) + self.assertIn(f' babel diversity-factor {diversity_factor}', frrconfig) + self.assertIn(f' babel resend-delay {resend_delay}', frrconfig) + self.assertIn(f' babel smoothing-half-life {smoothing_half_life}', frrconfig) - def test_babel_redistribute(self): + def test_02_redistribute(self): ipv4_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospf', 'rip', 'static'] ipv6_protos = ['bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static'] + self.cli_set(base_path + ['interface', self._interfaces[0], 'enable-timestamps']) + for protocol in ipv4_protos: self.cli_set(base_path + ['redistribute', 'ipv4', protocol]) for protocol in ipv6_protos: self.cli_set(base_path + ['redistribute', 'ipv6', protocol]) self.cli_commit() frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon, empty_retry=5) for protocol in ipv4_protos: self.assertIn(f' redistribute ipv4 {protocol}', frrconfig) for protocol in ipv6_protos: if protocol == 'ospfv3': protocol = 'ospf6' self.assertIn(f' redistribute ipv6 {protocol}', frrconfig) - def test_babel_basic(self): - diversity_factor = '64' - resend_delay = '100' - smoothing_half_life = '400' - - self.cli_set(base_path + ['parameters', 'diversity']) - self.cli_set(base_path + ['parameters', 'diversity-factor', diversity_factor]) - self.cli_set(base_path + ['parameters', 'resend-delay', resend_delay]) - self.cli_set(base_path + ['parameters', 'smoothing-half-life', smoothing_half_life]) - - self.cli_commit() - - frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) - self.assertIn(f' babel diversity', frrconfig) - self.assertIn(f' babel diversity-factor {diversity_factor}', frrconfig) - self.assertIn(f' babel resend-delay {resend_delay}', frrconfig) - self.assertIn(f' babel smoothing-half-life {smoothing_half_life}', frrconfig) - - def test_babel_distribute_list(self): + def test_03_distribute_list(self): access_list_in4 = '40' access_list_out4 = '50' access_list_in4_iface = '44' access_list_out4_iface = '55' access_list_in6 = 'AL-foo-in6' access_list_out6 = 'AL-foo-out6' prefix_list_in4 = 'PL-foo-in4' prefix_list_out4 = 'PL-foo-out4' prefix_list_in6 = 'PL-foo-in6' prefix_list_out6 = 'PL-foo-out6' self.cli_set(['policy', 'access-list', access_list_in4]) self.cli_set(['policy', 'access-list', access_list_out4]) self.cli_set(['policy', 'access-list6', access_list_in6]) self.cli_set(['policy', 'access-list6', access_list_out6]) self.cli_set(['policy', 'access-list', f'{access_list_in4_iface}']) self.cli_set(['policy', 'access-list', f'{access_list_out4_iface}']) self.cli_set(['policy', 'prefix-list', prefix_list_in4]) self.cli_set(['policy', 'prefix-list', prefix_list_out4]) self.cli_set(['policy', 'prefix-list6', prefix_list_in6]) self.cli_set(['policy', 'prefix-list6', prefix_list_out6]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'access-list', 'in', access_list_in4]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'access-list', 'out', access_list_out4]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'access-list', 'in', access_list_in6]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'access-list', 'out', access_list_out6]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'prefix-list', 'in', prefix_list_in4]) self.cli_set(base_path + ['distribute-list', 'ipv4', 'prefix-list', 'out', prefix_list_out4]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'prefix-list', 'in', prefix_list_in6]) self.cli_set(base_path + ['distribute-list', 'ipv6', 'prefix-list', 'out', prefix_list_out6]) for interface in self._interfaces: self.cli_set(base_path + ['interface', interface]) self.cli_set(['policy', 'access-list6', f'{access_list_in6}-{interface}']) self.cli_set(['policy', 'access-list6', f'{access_list_out6}-{interface}']) self.cli_set(['policy', 'prefix-list', f'{prefix_list_in4}-{interface}']) self.cli_set(['policy', 'prefix-list', f'{prefix_list_out4}-{interface}']) self.cli_set(['policy', 'prefix-list6', f'{prefix_list_in6}-{interface}']) self.cli_set(['policy', 'prefix-list6', f'{prefix_list_out6}-{interface}']) tmp_path = base_path + ['distribute-list', 'ipv4', 'interface', interface] self.cli_set(tmp_path + ['access-list', 'in', f'{access_list_in4_iface}']) self.cli_set(tmp_path + ['access-list', 'out', f'{access_list_out4_iface}']) self.cli_set(tmp_path + ['prefix-list', 'in', f'{prefix_list_in4}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'out', f'{prefix_list_out4}-{interface}']) tmp_path = base_path + ['distribute-list', 'ipv6', 'interface', interface] self.cli_set(tmp_path + ['access-list', 'in', f'{access_list_in6}-{interface}']) self.cli_set(tmp_path + ['access-list', 'out', f'{access_list_out6}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'in', f'{prefix_list_in6}-{interface}']) self.cli_set(tmp_path + ['prefix-list', 'out', f'{prefix_list_out6}-{interface}']) self.cli_commit() frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) self.assertIn(f' distribute-list {access_list_in4} in', frrconfig) self.assertIn(f' distribute-list {access_list_out4} out', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_in6} in', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_out6} out', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in4} in', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out4} out', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in6} in', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out6} out', frrconfig) for interface in self._interfaces: self.assertIn(f' distribute-list {access_list_in4_iface} in {interface}', frrconfig) self.assertIn(f' distribute-list {access_list_out4_iface} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_in6}-{interface} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list {access_list_out6}-{interface} out {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_in4}-{interface} in {interface}', frrconfig) self.assertIn(f' distribute-list prefix {prefix_list_out4}-{interface} out {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_in6}-{interface} in {interface}', frrconfig) self.assertIn(f' ipv6 distribute-list prefix {prefix_list_out6}-{interface} out {interface}', frrconfig) + def test_04_interfaces(self): + def_update_interval = default_value(base_path + ['interface', 'eth0', 'update-interval']) + channel = '20' + hello_interval = '1000' + max_rtt_penalty = '100' + rtt_decay = '23' + rtt_max = '119' + rtt_min = '11' + rxcost = '40000' + type = 'wired' + + for interface in self._interfaces: + self.cli_set(base_path + ['interface', interface]) + self.cli_set(base_path + ['interface', interface, 'channel', channel]) + self.cli_set(base_path + ['interface', interface, 'enable-timestamps']) + self.cli_set(base_path + ['interface', interface, 'hello-interval', hello_interval]) + self.cli_set(base_path + ['interface', interface, 'max-rtt-penalty', max_rtt_penalty]) + self.cli_set(base_path + ['interface', interface, 'rtt-decay', rtt_decay]) + self.cli_set(base_path + ['interface', interface, 'rtt-max', rtt_max]) + self.cli_set(base_path + ['interface', interface, 'rtt-min', rtt_min]) + self.cli_set(base_path + ['interface', interface, 'rxcost', rxcost]) + self.cli_set(base_path + ['interface', interface, 'split-horizon', 'disable']) + self.cli_set(base_path + ['interface', interface, 'type', type]) + + self.cli_commit() + + frrconfig = self.getFRRconfig('router babel', endsection='^exit', daemon=babel_daemon) + for interface in self._interfaces: + self.assertIn(f' network {interface}', frrconfig) + + iface_config = self.getFRRconfig(f'interface {interface}', endsection='^exit', daemon=babel_daemon) + self.assertIn(f' babel channel {channel}', iface_config) + self.assertIn(f' babel enable-timestamps', iface_config) + self.assertIn(f' babel update-interval {def_update_interval}', iface_config) + self.assertIn(f' babel hello-interval {hello_interval}', iface_config) + self.assertIn(f' babel rtt-decay {rtt_decay}', iface_config) + self.assertIn(f' babel rtt-max {rtt_max}', iface_config) + self.assertIn(f' babel rtt-min {rtt_min}', iface_config) + self.assertIn(f' babel rxcost {rxcost}', iface_config) + self.assertIn(f' babel max-rtt-penalty {max_rtt_penalty}', iface_config) + self.assertIn(f' no babel split-horizon', iface_config) + self.assertIn(f' babel {type}', iface_config) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_service_dns_dynamic.py b/smoketest/scripts/cli/test_service_dns_dynamic.py index 3e14976fc..3ce459e44 100755 --- a/smoketest/scripts/cli/test_service_dns_dynamic.py +++ b/smoketest/scripts/cli/test_service_dns_dynamic.py @@ -1,351 +1,350 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import unittest import tempfile from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from vyos.utils.process import process_named_running DDCLIENT_SYSTEMD_UNIT = '/run/systemd/system/ddclient.service.d/override.conf' DDCLIENT_CONF = '/run/ddclient/ddclient.conf' DDCLIENT_PNAME = 'ddclient' base_path = ['service', 'dns', 'dynamic'] name_path = base_path + ['name'] server = 'ddns.vyos.io' hostname = 'test.ddns.vyos.io' zone = 'vyos.io' username = 'vyos_user' password = 'paSS_@4ord' ttl = '300' interface = 'eth0' class TestServiceDDNS(VyOSUnitTestSHIM.TestCase): def setUp(self): # Always start with a clean CLI instance self.cli_delete(base_path) def tearDown(self): # Check for running process self.assertTrue(process_named_running(DDCLIENT_PNAME)) # Delete DDNS configuration self.cli_delete(base_path) self.cli_commit() # Check for process not running anymore self.assertFalse(process_named_running(DDCLIENT_PNAME)) # IPv4 standard DDNS service configuration def test_01_dyndns_service_standard(self): services = {'cloudflare': {'protocol': 'cloudflare'}, 'freedns': {'protocol': 'freedns', 'username': username}, 'zoneedit': {'protocol': 'zoneedit1', 'username': username}} for svc, details in services.items(): self.cli_set(name_path + [svc, 'address', 'interface', interface]) self.cli_set(name_path + [svc, 'host-name', hostname]) self.cli_set(name_path + [svc, 'password', password]) for opt, value in details.items(): self.cli_set(name_path + [svc, opt, value]) # 'zone' option is supported by 'cloudfare' and 'zoneedit1', but not 'freedns' self.cli_set(name_path + [svc, 'zone', zone]) if details['protocol'] in ['cloudflare', 'zoneedit1']: pass else: # exception is raised for unsupported ones with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(name_path + [svc, 'zone']) # 'ttl' option is supported by 'cloudfare', but not 'freedns' and 'zoneedit' self.cli_set(name_path + [svc, 'ttl', ttl]) if details['protocol'] == 'cloudflare': pass else: # exception is raised for unsupported ones with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(name_path + [svc, 'ttl']) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') # default value 300 seconds self.assertIn(f'daemon=300', ddclient_conf) self.assertIn(f'usev4=ifv4', ddclient_conf) self.assertIn(f'ifv4={interface}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) for opt in details.keys(): if opt == 'username': login = details[opt] self.assertIn(f'login={login}', ddclient_conf) else: tmp = details[opt] self.assertIn(f'{opt}={tmp}', ddclient_conf) # IPv6 only DDNS service configuration def test_02_dyndns_service_ipv6(self): interval = '60' svc_path = name_path + ['dynv6'] proto = 'dyndns2' ip_version = 'ipv6' wait_time = '600' expiry_time_good = '3600' expiry_time_bad = '360' self.cli_set(base_path + ['interval', interval]) self.cli_set(svc_path + ['address', 'interface', interface]) self.cli_set(svc_path + ['ip-version', ip_version]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['username', username]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', hostname]) self.cli_set(svc_path + ['wait-time', wait_time]) # expiry-time must be greater than wait-time, exception is raised otherwise with self.assertRaises(ConfigSessionError): self.cli_set(svc_path + ['expiry-time', expiry_time_bad]) self.cli_commit() self.cli_set(svc_path + ['expiry-time', expiry_time_good]) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'daemon={interval}', ddclient_conf) self.assertIn(f'usev6=ifv6', ddclient_conf) self.assertIn(f'ifv6={interface}', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'login={username}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'min-interval={wait_time}', ddclient_conf) self.assertIn(f'max-interval={expiry_time_good}', ddclient_conf) # IPv4+IPv6 dual DDNS service configuration def test_03_dyndns_service_dual_stack(self): services = {'cloudflare': {'protocol': 'cloudflare', 'zone': zone}, 'freedns': {'protocol': 'freedns', 'username': username}, 'google': {'protocol': 'googledomains', 'username': username}} ip_version = 'both' for name, details in services.items(): self.cli_set(name_path + [name, 'address', 'interface', interface]) self.cli_set(name_path + [name, 'host-name', hostname]) self.cli_set(name_path + [name, 'password', password]) for opt, value in details.items(): self.cli_set(name_path + [name, opt, value]) # Dual stack is supported by 'cloudfare' and 'freedns' but not 'googledomains' # exception is raised for unsupported ones self.cli_set(name_path + [name, 'ip-version', ip_version]) if details['protocol'] not in ['cloudflare', 'freedns']: with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(name_path + [name, 'ip-version']) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') if details['protocol'] not in ['cloudflare', 'freedns']: self.assertIn(f'usev4=ifv4', ddclient_conf) self.assertIn(f'ifv4={interface}', ddclient_conf) else: self.assertIn(f'usev4=ifv4', ddclient_conf) self.assertIn(f'usev6=ifv6', ddclient_conf) self.assertIn(f'ifv4={interface}', ddclient_conf) self.assertIn(f'ifv6={interface}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) for opt in details.keys(): if opt == 'username': login = details[opt] self.assertIn(f'login={login}', ddclient_conf) else: tmp = details[opt] self.assertIn(f'{opt}={tmp}', ddclient_conf) def test_04_dyndns_rfc2136(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['vyos'] proto = 'nsupdate' with tempfile.NamedTemporaryFile(prefix='/config/auth/') as key_file: key_file.write(b'S3cretKey') self.cli_set(svc_path + ['address', 'interface', interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['zone', zone]) self.cli_set(svc_path + ['key', key_file.name]) self.cli_set(svc_path + ['ttl', ttl]) self.cli_set(svc_path + ['host-name', hostname]) # commit changes self.cli_commit() # Check some generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'use=if', ddclient_conf) self.assertIn(f'if={interface}', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'zone={zone}', ddclient_conf) self.assertIn(f'password=\'{key_file.name}\'', ddclient_conf) self.assertIn(f'ttl={ttl}', ddclient_conf) def test_05_dyndns_hostname(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['namecheap'] proto = 'namecheap' hostnames = ['@', 'www', hostname, f'@.{hostname}'] for name in hostnames: self.cli_set(svc_path + ['address', 'interface', interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['username', username]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', name]) # commit changes self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'login={username}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'{name}', ddclient_conf) def test_06_dyndns_web_options(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['cloudflare'] proto = 'cloudflare' web_url = 'https://ifconfig.me/ip' web_skip = 'Current IP Address:' self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['zone', zone]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', hostname]) # not specifying either 'interface' or 'web' will raise an exception with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(svc_path + ['address', 'web']) # specifying both 'interface' and 'web' will raise an exception as well with self.assertRaises(ConfigSessionError): self.cli_set(svc_path + ['address', 'interface', interface]) self.cli_commit() self.cli_delete(svc_path + ['address', 'interface']) self.cli_commit() # web option 'skip' is useless without the option 'url' with self.assertRaises(ConfigSessionError): self.cli_set(svc_path + ['address', 'web', 'skip', web_skip]) self.cli_commit() self.cli_set(svc_path + ['address', 'web', 'url', web_url]) self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'usev4=webv4', ddclient_conf) self.assertIn(f'webv4={web_url}', ddclient_conf) self.assertIn(f'webv4-skip=\'{web_skip}\'', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'zone={zone}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'{hostname}', ddclient_conf) def test_07_dyndns_dynamic_interface(self): # Check if DDNS service can be configured and runs svc_path = name_path + ['namecheap'] proto = 'namecheap' dyn_interface = 'pppoe587' self.cli_set(svc_path + ['address', 'interface', dyn_interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['server', server]) self.cli_set(svc_path + ['username', username]) self.cli_set(svc_path + ['password', password]) self.cli_set(svc_path + ['host-name', hostname]) # Dynamic interface will raise a warning but still go through # XXX: We should have idiomatic class "ConfigSessionWarning" wrapping # "Warning" similar to "ConfigSessionError". # with self.assertWarns(Warning): # self.cli_commit() self.cli_commit() # Check the generating config parameters ddclient_conf = cmd(f'sudo cat {DDCLIENT_CONF}') self.assertIn(f'ifv4={dyn_interface}', ddclient_conf) self.assertIn(f'protocol={proto}', ddclient_conf) self.assertIn(f'server={server}', ddclient_conf) self.assertIn(f'login={username}', ddclient_conf) self.assertIn(f'password=\'{password}\'', ddclient_conf) self.assertIn(f'{hostname}', ddclient_conf) def test_08_dyndns_vrf(self): # Table number randomized, but should be within range 100-65535 vrf_table = '58710' vrf_name = f'vyos-test-{vrf_table}' svc_path = name_path + ['cloudflare'] proto = 'cloudflare' self.cli_set(['vrf', 'name', vrf_name, 'table', vrf_table]) self.cli_set(base_path + ['vrf', vrf_name]) self.cli_set(svc_path + ['address', 'interface', interface]) self.cli_set(svc_path + ['protocol', proto]) self.cli_set(svc_path + ['host-name', hostname]) self.cli_set(svc_path + ['zone', zone]) self.cli_set(svc_path + ['password', password]) # commit changes self.cli_commit() # Check for process in VRF systemd_override = cmd(f'cat {DDCLIENT_SYSTEMD_UNIT}') self.assertIn(f'ExecStart=ip vrf exec {vrf_name} /usr/bin/ddclient -file {DDCLIENT_CONF}', systemd_override) # Check for process in VRF proc = cmd(f'ip vrf pids {vrf_name}') self.assertIn(DDCLIENT_PNAME, proc) # Cleanup VRF self.cli_delete(['vrf', 'name', vrf_name]) if __name__ == '__main__': unittest.main(verbosity=2)