diff --git a/interface-definitions/service_dhcp-server.xml.in b/interface-definitions/service_dhcp-server.xml.in index 5c9d4a360..1c10a462d 100644 --- a/interface-definitions/service_dhcp-server.xml.in +++ b/interface-definitions/service_dhcp-server.xml.in @@ -1,223 +1,229 @@ <?xml version="1.0"?> <!-- DHCP server configuration --> <interfaceDefinition> <node name="service"> <children> <node name="dhcp-server" owner="${vyos_conf_scripts_dir}/service_dhcp-server.py"> <properties> <help>Dynamic Host Configuration Protocol (DHCP) for DHCP server</help> <priority>911</priority> </properties> <children> #include <include/generic-disable-node.xml.i> <leafNode name="dynamic-dns-update"> <properties> <help>Dynamically update Domain Name System (RFC4702)</help> <valueless/> </properties> </leafNode> <node name="failover"> <properties> <help>DHCP failover configuration</help> </properties> <children> #include <include/source-address-ipv4.xml.i> <leafNode name="remote"> <properties> <help>IPv4 remote address used for connectio</help> <valueHelp> <format>ipv4</format> <description>IPv4 address of failover peer</description> </valueHelp> <constraint> <validator name="ipv4-address"/> </constraint> </properties> </leafNode> <leafNode name="name"> <properties> <help>Peer name used to identify connection</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> <constraintErrorMessage>Invalid failover peer name. May only contain letters, numbers and .-_</constraintErrorMessage> </properties> </leafNode> <leafNode name="status"> <properties> <help>Failover hierarchy</help> <completionHelp> <list>primary secondary</list> </completionHelp> <valueHelp> <format>primary</format> <description>Configure this server to be the primary node</description> </valueHelp> <valueHelp> <format>secondary</format> <description>Configure this server to be the secondary node</description> </valueHelp> <constraint> <regex>(primary|secondary)</regex> </constraint> <constraintErrorMessage>Invalid DHCP failover peer status</constraintErrorMessage> </properties> </leafNode> #include <include/pki/ca-certificate.xml.i> #include <include/pki/certificate.xml.i> </children> </node> <leafNode name="hostfile-update"> <properties> <help>Updating /etc/hosts file (per client lease)</help> <valueless/> </properties> </leafNode> #include <include/listen-address-ipv4.xml.i> #include <include/listen-interface-multi-broadcast.xml.i> <tagNode name="shared-network-name"> <properties> <help>Name of DHCP shared network</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> <constraintErrorMessage>Invalid shared network name. May only contain letters, numbers and .-_</constraintErrorMessage> </properties> <children> <leafNode name="authoritative"> <properties> <help>Option to make DHCP server authoritative for this physical network</help> <valueless/> </properties> </leafNode> #include <include/dhcp/option-v4.xml.i> #include <include/generic-description.xml.i> #include <include/generic-disable-node.xml.i> <tagNode name="subnet"> <properties> <help>DHCP subnet for shared network</help> <valueHelp> <format>ipv4net</format> <description>IPv4 address and prefix length</description> </valueHelp> <constraint> <validator name="ipv4-prefix"/> </constraint> <constraintErrorMessage>Invalid IPv4 subnet definition</constraintErrorMessage> </properties> <children> #include <include/dhcp/option-v4.xml.i> #include <include/generic-description.xml.i> #include <include/generic-disable-node.xml.i> <leafNode name="exclude"> <properties> <help>IP address to exclude from DHCP lease range</help> <valueHelp> <format>ipv4</format> <description>IPv4 address to exclude from lease range</description> </valueHelp> <constraint> <validator name="ipv4-address"/> </constraint> <multi/> </properties> </leafNode> + <leafNode name="ignore-client-id"> + <properties> + <help>Ignore client identifier for lease lookups</help> + <valueless/> + </properties> + </leafNode> <leafNode name="lease"> <properties> <help>Lease timeout in seconds</help> <valueHelp> <format>u32</format> <description>DHCP lease time in seconds</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-4294967295"/> </constraint> <constraintErrorMessage>DHCP lease time must be between 0 and 4294967295 (49 days)</constraintErrorMessage> </properties> <defaultValue>86400</defaultValue> </leafNode> <tagNode name="range"> <properties> <help>DHCP lease range</help> <constraint> #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i> </constraint> <constraintErrorMessage>Invalid range name, may only be alphanumeric, dot and hyphen</constraintErrorMessage> </properties> <children> #include <include/dhcp/option-v4.xml.i> <leafNode name="start"> <properties> <help>First IP address for DHCP lease range</help> <valueHelp> <format>ipv4</format> <description>IPv4 start address of pool</description> </valueHelp> <constraint> <validator name="ipv4-address"/> </constraint> </properties> </leafNode> <leafNode name="stop"> <properties> <help>Last IP address for DHCP lease range</help> <valueHelp> <format>ipv4</format> <description>IPv4 end address of pool</description> </valueHelp> <constraint> <validator name="ipv4-address"/> </constraint> </properties> </leafNode> </children> </tagNode> <tagNode name="static-mapping"> <properties> <help>Hostname for static mapping reservation</help> <constraint> <validator name="fqdn"/> </constraint> <constraintErrorMessage>Invalid static mapping hostname</constraintErrorMessage> </properties> <children> #include <include/dhcp/option-v4.xml.i> #include <include/generic-description.xml.i> #include <include/generic-disable-node.xml.i> <leafNode name="ip-address"> <properties> <help>Fixed IP address of static mapping</help> <valueHelp> <format>ipv4</format> <description>IPv4 address used in static mapping</description> </valueHelp> <constraint> <validator name="ipv4-address"/> </constraint> </properties> </leafNode> #include <include/interface/mac.xml.i> #include <include/interface/duid.xml.i> </children> </tagNode> <leafNode name="subnet-id"> <properties> <help>Unique ID mapped to leases in the lease file</help> <valueHelp> <format>u32</format> <description>Unique subnet ID</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-4294967295"/> </constraint> </properties> </leafNode> </children> </tagNode> </children> </tagNode> </children> </node> </children> </node> </interfaceDefinition> diff --git a/python/vyos/kea.py b/python/vyos/kea.py index 894ac9e9a..2328d0b00 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -1,352 +1,355 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import json import os import socket from vyos.template import is_ipv6 from vyos.template import isc_static_route from vyos.template import netmask_from_cidr from vyos.utils.dict import dict_search_args from vyos.utils.file import file_permissions from vyos.utils.file import read_file from vyos.utils.process import run kea4_options = { 'name_server': 'domain-name-servers', 'domain_name': 'domain-name', 'domain_search': 'domain-search', 'ntp_server': 'ntp-servers', 'pop_server': 'pop-server', 'smtp_server': 'smtp-server', 'time_server': 'time-servers', 'wins_server': 'netbios-name-servers', 'default_router': 'routers', 'server_identifier': 'dhcp-server-identifier', 'tftp_server_name': 'tftp-server-name', 'bootfile_size': 'boot-size', 'time_offset': 'time-offset', 'wpad_url': 'wpad-url', 'ipv6_only_preferred': 'v6-only-preferred', 'captive_portal': 'v4-captive-portal' } kea6_options = { 'info_refresh_time': 'information-refresh-time', 'name_server': 'dns-servers', 'domain_search': 'domain-search', 'nis_domain': 'nis-domain-name', 'nis_server': 'nis-servers', 'nisplus_domain': 'nisp-domain-name', 'nisplus_server': 'nisp-servers', 'sntp_server': 'sntp-servers', 'captive_portal': 'v6-captive-portal' } def kea_parse_options(config): options = [] for node, option_name in kea4_options.items(): if node not in config: continue value = ", ".join(config[node]) if isinstance(config[node], list) else config[node] options.append({'name': option_name, 'data': value}) if 'client_prefix_length' in config: options.append({'name': 'subnet-mask', 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length'])}) if 'ip_forwarding' in config: options.append({'name': 'ip-forwarding', 'data': "true"}) if 'static_route' in config: default_route = '' if 'default_router' in config: default_route = isc_static_route('0.0.0.0/0', config['default_router']) routes = [isc_static_route(route, route_options['next_hop']) for route, route_options in config['static_route'].items()] options.append({'name': 'rfc3442-static-route', 'data': ", ".join(routes if not default_route else routes + [default_route])}) options.append({'name': 'windows-static-route', 'data': ", ".join(routes)}) if 'time_zone' in config: with open("/usr/share/zoneinfo/" + config['time_zone'], "rb") as f: tz_string = f.read().split(b"\n")[-2].decode("utf-8") options.append({'name': 'pcode', 'data': tz_string}) options.append({'name': 'tcode', 'data': config['time_zone']}) unifi_controller = dict_search_args(config, 'vendor_option', 'ubiquiti', 'unifi_controller') if unifi_controller: options.append({ 'name': 'unifi-controller', 'data': unifi_controller, 'space': 'ubnt' }) return options def kea_parse_subnet(subnet, config): out = {'subnet': subnet, 'id': int(config['subnet_id'])} options = [] if 'option' in config: out['option-data'] = kea_parse_options(config['option']) if 'bootfile_name' in config['option']: out['boot-file-name'] = config['option']['bootfile_name'] if 'bootfile_server' in config['option']: out['next-server'] = config['option']['bootfile_server'] + if 'ignore_client_id' in config: + out['match-client-id'] = False + if 'lease' in config: out['valid-lifetime'] = int(config['lease']) out['max-valid-lifetime'] = int(config['lease']) if 'range' in config: pools = [] for num, range_config in config['range'].items(): start, stop = range_config['start'], range_config['stop'] pool = { 'pool': f'{start} - {stop}' } if 'option' in range_config: pool['option-data'] = kea_parse_options(range_config['option']) if 'bootfile_name' in range_config['option']: pool['boot-file-name'] = range_config['option']['bootfile_name'] if 'bootfile_server' in range_config['option']: pool['next-server'] = range_config['option']['bootfile_server'] pools.append(pool) out['pools'] = pools if 'static_mapping' in config: reservations = [] for host, host_config in config['static_mapping'].items(): if 'disable' in host_config: continue reservation = { 'hostname': host, } if 'mac' in host_config: reservation['hw-address'] = host_config['mac'] if 'duid' in host_config: reservation['duid'] = host_config['duid'] if 'ip_address' in host_config: reservation['ip-address'] = host_config['ip_address'] if 'option' in host_config: reservation['option-data'] = kea_parse_options(host_config['option']) if 'bootfile_name' in host_config['option']: reservation['boot-file-name'] = host_config['option']['bootfile_name'] if 'bootfile_server' in host_config['option']: reservation['next-server'] = host_config['option']['bootfile_server'] reservations.append(reservation) out['reservations'] = reservations return out def kea6_parse_options(config): options = [] for node, option_name in kea6_options.items(): if node not in config: continue value = ", ".join(config[node]) if isinstance(config[node], list) else config[node] options.append({'name': option_name, 'data': value}) if 'sip_server' in config: sip_servers = config['sip_server'] addrs = [] hosts = [] for server in sip_servers: if is_ipv6(server): addrs.append(server) else: hosts.append(server) if addrs: options.append({'name': 'sip-server-addr', 'data': ", ".join(addrs)}) if hosts: options.append({'name': 'sip-server-dns', 'data': ", ".join(hosts)}) cisco_tftp = dict_search_args(config, 'vendor_option', 'cisco', 'tftp-server') if cisco_tftp: options.append({'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp}) return options def kea6_parse_subnet(subnet, config): out = {'subnet': subnet, 'id': int(config['subnet_id'])} if 'option' in config: out['option-data'] = kea6_parse_options(config['option']) if 'interface' in config: out['interface'] = config['interface'] if 'range' in config: pools = [] for num, range_config in config['range'].items(): pool = {} if 'prefix' in range_config: pool['pool'] = range_config['prefix'] if 'start' in range_config: start = range_config['start'] stop = range_config['stop'] pool['pool'] = f'{start} - {stop}' if 'option' in range_config: pool['option-data'] = kea6_parse_options(range_config['option']) pools.append(pool) out['pools'] = pools if 'prefix_delegation' in config: pd_pools = [] if 'prefix' in config['prefix_delegation']: for prefix, pd_conf in config['prefix_delegation']['prefix'].items(): pd_pool = { 'prefix': prefix, 'prefix-len': int(pd_conf['prefix_length']), 'delegated-len': int(pd_conf['delegated_length']) } if 'excluded_prefix' in pd_conf: pd_pool['excluded-prefix'] = pd_conf['excluded_prefix'] pd_pool['excluded-prefix-len'] = int(pd_conf['excluded_prefix_length']) pd_pools.append(pd_pool) out['pd-pools'] = pd_pools if 'lease_time' in config: if 'default' in config['lease_time']: out['valid-lifetime'] = int(config['lease_time']['default']) if 'maximum' in config['lease_time']: out['max-valid-lifetime'] = int(config['lease_time']['maximum']) if 'minimum' in config['lease_time']: out['min-valid-lifetime'] = int(config['lease_time']['minimum']) if 'static_mapping' in config: reservations = [] for host, host_config in config['static_mapping'].items(): if 'disable' in host_config: continue reservation = { 'hostname': host } if 'mac' in host_config: reservation['hw-address'] = host_config['mac'] if 'duid' in host_config: reservation['duid'] = host_config['duid'] if 'ipv6_address' in host_config: reservation['ip-addresses'] = [ host_config['ipv6_address'] ] if 'ipv6_prefix' in host_config: reservation['prefixes'] = [ host_config['ipv6_prefix'] ] if 'option' in host_config: reservation['option-data'] = kea6_parse_options(host_config['option']) reservations.append(reservation) out['reservations'] = reservations return out def _ctrl_socket_command(path, command, args=None): if not os.path.exists(path): return None if file_permissions(path) != '0775': run(f'sudo chmod 775 {path}') with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(path) payload = {'command': command} if args: payload['arguments'] = args sock.send(bytes(json.dumps(payload), 'utf-8')) result = b'' while True: data = sock.recv(4096) result += data if len(data) < 4096: break return json.loads(result.decode('utf-8')) def kea_get_leases(inet): ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' leases = _ctrl_socket_command(ctrl_socket, f'lease{inet}-get-all') if not leases or 'result' not in leases or leases['result'] != 0: return [] return leases['arguments']['leases'] def kea_get_active_config(inet): ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' config = _ctrl_socket_command(ctrl_socket, 'config-get') if not config or 'result' not in config or config['result'] != 0: return None return config def kea_get_pool_from_subnet_id(config, inet, subnet_id): shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') if not shared_networks: return None for network in shared_networks: if f'subnet{inet}' not in network: continue for subnet in network[f'subnet{inet}']: if 'id' in subnet and int(subnet['id']) == int(subnet_id): return network['name'] return None diff --git a/smoketest/scripts/cli/test_service_dhcp-server.py b/smoketest/scripts/cli/test_service_dhcp-server.py index 194289567..b582a2108 100755 --- a/smoketest/scripts/cli/test_service_dhcp-server.py +++ b/smoketest/scripts/cli/test_service_dhcp-server.py @@ -1,740 +1,742 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest from json import loads from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import process_named_running from vyos.utils.file import read_file from vyos.template import inc_ip from vyos.template import dec_ip PROCESS_NAME = 'kea-dhcp4' CTRL_PROCESS_NAME = 'kea-ctrl-agent' KEA4_CONF = '/run/kea/kea-dhcp4.conf' KEA4_CTRL = '/run/kea/dhcp4-ctrl-socket' base_path = ['service', 'dhcp-server'] interface = 'dum8765' subnet = '192.0.2.0/25' router = inc_ip(subnet, 1) dns_1 = inc_ip(subnet, 2) dns_2 = inc_ip(subnet, 3) domain_name = 'vyos.net' class TestServiceDHCPServer(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestServiceDHCPServer, cls).setUpClass() # Clear out current configuration to allow running this test on a live system cls.cli_delete(cls, base_path) cidr_mask = subnet.split('/')[-1] cls.cli_set(cls, ['interfaces', 'dummy', interface, 'address', f'{router}/{cidr_mask}']) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['interfaces', 'dummy', interface]) super(TestServiceDHCPServer, cls).tearDownClass() def tearDown(self): self.cli_delete(base_path) self.cli_commit() def walk_path(self, obj, path): current = obj for i, key in enumerate(path): if isinstance(key, str): self.assertTrue(isinstance(current, dict), msg=f'Failed path: {path}') self.assertTrue(key in current, msg=f'Failed path: {path}') elif isinstance(key, int): self.assertTrue(isinstance(current, list), msg=f'Failed path: {path}') self.assertTrue(0 <= key < len(current), msg=f'Failed path: {path}') else: assert False, "Invalid type" current = current[key] return current def verify_config_object(self, obj, path, value): base_obj = self.walk_path(obj, path) self.assertTrue(isinstance(base_obj, list)) self.assertTrue(any(True for v in base_obj if v == value)) def verify_config_value(self, obj, path, key, value): base_obj = self.walk_path(obj, path) if isinstance(base_obj, list): self.assertTrue(any(True for v in base_obj if key in v and v[key] == value)) elif isinstance(base_obj, dict): self.assertTrue(key in base_obj) self.assertEqual(base_obj[key], value) def test_dhcp_single_pool_range(self): shared_net_name = 'SMOKE-1' range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) range_1_start = inc_ip(subnet, 40) range_1_stop = inc_ip(subnet, 50) self.cli_set(base_path + ['listen-interface', interface]) pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) + self.cli_set(pool + ['ignore-client-id']) # we use the first subnet IP address as default gateway self.cli_set(pool + ['option', 'default-router', router]) self.cli_set(pool + ['option', 'name-server', dns_1]) self.cli_set(pool + ['option', 'name-server', dns_2]) self.cli_set(pool + ['option', 'domain-name', domain_name]) # check validate() - No DHCP address range or active static-mapping set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) self.cli_set(pool + ['range', '1', 'start', range_1_start]) self.cli_set(pool + ['range', '1', 'stop', range_1_stop]) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'interfaces-config'], 'interfaces', [interface]) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1) + self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'match-client-id', False) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-name', 'data': domain_name}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) # Verify pools self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_1_start} - {range_1_stop}'}) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_single_pool_options(self): shared_net_name = 'SMOKE-0815' range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) smtp_server = '1.2.3.4' time_server = '4.3.2.1' tftp_server = 'tftp.vyos.io' search_domains = ['foo.vyos.net', 'bar.vyos.net'] bootfile_name = 'vyos' bootfile_server = '192.0.2.1' wpad = 'http://wpad.vyos.io/foo/bar' server_identifier = bootfile_server ipv6_only_preferred = '300' pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) # we use the first subnet IP address as default gateway self.cli_set(pool + ['option', 'default-router', router]) self.cli_set(pool + ['option', 'name-server', dns_1]) self.cli_set(pool + ['option', 'name-server', dns_2]) self.cli_set(pool + ['option', 'domain-name', domain_name]) self.cli_set(pool + ['option', 'ip-forwarding']) self.cli_set(pool + ['option', 'smtp-server', smtp_server]) self.cli_set(pool + ['option', 'pop-server', smtp_server]) self.cli_set(pool + ['option', 'time-server', time_server]) self.cli_set(pool + ['option', 'tftp-server-name', tftp_server]) for search in search_domains: self.cli_set(pool + ['option', 'domain-search', search]) self.cli_set(pool + ['option', 'bootfile-name', bootfile_name]) self.cli_set(pool + ['option', 'bootfile-server', bootfile_server]) self.cli_set(pool + ['option', 'wpad-url', wpad]) self.cli_set(pool + ['option', 'server-identifier', server_identifier]) self.cli_set(pool + ['option', 'static-route', '10.0.0.0/24', 'next-hop', '192.0.2.1']) self.cli_set(pool + ['option', 'ipv6-only-preferred', ipv6_only_preferred]) self.cli_set(pool + ['option', 'time-zone', 'Europe/London']) self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'boot-file-name', bootfile_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'next-server', bootfile_server) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-name', 'data': domain_name}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-search', 'data': ', '.join(search_domains)}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'pop-server', 'data': smtp_server}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'smtp-server', 'data': smtp_server}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'time-servers', 'data': time_server}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'dhcp-server-identifier', 'data': server_identifier}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'tftp-server-name', 'data': tftp_server}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'wpad-url', 'data': wpad}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'rfc3442-static-route', 'data': '24,10,0,0,192,0,2,1, 0,192,0,2,1'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'windows-static-route', 'data': '24,10,0,0,192,0,2,1'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'v6-only-preferred', 'data': ipv6_only_preferred}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'ip-forwarding', 'data': "true"}) # Time zone self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'pcode', 'data': 'GMT0BST,M3.5.0/1,M10.5.0'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'tcode', 'data': 'Europe/London'}) # Verify pools self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop}'}) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_single_pool_options_scoped(self): shared_net_name = 'SMOKE-2' range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) range_router = inc_ip(subnet, 5) range_dns_1 = inc_ip(subnet, 6) range_dns_2 = inc_ip(subnet, 7) shared_network = base_path + ['shared-network-name', shared_net_name] pool = shared_network + ['subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) # we use the first subnet IP address as default gateway self.cli_set(shared_network + ['option', 'default-router', router]) self.cli_set(shared_network + ['option', 'name-server', dns_1]) self.cli_set(shared_network + ['option', 'name-server', dns_2]) self.cli_set(shared_network + ['option', 'domain-name', domain_name]) self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) self.cli_set(pool + ['range', '0', 'option', 'default-router', range_router]) self.cli_set(pool + ['range', '0', 'option', 'name-server', range_dns_1]) self.cli_set(pool + ['range', '0', 'option', 'name-server', range_dns_2]) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400) # Verify shared-network options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'option-data'], {'name': 'domain-name', 'data': domain_name}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'option-data'], {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'option-data'], {'name': 'routers', 'data': router}) # Verify range options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools', 0, 'option-data'], {'name': 'domain-name-servers', 'data': f'{range_dns_1}, {range_dns_2}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools', 0, 'option-data'], {'name': 'routers', 'data': range_router}) # Verify pool self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], 'pool', f'{range_0_start} - {range_0_stop}') # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_single_pool_static_mapping(self): shared_net_name = 'SMOKE-2' domain_name = 'private' pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) # we use the first subnet IP address as default gateway self.cli_set(pool + ['option', 'default-router', router]) self.cli_set(pool + ['option', 'name-server', dns_1]) self.cli_set(pool + ['option', 'name-server', dns_2]) self.cli_set(pool + ['option', 'domain-name', domain_name]) # check validate() - No DHCP address range or active static-mapping set with self.assertRaises(ConfigSessionError): self.cli_commit() client_base = 10 for client in ['client1', 'client2', 'client3']: mac = '00:50:00:00:00:{}'.format(client_base) self.cli_set(pool + ['static-mapping', client, 'mac', mac]) self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]) client_base += 1 # cannot have both mac-address and duid set with self.assertRaises(ConfigSessionError): self.cli_set(pool + ['static-mapping', 'client1', 'duid', '00:01:00:01:12:34:56:78:aa:bb:cc:dd:ee:11']) self.cli_commit() self.cli_delete(pool + ['static-mapping', 'client1', 'duid']) # cannot have mappings with duplicate IP addresses self.cli_set(pool + ['static-mapping', 'dupe1', 'mac', '00:50:00:00:fe:ff']) self.cli_set(pool + ['static-mapping', 'dupe1', 'ip-address', inc_ip(subnet, 10)]) with self.assertRaises(ConfigSessionError): self.cli_commit() # Should allow disabled duplicate self.cli_set(pool + ['static-mapping', 'dupe1', 'disable']) self.cli_commit() self.cli_delete(pool + ['static-mapping', 'dupe1']) # cannot have mappings with duplicate MAC addresses self.cli_set(pool + ['static-mapping', 'dupe2', 'mac', '00:50:00:00:00:10']) self.cli_set(pool + ['static-mapping', 'dupe2', 'ip-address', inc_ip(subnet, 120)]) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(pool + ['static-mapping', 'dupe2']) # cannot have mappings with duplicate MAC addresses self.cli_set(pool + ['static-mapping', 'dupe3', 'duid', '00:01:02:03:04:05:06:07:aa:aa:aa:aa:aa:01']) self.cli_set(pool + ['static-mapping', 'dupe3', 'ip-address', inc_ip(subnet, 121)]) self.cli_set(pool + ['static-mapping', 'dupe4', 'duid', '00:01:02:03:04:05:06:07:aa:aa:aa:aa:aa:01']) self.cli_set(pool + ['static-mapping', 'dupe4', 'ip-address', inc_ip(subnet, 121)]) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(pool + ['static-mapping', 'dupe3']) self.cli_delete(pool + ['static-mapping', 'dupe4']) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'id', 1) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'valid-lifetime', 86400) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'max-valid-lifetime', 86400) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-name', 'data': domain_name}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'domain-name-servers', 'data': f'{dns_1}, {dns_2}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) client_base = 10 for client in ['client1', 'client2', 'client3']: mac = '00:50:00:00:00:{}'.format(client_base) ip = inc_ip(subnet, client_base) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'reservations'], {'hostname': client, 'hw-address': mac, 'ip-address': ip}) client_base += 1 # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_multiple_pools(self): lease_time = '14400' for network in ['0', '1', '2', '3']: shared_net_name = f'VyOS-SMOKETEST-{network}' subnet = f'192.0.{network}.0/24' router = inc_ip(subnet, 1) dns_1 = inc_ip(subnet, 2) range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) range_1_start = inc_ip(subnet, 30) range_1_stop = inc_ip(subnet, 40) pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] self.cli_set(pool + ['subnet-id', str(int(network) + 1)]) # we use the first subnet IP address as default gateway self.cli_set(pool + ['option', 'default-router', router]) self.cli_set(pool + ['option', 'name-server', dns_1]) self.cli_set(pool + ['option', 'domain-name', domain_name]) self.cli_set(pool + ['lease', lease_time]) self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) self.cli_set(pool + ['range', '1', 'start', range_1_start]) self.cli_set(pool + ['range', '1', 'stop', range_1_stop]) client_base = 60 for client in ['client1', 'client2', 'client3', 'client4']: mac = '02:50:00:00:00:{}'.format(client_base) self.cli_set(pool + ['static-mapping', client, 'mac', mac]) self.cli_set(pool + ['static-mapping', client, 'ip-address', inc_ip(subnet, client_base)]) client_base += 1 # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) for network in ['0', '1', '2', '3']: shared_net_name = f'VyOS-SMOKETEST-{network}' subnet = f'192.0.{network}.0/24' router = inc_ip(subnet, 1) dns_1 = inc_ip(subnet, 2) range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) range_1_start = inc_ip(subnet, 30) range_1_stop = inc_ip(subnet, 40) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'subnet', subnet) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'id', int(network) + 1) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'valid-lifetime', int(lease_time)) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4'], 'max-valid-lifetime', int(lease_time)) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'], {'name': 'domain-name', 'data': domain_name}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'], {'name': 'domain-name-servers', 'data': dns_1}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'pools'], {'pool': f'{range_1_start} - {range_1_stop}'}) client_base = 60 for client in ['client1', 'client2', 'client3', 'client4']: mac = '02:50:00:00:00:{}'.format(client_base) ip = inc_ip(subnet, client_base) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', int(network), 'subnet4', 0, 'reservations'], {'hostname': client, 'hw-address': mac, 'ip-address': ip}) client_base += 1 # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_exclude_not_in_range(self): # T3180: verify else path when slicing DHCP ranges and exclude address # is not part of the DHCP range range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) pool = base_path + ['shared-network-name', 'EXCLUDE-TEST', 'subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) self.cli_set(pool + ['option', 'default-router', router]) self.cli_set(pool + ['exclude', router]) self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', 'EXCLUDE-TEST') self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) # Verify pools self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop}'}) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_exclude_in_range(self): # T3180: verify else path when slicing DHCP ranges and exclude address # is not part of the DHCP range range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 100) # the DHCP exclude addresse is blanked out of the range which is done # by slicing one range into two ranges exclude_addr = inc_ip(range_0_start, 20) range_0_stop_excl = dec_ip(exclude_addr, 1) range_0_start_excl = inc_ip(exclude_addr, 1) pool = base_path + ['shared-network-name', 'EXCLUDE-TEST-2', 'subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) self.cli_set(pool + ['option', 'default-router', router]) self.cli_set(pool + ['exclude', exclude_addr]) self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', 'EXCLUDE-TEST-2') self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop_excl}'}) self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start_excl} - {range_0_stop}'}) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_relay_server(self): # Listen on specific address and return DHCP leases from a non # directly connected pool self.cli_set(base_path + ['listen-address', router]) relay_subnet = '10.0.0.0/16' relay_router = inc_ip(relay_subnet, 1) range_0_start = '10.0.1.0' range_0_stop = '10.0.250.255' pool = base_path + ['shared-network-name', 'RELAY', 'subnet', relay_subnet] self.cli_set(pool + ['subnet-id', '1']) self.cli_set(pool + ['option', 'default-router', relay_router]) self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) self.verify_config_value(obj, ['Dhcp4', 'interfaces-config'], 'interfaces', [f'{interface}/{router}']) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', 'RELAY') self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', relay_subnet) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': relay_router}) # Verify pools self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop}'}) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) def test_dhcp_failover(self): shared_net_name = 'FAILOVER' failover_name = 'VyOS-Failover' range_0_start = inc_ip(subnet, 10) range_0_stop = inc_ip(subnet, 20) pool = base_path + ['shared-network-name', shared_net_name, 'subnet', subnet] self.cli_set(pool + ['subnet-id', '1']) # we use the first subnet IP address as default gateway self.cli_set(pool + ['option', 'default-router', router]) # check validate() - No DHCP address range or active static-mapping set with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(pool + ['range', '0', 'start', range_0_start]) self.cli_set(pool + ['range', '0', 'stop', range_0_stop]) # failover failover_local = router failover_remote = inc_ip(router, 1) self.cli_set(base_path + ['failover', 'source-address', failover_local]) self.cli_set(base_path + ['failover', 'name', failover_name]) self.cli_set(base_path + ['failover', 'remote', failover_remote]) self.cli_set(base_path + ['failover', 'status', 'primary']) # commit changes self.cli_commit() config = read_file(KEA4_CONF) obj = loads(config) # Verify failover self.verify_config_value(obj, ['Dhcp4', 'control-socket'], 'socket-name', KEA4_CTRL) self.verify_config_object( obj, ['Dhcp4', 'hooks-libraries', 0, 'parameters', 'high-availability', 0, 'peers'], {'name': os.uname()[1], 'url': f'http://{failover_local}:647/', 'role': 'primary', 'auto-failover': True}) self.verify_config_object( obj, ['Dhcp4', 'hooks-libraries', 0, 'parameters', 'high-availability', 0, 'peers'], {'name': failover_name, 'url': f'http://{failover_remote}:647/', 'role': 'standby', 'auto-failover': True}) self.verify_config_value(obj, ['Dhcp4', 'shared-networks'], 'name', shared_net_name) self.verify_config_value(obj, ['Dhcp4', 'shared-networks', 0, 'subnet4'], 'subnet', subnet) # Verify options self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'option-data'], {'name': 'routers', 'data': router}) # Verify pools self.verify_config_object( obj, ['Dhcp4', 'shared-networks', 0, 'subnet4', 0, 'pools'], {'pool': f'{range_0_start} - {range_0_stop}'}) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) self.assertTrue(process_named_running(CTRL_PROCESS_NAME)) if __name__ == '__main__': unittest.main(verbosity=2)