diff --git a/smoketest/scripts/cli/base_interfaces_test.py b/smoketest/scripts/cli/base_interfaces_test.py index 4072fd5c2..66b789e94 100644 --- a/smoketest/scripts/cli/base_interfaces_test.py +++ b/smoketest/scripts/cli/base_interfaces_test.py @@ -1,1138 +1,1156 @@ # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from netifaces import AF_INET from netifaces import AF_INET6 from netifaces import ifaddresses from netifaces import interfaces from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.defaults import directories from vyos.ifconfig import Interface from vyos.ifconfig import Section from vyos.utils.file import read_file from vyos.utils.dict import dict_search from vyos.utils.process import process_named_running from vyos.utils.network import get_interface_config from vyos.utils.network import get_interface_vrf from vyos.utils.network import get_vrf_tableid from vyos.utils.process import cmd from vyos.utils.network import is_intf_addr_assigned from vyos.utils.network import is_ipv6_link_local from vyos.xml_ref import cli_defined dhclient_base_dir = directories['isc_dhclient_dir'] dhclient_process_name = 'dhclient' dhcp6c_base_dir = directories['dhcp6_client_dir'] dhcp6c_process_name = 'dhcp6c' def is_mirrored_to(interface, mirror_if, qdisc): """ Ask TC if we are mirroring traffic to a discrete interface. interface: source interface mirror_if: destination where we mirror our data to qdisc: must be ffff or 1 for ingress/egress """ if qdisc not in ['ffff', '1']: raise ValueError() ret_val = False tmp = cmd(f'tc -s -p filter ls dev {interface} parent {qdisc}: | grep mirred') tmp = tmp.lower() if mirror_if in tmp: ret_val = True return ret_val class BasicInterfaceTest: class TestCase(VyOSUnitTestSHIM.TestCase): _test_dhcp = False _test_ip = False _test_mtu = False _test_vlan = False _test_qinq = False _test_ipv6 = False _test_ipv6_pd = False _test_ipv6_dhcpc6 = False _test_mirror = False _test_vrf = False _base_path = [] _options = {} _interfaces = [] _qinq_range = ['10', '20', '30'] _vlan_range = ['100', '200', '300', '2000'] _test_addr = ['192.0.2.1/26', '192.0.2.255/31', '192.0.2.64/32', '2001:db8:1::ffff/64', '2001:db8:101::1/112'] _mirror_interfaces = [] # choose IPv6 minimum MTU value for tests - this must always work _mtu = '1280' @classmethod def setUpClass(cls): super(BasicInterfaceTest.TestCase, cls).setUpClass() # XXX the case of test_vif_8021q_mtu_limits, below, shows that # we should extend cli_defined to support more complex queries cls._test_vlan = cli_defined(cls._base_path, 'vif') cls._test_qinq = cli_defined(cls._base_path, 'vif-s') cls._test_dhcp = cli_defined(cls._base_path, 'dhcp-options') cls._test_ip = cli_defined(cls._base_path, 'ip') cls._test_ipv6 = cli_defined(cls._base_path, 'ipv6') cls._test_ipv6_dhcpc6 = cli_defined(cls._base_path, 'dhcpv6-options') cls._test_ipv6_pd = cli_defined(cls._base_path + ['dhcpv6-options'], 'pd') cls._test_mtu = cli_defined(cls._base_path, 'mtu') cls._test_vrf = cli_defined(cls._base_path, 'vrf') # Setup mirror interfaces for SPAN (Switch Port Analyzer) for span in cls._mirror_interfaces: section = Section.section(span) cls.cli_set(cls, ['interfaces', section, span]) @classmethod def tearDownClass(cls): # Tear down mirror interfaces for SPAN (Switch Port Analyzer) for span in cls._mirror_interfaces: section = Section.section(span) cls.cli_delete(cls, ['interfaces', section, span]) super(BasicInterfaceTest.TestCase, cls).tearDownClass() def tearDown(self): self.cli_delete(self._base_path) self.cli_commit() # Verify that no previously interface remained on the system for intf in self._interfaces: self.assertNotIn(intf, interfaces()) # No daemon started during tests should remain running for daemon in ['dhcp6c', 'dhclient']: # if _interface list is populated do a more fine grained search # by also checking the cmd arguments passed to the daemon if self._interfaces: for tmp in self._interfaces: self.assertFalse(process_named_running(daemon, tmp)) else: self.assertFalse(process_named_running(daemon)) def test_dhcp_disable_interface(self): if not self._test_dhcp: self.skipTest('not supported') # When interface is configured as admin down, it must be admin down # even when dhcpc starts on the given interface for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'disable']) for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) # Also enable DHCP (ISC DHCP always places interface in admin up # state so we check that we do not start DHCP client. # https://vyos.dev/T2767 self.cli_set(self._base_path + [interface, 'address', 'dhcp']) self.cli_commit() # Validate interface state for interface in self._interfaces: flags = read_file(f'/sys/class/net/{interface}/flags') self.assertEqual(int(flags, 16) & 1, 0) def test_dhcp_client_options(self): if not self._test_dhcp or not self._test_vrf: self.skipTest('not supported') client_id = 'VyOS-router' distance = '100' hostname = 'vyos' vendor_class_id = 'vyos-vendor' user_class = 'vyos' for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'address', 'dhcp']) self.cli_set(self._base_path + [interface, 'dhcp-options', 'client-id', client_id]) self.cli_set(self._base_path + [interface, 'dhcp-options', 'default-route-distance', distance]) self.cli_set(self._base_path + [interface, 'dhcp-options', 'host-name', hostname]) self.cli_set(self._base_path + [interface, 'dhcp-options', 'vendor-class-id', vendor_class_id]) self.cli_set(self._base_path + [interface, 'dhcp-options', 'user-class', user_class]) self.cli_commit() for interface in self._interfaces: # Check if dhclient process runs dhclient_pid = process_named_running(dhclient_process_name, cmdline=interface, timeout=10) self.assertTrue(dhclient_pid) dhclient_config = read_file(f'{dhclient_base_dir}/dhclient_{interface}.conf') self.assertIn(f'request subnet-mask, broadcast-address, routers, domain-name-servers', dhclient_config) self.assertIn(f'require subnet-mask;', dhclient_config) self.assertIn(f'send host-name "{hostname}";', dhclient_config) self.assertIn(f'send dhcp-client-identifier "{client_id}";', dhclient_config) self.assertIn(f'send vendor-class-identifier "{vendor_class_id}";', dhclient_config) self.assertIn(f'send user-class "{user_class}";', dhclient_config) # and the commandline has the appropriate options cmdline = read_file(f'/proc/{dhclient_pid}/cmdline') self.assertIn(f'-e\x00IF_METRIC={distance}', cmdline) def test_dhcp_vrf(self): if not self._test_dhcp or not self._test_vrf: self.skipTest('not supported') vrf_name = 'purple4' self.cli_set(['vrf', 'name', vrf_name, 'table', '65000']) for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'address', 'dhcp']) self.cli_set(self._base_path + [interface, 'vrf', vrf_name]) self.cli_commit() # Validate interface state for interface in self._interfaces: tmp = get_interface_vrf(interface) self.assertEqual(tmp, vrf_name) # Check if dhclient process runs dhclient_pid = process_named_running(dhclient_process_name, cmdline=interface, timeout=10) self.assertTrue(dhclient_pid) # .. inside the appropriate VRF instance vrf_pids = cmd(f'ip vrf pids {vrf_name}') self.assertIn(str(dhclient_pid), vrf_pids) # and the commandline has the appropriate options cmdline = read_file(f'/proc/{dhclient_pid}/cmdline') self.assertIn('-e\x00IF_METRIC=210', cmdline) # 210 is the default value self.cli_delete(['vrf', 'name', vrf_name]) def test_dhcpv6_vrf(self): if not self._test_ipv6_dhcpc6 or not self._test_vrf: self.skipTest('not supported') vrf_name = 'purple6' self.cli_set(['vrf', 'name', vrf_name, 'table', '65001']) # When interface is configured as admin down, it must be admin down # even when dhcpc starts on the given interface for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'address', 'dhcpv6']) self.cli_set(self._base_path + [interface, 'vrf', vrf_name]) self.cli_commit() # Validate interface state for interface in self._interfaces: tmp = get_interface_vrf(interface) self.assertEqual(tmp, vrf_name) # Check if dhclient process runs tmp = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10) self.assertTrue(tmp) # .. inside the appropriate VRF instance vrf_pids = cmd(f'ip vrf pids {vrf_name}') self.assertIn(str(tmp), vrf_pids) self.cli_delete(['vrf', 'name', vrf_name]) def test_move_interface_between_vrf_instances(self): if not self._test_vrf: self.skipTest('not supported') vrf1_name = 'smoketest_mgmt1' vrf1_table = '5424' vrf2_name = 'smoketest_mgmt2' vrf2_table = '7412' self.cli_set(['vrf', 'name', vrf1_name, 'table', vrf1_table]) self.cli_set(['vrf', 'name', vrf2_name, 'table', vrf2_table]) # move interface into first VRF for interface in self._interfaces: for option in self._options.get(interface, []): self.cli_set(self._base_path + [interface] + option.split()) self.cli_set(self._base_path + [interface, 'vrf', vrf1_name]) self.cli_commit() # check that interface belongs to proper VRF for interface in self._interfaces: tmp = get_interface_vrf(interface) self.assertEqual(tmp, vrf1_name) tmp = get_interface_config(vrf1_name) self.assertEqual(int(vrf1_table), get_vrf_tableid(interface)) # move interface into second VRF for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'vrf', vrf2_name]) self.cli_commit() # check that interface belongs to proper VRF for interface in self._interfaces: tmp = get_interface_vrf(interface) self.assertEqual(tmp, vrf2_name) tmp = get_interface_config(vrf2_name) self.assertEqual(int(vrf2_table), get_vrf_tableid(interface)) self.cli_delete(['vrf', 'name', vrf1_name]) self.cli_delete(['vrf', 'name', vrf2_name]) + def test_add_to_invalid_vrf(self): + if not self._test_vrf: + self.skipTest('not supported') + + # move interface into first VRF + for interface in self._interfaces: + for option in self._options.get(interface, []): + self.cli_set(self._base_path + [interface] + option.split()) + self.cli_set(self._base_path + [interface, 'vrf', 'invalid']) + + # check validate() - can not use a non-existing VRF + with self.assertRaises(ConfigSessionError): + self.cli_commit() + + for interface in self._interfaces: + self.cli_delete(self._base_path + [interface, 'vrf', 'invalid']) + self.cli_set(self._base_path + [interface, 'description', 'test_add_to_invalid_vrf']) + def test_span_mirror(self): if not self._mirror_interfaces: self.skipTest('not supported') # Check the two-way mirror rules of ingress and egress for mirror in self._mirror_interfaces: for interface in self._interfaces: self.cli_set(self._base_path + [interface, 'mirror', 'ingress', mirror]) self.cli_set(self._base_path + [interface, 'mirror', 'egress', mirror]) self.cli_commit() # Verify config for mirror in self._mirror_interfaces: for interface in self._interfaces: self.assertTrue(is_mirrored_to(interface, mirror, 'ffff')) self.assertTrue(is_mirrored_to(interface, mirror, '1')) def test_interface_disable(self): # Check if description can be added to interface and # can be read back for intf in self._interfaces: self.cli_set(self._base_path + [intf, 'disable']) for option in self._options.get(intf, []): self.cli_set(self._base_path + [intf] + option.split()) self.cli_commit() # Validate interface description for intf in self._interfaces: self.assertEqual(Interface(intf).get_admin_state(), 'down') def test_interface_description(self): # Check if description can be added to interface and # can be read back for intf in self._interfaces: test_string=f'Description-Test-{intf}' self.cli_set(self._base_path + [intf, 'description', test_string]) for option in self._options.get(intf, []): self.cli_set(self._base_path + [intf] + option.split()) self.cli_commit() # Validate interface description for intf in self._interfaces: test_string=f'Description-Test-{intf}' tmp = read_file(f'/sys/class/net/{intf}/ifalias') self.assertEqual(tmp, test_string) self.assertEqual(Interface(intf).get_alias(), test_string) self.cli_delete(self._base_path + [intf, 'description']) self.cli_commit() # Validate remove interface description "empty" for intf in self._interfaces: tmp = read_file(f'/sys/class/net/{intf}/ifalias') self.assertEqual(tmp, str()) self.assertEqual(Interface(intf).get_alias(), str()) # Test maximum interface description lengt (255 characters) test_string='abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789___' for intf in self._interfaces: self.cli_set(self._base_path + [intf, 'description', test_string]) for option in self._options.get(intf, []): self.cli_set(self._base_path + [intf] + option.split()) self.cli_commit() # Validate interface description for intf in self._interfaces: tmp = read_file(f'/sys/class/net/{intf}/ifalias') self.assertEqual(tmp, test_string) self.assertEqual(Interface(intf).get_alias(), test_string) def test_add_single_ip_address(self): addr = '192.0.2.0/31' for intf in self._interfaces: self.cli_set(self._base_path + [intf, 'address', addr]) for option in self._options.get(intf, []): self.cli_set(self._base_path + [intf] + option.split()) self.cli_commit() for intf in self._interfaces: self.assertTrue(is_intf_addr_assigned(intf, addr)) self.assertEqual(Interface(intf).get_admin_state(), 'up') def test_add_multiple_ip_addresses(self): # Add address for intf in self._interfaces: for option in self._options.get(intf, []): self.cli_set(self._base_path + [intf] + option.split()) for addr in self._test_addr: self.cli_set(self._base_path + [intf, 'address', addr]) self.cli_commit() # Validate address for intf in self._interfaces: for af in AF_INET, AF_INET6: for addr in ifaddresses(intf)[af]: # checking link local addresses makes no sense if is_ipv6_link_local(addr['addr']): continue self.assertTrue(is_intf_addr_assigned(intf, addr['addr'])) def test_ipv6_link_local_address(self): # Common function for IPv6 link-local address assignemnts if not self._test_ipv6: self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] # just set the interface base without any option - some interfaces # (VTI) do not require any option to be brought up self.cli_set(base) for option in self._options.get(interface, []): self.cli_set(base + option.split()) # after commit we must have an IPv6 link-local address self.cli_commit() for interface in self._interfaces: self.assertIn(AF_INET6, ifaddresses(interface)) for addr in ifaddresses(interface)[AF_INET6]: self.assertTrue(is_ipv6_link_local(addr['addr'])) # disable IPv6 link-local address assignment for interface in self._interfaces: base = self._base_path + [interface] self.cli_set(base + ['ipv6', 'address', 'no-default-link-local']) # after commit we must have no IPv6 link-local address self.cli_commit() for interface in self._interfaces: self.assertNotIn(AF_INET6, ifaddresses(interface)) def test_interface_mtu(self): if not self._test_mtu: self.skipTest('not supported') for intf in self._interfaces: base = self._base_path + [intf] self.cli_set(base + ['mtu', self._mtu]) for option in self._options.get(intf, []): self.cli_set(base + option.split()) # commit interface changes self.cli_commit() # verify changed MTU for intf in self._interfaces: tmp = get_interface_config(intf) self.assertEqual(tmp['mtu'], int(self._mtu)) def test_mtu_1200_no_ipv6_interface(self): # Testcase if MTU can be changed to 1200 on non IPv6 # enabled interfaces if not self._test_mtu: self.skipTest('not supported') old_mtu = self._mtu self._mtu = '1200' for intf in self._interfaces: base = self._base_path + [intf] for option in self._options.get(intf, []): self.cli_set(base + option.split()) self.cli_set(base + ['mtu', self._mtu]) # check validate() - can not set low MTU if 'no-default-link-local' # is not set on CLI with self.assertRaises(ConfigSessionError): self.cli_commit() for intf in self._interfaces: base = self._base_path + [intf] self.cli_set(base + ['ipv6', 'address', 'no-default-link-local']) # commit interface changes self.cli_commit() # verify changed MTU for intf in self._interfaces: tmp = get_interface_config(intf) self.assertEqual(tmp['mtu'], int(self._mtu)) self._mtu = old_mtu def test_vif_8021q_interfaces(self): # XXX: This testcase is not allowed to run as first testcase, reason # is the Wireless test will first load the wifi kernel hwsim module # which creates a wlan0 and wlan1 interface which will fail the # tearDown() test in the end that no interface is allowed to survive! if not self._test_vlan: self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(base + option.split()) for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] for address in self._test_addr: self.cli_set(base + ['address', address]) self.cli_commit() for intf in self._interfaces: for vlan in self._vlan_range: vif = f'{intf}.{vlan}' for address in self._test_addr: self.assertTrue(is_intf_addr_assigned(vif, address)) self.assertEqual(Interface(vif).get_admin_state(), 'up') # T4064: Delete interface addresses, keep VLAN interface for interface in self._interfaces: base = self._base_path + [interface] for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] self.cli_delete(base + ['address']) self.cli_commit() # Verify no IP address is assigned for interface in self._interfaces: for vlan in self._vlan_range: vif = f'{intf}.{vlan}' for address in self._test_addr: self.assertFalse(is_intf_addr_assigned(vif, address)) def test_vif_8021q_mtu_limits(self): # XXX: This testcase is not allowed to run as first testcase, reason # is the Wireless test will first load the wifi kernel hwsim module # which creates a wlan0 and wlan1 interface which will fail the # tearDown() test in the end that no interface is allowed to survive! if not self._test_vlan or not self._test_mtu: self.skipTest('not supported') mtu_1500 = '1500' mtu_9000 = '9000' for interface in self._interfaces: base = self._base_path + [interface] self.cli_set(base + ['mtu', mtu_1500]) for option in self._options.get(interface, []): self.cli_set(base + option.split()) if 'source-interface' in option: iface = option.split()[-1] iface_type = Section.section(iface) self.cli_set(['interfaces', iface_type, iface, 'mtu', mtu_9000]) for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] self.cli_set(base + ['mtu', mtu_9000]) # check validate() - Interface MTU "9000" too high, parent interface MTU is "1500"! with self.assertRaises(ConfigSessionError): self.cli_commit() # Change MTU on base interface to be the same as on the VIF interface for interface in self._interfaces: base = self._base_path + [interface] self.cli_set(base + ['mtu', mtu_9000]) self.cli_commit() # Verify MTU on base and VIF interfaces for interface in self._interfaces: tmp = get_interface_config(interface) self.assertEqual(tmp['mtu'], int(mtu_9000)) for vlan in self._vlan_range: tmp = get_interface_config(f'{interface}.{vlan}') self.assertEqual(tmp['mtu'], int(mtu_9000)) def test_vif_8021q_qos_change(self): # XXX: This testcase is not allowed to run as first testcase, reason # is the Wireless test will first load the wifi kernel hwsim module # which creates a wlan0 and wlan1 interface which will fail the # tearDown() test in the end that no interface is allowed to survive! if not self._test_vlan: self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(base + option.split()) for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] self.cli_set(base + ['ingress-qos', '0:1']) self.cli_set(base + ['egress-qos', '1:6']) self.cli_commit() for intf in self._interfaces: for vlan in self._vlan_range: vif = f'{intf}.{vlan}' tmp = get_interface_config(f'{vif}') tmp2 = dict_search('linkinfo.info_data.ingress_qos', tmp) for item in tmp2 if tmp2 else []: from_key = item['from'] to_key = item['to'] self.assertEqual(from_key, 0) self.assertEqual(to_key, 1) tmp2 = dict_search('linkinfo.info_data.egress_qos', tmp) for item in tmp2 if tmp2 else []: from_key = item['from'] to_key = item['to'] self.assertEqual(from_key, 1) self.assertEqual(to_key, 6) self.assertEqual(Interface(vif).get_admin_state(), 'up') new_ingress_qos_from = 1 new_ingress_qos_to = 6 new_egress_qos_from = 2 new_egress_qos_to = 7 for interface in self._interfaces: base = self._base_path + [interface] for vlan in self._vlan_range: base = self._base_path + [interface, 'vif', vlan] self.cli_set(base + ['ingress-qos', f'{new_ingress_qos_from}:{new_ingress_qos_to}']) self.cli_set(base + ['egress-qos', f'{new_egress_qos_from}:{new_egress_qos_to}']) self.cli_commit() for intf in self._interfaces: for vlan in self._vlan_range: vif = f'{intf}.{vlan}' tmp = get_interface_config(f'{vif}') tmp2 = dict_search('linkinfo.info_data.ingress_qos', tmp) if tmp2: from_key = tmp2[0]['from'] to_key = tmp2[0]['to'] self.assertEqual(from_key, new_ingress_qos_from) self.assertEqual(to_key, new_ingress_qos_to) tmp2 = dict_search('linkinfo.info_data.egress_qos', tmp) if tmp2: from_key = tmp2[0]['from'] to_key = tmp2[0]['to'] self.assertEqual(from_key, new_egress_qos_from) self.assertEqual(to_key, new_egress_qos_to) def test_vif_8021q_lower_up_down(self): # Testcase for https://vyos.dev/T3349 if not self._test_vlan: self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(base + option.split()) # disable the lower interface self.cli_set(base + ['disable']) for vlan in self._vlan_range: vlan_base = self._base_path + [interface, 'vif', vlan] # disable the vlan interface self.cli_set(vlan_base + ['disable']) self.cli_commit() # re-enable all lower interfaces for interface in self._interfaces: base = self._base_path + [interface] self.cli_delete(base + ['disable']) self.cli_commit() # verify that the lower interfaces are admin up and the vlan # interfaces are all admin down for interface in self._interfaces: self.assertEqual(Interface(interface).get_admin_state(), 'up') for vlan in self._vlan_range: ifname = f'{interface}.{vlan}' self.assertEqual(Interface(ifname).get_admin_state(), 'down') def test_vif_s_8021ad_vlan_interfaces(self): # XXX: This testcase is not allowed to run as first testcase, reason # is the Wireless test will first load the wifi kernel hwsim module # which creates a wlan0 and wlan1 interface which will fail the # tearDown() test in the end that no interface is allowed to survive! if not self._test_qinq: self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(base + option.split()) for vif_s in self._qinq_range: for vif_c in self._vlan_range: base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] self.cli_set(base + ['mtu', self._mtu]) for address in self._test_addr: self.cli_set(base + ['address', address]) self.cli_commit() for interface in self._interfaces: for vif_s in self._qinq_range: tmp = get_interface_config(f'{interface}.{vif_s}') self.assertEqual(dict_search('linkinfo.info_data.protocol', tmp), '802.1ad') for vif_c in self._vlan_range: vif = f'{interface}.{vif_s}.{vif_c}' # For an unknown reason this regularely fails on the QEMU builds, # thus the test for reading back IP addresses is temporary # disabled. There is no big deal here, as this uses the same # methods on 802.1q and here it works and is verified. # for address in self._test_addr: # self.assertTrue(is_intf_addr_assigned(vif, address)) tmp = get_interface_config(vif) self.assertEqual(tmp['mtu'], int(self._mtu)) # T4064: Delete interface addresses, keep VLAN interface for interface in self._interfaces: base = self._base_path + [interface] for vif_s in self._qinq_range: for vif_c in self._vlan_range: self.cli_delete(self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c, 'address']) self.cli_commit() # Verify no IP address is assigned for interface in self._interfaces: base = self._base_path + [interface] for vif_s in self._qinq_range: for vif_c in self._vlan_range: vif = f'{interface}.{vif_s}.{vif_c}' for address in self._test_addr: self.assertFalse(is_intf_addr_assigned(vif, address)) # T3972: remove vif-c interfaces from vif-s for interface in self._interfaces: base = self._base_path + [interface] for vif_s in self._qinq_range: base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c'] self.cli_delete(base) self.cli_commit() def test_vif_s_protocol_change(self): # XXX: This testcase is not allowed to run as first testcase, reason # is the Wireless test will first load the wifi kernel hwsim module # which creates a wlan0 and wlan1 interface which will fail the # tearDown() test in the end that no interface is allowed to survive! if not self._test_qinq: self.skipTest('not supported') for interface in self._interfaces: base = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(base + option.split()) for vif_s in self._qinq_range: for vif_c in self._vlan_range: base = self._base_path + [interface, 'vif-s', vif_s, 'vif-c', vif_c] for address in self._test_addr: self.cli_set(base + ['address', address]) self.cli_commit() for interface in self._interfaces: for vif_s in self._qinq_range: tmp = get_interface_config(f'{interface}.{vif_s}') # check for the default value self.assertEqual(tmp['linkinfo']['info_data']['protocol'], '802.1ad') # T3532: now change ethertype new_protocol = '802.1q' for interface in self._interfaces: for vif_s in self._qinq_range: base = self._base_path + [interface, 'vif-s', vif_s] self.cli_set(base + ['protocol', new_protocol]) self.cli_commit() # Verify new ethertype configuration for interface in self._interfaces: for vif_s in self._qinq_range: tmp = get_interface_config(f'{interface}.{vif_s}') self.assertEqual(tmp['linkinfo']['info_data']['protocol'], new_protocol.upper()) def test_interface_ip_options(self): if not self._test_ip: self.skipTest('not supported') arp_tmo = '300' mss = '1420' for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(path + option.split()) # Options if cli_defined(self._base_path + ['ip'], 'adjust-mss'): self.cli_set(path + ['ip', 'adjust-mss', mss]) if cli_defined(self._base_path + ['ip'], 'arp-cache-timeout'): self.cli_set(path + ['ip', 'arp-cache-timeout', arp_tmo]) if cli_defined(self._base_path + ['ip'], 'disable-arp-filter'): self.cli_set(path + ['ip', 'disable-arp-filter']) if cli_defined(self._base_path + ['ip'], 'disable-forwarding'): self.cli_set(path + ['ip', 'disable-forwarding']) if cli_defined(self._base_path + ['ip'], 'enable-directed-broadcast'): self.cli_set(path + ['ip', 'enable-directed-broadcast']) if cli_defined(self._base_path + ['ip'], 'enable-arp-accept'): self.cli_set(path + ['ip', 'enable-arp-accept']) if cli_defined(self._base_path + ['ip'], 'enable-arp-announce'): self.cli_set(path + ['ip', 'enable-arp-announce']) if cli_defined(self._base_path + ['ip'], 'enable-arp-ignore'): self.cli_set(path + ['ip', 'enable-arp-ignore']) if cli_defined(self._base_path + ['ip'], 'enable-proxy-arp'): self.cli_set(path + ['ip', 'enable-proxy-arp']) if cli_defined(self._base_path + ['ip'], 'proxy-arp-pvlan'): self.cli_set(path + ['ip', 'proxy-arp-pvlan']) if cli_defined(self._base_path + ['ip'], 'source-validation'): self.cli_set(path + ['ip', 'source-validation', 'loose']) self.cli_commit() for interface in self._interfaces: if cli_defined(self._base_path + ['ip'], 'adjust-mss'): base_options = f'oifname "{interface}"' out = cmd('sudo nft list chain raw VYOS_TCP_MSS') for line in out.splitlines(): if line.startswith(base_options): self.assertIn(f'tcp option maxseg size set {mss}', line) if cli_defined(self._base_path + ['ip'], 'arp-cache-timeout'): tmp = read_file(f'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms') self.assertEqual(tmp, str((int(arp_tmo) * 1000))) # tmo value is in milli seconds proc_base = f'/proc/sys/net/ipv4/conf/{interface}' if cli_defined(self._base_path + ['ip'], 'disable-arp-filter'): tmp = read_file(f'{proc_base}/arp_filter') self.assertEqual('0', tmp) if cli_defined(self._base_path + ['ip'], 'enable-arp-accept'): tmp = read_file(f'{proc_base}/arp_accept') self.assertEqual('1', tmp) if cli_defined(self._base_path + ['ip'], 'enable-arp-announce'): tmp = read_file(f'{proc_base}/arp_announce') self.assertEqual('1', tmp) if cli_defined(self._base_path + ['ip'], 'enable-arp-ignore'): tmp = read_file(f'{proc_base}/arp_ignore') self.assertEqual('1', tmp) if cli_defined(self._base_path + ['ip'], 'disable-forwarding'): tmp = read_file(f'{proc_base}/forwarding') self.assertEqual('0', tmp) if cli_defined(self._base_path + ['ip'], 'enable-directed-broadcast'): tmp = read_file(f'{proc_base}/bc_forwarding') self.assertEqual('1', tmp) if cli_defined(self._base_path + ['ip'], 'enable-proxy-arp'): tmp = read_file(f'{proc_base}/proxy_arp') self.assertEqual('1', tmp) if cli_defined(self._base_path + ['ip'], 'proxy-arp-pvlan'): tmp = read_file(f'{proc_base}/proxy_arp_pvlan') self.assertEqual('1', tmp) if cli_defined(self._base_path + ['ip'], 'source-validation'): base_options = f'iifname "{interface}"' out = cmd('sudo nft list chain ip raw vyos_rpfilter') for line in out.splitlines(): if line.startswith(base_options): self.assertIn('fib saddr oif 0', line) self.assertIn('drop', line) def test_interface_ipv6_options(self): if not self._test_ipv6: self.skipTest('not supported') mss = '1400' dad_transmits = '10' accept_dad = '0' source_validation = 'strict' for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(path + option.split()) # Options if cli_defined(self._base_path + ['ipv6'], 'adjust-mss'): self.cli_set(path + ['ipv6', 'adjust-mss', mss]) if cli_defined(self._base_path + ['ipv6'], 'accept-dad'): self.cli_set(path + ['ipv6', 'accept-dad', accept_dad]) if cli_defined(self._base_path + ['ipv6'], 'dup-addr-detect-transmits'): self.cli_set(path + ['ipv6', 'dup-addr-detect-transmits', dad_transmits]) if cli_defined(self._base_path + ['ipv6'], 'disable-forwarding'): self.cli_set(path + ['ipv6', 'disable-forwarding']) if cli_defined(self._base_path + ['ipv6'], 'source-validation'): self.cli_set(path + ['ipv6', 'source-validation', source_validation]) self.cli_commit() for interface in self._interfaces: proc_base = f'/proc/sys/net/ipv6/conf/{interface}' if cli_defined(self._base_path + ['ipv6'], 'adjust-mss'): base_options = f'oifname "{interface}"' out = cmd('sudo nft list chain ip6 raw VYOS_TCP_MSS') for line in out.splitlines(): if line.startswith(base_options): self.assertIn(f'tcp option maxseg size set {mss}', line) if cli_defined(self._base_path + ['ipv6'], 'accept-dad'): tmp = read_file(f'{proc_base}/accept_dad') self.assertEqual(accept_dad, tmp) if cli_defined(self._base_path + ['ipv6'], 'dup-addr-detect-transmits'): tmp = read_file(f'{proc_base}/dad_transmits') self.assertEqual(dad_transmits, tmp) if cli_defined(self._base_path + ['ipv6'], 'disable-forwarding'): tmp = read_file(f'{proc_base}/forwarding') self.assertEqual('0', tmp) if cli_defined(self._base_path + ['ipv6'], 'source-validation'): base_options = f'iifname "{interface}"' out = cmd('sudo nft list chain ip6 raw vyos_rpfilter') for line in out.splitlines(): if line.startswith(base_options): self.assertIn('fib saddr . iif oif 0', line) self.assertIn('drop', line) def test_dhcpv6_client_options(self): if not self._test_ipv6_dhcpc6: self.skipTest('not supported') duid_base = 10 for interface in self._interfaces: duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base) path = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(path + option.split()) # Enable DHCPv6 client self.cli_set(path + ['address', 'dhcpv6']) self.cli_set(path + ['dhcpv6-options', 'no-release']) self.cli_set(path + ['dhcpv6-options', 'rapid-commit']) self.cli_set(path + ['dhcpv6-options', 'parameters-only']) self.cli_set(path + ['dhcpv6-options', 'duid', duid]) duid_base += 1 self.cli_commit() duid_base = 10 for interface in self._interfaces: duid = '00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'.format(duid_base) dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf') self.assertIn(f'interface {interface} ' + '{', dhcpc6_config) self.assertIn(f' request domain-name-servers;', dhcpc6_config) self.assertIn(f' request domain-name;', dhcpc6_config) self.assertIn(f' information-only;', dhcpc6_config) self.assertIn(f' send ia-na 0;', dhcpc6_config) self.assertIn(f' send rapid-commit;', dhcpc6_config) self.assertIn(f' send client-id {duid};', dhcpc6_config) self.assertIn('};', dhcpc6_config) duid_base += 1 # Better ask the process about it's commandline in the future pid = process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10) self.assertTrue(pid) dhcp6c_options = read_file(f'/proc/{pid}/cmdline') self.assertIn('-n', dhcp6c_options) def test_dhcpv6pd_auto_sla_id(self): if not self._test_ipv6_pd: self.skipTest('not supported') prefix_len = '56' sla_len = str(64 - int(prefix_len)) # Create delegatee interfaces first to avoid any confusion by dhcpc6 # this is mainly an "issue" with virtual-ethernet interfaces delegatees = ['dum2340', 'dum2341', 'dum2342', 'dum2343', 'dum2344'] for delegatee in delegatees: section = Section.section(delegatee) self.cli_set(['interfaces', section, delegatee]) self.cli_commit() for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(path + option.split()) address = '1' # prefix delegation stuff pd_base = path + ['dhcpv6-options', 'pd', '0'] self.cli_set(pd_base + ['length', prefix_len]) for delegatee in delegatees: self.cli_set(pd_base + ['interface', delegatee, 'address', address]) # increment interface address address = str(int(address) + 1) self.cli_commit() for interface in self._interfaces: dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf') # verify DHCPv6 prefix delegation self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) address = '1' sla_id = '0' for delegatee in delegatees: self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) self.assertIn(f'ifid {address};', dhcpc6_config) self.assertIn(f'sla-id {sla_id};', dhcpc6_config) self.assertIn(f'sla-len {sla_len};', dhcpc6_config) # increment sla-id sla_id = str(int(sla_id) + 1) # increment interface address address = str(int(address) + 1) # Check for running process self.assertTrue(process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)) for delegatee in delegatees: # we can already cleanup the test delegatee interface here # as until commit() is called, nothing happens section = Section.section(delegatee) self.cli_delete(['interfaces', section, delegatee]) def test_dhcpv6pd_manual_sla_id(self): if not self._test_ipv6_pd: self.skipTest('not supported') prefix_len = '56' sla_len = str(64 - int(prefix_len)) # Create delegatee interfaces first to avoid any confusion by dhcpc6 # this is mainly an "issue" with virtual-ethernet interfaces delegatees = ['dum3340', 'dum3341', 'dum3342', 'dum3343', 'dum3344'] for delegatee in delegatees: section = Section.section(delegatee) self.cli_set(['interfaces', section, delegatee]) self.cli_commit() for interface in self._interfaces: path = self._base_path + [interface] for option in self._options.get(interface, []): self.cli_set(path + option.split()) # prefix delegation stuff address = '1' sla_id = '1' pd_base = path + ['dhcpv6-options', 'pd', '0'] self.cli_set(pd_base + ['length', prefix_len]) for delegatee in delegatees: self.cli_set(pd_base + ['interface', delegatee, 'address', address]) self.cli_set(pd_base + ['interface', delegatee, 'sla-id', sla_id]) # increment interface address address = str(int(address) + 1) sla_id = str(int(sla_id) + 1) self.cli_commit() # Verify dhcpc6 client configuration for interface in self._interfaces: address = '1' sla_id = '1' dhcpc6_config = read_file(f'{dhcp6c_base_dir}/dhcp6c.{interface}.conf') # verify DHCPv6 prefix delegation self.assertIn(f'prefix ::/{prefix_len} infinity;', dhcpc6_config) for delegatee in delegatees: self.assertIn(f'prefix-interface {delegatee}' + r' {', dhcpc6_config) self.assertIn(f'ifid {address};', dhcpc6_config) self.assertIn(f'sla-id {sla_id};', dhcpc6_config) self.assertIn(f'sla-len {sla_len};', dhcpc6_config) # increment sla-id sla_id = str(int(sla_id) + 1) # increment interface address address = str(int(address) + 1) # Check for running process self.assertTrue(process_named_running(dhcp6c_process_name, cmdline=interface, timeout=10)) for delegatee in delegatees: # we can already cleanup the test delegatee interface here # as until commit() is called, nothing happens section = Section.section(delegatee) self.cli_delete(['interfaces', section, delegatee]) diff --git a/src/conf_mode/interfaces_geneve.py b/src/conf_mode/interfaces_geneve.py index 769139e0f..007708d4a 100755 --- a/src/conf_mode/interfaces_geneve.py +++ b/src/conf_mode/interfaces_geneve.py @@ -1,102 +1,104 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import is_node_changed from vyos.configverify import verify_address from vyos.configverify import verify_mtu_ipv6 from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_bond_bridge_member +from vyos.configverify import verify_vrf from vyos.ifconfig import GeneveIf from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'geneve'] ifname, geneve = get_interface_dict(conf, base) # GENEVE interfaces are picky and require recreation if certain parameters # change. But a GENEVE interface should - of course - not be re-created if # it's description or IP address is adjusted. Feels somehow logic doesn't it? for cli_option in ['remote', 'vni', 'parameters']: if is_node_changed(conf, base + [ifname, cli_option]): geneve.update({'rebuild_required': {}}) return geneve def verify(geneve): if 'deleted' in geneve: verify_bridge_delete(geneve) return None verify_mtu_ipv6(geneve) verify_address(geneve) + verify_vrf(geneve) verify_bond_bridge_member(geneve) verify_mirror_redirect(geneve) if 'remote' not in geneve: raise ConfigError('Remote side must be configured') if 'vni' not in geneve: raise ConfigError('VNI must be configured') return None def generate(geneve): return None def apply(geneve): # Check if GENEVE interface already exists if 'rebuild_required' in geneve or 'delete' in geneve: if interface_exists(geneve['ifname']): g = GeneveIf(**geneve) # GENEVE is super picky and the tunnel always needs to be recreated, # thus we can simply always delete it first. g.remove() if 'deleted' not in geneve: # Finally create the new interface g = GeneveIf(**geneve) g.update(geneve) return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/interfaces_l2tpv3.py b/src/conf_mode/interfaces_l2tpv3.py index e25793543..b9f827bee 100755 --- a/src/conf_mode/interfaces_l2tpv3.py +++ b/src/conf_mode/interfaces_l2tpv3.py @@ -1,110 +1,112 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import leaf_node_changed from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mtu_ipv6 from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_bond_bridge_member +from vyos.configverify import verify_vrf from vyos.ifconfig import L2TPv3If from vyos.utils.kernel import check_kmod from vyos.utils.network import is_addr_assigned from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() k_mod = ['l2tp_eth', 'l2tp_netlink', 'l2tp_ip', 'l2tp_ip6'] def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'l2tpv3'] ifname, l2tpv3 = get_interface_dict(conf, base) # To delete an l2tpv3 interface we need the current tunnel and session-id if 'deleted' in l2tpv3: tmp = leaf_node_changed(conf, base + [ifname, 'tunnel-id']) # leaf_node_changed() returns a list l2tpv3.update({'tunnel_id': tmp[0]}) tmp = leaf_node_changed(conf, base + [ifname, 'session-id']) l2tpv3.update({'session_id': tmp[0]}) return l2tpv3 def verify(l2tpv3): if 'deleted' in l2tpv3: verify_bridge_delete(l2tpv3) return None interface = l2tpv3['ifname'] for key in ['source_address', 'remote', 'tunnel_id', 'peer_tunnel_id', 'session_id', 'peer_session_id']: if key not in l2tpv3: tmp = key.replace('_', '-') raise ConfigError(f'Missing mandatory L2TPv3 option: "{tmp}"!') if not is_addr_assigned(l2tpv3['source_address']): raise ConfigError('L2TPv3 source-address address "{source_address}" ' 'not configured on any interface!'.format(**l2tpv3)) verify_mtu_ipv6(l2tpv3) verify_address(l2tpv3) + verify_vrf(l2tpv3) verify_bond_bridge_member(l2tpv3) verify_mirror_redirect(l2tpv3) return None def generate(l2tpv3): return None def apply(l2tpv3): # Check if L2TPv3 interface already exists if interface_exists(l2tpv3['ifname']): # L2TPv3 is picky when changing tunnels/sessions, thus we can simply # always delete it first. l = L2TPv3If(**l2tpv3) l.remove() if 'deleted' not in l2tpv3: # Finally create the new interface l = L2TPv3If(**l2tpv3) l.update(l2tpv3) return None if __name__ == '__main__': try: check_kmod(k_mod) c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/interfaces_vti.py b/src/conf_mode/interfaces_vti.py index e6a833df7..20629c6c1 100755 --- a/src/conf_mode/interfaces_vti.py +++ b/src/conf_mode/interfaces_vti.py @@ -1,66 +1,68 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configverify import verify_mirror_redirect +from vyos.configverify import verify_vrf from vyos.ifconfig import VTIIf from vyos import ConfigError from vyos import airbag airbag.enable() def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'vti'] _, vti = get_interface_dict(conf, base) return vti def verify(vti): + verify_vrf(vti) verify_mirror_redirect(vti) return None def generate(vti): return None def apply(vti): # Remove macsec interface if 'deleted' in vti: VTIIf(**vti).remove() return None tmp = VTIIf(**vti) tmp.update(vti) return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/interfaces_vxlan.py b/src/conf_mode/interfaces_vxlan.py index aca0a20e4..68646e8ff 100755 --- a/src/conf_mode/interfaces_vxlan.py +++ b/src/conf_mode/interfaces_vxlan.py @@ -1,257 +1,259 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import leaf_node_changed from vyos.configdict import is_node_changed from vyos.configdict import node_changed from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mtu_ipv6 from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_source_interface from vyos.configverify import verify_bond_bridge_member +from vyos.configverify import verify_vrf from vyos.ifconfig import Interface from vyos.ifconfig import VXLANIf from vyos.template import is_ipv6 from vyos.utils.dict import dict_search from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'vxlan'] ifname, vxlan = get_interface_dict(conf, base) # VXLAN interfaces are picky and require recreation if certain parameters # change. But a VXLAN interface should - of course - not be re-created if # it's description or IP address is adjusted. Feels somehow logic doesn't it? for cli_option in ['parameters', 'gpe', 'group', 'port', 'remote', 'source-address', 'source-interface', 'vni']: if is_node_changed(conf, base + [ifname, cli_option]): vxlan.update({'rebuild_required': {}}) break # When dealing with VNI filtering we need to know what VNI was actually removed, # so build up a dict matching the vlan_to_vni structure but with removed values. tmp = node_changed(conf, base + [ifname, 'vlan-to-vni'], recursive=True) if tmp: vxlan.update({'vlan_to_vni_removed': {}}) for vlan in tmp: vni = leaf_node_changed(conf, base + [ifname, 'vlan-to-vni', vlan, 'vni']) vxlan['vlan_to_vni_removed'].update({vlan : {'vni' : vni[0]}}) # We need to verify that no other VXLAN tunnel is configured when external # mode is in use - Linux Kernel limitation conf.set_level(base) vxlan['other_tunnels'] = conf.get_config_dict([], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) # This if-clause is just to be sure - it will always evaluate to true ifname = vxlan['ifname'] if ifname in vxlan['other_tunnels']: del vxlan['other_tunnels'][ifname] if len(vxlan['other_tunnels']) == 0: del vxlan['other_tunnels'] return vxlan def verify(vxlan): if 'deleted' in vxlan: verify_bridge_delete(vxlan) return None if int(vxlan['mtu']) < 1500: Warning('RFC7348 recommends VXLAN tunnels preserve a 1500 byte MTU') if 'group' in vxlan: if 'source_interface' not in vxlan: raise ConfigError('Multicast VXLAN requires an underlaying interface') verify_source_interface(vxlan) if not any(tmp in ['group', 'remote', 'source_address', 'source_interface'] for tmp in vxlan): raise ConfigError('Group, remote, source-address or source-interface must be configured') if 'vni' not in vxlan and dict_search('parameters.external', vxlan) == None: raise ConfigError('Must either configure VXLAN "vni" or use "external" CLI option!') if dict_search('parameters.external', vxlan) != None: if 'vni' in vxlan: raise ConfigError('Can not specify both "external" and "VNI"!') if 'other_tunnels' in vxlan: # When multiple VXLAN interfaces are defined and "external" is used, # all VXLAN interfaces need to have vni-filter enabled! # See Linux Kernel commit f9c4bb0b245cee35ef66f75bf409c9573d934cf9 other_vni_filter = False for tunnel, tunnel_config in vxlan['other_tunnels'].items(): if dict_search('parameters.vni_filter', tunnel_config) != None: other_vni_filter = True break # eqivalent of the C foo ? 'a' : 'b' statement vni_filter = True and (dict_search('parameters.vni_filter', vxlan) != None) or False # If either one is enabled, so must be the other. Both can be off and both can be on if (vni_filter and not other_vni_filter) or (not vni_filter and other_vni_filter): raise ConfigError(f'Using multiple VXLAN interfaces with "external" '\ 'requires all VXLAN interfaces to have "vni-filter" configured!') if not vni_filter and not other_vni_filter: other_tunnels = ', '.join(vxlan['other_tunnels']) raise ConfigError(f'Only one VXLAN tunnel is supported when "external" '\ f'CLI option is used and "vni-filter" is unset. '\ f'Additional tunnels: {other_tunnels}') if 'gpe' in vxlan and 'external' not in vxlan: raise ConfigError(f'VXLAN-GPE is only supported when "external" '\ f'CLI option is used.') if 'source_interface' in vxlan: # VXLAN adds at least an overhead of 50 byte - we need to check the # underlaying device if our VXLAN package is not going to be fragmented! vxlan_overhead = 50 if 'source_address' in vxlan and is_ipv6(vxlan['source_address']): # IPv6 adds an extra 20 bytes overhead because the IPv6 header is 20 # bytes larger than the IPv4 header - assuming no extra options are # in use. vxlan_overhead += 20 # If source_address is not used - check IPv6 'remote' list elif 'remote' in vxlan: if any(is_ipv6(a) for a in vxlan['remote']): vxlan_overhead += 20 lower_mtu = Interface(vxlan['source_interface']).get_mtu() if lower_mtu < (int(vxlan['mtu']) + vxlan_overhead): raise ConfigError(f'Underlaying device MTU is to small ({lower_mtu} '\ f'bytes) for VXLAN overhead ({vxlan_overhead} bytes!)') # Check for mixed IPv4 and IPv6 addresses protocol = None if 'source_address' in vxlan: if is_ipv6(vxlan['source_address']): protocol = 'ipv6' else: protocol = 'ipv4' if 'remote' in vxlan: error_msg = 'Can not mix both IPv4 and IPv6 for VXLAN underlay' for remote in vxlan['remote']: if is_ipv6(remote): if protocol == 'ipv4': raise ConfigError(error_msg) protocol = 'ipv6' else: if protocol == 'ipv6': raise ConfigError(error_msg) protocol = 'ipv4' if 'vlan_to_vni' in vxlan: if 'is_bridge_member' not in vxlan: raise ConfigError('VLAN to VNI mapping requires that VXLAN interface '\ 'is member of a bridge interface!') vnis_used = [] vlans_used = [] for vif, vif_config in vxlan['vlan_to_vni'].items(): if 'vni' not in vif_config: raise ConfigError(f'Must define VNI for VLAN "{vif}"!') vni = vif_config['vni'] err_msg = f'VLAN range "{vif}" does not match VNI range "{vni}"!' vif_range, vni_range = list(map(int, vif.split('-'))), list(map(int, vni.split('-'))) if len(vif_range) != len(vni_range): raise ConfigError(err_msg) if len(vif_range) > 1: if vni_range[0] > vni_range[-1] or vif_range[0] > vif_range[-1]: raise ConfigError('The upper bound of the range must be greater than the lower bound!') vni_range = range(vni_range[0], vni_range[1] + 1) vif_range = range(vif_range[0], vif_range[1] + 1) if len(vif_range) != len(vni_range): raise ConfigError(err_msg) for vni_id in vni_range: if vni_id in vnis_used: raise ConfigError(f'VNI "{vni_id}" is already assigned to a different VLAN!') vnis_used.append(vni_id) for vif_id in vif_range: if vif_id in vlans_used: raise ConfigError(f'VLAN "{vif_id}" is already in use!') vlans_used.append(vif_id) if dict_search('parameters.neighbor_suppress', vxlan) != None: if 'is_bridge_member' not in vxlan: raise ConfigError('Neighbor suppression requires that VXLAN interface '\ 'is member of a bridge interface!') verify_mtu_ipv6(vxlan) verify_address(vxlan) + verify_vrf(vxlan) verify_bond_bridge_member(vxlan) verify_mirror_redirect(vxlan) # We use a defaultValue for port, thus it's always safe to use if vxlan['port'] == '8472': Warning('Starting from VyOS 1.4, the default port for VXLAN '\ 'has been changed to 4789. This matches the IANA assigned '\ 'standard port number!') return None def generate(vxlan): return None def apply(vxlan): # Check if the VXLAN interface already exists if 'rebuild_required' in vxlan or 'delete' in vxlan: if interface_exists(vxlan['ifname']): v = VXLANIf(**vxlan) # VXLAN is super picky and the tunnel always needs to be recreated, # thus we can simply always delete it first. v.remove() if 'deleted' not in vxlan: # Finally create the new interface v = VXLANIf(**vxlan) v.update(vxlan) return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)