diff --git a/interface-definitions/include/firewall/common-rule.xml.i b/interface-definitions/include/firewall/common-rule.xml.i index 2a5137dbf..0b8838872 100644 --- a/interface-definitions/include/firewall/common-rule.xml.i +++ b/interface-definitions/include/firewall/common-rule.xml.i @@ -1,376 +1,358 @@ <!-- include start from firewall/common-rule.xml.i --> #include <include/firewall/action.xml.i> #include <include/generic-description.xml.i> <leafNode name="disable"> <properties> <help>Option to disable firewall rule</help> <valueless/> </properties> </leafNode> <node name="fragment"> <properties> <help>IP fragment match</help> </properties> <children> <leafNode name="match-frag"> <properties> <help>Second and further fragments of fragmented packets</help> <valueless/> </properties> </leafNode> <leafNode name="match-non-frag"> <properties> <help>Head fragments or unfragmented packets</help> <valueless/> </properties> </leafNode> </children> </node> <node name="ipsec"> <properties> <help>Inbound IPsec packets</help> </properties> <children> <leafNode name="match-ipsec"> <properties> <help>Inbound IPsec packets</help> <valueless/> </properties> </leafNode> <leafNode name="match-none"> <properties> <help>Inbound non-IPsec packets</help> <valueless/> </properties> </leafNode> </children> </node> <node name="limit"> <properties> <help>Rate limit using a token bucket filter</help> </properties> <children> <leafNode name="burst"> <properties> <help>Maximum number of packets to allow in excess of rate</help> <valueHelp> <format>u32:0-4294967295</format> <description>Maximum number of packets to allow in excess of rate</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-4294967295"/> </constraint> </properties> </leafNode> <leafNode name="rate"> <properties> <help>Maximum average matching rate</help> <valueHelp> <format>txt</format> <description>integer/unit (Example: 5/minute)</description> </valueHelp> <constraint> <regex>\d+/(second|minute|hour|day)</regex> </constraint> </properties> </leafNode> </children> </node> -<leafNode name="log"> - <properties> - <help>Option to log packets matching rule</help> - <completionHelp> - <list>enable disable</list> - </completionHelp> - <valueHelp> - <format>enable</format> - <description>Enable log</description> - </valueHelp> - <valueHelp> - <format>disable</format> - <description>Disable log</description> - </valueHelp> - <constraint> - <regex>(enable|disable)</regex> - </constraint> - </properties> -</leafNode> +#include <include/firewall/rule-log-level.xml.i> <node name="connection-status"> <properties> <help>Connection status</help> </properties> <children> <leafNode name="nat"> <properties> <help>NAT connection status</help> <completionHelp> <list>destination source</list> </completionHelp> <valueHelp> <format>destination</format> <description>Match connections that are subject to destination NAT</description> </valueHelp> <valueHelp> <format>source</format> <description>Match connections that are subject to source NAT</description> </valueHelp> <constraint> <regex>^(destination|source)$</regex> </constraint> </properties> </leafNode> </children> </node> <leafNode name="protocol"> <properties> <help>Protocol to match (protocol name, number, or "all")</help> <completionHelp> <script>${vyos_completion_dir}/list_protocols.sh</script> <list>all tcp_udp</list> </completionHelp> <valueHelp> <format>all</format> <description>All IP protocols</description> </valueHelp> <valueHelp> <format>tcp_udp</format> <description>Both TCP and UDP</description> </valueHelp> <valueHelp> <format>u32:0-255</format> <description>IP protocol number</description> </valueHelp> <valueHelp> <format><protocol></format> <description>IP protocol name</description> </valueHelp> <valueHelp> <format>!<protocol></format> <description>IP protocol name</description> </valueHelp> <constraint> <validator name="ip-protocol"/> </constraint> </properties> </leafNode> <node name="recent"> <properties> <help>Parameters for matching recently seen sources</help> </properties> <children> <leafNode name="count"> <properties> <help>Source addresses seen more than N times</help> <valueHelp> <format>u32:1-255</format> <description>Source addresses seen more than N times</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-255"/> </constraint> </properties> </leafNode> <leafNode name="time"> <properties> <help>Source addresses seen in the last second/minute/hour</help> <completionHelp> <list>second minute hour</list> </completionHelp> <valueHelp> <format>second</format> <description>Source addresses seen COUNT times in the last second</description> </valueHelp> <valueHelp> <format>minute</format> <description>Source addresses seen COUNT times in the last minute</description> </valueHelp> <valueHelp> <format>hour</format> <description>Source addresses seen COUNT times in the last hour</description> </valueHelp> <constraint> <regex>(second|minute|hour)</regex> </constraint> </properties> </leafNode> </children> </node> <node name="source"> <properties> <help>Source parameters</help> </properties> <children> #include <include/firewall/address.xml.i> #include <include/firewall/source-destination-group.xml.i> <leafNode name="mac-address"> <properties> <help>Source MAC address</help> <valueHelp> <format><MAC address></format> <description>MAC address to match</description> </valueHelp> <valueHelp> <format>!<MAC address></format> <description>Match everything except the specified MAC address</description> </valueHelp> <constraint> <validator name="mac-address-firewall"/> </constraint> </properties> </leafNode> #include <include/firewall/port.xml.i> </children> </node> <node name="state"> <properties> <help>Session state</help> </properties> <children> <leafNode name="established"> <properties> <help>Established state</help> <completionHelp> <list>enable disable</list> </completionHelp> <valueHelp> <format>enable</format> <description>Enable</description> </valueHelp> <valueHelp> <format>disable</format> <description>Disable</description> </valueHelp> <constraint> <regex>(enable|disable)</regex> </constraint> </properties> </leafNode> <leafNode name="invalid"> <properties> <help>Invalid state</help> <completionHelp> <list>enable disable</list> </completionHelp> <valueHelp> <format>enable</format> <description>Enable</description> </valueHelp> <valueHelp> <format>disable</format> <description>Disable</description> </valueHelp> <constraint> <regex>(enable|disable)</regex> </constraint> </properties> </leafNode> <leafNode name="new"> <properties> <help>New state</help> <completionHelp> <list>enable disable</list> </completionHelp> <valueHelp> <format>enable</format> <description>Enable</description> </valueHelp> <valueHelp> <format>disable</format> <description>Disable</description> </valueHelp> <constraint> <regex>(enable|disable)</regex> </constraint> </properties> </leafNode> <leafNode name="related"> <properties> <help>Related state</help> <completionHelp> <list>enable disable</list> </completionHelp> <valueHelp> <format>enable</format> <description>Enable</description> </valueHelp> <valueHelp> <format>disable</format> <description>Disable</description> </valueHelp> <constraint> <regex>(enable|disable)</regex> </constraint> </properties> </leafNode> </children> </node> #include <include/firewall/tcp-flags.xml.i> <node name="time"> <properties> <help>Time to match rule</help> </properties> <children> <leafNode name="startdate"> <properties> <help>Date to start matching rule</help> <valueHelp> <format>txt</format> <description>Enter date using following notation - YYYY-MM-DD</description> </valueHelp> <constraint> <regex>(\d{4}\-\d{2}\-\d{2})</regex> </constraint> </properties> </leafNode> <leafNode name="starttime"> <properties> <help>Time of day to start matching rule</help> <valueHelp> <format>txt</format> <description>Enter time using using 24 hour notation - hh:mm:ss</description> </valueHelp> <constraint> <regex>([0-2][0-9](\:[0-5][0-9]){1,2})</regex> </constraint> </properties> </leafNode> <leafNode name="stopdate"> <properties> <help>Date to stop matching rule</help> <valueHelp> <format>txt</format> <description>Enter date using following notation - YYYY-MM-DD</description> </valueHelp> <constraint> <regex>(\d{4}\-\d{2}\-\d{2})</regex> </constraint> </properties> </leafNode> <leafNode name="stoptime"> <properties> <help>Time of day to stop matching rule</help> <valueHelp> <format>txt</format> <description>Enter time using using 24 hour notation - hh:mm:ss</description> </valueHelp> <constraint> <regex>([0-2][0-9](\:[0-5][0-9]){1,2})</regex> </constraint> </properties> </leafNode> <leafNode name="weekdays"> <properties> <help>Comma separated weekdays to match rule on</help> <valueHelp> <format>txt</format> <description>Name of day (Monday, Tuesday, Wednesday, Thursdays, Friday, Saturday, Sunday)</description> </valueHelp> <valueHelp> <format>u32:0-6</format> <description>Day number (0 = Sunday ... 6 = Saturday)</description> </valueHelp> </properties> </leafNode> </children> </node> <!-- include end --> diff --git a/interface-definitions/include/firewall/name-default-log.xml.i b/interface-definitions/include/firewall/name-default-log.xml.i index 979395146..c3f6f0171 100644 --- a/interface-definitions/include/firewall/name-default-log.xml.i +++ b/interface-definitions/include/firewall/name-default-log.xml.i @@ -1,8 +1,45 @@ <!-- include start from firewall/name-default-log.xml.i --> <leafNode name="enable-default-log"> <properties> - <help>Option to log packets hitting default-action</help> - <valueless/> + <help>Option to log packets matching default-action</help> + <completionHelp> + <list>emerg alert crit err warn notice info debug</list> + </completionHelp> + <valueHelp> + <format>emerg</format> + <description>Emerg log level</description> + </valueHelp> + <valueHelp> + <format>alert</format> + <description>Alert log level</description> + </valueHelp> + <valueHelp> + <format>crit</format> + <description>Critical log level</description> + </valueHelp> + <valueHelp> + <format>err</format> + <description>Error log level</description> + </valueHelp> + <valueHelp> + <format>warn</format> + <description>Warning log level</description> + </valueHelp> + <valueHelp> + <format>notice</format> + <description>Notice log level</description> + </valueHelp> + <valueHelp> + <format>info</format> + <description>Info log level</description> + </valueHelp> + <valueHelp> + <format>debug</format> + <description>Debug log level</description> + </valueHelp> + <constraint> + <regex>(emerg|alert|crit|err|warn|notice|info|debug)</regex> + </constraint> </properties> </leafNode> -<!-- include end --> +<!-- include end --> \ No newline at end of file diff --git a/interface-definitions/include/firewall/rule-log-level.xml.i b/interface-definitions/include/firewall/rule-log-level.xml.i new file mode 100644 index 000000000..4842b73ca --- /dev/null +++ b/interface-definitions/include/firewall/rule-log-level.xml.i @@ -0,0 +1,45 @@ +<!-- include start from firewall/common-rule.xml.i --> +<leafNode name="log"> + <properties> + <help>Option to log packets matching rule</help> + <completionHelp> + <list>emerg alert crit err warn notice info debug</list> + </completionHelp> + <valueHelp> + <format>emerg</format> + <description>Emerg log level</description> + </valueHelp> + <valueHelp> + <format>alert</format> + <description>Alert log level</description> + </valueHelp> + <valueHelp> + <format>crit</format> + <description>Critical log level</description> + </valueHelp> + <valueHelp> + <format>err</format> + <description>Error log level</description> + </valueHelp> + <valueHelp> + <format>warn</format> + <description>Warning log level</description> + </valueHelp> + <valueHelp> + <format>notice</format> + <description>Notice log level</description> + </valueHelp> + <valueHelp> + <format>info</format> + <description>Info log level</description> + </valueHelp> + <valueHelp> + <format>debug</format> + <description>Debug log level</description> + </valueHelp> + <constraint> + <regex>(emerg|alert|crit|err|warn|notice|info|debug)</regex> + </constraint> + </properties> +</leafNode> +<!-- include end --> \ No newline at end of file diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index 04fd44173..0c6811d72 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -1,259 +1,260 @@ #!/usr/bin/env python3 # # Copyright (C) 2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re from vyos.util import cmd from vyos.util import dict_search_args def find_nftables_rule(table, chain, rule_matches=[]): # Find rule in table/chain that matches all criteria and return the handle results = cmd(f'sudo nft -a list chain {table} {chain}').split("\n") for line in results: if all(rule_match in line for rule_match in rule_matches): handle_search = re.search('handle (\d+)', line) if handle_search: return handle_search[1] return None def remove_nftables_rule(table, chain, handle): cmd(f'sudo nft delete rule {table} {chain} handle {handle}') # Functions below used by template generation def nft_action(vyos_action): if vyos_action == 'accept': return 'return' return vyos_action def parse_rule(rule_conf, fw_name, rule_id, ip_name): output = [] def_suffix = '6' if ip_name == 'ip6' else '' if 'state' in rule_conf and rule_conf['state']: states = ",".join([s for s, v in rule_conf['state'].items() if v == 'enable']) if states: output.append(f'ct state {{{states}}}') if 'connection_status' in rule_conf and rule_conf['connection_status']: status = rule_conf['connection_status'] if status['nat'] == 'destination': nat_status = '{dnat}' output.append(f'ct status {nat_status}') if status['nat'] == 'source': nat_status = '{snat}' output.append(f'ct status {nat_status}') if 'protocol' in rule_conf and rule_conf['protocol'] != 'all': proto = rule_conf['protocol'] operator = '' if proto[0] == '!': operator = '!=' proto = proto[1:] if proto == 'tcp_udp': proto = '{tcp, udp}' output.append(f'meta l4proto {operator} {proto}') for side in ['destination', 'source']: if side in rule_conf: prefix = side[0] side_conf = rule_conf[side] if 'address' in side_conf: suffix = side_conf['address'] if suffix[0] == '!': suffix = f'!= {suffix[1:]}' output.append(f'{ip_name} {prefix}addr {suffix}') if 'mac_address' in side_conf: suffix = side_conf["mac_address"] if suffix[0] == '!': suffix = f'!= {suffix[1:]}' output.append(f'ether {prefix}addr {suffix}') if 'port' in side_conf: proto = rule_conf['protocol'] port = side_conf['port'].split(',') ports = [] negated_ports = [] for p in port: if p[0] == '!': negated_ports.append(p[1:]) else: ports.append(p) if proto == 'tcp_udp': proto = 'th' if ports: ports_str = ','.join(ports) output.append(f'{proto} {prefix}port {{{ports_str}}}') if negated_ports: negated_ports_str = ','.join(negated_ports) output.append(f'{proto} {prefix}port != {{{negated_ports_str}}}') if 'group' in side_conf: group = side_conf['group'] if 'address_group' in group: group_name = group['address_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} $A{def_suffix}_{group_name}') elif 'network_group' in group: group_name = group['network_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} $N{def_suffix}_{group_name}') if 'mac_group' in group: group_name = group['mac_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'ether {prefix}addr {operator} $M_{group_name}') if 'port_group' in group: proto = rule_conf['protocol'] group_name = group['port_group'] if proto == 'tcp_udp': proto = 'th' operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{proto} {prefix}port {operator} $P_{group_name}') - if 'log' in rule_conf and rule_conf['log'] == 'enable': + if 'log' in rule_conf: action = rule_conf['action'] if 'action' in rule_conf else 'accept' - output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}] "') + log_level = rule_conf['log'] + output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}]" level {log_level}') if 'hop_limit' in rule_conf: operators = {'eq': '==', 'gt': '>', 'lt': '<'} for op, operator in operators.items(): if op in rule_conf['hop_limit']: value = rule_conf['hop_limit'][op] output.append(f'ip6 hoplimit {operator} {value}') for icmp in ['icmp', 'icmpv6']: if icmp in rule_conf: if 'type_name' in rule_conf[icmp]: output.append(icmp + ' type ' + rule_conf[icmp]['type_name']) else: if 'code' in rule_conf[icmp]: output.append(icmp + ' code ' + rule_conf[icmp]['code']) if 'type' in rule_conf[icmp]: output.append(icmp + ' type ' + rule_conf[icmp]['type']) if 'ipsec' in rule_conf: if 'match_ipsec' in rule_conf['ipsec']: output.append('meta ipsec == 1') if 'match_non_ipsec' in rule_conf['ipsec']: output.append('meta ipsec == 0') if 'fragment' in rule_conf: # Checking for fragmentation after priority -400 is not possible, # so we use a priority -450 hook to set a mark if 'match_frag' in rule_conf['fragment']: output.append('meta mark 0xffff1') if 'match_non_frag' in rule_conf['fragment']: output.append('meta mark != 0xffff1') if 'limit' in rule_conf: if 'rate' in rule_conf['limit']: output.append(f'limit rate {rule_conf["limit"]["rate"]}') if 'burst' in rule_conf['limit']: output.append(f'burst {rule_conf["limit"]["burst"]} packets') if 'recent' in rule_conf: count = rule_conf['recent']['count'] time = rule_conf['recent']['time'] output.append(f'add @RECENT{def_suffix}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}') if 'time' in rule_conf: output.append(parse_time(rule_conf['time'])) tcp_flags = dict_search_args(rule_conf, 'tcp', 'flags') if tcp_flags: output.append(parse_tcp_flags(tcp_flags)) output.append('counter') if 'set' in rule_conf: output.append(parse_policy_set(rule_conf['set'], def_suffix)) if 'action' in rule_conf: output.append(nft_action(rule_conf['action'])) else: output.append('return') output.append(f'comment "{fw_name}-{rule_id}"') return " ".join(output) def parse_tcp_flags(flags): include = [flag for flag in flags if flag != 'not'] exclude = list(flags['not']) if 'not' in flags else [] return f'tcp flags & ({"|".join(include + exclude)}) == {"|".join(include) if include else "0x0"}' def parse_time(time): out = [] if 'startdate' in time: start = time['startdate'] if 'T' not in start and 'starttime' in time: start += f' {time["starttime"]}' out.append(f'time >= "{start}"') if 'starttime' in time and 'startdate' not in time: out.append(f'hour >= "{time["starttime"]}"') if 'stopdate' in time: stop = time['stopdate'] if 'T' not in stop and 'stoptime' in time: stop += f' {time["stoptime"]}' out.append(f'time < "{stop}"') if 'stoptime' in time and 'stopdate' not in time: out.append(f'hour < "{time["stoptime"]}"') if 'weekdays' in time: days = time['weekdays'].split(",") out_days = [f'"{day}"' for day in days if day[0] != '!'] out.append(f'day {{{",".join(out_days)}}}') return " ".join(out) def parse_policy_set(set_conf, def_suffix): out = [] if 'dscp' in set_conf: dscp = set_conf['dscp'] out.append(f'ip{def_suffix} dscp set {dscp}') if 'mark' in set_conf: mark = set_conf['mark'] out.append(f'meta mark set {mark}') if 'table' in set_conf: table = set_conf['table'] if table == 'main': table = '254' mark = 0x7FFFFFFF - int(table) out.append(f'meta mark set {mark}') if 'tcp_mss' in set_conf: mss = set_conf['tcp_mss'] out.append(f'tcp option maxseg size set {mss}') return " ".join(out) diff --git a/python/vyos/template.py b/python/vyos/template.py index 132f5ddde..b41525421 100644 --- a/python/vyos/template.py +++ b/python/vyos/template.py @@ -1,643 +1,644 @@ # Copyright 2019-2020 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import functools import os from jinja2 import Environment from jinja2 import FileSystemLoader from jinja2 import ChainableUndefined from vyos.defaults import directories from vyos.util import chmod from vyos.util import chown from vyos.util import dict_search_args from vyos.util import makedir # Holds template filters registered via register_filter() _FILTERS = {} _TESTS = {} # reuse Environments with identical settings to improve performance @functools.lru_cache(maxsize=2) def _get_environment(location=None): if location is None: loc_loader=FileSystemLoader(directories["templates"]) else: loc_loader=FileSystemLoader(location) env = Environment( # Don't check if template files were modified upon re-rendering auto_reload=False, # Cache up to this number of templates for quick re-rendering cache_size=100, loader=loc_loader, trim_blocks=True, undefined=ChainableUndefined, ) env.filters.update(_FILTERS) env.tests.update(_TESTS) return env def register_filter(name, func=None): """Register a function to be available as filter in templates under given name. It can also be used as a decorator, see below in this module for examples. :raise RuntimeError: when trying to register a filter after a template has been rendered already :raise ValueError: when trying to register a name which was taken already """ if func is None: return functools.partial(register_filter, name) if _get_environment.cache_info().currsize: raise RuntimeError( "Filters can only be registered before rendering the first template" ) if name in _FILTERS: raise ValueError(f"A filter with name {name!r} was registered already") _FILTERS[name] = func return func def register_test(name, func=None): """Register a function to be available as test in templates under given name. It can also be used as a decorator, see below in this module for examples. :raise RuntimeError: when trying to register a test after a template has been rendered already :raise ValueError: when trying to register a name which was taken already """ if func is None: return functools.partial(register_test, name) if _get_environment.cache_info().currsize: raise RuntimeError( "Tests can only be registered before rendering the first template" ) if name in _TESTS: raise ValueError(f"A test with name {name!r} was registered already") _TESTS[name] = func return func def render_to_string(template, content, formater=None, location=None): """Render a template from the template directory, raise on any errors. :param template: the path to the template relative to the template folder :param content: the dictionary of variables to put into rendering context :param formater: if given, it has to be a callable the rendered string is passed through The parsed template files are cached, so rendering the same file multiple times does not cause as too much overhead. If used everywhere, it could be changed to load the template from Python environment variables from an importable Python module generated when the Debian package is build (recovering the load time and overhead caused by having the file out of the code). """ template = _get_environment(location).get_template(template) rendered = template.render(content) if formater is not None: rendered = formater(rendered) return rendered def render( destination, template, content, formater=None, permission=None, user=None, group=None, location=None, ): """Render a template from the template directory to a file, raise on any errors. :param destination: path to the file to save the rendered template in :param permission: permission bitmask to set for the output file :param user: user to own the output file :param group: group to own the output file All other parameters are as for :func:`render_to_string`. """ # Create the directory if it does not exist folder = os.path.dirname(destination) makedir(folder, user, group) # As we are opening the file with 'w', we are performing the rendering before # calling open() to not accidentally erase the file if rendering fails rendered = render_to_string(template, content, formater, location) # Write to file with open(destination, "w") as file: chmod(file.fileno(), permission) chown(file.fileno(), user, group) file.write(rendered) ################################## # Custom template filters follow # ################################## @register_filter('force_to_list') def force_to_list(value): """ Convert scalars to single-item lists and leave lists untouched """ if isinstance(value, list): return value else: return [value] @register_filter('ip_from_cidr') def ip_from_cidr(prefix): """ Take an IPv4/IPv6 CIDR host and strip cidr mask. Example: 192.0.2.1/24 -> 192.0.2.1, 2001:db8::1/64 -> 2001:db8::1 """ from ipaddress import ip_interface return str(ip_interface(prefix).ip) @register_filter('address_from_cidr') def address_from_cidr(prefix): """ Take an IPv4/IPv6 CIDR prefix and convert the network to an "address". Example: 192.0.2.0/24 -> 192.0.2.0, 2001:db8::/48 -> 2001:db8:: """ from ipaddress import ip_network return str(ip_network(prefix).network_address) @register_filter('bracketize_ipv6') def bracketize_ipv6(address): """ Place a passed IPv6 address into [] brackets, do nothing for IPv4 """ if is_ipv6(address): return f'[{address}]' return address @register_filter('dot_colon_to_dash') def dot_colon_to_dash(text): """ Replace dot and colon to dash for string Example: 192.0.2.1 => 192-0-2-1, 2001:db8::1 => 2001-db8--1 """ text = text.replace(":", "-") text = text.replace(".", "-") return text @register_filter('netmask_from_cidr') def netmask_from_cidr(prefix): """ Take CIDR prefix and convert the prefix length to a "subnet mask". Example: - 192.0.2.0/24 -> 255.255.255.0 - 2001:db8::/48 -> ffff:ffff:ffff:: """ from ipaddress import ip_network return str(ip_network(prefix).netmask) @register_filter('netmask_from_ipv4') def netmask_from_ipv4(address): """ Take IP address and search all attached interface IP addresses for the given one. After address has been found, return the associated netmask. Example: - 172.18.201.10 -> 255.255.255.128 """ from netifaces import interfaces from netifaces import ifaddresses from netifaces import AF_INET for interface in interfaces(): tmp = ifaddresses(interface) if AF_INET in tmp: for af_addr in tmp[AF_INET]: if 'addr' in af_addr: if af_addr['addr'] == address: return af_addr['netmask'] raise ValueError @register_filter('is_ip_network') def is_ip_network(addr): """ Take IP(v4/v6) address and validate if the passed argument is a network or a host address. Example: - 192.0.2.0 -> False - 192.0.2.10/24 -> False - 192.0.2.0/24 -> True - 2001:db8:: -> False - 2001:db8::100 -> False - 2001:db8::/48 -> True - 2001:db8:1000::/64 -> True """ try: from ipaddress import ip_network # input variables must contain a / to indicate its CIDR notation if len(addr.split('/')) != 2: raise ValueError() ip_network(addr) return True except: return False @register_filter('network_from_ipv4') def network_from_ipv4(address): """ Take IP address and search all attached interface IP addresses for the given one. After address has been found, return the associated network address. Example: - 172.18.201.10 has mask 255.255.255.128 -> network is 172.18.201.0 """ netmask = netmask_from_ipv4(address) from ipaddress import ip_interface cidr_prefix = ip_interface(f'{address}/{netmask}').network return address_from_cidr(cidr_prefix) @register_filter('is_interface') def is_interface(interface): """ Check if parameter is a valid local interface name """ return os.path.exists(f'/sys/class/net/{interface}') @register_filter('is_ip') def is_ip(addr): """ Check addr if it is an IPv4 or IPv6 address """ return is_ipv4(addr) or is_ipv6(addr) @register_filter('is_ipv4') def is_ipv4(text): """ Filter IP address, return True on IPv4 address, False otherwise """ from ipaddress import ip_interface try: return ip_interface(text).version == 4 except: return False @register_filter('is_ipv6') def is_ipv6(text): """ Filter IP address, return True on IPv6 address, False otherwise """ from ipaddress import ip_interface try: return ip_interface(text).version == 6 except: return False @register_filter('first_host_address') def first_host_address(text): """ Return first usable (host) IP address from given prefix. Example: - 10.0.0.0/24 -> 10.0.0.1 - 2001:db8::/64 -> 2001:db8:: """ from ipaddress import ip_interface from ipaddress import IPv4Network from ipaddress import IPv6Network addr = ip_interface(text) if addr.version == 4: return str(addr.ip +1) return str(addr.ip) @register_filter('last_host_address') def last_host_address(text): """ Return first usable IP address from given prefix. Example: - 10.0.0.0/24 -> 10.0.0.254 - 2001:db8::/64 -> 2001:db8::ffff:ffff:ffff:ffff """ from ipaddress import ip_interface from ipaddress import IPv4Network from ipaddress import IPv6Network addr = ip_interface(text) if addr.version == 4: return str(IPv4Network(addr).broadcast_address - 1) return str(IPv6Network(addr).broadcast_address) @register_filter('inc_ip') def inc_ip(address, increment): """ Increment given IP address by 'increment' Example (inc by 2): - 10.0.0.0/24 -> 10.0.0.2 - 2001:db8::/64 -> 2001:db8::2 """ from ipaddress import ip_interface return str(ip_interface(address).ip + int(increment)) @register_filter('dec_ip') def dec_ip(address, decrement): """ Decrement given IP address by 'decrement' Example (inc by 2): - 10.0.0.0/24 -> 10.0.0.2 - 2001:db8::/64 -> 2001:db8::2 """ from ipaddress import ip_interface return str(ip_interface(address).ip - int(decrement)) @register_filter('compare_netmask') def compare_netmask(netmask1, netmask2): """ Compare two IP netmask if they have the exact same size. compare_netmask('10.0.0.0/8', '20.0.0.0/8') -> True compare_netmask('10.0.0.0/8', '20.0.0.0/16') -> False """ from ipaddress import ip_network try: return ip_network(netmask1).netmask == ip_network(netmask2).netmask except: return False @register_filter('isc_static_route') def isc_static_route(subnet, router): # https://ercpe.de/blog/pushing-static-routes-with-isc-dhcp-server # Option format is: # <netmask>, <network-byte1>, <network-byte2>, <network-byte3>, <router-byte1>, <router-byte2>, <router-byte3> # where bytes with the value 0 are omitted. from ipaddress import ip_network net = ip_network(subnet) # add netmask string = str(net.prefixlen) + ',' # add network bytes if net.prefixlen: width = net.prefixlen // 8 if net.prefixlen % 8: width += 1 string += ','.join(map(str,tuple(net.network_address.packed)[:width])) + ',' # add router bytes string += ','.join(router.split('.')) return string @register_filter('is_file') def is_file(filename): if os.path.exists(filename): return os.path.isfile(filename) return False @register_filter('get_dhcp_router') def get_dhcp_router(interface): """ Static routes can point to a router received by a DHCP reply. This helper is used to get the current default router from the DHCP reply. Returns False of no router is found, returns the IP address as string if a router is found. """ lease_file = f'/var/lib/dhcp/dhclient_{interface}.leases' if not os.path.exists(lease_file): return None from vyos.util import read_file for line in read_file(lease_file).splitlines(): if 'option routers' in line: (_, _, address) = line.split() return address.rstrip(';') @register_filter('natural_sort') def natural_sort(iterable): import re from jinja2.runtime import Undefined if isinstance(iterable, Undefined) or iterable is None: return list() def convert(text): return int(text) if text.isdigit() else text.lower() def alphanum_key(key): return [convert(c) for c in re.split('([0-9]+)', str(key))] return sorted(iterable, key=alphanum_key) @register_filter('get_ipv4') def get_ipv4(interface): """ Get interface IPv4 addresses""" from vyos.ifconfig import Interface return Interface(interface).get_addr_v4() @register_filter('get_ipv6') def get_ipv6(interface): """ Get interface IPv6 addresses""" from vyos.ifconfig import Interface return Interface(interface).get_addr_v6() @register_filter('get_ip') def get_ip(interface): """ Get interface IP addresses""" from vyos.ifconfig import Interface return Interface(interface).get_addr() def get_first_ike_dh_group(ike_group): if ike_group and 'proposal' in ike_group: for priority, proposal in ike_group['proposal'].items(): if 'dh_group' in proposal: return 'dh-group' + proposal['dh_group'] return 'dh-group2' # Fallback on dh-group2 @register_filter('get_esp_ike_cipher') def get_esp_ike_cipher(group_config, ike_group=None): pfs_lut = { 'dh-group1' : 'modp768', 'dh-group2' : 'modp1024', 'dh-group5' : 'modp1536', 'dh-group14' : 'modp2048', 'dh-group15' : 'modp3072', 'dh-group16' : 'modp4096', 'dh-group17' : 'modp6144', 'dh-group18' : 'modp8192', 'dh-group19' : 'ecp256', 'dh-group20' : 'ecp384', 'dh-group21' : 'ecp521', 'dh-group22' : 'modp1024s160', 'dh-group23' : 'modp2048s224', 'dh-group24' : 'modp2048s256', 'dh-group25' : 'ecp192', 'dh-group26' : 'ecp224', 'dh-group27' : 'ecp224bp', 'dh-group28' : 'ecp256bp', 'dh-group29' : 'ecp384bp', 'dh-group30' : 'ecp512bp', 'dh-group31' : 'curve25519', 'dh-group32' : 'curve448' } ciphers = [] if 'proposal' in group_config: for priority, proposal in group_config['proposal'].items(): # both encryption and hash need to be specified for a proposal if not {'encryption', 'hash'} <= set(proposal): continue tmp = '{encryption}-{hash}'.format(**proposal) if 'dh_group' in proposal: tmp += '-' + pfs_lut[ 'dh-group' + proposal['dh_group'] ] elif 'pfs' in group_config and group_config['pfs'] != 'disable': group = group_config['pfs'] if group_config['pfs'] == 'enable': group = get_first_ike_dh_group(ike_group) tmp += '-' + pfs_lut[group] ciphers.append(tmp) return ciphers @register_filter('get_uuid') def get_uuid(interface): """ Get interface IP addresses""" from uuid import uuid1 return uuid1() openvpn_translate = { 'des': 'des-cbc', '3des': 'des-ede3-cbc', 'bf128': 'bf-cbc', 'bf256': 'bf-cbc', 'aes128gcm': 'aes-128-gcm', 'aes128': 'aes-128-cbc', 'aes192gcm': 'aes-192-gcm', 'aes192': 'aes-192-cbc', 'aes256gcm': 'aes-256-gcm', 'aes256': 'aes-256-cbc' } @register_filter('openvpn_cipher') def get_openvpn_cipher(cipher): if cipher in openvpn_translate: return openvpn_translate[cipher].upper() return cipher.upper() @register_filter('openvpn_ncp_ciphers') def get_openvpn_ncp_ciphers(ciphers): out = [] for cipher in ciphers: if cipher in openvpn_translate: out.append(openvpn_translate[cipher]) else: out.append(cipher) return ':'.join(out).upper() @register_filter('snmp_auth_oid') def snmp_auth_oid(type): if type not in ['md5', 'sha', 'aes', 'des', 'none']: raise ValueError() OIDs = { 'md5' : '.1.3.6.1.6.3.10.1.1.2', 'sha' : '.1.3.6.1.6.3.10.1.1.3', 'aes' : '.1.3.6.1.6.3.10.1.2.4', 'des' : '.1.3.6.1.6.3.10.1.2.2', 'none': '.1.3.6.1.6.3.10.1.2.1' } return OIDs[type] @register_filter('nft_action') def nft_action(vyos_action): if vyos_action == 'accept': return 'return' return vyos_action @register_filter('nft_rule') def nft_rule(rule_conf, fw_name, rule_id, ip_name='ip'): from vyos.firewall import parse_rule return parse_rule(rule_conf, fw_name, rule_id, ip_name) @register_filter('nft_default_rule') def nft_default_rule(fw_conf, fw_name): output = ['counter'] default_action = fw_conf.get('default_action', 'accept') if 'enable_default_log' in fw_conf: action_suffix = default_action[:1].upper() - output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}] "') + log_level = fw_conf['enable_default_log'] + output.append(f'log prefix "[{fw_name[:19]}-default-{action_suffix}]" level {log_level}') output.append(nft_action(default_action)) output.append(f'comment "{fw_name} default-action {default_action}"') return " ".join(output) @register_filter('nft_state_policy') def nft_state_policy(conf, state, ipv6=False): out = [f'ct state {state}'] if 'log' in conf and 'enable' in conf['log']: out.append('log') out.append('counter') if 'action' in conf: out.append(conf['action']) return " ".join(out) @register_filter('nft_intra_zone_action') def nft_intra_zone_action(zone_conf, ipv6=False): if 'intra_zone_filtering' in zone_conf: intra_zone = zone_conf['intra_zone_filtering'] fw_name = 'ipv6_name' if ipv6 else 'name' name_prefix = 'NAME6_' if ipv6 else 'NAME_' if 'action' in intra_zone: if intra_zone['action'] == 'accept': return 'return' return intra_zone['action'] elif dict_search_args(intra_zone, 'firewall', fw_name): name = dict_search_args(intra_zone, 'firewall', fw_name) return f'jump {name_prefix}{name}' return 'return' @register_test('vyos_defined') def vyos_defined(value, test_value=None, var_type=None): """ Jinja2 plugin to test if a variable is defined and not none - vyos_defined will test value if defined and is not none and return true or false. If test_value is supplied, the value must also pass == test_value to return true. If var_type is supplied, the value must also be of the specified class/type Examples: 1. Test if var is defined and not none: {% if foo is vyos_defined %} ... {% endif %} 2. Test if variable is defined, not none and has value "something" {% if bar is vyos_defined("something") %} ... {% endif %} Parameters ---------- value : any Value to test from ansible test_value : any, optional Value to test in addition of defined and not none, by default None var_type : ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'], optional Type or Class to test for Returns ------- boolean True if variable matches criteria, False in other cases. Implementation inspired and re-used from https://github.com/aristanetworks/ansible-avd/ """ from jinja2 import Undefined if isinstance(value, Undefined) or value is None: # Invalid value - return false return False elif test_value is not None and value != test_value: # Valid value but not matching the optional argument return False elif str(var_type).lower() in ['float', 'int', 'str', 'list', 'dict', 'tuple', 'bool'] and str(var_type).lower() != type(value).__name__: # Invalid class - return false return False else: # Valid value and is matching optional argument if provided - return true return True diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index b8f944575..7b9691d9d 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -1,233 +1,240 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from glob import glob from base_vyostest_shim import VyOSUnitTestSHIM from vyos.util import cmd sysfs_config = { 'all_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_all', 'default': '0', 'test_value': 'disable'}, 'broadcast_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_broadcasts', 'default': '1', 'test_value': 'enable'}, 'ip_src_route': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_source_route', 'default': '0', 'test_value': 'enable'}, 'ipv6_receive_redirects': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, 'ipv6_src_route': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_source_route', 'default': '-1', 'test_value': 'enable'}, 'log_martians': {'sysfs': '/proc/sys/net/ipv4/conf/all/log_martians', 'default': '1', 'test_value': 'disable'}, 'receive_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, 'send_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/send_redirects', 'default': '1', 'test_value': 'disable'}, 'syn_cookies': {'sysfs': '/proc/sys/net/ipv4/tcp_syncookies', 'default': '1', 'test_value': 'disable'}, 'twa_hazards_protection': {'sysfs': '/proc/sys/net/ipv4/tcp_rfc1337', 'default': '0', 'test_value': 'enable'} } class TestFirewall(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestFirewall, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, ['firewall']) cls.cli_set(cls, ['interfaces', 'ethernet', 'eth0', 'address', '172.16.10.1/24']) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['interfaces', 'ethernet', 'eth0', 'address', '172.16.10.1/24']) super(TestFirewall, cls).tearDownClass() def tearDown(self): self.cli_delete(['interfaces', 'ethernet', 'eth0', 'firewall']) self.cli_delete(['firewall']) self.cli_commit() def test_groups(self): self.cli_set(['firewall', 'group', 'mac-group', 'smoketest_mac', 'mac-address', '00:01:02:03:04:05']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '123']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'source', 'group', 'mac-group', 'smoketest_mac']) self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) self.cli_commit() nftables_search = [ ['iifname "eth0"', 'jump NAME_smoketest'], ['ip saddr { 172.16.99.0/24 }', 'ip daddr 172.16.10.10', 'th dport { 53, 123 }', 'return'], ['ether saddr { 00:01:02:03:04:05 }', 'return'] ] nftables_output = cmd('sudo nft list table ip filter') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(matched, msg=search) def test_basic_rules(self): self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) + self.cli_set(['firewall', 'name', 'smoketest', 'enable-default-log', 'info']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'log', 'debug']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'protocol', 'tcp']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'destination', 'port', '8888']) + self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'log', 'err']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'tcp', 'flags', 'syn']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'tcp', 'flags', 'not', 'ack']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'protocol', 'tcp']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'destination', 'port', '22']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'limit', 'rate', '5/minute']) self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) self.cli_commit() nftables_search = [ ['iifname "eth0"', 'jump NAME_smoketest'], - ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'return'], - ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'reject'], + ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'log prefix "[smoketest-1-A]" level debug','return'], + ['tcp flags & (syn | ack) == syn', 'tcp dport { 8888 }', 'log prefix "[smoketest-2-R]" level err', 'reject'], ['tcp dport { 22 }', 'limit rate 5/minute', 'return'], - ['smoketest default-action', 'drop'] + ['log prefix "[smoketest-default-D]" level info','smoketest default-action', 'drop'] ] nftables_output = cmd('sudo nft list table ip filter') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(matched, msg=search) def test_basic_rules_ipv6(self): self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'default-action', 'drop']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'enable-default-log', 'emerg']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'source', 'address', '2002::1']) self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'destination', 'address', '2002::1:1']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '1', 'log', 'crit']) + self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv6-name', 'v6-smoketest', 'rule', '2', 'destination', 'port', '8888']) self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'ipv6-name', 'v6-smoketest']) self.cli_commit() nftables_search = [ ['iifname "eth0"', 'jump NAME6_v6-smoketest'], - ['saddr 2002::1', 'daddr 2002::1:1', 'return'], + ['saddr 2002::1', 'daddr 2002::1:1', 'log prefix "[v6-smoketest-1-A]" level crit', 'return'], ['meta l4proto { tcp, udp }', 'th dport { 8888 }', 'reject'], - ['smoketest default-action', 'drop'] + ['smoketest default-action', 'log prefix "[v6-smoketest-default-D]" level emerg', 'drop'] ] nftables_output = cmd('sudo nft list table ip6 filter') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(matched, msg=search) def test_state_policy(self): self.cli_set(['firewall', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'state-policy', 'invalid', 'action', 'drop']) self.cli_commit() chains = { 'ip filter': ['VYOS_FW_FORWARD', 'VYOS_FW_OUTPUT', 'VYOS_FW_LOCAL'], 'ip6 filter': ['VYOS_FW6_FORWARD', 'VYOS_FW6_OUTPUT', 'VYOS_FW6_LOCAL'] } for table in ['ip filter', 'ip6 filter']: for chain in chains[table]: nftables_output = cmd(f'sudo nft list chain {table} {chain}') self.assertTrue('jump VYOS_STATE_POLICY' in nftables_output) def test_state_and_status_rules(self): self.cli_set(['firewall', 'name', 'smoketest', 'default-action', 'drop']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'state', 'established', 'enable']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '1', 'state', 'related', 'enable']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '2', 'state', 'invalid', 'enable']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'state', 'new', 'enable']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '3', 'connection-status', 'nat', 'destination']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'state', 'new', 'enable']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'state', 'established', 'enable']) self.cli_set(['firewall', 'name', 'smoketest', 'rule', '4', 'connection-status', 'nat', 'source']) self.cli_set(['interfaces', 'ethernet', 'eth0', 'firewall', 'in', 'name', 'smoketest']) self.cli_commit() nftables_search = [ ['iifname "eth0"', 'jump NAME_smoketest'], ['ct state { established, related }', 'return'], ['ct state { invalid }', 'reject'], ['ct state { new }', 'ct status { dnat }', 'return'], ['ct state { established, new }', 'ct status { snat }', 'return'], ['smoketest default-action', 'drop'] ] nftables_output = cmd('sudo nft list table ip filter') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(matched, msg=search) def test_sysfs(self): for name, conf in sysfs_config.items(): paths = glob(conf['sysfs']) for path in paths: with open(path, 'r') as f: self.assertEqual(f.read().strip(), conf['default'], msg=path) self.cli_set(['firewall', name.replace("_", "-"), conf['test_value']]) self.cli_commit() for name, conf in sysfs_config.items(): paths = glob(conf['sysfs']) for path in paths: with open(path, 'r') as f: self.assertNotEqual(f.read().strip(), conf['default'], msg=path) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/migration-scripts/firewall/6-to-7 b/src/migration-scripts/firewall/6-to-7 index 5f4cff90d..1e698da0b 100755 --- a/src/migration-scripts/firewall/6-to-7 +++ b/src/migration-scripts/firewall/6-to-7 @@ -1,226 +1,253 @@ #!/usr/bin/env python3 # # Copyright (C) 2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # T2199: Remove unavailable nodes due to XML/Python implementation using nftables # monthdays: nftables does not have a monthdays equivalent # utc: nftables userspace uses localtime and calculates the UTC offset automatically # icmp/v6: migrate previously available `type-name` to valid type/code # T4178: Update tcp flags to use multi value node +# T3907: Add log levels +# `enable-default-log` --> `enable-default-log warn` +# `rule X log enable` --> `rule X log warn` +# `rule X log disable` --> No log config + import re from sys import argv from sys import exit from vyos.configtree import ConfigTree from vyos.ifconfig import Section if (len(argv) < 1): print("Must specify file name!") exit(1) file_name = argv[1] with open(file_name, 'r') as f: config_file = f.read() base = ['firewall'] config = ConfigTree(config_file) if not config.exists(base): # Nothing to do exit(0) icmp_remove = ['any'] icmp_translations = { 'ping': 'echo-request', 'pong': 'echo-reply', 'ttl-exceeded': 'time-exceeded', # Network Unreachable 'network-unreachable': [3, 0], 'host-unreachable': [3, 1], 'protocol-unreachable': [3, 2], 'port-unreachable': [3, 3], 'fragmentation-needed': [3, 4], 'source-route-failed': [3, 5], 'network-unknown': [3, 6], 'host-unknown': [3, 7], 'network-prohibited': [3, 9], 'host-prohibited': [3, 10], 'TOS-network-unreachable': [3, 11], 'TOS-host-unreachable': [3, 12], 'communication-prohibited': [3, 13], 'host-precedence-violation': [3, 14], 'precedence-cutoff': [3, 15], # Redirect 'network-redirect': [5, 0], 'host-redirect': [5, 1], 'TOS-network-redirect': [5, 2], 'TOS host-redirect': [5, 3], # Time Exceeded 'ttl-zero-during-transit': [11, 0], 'ttl-zero-during-reassembly': [11, 1], # Parameter Problem 'ip-header-bad': [12, 0], 'required-option-missing': [12, 1] } icmpv6_remove = [] icmpv6_translations = { 'ping': 'echo-request', 'pong': 'echo-reply', # Destination Unreachable 'no-route': [1, 0], 'communication-prohibited': [1, 1], 'address-unreachble': [1, 3], 'port-unreachable': [1, 4], # Redirect 'redirect': 'nd-redirect', # Time Exceeded 'ttl-zero-during-transit': [3, 0], 'ttl-zero-during-reassembly': [3, 1], # Parameter Problem 'bad-header': [4, 0], 'unknown-header-type': [4, 1], 'unknown-option': [4, 2] } if config.exists(base + ['name']): for name in config.list_nodes(base + ['name']): + if config.exists(base + ['name', name, 'enable-default-log']): + config.set(base + ['name', name, 'enable-default-log'], value='warn') + if not config.exists(base + ['name', name, 'rule']): continue for rule in config.list_nodes(base + ['name', name, 'rule']): rule_recent = base + ['name', name, 'rule', rule, 'recent'] rule_time = base + ['name', name, 'rule', rule, 'time'] rule_tcp_flags = base + ['name', name, 'rule', rule, 'tcp', 'flags'] rule_icmp = base + ['name', name, 'rule', rule, 'icmp'] + rule_log = base + ['name', name, 'rule', rule, 'log'] if config.exists(rule_time + ['monthdays']): config.delete(rule_time + ['monthdays']) if config.exists(rule_time + ['utc']): config.delete(rule_time + ['utc']) if config.exists(rule_recent + ['time']): tmp = int(config.return_value(rule_recent + ['time'])) unit = 'minute' if tmp > 600: unit = 'hour' elif tmp < 10: unit = 'second' config.set(rule_recent + ['time'], value=unit) if config.exists(rule_tcp_flags): tmp = config.return_value(rule_tcp_flags) config.delete(rule_tcp_flags) for flag in tmp.split(","): if flag[0] == '!': config.set(rule_tcp_flags + ['not', flag[1:].lower()]) else: config.set(rule_tcp_flags + [flag.lower()]) if config.exists(rule_icmp + ['type-name']): tmp = config.return_value(rule_icmp + ['type-name']) if tmp in icmp_remove: config.delete(rule_icmp + ['type-name']) elif tmp in icmp_translations: translate = icmp_translations[tmp] if isinstance(translate, str): config.set(rule_icmp + ['type-name'], value=translate) elif isinstance(translate, list): config.delete(rule_icmp + ['type-name']) config.set(rule_icmp + ['type'], value=translate[0]) config.set(rule_icmp + ['code'], value=translate[1]) + if config.exists(rule_log): + tmp = config.return_value(rule_log) + if tmp == 'disable': + config.delete(rule_log) + else: + config.set(rule_log, value='warn') + for src_dst in ['destination', 'source']: pg_base = base + ['name', name, 'rule', rule, src_dst, 'group', 'port-group'] proto_base = base + ['name', name, 'rule', rule, 'protocol'] if config.exists(pg_base) and not config.exists(proto_base): config.set(proto_base, value='tcp_udp') if config.exists(base + ['ipv6-name']): + if config.exists(base + ['ipv6-name', name, 'enable-default-log']): + config.set(base + ['ipv6-name', name, 'enable-default-log'], value='warn') + for name in config.list_nodes(base + ['ipv6-name']): if not config.exists(base + ['ipv6-name', name, 'rule']): continue for rule in config.list_nodes(base + ['ipv6-name', name, 'rule']): rule_recent = base + ['ipv6-name', name, 'rule', rule, 'recent'] rule_time = base + ['ipv6-name', name, 'rule', rule, 'time'] rule_tcp_flags = base + ['ipv6-name', name, 'rule', rule, 'tcp', 'flags'] rule_icmp = base + ['ipv6-name', name, 'rule', rule, 'icmpv6'] + rule_log = base + ['ipv6-name', name, 'rule', rule, 'log'] if config.exists(rule_time + ['monthdays']): config.delete(rule_time + ['monthdays']) if config.exists(rule_time + ['utc']): config.delete(rule_time + ['utc']) if config.exists(rule_recent + ['time']): tmp = int(config.return_value(rule_recent + ['time'])) unit = 'minute' if tmp > 600: unit = 'hour' elif tmp < 10: unit = 'second' config.set(rule_recent + ['time'], value=unit) if config.exists(rule_tcp_flags): tmp = config.return_value(rule_tcp_flags) config.delete(rule_tcp_flags) for flag in tmp.split(","): if flag[0] == '!': config.set(rule_tcp_flags + ['not', flag[1:].lower()]) else: config.set(rule_tcp_flags + [flag.lower()]) if config.exists(base + ['ipv6-name', name, 'rule', rule, 'protocol']): tmp = config.return_value(base + ['ipv6-name', name, 'rule', rule, 'protocol']) if tmp == 'icmpv6': config.set(base + ['ipv6-name', name, 'rule', rule, 'protocol'], value='ipv6-icmp') if config.exists(rule_icmp + ['type']): tmp = config.return_value(rule_icmp + ['type']) type_code_match = re.match(r'^(\d+)/(\d+)$', tmp) if type_code_match: config.set(rule_icmp + ['type'], value=type_code_match[1]) config.set(rule_icmp + ['code'], value=type_code_match[2]) elif tmp in icmpv6_remove: config.delete(rule_icmp + ['type']) elif tmp in icmpv6_translations: translate = icmpv6_translations[tmp] if isinstance(translate, str): config.delete(rule_icmp + ['type']) config.set(rule_icmp + ['type-name'], value=translate) elif isinstance(translate, list): config.set(rule_icmp + ['type'], value=translate[0]) config.set(rule_icmp + ['code'], value=translate[1]) else: config.rename(rule_icmp + ['type'], 'type-name') + if config.exists(rule_log): + tmp = config.return_value(rule_log) + if tmp == 'disable': + config.delete(rule_log) + else: + config.set(rule_log, value='warn') + for src_dst in ['destination', 'source']: pg_base = base + ['ipv6-name', name, 'rule', rule, src_dst, 'group', 'port-group'] proto_base = base + ['ipv6-name', name, 'rule', rule, 'protocol'] if config.exists(pg_base) and not config.exists(proto_base): config.set(proto_base, value='tcp_udp') try: with open(file_name, 'w') as f: f.write(config.to_string()) except OSError as e: print("Failed to save the modified config: {}".format(e)) exit(1)