diff --git a/op-mode-definitions/clear-dhcp-server-lease.xml.in b/op-mode-definitions/clear-dhcp-server-lease.xml.in deleted file mode 100644 index aef0eb22a..000000000 --- a/op-mode-definitions/clear-dhcp-server-lease.xml.in +++ /dev/null @@ -1,20 +0,0 @@ -<?xml version="1.0"?> -<interfaceDefinition> - <node name="clear"> - <children> - <node name="dhcp-server"> - <properties> - <help>Clear DHCP server lease</help> - </properties> - <children> - <tagNode name="lease"> - <properties> - <help>DHCP server lease</help> - </properties> - <command>sudo ${vyos_op_scripts_dir}/clear_dhcp_lease.py --ip $4</command> - </tagNode> - </children> - </node> - </children> - </node> -</interfaceDefinition> diff --git a/op-mode-definitions/dhcp.xml.in b/op-mode-definitions/dhcp.xml.in index 0db7471e5..3c42c8e8f 100644 --- a/op-mode-definitions/dhcp.xml.in +++ b/op-mode-definitions/dhcp.xml.in @@ -1,288 +1,318 @@ <?xml version="1.0" encoding="UTF-8"?> <interfaceDefinition> + <node name="clear"> + <children> + <node name="dhcp-server"> + <properties> + <help>Clear DHCP server lease</help> + </properties> + <children> + <tagNode name="lease"> + <properties> + <help>DHCP server lease</help> + </properties> + <command>${vyos_op_scripts_dir}/dhcp.py clear_dhcp_server_lease --family inet --address $4</command> + </tagNode> + </children> + </node> + <node name="dhcpv6-server"> + <properties> + <help>Clear DHCPv6 server lease</help> + </properties> + <children> + <tagNode name="lease"> + <properties> + <help>DHCPv6 server lease</help> + </properties> + <command>${vyos_op_scripts_dir}/dhcp.py clear_dhcp_server_lease --family inet6 --address $4</command> + </tagNode> + </children> + </node> + </children> + </node> <node name="show"> <children> <node name="dhcp"> <properties> <help>Show DHCP (Dynamic Host Configuration Protocol) information</help> </properties> <children> <node name="client"> <properties> <help>Show DHCP client information</help> </properties> <children> <node name="leases"> <properties> <help>Show DHCP client leases</help> </properties> <children> <tagNode name="interface"> <properties> <help> Show DHCP client information for a given interface</help> <completionHelp> <script>${vyos_completion_dir}/list_interfaces --broadcast</script> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_client_leases --family inet --interface $6</command> </tagNode> </children> <command>${vyos_op_scripts_dir}/dhcp.py show_client_leases --family inet</command> </node> </children> </node> <node name="server"> <properties> <help>Show DHCP server information</help> </properties> <children> <node name="leases"> <properties> <help>Show DHCP server leases</help> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet</command> <children> <tagNode name="origin"> <properties> <help>Show DHCP server leases granted by local or remote DHCP server</help> <completionHelp> <list>local remote</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet --origin $6</command> </tagNode> <tagNode name="pool"> <properties> <help>Show DHCP server leases for a specific pool</help> <completionHelp> <path>service dhcp-server shared-network-name</path> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet --pool $6</command> </tagNode> <tagNode name="sort"> <properties> <help>Show DHCP server leases sorted by the specified key</help> <completionHelp> <list>end hostname ip mac pool remaining start state</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet --sort $6</command> </tagNode> <tagNode name="state"> <properties> <help>Show DHCP server leases with a specific state (can be multiple, comma-separated)</help> <completionHelp> <list>abandoned active all backup expired free released reset</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet --state $6</command> </tagNode> </children> </node> <node name="static-mappings"> <properties> <help>Show DHCP server static mappings</help> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_static_mappings --family inet</command> <children> <tagNode name="pool"> <properties> <help>Show DHCP server static mappings for a specific pool</help> <completionHelp> <path>service dhcp-server shared-network-name</path> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_static_mappings --family inet --pool $6</command> </tagNode> <tagNode name="sort"> <properties> <help>Show DHCP server static mappings sorted by the specified key</help> <completionHelp> <list>ip mac duid pool</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_static_mappings --family inet --sort $6</command> </tagNode> </children> </node> <node name="statistics"> <properties> <help>Show DHCP server statistics</help> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_pool_statistics --family inet</command> <children> <tagNode name="pool"> <properties> <help>Show DHCP server statistics for a specific pool</help> <completionHelp> <path>service dhcp-server shared-network-name</path> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_pool_statistics --family inet --pool $6</command> </tagNode> </children> </node> </children> </node> </children> </node> <node name="dhcpv6"> <properties> <help>Show DHCPv6 (IPv6 Dynamic Host Configuration Protocol) information</help> </properties> <children> <node name="server"> <properties> <help>Show DHCPv6 server information</help> </properties> <children> <node name="leases"> <properties> <help>Show DHCPv6 server leases</help> </properties> <command>sudo ${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet6</command> <children> <tagNode name="pool"> <properties> <help>Show DHCPv6 server leases for a specific pool</help> <completionHelp> <path>service dhcpv6-server shared-network-name</path> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet6 --pool $6</command> </tagNode> <tagNode name="sort"> <properties> <help>Show DHCPv6 server leases sorted by the specified key</help> <completionHelp> <list>end duid ip last_communication pool remaining state type</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet6 --sort $6</command> </tagNode> <tagNode name="state"> <properties> <help>Show DHCPv6 server leases with a specific state (can be multiple, comma-separated)</help> <completionHelp> <list>abandoned active all backup expired free released reset</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_leases --family inet6 --state $6</command> </tagNode> </children> </node> <node name="static-mappings"> <properties> <help>Show DHCPv6 server static mappings</help> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_static_mappings --family inet6</command> <children> <tagNode name="pool"> <properties> <help>Show DHCPv6 server static mappings for a specific pool</help> <completionHelp> <path>service dhcp-server shared-network-name</path> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_static_mappings --family inet6 --pool $6</command> </tagNode> <tagNode name="sort"> <properties> <help>Show DHCPv6 server static mappings sorted by the specified key</help> <completionHelp> <list>ip mac duid pool</list> </completionHelp> </properties> <command>${vyos_op_scripts_dir}/dhcp.py show_server_static_mappings --family inet6 --sort $6</command> </tagNode> </children> </node> </children> </node> </children> </node> </children> </node> <node name="restart"> <children> <node name="dhcp"> <properties> <help>Restart DHCP processes</help> </properties> <children> <node name="server"> <properties> <help>Restart DHCP server</help> </properties> <command>if cli-shell-api existsActive service dhcp-server; then sudo systemctl restart kea-dhcp4-server.service; else echo "DHCP server not configured"; fi</command> </node> <node name="relay-agent"> <properties> <help>Restart DHCP relay-agent</help> </properties> <command>sudo ${vyos_op_scripts_dir}/restart_dhcp_relay.py --ipv4</command> </node> </children> </node> <node name="dhcpv6"> <properties> <help>Restart DHCPv6 processes</help> </properties> <children> <node name="server"> <properties> <help>Restart DHCPv6 server</help> </properties> <command>if cli-shell-api existsActive service dhcpv6-server; then sudo systemctl restart kea-dhcp6-server.service; else echo "DHCPv6 server not configured"; fi</command> </node> <node name="relay-agent"> <properties> <help>Restart DHCPv6 relay-agent</help> </properties> <command>sudo ${vyos_op_scripts_dir}/restart_dhcp_relay.py --ipv6</command> </node> </children> </node> </children> </node> <node name="renew"> <properties> <help>Renew specified variable</help> </properties> <children> <node name="dhcp"> <properties> <help>Renew DHCP client lease</help> </properties> <children> <tagNode name="interface"> <properties> <help>Renew DHCP client lease for specified interface</help> <completionHelp> <script>${vyos_completion_dir}/list_interfaces</script> </completionHelp> </properties> <command>sudo systemctl restart "dhclient@$4.service"</command> </tagNode> </children> </node> <node name="dhcpv6"> <properties> <help>Renew DHCPv6 client lease</help> </properties> <children> <tagNode name="interface"> <properties> <help>Renew DHCPv6 client lease for specified interface</help> <completionHelp> <script>${vyos_completion_dir}/list_interfaces</script> </completionHelp> </properties> <command>sudo systemctl restart "dhcp6c@$4.service"</command> </tagNode> </children> </node> </children> </node> </interfaceDefinition> diff --git a/python/vyos/kea.py b/python/vyos/kea.py index 2328d0b00..5aa91e652 100644 --- a/python/vyos/kea.py +++ b/python/vyos/kea.py @@ -1,355 +1,367 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import json import os import socket from vyos.template import is_ipv6 from vyos.template import isc_static_route from vyos.template import netmask_from_cidr from vyos.utils.dict import dict_search_args from vyos.utils.file import file_permissions from vyos.utils.file import read_file from vyos.utils.process import run kea4_options = { 'name_server': 'domain-name-servers', 'domain_name': 'domain-name', 'domain_search': 'domain-search', 'ntp_server': 'ntp-servers', 'pop_server': 'pop-server', 'smtp_server': 'smtp-server', 'time_server': 'time-servers', 'wins_server': 'netbios-name-servers', 'default_router': 'routers', 'server_identifier': 'dhcp-server-identifier', 'tftp_server_name': 'tftp-server-name', 'bootfile_size': 'boot-size', 'time_offset': 'time-offset', 'wpad_url': 'wpad-url', 'ipv6_only_preferred': 'v6-only-preferred', 'captive_portal': 'v4-captive-portal' } kea6_options = { 'info_refresh_time': 'information-refresh-time', 'name_server': 'dns-servers', 'domain_search': 'domain-search', 'nis_domain': 'nis-domain-name', 'nis_server': 'nis-servers', 'nisplus_domain': 'nisp-domain-name', 'nisplus_server': 'nisp-servers', 'sntp_server': 'sntp-servers', 'captive_portal': 'v6-captive-portal' } def kea_parse_options(config): options = [] for node, option_name in kea4_options.items(): if node not in config: continue value = ", ".join(config[node]) if isinstance(config[node], list) else config[node] options.append({'name': option_name, 'data': value}) if 'client_prefix_length' in config: options.append({'name': 'subnet-mask', 'data': netmask_from_cidr('0.0.0.0/' + config['client_prefix_length'])}) if 'ip_forwarding' in config: options.append({'name': 'ip-forwarding', 'data': "true"}) if 'static_route' in config: default_route = '' if 'default_router' in config: default_route = isc_static_route('0.0.0.0/0', config['default_router']) routes = [isc_static_route(route, route_options['next_hop']) for route, route_options in config['static_route'].items()] options.append({'name': 'rfc3442-static-route', 'data': ", ".join(routes if not default_route else routes + [default_route])}) options.append({'name': 'windows-static-route', 'data': ", ".join(routes)}) if 'time_zone' in config: with open("/usr/share/zoneinfo/" + config['time_zone'], "rb") as f: tz_string = f.read().split(b"\n")[-2].decode("utf-8") options.append({'name': 'pcode', 'data': tz_string}) options.append({'name': 'tcode', 'data': config['time_zone']}) unifi_controller = dict_search_args(config, 'vendor_option', 'ubiquiti', 'unifi_controller') if unifi_controller: options.append({ 'name': 'unifi-controller', 'data': unifi_controller, 'space': 'ubnt' }) return options def kea_parse_subnet(subnet, config): out = {'subnet': subnet, 'id': int(config['subnet_id'])} options = [] if 'option' in config: out['option-data'] = kea_parse_options(config['option']) if 'bootfile_name' in config['option']: out['boot-file-name'] = config['option']['bootfile_name'] if 'bootfile_server' in config['option']: out['next-server'] = config['option']['bootfile_server'] if 'ignore_client_id' in config: out['match-client-id'] = False if 'lease' in config: out['valid-lifetime'] = int(config['lease']) out['max-valid-lifetime'] = int(config['lease']) if 'range' in config: pools = [] for num, range_config in config['range'].items(): start, stop = range_config['start'], range_config['stop'] pool = { 'pool': f'{start} - {stop}' } if 'option' in range_config: pool['option-data'] = kea_parse_options(range_config['option']) if 'bootfile_name' in range_config['option']: pool['boot-file-name'] = range_config['option']['bootfile_name'] if 'bootfile_server' in range_config['option']: pool['next-server'] = range_config['option']['bootfile_server'] pools.append(pool) out['pools'] = pools if 'static_mapping' in config: reservations = [] for host, host_config in config['static_mapping'].items(): if 'disable' in host_config: continue reservation = { 'hostname': host, } if 'mac' in host_config: reservation['hw-address'] = host_config['mac'] if 'duid' in host_config: reservation['duid'] = host_config['duid'] if 'ip_address' in host_config: reservation['ip-address'] = host_config['ip_address'] if 'option' in host_config: reservation['option-data'] = kea_parse_options(host_config['option']) if 'bootfile_name' in host_config['option']: reservation['boot-file-name'] = host_config['option']['bootfile_name'] if 'bootfile_server' in host_config['option']: reservation['next-server'] = host_config['option']['bootfile_server'] reservations.append(reservation) out['reservations'] = reservations return out def kea6_parse_options(config): options = [] for node, option_name in kea6_options.items(): if node not in config: continue value = ", ".join(config[node]) if isinstance(config[node], list) else config[node] options.append({'name': option_name, 'data': value}) if 'sip_server' in config: sip_servers = config['sip_server'] addrs = [] hosts = [] for server in sip_servers: if is_ipv6(server): addrs.append(server) else: hosts.append(server) if addrs: options.append({'name': 'sip-server-addr', 'data': ", ".join(addrs)}) if hosts: options.append({'name': 'sip-server-dns', 'data': ", ".join(hosts)}) cisco_tftp = dict_search_args(config, 'vendor_option', 'cisco', 'tftp-server') if cisco_tftp: options.append({'name': 'tftp-servers', 'code': 2, 'space': 'cisco', 'data': cisco_tftp}) return options def kea6_parse_subnet(subnet, config): out = {'subnet': subnet, 'id': int(config['subnet_id'])} if 'option' in config: out['option-data'] = kea6_parse_options(config['option']) if 'interface' in config: out['interface'] = config['interface'] if 'range' in config: pools = [] for num, range_config in config['range'].items(): pool = {} if 'prefix' in range_config: pool['pool'] = range_config['prefix'] if 'start' in range_config: start = range_config['start'] stop = range_config['stop'] pool['pool'] = f'{start} - {stop}' if 'option' in range_config: pool['option-data'] = kea6_parse_options(range_config['option']) pools.append(pool) out['pools'] = pools if 'prefix_delegation' in config: pd_pools = [] if 'prefix' in config['prefix_delegation']: for prefix, pd_conf in config['prefix_delegation']['prefix'].items(): pd_pool = { 'prefix': prefix, 'prefix-len': int(pd_conf['prefix_length']), 'delegated-len': int(pd_conf['delegated_length']) } if 'excluded_prefix' in pd_conf: pd_pool['excluded-prefix'] = pd_conf['excluded_prefix'] pd_pool['excluded-prefix-len'] = int(pd_conf['excluded_prefix_length']) pd_pools.append(pd_pool) out['pd-pools'] = pd_pools if 'lease_time' in config: if 'default' in config['lease_time']: out['valid-lifetime'] = int(config['lease_time']['default']) if 'maximum' in config['lease_time']: out['max-valid-lifetime'] = int(config['lease_time']['maximum']) if 'minimum' in config['lease_time']: out['min-valid-lifetime'] = int(config['lease_time']['minimum']) if 'static_mapping' in config: reservations = [] for host, host_config in config['static_mapping'].items(): if 'disable' in host_config: continue reservation = { 'hostname': host } if 'mac' in host_config: reservation['hw-address'] = host_config['mac'] if 'duid' in host_config: reservation['duid'] = host_config['duid'] if 'ipv6_address' in host_config: reservation['ip-addresses'] = [ host_config['ipv6_address'] ] if 'ipv6_prefix' in host_config: reservation['prefixes'] = [ host_config['ipv6_prefix'] ] if 'option' in host_config: reservation['option-data'] = kea6_parse_options(host_config['option']) reservations.append(reservation) out['reservations'] = reservations return out def _ctrl_socket_command(path, command, args=None): if not os.path.exists(path): return None if file_permissions(path) != '0775': run(f'sudo chmod 775 {path}') with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(path) payload = {'command': command} if args: payload['arguments'] = args sock.send(bytes(json.dumps(payload), 'utf-8')) result = b'' while True: data = sock.recv(4096) result += data if len(data) < 4096: break return json.loads(result.decode('utf-8')) def kea_get_leases(inet): ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' leases = _ctrl_socket_command(ctrl_socket, f'lease{inet}-get-all') if not leases or 'result' not in leases or leases['result'] != 0: return [] return leases['arguments']['leases'] +def kea_delete_lease(inet, ip_address): + ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' + + args = {'ip-address': ip_address} + + result = _ctrl_socket_command(ctrl_socket, f'lease{inet}-del', args) + + if result and 'result' in result: + return result['result'] == 0 + + return False + def kea_get_active_config(inet): ctrl_socket = f'/run/kea/dhcp{inet}-ctrl-socket' config = _ctrl_socket_command(ctrl_socket, 'config-get') if not config or 'result' not in config or config['result'] != 0: return None return config def kea_get_pool_from_subnet_id(config, inet, subnet_id): shared_networks = dict_search_args(config, 'arguments', f'Dhcp{inet}', 'shared-networks') if not shared_networks: return None for network in shared_networks: if f'subnet{inet}' not in network: continue for subnet in network[f'subnet{inet}']: if 'id' in subnet and int(subnet['id']) == int(subnet_id): return network['name'] return None diff --git a/src/op_mode/clear_dhcp_lease.py b/src/op_mode/clear_dhcp_lease.py deleted file mode 100755 index 7d4b47104..000000000 --- a/src/op_mode/clear_dhcp_lease.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library. If not, see <http://www.gnu.org/licenses/>. - -import argparse -import re - -from vyos.configquery import ConfigTreeQuery -from vyos.kea import kea_parse_leases -from vyos.utils.io import ask_yes_no -from vyos.utils.process import call -from vyos.utils.commit import commit_in_progress - -# TODO: Update to use Kea control socket command "lease4-del" - -config = ConfigTreeQuery() -base = ['service', 'dhcp-server'] -lease_file = '/config/dhcp/dhcp4-leases.csv' - - -def del_lease_ip(address): - """ - Read lease_file and write data to this file - without specific section "lease ip" - Delete section "lease x.x.x.x { x;x;x; }" - """ - with open(lease_file, encoding='utf-8') as f: - data = f.read().rstrip() - pattern = rf"^{address},[^\n]+\n" - # Delete lease for ip block - data = re.sub(pattern, '', data) - - # Write new data to original lease_file - with open(lease_file, 'w', encoding='utf-8') as f: - f.write(data) - -def is_ip_in_leases(address): - """ - Return True if address found in the lease file - """ - leases = kea_parse_leases(lease_file) - for lease in leases: - if address == lease['address']: - return True - print(f'Address "{address}" not found in "{lease_file}"') - return False - -if not config.exists(base): - print('DHCP-server not configured!') - exit(0) - -if config.exists(base + ['failover']): - print('Lease cannot be reset in failover mode!') - exit(0) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--ip', help='IPv4 address', action='store', required=True) - - args = parser.parse_args() - address = args.ip - - if not is_ip_in_leases(address): - exit(1) - - if commit_in_progress(): - print('Cannot clear DHCP lease while a commit is in progress') - exit(1) - - if not ask_yes_no(f'This will restart DHCP server.\nContinue?'): - exit(1) - else: - del_lease_ip(address) - call('systemctl restart kea-dhcp4-server.service') diff --git a/src/op_mode/dhcp.py b/src/op_mode/dhcp.py index 1d9ad0e76..d27e1baf7 100755 --- a/src/op_mode/dhcp.py +++ b/src/op_mode/dhcp.py @@ -1,461 +1,484 @@ #!/usr/bin/env python3 # # Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import sys import typing from datetime import datetime from glob import glob from ipaddress import ip_address from tabulate import tabulate import vyos.opmode from vyos.base import Warning from vyos.configquery import ConfigTreeQuery from vyos.kea import kea_get_active_config from vyos.kea import kea_get_leases from vyos.kea import kea_get_pool_from_subnet_id +from vyos.kea import kea_delete_lease from vyos.utils.process import is_systemd_service_running time_string = "%a %b %d %H:%M:%S %Z %Y" config = ConfigTreeQuery() lease_valid_states = ['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] sort_valid_inet = ['end', 'mac', 'hostname', 'ip', 'pool', 'remaining', 'start', 'state'] sort_valid_inet6 = ['end', 'duid', 'ip', 'last_communication', 'pool', 'remaining', 'state', 'type'] mapping_sort_valid = ['mac', 'ip', 'pool', 'duid'] ArgFamily = typing.Literal['inet', 'inet6'] ArgState = typing.Literal['all', 'active', 'free', 'expired', 'released', 'abandoned', 'reset', 'backup'] ArgOrigin = typing.Literal['local', 'remote'] def _utc_to_local(utc_dt): return datetime.fromtimestamp((datetime.fromtimestamp(utc_dt) - datetime(1970, 1, 1)).total_seconds()) def _format_hex_string(in_str): out_str = "" # if input is divisible by 2, add : every 2 chars if len(in_str) > 0 and len(in_str) % 2 == 0: out_str = ':'.join(a+b for a,b in zip(in_str[::2], in_str[1::2])) else: out_str = in_str return out_str def _find_list_of_dict_index(lst, key='ip', value='') -> int: """ Find the index entry of list of dict matching the dict value Exampe: % lst = [{'ip': '192.0.2.1'}, {'ip': '192.0.2.2'}] % _find_list_of_dict_index(lst, key='ip', value='192.0.2.2') % 1 """ idx = next((index for (index, d) in enumerate(lst) if d[key] == value), None) return idx def _get_raw_server_leases(family='inet', pool=None, sorted=None, state=[], origin=None) -> list: """ Get DHCP server leases :return list """ inet_suffix = '6' if family == 'inet6' else '4' leases = kea_get_leases(inet_suffix) if pool is None: pool = _get_dhcp_pools(family=family) else: pool = [pool] active_config = kea_get_active_config(inet_suffix) data = [] for lease in leases: lifetime = lease['valid-lft'] expiry = (lease['cltt'] + lifetime) lease['start_timestamp'] = datetime.utcfromtimestamp(expiry - lifetime) lease['expire_timestamp'] = datetime.utcfromtimestamp(expiry) if expiry else None data_lease = {} data_lease['ip'] = lease['ip-address'] lease_state_long = {0: 'active', 1: 'rejected', 2: 'expired'} data_lease['state'] = lease_state_long[lease['state']] data_lease['pool'] = kea_get_pool_from_subnet_id(active_config, inet_suffix, lease['subnet-id']) if active_config else '-' data_lease['end'] = lease['expire_timestamp'].timestamp() if lease['expire_timestamp'] else None data_lease['origin'] = 'local' # TODO: Determine remote in HA if family == 'inet': data_lease['mac'] = lease['hw-address'] data_lease['start'] = lease['start_timestamp'].timestamp() data_lease['hostname'] = lease['hostname'] if family == 'inet6': data_lease['last_communication'] = lease['start_timestamp'].timestamp() data_lease['duid'] = _format_hex_string(lease['duid']) data_lease['type'] = lease['type'] if lease['type'] == 'IA_PD': prefix_len = lease['prefix-len'] data_lease['ip'] += f'/{prefix_len}' data_lease['remaining'] = '-' if lease['valid-lft'] > 0: data_lease['remaining'] = lease['expire_timestamp'] - datetime.utcnow() if data_lease['remaining'].days >= 0: # substraction gives us a timedelta object which can't be formatted with strftime # so we use str(), split gets rid of the microseconds data_lease['remaining'] = str(data_lease["remaining"]).split('.')[0] # Do not add old leases if data_lease['remaining'] != '' and data_lease['pool'] in pool and data_lease['state'] != 'free': if not state or state == 'all' or data_lease['state'] in state: data.append(data_lease) # deduplicate checked = [] for entry in data: addr = entry.get('ip') if addr not in checked: checked.append(addr) else: idx = _find_list_of_dict_index(data, key='ip', value=addr) data.pop(idx) if sorted: if sorted == 'ip': data.sort(key = lambda x:ip_address(x['ip'])) else: data.sort(key = lambda x:x[sorted]) return data def _get_formatted_server_leases(raw_data, family='inet'): data_entries = [] if family == 'inet': for lease in raw_data: ipaddr = lease.get('ip') hw_addr = lease.get('mac') state = lease.get('state') start = lease.get('start') start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') end = lease.get('end') end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') if end else '-' remain = lease.get('remaining') pool = lease.get('pool') hostname = lease.get('hostname') origin = lease.get('origin') data_entries.append([ipaddr, hw_addr, state, start, end, remain, pool, hostname, origin]) headers = ['IP Address', 'MAC address', 'State', 'Lease start', 'Lease expiration', 'Remaining', 'Pool', 'Hostname', 'Origin'] if family == 'inet6': for lease in raw_data: ipaddr = lease.get('ip') state = lease.get('state') start = lease.get('last_communication') start = _utc_to_local(start).strftime('%Y/%m/%d %H:%M:%S') end = lease.get('end') end = _utc_to_local(end).strftime('%Y/%m/%d %H:%M:%S') remain = lease.get('remaining') lease_type = lease.get('type') pool = lease.get('pool') host_identifier = lease.get('duid') data_entries.append([ipaddr, state, start, end, remain, lease_type, pool, host_identifier]) headers = ['IPv6 address', 'State', 'Last communication', 'Lease expiration', 'Remaining', 'Type', 'Pool', 'DUID'] output = tabulate(data_entries, headers, numalign='left') return output def _get_dhcp_pools(family='inet') -> list: v = 'v6' if family == 'inet6' else '' pools = config.list_nodes(f'service dhcp{v}-server shared-network-name') return pools def _get_pool_size(pool, family='inet'): v = 'v6' if family == 'inet6' else '' base = f'service dhcp{v}-server shared-network-name {pool}' size = 0 subnets = config.list_nodes(f'{base} subnet') for subnet in subnets: ranges = config.list_nodes(f'{base} subnet {subnet} range') for range in ranges: if family == 'inet6': start = config.value(f'{base} subnet {subnet} range {range} start') stop = config.value(f'{base} subnet {subnet} range {range} stop') else: start = config.value(f'{base} subnet {subnet} range {range} start') stop = config.value(f'{base} subnet {subnet} range {range} stop') # Add +1 because both range boundaries are inclusive size += int(ip_address(stop)) - int(ip_address(start)) + 1 return size def _get_raw_pool_statistics(family='inet', pool=None): if pool is None: pool = _get_dhcp_pools(family=family) else: pool = [pool] v = 'v6' if family == 'inet6' else '' stats = [] for p in pool: subnet = config.list_nodes(f'service dhcp{v}-server shared-network-name {p} subnet') size = _get_pool_size(family=family, pool=p) leases = len(_get_raw_server_leases(family=family, pool=p)) use_percentage = round(leases / size * 100) if size != 0 else 0 pool_stats = {'pool': p, 'size': size, 'leases': leases, 'available': (size - leases), 'use_percentage': use_percentage, 'subnet': subnet} stats.append(pool_stats) return stats def _get_formatted_pool_statistics(pool_data, family='inet'): data_entries = [] for entry in pool_data: pool = entry.get('pool') size = entry.get('size') leases = entry.get('leases') available = entry.get('available') use_percentage = entry.get('use_percentage') use_percentage = f'{use_percentage}%' data_entries.append([pool, size, leases, available, use_percentage]) headers = ['Pool', 'Size','Leases', 'Available', 'Usage'] output = tabulate(data_entries, headers, numalign='left') return output def _get_raw_server_static_mappings(family='inet', pool=None, sorted=None): if pool is None: pool = _get_dhcp_pools(family=family) else: pool = [pool] v = 'v6' if family == 'inet6' else '' mappings = [] for p in pool: pool_config = config.get_config_dict(['service', f'dhcp{v}-server', 'shared-network-name', p], get_first_key=True) if 'subnet' in pool_config: for subnet, subnet_config in pool_config['subnet'].items(): if 'static-mapping' in subnet_config: for name, mapping_config in subnet_config['static-mapping'].items(): mapping = {'pool': p, 'subnet': subnet, 'name': name} mapping.update(mapping_config) mappings.append(mapping) if sorted: if sorted == 'ip': data.sort(key = lambda x:ip_address(x['ip-address'])) else: data.sort(key = lambda x:x[sorted]) return mappings def _get_formatted_server_static_mappings(raw_data, family='inet'): data_entries = [] for entry in raw_data: pool = entry.get('pool') subnet = entry.get('subnet') name = entry.get('name') ip_addr = entry.get('ip-address', 'N/A') mac_addr = entry.get('mac', 'N/A') duid = entry.get('duid', 'N/A') description = entry.get('description', 'N/A') data_entries.append([pool, subnet, name, ip_addr, mac_addr, duid, description]) headers = ['Pool', 'Subnet', 'Name', 'IP Address', 'MAC Address', 'DUID', 'Description'] output = tabulate(data_entries, headers, numalign='left') return output def _verify(func): """Decorator checks if DHCP(v6) config exists""" from functools import wraps @wraps(func) def _wrapper(*args, **kwargs): config = ConfigTreeQuery() family = kwargs.get('family') v = 'v6' if family == 'inet6' else '' unconf_message = f'DHCP{v} server is not configured' # Check if config does not exist if not config.exists(f'service dhcp{v}-server'): raise vyos.opmode.UnconfiguredSubsystem(unconf_message) return func(*args, **kwargs) return _wrapper @_verify def show_pool_statistics(raw: bool, family: ArgFamily, pool: typing.Optional[str]): pool_data = _get_raw_pool_statistics(family=family, pool=pool) if raw: return pool_data else: return _get_formatted_pool_statistics(pool_data, family=family) @_verify def show_server_leases(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str], state: typing.Optional[ArgState], origin: typing.Optional[ArgOrigin] ): # if dhcp server is down, inactive leases may still be shown as active, so warn the user. v = '6' if family == 'inet6' else '4' if not is_systemd_service_running(f'kea-dhcp{v}-server.service'): Warning('DHCP server is configured but not started. Data may be stale.') v = 'v6' if family == 'inet6' else '' if pool and pool not in _get_dhcp_pools(family=family): raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') if state and state not in lease_valid_states: raise vyos.opmode.IncorrectValue(f'DHCP{v} state "{state}" is invalid!') sort_valid = sort_valid_inet6 if family == 'inet6' else sort_valid_inet if sorted and sorted not in sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') lease_data = _get_raw_server_leases(family=family, pool=pool, sorted=sorted, state=state, origin=origin) if raw: return lease_data else: return _get_formatted_server_leases(lease_data, family=family) @_verify def show_server_static_mappings(raw: bool, family: ArgFamily, pool: typing.Optional[str], sorted: typing.Optional[str]): v = 'v6' if family == 'inet6' else '' if pool and pool not in _get_dhcp_pools(family=family): raise vyos.opmode.IncorrectValue(f'DHCP{v} pool "{pool}" does not exist!') if sorted and sorted not in mapping_sort_valid: raise vyos.opmode.IncorrectValue(f'DHCP{v} sort "{sorted}" is invalid!') static_mappings = _get_raw_server_static_mappings(family=family, pool=pool, sorted=sorted) if raw: return static_mappings else: return _get_formatted_server_static_mappings(static_mappings, family=family) +def _lease_valid(inet, address): + leases = kea_get_leases(inet) + for lease in leases: + if address == lease['ip-address']: + return True + return False + +@_verify +def clear_dhcp_server_lease(family: ArgFamily, address: str): + v = 'v6' if family == 'inet6' else '' + inet = '6' if family == 'inet6' else '4' + + if not _lease_valid(inet, address): + print(f'Lease not found on DHCP{v} server') + return None + + if not kea_delete_lease(inet, address): + print(f'Failed to clear lease for "{address}"') + return None + + print(f'Lease "{address}" has been cleared') + def _get_raw_client_leases(family='inet', interface=None): from time import mktime from datetime import datetime from vyos.defaults import directories from vyos.utils.network import get_interface_vrf lease_dir = directories['isc_dhclient_dir'] lease_files = [] lease_data = [] if interface: tmp = f'{lease_dir}/dhclient_{interface}.lease' if os.path.exists(tmp): lease_files.append(tmp) else: # All DHCP leases lease_files = glob(f'{lease_dir}/dhclient_*.lease') for lease in lease_files: tmp = {} with open(lease, 'r') as f: for line in f.readlines(): line = line.rstrip() if 'last_update' not in tmp: # ISC dhcp client contains least_update timestamp in human readable # format this makes less sense for an API and also the expiry # timestamp is provided in UNIX time. Convert string (e.g. Sun Jul # 30 18:13:44 CEST 2023) to UNIX time (1690733624) tmp.update({'last_update' : int(mktime(datetime.strptime(line, time_string).timetuple()))}) continue k, v = line.split('=') tmp.update({k : v.replace("'", "")}) if 'interface' in tmp: vrf = get_interface_vrf(tmp['interface']) if vrf: tmp.update({'vrf' : vrf}) lease_data.append(tmp) return lease_data def _get_formatted_client_leases(lease_data, family): from time import localtime from time import strftime from vyos.utils.network import is_intf_addr_assigned data_entries = [] for lease in lease_data: if not lease.get('new_ip_address'): continue data_entries.append(["Interface", lease['interface']]) if 'new_ip_address' in lease: tmp = '[Active]' if is_intf_addr_assigned(lease['interface'], lease['new_ip_address']) else '[Inactive]' data_entries.append(["IP address", lease['new_ip_address'], tmp]) if 'new_subnet_mask' in lease: data_entries.append(["Subnet Mask", lease['new_subnet_mask']]) if 'new_domain_name' in lease: data_entries.append(["Domain Name", lease['new_domain_name']]) if 'new_routers' in lease: data_entries.append(["Router", lease['new_routers']]) if 'new_domain_name_servers' in lease: data_entries.append(["Name Server", lease['new_domain_name_servers']]) if 'new_dhcp_server_identifier' in lease: data_entries.append(["DHCP Server", lease['new_dhcp_server_identifier']]) if 'new_dhcp_lease_time' in lease: data_entries.append(["DHCP Server", lease['new_dhcp_lease_time']]) if 'vrf' in lease: data_entries.append(["VRF", lease['vrf']]) if 'last_update' in lease: tmp = strftime(time_string, localtime(int(lease['last_update']))) data_entries.append(["Last Update", tmp]) if 'new_expiry' in lease: tmp = strftime(time_string, localtime(int(lease['new_expiry']))) data_entries.append(["Expiry", tmp]) # Add empty marker data_entries.append(['']) output = tabulate(data_entries, tablefmt='plain') return output def show_client_leases(raw: bool, family: ArgFamily, interface: typing.Optional[str]): lease_data = _get_raw_client_leases(family=family, interface=interface) if raw: return lease_data else: return _get_formatted_client_leases(lease_data, family=family) if __name__ == '__main__': try: res = vyos.opmode.run(sys.modules[__name__]) if res: print(res) except (ValueError, vyos.opmode.Error) as e: print(e) sys.exit(1)