diff --git a/data/templates/firewall/nftables-nat66.j2 b/data/templates/firewall/nftables-nat66.j2 index 67eb2c109..09b5b6ac2 100644 --- a/data/templates/firewall/nftables-nat66.j2 +++ b/data/templates/firewall/nftables-nat66.j2 @@ -1,40 +1,46 @@ #!/usr/sbin/nft -f +{% import 'firewall/nftables-defines.j2' as group_tmpl %} + {% if first_install is not vyos_defined %} delete table ip6 vyos_nat {% endif %} +{% if deleted is not vyos_defined %} table ip6 vyos_nat { # # Destination NAT66 rules build up here # chain PREROUTING { type nat hook prerouting priority -100; policy accept; counter jump VYOS_DNPT_HOOK -{% if destination.rule is vyos_defined %} -{% for rule, config in destination.rule.items() if config.disable is not vyos_defined %} - {{ config | nat_rule(rule, 'destination', ipv6=True) }} -{% endfor %} -{% endif %} +{% if destination.rule is vyos_defined %} +{% for rule, config in destination.rule.items() if config.disable is not vyos_defined %} + {{ config | nat_rule(rule, 'destination', ipv6=True) }} +{% endfor %} +{% endif %} } # # Source NAT66 rules build up here # chain POSTROUTING { type nat hook postrouting priority 100; policy accept; counter jump VYOS_SNPT_HOOK -{% if source.rule is vyos_defined %} -{% for rule, config in source.rule.items() if config.disable is not vyos_defined %} +{% if source.rule is vyos_defined %} +{% for rule, config in source.rule.items() if config.disable is not vyos_defined %} {{ config | nat_rule(rule, 'source', ipv6=True) }} -{% endfor %} -{% endif %} +{% endfor %} +{% endif %} } chain VYOS_DNPT_HOOK { return } chain VYOS_SNPT_HOOK { return } + +{{ group_tmpl.groups(firewall_group, True, True) }} } +{% endif %} diff --git a/interface-definitions/nat66.xml.in b/interface-definitions/nat66.xml.in index 32d501cce..c59725c53 100644 --- a/interface-definitions/nat66.xml.in +++ b/interface-definitions/nat66.xml.in @@ -1,250 +1,251 @@ <?xml version="1.0"?> <interfaceDefinition> <node name="nat66" owner="${vyos_conf_scripts_dir}/nat66.py"> <properties> <help>Network Prefix Translation (NAT66/NPTv6) parameters</help> <priority>500</priority> </properties> <children> <node name="source"> <properties> <help>Prefix mapping of IPv6 source address translation</help> </properties> <children> <tagNode name="rule"> <properties> <help>Source NAT66 rule number</help> <valueHelp> <format>u32:1-999999</format> <description>Number for this rule</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-999999"/> </constraint> <constraintErrorMessage>NAT66 rule number must be between 1 and 999999</constraintErrorMessage> </properties> <children> #include <include/generic-description.xml.i> #include <include/generic-disable-node.xml.i> #include <include/nat-exclude.xml.i> #include <include/firewall/log.xml.i> #include <include/firewall/outbound-interface-no-group.xml.i> #include <include/nat/protocol.xml.i> <node name="destination"> <properties> <help>IPv6 destination prefix options</help> </properties> <children> <leafNode name="prefix"> <properties> <help>IPv6 prefix to be translated</help> <valueHelp> <format>ipv6net</format> <description>IPv6 prefix</description> </valueHelp> <valueHelp> <format>!ipv6net</format> <description>Match everything except the specified IPv6 prefix</description> </valueHelp> <constraint> <validator name="ipv6-prefix"/> <validator name="ipv6-prefix-exclude"/> </constraint> </properties> </leafNode> #include <include/nat-port.xml.i> </children> </node> <node name="source"> <properties> <help>IPv6 source prefix options</help> </properties> <children> <leafNode name="prefix"> <properties> <help>IPv6 prefix to be translated</help> <valueHelp> <format>ipv6net</format> <description>IPv6 prefix</description> </valueHelp> <valueHelp> <format>!ipv6net</format> <description>Match everything except the specified IPv6 prefix</description> </valueHelp> <constraint> <validator name="ipv6-prefix"/> <validator name="ipv6-prefix-exclude"/> </constraint> </properties> </leafNode> #include <include/nat-port.xml.i> </children> </node> <node name="translation"> <properties> <help>Translated IPv6 address options</help> </properties> <children> <leafNode name="address"> <properties> <help>IPv6 address to translate to</help> <completionHelp> <list>masquerade</list> </completionHelp> <valueHelp> <format>ipv6</format> <description>IPv6 address</description> </valueHelp> <valueHelp> <format>ipv6net</format> <description>IPv6 prefix</description> </valueHelp> <valueHelp> <format>masquerade</format> <description>NAT to the primary address of outbound-interface</description> </valueHelp> <constraint> <validator name="ipv6-address"/> <validator name="ipv6-prefix"/> <regex>(masquerade)</regex> </constraint> </properties> </leafNode> #include <include/nat-translation-port.xml.i> </children> </node> </children> </tagNode> </children> </node> <node name="destination"> <properties> <help>Prefix mapping for IPv6 destination address translation</help> </properties> <children> <tagNode name="rule"> <properties> <help>Destination NAT66 rule number</help> <valueHelp> <format>u32:1-999999</format> <description>Number for this rule</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-999999"/> </constraint> <constraintErrorMessage>NAT66 rule number must be between 1 and 999999</constraintErrorMessage> </properties> <children> #include <include/generic-description.xml.i> #include <include/generic-disable-node.xml.i> #include <include/nat-exclude.xml.i> <leafNode name="log"> <properties> <help>NAT66 rule logging</help> <valueless/> </properties> </leafNode> #include <include/firewall/inbound-interface-no-group.xml.i> #include <include/nat/protocol.xml.i> <node name="destination"> <properties> <help>IPv6 destination prefix options</help> </properties> <children> <leafNode name="address"> <properties> <help>IPv6 address or prefix to be translated</help> <valueHelp> <format>ipv6</format> <description>IPv6 address</description> </valueHelp> <valueHelp> <format>ipv6net</format> <description>IPv6 prefix</description> </valueHelp> <valueHelp> <format>!ipv6</format> <description>Match everything except the specified IPv6 address</description> </valueHelp> <valueHelp> <format>!ipv6net</format> <description>Match everything except the specified IPv6 prefix</description> </valueHelp> <constraint> <validator name="ipv6-address"/> <validator name="ipv6-prefix"/> <validator name="ipv6-address-exclude"/> <validator name="ipv6-prefix-exclude"/> </constraint> </properties> </leafNode> #include <include/nat-port.xml.i> + #include <include/firewall/source-destination-group-ipv6.xml.i> </children> </node> <node name="source"> <properties> <help>IPv6 source prefix options</help> </properties> <children> <leafNode name="address"> <properties> <help>IPv6 address or prefix to be translated</help> <valueHelp> <format>ipv6</format> <description>IPv6 address</description> </valueHelp> <valueHelp> <format>ipv6net</format> <description>IPv6 prefix</description> </valueHelp> <valueHelp> <format>!ipv6</format> <description>Match everything except the specified IPv6 address</description> </valueHelp> <valueHelp> <format>!ipv6net</format> <description>Match everything except the specified IPv6 prefix</description> </valueHelp> <constraint> <validator name="ipv6-address"/> <validator name="ipv6-prefix"/> <validator name="ipv6-address-exclude"/> <validator name="ipv6-prefix-exclude"/> </constraint> </properties> </leafNode> #include <include/nat-port.xml.i> </children> </node> <node name="translation"> <properties> <help>Translated IPv6 address options</help> </properties> <children> <leafNode name="address"> <properties> <help>IPv6 address or prefix to translate to</help> <valueHelp> <format>ipv6</format> <description>IPv6 address</description> </valueHelp> <valueHelp> <format>ipv6net</format> <description>IPv6 prefix</description> </valueHelp> <constraint> <validator name="ipv6-address"/> <validator name="ipv6-prefix"/> </constraint> </properties> </leafNode> #include <include/nat-translation-port.xml.i> </children> </node> </children> </tagNode> </children> </node> </children> </node> </interfaceDefinition> diff --git a/python/vyos/nat.py b/python/vyos/nat.py index e54548788..5fab3c2a1 100644 --- a/python/vyos/nat.py +++ b/python/vyos/nat.py @@ -1,311 +1,317 @@ # Copyright (C) 2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from vyos.template import is_ip_network from vyos.utils.dict import dict_search_args from vyos.template import bracketize_ipv6 def parse_nat_rule(rule_conf, rule_id, nat_type, ipv6=False): output = [] ip_prefix = 'ip6' if ipv6 else 'ip' log_prefix = ('DST' if nat_type == 'destination' else 'SRC') + f'-NAT-{rule_id}' log_suffix = '' if ipv6: log_prefix = log_prefix.replace("NAT-", "NAT66-") ignore_type_addr = False translation_str = '' if 'inbound_interface' in rule_conf: operator = '' if 'name' in rule_conf['inbound_interface']: iiface = rule_conf['inbound_interface']['name'] if iiface[0] == '!': operator = '!=' iiface = iiface[1:] output.append(f'iifname {operator} {{{iiface}}}') else: iiface = rule_conf['inbound_interface']['group'] if iiface[0] == '!': operator = '!=' iiface = iiface[1:] output.append(f'iifname {operator} @I_{iiface}') if 'outbound_interface' in rule_conf: operator = '' if 'name' in rule_conf['outbound_interface']: oiface = rule_conf['outbound_interface']['name'] if oiface[0] == '!': operator = '!=' oiface = oiface[1:] output.append(f'oifname {operator} {{{oiface}}}') else: oiface = rule_conf['outbound_interface']['group'] if oiface[0] == '!': operator = '!=' oiface = oiface[1:] output.append(f'oifname {operator} @I_{oiface}') if 'protocol' in rule_conf and rule_conf['protocol'] != 'all': protocol = rule_conf['protocol'] if protocol == 'tcp_udp': protocol = '{ tcp, udp }' output.append(f'meta l4proto {protocol}') if 'packet_type' in rule_conf: output.append(f'pkttype ' + rule_conf['packet_type']) if 'exclude' in rule_conf: translation_str = 'return' log_suffix = '-EXCL' elif 'translation' in rule_conf: addr = dict_search_args(rule_conf, 'translation', 'address') port = dict_search_args(rule_conf, 'translation', 'port') if 'redirect' in rule_conf['translation']: translation_output = [f'redirect'] redirect_port = dict_search_args(rule_conf, 'translation', 'redirect', 'port') if redirect_port: translation_output.append(f'to {redirect_port}') else: translation_prefix = nat_type[:1] translation_output = [f'{translation_prefix}nat'] if addr and is_ip_network(addr): if not ipv6: map_addr = dict_search_args(rule_conf, nat_type, 'address') if map_addr: if port: translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} . {port} }}') else: translation_output.append(f'{ip_prefix} prefix to {ip_prefix} {translation_prefix}addr map {{ {map_addr} : {addr} }}') ignore_type_addr = True else: translation_output.append(f'prefix to {addr}') else: translation_output.append(f'prefix to {addr}') elif addr == 'masquerade': if port: addr = f'{addr} to ' translation_output = [addr] log_suffix = '-MASQ' else: translation_output.append('to') if addr: addr = bracketize_ipv6(addr) translation_output.append(addr) options = [] addr_mapping = dict_search_args(rule_conf, 'translation', 'options', 'address_mapping') port_mapping = dict_search_args(rule_conf, 'translation', 'options', 'port_mapping') if addr_mapping == 'persistent': options.append('persistent') if port_mapping and port_mapping != 'none': options.append(port_mapping) if ((not addr) or (addr and not is_ip_network(addr))) and port: translation_str = " ".join(translation_output) + (f':{port}') else: translation_str = " ".join(translation_output) if options: translation_str += f' {",".join(options)}' if not ipv6 and 'backend' in rule_conf['load_balance']: hash_input_items = [] current_prob = 0 nat_map = [] for trans_addr, addr in rule_conf['load_balance']['backend'].items(): item_prob = int(addr['weight']) upper_limit = current_prob + item_prob - 1 hash_val = str(current_prob) + '-' + str(upper_limit) element = hash_val + " : " + trans_addr nat_map.append(element) current_prob = current_prob + item_prob elements = ' , '.join(nat_map) if 'hash' in rule_conf['load_balance'] and 'random' in rule_conf['load_balance']['hash']: translation_str += ' numgen random mod 100 map ' + '{ ' + f'{elements}' + ' }' else: for input_param in rule_conf['load_balance']['hash']: if input_param == 'source-address': param = 'ip saddr' elif input_param == 'destination-address': param = 'ip daddr' elif input_param == 'source-port': prot = rule_conf['protocol'] param = f'{prot} sport' elif input_param == 'destination-port': prot = rule_conf['protocol'] param = f'{prot} dport' hash_input_items.append(param) hash_input = ' . '.join(hash_input_items) translation_str += f' jhash ' + f'{hash_input}' + ' mod 100 map ' + '{ ' + f'{elements}' + ' }' for target in ['source', 'destination']: if target not in rule_conf: continue side_conf = rule_conf[target] prefix = target[:1] addr = dict_search_args(side_conf, 'address') if addr and not (ignore_type_addr and target == nat_type): operator = '' if addr[:1] == '!': operator = '!=' addr = addr[1:] output.append(f'{ip_prefix} {prefix}addr {operator} {addr}') addr_prefix = dict_search_args(side_conf, 'prefix') if addr_prefix and ipv6: operator = '' if addr_prefix[:1] == '!': operator = '!=' addr_prefix = addr_prefix[1:] output.append(f'ip6 {prefix}addr {operator} {addr_prefix}') port = dict_search_args(side_conf, 'port') if port: protocol = rule_conf['protocol'] if protocol == 'tcp_udp': protocol = 'th' operator = '' if port[:1] == '!': operator = '!=' port = port[1:] output.append(f'{protocol} {prefix}port {operator} {{ {port} }}') if 'group' in side_conf: group = side_conf['group'] if 'address_group' in group and not (ignore_type_addr and target == nat_type): group_name = group['address_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] - output.append(f'{ip_prefix} {prefix}addr {operator} @A_{group_name}') + if ipv6: + output.append(f'{ip_prefix} {prefix}addr {operator} @A6_{group_name}') + else: + output.append(f'{ip_prefix} {prefix}addr {operator} @A_{group_name}') # Generate firewall group domain-group elif 'domain_group' in group and not (ignore_type_addr and target == nat_type): group_name = group['domain_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_prefix} {prefix}addr {operator} @D_{group_name}') elif 'network_group' in group and not (ignore_type_addr and target == nat_type): group_name = group['network_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] - output.append(f'{ip_prefix} {prefix}addr {operator} @N_{group_name}') + if ipv6: + output.append(f'{ip_prefix} {prefix}addr {operator} @N6_{group_name}') + else: + output.append(f'{ip_prefix} {prefix}addr {operator} @N_{group_name}') if 'mac_group' in group: group_name = group['mac_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'ether {prefix}addr {operator} @M_{group_name}') if 'port_group' in group: proto = rule_conf['protocol'] group_name = group['port_group'] if proto == 'tcp_udp': proto = 'th' operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{proto} {prefix}port {operator} @P_{group_name}') output.append('counter') if 'log' in rule_conf: output.append(f'log prefix "[{log_prefix}{log_suffix}]"') if translation_str: output.append(translation_str) output.append(f'comment "{log_prefix}"') return " ".join(output) def parse_nat_static_rule(rule_conf, rule_id, nat_type): output = [] log_prefix = ('STATIC-DST' if nat_type == 'destination' else 'STATIC-SRC') + f'-NAT-{rule_id}' log_suffix = '' ignore_type_addr = False translation_str = '' if 'inbound_interface' in rule_conf: ifname = rule_conf['inbound_interface'] ifprefix = 'i' if nat_type == 'destination' else 'o' if ifname != 'any': output.append(f'{ifprefix}ifname "{ifname}"') if 'exclude' in rule_conf: translation_str = 'return' log_suffix = '-EXCL' elif 'translation' in rule_conf: translation_prefix = nat_type[:1] translation_output = [f'{translation_prefix}nat'] addr = dict_search_args(rule_conf, 'translation', 'address') map_addr = dict_search_args(rule_conf, 'destination', 'address') if nat_type == 'source': addr, map_addr = map_addr, addr # Swap if addr and is_ip_network(addr): translation_output.append(f'ip prefix to ip {translation_prefix}addr map {{ {map_addr} : {addr} }}') ignore_type_addr = True elif addr: translation_output.append(f'to {addr}') options = [] addr_mapping = dict_search_args(rule_conf, 'translation', 'options', 'address_mapping') port_mapping = dict_search_args(rule_conf, 'translation', 'options', 'port_mapping') if addr_mapping == 'persistent': options.append('persistent') if port_mapping and port_mapping != 'none': options.append(port_mapping) if options: translation_output.append(",".join(options)) translation_str = " ".join(translation_output) prefix = nat_type[:1] addr = dict_search_args(rule_conf, 'translation' if nat_type == 'source' else nat_type, 'address') if addr and not ignore_type_addr: output.append(f'ip {prefix}addr {addr}') output.append('counter') if 'log' in rule_conf: output.append(f'log prefix "[{log_prefix}{log_suffix}]"') if translation_str: output.append(translation_str) output.append(f'comment "{log_prefix}"') return " ".join(output) diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py index e8eeae26f..52ad8e3ef 100755 --- a/smoketest/scripts/cli/test_nat66.py +++ b/smoketest/scripts/cli/test_nat66.py @@ -1,211 +1,241 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError base_path = ['nat66'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] class TestNAT66(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestNAT66, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() def test_source_nat66(self): source_prefix = 'fc00::/64' translation_prefix = 'fc01::/64' self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_prefix]) self.cli_set(src_path + ['rule', '2', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '2', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '2', 'translation', 'address', 'masquerade']) self.cli_set(src_path + ['rule', '3', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '3', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '3', 'exclude']) self.cli_commit() nftables_search = [ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat prefix to {translation_prefix}'], ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'masquerade'], ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'return'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_address(self): source_prefix = 'fc00::/64' translation_address = 'fc00::1' self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat to {translation_address}'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66(self): destination_address = 'fc00::1' translation_address = 'fc01::1' source_address = 'fc02::1' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_address]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) self.cli_set(dst_path + ['rule', '2', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '2', 'destination', 'address', destination_address]) self.cli_set(dst_path + ['rule', '2', 'source', 'address', source_address]) self.cli_set(dst_path + ['rule', '2', 'exclude']) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['iifname "eth1"', 'ip6 daddr fc00::1', 'dnat to fc01::1'], ['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_protocol(self): translation_address = '2001:db8:1111::1' source_prefix = '2001:db8:2222::/64' dport = '4545' sport = '8080' tport = '5555' proto = 'tcp' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', dport]) self.cli_set(dst_path + ['rule', '1', 'source', 'address', source_prefix]) self.cli_set(dst_path + ['rule', '1', 'source', 'port', sport]) self.cli_set(dst_path + ['rule', '1', 'protocol', proto]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', tport]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 4545', 'ip6 saddr 2001:db8:2222::/64', 'tcp sport 8080', 'dnat to [2001:db8:1111::1]:5555'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_prefix(self): destination_prefix = 'fc00::/64' translation_prefix = 'fc01::/64' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_prefix]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_prefix]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['iifname "eth1"', f'ip6 daddr {destination_prefix}', f'dnat prefix to {translation_prefix}'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') + def test_destination_nat66_network_group(self): + address_group = 'smoketest_addr' + address_group_member = 'fc00::1' + network_group = 'smoketest_net' + network_group_member = 'fc00::/64' + translation_prefix = 'fc01::/64' + + self.cli_set(['firewall', 'group', 'ipv6-address-group', address_group, 'address', address_group_member]) + self.cli_set(['firewall', 'group', 'ipv6-network-group', network_group, 'network', network_group_member]) + + self.cli_set(dst_path + ['rule', '1', 'destination', 'group', 'address-group', address_group]) + self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_prefix]) + + self.cli_set(dst_path + ['rule', '2', 'destination', 'group', 'network-group', network_group]) + self.cli_set(dst_path + ['rule', '2', 'translation', 'address', translation_prefix]) + + self.cli_commit() + + nftables_search = [ + [f'set A6_{address_group}'], + [f'elements = {{ {address_group_member} }}'], + [f'set N6_{network_group}'], + [f'elements = {{ {network_group_member} }}'], + ['ip6 daddr', f'@A6_{address_group}', 'dnat prefix to fc01::/64'], + ['ip6 daddr', f'@N6_{network_group}', 'dnat prefix to fc01::/64'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_nat') + + def test_destination_nat66_without_translation_address(self): self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_required_translation_prefix(self): # T2813: Ensure translation address is specified rule = '5' source_prefix = 'fc00::/64' self.cli_set(src_path + ['rule', rule, 'source', 'prefix', source_prefix]) # check validate() - outbound-interface must be defined with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'name', 'eth0']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() def test_source_nat66_protocol(self): translation_address = '2001:db8:1111::1' source_prefix = '2001:db8:2222::/64' dport = '9999' sport = '8080' tport = '80' proto = 'tcp' self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '1', 'destination', 'port', dport]) self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'source', 'port', sport]) self.cli_set(src_path + ['rule', '1', 'protocol', proto]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address]) self.cli_set(src_path + ['rule', '1', 'translation', 'port', tport]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64', 'tcp dport 9999', 'tcp sport 8080', 'snat to [2001:db8:1111::1]:80'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_nat66_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. # # Test that both 'nat destination' and 'nat source' nodes can exist # without any rule self.cli_set(src_path) self.cli_set(dst_path) self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/nat66.py b/src/conf_mode/nat66.py index c44320f36..95dfae3a5 100755 --- a/src/conf_mode/nat66.py +++ b/src/conf_mode/nat66.py @@ -1,130 +1,150 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.configdep import set_dependents, call_dependents from vyos.template import render from vyos.utils.dict import dict_search from vyos.utils.kernel import check_kmod from vyos.utils.network import interface_exists from vyos.utils.process import cmd +from vyos.utils.process import run from vyos.template import is_ipv6 from vyos import ConfigError from vyos import airbag airbag.enable() k_mod = ['nft_nat', 'nft_chain_nat'] nftables_nat66_config = '/run/nftables_nat66.nft' def get_config(config=None): if config: conf = config else: conf = Config() base = ['nat66'] nat = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True) set_dependents('conntrack', conf) if not conf.exists(base): nat['deleted'] = '' + return nat + + nat['firewall_group'] = conf.get_config_dict(['firewall', 'group'], key_mangling=('-', '_'), get_first_key=True, + no_tag_node_value_mangle=True) + + # Remove dynamic firewall groups if present: + if 'dynamic_group' in nat['firewall_group']: + del nat['firewall_group']['dynamic_group'] return nat def verify(nat): if not nat or 'deleted' in nat: # no need to verify the CLI as NAT66 is going to be deactivated return None if dict_search('source.rule', nat): for rule, config in dict_search('source.rule', nat).items(): err_msg = f'Source NAT66 configuration error in rule {rule}:' if 'outbound_interface' in config: if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"') elif 'name' in config['outbound_interface']: interface_name = config['outbound_interface']['name'] if interface_name not in 'any': if interface_name.startswith('!'): interface_name = interface_name[1:] if not interface_exists(interface_name): Warning(f'Interface "{interface_name}" for source NAT66 rule "{rule}" does not exist!') addr = dict_search('translation.address', config) if addr != None: if addr != 'masquerade' and not is_ipv6(addr): raise ConfigError(f'IPv6 address {addr} is not a valid address') else: if 'exclude' not in config: raise ConfigError(f'{err_msg} translation address not specified') prefix = dict_search('source.prefix', config) if prefix != None: if not is_ipv6(prefix): raise ConfigError(f'{err_msg} source-prefix not specified') if dict_search('destination.rule', nat): for rule, config in dict_search('destination.rule', nat).items(): err_msg = f'Destination NAT66 configuration error in rule {rule}:' if 'inbound_interface' in config: if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"') elif 'name' in config['inbound_interface']: interface_name = config['inbound_interface']['name'] if interface_name not in 'any': if interface_name.startswith('!'): interface_name = interface_name[1:] if not interface_exists(interface_name): Warning(f'Interface "{interface_name}" for destination NAT66 rule "{rule}" does not exist!') + if 'destination' in config and 'group' in config['destination']: + if len({'address_group', 'network_group', 'domain_group'} & set(config['destination']['group'])) > 1: + raise ConfigError('Only one address-group, network-group or domain-group can be specified') + return None def generate(nat): if not os.path.exists(nftables_nat66_config): nat['first_install'] = True - render(nftables_nat66_config, 'firewall/nftables-nat66.j2', nat, permission=0o755) + render(nftables_nat66_config, 'firewall/nftables-nat66.j2', nat) + + # dry-run newly generated configuration + tmp = run(f'nft --check --file {nftables_nat66_config}') + if tmp > 0: + raise ConfigError('Configuration file errors encountered!') + return None def apply(nat): - if not nat: - return None - check_kmod(k_mod) cmd(f'nft --file {nftables_nat66_config}') + + if not nat or 'deleted' in nat: + os.unlink(nftables_nat66_config) + call_dependents() return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)