diff --git a/data/templates/firewall/nftables-static-nat.j2 b/data/templates/firewall/nftables-static-nat.j2 index d3c43858f..790c33ce9 100644 --- a/data/templates/firewall/nftables-static-nat.j2 +++ b/data/templates/firewall/nftables-static-nat.j2 @@ -1,115 +1,31 @@ #!/usr/sbin/nft -f -{% macro nat_rule(rule, config, chain) %} -{% set comment = '' %} -{% set base_log = '' %} - -{% if chain is vyos_defined('PREROUTING') %} -{% set comment = 'STATIC-NAT-' ~ rule %} -{% set base_log = '[NAT-DST-' ~ rule %} -{% set interface = ' iifname "' ~ config.inbound_interface ~ '"' if config.inbound_interface is vyos_defined and config.inbound_interface is not vyos_defined('any') else '' %} -{% if config.translation.address is vyos_defined %} -{# support 1:1 network translation #} -{% if config.translation.address | is_ip_network %} -{% set trns_addr = 'dnat ip prefix to ip daddr map { ' ~ config.destination.address ~ ' : ' ~ config.translation.address ~ ' }' %} -{# we can now clear out the dst_addr part as it's already covered in aboves map #} -{% else %} -{% set dst_addr = 'ip daddr ' ~ config.destination.address if config.destination.address is vyos_defined %} -{% set trns_addr = 'dnat to ' ~ config.translation.address %} -{% endif %} -{% endif %} -{% elif chain is vyos_defined('POSTROUTING') %} -{% set comment = 'STATIC-NAT-' ~ rule %} -{% set base_log = '[NAT-SRC-' ~ rule %} -{% set interface = ' oifname "' ~ config.inbound_interface ~ '"' if config.inbound_interface is vyos_defined and config.inbound_interface is not vyos_defined('any') else '' %} -{% if config.translation.address is vyos_defined %} -{# support 1:1 network translation #} -{% if config.translation.address | is_ip_network %} -{% set trns_addr = 'snat ip prefix to ip saddr map { ' ~ config.translation.address ~ ' : ' ~ config.destination.address ~ ' }' %} -{# we can now clear out the src_addr part as it's already covered in aboves map #} -{% else %} -{% set src_addr = 'ip saddr ' ~ config.translation.address if config.translation.address is vyos_defined %} -{% set trns_addr = 'snat to ' ~ config.destination.address %} -{% endif %} -{% endif %} -{% endif %} - -{% if config.exclude is vyos_defined %} -{# rule has been marked as 'exclude' thus we simply return here #} -{% set trns_addr = 'return' %} -{% set trns_port = '' %} -{% endif %} - -{% if config.translation.options is vyos_defined %} -{% if config.translation.options.address_mapping is vyos_defined('persistent') %} -{% set trns_opts_addr = 'persistent' %} -{% endif %} -{% if config.translation.options.port_mapping is vyos_defined('random') %} -{% set trns_opts_port = 'random' %} -{% elif config.translation.options.port_mapping is vyos_defined('fully-random') %} -{% set trns_opts_port = 'fully-random' %} -{% endif %} -{% endif %} - -{% if trns_opts_addr is vyos_defined and trns_opts_port is vyos_defined %} -{% set trns_opts = trns_opts_addr ~ ',' ~ trns_opts_port %} -{% elif trns_opts_addr is vyos_defined %} -{% set trns_opts = trns_opts_addr %} -{% elif trns_opts_port is vyos_defined %} -{% set trns_opts = trns_opts_port %} -{% endif %} - -{% set output = 'add rule ip vyos_static_nat ' ~ chain ~ interface %} - -{% if dst_addr is vyos_defined %} -{% set output = output ~ ' ' ~ dst_addr %} +{% if first_install is not vyos_defined %} +delete table ip vyos_static_nat {% endif %} -{% if src_addr is vyos_defined %} -{% set output = output ~ ' ' ~ src_addr %} -{% endif %} - -{# Count packets #} -{% set output = output ~ ' counter' %} -{# Special handling of log option, we must repeat the entire rule before the #} -{# NAT translation options are added, this is essential #} -{% if log is vyos_defined %} -{% set log_output = output ~ ' log prefix "' ~ log ~ '" comment "' ~ comment ~ '"' %} -{% endif %} -{% if trns_addr is vyos_defined %} -{% set output = output ~ ' ' ~ trns_addr %} -{% endif %} - -{% if trns_opts is vyos_defined %} -{% set output = output ~ ' ' ~ trns_opts %} -{% endif %} -{% if comment is vyos_defined %} -{% set output = output ~ ' comment "' ~ comment ~ '"' %} -{% endif %} -{{ log_output if log_output is vyos_defined }} -{{ output }} -{% endmacro %} - -# Start with clean STATIC NAT chains -flush chain ip vyos_static_nat PREROUTING -flush chain ip vyos_static_nat POSTROUTING +table ip vyos_static_nat { + # + # Destination NAT rules build up here + # -{# NAT if enabled - add targets to nftables #} - -# -# Destination NAT rules build up here -# -add rule ip vyos_static_nat PREROUTING counter jump VYOS_PRE_DNAT_HOOK + chain PREROUTING { + type nat hook prerouting priority -100; policy accept; {% if static.rule is vyos_defined %} {% for rule, config in static.rule.items() if config.disable is not vyos_defined %} -{{ nat_rule(rule, config, 'PREROUTING') }} + {{ config | nat_static_rule(rule, 'destination') }} {% endfor %} {% endif %} -# -# Source NAT rules build up here -# -add rule ip vyos_static_nat POSTROUTING counter jump VYOS_PRE_SNAT_HOOK + } + + # + # Source NAT rules build up here + # + chain POSTROUTING { + type nat hook postrouting priority 100; policy accept; {% if static.rule is vyos_defined %} {% for rule, config in static.rule.items() if config.disable is not vyos_defined %} -{{ nat_rule(rule, config, 'POSTROUTING') }} + {{ config | nat_static_rule(rule, 'source') }} {% endfor %} {% endif %} + } +} diff --git a/data/vyos-firewall-init.conf b/data/vyos-firewall-init.conf index 3a0e1ee48..11a5bc7bf 100644 --- a/data/vyos-firewall-init.conf +++ b/data/vyos-firewall-init.conf @@ -1,130 +1,110 @@ #!/usr/sbin/nft -f -table ip vyos_static_nat { - chain PREROUTING { - type nat hook prerouting priority -100; policy accept; - counter jump VYOS_PRE_DNAT_HOOK - } - - chain POSTROUTING { - type nat hook postrouting priority 100; policy accept; - counter jump VYOS_PRE_SNAT_HOOK - } - - chain VYOS_PRE_DNAT_HOOK { - return - } - - chain VYOS_PRE_SNAT_HOOK { - return - } -} - # Required by wanloadbalance table ip nat { chain VYOS_PRE_SNAT_HOOK { type nat hook postrouting priority 99; policy accept; return } } table inet mangle { chain FORWARD { type filter hook forward priority -150; policy accept; } } table raw { chain VYOS_TCP_MSS { type filter hook forward priority -300; policy accept; } chain PREROUTING { type filter hook prerouting priority -200; policy accept; counter jump VYOS_CT_IGNORE counter jump VYOS_CT_TIMEOUT counter jump VYOS_CT_PREROUTING_HOOK counter jump FW_CONNTRACK notrack } chain OUTPUT { type filter hook output priority -200; policy accept; counter jump VYOS_CT_IGNORE counter jump VYOS_CT_TIMEOUT counter jump VYOS_CT_OUTPUT_HOOK counter jump FW_CONNTRACK notrack } ct helper rpc_tcp { type "rpc" protocol tcp; } ct helper rpc_udp { type "rpc" protocol udp; } ct helper tns_tcp { type "tns" protocol tcp; } chain VYOS_CT_HELPER { ct helper set "rpc_tcp" tcp dport {111} return ct helper set "rpc_udp" udp dport {111} return ct helper set "tns_tcp" tcp dport {1521,1525,1536} return return } chain VYOS_CT_IGNORE { return } chain VYOS_CT_TIMEOUT { return } chain VYOS_CT_PREROUTING_HOOK { return } chain VYOS_CT_OUTPUT_HOOK { return } chain FW_CONNTRACK { accept } } table ip6 raw { chain VYOS_TCP_MSS { type filter hook forward priority -300; policy accept; } chain PREROUTING { type filter hook prerouting priority -300; policy accept; counter jump VYOS_CT_PREROUTING_HOOK counter jump FW_CONNTRACK notrack } chain OUTPUT { type filter hook output priority -300; policy accept; counter jump VYOS_CT_OUTPUT_HOOK counter jump FW_CONNTRACK notrack } chain VYOS_CT_PREROUTING_HOOK { return } chain VYOS_CT_OUTPUT_HOOK { return } chain FW_CONNTRACK { accept } } diff --git a/python/vyos/nat.py b/python/vyos/nat.py index 44dd65372..31bbdc386 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -1,126 +1,188 @@ #!/usr/bin/env python3 # # Copyright (C) 2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from vyos.template import is_ip_network from vyos.util import dict_search_args def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False): output = [] ip_prefix = 'ip6' if ipv6 else 'ip' log_prefix = ('DST' if nat_type == 'destination' else 'SRC') + f'-NAT-{rule_id}' log_suffix = '' if ipv6: log_prefix = log_prefix.replace("NAT-", "NAT66-") ignore_type_addr = False translation_str = '' if 'inbound_interface' in rule_conf: ifname = rule_conf['inbound_interface'] if ifname != 'any': output.append(f'iifname "{ifname}"') if 'outbound_interface' in rule_conf: ifname = rule_conf['outbound_interface'] if ifname != 'any': output.append(f'oifname "{ifname}"') if 'protocol' in rule_conf and rule_conf['protocol'] != 'all': protocol = rule_conf['protocol'] if protocol == 'tcp_udp': protocol = '{ tcp, udp }' output.append(f'meta l4proto {protocol}') if 'exclude' in rule_conf: translation_str = 'return' log_suffix = '-EXCL' elif 'translation' in rule_conf: translation_prefix = nat_type[:1] translation_output = [f'{translation_prefix}nat'] addr = dict_search_args(rule_conf, 'translation', 'address') port = dict_search_args(rule_conf, 'translation', 'port') if addr and is_ip_network(addr): if not ipv6: map_addr = dict_search_args(rule_conf, nat_type, 'address') translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}') ignore_type_addr = True else: translation_output.append(f'prefix to {addr}') elif addr == 'masquerade': if port: addr = f'{addr} to ' translation_output = [addr] log_suffix = '-MASQ' else: translation_output.append('to') if addr: translation_output.append(addr) options = [] addr_mapping = dict_search_args(rule_conf, 'translation', 'options', 'address_mapping') port_mapping = dict_search_args(rule_conf, 'translation', 'options', 'port_mapping') if addr_mapping == 'persistent': options.append('persistent') if port_mapping and port_mapping != 'none': options.append(port_mapping) translation_str = " ".join(translation_output) + (f':{port}' if port else '') if options: translation_str += f' {",".join(options)}' for target in ['source', 'destination']: prefix = target[:1] addr = dict_search_args(rule_conf, target, 'address') if addr and not (ignore_type_addr and target == nat_type): operator = '' if addr[:1] == '!': operator = '!=' addr = addr[1:] output.append(f'{ip_prefix} {prefix}addr {operator} {addr}') addr_prefix = dict_search_args(rule_conf, target, 'prefix') if addr_prefix and ipv6: operator = '' if addr_prefix[:1] == '!': operator = '!=' addr_prefix = addr[1:] output.append(f'ip6 {prefix}addr {operator} {addr_prefix}') port = dict_search_args(rule_conf, target, 'port') if port: protocol = rule_conf['protocol'] if protocol == 'tcp_udp': protocol = 'th' operator = '' if port[:1] == '!': operator = '!=' port = port[1:] output.append(f'{protocol} {prefix}port {operator} {{ {port} }}') output.append('counter') if 'log' in rule_conf: output.append(f'log prefix "[{log_prefix}{log_suffix}]"') if translation_str: output.append(translation_str) output.append(f'comment "{log_prefix}"') return " ".join(output) + +def parse_nat_static_rule(rule_conf, rule_id, nat_type): + output = [] + log_prefix = ('STATIC-DST' if nat_type == 'destination' else 'STATIC-SRC') + f'-NAT-{rule_id}' + log_suffix = '' + + ignore_type_addr = False + translation_str = '' + + if 'inbound_interface' in rule_conf: + ifname = rule_conf['inbound_interface'] + ifprefix = 'i' if nat_type == 'destination' else 'o' + if ifname != 'any': + output.append(f'{ifprefix}ifname "{ifname}"') + + if 'exclude' in rule_conf: + translation_str = 'return' + log_suffix = '-EXCL' + elif 'translation' in rule_conf: + translation_prefix = nat_type[:1] + translation_output = [f'{translation_prefix}nat'] + addr = dict_search_args(rule_conf, 'translation', 'address') + map_addr = dict_search_args(rule_conf, 'destination', 'address') + + if nat_type == 'source': + addr, map_addr = map_addr, addr # Swap + + if addr and is_ip_network(addr): + translation_output.append(f'ip prefix to ip {translation_prefix}addr map {{ {map_addr} : {addr} }}') + ignore_type_addr = True + elif addr: + translation_output.append(f'to {addr}') + + options = [] + addr_mapping = dict_search_args(rule_conf, 'translation', 'options', 'address_mapping') + port_mapping = dict_search_args(rule_conf, 'translation', 'options', 'port_mapping') + if addr_mapping == 'persistent': + options.append('persistent') + if port_mapping and port_mapping != 'none': + options.append(port_mapping) + + if options: + translation_output.append(",".join(options)) + + translation_str = " ".join(translation_output) + + prefix = nat_type[:1] + addr = dict_search_args(rule_conf, 'translation' if nat_type == 'source' else nat_type, 'address') + if addr and not ignore_type_addr: + output.append(f'ip {prefix}addr {addr}') + + output.append('counter') + + if translation_str: + output.append(translation_str) + + if 'log' in rule_conf: + output.append(f'log prefix "[{log_prefix}{log_suffix}]"') + + output.append(f'comment "{log_prefix}"') + + return " ".join(output) diff --git a/python/vyos/template.py b/python/vyos/template.py index d9ff98d2e..0870a0523 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -1,683 +1,688 @@ # Copyright 2019-2022 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import functools import os from jinja2 import Environment from jinja2 import FileSystemLoader from jinja2 import ChainableUndefined from vyos.defaults import directories from vyos.util import chmod from vyos.util import chown from vyos.util import dict_search_args from vyos.util import makedir # Holds template filters registered via register_filter() _FILTERS = {} _TESTS = {} # reuse Environments with identical settings to improve performance @functools.lru_cache(maxsize=2) def _get_environment(location=None): if location is None: loc_loader=FileSystemLoader(directories["templates"]) else: loc_loader=FileSystemLoader(location) env = Environment( # Don't check if template files were modified upon re-rendering auto_reload=False, # Cache up to this number of templates for quick re-rendering cache_size=100, loader=loc_loader, trim_blocks=True, undefined=ChainableUndefined, ) env.filters.update(_FILTERS) env.tests.update(_TESTS) return env def register_filter(name, func=None): """Register a function to be available as filter in templates under given name. It can also be used as a decorator, see below in this module for examples. :raise RuntimeError: when trying to register a filter after a template has been rendered already :raise ValueError: when trying to register a name which was taken already """ if func is None: return functools.partial(register_filter, name) if _get_environment.cache_info().currsize: raise RuntimeError( "Filters can only be registered before rendering the first template" ) if name in _FILTERS: raise ValueError(f"A filter with name {name!r} was registered already") _FILTERS[name] = func return func def register_test(name, func=None): """Register a function to be available as test in templates under given name. It can also be used as a decorator, see below in this module for examples. :raise RuntimeError: when trying to register a test after a template has been rendered already :raise ValueError: when trying to register a name which was taken already """ if func is None: return functools.partial(register_test, name) if _get_environment.cache_info().currsize: raise RuntimeError( "Tests can only be registered before rendering the first template" ) if name in _TESTS: raise ValueError(f"A test with name {name!r} was registered already") _TESTS[name] = func return func def render_to_string(template, content, formater=None, location=None): """Render a template from the template directory, raise on any errors. :param template: the path to the template relative to the template folder :param content: the dictionary of variables to put into rendering context :param formater: if given, it has to be a callable the rendered string is passed through The parsed template files are cached, so rendering the same file multiple times does not cause as too much overhead. If used everywhere, it could be changed to load the template from Python environment variables from an importable Python module generated when the Debian package is build (recovering the load time and overhead caused by having the file out of the code). """ template = _get_environment(location).get_template(template) rendered = template.render(content) if formater is not None: rendered = formater(rendered) return rendered def render( destination, template, content, formater=None, permission=None, user=None, group=None, location=None, ): """Render a template from the template directory to a file, raise on any errors. :param destination: path to the file to save the rendered template in :param permission: permission bitmask to set for the output file :param user: user to own the output file :param group: group to own the output file All other parameters are as for :func:`render_to_string`. """ # Create the directory if it does not exist folder = os.path.dirname(destination) makedir(folder, user, group) # As we are opening the file with 'w', we are performing the rendering before # calling open() to not accidentally erase the file if rendering fails rendered = render_to_string(template, content, formater, location) # Write to file with open(destination, "w") as file: chmod(file.fileno(), permission) chown(file.fileno(), user, group) file.write(rendered) ################################## # Custom template filters follow # ################################## @register_filter('force_to_list') def force_to_list(value): """ Convert scalars to single-item lists and leave lists untouched """ if isinstance(value, list): return value else: return [value] @register_filter('ip_from_cidr') def ip_from_cidr(prefix): """ Take an IPv4/IPv6 CIDR host and strip cidr mask. Example: 192.0.2.1/24 -> 192.0.2.1, 2001:db8::1/64 -> 2001:db8::1 """ from ipaddress import ip_interface return str(ip_interface(prefix).ip) @register_filter('address_from_cidr') def address_from_cidr(prefix): """ Take an IPv4/IPv6 CIDR prefix and convert the network to an "address". Example: 192.0.2.0/24 -> 192.0.2.0, 2001:db8::/48 -> 2001:db8:: """ from ipaddress import ip_network return str(ip_network(prefix).network_address) @register_filter('bracketize_ipv6') def bracketize_ipv6(address): """ Place a passed IPv6 address into [] brackets, do nothing for IPv4 """ if is_ipv6(address): return f'[{address}]' return address @register_filter('dot_colon_to_dash') def dot_colon_to_dash(text): """ Replace dot and colon to dash for string Example: 192.0.2.1 => 192-0-2-1, 2001:db8::1 => 2001-db8--1 """ text = text.replace(":", "-") text = text.replace(".", "-") return text @register_filter('netmask_from_cidr') def netmask_from_cidr(prefix): """ Take CIDR prefix and convert the prefix length to a "subnet mask". Example: - 192.0.2.0/24 -> 255.255.255.0 - 2001:db8::/48 -> ffff:ffff:ffff:: """ from ipaddress import ip_network return str(ip_network(prefix).netmask) @register_filter('netmask_from_ipv4') def netmask_from_ipv4(address): """ Take IP address and search all attached interface IP addresses for the given one. After address has been found, return the associated netmask. Example: - 172.18.201.10 -> 255.255.255.128 """ from netifaces import interfaces from netifaces import ifaddresses from netifaces import AF_INET for interface in interfaces(): tmp = ifaddresses(interface) if AF_INET in tmp: for af_addr in tmp[AF_INET]: if 'addr' in af_addr: if af_addr['addr'] == address: return af_addr['netmask'] raise ValueError @register_filter('is_ip_network') def is_ip_network(addr): """ Take IP(v4/v6) address and validate if the passed argument is a network or a host address. Example: - 192.0.2.0 -> False - 192.0.2.10/24 -> False - 192.0.2.0/24 -> True - 2001:db8:: -> False - 2001:db8::100 -> False - 2001:db8::/48 -> True - 2001:db8:1000::/64 -> True """ try: from ipaddress import ip_network # input variables must contain a / to indicate its CIDR notation if len(addr.split('/')) != 2: raise ValueError() ip_network(addr) return True except: return False @register_filter('network_from_ipv4') def network_from_ipv4(address): """ Take IP address and search all attached interface IP addresses for the given one. After address has been found, return the associated network address. Example: - 172.18.201.10 has mask 255.255.255.128 -> network is 172.18.201.0 """ netmask = netmask_from_ipv4(address) from ipaddress import ip_interface cidr_prefix = ip_interface(f'{address}/{netmask}').network return address_from_cidr(cidr_prefix) @register_filter('is_interface') def is_interface(interface): """ Check if parameter is a valid local interface name """ return os.path.exists(f'/sys/class/net/{interface}') @register_filter('is_ip') def is_ip(addr): """ Check addr if it is an IPv4 or IPv6 address """ return is_ipv4(addr) or is_ipv6(addr) @register_filter('is_ipv4') def is_ipv4(text): """ Filter IP address, return True on IPv4 address, False otherwise """ from ipaddress import ip_interface try: return ip_interface(text).version == 4 except: return False @register_filter('is_ipv6') def is_ipv6(text): """ Filter IP address, return True on IPv6 address, False otherwise """ from ipaddress import ip_interface try: return ip_interface(text).version == 6 except: return False @register_filter('first_host_address') def first_host_address(text): """ Return first usable (host) IP address from given prefix. Example: - 10.0.0.0/24 -> 10.0.0.1 - 2001:db8::/64 -> 2001:db8:: """ from ipaddress import ip_interface from ipaddress import IPv4Network from ipaddress import IPv6Network addr = ip_interface(text) if addr.version == 4: return str(addr.ip +1) return str(addr.ip) @register_filter('last_host_address') def last_host_address(text): """ Return first usable IP address from given prefix. Example: - 10.0.0.0/24 -> 10.0.0.254 - 2001:db8::/64 -> 2001:db8::ffff:ffff:ffff:ffff """ from ipaddress import ip_interface from ipaddress import IPv4Network from ipaddress import IPv6Network addr = ip_interface(text) if addr.version == 4: return str(IPv4Network(addr).broadcast_address - 1) return str(IPv6Network(addr).broadcast_address) @register_filter('inc_ip') def inc_ip(address, increment): """ Increment given IP address by 'increment' Example (inc by 2): - 10.0.0.0/24 -> 10.0.0.2 - 2001:db8::/64 -> 2001:db8::2 """ from ipaddress import ip_interface return str(ip_interface(address).ip + int(increment)) @register_filter('dec_ip') def dec_ip(address, decrement): """ Decrement given IP address by 'decrement' Example (inc by 2): - 10.0.0.0/24 -> 10.0.0.2 - 2001:db8::/64 -> 2001:db8::2 """ from ipaddress import ip_interface return str(ip_interface(address).ip - int(decrement)) @register_filter('compare_netmask') def compare_netmask(netmask1, netmask2): """ Compare two IP netmask if they have the exact same size. compare_netmask('10.0.0.0/8', '20.0.0.0/8') -> True compare_netmask('10.0.0.0/8', '20.0.0.0/16') -> False """ from ipaddress import ip_network try: return ip_network(netmask1).netmask == ip_network(netmask2).netmask except: return False @register_filter('isc_static_route') def isc_static_route(subnet, router): # https://ercpe.de/blog/pushing-static-routes-with-isc-dhcp-server # Option format is: # <netmask>, <network-byte1>, <network-byte2>, <network-byte3>, <router-byte1>, <router-byte2>, <router-byte3> # where bytes with the value 0 are omitted. from ipaddress import ip_network net = ip_network(subnet) # add netmask string = str(net.prefixlen) + ',' # add network bytes if net.prefixlen: width = net.prefixlen // 8 if net.prefixlen % 8: width += 1 string += ','.join(map(str,tuple(net.network_address.packed)[:width])) + ',' # add router bytes string += ','.join(router.split('.')) return string @register_filter('is_file') def is_file(filename): if os.path.exists(filename): return os.path.isfile(filename) return False @register_filter('get_dhcp_router') def get_dhcp_router(interface): """ Static routes can point to a router received by a DHCP reply. This helper is used to get the current default router from the DHCP reply. Returns False of no router is found, returns the IP address as string if a router is found. """ lease_file = f'/var/lib/dhcp/dhclient_{interface}.leases' if not os.path.exists(lease_file): return None from vyos.util import read_file for line in read_file(lease_file).splitlines(): if 'option routers' in line: (_, _, address) = line.split() return address.rstrip(';') @register_filter('natural_sort') def natural_sort(iterable): import re from jinja2.runtime import Undefined if isinstance(iterable, Undefined) or iterable is None: return list() def convert(text): return int(text) if text.isdigit() else text.lower() def alphanum_key(key): return [convert(c) for c in re.split('([0-9]+)', str(key))] return sorted(iterable, key=alphanum_key) @register_filter('get_ipv4') def get_ipv4(interface): """ Get interface IPv4 addresses""" from vyos.ifconfig import Interface return Interface(interface).get_addr_v4() @register_filter('get_ipv6') def get_ipv6(interface): """ Get interface IPv6 addresses""" from vyos.ifconfig import Interface return Interface(interface).get_addr_v6() @register_filter('get_ip') def get_ip(interface): """ Get interface IP addresses""" from vyos.ifconfig import Interface return Interface(interface).get_addr() def get_first_ike_dh_group(ike_group): if ike_group and 'proposal' in ike_group: for priority, proposal in ike_group['proposal'].items(): if 'dh_group' in proposal: return 'dh-group' + proposal['dh_group'] return 'dh-group2' # Fallback on dh-group2 @register_filter('get_esp_ike_cipher') def get_esp_ike_cipher(group_config, ike_group=None): pfs_lut = { 'dh-group1' : 'modp768', 'dh-group2' : 'modp1024', 'dh-group5' : 'modp1536', 'dh-group14' : 'modp2048', 'dh-group15' : 'modp3072', 'dh-group16' : 'modp4096', 'dh-group17' : 'modp6144', 'dh-group18' : 'modp8192', 'dh-group19' : 'ecp256', 'dh-group20' : 'ecp384', 'dh-group21' : 'ecp521', 'dh-group22' : 'modp1024s160', 'dh-group23' : 'modp2048s224', 'dh-group24' : 'modp2048s256', 'dh-group25' : 'ecp192', 'dh-group26' : 'ecp224', 'dh-group27' : 'ecp224bp', 'dh-group28' : 'ecp256bp', 'dh-group29' : 'ecp384bp', 'dh-group30' : 'ecp512bp', 'dh-group31' : 'curve25519', 'dh-group32' : 'curve448' } ciphers = [] if 'proposal' in group_config: for priority, proposal in group_config['proposal'].items(): # both encryption and hash need to be specified for a proposal if not {'encryption', 'hash'} <= set(proposal): continue tmp = '{encryption}-{hash}'.format(**proposal) if 'dh_group' in proposal: tmp += '-' + pfs_lut[ 'dh-group' + proposal['dh_group'] ] elif 'pfs' in group_config and group_config['pfs'] != 'disable': group = group_config['pfs'] if group_config['pfs'] == 'enable': group = get_first_ike_dh_group(ike_group) tmp += '-' + pfs_lut[group] ciphers.append(tmp) return ciphers @register_filter('get_uuid') def get_uuid(interface): """ Get interface IP addresses""" from uuid import uuid1 return uuid1() openvpn_translate = { 'des': 'des-cbc', '3des': 'des-ede3-cbc', 'bf128': 'bf-cbc', 'bf256': 'bf-cbc', 'aes128gcm': 'aes-128-gcm', 'aes128': 'aes-128-cbc', 'aes192gcm': 'aes-192-gcm', 'aes192': 'aes-192-cbc', 'aes256gcm': 'aes-256-gcm', 'aes256': 'aes-256-cbc' } @register_filter('openvpn_cipher') def get_openvpn_cipher(cipher): if cipher in openvpn_translate: return openvpn_translate[cipher].upper() return cipher.upper() @register_filter('openvpn_ncp_ciphers') def get_openvpn_ncp_ciphers(ciphers): out = [] for cipher in ciphers: if cipher in openvpn_translate: out.append(openvpn_translate[cipher]) else: out.append(cipher) return ':'.join(out).upper() @register_filter('snmp_auth_oid') def snmp_auth_oid(type): if type not in ['md5', 'sha', 'aes', 'des', 'none']: raise ValueError() OIDs = { 'md5' : '.1.3.6.1.6.3.10.1.1.2', 'sha' : '.1.3.6.1.6.3.10.1.1.3', 'aes' : '.1.3.6.1.6.3.10.1.2.4', 'des' : '.1.3.6.1.6.3.10.1.2.2', 'none': '.1.3.6.1.6.3.10.1.2.1' } return OIDs[type] @register_filter('nft_action') def nft_action(vyos_action): if vyos_action == 'accept': return 'return' return vyos_action @register_filter('nft_rule') def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'): from vyos.firewall import parse_rule return parse_rule(rule_conf, fw_name, rule_id, ip_name) @register_filter('nft_default_rule') def nft_default_rule(fw_conf, fw_name, ipv6=False): output = ['counter'] default_action = fw_conf['default_action'] if 'enable_default_log' in fw_conf: action_suffix = default_action[:1].upper() output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]"') output.append(nft_action(default_action)) if 'default_jump_target' in fw_conf: target = fw_conf['default_jump_target'] def_suffix = '6' if ipv6 else '' output.append(f'NAME{def_suffix}_{target}') output.append(f'comment "{fw_name} default-action {default_action}"') return " ".join(output) @register_filter('nft_state_policy') def nft_state_policy(conf, state, ipv6=False): out = [f'ct state {state}'] if 'log' in conf: log_level = conf['log'] out.append(f'log level {log_level}') out.append('counter') if 'action' in conf: out.append(conf['action']) return " ".join(out) @register_filter('nft_intra_zone_action') def nft_intra_zone_action(zone_conf, ipv6=False): if 'intra_zone_filtering' in zone_conf: intra_zone = zone_conf['intra_zone_filtering'] fw_name = 'ipv6_name' if ipv6 else 'name' name_prefix = 'NAME6_' if ipv6 else 'NAME_' if 'action' in intra_zone: if intra_zone['action'] == 'accept': return 'return' return intra_zone['action'] elif dict_search_args(intra_zone, 'firewall', fw_name): name = dict_search_args(intra_zone, 'firewall', fw_name) return f'jump {name_prefix}{name}' return 'return' @register_filter('nft_nested_group') def nft_nested_group(out_list, includes, groups, key): if not vyos_defined(out_list): out_list = [] def add_includes(name): if key in groups[name]: for item in groups[name][key]: if item in out_list: continue out_list.append(item) if 'include' in groups[name]: for name_inc in groups[name]['include']: add_includes(name_inc) for name in includes: add_includes(name) return out_list @register_filter('nat_rule') def nat_rule(rule_conf, rule_id, nat_type, ipv6=False): from vyos.nat import parse_nat_rule return parse_nat_rule(rule_conf, rule_id, nat_type, ipv6) +@register_filter('nat_static_rule') +def nat_static_rule(rule_conf, rule_id, nat_type): + from vyos.nat import parse_nat_static_rule + return parse_nat_static_rule(rule_conf, rule_id, nat_type) + @register_filter('range_to_regex') def range_to_regex(num_range): from vyos.range_regex import range_to_regex if '-' not in num_range: return num_range regex = range_to_regex(num_range) return f'({regex})' @register_test('vyos_defined') def vyos_defined(value, test_value=None, var_type=None): """ Jinja2 plugin to test if a variable is defined and not none - vyos_defined will test value if defined and is not none and return true or false. If test_value is supplied, the value must also pass == test_value to return true. If var_type is supplied, the value must also be of the specified class/type Examples: 1. Test if var is defined and not none: {% if foo is vyos_defined %} ... {% endif %} 2. Test if variable is defined, not none and has value "something" {% if bar is vyos_defined("something") %} ... {% endif %} Parameters ---------- value : any Value to test from ansible test_value : any, optional Value to test in addition of defined and not none, by default None var_type : ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'], optional Type or Class to test for Returns ------- boolean True if variable matches criteria, False in other cases. Implementation inspired and re-used from https://github.com/aristanetworks/ansible-avd/ """ from jinja2 import Undefined if isinstance(value, Undefined) or value is None: # Invalid value - return false return False elif test_value is not None and value != test_value: # Valid value but not matching the optional argument return False elif str(var_type).lower() in ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'] and str(var_type).lower() != type(value).__name__: # Invalid class - return false return False else: # Valid value and is matching optional argument if provided - return true return True diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 5863409be..f824838c0 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -1,193 +1,193 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import jmespath import json import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.util import cmd from vyos.util import dict_search base_path = ['nat'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] static_path = base_path + ['static'] class TestNAT(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestNAT, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() def verify_nftables(self, nftables_search, table, inverse=False, args=''): nftables_output = cmd(f'sudo nft {args} list table {table}') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) def test_snat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] outbound_iface_100 = 'eth0' outbound_iface_200 = 'eth1' nftables_search = ['jump VYOS_PRE_SNAT_HOOK'] for rule in rules: network = f'192.168.{rule}.0/24' # depending of rule order we check either for source address for NAT # or configured destination address for NAT if int(rule) < 200: self.cli_set(src_path + ['rule', rule, 'source', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_100]) self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade']) else: self.cli_set(src_path + ['rule', rule, 'destination', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', outbound_iface_200]) self.cli_set(src_path + ['rule', rule, 'exclude']) nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return']) self.cli_commit() self.verify_nftables(nftables_search, 'ip vyos_nat') def test_dnat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] inbound_iface_100 = 'eth0' inbound_iface_200 = 'eth1' inbound_proto_100 = 'udp' inbound_proto_200 = 'tcp' nftables_search = ['jump VYOS_PRE_DNAT_HOOK'] for rule in rules: port = f'10{rule}' self.cli_set(dst_path + ['rule', rule, 'source', 'port', port]) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port]) rule_search = [f'dnat to 192.0.2.1:{port}'] if int(rule) < 200: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_100]) rule_search.append(f'{inbound_proto_100} sport {port}') rule_search.append(f'iifname "{inbound_iface_100}"') else: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', inbound_iface_200]) rule_search.append(f'iifname "{inbound_iface_200}"') nftables_search.append(rule_search) self.cli_commit() self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified rule = '5' self.cli_set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) # check validate() - outbound-interface must be defined with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'eth0']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() def test_dnat_negated_addresses(self): # T3186: negated addresses are not accepted by nftables rule = '1000' self.cli_set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'destination', 'port', '53']) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'eth0']) self.cli_set(dst_path + ['rule', rule, 'protocol', 'tcp_udp']) self.cli_set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', '53']) self.cli_commit() def test_nat_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. # # Test that both 'nat destination' and 'nat source' nodes can exist # without any rule self.cli_set(src_path) self.cli_set(dst_path) self.cli_set(static_path) self.cli_commit() def test_dnat_without_translation_address(self): self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_static_nat(self): dst_addr_1 = '10.0.1.1' translate_addr_1 = '192.168.1.1' dst_addr_2 = '203.0.113.0/24' translate_addr_2 = '192.0.2.0/24' ifname = 'eth0' self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1]) self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname]) self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1]) self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2]) self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname]) self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2]) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'], [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'], [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'], - [f'oifname "{ifname}"', f'snat ip prefix to ip daddr map {{ {translate_addr_2} : {dst_addr_2} }}'] + [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}'] ] - self.verify_nftables(nftables_search, 'ip vyos_nat') + self.verify_nftables(nftables_search, 'ip vyos_static_nat') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py index 3f52d7c1f..8b1a5a720 100755 --- a/src/conf_mode/nat.py +++ b/src/conf_mode/nat.py @@ -1,216 +1,218 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import jmespath import json import os from distutils.version import LooseVersion from platform import release as kernel_version from sys import exit from netifaces import interfaces from vyos.base import Warning from vyos.config import Config from vyos.configdict import dict_merge from vyos.template import render from vyos.template import is_ip_network from vyos.util import cmd from vyos.util import run from vyos.util import check_kmod from vyos.util import dict_search from vyos.validate import is_addr_assigned from vyos.xml import defaults from vyos import ConfigError from vyos import airbag airbag.enable() if LooseVersion(kernel_version()) > LooseVersion('5.1'): k_mod = ['nft_nat', 'nft_chain_nat'] else: k_mod = ['nft_nat', 'nft_chain_nat_ipv4'] nftables_nat_config = '/run/nftables_nat.conf' nftables_static_nat_conf = '/run/nftables_static-nat-rules.nft' def get_handler(json, chain, target): """ Get nftable rule handler number of given chain/target combination. Handler is required when adding NAT/Conntrack helper targets """ for x in json: if x['chain'] != chain: continue if x['target'] != target: continue return x['handle'] return None def verify_rule(config, err_msg): """ Common verify steps used for both source and destination NAT """ if (dict_search('translation.port', config) != None or dict_search('destination.port', config) != None or dict_search('source.port', config)): if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']: raise ConfigError(f'{err_msg}\n' \ 'ports can only be specified when protocol is '\ 'either tcp, udp or tcp_udp!') if is_ip_network(dict_search('translation.address', config)): raise ConfigError(f'{err_msg}\n' \ 'Cannot use ports with an IPv4 network as translation address as it\n' \ 'statically maps a whole network of addresses onto another\n' \ 'network of addresses') def get_config(config=None): if config: conf = config else: conf = Config() base = ['nat'] nat = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True) # T2665: we must add the tagNode defaults individually until this is # moved to the base class for direction in ['source', 'destination', 'static']: if direction in nat: default_values = defaults(base + [direction, 'rule']) for rule in dict_search(f'{direction}.rule', nat) or []: nat[direction]['rule'][rule] = dict_merge(default_values, nat[direction]['rule'][rule]) # read in current nftable (once) for further processing tmp = cmd('nft -j list table raw') nftable_json = json.loads(tmp) # condense the full JSON table into a list with only relevand informations pattern = 'nftables[?rule].rule[?expr[].jump].{chain: chain, handle: handle, target: expr[].jump.target | [0]}' condensed_json = jmespath.search(pattern, nftable_json) if not conf.exists(base): nat['helper_functions'] = 'remove' # Retrieve current table handler positions nat['pre_ct_ignore'] = get_handler(condensed_json, 'PREROUTING', 'VYOS_CT_HELPER') nat['pre_ct_conntrack'] = get_handler(condensed_json, 'PREROUTING', 'NAT_CONNTRACK') nat['out_ct_ignore'] = get_handler(condensed_json, 'OUTPUT', 'VYOS_CT_HELPER') nat['out_ct_conntrack'] = get_handler(condensed_json, 'OUTPUT', 'NAT_CONNTRACK') nat['deleted'] = '' return nat # check if NAT connection tracking helpers need to be set up - this has to # be done only once if not get_handler(condensed_json, 'PREROUTING', 'NAT_CONNTRACK'): nat['helper_functions'] = 'add' # Retrieve current table handler positions nat['pre_ct_ignore'] = get_handler(condensed_json, 'PREROUTING', 'VYOS_CT_IGNORE') nat['pre_ct_conntrack'] = get_handler(condensed_json, 'PREROUTING', 'VYOS_CT_PREROUTING_HOOK') nat['out_ct_ignore'] = get_handler(condensed_json, 'OUTPUT', 'VYOS_CT_IGNORE') nat['out_ct_conntrack'] = get_handler(condensed_json, 'OUTPUT', 'VYOS_CT_OUTPUT_HOOK') return nat def verify(nat): if not nat or 'deleted' in nat: # no need to verify the CLI as NAT is going to be deactivated return None if 'helper_functions' in nat: if not (nat['pre_ct_ignore'] or nat['pre_ct_conntrack'] or nat['out_ct_ignore'] or nat['out_ct_conntrack']): raise Exception('could not determine nftable ruleset handlers') if dict_search('source.rule', nat): for rule, config in dict_search('source.rule', nat).items(): err_msg = f'Source NAT configuration error in rule {rule}:' if 'outbound_interface' not in config: raise ConfigError(f'{err_msg} outbound-interface not specified') if config['outbound_interface'] not in 'any' and config['outbound_interface'] not in interfaces(): Warning(f'rule "{rule}" interface "{config["outbound_interface"]}" does not exist on this system') addr = dict_search('translation.address', config) if addr != None and addr != 'masquerade' and not is_ip_network(addr): for ip in addr.split('-'): if not is_addr_assigned(ip): Warning(f'IP address {ip} does not exist on the system!') # common rule verification verify_rule(config, err_msg) if dict_search('destination.rule', nat): for rule, config in dict_search('destination.rule', nat).items(): err_msg = f'Destination NAT configuration error in rule {rule}:' if 'inbound_interface' not in config: raise ConfigError(f'{err_msg}\n' \ 'inbound-interface not specified') elif config['inbound_interface'] not in 'any' and config['inbound_interface'] not in interfaces(): Warning(f'rule "{rule}" interface "{config["inbound_interface"]}" does not exist on this system') # common rule verification verify_rule(config, err_msg) if dict_search('static.rule', nat): for rule, config in dict_search('static.rule', nat).items(): err_msg = f'Static NAT configuration error in rule {rule}:' if 'inbound_interface' not in config: raise ConfigError(f'{err_msg}\n' \ 'inbound-interface not specified') # common rule verification verify_rule(config, err_msg) return None def generate(nat): if not os.path.exists(nftables_nat_config): nat['first_install'] = True render(nftables_nat_config, 'firewall/nftables-nat.j2', nat) render(nftables_static_nat_conf, 'firewall/nftables-static-nat.j2', nat) # dry-run newly generated configuration tmp = run(f'nft -c -f {nftables_nat_config}') if tmp > 0: raise ConfigError('Configuration file errors encountered!') - tmp = run(f'nft -c -f {nftables_nat_config}') + tmp = run(f'nft -c -f {nftables_static_nat_conf}') + if tmp > 0: + raise ConfigError('Configuration file errors encountered!') return None def apply(nat): cmd(f'nft -f {nftables_nat_config}') cmd(f'nft -f {nftables_static_nat_conf}') return None if __name__ == '__main__': try: check_kmod(k_mod) c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)