diff --git a/src/conf_mode/interfaces_l2tpv3.py b/src/conf_mode/interfaces_l2tpv3.py index b9f827bee..f0a70436e 100755 --- a/src/conf_mode/interfaces_l2tpv3.py +++ b/src/conf_mode/interfaces_l2tpv3.py @@ -1,112 +1,113 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import leaf_node_changed from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mtu_ipv6 from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_bond_bridge_member from vyos.configverify import verify_vrf from vyos.ifconfig import L2TPv3If from vyos.utils.kernel import check_kmod from vyos.utils.network import is_addr_assigned from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() k_mod = ['l2tp_eth', 'l2tp_netlink', 'l2tp_ip', 'l2tp_ip6'] def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'l2tpv3'] ifname, l2tpv3 = get_interface_dict(conf, base) # To delete an l2tpv3 interface we need the current tunnel and session-id if 'deleted' in l2tpv3: tmp = leaf_node_changed(conf, base + [ifname, 'tunnel-id']) # leaf_node_changed() returns a list l2tpv3.update({'tunnel_id': tmp[0]}) tmp = leaf_node_changed(conf, base + [ifname, 'session-id']) l2tpv3.update({'session_id': tmp[0]}) return l2tpv3 def verify(l2tpv3): if 'deleted' in l2tpv3: verify_bridge_delete(l2tpv3) return None interface = l2tpv3['ifname'] for key in ['source_address', 'remote', 'tunnel_id', 'peer_tunnel_id', 'session_id', 'peer_session_id']: if key not in l2tpv3: tmp = key.replace('_', '-') raise ConfigError(f'Missing mandatory L2TPv3 option: "{tmp}"!') if not is_addr_assigned(l2tpv3['source_address']): raise ConfigError('L2TPv3 source-address address "{source_address}" ' 'not configured on any interface!'.format(**l2tpv3)) verify_mtu_ipv6(l2tpv3) verify_address(l2tpv3) verify_vrf(l2tpv3) verify_bond_bridge_member(l2tpv3) verify_mirror_redirect(l2tpv3) return None def generate(l2tpv3): return None def apply(l2tpv3): + check_kmod(k_mod) + # Check if L2TPv3 interface already exists if interface_exists(l2tpv3['ifname']): # L2TPv3 is picky when changing tunnels/sessions, thus we can simply # always delete it first. l = L2TPv3If(**l2tpv3) l.remove() if 'deleted' not in l2tpv3: # Finally create the new interface l = L2TPv3If(**l2tpv3) l.update(l2tpv3) return None if __name__ == '__main__': try: - check_kmod(k_mod) c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/interfaces_wireguard.py b/src/conf_mode/interfaces_wireguard.py index 0e0b77877..482da1c66 100755 --- a/src/conf_mode/interfaces_wireguard.py +++ b/src/conf_mode/interfaces_wireguard.py @@ -1,132 +1,133 @@ #!/usr/bin/env python3 # # Copyright (C) 2018-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sys import exit from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import is_node_changed from vyos.configverify import verify_vrf from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mtu_ipv6 from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_bond_bridge_member from vyos.ifconfig import WireGuardIf from vyos.utils.kernel import check_kmod from vyos.utils.network import check_port_availability from vyos.utils.network import is_wireguard_key_pair from vyos import ConfigError from vyos import airbag airbag.enable() def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'wireguard'] ifname, wireguard = get_interface_dict(conf, base) # Check if a port was changed tmp = is_node_changed(conf, base + [ifname, 'port']) if tmp: wireguard['port_changed'] = {} # T4702: If anything on a peer changes we remove the peer first and re-add it if is_node_changed(conf, base + [ifname, 'peer']): wireguard.update({'rebuild_required': {}}) return wireguard def verify(wireguard): if 'deleted' in wireguard: verify_bridge_delete(wireguard) return None verify_mtu_ipv6(wireguard) verify_address(wireguard) verify_vrf(wireguard) verify_bond_bridge_member(wireguard) verify_mirror_redirect(wireguard) if 'private_key' not in wireguard: raise ConfigError('Wireguard private-key not defined') if 'peer' not in wireguard: raise ConfigError('At least one Wireguard peer is required!') if 'port' in wireguard and 'port_changed' in wireguard: listen_port = int(wireguard['port']) if check_port_availability('0.0.0.0', listen_port, 'udp') is not True: raise ConfigError(f'UDP port {listen_port} is busy or unavailable and ' 'cannot be used for the interface!') # run checks on individual configured WireGuard peer public_keys = [] for tmp in wireguard['peer']: peer = wireguard['peer'][tmp] if 'allowed_ips' not in peer: raise ConfigError(f'Wireguard allowed-ips required for peer "{tmp}"!') if 'public_key' not in peer: raise ConfigError(f'Wireguard public-key required for peer "{tmp}"!') if ('address' in peer and 'port' not in peer) or ('port' in peer and 'address' not in peer): raise ConfigError('Both Wireguard port and address must be defined ' f'for peer "{tmp}" if either one of them is set!') if peer['public_key'] in public_keys: raise ConfigError(f'Duplicate public-key defined on peer "{tmp}"') if 'disable' not in peer: if is_wireguard_key_pair(wireguard['private_key'], peer['public_key']): raise ConfigError(f'Peer "{tmp}" has the same public key as the interface "{wireguard["ifname"]}"') public_keys.append(peer['public_key']) def apply(wireguard): + check_kmod('wireguard') + if 'rebuild_required' in wireguard or 'deleted' in wireguard: wg = WireGuardIf(**wireguard) # WireGuard only supports peer removal based on the configured public-key, # by deleting the entire interface this is the shortcut instead of parsing # out all peers and removing them one by one. # # Peer reconfiguration will always come with a short downtime while the # WireGuard interface is recreated (see below) wg.remove() # Create the new interface if required if 'deleted' not in wireguard: wg = WireGuardIf(**wireguard) wg.update(wireguard) return None if __name__ == '__main__': try: - check_kmod('wireguard') c = get_config() verify(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/interfaces_wireless.py b/src/conf_mode/interfaces_wireless.py index ff38c979c..9488f6797 100755 --- a/src/conf_mode/interfaces_wireless.py +++ b/src/conf_mode/interfaces_wireless.py @@ -1,329 +1,330 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from re import findall from netaddr import EUI, mac_unix_expanded from time import sleep from vyos.config import Config from vyos.configdict import get_interface_dict from vyos.configdict import dict_merge from vyos.configverify import verify_address from vyos.configverify import verify_bridge_delete from vyos.configverify import verify_mirror_redirect from vyos.configverify import verify_vlan_config from vyos.configverify import verify_vrf from vyos.configverify import verify_bond_bridge_member from vyos.ifconfig import WiFiIf from vyos.template import render from vyos.utils.dict import dict_search from vyos.utils.kernel import check_kmod from vyos.utils.process import call from vyos.utils.process import is_systemd_service_active from vyos.utils.process import is_systemd_service_running from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() # XXX: wpa_supplicant works on the source interface wpa_suppl_conf = '/run/wpa_supplicant/{ifname}.conf' hostapd_conf = '/run/hostapd/{ifname}.conf' hostapd_accept_station_conf = '/run/hostapd/{ifname}_station_accept.conf' hostapd_deny_station_conf = '/run/hostapd/{ifname}_station_deny.conf' def find_other_stations(conf, base, ifname): """ Only one wireless interface per phy can be in station mode - find all interfaces attached to a phy which run in station mode """ old_level = conf.get_level() conf.set_level(base) dict = {} for phy in os.listdir('/sys/class/ieee80211'): list = [] for interface in conf.list_nodes([]): if interface == ifname: continue # the following node is mandatory if conf.exists([interface, 'physical-device', phy]): tmp = conf.return_value([interface, 'type']) if tmp == 'station': list.append(interface) if list: dict.update({phy: list}) conf.set_level(old_level) return dict def get_config(config=None): """ Retrive CLI config as dictionary. Dictionary can never be empty, as at least the interface name will be added or a deleted flag """ if config: conf = config else: conf = Config() base = ['interfaces', 'wireless'] ifname, wifi = get_interface_dict(conf, base) if 'deleted' not in wifi: # then get_interface_dict provides default keys if wifi.from_defaults(['security', 'wep']): # if not set by user del wifi['security']['wep'] if wifi.from_defaults(['security', 'wpa']): # if not set by user del wifi['security']['wpa'] # XXX: Jinja2 can not operate on a dictionary key when it starts of with a number if '40mhz_incapable' in (dict_search('capabilities.ht', wifi) or []): wifi['capabilities']['ht']['fourtymhz_incapable'] = wifi['capabilities']['ht']['40mhz_incapable'] del wifi['capabilities']['ht']['40mhz_incapable'] if dict_search('security.wpa', wifi) != None: wpa_cipher = wifi['security']['wpa'].get('cipher') wpa_mode = wifi['security']['wpa'].get('mode') if not wpa_cipher: tmp = None if wpa_mode == 'wpa': tmp = {'security': {'wpa': {'cipher' : ['TKIP', 'CCMP']}}} elif wpa_mode == 'wpa2': tmp = {'security': {'wpa': {'cipher' : ['CCMP']}}} elif wpa_mode == 'both': tmp = {'security': {'wpa': {'cipher' : ['CCMP', 'TKIP']}}} elif wpa_mode == 'wpa3': # According to WiFi specs (https://www.wi-fi.org/file/wpa3-specification) # section 3.5: WPA3-Enterprise 192-bit mode # WiFi NICs which would be able to connect to WPA3-Enterprise managed # networks MUST support GCMP-256. # Reasoning: Provided that chipsets would most likely _not_ be # "private user only", they all would come with built-in support # for GCMP-256. tmp = {'security': {'wpa': {'cipher' : ['CCMP', 'CCMP-256', 'GCMP', 'GCMP-256']}}} if tmp: wifi = dict_merge(tmp, wifi) # Only one wireless interface per phy can be in station mode tmp = find_other_stations(conf, base, wifi['ifname']) if tmp: wifi['station_interfaces'] = tmp # used in hostapd.conf.j2 wifi['hostapd_accept_station_conf'] = hostapd_accept_station_conf.format(**wifi) wifi['hostapd_deny_station_conf'] = hostapd_deny_station_conf.format(**wifi) return wifi def verify(wifi): if 'deleted' in wifi: verify_bridge_delete(wifi) return None if 'physical_device' not in wifi: raise ConfigError('You must specify a physical-device "phy"') physical_device = wifi['physical_device'] if not os.path.exists(f'/sys/class/ieee80211/{physical_device}'): raise ConfigError(f'Wirelss interface PHY "{physical_device}" does not exist!') if 'type' not in wifi: raise ConfigError('You must specify a WiFi mode') if 'ssid' not in wifi and wifi['type'] != 'monitor': raise ConfigError('SSID must be configured unless type is set to "monitor"!') if wifi['type'] == 'access-point': if 'country_code' not in wifi: raise ConfigError('Wireless country-code is mandatory') if 'channel' not in wifi: raise ConfigError('Wireless channel must be configured!') if 'capabilities' in wifi and 'he' in wifi['capabilities']: if 'channel_set_width' not in wifi['capabilities']['he']: raise ConfigError('Channel width must be configured!') # op_modes drawn from: # https://w1.fi/cgit/hostap/tree/src/common/ieee802_11_common.c?id=195cc3d919503fb0d699d9a56a58a72602b25f51#n1525 # 802.11ax (WiFi-6e - HE) can use up to 160MHz bandwidth channels six_ghz_op_modes_he = ['131', '132', '133', '134', '135'] # 802.11be (WiFi-7 - EHT) can use up to 320MHz bandwidth channels six_ghz_op_modes_eht = six_ghz_op_modes_he.append('137') if 'security' in wifi and 'wpa' in wifi['security'] and 'mode' in wifi['security']['wpa']: if wifi['security']['wpa']['mode'] == 'wpa3': if 'he' in wifi['capabilities']: if wifi['capabilities']['he']['channel_set_width'] in six_ghz_op_modes_he: if 'mgmt_frame_protection' not in wifi or wifi['mgmt_frame_protection'] != 'required': raise ConfigError('Management Frame Protection (MFP) is required with WPA3 at 6GHz! Consider also enabling Beacon Frame Protection (BFP) if your device supports it.') if 'security' in wifi: if {'wep', 'wpa'} <= set(wifi.get('security', {})): raise ConfigError('Must either use WEP or WPA security!') if 'wep' in wifi['security']: if 'key' in wifi['security']['wep'] and len(wifi['security']['wep']) > 4: raise ConfigError('No more then 4 WEP keys configurable') elif 'key' not in wifi['security']['wep']: raise ConfigError('Security WEP configured - missing WEP keys!') elif 'wpa' in wifi['security']: wpa = wifi['security']['wpa'] if not any(i in ['passphrase', 'radius'] for i in wpa): raise ConfigError('Misssing WPA key or RADIUS server') if 'radius' in wpa: if 'server' in wpa['radius']: for server in wpa['radius']['server']: if 'key' not in wpa['radius']['server'][server]: raise ConfigError(f'Misssing RADIUS shared secret key for server: {server}') if 'capabilities' in wifi: capabilities = wifi['capabilities'] if 'vht' in capabilities: if 'ht' not in capabilities: raise ConfigError('Specify HT flags if you want to use VHT!') if {'beamform', 'antenna_count'} <= set(capabilities.get('vht', {})): if capabilities['vht']['antenna_count'] == '1': raise ConfigError('Cannot use beam forming with just one antenna!') if capabilities['vht']['beamform'] == 'single-user-beamformer': if int(capabilities['vht']['antenna_count']) < 3: # Nasty Gotcha: see lines 708-721 in: # https://w1.fi/cgit/hostap/tree/hostapd/hostapd.conf?h=hostap_2_10&id=cff80b4f7d3c0a47c052e8187d671710f48939e4#n708 raise ConfigError('Single-user beam former requires at least 3 antennas!') if 'station_interfaces' in wifi and wifi['type'] == 'station': phy = wifi['physical_device'] if phy in wifi['station_interfaces']: if len(wifi['station_interfaces'][phy]) > 0: raise ConfigError('Only one station per wireless physical interface possible!') verify_address(wifi) verify_vrf(wifi) verify_bond_bridge_member(wifi) verify_mirror_redirect(wifi) # use common function to verify VLAN configuration verify_vlan_config(wifi) return None def generate(wifi): + check_kmod('mac80211') + interface = wifi['ifname'] # Delete config files if interface is removed if 'deleted' in wifi: if os.path.isfile(hostapd_conf.format(**wifi)): os.unlink(hostapd_conf.format(**wifi)) if os.path.isfile(hostapd_accept_station_conf.format(**wifi)): os.unlink(hostapd_accept_station_conf.format(**wifi)) if os.path.isfile(hostapd_deny_station_conf.format(**wifi)): os.unlink(hostapd_deny_station_conf.format(**wifi)) if os.path.isfile(wpa_suppl_conf.format(**wifi)): os.unlink(wpa_suppl_conf.format(**wifi)) return None if 'mac' not in wifi: # http://wiki.stocksy.co.uk/wiki/Multiple_SSIDs_with_hostapd # generate locally administered MAC address from used phy interface with open('/sys/class/ieee80211/{physical_device}/addresses'.format(**wifi), 'r') as f: # some PHYs tend to have multiple interfaces and thus supply multiple MAC # addresses - we only need the first one for our calculation tmp = f.readline().rstrip() tmp = EUI(tmp).value # mask last nibble from the MAC address tmp &= 0xfffffffffff0 # set locally administered bit in MAC address tmp |= 0x020000000000 # we now need to add an offset to our MAC address indicating this # subinterfaces index tmp += int(findall(r'\d+', interface)[0]) # convert integer to "real" MAC address representation mac = EUI(hex(tmp).split('x')[-1]) # change dialect to use : as delimiter instead of - mac.dialect = mac_unix_expanded wifi['mac'] = str(mac) # render appropriate new config files depending on access-point or station mode if wifi['type'] == 'access-point': render(hostapd_conf.format(**wifi), 'wifi/hostapd.conf.j2', wifi) render(hostapd_accept_station_conf.format(**wifi), 'wifi/hostapd_accept_station.conf.j2', wifi) render(hostapd_deny_station_conf.format(**wifi), 'wifi/hostapd_deny_station.conf.j2', wifi) elif wifi['type'] == 'station': render(wpa_suppl_conf.format(**wifi), 'wifi/wpa_supplicant.conf.j2', wifi) return None def apply(wifi): interface = wifi['ifname'] # From systemd source code: # If there's a stop job queued before we enter the DEAD state, we shouldn't act on Restart=, # in order to not undo what has already been enqueued. */ # # It was found that calling restart on hostapd will (4 out of 10 cases) deactivate # the service instead of restarting it, when it was not yet properly stopped # systemd[1]: hostapd@wlan1.service: Deactivated successfully. # Thus kill all WIFI service and start them again after it's ensured nothing lives call(f'systemctl stop hostapd@{interface}.service') call(f'systemctl stop wpa_supplicant@{interface}.service') if 'deleted' in wifi: WiFiIf(**wifi).remove() return None while (is_systemd_service_running(f'hostapd@{interface}.service') or \ is_systemd_service_active(f'hostapd@{interface}.service')): sleep(0.250) # wait 250ms # Finally create the new interface w = WiFiIf(**wifi) w.update(wifi) # Enable/Disable interface - interface is always placed in # administrative down state in WiFiIf class if 'disable' not in wifi: # Wait until interface was properly added to the Kernel ii = 0 while not (interface_exists(interface) and ii < 20): sleep(0.250) # wait 250ms ii += 1 # Physical interface is now configured. Proceed by starting hostapd or # wpa_supplicant daemon. When type is monitor we can just skip this. if wifi['type'] == 'access-point': call(f'systemctl start hostapd@{interface}.service') elif wifi['type'] == 'station': call(f'systemctl start wpa_supplicant@{interface}.service') return None if __name__ == '__main__': try: - check_kmod('mac80211') c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py index f74bb217e..39803fa02 100755 --- a/src/conf_mode/nat.py +++ b/src/conf_mode/nat.py @@ -1,263 +1,264 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.configdep import set_dependents, call_dependents from vyos.template import render from vyos.template import is_ip_network from vyos.utils.kernel import check_kmod from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.process import cmd from vyos.utils.process import run from vyos.utils.network import is_addr_assigned from vyos.utils.network import interface_exists from vyos import ConfigError from vyos import airbag airbag.enable() k_mod = ['nft_nat', 'nft_chain_nat'] nftables_nat_config = '/run/nftables_nat.conf' nftables_static_nat_conf = '/run/nftables_static-nat-rules.nft' valid_groups = [ 'address_group', 'domain_group', 'network_group', 'port_group' ] def get_config(config=None): if config: conf = config else: conf = Config() base = ['nat'] nat = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) set_dependents('conntrack', conf) if not conf.exists(base): nat['deleted'] = '' return nat nat['firewall_group'] = conf.get_config_dict(['firewall', 'group'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) # Remove dynamic firewall groups if present: if 'dynamic_group' in nat['firewall_group']: del nat['firewall_group']['dynamic_group'] return nat def verify_rule(config, err_msg, groups_dict): """ Common verify steps used for both source and destination NAT """ if (dict_search('translation.port', config) != None or dict_search('translation.redirect.port', config) != None or dict_search('destination.port', config) != None or dict_search('source.port', config)): if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']: raise ConfigError(f'{err_msg} ports can only be specified when '\ 'protocol is either tcp, udp or tcp_udp!') for side in ['destination', 'source']: if side in config: side_conf = config[side] if len({'address', 'fqdn'} & set(side_conf)) > 1: raise ConfigError('Only one of address, fqdn or geoip can be specified') if 'group' in side_conf: if len({'address_group', 'network_group', 'domain_group'} & set(side_conf['group'])) > 1: raise ConfigError('Only one address-group, network-group or domain-group can be specified') for group in valid_groups: if group in side_conf['group']: group_name = side_conf['group'][group] error_group = group.replace("_", "-") if group in ['address_group', 'network_group', 'domain_group']: types = [t for t in ['address', 'fqdn'] if t in side_conf] if types: raise ConfigError(f'{error_group} and {types[0]} cannot both be defined') if group_name and group_name[0] == '!': group_name = group_name[1:] group_obj = dict_search_args(groups_dict, group, group_name) if group_obj is None: raise ConfigError(f'Invalid {error_group} "{group_name}" on nat rule') if not group_obj: Warning(f'{error_group} "{group_name}" has no members!') if dict_search_args(side_conf, 'group', 'port_group'): if 'protocol' not in config: raise ConfigError('Protocol must be defined if specifying a port-group') if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']: raise ConfigError('Protocol must be tcp, udp, or tcp_udp when specifying a port-group') if 'load_balance' in config: for item in ['source-port', 'destination-port']: if item in config['load_balance']['hash'] and config['protocol'] not in ['tcp', 'udp']: raise ConfigError('Protocol must be tcp or udp when specifying hash ports') count = 0 if 'backend' in config['load_balance']: for member in config['load_balance']['backend']: weight = config['load_balance']['backend'][member]['weight'] count = count + int(weight) if count != 100: Warning(f'Sum of weight for nat load balance rule is not 100. You may get unexpected behaviour') def verify(nat): if not nat or 'deleted' in nat: # no need to verify the CLI as NAT is going to be deactivated return None if dict_search('source.rule', nat): for rule, config in dict_search('source.rule', nat).items(): err_msg = f'Source NAT configuration error in rule {rule}:' if 'outbound_interface' in config: if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"') elif 'name' in config['outbound_interface']: interface_name = config['outbound_interface']['name'] if interface_name not in 'any': if interface_name.startswith('!'): interface_name = interface_name[1:] if not interface_exists(interface_name): Warning(f'Interface "{interface_name}" for source NAT rule "{rule}" does not exist!') else: group_name = config['outbound_interface']['group'] if group_name[0] == '!': group_name = group_name[1:] group_obj = dict_search_args(nat['firewall_group'], 'interface_group', group_name) if group_obj is None: raise ConfigError(f'Invalid interface group "{group_name}" on source nat rule') if not group_obj: Warning(f'interface-group "{group_name}" has no members!') if not dict_search('translation.address', config) and not dict_search('translation.port', config): if 'exclude' not in config and 'backend' not in config['load_balance']: raise ConfigError(f'{err_msg} translation requires address and/or port') addr = dict_search('translation.address', config) if addr != None and addr != 'masquerade' and not is_ip_network(addr): for ip in addr.split('-'): if not is_addr_assigned(ip): Warning(f'IP address {ip} does not exist on the system!') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) if dict_search('destination.rule', nat): for rule, config in dict_search('destination.rule', nat).items(): err_msg = f'Destination NAT configuration error in rule {rule}:' if 'inbound_interface' in config: if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"') elif 'name' in config['inbound_interface']: interface_name = config['inbound_interface']['name'] if interface_name not in 'any': if interface_name.startswith('!'): interface_name = interface_name[1:] if not interface_exists(interface_name): Warning(f'Interface "{interface_name}" for destination NAT rule "{rule}" does not exist!') else: group_name = config['inbound_interface']['group'] if group_name[0] == '!': group_name = group_name[1:] group_obj = dict_search_args(nat['firewall_group'], 'interface_group', group_name) if group_obj is None: raise ConfigError(f'Invalid interface group "{group_name}" on destination nat rule') if not group_obj: Warning(f'interface-group "{group_name}" has no members!') if not dict_search('translation.address', config) and not dict_search('translation.port', config) and 'redirect' not in config['translation']: if 'exclude' not in config and 'backend' not in config['load_balance']: raise ConfigError(f'{err_msg} translation requires address and/or port') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) if dict_search('static.rule', nat): for rule, config in dict_search('static.rule', nat).items(): err_msg = f'Static NAT configuration error in rule {rule}:' if 'inbound_interface' not in config: raise ConfigError(f'{err_msg} inbound-interface not specified') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) return None def generate(nat): if not os.path.exists(nftables_nat_config): nat['first_install'] = True render(nftables_nat_config, 'firewall/nftables-nat.j2', nat) render(nftables_static_nat_conf, 'firewall/nftables-static-nat.j2', nat) # dry-run newly generated configuration tmp = run(f'nft --check --file {nftables_nat_config}') if tmp > 0: raise ConfigError('Configuration file errors encountered!') tmp = run(f'nft --check --file {nftables_static_nat_conf}') if tmp > 0: raise ConfigError('Configuration file errors encountered!') return None def apply(nat): + check_kmod(k_mod) + cmd(f'nft --file {nftables_nat_config}') cmd(f'nft --file {nftables_static_nat_conf}') if not nat or 'deleted' in nat: os.unlink(nftables_nat_config) os.unlink(nftables_static_nat_conf) call_dependents() return None if __name__ == '__main__': try: - check_kmod(k_mod) c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/nat66.py b/src/conf_mode/nat66.py index 075738dad..c44320f36 100755 --- a/src/conf_mode/nat66.py +++ b/src/conf_mode/nat66.py @@ -1,129 +1,130 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.configdep import set_dependents, call_dependents from vyos.template import render from vyos.utils.dict import dict_search from vyos.utils.kernel import check_kmod from vyos.utils.network import interface_exists from vyos.utils.process import cmd from vyos.template import is_ipv6 from vyos import ConfigError from vyos import airbag airbag.enable() k_mod = ['nft_nat', 'nft_chain_nat'] nftables_nat66_config = '/run/nftables_nat66.nft' def get_config(config=None): if config: conf = config else: conf = Config() base = ['nat66'] nat = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True) set_dependents('conntrack', conf) if not conf.exists(base): nat['deleted'] = '' return nat def verify(nat): if not nat or 'deleted' in nat: # no need to verify the CLI as NAT66 is going to be deactivated return None if dict_search('source.rule', nat): for rule, config in dict_search('source.rule', nat).items(): err_msg = f'Source NAT66 configuration error in rule {rule}:' if 'outbound_interface' in config: if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"') elif 'name' in config['outbound_interface']: interface_name = config['outbound_interface']['name'] if interface_name not in 'any': if interface_name.startswith('!'): interface_name = interface_name[1:] if not interface_exists(interface_name): Warning(f'Interface "{interface_name}" for source NAT66 rule "{rule}" does not exist!') addr = dict_search('translation.address', config) if addr != None: if addr != 'masquerade' and not is_ipv6(addr): raise ConfigError(f'IPv6 address {addr} is not a valid address') else: if 'exclude' not in config: raise ConfigError(f'{err_msg} translation address not specified') prefix = dict_search('source.prefix', config) if prefix != None: if not is_ipv6(prefix): raise ConfigError(f'{err_msg} source-prefix not specified') if dict_search('destination.rule', nat): for rule, config in dict_search('destination.rule', nat).items(): err_msg = f'Destination NAT66 configuration error in rule {rule}:' if 'inbound_interface' in config: if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"') elif 'name' in config['inbound_interface']: interface_name = config['inbound_interface']['name'] if interface_name not in 'any': if interface_name.startswith('!'): interface_name = interface_name[1:] if not interface_exists(interface_name): Warning(f'Interface "{interface_name}" for destination NAT66 rule "{rule}" does not exist!') return None def generate(nat): if not os.path.exists(nftables_nat66_config): nat['first_install'] = True render(nftables_nat66_config, 'firewall/nftables-nat66.j2', nat, permission=0o755) return None def apply(nat): if not nat: return None + check_kmod(k_mod) + cmd(f'nft --file {nftables_nat66_config}') call_dependents() return None if __name__ == '__main__': try: - check_kmod(k_mod) c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)