diff --git a/data/config-mode-dependencies/vyos-1x.json b/data/config-mode-dependencies/vyos-1x.json index b0586e0bb..6ab36005b 100644 --- a/data/config-mode-dependencies/vyos-1x.json +++ b/data/config-mode-dependencies/vyos-1x.json @@ -1,56 +1,57 @@ { "system_conntrack": { - "conntrack_sync": ["service_conntrack-sync"] + "conntrack_sync": ["service_conntrack-sync"], + "vrf": ["vrf"] }, "firewall": { "conntrack": ["system_conntrack"], "group_resync": ["system_conntrack", "nat", "policy_route"] }, "interfaces_bonding": { "ethernet": ["interfaces_ethernet"] }, "interfaces_bridge": { "vxlan": ["interfaces_vxlan"] }, "load_balancing_wan": { "conntrack": ["system_conntrack"] }, "nat": { "conntrack": ["system_conntrack"] }, "nat66": { "conntrack": ["system_conntrack"] }, "pki": { "ethernet": ["interfaces_ethernet"], "openvpn": ["interfaces_openvpn"], "https": ["service_https"], "ipsec": ["vpn_ipsec"], "openconnect": ["vpn_openconnect"], "rpki": ["protocols_rpki"], "sstp": ["vpn_sstp"] }, "vpn_l2tp": { "ipsec": ["vpn_ipsec"] }, "qos": { "bonding": ["interfaces_bonding"], "bridge": ["interfaces_bridge"], "dummy": ["interfaces_dummy"], "ethernet": ["interfaces_ethernet"], "geneve": ["interfaces_geneve"], "input": ["interfaces_input"], "l2tpv3": ["interfaces_l2tpv3"], "loopback": ["interfaces_loopback"], "macsec": ["interfaces_macsec"], "openvpn": ["interfaces_openvpn"], "pppoe": ["interfaces_pppoe"], "pseudo-ethernet": ["interfaces_pseudo-ethernet"], "tunnel": ["interfaces_tunnel"], "vti": ["interfaces_vti"], "vxlan": ["interfaces_vxlan"], "wireguard": ["interfaces_wireguard"], "wireless": ["interfaces_wireless"], "wwan": ["interfaces_wwan"] } } diff --git a/data/vyos-firewall-init.conf b/data/vyos-firewall-init.conf index 5a4e03015..3929edf0b 100644 --- a/data/vyos-firewall-init.conf +++ b/data/vyos-firewall-init.conf @@ -1,75 +1,73 @@ #!/usr/sbin/nft -f # Required by wanloadbalance table ip nat { chain VYOS_PRE_SNAT_HOOK { type nat hook postrouting priority 99; policy accept; return } } table inet mangle { # Used by system flow-accounting chain FORWARD { type filter hook forward priority -150; policy accept; } } table raw { chain VYOS_TCP_MSS { type filter hook forward priority -300; policy accept; } chain vyos_global_rpfilter { return } chain vyos_rpfilter { type filter hook prerouting priority -300; policy accept; counter jump vyos_global_rpfilter } # Used by system flow-accounting chain VYOS_PREROUTING_HOOK { type filter hook prerouting priority -300; policy accept; } } table ip6 raw { chain VYOS_TCP_MSS { type filter hook forward priority -300; policy accept; } chain vyos_global_rpfilter { return } chain vyos_rpfilter { type filter hook prerouting priority -300; policy accept; counter jump vyos_global_rpfilter } # Used by system flow-accounting chain VYOS_PREROUTING_HOOK { type filter hook prerouting priority -300; policy accept; } } # Required by VRF table inet vrf_zones { # Map of interfaces and connections tracking zones map ct_iface_map { typeof iifname : ct zone } # Assign unique zones for each VRF # Chain for inbound traffic chain vrf_zones_ct_in { type filter hook prerouting priority raw; policy accept; - counter ct original zone set iifname map @ct_iface_map } # Chain for locally-generated traffic chain vrf_zones_ct_out { type filter hook output priority raw; policy accept; - counter ct original zone set oifname map @ct_iface_map } } diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index eee11bd2d..49e095946 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -1,645 +1,663 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import csv import gzip import os import re from pathlib import Path from socket import AF_INET from socket import AF_INET6 from socket import getaddrinfo from time import strftime from vyos.remote import download from vyos.template import is_ipv4 from vyos.template import render from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.process import run +# Conntrack + +def conntrack_required(conf): + required_nodes = ['nat', 'nat66', 'load-balancing wan'] + + for path in required_nodes: + if conf.exists(path): + return True + + firewall = conf.get_config_dict(['firewall'], key_mangling=('-', '_'), + no_tag_node_value_mangle=True, get_first_key=True) + + for rules, path in dict_search_recursive(firewall, 'rule'): + if any(('state' in rule_conf or 'connection_status' in rule_conf or 'offload_target' in rule_conf) for rule_conf in rules.values()): + return True + + return False + # Domain Resolver def fqdn_config_parse(firewall): firewall['ip_fqdn'] = {} firewall['ip6_fqdn'] = {} for domain, path in dict_search_recursive(firewall, 'fqdn'): hook_name = path[1] priority = path[2] fw_name = path[2] rule = path[4] suffix = path[5][0] set_name = f'{hook_name}_{priority}_{rule}_{suffix}' if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): firewall['ip_fqdn'][set_name] = domain elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): if path[1] == 'name': set_name = f'name6_{priority}_{rule}_{suffix}' firewall['ip6_fqdn'][set_name] = domain def fqdn_resolve(fqdn, ipv6=False): try: res = getaddrinfo(fqdn, None, AF_INET6 if ipv6 else AF_INET) return set(item[4][0] for item in res) except: return None # End Domain Resolver def find_nftables_rule(table, chain, rule_matches=[]): # Find rule in table/chain that matches all criteria and return the handle results = cmd(f'sudo nft -a list chain {table} {chain}').split("\n") for line in results: if all(rule_match in line for rule_match in rule_matches): handle_search = re.search('handle (\d+)', line) if handle_search: return handle_search[1] return None def remove_nftables_rule(table, chain, handle): cmd(f'sudo nft delete rule {table} {chain} handle {handle}') # Functions below used by template generation def nft_action(vyos_action): if vyos_action == 'accept': return 'return' return vyos_action def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name): output = [] if ip_name == 'ip6': def_suffix = '6' family = 'ipv6' else: def_suffix = '' family = 'bri' if ip_name == 'bri' else 'ipv4' if 'state' in rule_conf and rule_conf['state']: states = ",".join([s for s in rule_conf['state']]) if states: output.append(f'ct state {{{states}}}') if 'conntrack_helper' in rule_conf: helper_map = {'h323': ['RAS', 'Q.931'], 'nfs': ['rpc'], 'sqlnet': ['tns']} helper_out = [] for helper in rule_conf['conntrack_helper']: if helper in helper_map: helper_out.extend(helper_map[helper]) else: helper_out.append(helper) if helper_out: helper_str = ','.join(f'"{s}"' for s in helper_out) output.append(f'ct helper {{{helper_str}}}') if 'connection_status' in rule_conf and rule_conf['connection_status']: status = rule_conf['connection_status'] if status['nat'] == 'destination': nat_status = '{dnat}' output.append(f'ct status {nat_status}') if status['nat'] == 'source': nat_status = '{snat}' output.append(f'ct status {nat_status}') if 'protocol' in rule_conf and rule_conf['protocol'] != 'all': proto = rule_conf['protocol'] operator = '' if proto[0] == '!': operator = '!=' proto = proto[1:] if proto == 'tcp_udp': proto = '{tcp, udp}' output.append(f'meta l4proto {operator} {proto}') for side in ['destination', 'source']: if side in rule_conf: prefix = side[0] side_conf = rule_conf[side] address_mask = side_conf.get('address_mask', None) if 'address' in side_conf: suffix = side_conf['address'] operator = '' exclude = suffix[0] == '!' if exclude: operator = '!= ' suffix = suffix[1:] if address_mask: operator = '!=' if exclude else '==' operator = f'& {address_mask} {operator} ' output.append(f'{ip_name} {prefix}addr {operator}{suffix}') if 'fqdn' in side_conf: fqdn = side_conf['fqdn'] hook_name = '' operator = '' if fqdn[0] == '!': operator = '!=' if hook == 'FWD': hook_name = 'forward' if hook == 'INP': hook_name = 'input' if hook == 'OUT': hook_name = 'output' if hook == 'NAM': hook_name = f'name{def_suffix}' output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{hook_name}_{fw_name}_{rule_id}_{prefix}') if dict_search_args(side_conf, 'geoip', 'country_code'): operator = '' hook_name = '' if dict_search_args(side_conf, 'geoip', 'inverse_match') != None: operator = '!=' if hook == 'FWD': hook_name = 'forward' if hook == 'INP': hook_name = 'input' if hook == 'OUT': hook_name = 'output' if hook == 'NAM': hook_name = f'name' output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC{def_suffix}_{hook_name}_{fw_name}_{rule_id}') if 'mac_address' in side_conf: suffix = side_conf["mac_address"] if suffix[0] == '!': suffix = f'!= {suffix[1:]}' output.append(f'ether {prefix}addr {suffix}') if 'port' in side_conf: proto = rule_conf['protocol'] port = side_conf['port'].split(',') ports = [] negated_ports = [] for p in port: if p[0] == '!': negated_ports.append(p[1:]) else: ports.append(p) if proto == 'tcp_udp': proto = 'th' if ports: ports_str = ','.join(ports) output.append(f'{proto} {prefix}port {{{ports_str}}}') if negated_ports: negated_ports_str = ','.join(negated_ports) output.append(f'{proto} {prefix}port != {{{negated_ports_str}}}') if 'group' in side_conf: group = side_conf['group'] if 'address_group' in group: group_name = group['address_group'] operator = '' exclude = group_name[0] == "!" if exclude: operator = '!=' group_name = group_name[1:] if address_mask: operator = '!=' if exclude else '==' operator = f'& {address_mask} {operator}' output.append(f'{ip_name} {prefix}addr {operator} @A{def_suffix}_{group_name}') elif 'dynamic_address_group' in group: group_name = group['dynamic_address_group'] operator = '' exclude = group_name[0] == "!" if exclude: operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} @DA{def_suffix}_{group_name}') # Generate firewall group domain-group elif 'domain_group' in group: group_name = group['domain_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} @D_{group_name}') elif 'network_group' in group: group_name = group['network_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} @N{def_suffix}_{group_name}') if 'mac_group' in group: group_name = group['mac_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'ether {prefix}addr {operator} @M_{group_name}') if 'port_group' in group: proto = rule_conf['protocol'] group_name = group['port_group'] if proto == 'tcp_udp': proto = 'th' operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{proto} {prefix}port {operator} @P_{group_name}') if dict_search_args(rule_conf, 'action') == 'synproxy': output.append('ct state invalid,untracked') if 'hop_limit' in rule_conf: operators = {'eq': '==', 'gt': '>', 'lt': '<'} for op, operator in operators.items(): if op in rule_conf['hop_limit']: value = rule_conf['hop_limit'][op] output.append(f'ip6 hoplimit {operator} {value}') if 'inbound_interface' in rule_conf: operator = '' if 'name' in rule_conf['inbound_interface']: iiface = rule_conf['inbound_interface']['name'] if iiface[0] == '!': operator = '!=' iiface = iiface[1:] output.append(f'iifname {operator} {{{iiface}}}') elif 'group' in rule_conf['inbound_interface']: iiface = rule_conf['inbound_interface']['group'] if iiface[0] == '!': operator = '!=' iiface = iiface[1:] output.append(f'iifname {operator} @I_{iiface}') if 'outbound_interface' in rule_conf: operator = '' if 'name' in rule_conf['outbound_interface']: oiface = rule_conf['outbound_interface']['name'] if oiface[0] == '!': operator = '!=' oiface = oiface[1:] output.append(f'oifname {operator} {{{oiface}}}') elif 'group' in rule_conf['outbound_interface']: oiface = rule_conf['outbound_interface']['group'] if oiface[0] == '!': operator = '!=' oiface = oiface[1:] output.append(f'oifname {operator} @I_{oiface}') if 'ttl' in rule_conf: operators = {'eq': '==', 'gt': '>', 'lt': '<'} for op, operator in operators.items(): if op in rule_conf['ttl']: value = rule_conf['ttl'][op] output.append(f'ip ttl {operator} {value}') for icmp in ['icmp', 'icmpv6']: if icmp in rule_conf: if 'type_name' in rule_conf[icmp]: output.append(icmp + ' type ' + rule_conf[icmp]['type_name']) else: if 'code' in rule_conf[icmp]: output.append(icmp + ' code ' + rule_conf[icmp]['code']) if 'type' in rule_conf[icmp]: output.append(icmp + ' type ' + rule_conf[icmp]['type']) if 'packet_length' in rule_conf: lengths_str = ','.join(rule_conf['packet_length']) output.append(f'ip{def_suffix} length {{{lengths_str}}}') if 'packet_length_exclude' in rule_conf: negated_lengths_str = ','.join(rule_conf['packet_length_exclude']) output.append(f'ip{def_suffix} length != {{{negated_lengths_str}}}') if 'packet_type' in rule_conf: output.append(f'pkttype ' + rule_conf['packet_type']) if 'dscp' in rule_conf: dscp_str = ','.join(rule_conf['dscp']) output.append(f'ip{def_suffix} dscp {{{dscp_str}}}') if 'dscp_exclude' in rule_conf: negated_dscp_str = ','.join(rule_conf['dscp_exclude']) output.append(f'ip{def_suffix} dscp != {{{negated_dscp_str}}}') if 'ipsec' in rule_conf: if 'match_ipsec' in rule_conf['ipsec']: output.append('meta ipsec == 1') if 'match_none' in rule_conf['ipsec']: output.append('meta ipsec == 0') if 'fragment' in rule_conf: # Checking for fragmentation after priority -400 is not possible, # so we use a priority -450 hook to set a mark if 'match_frag' in rule_conf['fragment']: output.append('meta mark 0xffff1') if 'match_non_frag' in rule_conf['fragment']: output.append('meta mark != 0xffff1') if 'limit' in rule_conf: if 'rate' in rule_conf['limit']: output.append(f'limit rate {rule_conf["limit"]["rate"]}') if 'burst' in rule_conf['limit']: output.append(f'burst {rule_conf["limit"]["burst"]} packets') if 'recent' in rule_conf: count = rule_conf['recent']['count'] time = rule_conf['recent']['time'] output.append(f'add @RECENT{def_suffix}_{hook}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}') if 'time' in rule_conf: output.append(parse_time(rule_conf['time'])) tcp_flags = dict_search_args(rule_conf, 'tcp', 'flags') if tcp_flags: output.append(parse_tcp_flags(tcp_flags)) # TCP MSS tcp_mss = dict_search_args(rule_conf, 'tcp', 'mss') if tcp_mss: output.append(f'tcp option maxseg size {tcp_mss}') if 'connection_mark' in rule_conf: conn_mark_str = ','.join(rule_conf['connection_mark']) output.append(f'ct mark {{{conn_mark_str}}}') if 'mark' in rule_conf: mark = rule_conf['mark'] operator = '' if mark[0] == '!': operator = '!=' mark = mark[1:] output.append(f'meta mark {operator} {{{mark}}}') if 'vlan' in rule_conf: if 'id' in rule_conf['vlan']: output.append(f'vlan id {rule_conf["vlan"]["id"]}') if 'priority' in rule_conf['vlan']: output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}') if 'log' in rule_conf: action = rule_conf['action'] if 'action' in rule_conf else 'accept' #output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}]"') output.append(f'log prefix "[{family}-{hook}-{fw_name}-{rule_id}-{action[:1].upper()}]"') ##{family}-{hook}-{fw_name}-{rule_id} if 'log_options' in rule_conf: if 'level' in rule_conf['log_options']: log_level = rule_conf['log_options']['level'] output.append(f'log level {log_level}') if 'group' in rule_conf['log_options']: log_group = rule_conf['log_options']['group'] output.append(f'log group {log_group}') if 'queue_threshold' in rule_conf['log_options']: queue_threshold = rule_conf['log_options']['queue_threshold'] output.append(f'queue-threshold {queue_threshold}') if 'snapshot_length' in rule_conf['log_options']: log_snaplen = rule_conf['log_options']['snapshot_length'] output.append(f'snaplen {log_snaplen}') output.append('counter') if 'add_address_to_group' in rule_conf: for side in ['destination_address', 'source_address']: if side in rule_conf['add_address_to_group']: prefix = side[0] side_conf = rule_conf['add_address_to_group'][side] dyn_group = side_conf['address_group'] if 'timeout' in side_conf: timeout_value = side_conf['timeout'] output.append(f'set update ip{def_suffix} {prefix}addr timeout {timeout_value} @DA{def_suffix}_{dyn_group}') else: output.append(f'set update ip{def_suffix} saddr @DA{def_suffix}_{dyn_group}') if 'set' in rule_conf: output.append(parse_policy_set(rule_conf['set'], def_suffix)) if 'action' in rule_conf: # Change action=return to action=action # #output.append(nft_action(rule_conf['action'])) if rule_conf['action'] == 'offload': offload_target = rule_conf['offload_target'] output.append(f'flow add @VYOS_FLOWTABLE_{offload_target}') else: output.append(f'{rule_conf["action"]}') if 'jump' in rule_conf['action']: target = rule_conf['jump_target'] output.append(f'NAME{def_suffix}_{target}') if 'queue' in rule_conf['action']: if 'queue' in rule_conf: target = rule_conf['queue'] output.append(f'num {target}') if 'queue_options' in rule_conf: queue_opts = ','.join(rule_conf['queue_options']) output.append(f'{queue_opts}') # Synproxy if 'synproxy' in rule_conf: synproxy_mss = dict_search_args(rule_conf, 'synproxy', 'tcp', 'mss') if synproxy_mss: output.append(f'mss {synproxy_mss}') synproxy_ws = dict_search_args(rule_conf, 'synproxy', 'tcp', 'window_scale') if synproxy_ws: output.append(f'wscale {synproxy_ws} timestamp sack-perm') else: output.append('return') output.append(f'comment "{family}-{hook}-{fw_name}-{rule_id}"') return " ".join(output) def parse_tcp_flags(flags): include = [flag for flag in flags if flag != 'not'] exclude = list(flags['not']) if 'not' in flags else [] return f'tcp flags & ({"|".join(include + exclude)}) == {"|".join(include) if include else "0x0"}' def parse_time(time): out = [] if 'startdate' in time: start = time['startdate'] if 'T' not in start and 'starttime' in time: start += f' {time["starttime"]}' out.append(f'time >= "{start}"') if 'starttime' in time and 'startdate' not in time: out.append(f'hour >= "{time["starttime"]}"') if 'stopdate' in time: stop = time['stopdate'] if 'T' not in stop and 'stoptime' in time: stop += f' {time["stoptime"]}' out.append(f'time < "{stop}"') if 'stoptime' in time and 'stopdate' not in time: out.append(f'hour < "{time["stoptime"]}"') if 'weekdays' in time: days = time['weekdays'].split(",") out_days = [f'"{day}"' for day in days if day[0] != '!'] out.append(f'day {{{",".join(out_days)}}}') return " ".join(out) def parse_policy_set(set_conf, def_suffix): out = [] if 'connection_mark' in set_conf: conn_mark = set_conf['connection_mark'] out.append(f'ct mark set {conn_mark}') if 'dscp' in set_conf: dscp = set_conf['dscp'] out.append(f'ip{def_suffix} dscp set {dscp}') if 'mark' in set_conf: mark = set_conf['mark'] out.append(f'meta mark set {mark}') if 'table' in set_conf: table = set_conf['table'] if table == 'main': table = '254' mark = 0x7FFFFFFF - int(table) out.append(f'meta mark set {mark}') if 'tcp_mss' in set_conf: mss = set_conf['tcp_mss'] out.append(f'tcp option maxseg size set {mss}') return " ".join(out) # GeoIP nftables_geoip_conf = '/run/nftables-geoip.conf' geoip_database = '/usr/share/vyos-geoip/dbip-country-lite.csv.gz' geoip_lock_file = '/run/vyos-geoip.lock' def geoip_load_data(codes=[]): data = None if not os.path.exists(geoip_database): return [] try: with gzip.open(geoip_database, mode='rt') as csv_fh: reader = csv.reader(csv_fh) out = [] for start, end, code in reader: if code.lower() in codes: out.append([start, end, code.lower()]) return out except: print('Error: Failed to open GeoIP database') return [] def geoip_download_data(): url = 'https://download.db-ip.com/free/dbip-country-lite-{}.csv.gz'.format(strftime("%Y-%m")) try: dirname = os.path.dirname(geoip_database) if not os.path.exists(dirname): os.mkdir(dirname) download(geoip_database, url) print("Downloaded GeoIP database") return True except: print("Error: Failed to download GeoIP database") return False class GeoIPLock(object): def __init__(self, file): self.file = file def __enter__(self): if os.path.exists(self.file): return False Path(self.file).touch() return True def __exit__(self, exc_type, exc_value, tb): os.unlink(self.file) def geoip_update(firewall, force=False): with GeoIPLock(geoip_lock_file) as lock: if not lock: print("Script is already running") return False if not firewall: print("Firewall is not configured") return True if not os.path.exists(geoip_database): if not geoip_download_data(): return False elif force: geoip_download_data() ipv4_codes = {} ipv6_codes = {} ipv4_sets = {} ipv6_sets = {} # Map country codes to set names for codes, path in dict_search_recursive(firewall, 'country_code'): set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}' if ( path[0] == 'ipv4'): for code in codes: ipv4_codes.setdefault(code, []).append(set_name) elif ( path[0] == 'ipv6' ): set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}' for code in codes: ipv6_codes.setdefault(code, []).append(set_name) if not ipv4_codes and not ipv6_codes: if force: print("GeoIP not in use by firewall") return True geoip_data = geoip_load_data([*ipv4_codes, *ipv6_codes]) # Iterate IP blocks to assign to sets for start, end, code in geoip_data: ipv4 = is_ipv4(start) if code in ipv4_codes and ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv4_codes[code]: ipv4_sets.setdefault(setname, []).append(ip_range) if code in ipv6_codes and not ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv6_codes[code]: ipv6_sets.setdefault(setname, []).append(ip_range) render(nftables_geoip_conf, 'firewall/nftables-geoip-update.j2', { 'ipv4_sets': ipv4_sets, 'ipv6_sets': ipv6_sets }) result = run(f'nft -f {nftables_geoip_conf}') if result != 0: print('Error: GeoIP failed to update firewall') return False return True diff --git a/smoketest/scripts/cli/base_vyostest_shim.py b/smoketest/scripts/cli/base_vyostest_shim.py index 140642806..c49d3e76c 100644 --- a/smoketest/scripts/cli/base_vyostest_shim.py +++ b/smoketest/scripts/cli/base_vyostest_shim.py @@ -1,117 +1,140 @@ # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import unittest import paramiko from time import sleep from typing import Type from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos import ConfigError from vyos.defaults import commit_lock from vyos.utils.process import cmd from vyos.utils.process import run save_config = '/tmp/vyos-smoketest-save' # This class acts as shim between individual Smoketests developed for VyOS and # the Python UnitTest framework. Before every test is loaded, we dump the current # system configuration and reload it after the test - despite the test results. # # Using this approach we can not render a live system useless while running any # kind of smoketest. In addition it adds debug capabilities like printing the # command used to execute the test. class VyOSUnitTestSHIM: class TestCase(unittest.TestCase): # if enabled in derived class, print out each and every set/del command # on the CLI. This is usefull to grap all the commands required to # trigger the certain failure condition. # Use "self.debug = True" in derived classes setUp() method debug = False @classmethod def setUpClass(cls): cls._session = ConfigSession(os.getpid()) cls._session.save_config(save_config) pass @classmethod def tearDownClass(cls): # discard any pending changes which might caused a messed up config cls._session.discard() # ... and restore the initial state cls._session.migrate_and_load_config(save_config) try: cls._session.commit() except (ConfigError, ConfigSessionError): cls._session.discard() cls.fail(cls) def cli_set(self, config): if self.debug: print('set ' + ' '.join(config)) self._session.set(config) def cli_delete(self, config): if self.debug: print('del ' + ' '.join(config)) self._session.delete(config) def cli_commit(self): self._session.commit() # during a commit there is a process opening commit_lock, and run() returns 0 while run(f'sudo lsof -nP {commit_lock}') == 0: sleep(0.250) def getFRRconfig(self, string=None, end='$', endsection='^!', daemon=''): """ Retrieve current "running configuration" from FRR """ command = f'vtysh -c "show run {daemon} no-header"' if string: command += f' | sed -n "/^{string}{end}/,/{endsection}/p"' out = cmd(command) if self.debug: import pprint print(f'\n\ncommand "{command}" returned:\n') pprint.pprint(out) return out @staticmethod def ssh_send_cmd(command, username, password, hostname='localhost'): """ SSH command execution helper """ # Try to login via SSH ssh_client = paramiko.SSHClient() ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_client.connect(hostname=hostname, username=username, password=password) _, stdout, stderr = ssh_client.exec_command(command) output = stdout.read().decode().strip() error = stderr.read().decode().strip() ssh_client.close() return output, error + # Verify nftables output + def verify_nftables(self, nftables_search, table, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list table {table}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + + def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''): + nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}') + + for search in nftables_search: + matched = False + for line in nftables_output.split("\n"): + if all(item in line for item in search): + matched = True + break + self.assertTrue(not matched if inverse else matched, msg=search) + # standard construction; typing suggestion: https://stackoverflow.com/a/70292317 def ignore_warning(warning: Type[Warning]): import warnings from functools import wraps def inner(f): @wraps(f) def wrapped(*args, **kwargs): with warnings.catch_warnings(): warnings.simplefilter("ignore", category=warning) return f(*args, **kwargs) return wrapped return inner diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index bc2848492..be5960bbd 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -1,891 +1,867 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from glob import glob from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError -from vyos.utils.process import cmd from vyos.utils.process import run sysfs_config = { 'all_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_all', 'default': '0', 'test_value': 'disable'}, 'broadcast_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_broadcasts', 'default': '1', 'test_value': 'enable'}, 'ip_src_route': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_source_route', 'default': '0', 'test_value': 'enable'}, 'ipv6_receive_redirects': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, 'ipv6_src_route': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_source_route', 'default': '-1', 'test_value': 'enable'}, 'log_martians': {'sysfs': '/proc/sys/net/ipv4/conf/all/log_martians', 'default': '1', 'test_value': 'disable'}, 'receive_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, 'send_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/send_redirects', 'default': '1', 'test_value': 'disable'}, 'syn_cookies': {'sysfs': '/proc/sys/net/ipv4/tcp_syncookies', 'default': '1', 'test_value': 'disable'}, 'twa_hazards_protection': {'sysfs': '/proc/sys/net/ipv4/tcp_rfc1337', 'default': '0', 'test_value': 'enable'} } class TestFirewall(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestFirewall, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, ['firewall']) @classmethod def tearDownClass(cls): super(TestFirewall, cls).tearDownClass() def tearDown(self): self.cli_delete(['firewall']) self.cli_commit() # Verify chains/sets are cleaned up from nftables nftables_search = [ ['set M_smoketest_mac'], ['set N_smoketest_network'], ['set P_smoketest_port'], ['set D_smoketest_domain'], ['set RECENT_smoketest_4'], ['chain NAME_smoketest'] ] self.verify_nftables(nftables_search, 'ip vyos_filter', inverse=True) - def verify_nftables(self, nftables_search, table, inverse=False, args=''): - nftables_output = cmd(f'sudo nft {args} list table {table}') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(not matched if inverse else matched, msg=search) - - def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''): - nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(not matched if inverse else matched, msg=search) - def wait_for_domain_resolver(self, table, set_name, element, max_wait=10): # Resolver no longer blocks commit, need to wait for daemon to populate set count = 0 while count < max_wait: code = run(f'sudo nft get element {table} {set_name} {{ {element} }}') if code == 0: return True count += 1 sleep(1) return False def test_geoip(self): self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'se']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'gb']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'de']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'fr']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'inverse-match']) self.cli_commit() nftables_search = [ ['ip saddr @GEOIP_CC_name_smoketest_1', 'drop'], ['ip saddr != @GEOIP_CC_name_smoketest_2', 'accept'] ] # -t prevents 1000+ GeoIP elements being returned self.verify_nftables(nftables_search, 'ip vyos_filter', args='-t') def test_groups(self): hostmap_path = ['system', 'static-host-mapping', 'host-name'] example_org = ['192.0.2.8', '192.0.2.10', '192.0.2.11'] self.cli_set(hostmap_path + ['example.com', 'inet', '192.0.2.5']) for ips in example_org: self.cli_set(hostmap_path + ['example.org', 'inet', ips]) self.cli_commit() self.cli_set(['firewall', 'group', 'mac-group', 'smoketest_mac', 'mac-address', '00:01:02:03:04:05']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '123']) self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.com']) self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.org']) self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'eth0']) self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'vtun0']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '2', 'source', 'group', 'mac-group', 'smoketest_mac']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'source', 'group', 'domain-group', 'smoketest_domain']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'outbound-interface', 'group', '!smoketest_interface']) self.cli_commit() self.wait_for_domain_resolver('ip vyos_filter', 'D_smoketest_domain', '192.0.2.5') nftables_search = [ ['ip saddr @N_smoketest_network', 'ip daddr 172.16.10.10', 'th dport @P_smoketest_port', 'accept'], ['elements = { 172.16.99.0/24 }'], ['elements = { 53, 123 }'], ['ether saddr @M_smoketest_mac', 'accept'], ['elements = { 00:01:02:03:04:05 }'], ['set D_smoketest_domain'], ['elements = { 192.0.2.5, 192.0.2.8,'], ['192.0.2.10, 192.0.2.11 }'], ['ip saddr @D_smoketest_domain', 'accept'], ['oifname != @I_smoketest_interface', 'accept'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') self.cli_delete(['system', 'static-host-mapping']) self.cli_commit() def test_nested_groups(self): self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'network', '172.16.101.0/24']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'include', 'smoketest_network']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port1', 'port', '123']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port1', 'include', 'smoketest_port']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network1']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port1']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_commit() # Test circular includes self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'include', 'smoketest_network1']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(['firewall', 'group', 'network-group', 'smoketest_network', 'include', 'smoketest_network1']) nftables_search = [ ['ip saddr @N_smoketest_network1', 'th dport @P_smoketest_port1', 'accept'], ['elements = { 172.16.99.0/24, 172.16.101.0/24 }'], ['elements = { 53, 123 }'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_basic_rules(self): name = 'smoketest' interface = 'eth0' interface_inv = '!eth0' interface_wc = 'l2tp*' mss_range = '501-1460' conn_mark = '555' self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'log-options', 'level', 'debug']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'ttl', 'eq', '15']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'destination', 'port', '8888']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'log-options', 'level', 'err']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'tcp', 'flags', 'syn']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'tcp', 'flags', 'not', 'ack']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'ttl', 'gt', '102']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'destination', 'port', '22']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'limit', 'rate', '5/minute']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'log']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'destination', 'port', '22']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'recent', 'count', '10']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'recent', 'time', 'minute']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'packet-type', 'host']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'tcp', 'flags', 'syn']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'tcp', 'mss', mss_range]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'packet-type', 'broadcast']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'inbound-interface', 'name', interface_wc]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '6', 'action', 'return']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '6', 'protocol', 'gre']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '6', 'connection-mark', conn_mark]) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '5', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '5', 'protocol', 'gre']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '5', 'outbound-interface', 'name', interface_inv]) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '6', 'action', 'return']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '6', 'protocol', 'icmp']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '6', 'connection-mark', conn_mark]) self.cli_commit() mark_hex = "{0:#010x}".format(int(conn_mark)) nftables_search = [ ['chain VYOS_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['tcp dport 22', 'limit rate 5/minute', 'accept'], ['tcp dport 22', 'add @RECENT_FWD_filter_4 { ip saddr limit rate over 10/minute burst 10 packets }', 'meta pkttype host', 'drop'], ['log prefix "[ipv4-FWD-filter-default-D]"','FWD-filter default-action drop', 'drop'], ['chain VYOS_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['tcp flags & syn == syn', f'tcp option maxseg size {mss_range}', f'iifname "{interface_wc}"', 'meta pkttype broadcast', 'accept'], ['meta l4proto gre', f'ct mark {mark_hex}', 'return'], ['INP-filter default-action accept', 'accept'], ['chain VYOS_OUTPUT_filter'], ['type filter hook output priority filter; policy accept;'], ['meta l4proto gre', f'oifname != "{interface}"', 'drop'], ['meta l4proto icmp', f'ct mark {mark_hex}', 'return'], ['log prefix "[ipv4-OUT-filter-default-D]"','OUT-filter default-action drop', 'drop'], ['chain NAME_smoketest'], ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'log prefix "[ipv4-NAM-smoketest-1-A]" log level debug', 'ip ttl 15', 'accept'], ['tcp flags syn / syn,ack', 'tcp dport 8888', 'log prefix "[ipv4-NAM-smoketest-2-R]" log level err', 'ip ttl > 102', 'reject'], ['log prefix "[ipv4-smoketest-default-D]"','smoketest default-action', 'drop'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_advanced(self): name = 'smoketest-adv' name2 = 'smoketest-adv2' interface = 'eth0' self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'packet-length', '64']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'packet-length', '512']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'packet-length', '1024']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'dscp', '17']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'dscp', '52']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log-options', 'group', '66']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log-options', 'snapshot-length', '6666']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log-options', 'queue-threshold','32000']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'packet-length', '1-30000']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'packet-length-exclude', '60000-65535']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'dscp', '3-11']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'dscp-exclude', '21-25']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'source', 'address', '198.51.100.1']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'mark', '1010']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'jump']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'jump-target', name]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'mark', '!98765']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'action', 'queue']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'queue', '3']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'protocol', 'udp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'action', 'queue']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'queue-options', 'fanout']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'queue-options', 'bypass']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'queue', '0-15']) self.cli_commit() nftables_search = [ ['chain VYOS_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['ip saddr 198.51.100.1', 'meta mark 0x000003f2', f'jump NAME_{name}'], ['FWD-filter default-action drop', 'drop'], ['chain VYOS_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['meta mark != 0x000181cd', 'meta l4proto tcp','queue to 3'], ['meta l4proto udp','queue flags bypass,fanout to 0-15'], ['INP-filter default-action accept', 'accept'], [f'chain NAME_{name}'], ['ip length { 64, 512, 1024 }', 'ip dscp { 0x11, 0x34 }', f'log prefix "[ipv4-NAM-{name}-6-A]" log group 66 snaplen 6666 queue-threshold 32000', 'accept'], ['ip length 1-30000', 'ip length != 60000-65535', 'ip dscp 0x03-0x0b', 'ip dscp != 0x15-0x19', 'accept'], [f'log prefix "[ipv4-{name}-default-D]"', 'drop'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_synproxy(self): tcp_mss = '1460' tcp_wscale = '7' dport = '22' self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'destination', 'port', dport]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'synproxy', 'tcp', 'mss', tcp_mss]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'synproxy', 'tcp', 'window-scale', tcp_wscale]) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'synproxy']) self.cli_commit() nftables_search = [ [f'tcp dport {dport} ct state invalid,untracked', f'synproxy mss {tcp_mss} wscale {tcp_wscale} timestamp sack-perm'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_mask(self): name = 'smoketest-mask' interface = 'eth0' self.cli_set(['firewall', 'group', 'address-group', 'mask_group', 'address', '1.1.1.1']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'destination', 'address', '0.0.1.2']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'destination', 'address-mask', '0.0.255.255']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'source', 'address', '!0.0.3.4']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'source', 'address-mask', '0.0.255.255']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'source', 'group', 'address-group', 'mask_group']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'source', 'address-mask', '0.0.255.255']) self.cli_commit() nftables_search = [ [f'daddr & 0.0.255.255 == 0.0.1.2'], [f'saddr & 0.0.255.255 != 0.0.3.4'], [f'saddr & 0.0.255.255 == @A_mask_group'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_dynamic_groups(self): group01 = 'knock01' group02 = 'allowed' self.cli_set(['firewall', 'group', 'dynamic-group', 'address-group', group01]) self.cli_set(['firewall', 'group', 'dynamic-group', 'address-group', group02]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'destination', 'port', '5151']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'address-group', group01]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'timeout', '30s']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'destination', 'port', '7272']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'source', 'group', 'dynamic-address-group', group01]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'address-group', group02]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'timeout', '5m']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'destination', 'port', '22']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'source', 'group', 'dynamic-address-group', group02]) self.cli_commit() nftables_search = [ [f'DA_{group01}'], [f'DA_{group02}'], ['type ipv4_addr'], ['flags dynamic,timeout'], ['chain VYOS_INPUT_filter {'], ['type filter hook input priority filter', 'policy accept'], ['tcp dport 5151', f'update @DA_{group01}', '{ ip saddr timeout 30s }', 'drop'], ['tcp dport 7272', f'ip saddr @DA_{group01}', f'update @DA_{group02}', '{ ip saddr timeout 5m }', 'drop'], ['tcp dport 22', f'ip saddr @DA_{group02}', 'accept'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv6_basic_rules(self): name = 'v6-smoketest' interface = 'eth0' self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'source', 'address', '2002::1']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'destination', 'address', '2002::1:1']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'log-options', 'level', 'crit']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'default-action', 'accept']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'destination', 'port', '8888']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'inbound-interface', 'name', interface]) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '3', 'action', 'return']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '3', 'protocol', 'gre']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '3', 'outbound-interface', 'name', interface]) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'protocol', 'udp']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'source', 'address', '2002::1:2']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'inbound-interface', 'name', interface]) self.cli_commit() nftables_search = [ ['chain VYOS_IPV6_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['meta l4proto { tcp, udp }', 'th dport 8888', f'iifname "{interface}"', 'reject'], ['log prefix "[ipv6-FWD-filter-default-A]"','FWD-filter default-action accept', 'accept'], ['chain VYOS_IPV6_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['meta l4proto udp', 'ip6 saddr 2002::1:2', f'iifname "{interface}"', 'accept'], ['INP-filter default-action accept', 'accept'], ['chain VYOS_IPV6_OUTPUT_filter'], ['type filter hook output priority filter; policy accept;'], ['meta l4proto gre', f'oifname "{interface}"', 'return'], ['log prefix "[ipv6-OUT-filter-default-D]"','OUT-filter default-action drop', 'drop'], [f'chain NAME6_{name}'], ['saddr 2002::1', 'daddr 2002::1:1', 'log prefix "[ipv6-NAM-v6-smoketest-1-A]" log level crit', 'accept'], [f'"{name} default-action drop"', f'log prefix "[ipv6-{name}-default-D]"', 'drop'], ['jump VYOS_STATE_POLICY6'], ['chain VYOS_STATE_POLICY6'], ['ct state established', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_ipv6_advanced(self): name = 'v6-smoketest-adv' name2 = 'v6-smoketest-adv2' interface = 'eth0' self.cli_set(['firewall', 'ipv6', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'packet-length', '65']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'packet-length', '513']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'packet-length', '1025']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'dscp', '18']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'dscp', '53']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'packet-length', '1-1999']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'packet-length-exclude', '60000-65535']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'dscp', '4-14']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'dscp-exclude', '31-35']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'default-action', 'accept']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'source', 'address', '2001:db8::/64']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'mark', '!6655-7766']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'action', 'jump']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'jump-target', name]) self.cli_commit() nftables_search = [ ['chain VYOS_IPV6_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['ip6 length 1-1999', 'ip6 length != 60000-65535', 'ip6 dscp 0x04-0x0e', 'ip6 dscp != 0x1f-0x23', 'accept'], ['chain VYOS_IPV6_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['ip6 saddr 2001:db8::/64', 'meta mark != 0x000019ff-0x00001e56', f'jump NAME6_{name}'], [f'chain NAME6_{name}'], ['ip6 length { 65, 513, 1025 }', 'ip6 dscp { af21, 0x35 }', 'accept'], [f'log prefix "[ipv6-{name}-default-D]"', 'drop'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_ipv6_mask(self): name = 'v6-smoketest-mask' interface = 'eth0' self.cli_set(['firewall', 'group', 'ipv6-address-group', 'mask_group', 'address', '::beef']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'destination', 'address', '::1111:2222:3333:4444']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'destination', 'address-mask', '::ffff:ffff:ffff:ffff']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '2', 'source', 'address', '!::aaaa:bbbb:cccc:dddd']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '2', 'source', 'address-mask', '::ffff:ffff:ffff:ffff']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'source', 'group', 'address-group', 'mask_group']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'source', 'address-mask', '::ffff:ffff:ffff:ffff']) self.cli_commit() nftables_search = [ ['daddr & ::ffff:ffff:ffff:ffff == ::1111:2222:3333:4444'], ['saddr & ::ffff:ffff:ffff:ffff != ::aaaa:bbbb:cccc:dddd'], ['saddr & ::ffff:ffff:ffff:ffff == @A6_mask_group'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_ipv6_dynamic_groups(self): group01 = 'knock01' group02 = 'allowed' self.cli_set(['firewall', 'group', 'dynamic-group', 'ipv6-address-group', group01]) self.cli_set(['firewall', 'group', 'dynamic-group', 'ipv6-address-group', group02]) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'destination', 'port', '5151']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'address-group', group01]) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'timeout', '30s']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'destination', 'port', '7272']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'source', 'group', 'dynamic-address-group', group01]) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'address-group', group02]) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'timeout', '5m']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'destination', 'port', '22']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'source', 'group', 'dynamic-address-group', group02]) self.cli_commit() nftables_search = [ [f'DA6_{group01}'], [f'DA6_{group02}'], ['type ipv6_addr'], ['flags dynamic,timeout'], ['chain VYOS_IPV6_INPUT_filter {'], ['type filter hook input priority filter', 'policy accept'], ['tcp dport 5151', f'update @DA6_{group01}', '{ ip6 saddr timeout 30s }', 'drop'], ['tcp dport 7272', f'ip6 saddr @DA6_{group01}', f'update @DA6_{group02}', '{ ip6 saddr timeout 5m }', 'drop'], ['tcp dport 22', f'ip6 saddr @DA6_{group02}', 'accept'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_ipv4_state_and_status_rules(self): name = 'smoketest-state' interface = 'eth0' self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'state', 'established']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'state', 'related']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'state', 'invalid']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'state', 'new']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'connection-status', 'nat', 'destination']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'state', 'new']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'state', 'established']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'connection-status', 'nat', 'source']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'state', 'related']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'conntrack-helper', 'ftp']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'conntrack-helper', 'pptp']) self.cli_commit() nftables_search = [ ['ct state { established, related }', 'accept'], ['ct state invalid', 'reject'], ['ct state new', 'ct status == dnat', 'accept'], ['ct state { established, new }', 'ct status == snat', 'accept'], ['ct state related', 'ct helper { "ftp", "pptp" }', 'accept'], ['drop', f'comment "{name} default-action drop"'], ['jump VYOS_STATE_POLICY'], ['chain VYOS_STATE_POLICY'], ['ct state established', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') # Check conntrack self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['return']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') def test_bridge_basic_rules(self): name = 'smoketest' interface_in = 'eth0' mac_address = '00:53:00:00:00:01' vlan_id = '12' vlan_prior = '3' self.cli_set(['firewall', 'bridge', 'name', name, 'default-action', 'accept']) self.cli_set(['firewall', 'bridge', 'name', name, 'default-log']) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'source', 'mac-address', mac_address]) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'inbound-interface', 'name', interface_in]) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'log']) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'log-options', 'level', 'crit']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'default-log']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '1', 'vlan', 'id', vlan_id]) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'action', 'jump']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'jump-target', name]) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'vlan', 'priority', vlan_prior]) self.cli_commit() nftables_search = [ ['chain VYOS_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], [f'vlan id {vlan_id}', 'accept'], [f'vlan pcp {vlan_prior}', f'jump NAME_{name}'], ['log prefix "[bri-FWD-filter-default-D]"', 'drop', 'FWD-filter default-action drop'], [f'chain NAME_{name}'], [f'ether saddr {mac_address}', f'iifname "{interface_in}"', f'log prefix "[bri-NAM-{name}-1-A]" log level crit', 'accept'], ['accept', f'{name} default-action accept'] ] self.verify_nftables(nftables_search, 'bridge vyos_filter') def test_source_validation(self): # Strict self.cli_set(['firewall', 'global-options', 'source-validation', 'strict']) self.cli_set(['firewall', 'global-options', 'ipv6-source-validation', 'strict']) self.cli_commit() nftables_strict_search = [ ['fib saddr . iif oif 0', 'drop'] ] self.verify_nftables_chain(nftables_strict_search, 'ip raw', 'vyos_global_rpfilter') self.verify_nftables_chain(nftables_strict_search, 'ip6 raw', 'vyos_global_rpfilter') # Loose self.cli_set(['firewall', 'global-options', 'source-validation', 'loose']) self.cli_set(['firewall', 'global-options', 'ipv6-source-validation', 'loose']) self.cli_commit() nftables_loose_search = [ ['fib saddr oif 0', 'drop'] ] self.verify_nftables_chain(nftables_loose_search, 'ip raw', 'vyos_global_rpfilter') self.verify_nftables_chain(nftables_loose_search, 'ip6 raw', 'vyos_global_rpfilter') def test_sysfs(self): for name, conf in sysfs_config.items(): paths = glob(conf['sysfs']) for path in paths: with open(path, 'r') as f: self.assertEqual(f.read().strip(), conf['default'], msg=path) self.cli_set(['firewall', 'global-options', name.replace("_", "-"), conf['test_value']]) self.cli_commit() for name, conf in sysfs_config.items(): paths = glob(conf['sysfs']) for path in paths: with open(path, 'r') as f: self.assertNotEqual(f.read().strip(), conf['default'], msg=path) ### Zone def test_zone_basic(self): self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', 'smoketestv6', 'default-action', 'drop']) self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'interface', 'eth0']) self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'from', 'smoketest-local', 'firewall', 'name', 'smoketest']) self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'intra-zone-filtering', 'firewall', 'ipv6-name', 'smoketestv6']) self.cli_set(['firewall', 'zone', 'smoketest-local', 'local-zone']) self.cli_set(['firewall', 'zone', 'smoketest-local', 'from', 'smoketest-eth0', 'firewall', 'name', 'smoketest']) self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'log']) self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop']) self.cli_commit() nftables_search = [ ['chain VYOS_ZONE_FORWARD'], ['type filter hook forward priority filter + 1'], ['chain VYOS_ZONE_OUTPUT'], ['type filter hook output priority filter + 1'], ['chain VYOS_ZONE_LOCAL'], ['type filter hook input priority filter + 1'], ['chain VZONE_smoketest-eth0'], ['chain VZONE_smoketest-local_IN'], ['chain VZONE_smoketest-local_OUT'], ['oifname "eth0"', 'jump VZONE_smoketest-eth0'], ['jump VZONE_smoketest-local_IN'], ['jump VZONE_smoketest-local_OUT'], ['iifname "eth0"', 'jump NAME_smoketest'], ['oifname "eth0"', 'jump NAME_smoketest'], ['jump VYOS_STATE_POLICY'], ['chain VYOS_STATE_POLICY'], ['ct state established', 'log prefix "[STATE-POLICY-EST-A]"', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] nftables_search_v6 = [ ['chain VYOS_ZONE_FORWARD'], ['type filter hook forward priority filter + 1'], ['chain VYOS_ZONE_OUTPUT'], ['type filter hook output priority filter + 1'], ['chain VYOS_ZONE_LOCAL'], ['type filter hook input priority filter + 1'], ['chain VZONE_smoketest-eth0'], ['chain VZONE_smoketest-local_IN'], ['chain VZONE_smoketest-local_OUT'], ['oifname "eth0"', 'jump VZONE_smoketest-eth0'], ['jump VZONE_smoketest-local_IN'], ['jump VZONE_smoketest-local_OUT'], ['iifname "eth0"', 'jump NAME6_smoketestv6'], ['jump VYOS_STATE_POLICY6'], ['chain VYOS_STATE_POLICY6'], ['ct state established', 'log prefix "[STATE-POLICY-EST-A]"', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] - nftables_output = cmd('sudo nft list table ip vyos_filter') self.verify_nftables(nftables_search, 'ip vyos_filter') self.verify_nftables(nftables_search_v6, 'ip6 vyos_filter') def test_flow_offload(self): self.cli_set(['interfaces', 'ethernet', 'eth0', 'vif', '10']) self.cli_set(['firewall', 'flowtable', 'smoketest', 'interface', 'eth0.10']) self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'hardware']) # QEMU virtual NIC does not support hw-tc-offload with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'software']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'offload-target', 'smoketest']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'state', 'established']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'state', 'related']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'offload-target', 'smoketest']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'state', 'established']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'state', 'related']) self.cli_commit() nftables_search = [ ['flowtable VYOS_FLOWTABLE_smoketest'], ['hook ingress priority filter'], ['devices = { eth0.10 }'], ['ct state { established, related }', 'meta l4proto { tcp, udp }', 'flow add @VYOS_FLOWTABLE_smoketest'], ] self.verify_nftables(nftables_search, 'ip vyos_filter') self.verify_nftables(nftables_search, 'ip6 vyos_filter') # Check conntrack self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['accept']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') def test_zone_flow_offload(self): self.cli_set(['firewall', 'flowtable', 'smoketest', 'interface', 'eth0']) self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'hardware']) # QEMU virtual NIC does not support hw-tc-offload with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'software']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'offload-target', 'smoketest']) self.cli_set(['firewall', 'ipv6', 'name', 'smoketest', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv6', 'name', 'smoketest', 'rule', '1', 'offload-target', 'smoketest']) self.cli_commit() nftables_search = [ ['chain NAME_smoketest'], ['flow add @VYOS_FLOWTABLE_smoketest'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') nftables_search = [ ['chain NAME6_smoketest'], ['flow add @VYOS_FLOWTABLE_smoketest'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') # Check conntrack self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['accept']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_nat.py b/smoketest/scripts/cli/test_nat.py index 1e6435df8..4f1c3cb4f 100755 --- a/smoketest/scripts/cli/test_nat.py +++ b/smoketest/scripts/cli/test_nat.py @@ -1,316 +1,303 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2022 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import jmespath import json import os import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError -from vyos.utils.process import cmd -from vyos.utils.dict import dict_search base_path = ['nat'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] static_path = base_path + ['static'] nftables_nat_config = '/run/nftables_nat.conf' nftables_static_nat_conf = '/run/nftables_static-nat-rules.nft' class TestNAT(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestNAT, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() self.assertFalse(os.path.exists(nftables_nat_config)) self.assertFalse(os.path.exists(nftables_static_nat_conf)) - def verify_nftables(self, nftables_search, table, inverse=False, args=''): - nftables_output = cmd(f'sudo nft {args} list table {table}') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(not matched if inverse else matched, msg=search) - def wait_for_domain_resolver(self, table, set_name, element, max_wait=10): # Resolver no longer blocks commit, need to wait for daemon to populate set count = 0 while count < max_wait: code = run(f'sudo nft get element {table} {set_name} {{ {element} }}') if code == 0: return True count += 1 sleep(1) return False def test_snat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] outbound_iface_100 = 'eth0' outbound_iface_200 = 'eth1' nftables_search = ['jump VYOS_PRE_SNAT_HOOK'] for rule in rules: network = f'192.168.{rule}.0/24' # depending of rule order we check either for source address for NAT # or configured destination address for NAT if int(rule) < 200: self.cli_set(src_path + ['rule', rule, 'source', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'name', outbound_iface_100]) self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) nftables_search.append([f'saddr {network}', f'oifname "{outbound_iface_100}"', 'masquerade']) else: self.cli_set(src_path + ['rule', rule, 'destination', 'address', network]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'name', outbound_iface_200]) self.cli_set(src_path + ['rule', rule, 'exclude']) nftables_search.append([f'daddr {network}', f'oifname "{outbound_iface_200}"', 'return']) self.cli_commit() self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_groups(self): address_group = 'smoketest_addr' address_group_member = '192.0.2.1' interface_group = 'smoketest_ifaces' interface_group_member = 'bond.99' rule = '100' self.cli_set(['firewall', 'group', 'address-group', address_group, 'address', address_group_member]) self.cli_set(['firewall', 'group', 'interface-group', interface_group, 'interface', interface_group_member]) self.cli_set(src_path + ['rule', rule, 'source', 'group', 'address-group', address_group]) self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'group', interface_group]) self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() nftables_search = [ [f'set A_{address_group}'], [f'elements = {{ {address_group_member} }}'], [f'ip saddr @A_{address_group}', f'oifname @I_{interface_group}', 'masquerade'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') self.cli_delete(['firewall']) def test_dnat(self): rules = ['100', '110', '120', '130', '200', '210', '220', '230'] inbound_iface_100 = 'eth0' inbound_iface_200 = 'eth1' inbound_proto_100 = 'udp' inbound_proto_200 = 'tcp' nftables_search = ['jump VYOS_PRE_DNAT_HOOK'] for rule in rules: port = f'10{rule}' self.cli_set(dst_path + ['rule', rule, 'source', 'port', port]) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', port]) rule_search = [f'dnat to 192.0.2.1:{port}'] if int(rule) < 200: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_100]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'name', inbound_iface_100]) rule_search.append(f'{inbound_proto_100} sport {port}') rule_search.append(f'iifname "{inbound_iface_100}"') else: self.cli_set(dst_path + ['rule', rule, 'protocol', inbound_proto_200]) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'name', inbound_iface_200]) rule_search.append(f'iifname "{inbound_iface_200}"') nftables_search.append(rule_search) self.cli_commit() self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_required_translation_address(self): # T2813: Ensure translation address is specified rule = '5' self.cli_set(src_path + ['rule', rule, 'source', 'address', '192.0.2.0/24']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() def test_dnat_negated_addresses(self): # T3186: negated addresses are not accepted by nftables rule = '1000' self.cli_set(dst_path + ['rule', rule, 'destination', 'address', '!192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'destination', 'port', '53']) self.cli_set(dst_path + ['rule', rule, 'inbound-interface', 'name', 'eth0']) self.cli_set(dst_path + ['rule', rule, 'protocol', 'tcp_udp']) self.cli_set(dst_path + ['rule', rule, 'source', 'address', '!192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'address', '192.0.2.1']) self.cli_set(dst_path + ['rule', rule, 'translation', 'port', '53']) self.cli_commit() def test_nat_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. # # Test that both 'nat destination' and 'nat source' nodes can exist # without any rule self.cli_set(src_path) self.cli_set(dst_path) self.cli_set(static_path) self.cli_commit() def test_dnat_without_translation_address(self): self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'packet-type', 'host']) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 443', 'pkttype host', 'dnat to :443'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_static_nat(self): dst_addr_1 = '10.0.1.1' translate_addr_1 = '192.168.1.1' dst_addr_2 = '203.0.113.0/24' translate_addr_2 = '192.0.2.0/24' ifname = 'eth0' self.cli_set(static_path + ['rule', '10', 'destination', 'address', dst_addr_1]) self.cli_set(static_path + ['rule', '10', 'inbound-interface', ifname]) self.cli_set(static_path + ['rule', '10', 'translation', 'address', translate_addr_1]) self.cli_set(static_path + ['rule', '20', 'destination', 'address', dst_addr_2]) self.cli_set(static_path + ['rule', '20', 'inbound-interface', ifname]) self.cli_set(static_path + ['rule', '20', 'translation', 'address', translate_addr_2]) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'dnat to {translate_addr_1}'], [f'oifname "{ifname}"', f'ip saddr {translate_addr_1}', f'snat to {dst_addr_1}'], [f'iifname "{ifname}"', f'dnat ip prefix to ip daddr map {{ {dst_addr_2} : {translate_addr_2} }}'], [f'oifname "{ifname}"', f'snat ip prefix to ip saddr map {{ {translate_addr_2} : {dst_addr_2} }}'] ] self.verify_nftables(nftables_search, 'ip vyos_static_nat') def test_dnat_redirect(self): dst_addr_1 = '10.0.1.1' dest_port = '5122' protocol = 'tcp' redirected_port = '22' ifname = 'eth0' self.cli_set(dst_path + ['rule', '10', 'destination', 'address', dst_addr_1]) self.cli_set(dst_path + ['rule', '10', 'destination', 'port', dest_port]) self.cli_set(dst_path + ['rule', '10', 'protocol', protocol]) self.cli_set(dst_path + ['rule', '10', 'inbound-interface', 'name', ifname]) self.cli_set(dst_path + ['rule', '10', 'translation', 'redirect', 'port', redirected_port]) self.cli_set(dst_path + ['rule', '20', 'destination', 'address', dst_addr_1]) self.cli_set(dst_path + ['rule', '20', 'destination', 'port', dest_port]) self.cli_set(dst_path + ['rule', '20', 'protocol', protocol]) self.cli_set(dst_path + ['rule', '20', 'inbound-interface', 'name', ifname]) self.cli_set(dst_path + ['rule', '20', 'translation', 'redirect']) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'{protocol} dport {dest_port}', f'redirect to :{redirected_port}'], [f'iifname "{ifname}"', f'ip daddr {dst_addr_1}', f'{protocol} dport {dest_port}', f'redirect'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_nat_balance(self): ifname = 'eth0' member_1 = '198.51.100.1' weight_1 = '10' member_2 = '198.51.100.2' weight_2 = '90' member_3 = '192.0.2.1' weight_3 = '35' member_4 = '192.0.2.2' weight_4 = '65' dst_port = '443' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', ifname]) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', dst_port]) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'source-address']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'source-port']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'destination-address']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'hash', 'destination-port']) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'backend', member_1, 'weight', weight_1]) self.cli_set(dst_path + ['rule', '1', 'load-balance', 'backend', member_2, 'weight', weight_2]) self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', ifname]) self.cli_set(src_path + ['rule', '1', 'load-balance', 'hash', 'random']) self.cli_set(src_path + ['rule', '1', 'load-balance', 'backend', member_3, 'weight', weight_3]) self.cli_set(src_path + ['rule', '1', 'load-balance', 'backend', member_4, 'weight', weight_4]) self.cli_commit() nftables_search = [ [f'iifname "{ifname}"', f'tcp dport {dst_port}', f'dnat to jhash ip saddr . tcp sport . ip daddr . tcp dport mod 100 map', f'0-9 : {member_1}, 10-99 : {member_2}'], [f'oifname "{ifname}"', f'snat to numgen random mod 100 map', f'0-34 : {member_3}, 35-99 : {member_4}'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') def test_snat_net_port_map(self): self.cli_set(src_path + ['rule', '10', 'protocol', 'tcp_udp']) self.cli_set(src_path + ['rule', '10', 'source', 'address', '100.64.0.0/25']) self.cli_set(src_path + ['rule', '10', 'translation', 'address', '203.0.113.0/25']) self.cli_set(src_path + ['rule', '10', 'translation', 'port', '1025-3072']) self.cli_set(src_path + ['rule', '20', 'protocol', 'tcp_udp']) self.cli_set(src_path + ['rule', '20', 'source', 'address', '100.64.0.128/25']) self.cli_set(src_path + ['rule', '20', 'translation', 'address', '203.0.113.128/25']) self.cli_set(src_path + ['rule', '20', 'translation', 'port', '1025-3072']) self.cli_commit() nftables_search = [ ['meta l4proto { tcp, udp }', 'snat ip prefix to ip saddr map { 100.64.0.0/25 : 203.0.113.0/25 . 1025-3072 }', 'comment "SRC-NAT-10"'], ['meta l4proto { tcp, udp }', 'snat ip prefix to ip saddr map { 100.64.0.128/25 : 203.0.113.128/25 . 1025-3072 }', 'comment "SRC-NAT-20"'] ] self.verify_nftables(nftables_search, 'ip vyos_nat') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_nat66.py b/smoketest/scripts/cli/test_nat66.py index 0607f6616..400a895ff 100755 --- a/smoketest/scripts/cli/test_nat66.py +++ b/smoketest/scripts/cli/test_nat66.py @@ -1,227 +1,214 @@ #!/usr/bin/env python3 # # Copyright (C) 2020 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import jmespath import json import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError -from vyos.utils.process import cmd -from vyos.utils.dict import dict_search base_path = ['nat66'] src_path = base_path + ['source'] dst_path = base_path + ['destination'] class TestNAT66(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestNAT66, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): self.cli_delete(base_path) self.cli_commit() - def verify_nftables(self, nftables_search, table, inverse=False, args=''): - nftables_output = cmd(f'sudo nft {args} list table {table}') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(not matched if inverse else matched, msg=search) - def test_source_nat66(self): source_prefix = 'fc00::/64' translation_prefix = 'fc01::/64' self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_prefix]) self.cli_set(src_path + ['rule', '2', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '2', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '2', 'translation', 'address', 'masquerade']) self.cli_set(src_path + ['rule', '3', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '3', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '3', 'exclude']) self.cli_commit() nftables_search = [ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat prefix to {translation_prefix}'], ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'masquerade'], ['oifname "eth1"', f'ip6 saddr {source_prefix}', 'return'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_address(self): source_prefix = 'fc00::/64' translation_address = 'fc00::1' self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['oifname "eth1"', f'ip6 saddr {source_prefix}', f'snat to {translation_address}'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66(self): destination_address = 'fc00::1' translation_address = 'fc01::1' source_address = 'fc02::1' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_address]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) self.cli_set(dst_path + ['rule', '2', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '2', 'destination', 'address', destination_address]) self.cli_set(dst_path + ['rule', '2', 'source', 'address', source_address]) self.cli_set(dst_path + ['rule', '2', 'exclude']) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['iifname "eth1"', 'ip6 daddr fc00::1', 'dnat to fc01::1'], ['iifname "eth1"', 'ip6 saddr fc02::1', 'ip6 daddr fc00::1', 'return'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_protocol(self): translation_address = '2001:db8:1111::1' source_prefix = '2001:db8:2222::/64' dport = '4545' sport = '8080' tport = '5555' proto = 'tcp' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', dport]) self.cli_set(dst_path + ['rule', '1', 'source', 'address', source_prefix]) self.cli_set(dst_path + ['rule', '1', 'source', 'port', sport]) self.cli_set(dst_path + ['rule', '1', 'protocol', proto]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_address]) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', tport]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 4545', 'ip6 saddr 2001:db8:2222::/64', 'tcp sport 8080', 'dnat to [2001:db8:1111::1]:5555'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_prefix(self): destination_prefix = 'fc00::/64' translation_prefix = 'fc01::/64' self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'address', destination_prefix]) self.cli_set(dst_path + ['rule', '1', 'translation', 'address', translation_prefix]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['iifname "eth1"', f'ip6 daddr {destination_prefix}', f'dnat prefix to {translation_prefix}'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_destination_nat66_without_translation_address(self): self.cli_set(dst_path + ['rule', '1', 'inbound-interface', 'name', 'eth1']) self.cli_set(dst_path + ['rule', '1', 'destination', 'port', '443']) self.cli_set(dst_path + ['rule', '1', 'protocol', 'tcp']) self.cli_set(dst_path + ['rule', '1', 'translation', 'port', '443']) self.cli_commit() nftables_search = [ ['iifname "eth1"', 'tcp dport 443', 'dnat to :443'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_source_nat66_required_translation_prefix(self): # T2813: Ensure translation address is specified rule = '5' source_prefix = 'fc00::/64' self.cli_set(src_path + ['rule', rule, 'source', 'prefix', source_prefix]) # check validate() - outbound-interface must be defined with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'outbound-interface', 'name', 'eth0']) # check validate() - translation address not specified with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(src_path + ['rule', rule, 'translation', 'address', 'masquerade']) self.cli_commit() def test_source_nat66_protocol(self): translation_address = '2001:db8:1111::1' source_prefix = '2001:db8:2222::/64' dport = '9999' sport = '8080' tport = '80' proto = 'tcp' self.cli_set(src_path + ['rule', '1', 'outbound-interface', 'name', 'eth1']) self.cli_set(src_path + ['rule', '1', 'destination', 'port', dport]) self.cli_set(src_path + ['rule', '1', 'source', 'prefix', source_prefix]) self.cli_set(src_path + ['rule', '1', 'source', 'port', sport]) self.cli_set(src_path + ['rule', '1', 'protocol', proto]) self.cli_set(src_path + ['rule', '1', 'translation', 'address', translation_address]) self.cli_set(src_path + ['rule', '1', 'translation', 'port', tport]) # check validate() - outbound-interface must be defined self.cli_commit() nftables_search = [ ['oifname "eth1"', 'ip6 saddr 2001:db8:2222::/64', 'tcp dport 9999', 'tcp sport 8080', 'snat to [2001:db8:1111::1]:80'] ] self.verify_nftables(nftables_search, 'ip6 vyos_nat') def test_nat66_no_rules(self): # T3206: deleting all rules but keep the direction 'destination' or # 'source' resulteds in KeyError: 'rule'. # # Test that both 'nat destination' and 'nat source' nodes can exist # without any rule self.cli_set(src_path) self.cli_set(dst_path) self.cli_commit() if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_policy_route.py b/smoketest/scripts/cli/test_policy_route.py index c0b7c1fe7..462fc24d0 100755 --- a/smoketest/scripts/cli/test_policy_route.py +++ b/smoketest/scripts/cli/test_policy_route.py @@ -1,283 +1,272 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.utils.process import cmd mark = '100' conn_mark = '555' conn_mark_set = '111' table_mark_offset = 0x7fffffff table_id = '101' interface = 'eth0' interface_wc = 'ppp*' interface_ip = '172.16.10.1/24' class TestPolicyRoute(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestPolicyRoute, cls).setUpClass() # Clear out current configuration to allow running this test on a live system cls.cli_delete(cls, ['policy', 'route']) cls.cli_delete(cls, ['policy', 'route6']) cls.cli_set(cls, ['interfaces', 'ethernet', interface, 'address', interface_ip]) cls.cli_set(cls, ['protocols', 'static', 'table', table_id, 'route', '0.0.0.0/0', 'interface', interface]) @classmethod def tearDownClass(cls): cls.cli_delete(cls, ['interfaces', 'ethernet', interface, 'address', interface_ip]) cls.cli_delete(cls, ['protocols', 'static', 'table', table_id]) super(TestPolicyRoute, cls).tearDownClass() def tearDown(self): self.cli_delete(['policy', 'route']) self.cli_delete(['policy', 'route6']) self.cli_commit() # Verify nftables cleanup nftables_search = [ ['set N_smoketest_network'], ['set N_smoketest_network1'], ['chain VYOS_PBR_smoketest'] ] self.verify_nftables(nftables_search, 'ip vyos_mangle', inverse=True) # Verify ip rule cleanup ip_rule_search = [ ['fwmark ' + hex(table_mark_offset - int(table_id)), 'lookup ' + table_id] ] self.verify_rules(ip_rule_search, inverse=True) - def verify_nftables(self, nftables_search, table, inverse=False): - nftables_output = cmd(f'sudo nft list table {table}') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(not matched if inverse else matched, msg=search) - def verify_rules(self, rules_search, inverse=False): rule_output = cmd('ip rule show') for search in rules_search: matched = False for line in rule_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) def test_pbr_group(self): self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'network', '172.16.101.0/24']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'include', 'smoketest_network']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'group', 'network-group', 'smoketest_network1']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'mark', mark]) self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) self.cli_commit() nftables_search = [ [f'iifname "{interface}"','jump VYOS_PBR_UD_smoketest'], ['ip daddr @N_smoketest_network1', 'ip saddr @N_smoketest_network'], ] self.verify_nftables(nftables_search, 'ip vyos_mangle') self.cli_delete(['firewall']) def test_pbr_mark(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'mark', mark]) self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) self.cli_commit() mark_hex = "{0:#010x}".format(int(mark)) nftables_search = [ [f'iifname "{interface}"','jump VYOS_PBR_UD_smoketest'], ['ip daddr 172.16.10.10', 'ip saddr 172.16.20.10', 'meta mark set ' + mark_hex], ] self.verify_nftables(nftables_search, 'ip vyos_mangle') def test_pbr_mark_connection(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'connection-mark', conn_mark]) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'connection-mark', conn_mark_set]) self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) self.cli_commit() mark_hex = "{0:#010x}".format(int(conn_mark)) mark_hex_set = "{0:#010x}".format(int(conn_mark_set)) nftables_search = [ [f'iifname "{interface}"','jump VYOS_PBR_UD_smoketest'], ['ip daddr 172.16.10.10', 'ip saddr 172.16.20.10', 'ct mark ' + mark_hex, 'ct mark set ' + mark_hex_set], ] self.verify_nftables(nftables_search, 'ip vyos_mangle') def test_pbr_table(self): self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'tcp']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'destination', 'port', '8888']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'tcp', 'flags', 'syn']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'tcp', 'flags', 'not', 'ack']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'set', 'table', table_id]) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'destination', 'port', '8888']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'set', 'table', table_id]) self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) self.cli_set(['policy', 'route6', 'smoketest6', 'interface', interface]) self.cli_commit() mark_hex = "{0:#010x}".format(table_mark_offset - int(table_id)) # IPv4 nftables_search = [ [f'iifname "{interface}"', 'jump VYOS_PBR_UD_smoketest'], ['tcp flags syn / syn,ack', 'tcp dport 8888', 'meta mark set ' + mark_hex] ] self.verify_nftables(nftables_search, 'ip vyos_mangle') # IPv6 nftables6_search = [ [f'iifname "{interface}"', 'jump VYOS_PBR6_UD_smoketest'], ['meta l4proto { tcp, udp }', 'th dport 8888', 'meta mark set ' + mark_hex] ] self.verify_nftables(nftables6_search, 'ip6 vyos_mangle') # IP rule fwmark -> table ip_rule_search = [ ['fwmark ' + hex(table_mark_offset - int(table_id)), 'lookup ' + table_id] ] self.verify_rules(ip_rule_search) def test_pbr_matching_criteria(self): self.cli_set(['policy', 'route', 'smoketest', 'default-log']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'protocol', 'udp']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'action', 'drop']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '1', 'mark', '2020']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'protocol', 'tcp']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'tcp', 'flags', 'syn']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'tcp', 'flags', 'not', 'ack']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'mark', '2-3000']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '2', 'set', 'table', table_id]) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'source', 'address', '198.51.100.0/24']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'protocol', 'tcp']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'destination', 'port', '22']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'state', 'new']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'ttl', 'gt', '2']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'mark', '!456']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '3', 'set', 'table', table_id]) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'protocol', 'icmp']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'icmp', 'type-name', 'echo-request']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'packet-length', '128']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'packet-length', '1024-2048']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'packet-type', 'other']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'log']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '4', 'set', 'table', table_id]) self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'dscp', '41']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'dscp', '57-59']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'mark', '!456-500']) self.cli_set(['policy', 'route', 'smoketest', 'rule', '5', 'set', 'table', table_id]) self.cli_set(['policy', 'route6', 'smoketest6', 'default-log']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'protocol', 'udp']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '1', 'action', 'drop']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'protocol', 'tcp']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'tcp', 'flags', 'syn']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'tcp', 'flags', 'not', 'ack']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '2', 'set', 'table', table_id]) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'source', 'address', '2001:db8::0/64']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'protocol', 'tcp']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'destination', 'port', '22']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'state', 'new']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'hop-limit', 'gt', '2']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '3', 'set', 'table', table_id]) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'protocol', 'icmpv6']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'icmpv6', 'type', 'echo-request']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'packet-length-exclude', '128']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'packet-length-exclude', '1024-2048']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'packet-type', 'multicast']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'log']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '4', 'set', 'table', table_id]) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '5', 'dscp-exclude', '61']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '5', 'dscp-exclude', '14-19']) self.cli_set(['policy', 'route6', 'smoketest6', 'rule', '5', 'set', 'table', table_id]) self.cli_set(['policy', 'route', 'smoketest', 'interface', interface]) self.cli_set(['policy', 'route', 'smoketest', 'interface', interface_wc]) self.cli_set(['policy', 'route6', 'smoketest6', 'interface', interface_wc]) self.cli_commit() mark_hex = "{0:#010x}".format(table_mark_offset - int(table_id)) # IPv4 nftables_search = [ ['iifname { "' + interface + '", "' + interface_wc + '" }', 'jump VYOS_PBR_UD_smoketest'], ['meta l4proto udp', 'meta mark 0x000007e4', 'drop'], ['tcp flags syn / syn,ack', 'meta mark 0x00000002-0x00000bb8', 'meta mark set ' + mark_hex], ['ct state new', 'tcp dport 22', 'ip saddr 198.51.100.0/24', 'ip ttl > 2', 'meta mark != 0x000001c8', 'meta mark set ' + mark_hex], ['log prefix "[ipv4-route-smoketest-4-A]"', 'icmp type echo-request', 'ip length { 128, 1024-2048 }', 'meta pkttype other', 'meta mark set ' + mark_hex], ['ip dscp { 0x29, 0x39-0x3b }', 'meta mark != 0x000001c8-0x000001f4', 'meta mark set ' + mark_hex], ['log prefix "[ipv4-smoketest-default]"'] ] self.verify_nftables(nftables_search, 'ip vyos_mangle') # IPv6 nftables6_search = [ [f'iifname "{interface_wc}"', 'jump VYOS_PBR6_UD_smoketest'], ['meta l4proto udp', 'drop'], ['tcp flags syn / syn,ack', 'meta mark set ' + mark_hex], ['ct state new', 'tcp dport 22', 'ip6 saddr 2001:db8::/64', 'ip6 hoplimit > 2', 'meta mark set ' + mark_hex], ['log prefix "[ipv6-route6-smoketest6-4-A]"', 'icmpv6 type echo-request', 'ip6 length != { 128, 1024-2048 }', 'meta pkttype multicast', 'meta mark set ' + mark_hex], ['ip6 dscp != { 0x0e-0x13, 0x3d }', 'meta mark set ' + mark_hex], ['log prefix "[ipv6-smoketest6-default]"'] ] self.verify_nftables(nftables6_search, 'ip6 vyos_mangle') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_system_conntrack.py b/smoketest/scripts/cli/test_system_conntrack.py index 02473da95..950619e1a 100755 --- a/smoketest/scripts/cli/test_system_conntrack.py +++ b/smoketest/scripts/cli/test_system_conntrack.py @@ -1,345 +1,333 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.firewall import find_nftables_rule -from vyos.utils.process import cmd from vyos.utils.file import read_file base_path = ['system', 'conntrack'] def get_sysctl(parameter): tmp = parameter.replace(r'.', r'/') return read_file(f'/proc/sys/{tmp}') class TestSystemConntrack(VyOSUnitTestSHIM.TestCase): def tearDown(self): self.cli_delete(base_path) self.cli_commit() - def verify_nftables(self, nftables_search, table, inverse=False, args=''): - nftables_output = cmd(f'sudo nft {args} list table {table}') - - for search in nftables_search: - matched = False - for line in nftables_output.split("\n"): - if all(item in line for item in search): - matched = True - break - self.assertTrue(not matched if inverse else matched, msg=search) - def test_conntrack_options(self): conntrack_config = { 'net.netfilter.nf_conntrack_expect_max' : { 'cli' : ['expect-table-size'], 'test_value' : '8192', 'default_value' : '2048', }, 'net.nf_conntrack_max' :{ 'cli' : ['table-size'], 'test_value' : '500000', 'default_value' : '262144', }, 'net.ipv4.tcp_max_syn_backlog' :{ 'cli' : ['tcp', 'half-open-connections'], 'test_value' : '2048', 'default_value' : '512', }, 'net.netfilter.nf_conntrack_tcp_loose' :{ 'cli' : ['tcp', 'loose'], 'test_value' : 'disable', 'default_value' : '1', }, 'net.netfilter.nf_conntrack_tcp_max_retrans' :{ 'cli' : ['tcp', 'max-retrans'], 'test_value' : '128', 'default_value' : '3', }, 'net.netfilter.nf_conntrack_icmp_timeout' :{ 'cli' : ['timeout', 'icmp'], 'test_value' : '180', 'default_value' : '30', }, 'net.netfilter.nf_conntrack_generic_timeout' :{ 'cli' : ['timeout', 'other'], 'test_value' : '1200', 'default_value' : '600', }, 'net.netfilter.nf_conntrack_tcp_timeout_close_wait' :{ 'cli' : ['timeout', 'tcp', 'close-wait'], 'test_value' : '30', 'default_value' : '60', }, 'net.netfilter.nf_conntrack_tcp_timeout_close' :{ 'cli' : ['timeout', 'tcp', 'close'], 'test_value' : '20', 'default_value' : '10', }, 'net.netfilter.nf_conntrack_tcp_timeout_established' :{ 'cli' : ['timeout', 'tcp', 'established'], 'test_value' : '1000', 'default_value' : '432000', }, 'net.netfilter.nf_conntrack_tcp_timeout_fin_wait' :{ 'cli' : ['timeout', 'tcp', 'fin-wait'], 'test_value' : '240', 'default_value' : '120', }, 'net.netfilter.nf_conntrack_tcp_timeout_last_ack' :{ 'cli' : ['timeout', 'tcp', 'last-ack'], 'test_value' : '300', 'default_value' : '30', }, 'net.netfilter.nf_conntrack_tcp_timeout_syn_recv' :{ 'cli' : ['timeout', 'tcp', 'syn-recv'], 'test_value' : '100', 'default_value' : '60', }, 'net.netfilter.nf_conntrack_tcp_timeout_syn_sent' :{ 'cli' : ['timeout', 'tcp', 'syn-sent'], 'test_value' : '300', 'default_value' : '120', }, 'net.netfilter.nf_conntrack_tcp_timeout_time_wait' :{ 'cli' : ['timeout', 'tcp', 'time-wait'], 'test_value' : '303', 'default_value' : '120', }, 'net.netfilter.nf_conntrack_udp_timeout' :{ 'cli' : ['timeout', 'udp', 'other'], 'test_value' : '90', 'default_value' : '30', }, 'net.netfilter.nf_conntrack_udp_timeout_stream' :{ 'cli' : ['timeout', 'udp', 'stream'], 'test_value' : '200', 'default_value' : '180', }, } for parameter, parameter_config in conntrack_config.items(): self.cli_set(base_path + parameter_config['cli'] + [parameter_config['test_value']]) # commit changes self.cli_commit() # validate configuration for parameter, parameter_config in conntrack_config.items(): tmp = parameter_config['test_value'] # net.netfilter.nf_conntrack_tcp_loose has a fancy "disable" value, # make this work if tmp == 'disable': tmp = '0' self.assertEqual(get_sysctl(f'{parameter}'), tmp) # delete all configuration options and revert back to defaults self.cli_delete(base_path) self.cli_commit() # validate configuration for parameter, parameter_config in conntrack_config.items(): self.assertEqual(get_sysctl(f'{parameter}'), parameter_config['default_value']) def test_conntrack_module_enable(self): # conntrack helper modules are disabled by default modules = { 'ftp': { 'driver': ['nf_nat_ftp', 'nf_conntrack_ftp'], 'nftables': ['ct helper set "ftp_tcp"'] }, 'h323': { 'driver': ['nf_nat_h323', 'nf_conntrack_h323'], 'nftables': ['ct helper set "ras_udp"', 'ct helper set "q931_tcp"'] }, 'nfs': { 'nftables': ['ct helper set "rpc_tcp"', 'ct helper set "rpc_udp"'] }, 'pptp': { 'driver': ['nf_nat_pptp', 'nf_conntrack_pptp'], 'nftables': ['ct helper set "pptp_tcp"'] }, 'sip': { 'driver': ['nf_nat_sip', 'nf_conntrack_sip'], 'nftables': ['ct helper set "sip_tcp"', 'ct helper set "sip_udp"'] }, 'sqlnet': { 'nftables': ['ct helper set "tns_tcp"'] }, 'tftp': { 'driver': ['nf_nat_tftp', 'nf_conntrack_tftp'], 'nftables': ['ct helper set "tftp_udp"'] }, } # load modules for module in modules: self.cli_set(base_path + ['modules', module]) # commit changes self.cli_commit() # verify modules are loaded on the system for module, module_options in modules.items(): if 'driver' in module_options: for driver in module_options['driver']: self.assertTrue(os.path.isdir(f'/sys/module/{driver}')) if 'nftables' in module_options: for rule in module_options['nftables']: self.assertTrue(find_nftables_rule('ip vyos_conntrack', 'VYOS_CT_HELPER', [rule]) != None) # unload modules for module in modules: self.cli_delete(base_path + ['modules', module]) # commit changes self.cli_commit() # verify modules are not loaded on the system for module, module_options in modules.items(): if 'driver' in module_options: for driver in module_options['driver']: self.assertFalse(os.path.isdir(f'/sys/module/{driver}')) if 'nftables' in module_options: for rule in module_options['nftables']: self.assertTrue(find_nftables_rule('ip vyos_conntrack', 'VYOS_CT_HELPER', [rule]) == None) def test_conntrack_hash_size(self): hash_size = '65536' hash_size_default = '32768' self.cli_set(base_path + ['hash-size', hash_size]) # commit changes self.cli_commit() # verify new configuration - only effective after reboot, but # a valid config file is sufficient tmp = read_file('/etc/modprobe.d/vyatta_nf_conntrack.conf') self.assertIn(hash_size, tmp) # Test default value by deleting the configuration self.cli_delete(base_path + ['hash-size']) # commit changes self.cli_commit() # verify new configuration - only effective after reboot, but # a valid config file is sufficient tmp = read_file('/etc/modprobe.d/vyatta_nf_conntrack.conf') self.assertIn(hash_size_default, tmp) def test_conntrack_ignore(self): address_group = 'conntracktest' address_group_member = '192.168.0.1' ipv6_address_group = 'conntracktest6' ipv6_address_group_member = 'dead:beef::1' self.cli_set(['firewall', 'group', 'address-group', address_group, 'address', address_group_member]) self.cli_set(['firewall', 'group', 'ipv6-address-group', ipv6_address_group, 'address', ipv6_address_group_member]) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'source', 'address', '192.0.2.1']) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'destination', 'address', '192.0.2.2']) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'destination', 'port', '22']) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'protocol', 'tcp']) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '1', 'tcp', 'flags', 'syn']) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '2', 'source', 'address', '192.0.2.1']) self.cli_set(base_path + ['ignore', 'ipv4', 'rule', '2', 'destination', 'group', 'address-group', address_group]) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'source', 'address', 'fe80::1']) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'destination', 'address', 'fe80::2']) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'destination', 'port', '22']) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '11', 'protocol', 'tcp']) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '12', 'source', 'address', 'fe80::1']) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '12', 'destination', 'group', 'address-group', ipv6_address_group]) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '13', 'source', 'address', 'fe80::1']) self.cli_set(base_path + ['ignore', 'ipv6', 'rule', '13', 'destination', 'address', '!fe80::3']) self.cli_commit() nftables_search = [ ['ip saddr 192.0.2.1', 'ip daddr 192.0.2.2', 'tcp dport 22', 'tcp flags & syn == syn', 'notrack'], ['ip saddr 192.0.2.1', 'ip daddr @A_conntracktest', 'notrack'] ] nftables6_search = [ ['ip6 saddr fe80::1', 'ip6 daddr fe80::2', 'tcp dport 22', 'notrack'], ['ip6 saddr fe80::1', 'ip6 daddr @A6_conntracktest6', 'notrack'], ['ip6 saddr fe80::1', 'ip6 daddr != fe80::3', 'notrack'] ] self.verify_nftables(nftables_search, 'ip vyos_conntrack') self.verify_nftables(nftables6_search, 'ip6 vyos_conntrack') self.cli_delete(['firewall']) def test_conntrack_timeout_custom(self): self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'source', 'address', '192.0.2.1']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'destination', 'address', '192.0.2.2']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'destination', 'port', '22']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'syn-sent', '77']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'close', '88']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '1', 'protocol', 'tcp', 'established', '99']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'inbound-interface', 'eth1']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'source', 'address', '198.51.100.1']) self.cli_set(base_path + ['timeout', 'custom', 'ipv4', 'rule', '2', 'protocol', 'udp', 'unreplied', '55']) self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'source', 'address', '2001:db8::1']) self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'inbound-interface', 'eth2']) self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'protocol', 'tcp', 'time-wait', '22']) self.cli_set(base_path + ['timeout', 'custom', 'ipv6', 'rule', '1', 'protocol', 'tcp', 'last-ack', '33']) self.cli_commit() nftables_search = [ ['ct timeout ct-timeout-1 {'], ['protocol tcp'], ['policy = { syn_sent : 1m17s, established : 1m39s, close : 1m28s }'], ['ct timeout ct-timeout-2 {'], ['protocol udp'], ['policy = { unreplied : 55s }'], ['chain VYOS_CT_TIMEOUT {'], ['ip saddr 192.0.2.1', 'ip daddr 192.0.2.2', 'tcp dport 22', 'ct timeout set "ct-timeout-1"'], ['iifname "eth1"', 'meta l4proto udp', 'ip saddr 198.51.100.1', 'ct timeout set "ct-timeout-2"'] ] nftables6_search = [ ['ct timeout ct-timeout-1 {'], ['protocol tcp'], ['policy = { last_ack : 33s, time_wait : 22s }'], ['chain VYOS_CT_TIMEOUT {'], ['iifname "eth2"', 'meta l4proto tcp', 'ip6 saddr 2001:db8::1', 'ct timeout set "ct-timeout-1"'] ] self.verify_nftables(nftables_search, 'ip vyos_conntrack') self.verify_nftables(nftables6_search, 'ip6 vyos_conntrack') self.cli_delete(['firewall']) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/smoketest/scripts/cli/test_vrf.py b/smoketest/scripts/cli/test_vrf.py index 438387f2d..c96b8e374 100755 --- a/smoketest/scripts/cli/test_vrf.py +++ b/smoketest/scripts/cli/test_vrf.py @@ -1,533 +1,556 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import os import json import unittest from netifaces import interfaces from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.ifconfig import Interface from vyos.ifconfig import Section from vyos.template import is_ipv4 from vyos.utils.process import cmd from vyos.utils.file import read_file from vyos.utils.network import get_interface_config from vyos.utils.network import is_intf_addr_assigned from vyos.utils.system import sysctl_read base_path = ['vrf'] vrfs = ['red', 'green', 'blue', 'foo-bar', 'baz_foo'] v4_protocols = ['any', 'babel', 'bgp', 'connected', 'eigrp', 'isis', 'kernel', 'ospf', 'rip', 'static', 'table'] v6_protocols = ['any', 'babel', 'bgp', 'connected', 'isis', 'kernel', 'ospfv3', 'ripng', 'static', 'table'] class VRFTest(VyOSUnitTestSHIM.TestCase): _interfaces = [] @classmethod def setUpClass(cls): # we need to filter out VLAN interfaces identified by a dot (.) # in their name - just in case! if 'TEST_ETH' in os.environ: tmp = os.environ['TEST_ETH'].split() cls._interfaces = tmp else: for tmp in Section.interfaces('ethernet', vlan=False): cls._interfaces.append(tmp) # call base-classes classmethod super(VRFTest, cls).setUpClass() def setUp(self): # VRF strict_most ist always enabled tmp = read_file('/proc/sys/net/vrf/strict_mode') self.assertEqual(tmp, '1') def tearDown(self): # delete all VRFs self.cli_delete(base_path) self.cli_commit() for vrf in vrfs: self.assertNotIn(vrf, interfaces()) def test_vrf_vni_and_table_id(self): base_table = '1000' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] description = f'VyOS-VRF-{vrf}' self.cli_set(base + ['description', description]) # check validate() - a table ID is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base + ['table', table]) self.cli_set(base + ['vni', table]) if vrf == 'green': self.cli_set(base + ['disable']) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table iproute2_config = read_file('/etc/iproute2/rt_tables.d/vyos-vrf.conf') for vrf in vrfs: description = f'VyOS-VRF-{vrf}' self.assertTrue(vrf in interfaces()) vrf_if = Interface(vrf) # validate proper interface description self.assertEqual(vrf_if.get_alias(), description) # validate admin up/down state of VRF state = 'up' if vrf == 'green': state = 'down' self.assertEqual(vrf_if.get_admin_state(), state) # Test the iproute2 lookup file, syntax is as follows: # # # id vrf name comment # 1000 red # VyOS-VRF-red # 1001 green # VyOS-VRF-green # ... regex = f'{table}\s+{vrf}\s+#\s+{description}' self.assertTrue(re.findall(regex, iproute2_config)) frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertIn(f' vni {table}', frrconfig) tmp = get_interface_config(vrf) self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) # Increment table ID for the next run table = str(int(table) + 1) def test_vrf_loopbacks_ips(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration loopbacks = ['127.0.0.1', '::1'] for vrf in vrfs: # Ensure VRF was created self.assertIn(vrf, interfaces()) # Verify IP forwarding is 1 (enabled) self.assertEqual(sysctl_read(f'net.ipv4.conf.{vrf}.forwarding'), '1') self.assertEqual(sysctl_read(f'net.ipv6.conf.{vrf}.forwarding'), '1') # Test for proper loopback IP assignment for addr in loopbacks: self.assertTrue(is_intf_addr_assigned(vrf, addr)) def test_vrf_bind_all(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) table = str(int(table) + 1) self.cli_set(base_path + ['bind-to-all']) # commit changes self.cli_commit() # Verify VRF configuration self.assertEqual(sysctl_read('net.ipv4.tcp_l3mdev_accept'), '1') self.assertEqual(sysctl_read('net.ipv4.udp_l3mdev_accept'), '1') # If there is any VRF defined, strict_mode should be on self.assertEqual(sysctl_read('net.vrf.strict_mode'), '1') def test_vrf_table_id_is_unalterable(self): # Linux Kernel prohibits the change of a VRF table on the fly. # VRF must be deleted and recreated! table = '1000' vrf = vrfs[0] base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) # commit changes self.cli_commit() # Check if VRF has been created self.assertTrue(vrf in interfaces()) table = str(int(table) + 1) self.cli_set(base + ['table', table]) # check validate() - table ID can not be altered! with self.assertRaises(ConfigSessionError): self.cli_commit() def test_vrf_assign_interface(self): vrf = vrfs[0] table = '5000' self.cli_set(['vrf', 'name', vrf, 'table', table]) for interface in self._interfaces: section = Section.section(interface) self.cli_set(['interfaces', section, interface, 'vrf', vrf]) # commit changes self.cli_commit() # Verify VRF assignmant for interface in self._interfaces: tmp = get_interface_config(interface) self.assertEqual(vrf, tmp['master']) # cleanup section = Section.section(interface) self.cli_delete(['interfaces', section, interface, 'vrf']) def test_vrf_static_route(self): base_table = '100' table = base_table for vrf in vrfs: next_hop = f'192.0.{table}.1' prefix = f'10.0.{table}.0/24' base = base_path + ['name', vrf] self.cli_set(base + ['vni', table]) # check validate() - a table ID is mandatory with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base + ['table', table]) self.cli_set(base + ['protocols', 'static', 'route', prefix, 'next-hop', next_hop]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: next_hop = f'192.0.{table}.1' prefix = f'10.0.{table}.0/24' self.assertTrue(vrf in interfaces()) frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertIn(f' vni {table}', frrconfig) self.assertIn(f' ip route {prefix} {next_hop}', frrconfig) # Increment table ID for the next run table = str(int(table) + 1) def test_vrf_link_local_ip_addresses(self): # Testcase for issue T4331 table = '100' vrf = 'orange' interface = 'dum9998' addresses = ['192.0.2.1/26', '2001:db8:9998::1/64', 'fe80::1/64'] for address in addresses: self.cli_set(['interfaces', 'dummy', interface, 'address', address]) # Create dummy interfaces self.cli_commit() # ... and verify IP addresses got assigned for address in addresses: self.assertTrue(is_intf_addr_assigned(interface, address)) # Move interface to VRF self.cli_set(base_path + ['name', vrf, 'table', table]) self.cli_set(['interfaces', 'dummy', interface, 'vrf', vrf]) # Apply VRF config self.cli_commit() # Ensure VRF got created self.assertIn(vrf, interfaces()) # ... and IP addresses are still assigned for address in addresses: self.assertTrue(is_intf_addr_assigned(interface, address)) # Verify VRF table ID tmp = get_interface_config(vrf) self.assertEqual(int(table), tmp['linkinfo']['info_data']['table']) # Verify interface is assigned to VRF tmp = get_interface_config(interface) self.assertEqual(vrf, tmp['master']) # Delete Interface self.cli_delete(['interfaces', 'dummy', interface]) self.cli_commit() def test_vrf_disable_forwarding(self): table = '2000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) self.cli_set(base + ['ip', 'disable-forwarding']) self.cli_set(base + ['ipv6', 'disable-forwarding']) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration loopbacks = ['127.0.0.1', '::1'] for vrf in vrfs: # Ensure VRF was created self.assertIn(vrf, interfaces()) # Verify IP forwarding is 0 (disabled) self.assertEqual(sysctl_read(f'net.ipv4.conf.{vrf}.forwarding'), '0') self.assertEqual(sysctl_read(f'net.ipv6.conf.{vrf}.forwarding'), '0') def test_vrf_ip_protocol_route_map(self): table = '6000' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) for protocol in v4_protocols: self.cli_set(['policy', 'route-map', f'route-map-{vrf}-{protocol}', 'rule', '10', 'action', 'permit']) self.cli_set(base + ['ip', 'protocol', protocol, 'route-map', f'route-map-{vrf}-{protocol}']) table = str(int(table) + 1) self.cli_commit() # Verify route-map properly applied to FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon='zebra') self.assertIn(f'vrf {vrf}', frrconfig) for protocol in v4_protocols: self.assertIn(f' ip protocol {protocol} route-map route-map-{vrf}-{protocol}', frrconfig) # Delete route-maps for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(['policy', 'route-map', f'route-map-{vrf}-{protocol}']) self.cli_delete(base + ['ip', 'protocol']) self.cli_commit() # Verify route-map properly is removed from FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon='zebra') self.assertNotIn(f'vrf {vrf}', frrconfig) def test_vrf_ip_ipv6_protocol_non_existing_route_map(self): table = '6100' non_existing = 'non-existing' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) for protocol in v4_protocols: self.cli_set(base + ['ip', 'protocol', protocol, 'route-map', f'v4-{non_existing}']) for protocol in v6_protocols: self.cli_set(base + ['ipv6', 'protocol', protocol, 'route-map', f'v6-{non_existing}']) table = str(int(table) + 1) # Both v4 and v6 route-maps do not exist yet with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['policy', 'route-map', f'v4-{non_existing}', 'rule', '10', 'action', 'deny']) # v6 route-map does not exist yet with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['policy', 'route-map', f'v6-{non_existing}', 'rule', '10', 'action', 'deny']) # Commit again self.cli_commit() def test_vrf_ipv6_protocol_route_map(self): table = '6200' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) for protocol in v6_protocols: route_map = f'route-map-{vrf}-{protocol.replace("ospfv3", "ospf6")}' self.cli_set(['policy', 'route-map', route_map, 'rule', '10', 'action', 'permit']) self.cli_set(base + ['ipv6', 'protocol', protocol, 'route-map', route_map]) table = str(int(table) + 1) self.cli_commit() # Verify route-map properly applied to FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon='zebra') self.assertIn(f'vrf {vrf}', frrconfig) for protocol in v6_protocols: # VyOS and FRR use a different name for OSPFv3 (IPv6) if protocol == 'ospfv3': protocol = 'ospf6' route_map = f'route-map-{vrf}-{protocol}' self.assertIn(f' ipv6 protocol {protocol} route-map {route_map}', frrconfig) # Delete route-maps for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(['policy', 'route-map', f'route-map-{vrf}-{protocol}']) self.cli_delete(base + ['ipv6', 'protocol']) self.cli_commit() # Verify route-map properly is removed from FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon='zebra') self.assertNotIn(f'vrf {vrf}', frrconfig) def test_vrf_vni_duplicates(self): base_table = '6300' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) self.cli_set(base + ['vni', '100']) table = str(int(table) + 1) # L3VNIs can only be used once with self.assertRaises(ConfigSessionError): self.cli_commit() table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['vni', str(table)]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(vrf in interfaces()) frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 1) def test_vrf_vni_add_change_remove(self): base_table = '6300' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', str(table)]) self.cli_set(base + ['vni', str(table)]) table = str(int(table) + 1) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(vrf in interfaces()) frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 1) # Now change all L3VNIs (increment 2) # We must also change the base_table number as we probably could get # duplicate VNI's during the test as VNIs are applied 1:1 to FRR base_table = '5000' table = base_table for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['vni', str(table)]) table = str(int(table) + 2) # commit changes self.cli_commit() # Verify VRF configuration table = base_table for vrf in vrfs: self.assertTrue(vrf in interfaces()) frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertIn(f' vni {table}', frrconfig) # Increment table ID for the next run table = str(int(table) + 2) # Now delete all the VNIs for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(base + ['vni']) # commit changes self.cli_commit() # Verify no VNI is defined for vrf in vrfs: self.assertTrue(vrf in interfaces()) frrconfig = self.getFRRconfig(f'vrf {vrf}') self.assertNotIn('vni', frrconfig) def test_vrf_ip_ipv6_nht(self): table = '6910' for vrf in vrfs: base = base_path + ['name', vrf] self.cli_set(base + ['table', table]) self.cli_set(base + ['ip', 'nht', 'no-resolve-via-default']) self.cli_set(base + ['ipv6', 'nht', 'no-resolve-via-default']) table = str(int(table) + 1) self.cli_commit() # Verify route-map properly applied to FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon='zebra') self.assertIn(f'vrf {vrf}', frrconfig) self.assertIn(f' no ip nht resolve-via-default', frrconfig) self.assertIn(f' no ipv6 nht resolve-via-default', frrconfig) # Delete route-maps for vrf in vrfs: base = base_path + ['name', vrf] self.cli_delete(base + ['ip']) self.cli_delete(base + ['ipv6']) self.cli_commit() # Verify route-map properly is removed from FRR for vrf in vrfs: frrconfig = self.getFRRconfig(f'vrf {vrf}', daemon='zebra') self.assertNotIn(f' no ip nht resolve-via-default', frrconfig) self.assertNotIn(f' no ipv6 nht resolve-via-default', frrconfig) + def test_vrf_conntrack(self): + table = '1000' + nftables_rules = { + 'vrf_zones_ct_in': ['ct original zone set iifname map @ct_iface_map'], + 'vrf_zones_ct_out': ['ct original zone set oifname map @ct_iface_map'] + } + + self.cli_set(base_path + ['name', 'blue', 'table', table]) + self.cli_commit() + + # Conntrack rules should not be present + for chain, rule in nftables_rules.items(): + self.verify_nftables_chain(rule, 'inet vrf_zones', chain, inverse=True) + + self.cli_set(['nat']) + self.cli_commit() + + # Conntrack rules should now be present + for chain, rule in nftables_rules.items(): + self.verify_nftables_chain(rule, 'inet vrf_zones', chain, inverse=False) + + self.cli_delete(['nat']) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/system_conntrack.py b/src/conf_mode/system_conntrack.py index 7f6c71440..e075bc928 100755 --- a/src/conf_mode/system_conntrack.py +++ b/src/conf_mode/system_conntrack.py @@ -1,243 +1,247 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import re from sys import exit from vyos.config import Config from vyos.configdep import set_dependents, call_dependents from vyos.utils.process import process_named_running from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive from vyos.utils.process import cmd from vyos.utils.process import rc_cmd from vyos.utils.process import run from vyos.template import render from vyos import ConfigError from vyos import airbag airbag.enable() conntrack_config = r'/etc/modprobe.d/vyatta_nf_conntrack.conf' sysctl_file = r'/run/sysctl/10-vyos-conntrack.conf' nftables_ct_file = r'/run/nftables-ct.conf' # Every ALG (Application Layer Gateway) consists of either a Kernel Object # also called a Kernel Module/Driver or some rules present in iptables module_map = { 'ftp': { 'ko': ['nf_nat_ftp', 'nf_conntrack_ftp'], 'nftables': ['ct helper set "ftp_tcp" tcp dport {21} return'] }, 'h323': { 'ko': ['nf_nat_h323', 'nf_conntrack_h323'], 'nftables': ['ct helper set "ras_udp" udp dport {1719} return', 'ct helper set "q931_tcp" tcp dport {1720} return'] }, 'nfs': { 'nftables': ['ct helper set "rpc_tcp" tcp dport {111} return', 'ct helper set "rpc_udp" udp dport {111} return'] }, 'pptp': { 'ko': ['nf_nat_pptp', 'nf_conntrack_pptp'], 'nftables': ['ct helper set "pptp_tcp" tcp dport {1723} return'], 'ipv4': True }, 'sip': { 'ko': ['nf_nat_sip', 'nf_conntrack_sip'], 'nftables': ['ct helper set "sip_tcp" tcp dport {5060,5061} return', 'ct helper set "sip_udp" udp dport {5060,5061} return'] }, 'sqlnet': { 'nftables': ['ct helper set "tns_tcp" tcp dport {1521,1525,1536} return'] }, 'tftp': { 'ko': ['nf_nat_tftp', 'nf_conntrack_tftp'], 'nftables': ['ct helper set "tftp_udp" udp dport {69} return'] }, } valid_groups = [ 'address_group', 'domain_group', 'network_group', 'port_group' ] def get_config(config=None): if config: conf = config else: conf = Config() base = ['system', 'conntrack'] conntrack = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) conntrack['firewall'] = conf.get_config_dict(['firewall'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) conntrack['ipv4_nat_action'] = 'accept' if conf.exists(['nat']) else 'return' conntrack['ipv6_nat_action'] = 'accept' if conf.exists(['nat66']) else 'return' conntrack['wlb_action'] = 'accept' if conf.exists(['load-balancing', 'wan']) else 'return' conntrack['wlb_local_action'] = conf.exists(['load-balancing', 'wan', 'enable-local-traffic']) conntrack['module_map'] = module_map if conf.exists(['service', 'conntrack-sync']): set_dependents('conntrack_sync', conf) + # If conntrack status changes, VRF zone rules need updating + if conf.exists(['vrf']): + set_dependents('vrf', conf) + return conntrack def verify(conntrack): for inet in ['ipv4', 'ipv6']: if dict_search_args(conntrack, 'ignore', inet, 'rule') != None: for rule, rule_config in conntrack['ignore'][inet]['rule'].items(): if dict_search('destination.port', rule_config) or \ dict_search('destination.group.port_group', rule_config) or \ dict_search('source.port', rule_config) or \ dict_search('source.group.port_group', rule_config): if 'protocol' not in rule_config or rule_config['protocol'] not in ['tcp', 'udp']: raise ConfigError(f'Port requires tcp or udp as protocol in rule {rule}') tcp_flags = dict_search_args(rule_config, 'tcp', 'flags') if tcp_flags: if dict_search_args(rule_config, 'protocol') != 'tcp': raise ConfigError('Protocol must be tcp when specifying tcp flags') not_flags = dict_search_args(rule_config, 'tcp', 'flags', 'not') if not_flags: duplicates = [flag for flag in tcp_flags if flag in not_flags] if duplicates: raise ConfigError(f'Cannot match a tcp flag as set and not set') for side in ['destination', 'source']: if side in rule_config: side_conf = rule_config[side] if 'group' in side_conf: if len({'address_group', 'network_group', 'domain_group'} & set(side_conf['group'])) > 1: raise ConfigError('Only one address-group, network-group or domain-group can be specified') for group in valid_groups: if group in side_conf['group']: group_name = side_conf['group'][group] error_group = group.replace("_", "-") if group in ['address_group', 'network_group', 'domain_group']: if 'address' in side_conf: raise ConfigError(f'{error_group} and address cannot both be defined') if group_name and group_name[0] == '!': group_name = group_name[1:] if inet == 'ipv6': group = f'ipv6_{group}' group_obj = dict_search_args(conntrack['firewall'], 'group', group, group_name) if group_obj is None: raise ConfigError(f'Invalid {error_group} "{group_name}" on ignore rule') if not group_obj: Warning(f'{error_group} "{group_name}" has no members!') if dict_search_args(conntrack, 'timeout', 'custom', inet, 'rule') != None: for rule, rule_config in conntrack['timeout']['custom'][inet]['rule'].items(): if 'protocol' not in rule_config: raise ConfigError(f'Conntrack custom timeout rule {rule} requires protocol tcp or udp') else: if 'tcp' in rule_config['protocol'] and 'udp' in rule_config['protocol']: raise ConfigError(f'conntrack custom timeout rule {rule} - Cant use both tcp and udp protocol') return None def generate(conntrack): if not os.path.exists(nftables_ct_file): conntrack['first_install'] = True # Determine if conntrack is needed conntrack['ipv4_firewall_action'] = 'return' conntrack['ipv6_firewall_action'] = 'return' for rules, path in dict_search_recursive(conntrack['firewall'], 'rule'): if any(('state' in rule_conf or 'connection_status' in rule_conf or 'offload_target' in rule_conf) for rule_conf in rules.values()): if path[0] == 'ipv4': conntrack['ipv4_firewall_action'] = 'accept' elif path[0] == 'ipv6': conntrack['ipv6_firewall_action'] = 'accept' render(conntrack_config, 'conntrack/vyos_nf_conntrack.conf.j2', conntrack) render(sysctl_file, 'conntrack/sysctl.conf.j2', conntrack) render(nftables_ct_file, 'conntrack/nftables-ct.j2', conntrack) return None def apply(conntrack): # Depending on the enable/disable state of the ALG (Application Layer Gateway) # modules we need to either insmod or rmmod the helpers. add_modules = [] rm_modules = [] for module, module_config in module_map.items(): if dict_search_args(conntrack, 'modules', module) is None: if 'ko' in module_config: unloaded = [mod for mod in module_config['ko'] if os.path.exists(f'/sys/module/{mod}')] rm_modules.extend(unloaded) else: if 'ko' in module_config: add_modules.extend(module_config['ko']) # Add modules before nftables uses them if add_modules: module_str = ' '.join(add_modules) cmd(f'modprobe -a {module_str}') # Load new nftables ruleset install_result, output = rc_cmd(f'nft -f {nftables_ct_file}') if install_result == 1: raise ConfigError(f'Failed to apply configuration: {output}') # Remove modules after nftables stops using them if rm_modules: module_str = ' '.join(rm_modules) cmd(f'rmmod {module_str}') try: call_dependents() except ConfigError: # Ignore config errors on dependent due to being called too early. Example: # ConfigError("ConfigError('Interface ethN requires an IP address!')") pass # We silently ignore all errors # See: https://bugzilla.redhat.com/show_bug.cgi?id=1264080 cmd(f'sysctl -f {sysctl_file}') return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/vrf.py b/src/conf_mode/vrf.py index a2f4956be..16908100f 100755 --- a/src/conf_mode/vrf.py +++ b/src/conf_mode/vrf.py @@ -1,319 +1,337 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from json import loads from vyos.config import Config from vyos.configdict import dict_merge from vyos.configdict import node_changed from vyos.configverify import verify_route_map +from vyos.firewall import conntrack_required from vyos.ifconfig import Interface from vyos.template import render from vyos.template import render_to_string from vyos.utils.dict import dict_search from vyos.utils.network import get_interface_config from vyos.utils.network import get_vrf_members from vyos.utils.network import interface_exists from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.system import sysctl_write from vyos import ConfigError from vyos import frr from vyos import airbag airbag.enable() config_file = '/etc/iproute2/rt_tables.d/vyos-vrf.conf' k_mod = ['vrf'] +nftables_table = 'inet vrf_zones' +nftables_rules = { + 'vrf_zones_ct_in': 'counter ct original zone set iifname map @ct_iface_map', + 'vrf_zones_ct_out': 'counter ct original zone set oifname map @ct_iface_map' +} + def has_rule(af : str, priority : int, table : str=None): """ Check if a given ip rule exists $ ip --json -4 rule show [{'l3mdev': None, 'priority': 1000, 'src': 'all'}, {'action': 'unreachable', 'l3mdev': None, 'priority': 2000, 'src': 'all'}, {'priority': 32765, 'src': 'all', 'table': 'local'}, {'priority': 32766, 'src': 'all', 'table': 'main'}, {'priority': 32767, 'src': 'all', 'table': 'default'}] """ if af not in ['-4', '-6']: raise ValueError() command = f'ip --detail --json {af} rule show' for tmp in loads(cmd(command)): if 'priority' in tmp and 'table' in tmp: if tmp['priority'] == priority and tmp['table'] == table: return True elif 'priority' in tmp and table in tmp: # l3mdev table has a different layout if tmp['priority'] == priority: return True return False def vrf_interfaces(c, match): matched = [] old_level = c.get_level() c.set_level(['interfaces']) section = c.get_config_dict([], get_first_key=True) for type in section: interfaces = section[type] for name in interfaces: interface = interfaces[name] if 'vrf' in interface: v = interface.get('vrf', '') if v == match: matched.append(name) c.set_level(old_level) return matched def vrf_routing(c, match): matched = [] old_level = c.get_level() c.set_level(['protocols', 'vrf']) if match in c.list_nodes([]): matched.append(match) c.set_level(old_level) return matched def get_config(config=None): if config: conf = config else: conf = Config() base = ['vrf'] vrf = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True) # determine which VRF has been removed for name in node_changed(conf, base + ['name']): if 'vrf_remove' not in vrf: vrf.update({'vrf_remove' : {}}) vrf['vrf_remove'][name] = {} # get VRF bound interfaces interfaces = vrf_interfaces(conf, name) if interfaces: vrf['vrf_remove'][name]['interface'] = interfaces # get VRF bound routing instances routes = vrf_routing(conf, name) if routes: vrf['vrf_remove'][name]['route'] = routes + if 'name' in vrf: + vrf['conntrack'] = conntrack_required(conf) + # We also need the route-map information from the config # # XXX: one MUST always call this without the key_mangling() option! See # vyos.configverify.verify_common_route_maps() for more information. tmp = {'policy' : {'route-map' : conf.get_config_dict(['policy', 'route-map'], get_first_key=True)}} # L3VNI setup is done via vrf_vni.py as it must be de-configured (on node # deletetion prior to the BGP process. Tell the Jinja2 template no VNI # setup is needed vrf.update({'no_vni' : ''}) # Merge policy dict into "regular" config dict vrf = dict_merge(tmp, vrf) return vrf def verify(vrf): # ensure VRF is not assigned to any interface if 'vrf_remove' in vrf: for name, config in vrf['vrf_remove'].items(): if 'interface' in config: raise ConfigError(f'Can not remove VRF "{name}", it still has '\ f'member interfaces!') if 'route' in config: raise ConfigError(f'Can not remove VRF "{name}", it still has '\ f'static routes installed!') if 'name' in vrf: reserved_names = ["add", "all", "broadcast", "default", "delete", "dev", "get", "inet", "mtu", "link", "type", "vrf"] table_ids = [] for name, vrf_config in vrf['name'].items(): # Reserved VRF names if name in reserved_names: raise ConfigError(f'VRF name "{name}" is reserved and connot be used!') # table id is mandatory if 'table' not in vrf_config: raise ConfigError(f'VRF "{name}" table id is mandatory!') # routing table id can't be changed - OS restriction if interface_exists(name): tmp = str(dict_search('linkinfo.info_data.table', get_interface_config(name))) if tmp and tmp != vrf_config['table']: raise ConfigError(f'VRF "{name}" table id modification not possible!') # VRF routing table ID must be unique on the system if 'table' in vrf_config and vrf_config['table'] in table_ids: raise ConfigError(f'VRF "{name}" table id is not unique!') table_ids.append(vrf_config['table']) tmp = dict_search('ip.protocol', vrf_config) if tmp != None: for protocol, protocol_options in tmp.items(): if 'route_map' in protocol_options: verify_route_map(protocol_options['route_map'], vrf) tmp = dict_search('ipv6.protocol', vrf_config) if tmp != None: for protocol, protocol_options in tmp.items(): if 'route_map' in protocol_options: verify_route_map(protocol_options['route_map'], vrf) return None def generate(vrf): # Render iproute2 VR helper names render(config_file, 'iproute2/vrf.conf.j2', vrf) # Render VRF Kernel/Zebra route-map filters vrf['frr_zebra_config'] = render_to_string('frr/zebra.vrf.route-map.frr.j2', vrf) return None def apply(vrf): # Documentation # # - https://github.com/torvalds/linux/blob/master/Documentation/networking/vrf.txt # - https://github.com/Mellanox/mlxsw/wiki/Virtual-Routing-and-Forwarding-(VRF) # - https://github.com/Mellanox/mlxsw/wiki/L3-Tunneling # - https://netdevconf.info/1.1/proceedings/slides/ahern-vrf-tutorial.pdf # - https://netdevconf.info/1.2/slides/oct6/02_ahern_what_is_l3mdev_slides.pdf # set the default VRF global behaviour bind_all = '0' if 'bind_to_all' in vrf: bind_all = '1' sysctl_write('net.ipv4.tcp_l3mdev_accept', bind_all) sysctl_write('net.ipv4.udp_l3mdev_accept', bind_all) for tmp in (dict_search('vrf_remove', vrf) or []): if interface_exists(tmp): # T5492: deleting a VRF instance may leafe processes running # (e.g. dhclient) as there is a depedency ordering issue in the CLI. # We need to ensure that we stop the dhclient processes first so # a proper DHCLP RELEASE message is sent for interface in get_vrf_members(tmp): vrf_iface = Interface(interface) vrf_iface.set_dhcp(False) vrf_iface.set_dhcpv6(False) # Remove nftables conntrack zone map item nft_del_element = f'delete element inet vrf_zones ct_iface_map {{ "{tmp}" }}' cmd(f'nft {nft_del_element}') # Delete the VRF Kernel interface call(f'ip link delete dev {tmp}') if 'name' in vrf: # Linux routing uses rules to find tables - routing targets are then # looked up in those tables. If the lookup got a matching route, the # process ends. # # TL;DR; first table with a matching entry wins! # # You can see your routing table lookup rules using "ip rule", sadly the # local lookup is hit before any VRF lookup. Pinging an addresses from the # VRF will usually find a hit in the local table, and never reach the VRF # routing table - this is usually not what you want. Thus we will # re-arrange the tables and move the local lookup further down once VRFs # are enabled. # # Thanks to https://stbuehler.de/blog/article/2020/02/29/using_vrf__virtual_routing_and_forwarding__on_linux.html for afi in ['-4', '-6']: # move lookup local to pref 32765 (from 0) if not has_rule(afi, 32765, 'local'): call(f'ip {afi} rule add pref 32765 table local') if has_rule(afi, 0, 'local'): call(f'ip {afi} rule del pref 0') # make sure that in VRFs after failed lookup in the VRF specific table # nothing else is reached if not has_rule(afi, 1000, 'l3mdev'): # this should be added by the kernel when a VRF is created # add it here for completeness call(f'ip {afi} rule add pref 1000 l3mdev protocol kernel') # add another rule with an unreachable target which only triggers in VRF context # if a route could not be reached if not has_rule(afi, 2000, 'l3mdev'): call(f'ip {afi} rule add pref 2000 l3mdev unreachable') for name, config in vrf['name'].items(): table = config['table'] if not interface_exists(name): # For each VRF apart from your default context create a VRF # interface with a separate routing table call(f'ip link add {name} type vrf table {table}') # set VRF description for e.g. SNMP monitoring vrf_if = Interface(name) # We also should add proper loopback IP addresses to the newly added # VRF for services bound to the loopback address (SNMP, NTP) vrf_if.add_addr('127.0.0.1/8') vrf_if.add_addr('::1/128') # add VRF description if available vrf_if.set_alias(config.get('description', '')) # Enable/Disable IPv4 forwarding tmp = dict_search('ip.disable_forwarding', config) value = '0' if (tmp != None) else '1' vrf_if.set_ipv4_forwarding(value) # Enable/Disable IPv6 forwarding tmp = dict_search('ipv6.disable_forwarding', config) value = '0' if (tmp != None) else '1' vrf_if.set_ipv6_forwarding(value) # Enable/Disable of an interface must always be done at the end of the # derived class to make use of the ref-counting set_admin_state() # function. We will only enable the interface if 'up' was called as # often as 'down'. This is required by some interface implementations # as certain parameters can only be changed when the interface is # in admin-down state. This ensures the link does not flap during # reconfiguration. state = 'down' if 'disable' in config else 'up' vrf_if.set_admin_state(state) # Add nftables conntrack zone map item nft_add_element = f'add element inet vrf_zones ct_iface_map {{ "{name}" : {table} }}' cmd(f'nft {nft_add_element}') + if vrf['conntrack']: + for chain, rule in nftables_rules.items(): + cmd(f'nft add rule inet vrf_zones {chain} {rule}') + + if 'name' not in vrf or not vrf['conntrack']: + for chain, rule in nftables_rules.items(): + cmd(f'nft flush chain inet vrf_zones {chain}') + # Apply FRR filters zebra_daemon = 'zebra' # Save original configuration prior to starting any commit actions frr_cfg = frr.FRRConfig() # The route-map used for the FIB (zebra) is part of the zebra daemon frr_cfg.load_configuration(zebra_daemon) frr_cfg.modify_section(f'^vrf .+', stop_pattern='^exit-vrf', remove_stop_mark=True) if 'frr_zebra_config' in vrf: frr_cfg.add_before(frr.default_add_before, vrf['frr_zebra_config']) frr_cfg.commit_configuration(zebra_daemon) return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)